> ## Documentation Index
> Fetch the complete documentation index at: https://docs.popsink.com/llms.txt
> Use this file to discover all available pages before exploring further.

# On prem

## 📋 Table of Contents

1. [Quick Start](#quick-start)
2. [Prerequisites](#prerequisites)
3. [API Overview](#api-overview)
4. [Authentication Flow](#authentication-flow)
5. [Step-by-Step Tutorial](#step-by-step-tutorial)
   * [1. User Registration](#step-1-user-registration)
   * [2. User Login](#step-2-user-login)
   * [3. Environment Setup](#step-3-environment-setup)
   * [4. Team Creation](#step-4-team-creation)
   * [5. Team Member Management](#step-5-team-member-management)
   * [6. Pipeline Creation](#step-6-pipeline-creation)
   * [7. Pipeline Updates](#step-7-pipeline-updates)
   * [8. Pipeline Control](#step-8-pipeline-control)
6. [Testing Tools](#testing-tools)
7. [API Reference](#api-reference)
8. [Connector Configurations](#connector-configurations)
9. [Common Patterns](#common-patterns)
10. [Troubleshooting](#troubleshooting)

***

## 🚀 Quick Start

**For the impatient:** Here's the minimum you need to know:

```bash theme={null}
# 1. Create a user
curl -X POST "https://your-server/api/auth/register" \
  -H "Content-Type: application/json" \
  -d '{"email":"user@example.com","password":"secure123","is_active":true}' -k

# 2. Login to get your token
curl -X POST "https://your-server/api/auth/jwt/login" \
  -H "Content-Type: application/x-www-form-urlencoded" \
  -d 'username=user@example.com&password=secure123' -k

# 3. Use the token in all subsequent requests
curl -X GET "https://your-server/api/pipelines/" \
  -H "Authorization: Bearer YOUR_TOKEN_HERE" -k
```

***

## ✅ Prerequisites

### What You Need

* **Access to a Popsink server** (e.g., `https://popsink.your-company.com`)
* **A terminal or API client** (curl, Postman, HTTPie, etc.)
* **5-15 minutes** to complete this guide

### Basic Concepts

| Concept          | Description                                         |
| ---------------- | --------------------------------------------------- |
| **Environment**  | A workspace that contains teams and their resources |
| **Team**         | A group of users working together on pipelines      |
| **Pipeline**     | A data processing flow from source to target        |
| **Connector**    | A connection to a data source or destination        |
| **Subscription** | Configuration for how data flows through a pipeline |
| **DataModel**    | Schema and configuration for data transformation    |

***

## 🔍 API Overview

### Base URL Structure

```
https://your-server/api/
```

### Authentication

All API endpoints (except registration and login) require a **Bearer token** in the `Authorization` header:

```
Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...
```

### Response Format

All responses are in **JSON** format:

```json theme={null}
{
  "id": "uuid-here",
  "name": "resource-name",
  "created_at": "2025-11-19T10:00:00Z"
}
```

### HTTP Status Codes

| Code  | Meaning          | When You'll See It                      |
| ----- | ---------------- | --------------------------------------- |
| `200` | Success          | GET, PATCH operations                   |
| `201` | Created          | POST operations that create resources   |
| `202` | Accepted         | Async operations (start/pause pipeline) |
| `204` | No Content       | DELETE operations                       |
| `400` | Bad Request      | Invalid input data                      |
| `401` | Unauthorized     | Missing or invalid token                |
| `403` | Forbidden        | Insufficient permissions                |
| `404` | Not Found        | Resource doesn't exist                  |
| `422` | Validation Error | Data doesn't meet requirements          |

***

## 🔐 Authentication Flow

```
┌─────────────┐
│   Register  │  POST /auth/register
└──────┬──────┘
       │
       ▼
┌─────────────┐
│    Login    │  POST /auth/jwt/login
└──────┬──────┘
       │
       ▼
┌─────────────┐
│  Get Token  │  Save this token!
└──────┬──────┘
       │
       ▼
┌─────────────┐
│  Use Token  │  All API requests
└─────────────┘
```

***

## 📖 Step-by-Step Tutorial

### Step 1: User Registration

**What:** Create your user account
**When:** First time using the API
**Required:** Email and password

#### Request

```bash theme={null}
curl -X POST "https://your-server/api/auth/register" \
  -H "Content-Type: application/json" \
  -d '{
    "email": "john.doe@company.com",
    "password": "MySecurePassword123!",
    "is_active": true,
    "is_superuser": false,
    "is_verified": false
  }' -k
```

#### Field Descriptions

| Field          | Type    | Required | Description                                       |
| -------------- | ------- | -------- | ------------------------------------------------- |
| `email`        | string  | ✅        | Your email address (must be unique)               |
| `password`     | string  | ✅        | Your password (min 8 characters recommended)      |
| `is_active`    | boolean | ✅        | Set to `true` to activate the account immediately |
| `is_superuser` | boolean | ⬜        | Admin privileges (default: `false`)               |
| `is_verified`  | boolean | ⬜        | Email verification status (default: `false`)      |

#### Response (201 Created)

```json theme={null}
{
  "id": "3b0c298a-062e-42d1-a01b-277fa66908da",
  "email": "john.doe@company.com",
  "is_active": true,
  "is_superuser": false,
  "is_verified": false,
  "active_env_id": null
}
```

#### 📝 Save This

```
USER_ID=3b0c298a-062e-42d1-a01b-277fa66908da
EMAIL=john.doe@company.com
PASSWORD=MySecurePassword123!
```

***

### Step 2: User Login

**What:** Authenticate and get your access token
**When:** Before making any authenticated API calls
**Token Lifetime:** Configurable (typically 24 hours)

#### Request

```bash theme={null}
curl -X POST "https://your-server/api/auth/jwt/login" \
  -H "Content-Type: application/x-www-form-urlencoded" \
  -d 'username=john.doe@company.com&password=MySecurePassword123!' -k
```

#### Response (200 OK)

```json theme={null}
{
  "access_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIzYjBjMjk4YS0wNjJlLTQyZDEtYTAxYi0yNzdmYTY2OTA4ZGEiLCJhdWQiOlsiZmFzdGFwaS11c2VyczphdXRoIl0sImV4cCI6MTcwMDAwMDAwMH0.signature",
  "token_type": "bearer"
}
```

#### 📝 Save This

```
TOKEN=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...
```

#### Using Your Token

From now on, include this header in **every request**:

```bash theme={null}
-H "Authorization: Bearer $TOKEN"
```

***

### Step 3: Environment Setup

**What:** Create a workspace for your teams and pipelines
**When:** After logging in, before creating teams
**Permissions:** Any authenticated user can create an environment

#### Request

```bash theme={null}
curl -X POST "https://your-server/api/envs/" \
  -H "Authorization: Bearer $TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "name": "production-environment",
    "description": "Production data processing environment",
    "use_retention": true
  }' -k
```

#### Field Descriptions

| Field           | Type    | Required | Description                                       |
| --------------- | ------- | -------- | ------------------------------------------------- |
| `name`          | string  | ✅        | Environment name (must be unique)                 |
| `description`   | string  | ⬜        | Description of the environment's purpose          |
| `use_retention` | boolean | ⬜        | Enable data retention policies (default: `false`) |

#### Response (201 Created)

```json theme={null}
{
  "id": "d455a981-be17-455e-a03c-4d9c3224db72",
  "name": "production-environment",
  "description": "Production data processing environment",
  "use_retention": true,
  "retention_configuration": null
}
```

#### 📝 Save This

```
ENV_ID=d455a981-be17-455e-a03c-4d9c3224db72
```

***

### Step 4: Team Creation

**What:** Create a team within your environment
**When:** After creating an environment
**Ownership:** The creator automatically becomes a team owner

#### Request

```bash theme={null}
curl -X POST "https://your-server/api/teams/" \
  -H "Authorization: Bearer $TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "name": "Data Engineering Team",
    "description": "Team responsible for ETL pipelines"
  }' -k
```

#### Field Descriptions

| Field         | Type   | Required | Description                              |
| ------------- | ------ | -------- | ---------------------------------------- |
| `name`        | string | ✅        | Team name (unique within environment)    |
| `description` | string | ⬜        | Purpose and responsibilities of the team |

#### Response (201 Created)

```json theme={null}
{
  "id": "881e0fce-5caa-4f5c-82c6-5eb883c47100",
  "name": "Data Engineering Team",
  "description": "Team responsible for ETL pipelines",
  "env_id": "d455a981-be17-455e-a03c-4d9c3224db72"
}
```

#### 📝 Save This

```
TEAM_ID=881e0fce-5caa-4f5c-82c6-5eb883c47100
```

***

### Step 5: Team Member Management

**What:** Add users to your team
**When:** After creating a team
**Permissions:** Only team owners can add members

#### 5.1: Create Additional Users (Optional)

If you need to invite teammates, first create their accounts:

```bash theme={null}
curl -X POST "https://your-server/api/auth/register" \
  -H "Content-Type: application/json" \
  -d '{
    "email": "jane.smith@company.com",
    "password": "SecurePass456!",
    "is_active": true
  }' -k
```

**Response:**

```json theme={null}
{
  "id": "32d4b09d-4527-40d5-923b-e166d85cb459",
  "email": "jane.smith@company.com",
  "is_active": true
}
```

#### 5.2: Add Members to Team

```bash theme={null}
curl -X POST "https://your-server/api/teams/$TEAM_ID/members/bulk" \
  -H "Authorization: Bearer $TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "members": [
      "32d4b09d-4527-40d5-923b-e166d85cb459"
    ]
  }' -k
```

#### Response (204 No Content)

Success! The members have been added to your team.

#### 5.3: List Team Members

```bash theme={null}
curl -X GET "https://your-server/api/teams/$TEAM_ID/members" \
  -H "Authorization: Bearer $TOKEN" -k
```

**Response:**

```json theme={null}
[
  {
    "id": "3b0c298a-062e-42d1-a01b-277fa66908da",
    "email": "john.doe@company.com",
    "role": "owner"
  },
  {
    "id": "32d4b09d-4527-40d5-923b-e166d85cb459",
    "email": "jane.smith@company.com",
    "role": "member"
  }
]
```

***

### Step 6: Pipeline Creation

**What:** Create a data pipeline with source and target connectors
**When:** After team setup
**Permissions:** Team members with write access

#### 🎯 Understanding Pipeline Structure (V2 - Flattened)

The new API uses a **flattened structure** instead of nested `json_configuration`. You can:

1. **Use existing connectors** by specifying their IDs
2. **Create new connectors** by providing name, type, and config
3. **Mix both** (e.g., existing source + new target)

#### Option A: Create Pipeline with New Connectors

```bash theme={null}
curl -X POST "https://your-server/api/pipelines/" \
  -H "Authorization: Bearer $TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "name": "kafka-to-oracle-pipeline",
    "team_id": "'$TEAM_ID'",

    "source_name": "kafka-events-source",
    "source_type": "KAFKA_SOURCE",
    "source_config": {
      "topic": "user-events",
      "bootstrap_servers": "kafka.company.com:9092",
      "consumer_group_id": "pipeline-consumer",
      "sasl_username": "kafka_user",
      "sasl_password": "kafka_pass",
      "sasl_mechanism": "PLAIN",
      "security_protocol": "SASL_SSL"
    },

    "target_name": "oracle-warehouse",
    "target_type": "ORACLE_TARGET",
    "target_config": {
      "host": "oracle.company.com",
      "port": "1521",
      "database": "ORCL",
      "server_name": "XE",
      "server_id": "oraclesrv01",
      "user": "etl_user",
      "password": "etl_pass"
    },

    "subscription_target_table_name": "events",
    "subscription_backfill": false,
    "subscription_mapper_config": [
      {
        "key": "user_id",
        "path": "$.user_id",
        "cast": "string",
        "primary_key": true,
        "nullable": false
      },
      {
        "key": "event_type",
        "path": "$.event_type",
        "cast": "string",
        "primary_key": false,
        "nullable": false
      }
    ]
  }' -k
```

#### Option B: Create Pipeline with Existing Connectors

```bash theme={null}
curl -X POST "https://your-server/api/pipelines/" \
  -H "Authorization: Bearer $TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "name": "reuse-connectors-pipeline",
    "team_id": "'$TEAM_ID'",

    "existing_source_id": "a1b2c3d4-e5f6-4a5b-8c9d-0e1f2a3b4c5d",
    "existing_target_id": "f6e5d4c3-b2a1-4c5d-9e8f-5d4c3b2a1f0e",

    "subscription_target_table_name": "events_copy",
    "subscription_backfill": true
  }' -k
```

#### Pipeline Configuration Fields

##### Core Fields

| Field     | Type   | Required | Description                                                                           |
| --------- | ------ | -------- | ------------------------------------------------------------------------------------- |
| `name`    | string | ✅        | Pipeline name (unique within team, alphanumeric with `-` and `_` only, max 255 chars) |
| `team_id` | UUID   | ✅        | Team that owns this pipeline                                                          |

##### Source Connector (Choose ONE approach)

**Approach 1: Use Existing Connector**

| Field                | Type | Description                        |
| -------------------- | ---- | ---------------------------------- |
| `existing_source_id` | UUID | ID of an existing source connector |

**Approach 2: Create New Connector**

| Field           | Type   | Description                                  |
| --------------- | ------ | -------------------------------------------- |
| `source_name`   | string | Name for the new source connector            |
| `source_type`   | string | Connector type: `KAFKA_SOURCE`               |
| `source_config` | object | Configuration specific to the connector type |

##### Target Connector (Choose ONE approach)

**Approach 1: Use Existing Connector**

| Field                | Type | Description                        |
| -------------------- | ---- | ---------------------------------- |
| `existing_target_id` | UUID | ID of an existing target connector |

**Approach 2: Create New Connector**

| Field           | Type   | Description                                     |
| --------------- | ------ | ----------------------------------------------- |
| `target_name`   | string | Name for the new target connector               |
| `target_type`   | string | Connector type: `KAFKA_TARGET`, `ORACLE_TARGET` |
| `target_config` | object | Configuration specific to the connector type    |

##### Data Model Configuration (Optional)

| Field                             | Type    | Description                       |
| --------------------------------- | ------- | --------------------------------- |
| `datamodel_source_topic`          | string  | Source topic for the data model   |
| `datamodel_target_topic`          | string  | Target topic for the data model   |
| `datamodel_error_table_enabled`   | boolean | Enable error table for data model |
| `datamodel_error_table_name`      | string  | Name of the error table           |
| `datamodel_error_table_target_id` | UUID    | Target connector for error table  |

##### Subscription Configuration (Optional)

| Field                                | Type    | Description                                            |
| ------------------------------------ | ------- | ------------------------------------------------------ |
| `subscription_target_table_name`     | string  | Target table name in the destination                   |
| `subscription_backfill`              | boolean | Whether to backfill historical data                    |
| `subscription_mapper_config`         | array   | Column mapping configuration (see Mapper Config below) |
| `subscription_consumer_id`           | string  | Consumer ID (auto-generated if not provided)           |
| `subscription_error_table_enabled`   | boolean | Enable error table for subscription                    |
| `subscription_error_table_name`      | string  | Name of the subscription error table                   |
| `subscription_error_table_target_id` | UUID    | Target connector for subscription errors               |

##### Mapper Configuration

Each mapper config entry:

| Field         | Type    | Description                                                             |
| ------------- | ------- | ----------------------------------------------------------------------- |
| `key`         | string  | Column name                                                             |
| `path`        | string  | JSON path to extract value (e.g., `$.field.nested`)                     |
| `static`      | string  | Static value (alternative to path)                                      |
| `cast`        | string  | Data type: `string`, `int`, `float`, `bool`, `date`, `datetime`, `time` |
| `cast_format` | string  | Format for casting (e.g., date format)                                  |
| `primary_key` | boolean | Is this a primary key column                                            |
| `nullable`    | boolean | Can this column be null                                                 |

#### Response (201 Created)

```json theme={null}
{
  "id": "5efd3d63-a6eb-4b0c-afe9-6ee1d8f5f820",
  "name": "kafka-to-oracle-pipeline",
  "state": "draft",
  "version": 1,
  "team_id": "881e0fce-5caa-4f5c-82c6-5eb883c47100",
  "team_name": "Data Engineering Team",

  "existing_source_id": null,
  "source_name": "kafka-events-source",
  "source_type": "KAFKA_SOURCE",
  "source_config": { "topic": "user-events", ... },

  "existing_target_id": null,
  "target_name": "oracle-warehouse",
  "target_type": "ORACLE_TARGET",
  "target_config": { "host": "oracle.company.com", ... },

  "subscription_target_table_name": "events",
  "subscription_backfill": false,
  "subscription_mapper_config": [...],

  "created_at": "2025-11-19T10:30:00Z",
  "updated_at": "2025-11-19T10:30:00Z"
}
```

#### 📝 Save This

```
PIPELINE_ID=5efd3d63-a6eb-4b0c-afe9-6ee1d8f5f820
```

#### 🔍 Available Connector Types

**Sources:**

* `KAFKA_SOURCE` - Apache Kafka

**Targets:**

* `KAFKA_TARGET` - Apache Kafka
* `ORACLE_TARGET` - Oracle database

**Jobs:**

* `JOB_SMT` - Single Message Transform (for data transformations)

***

### Step 7: Pipeline Updates

**What:** Modify an existing pipeline
**When:** Need to change configuration or settings
**Permissions:** Team members with write access

#### Update Pipeline Configuration

```bash theme={null}
curl -X PATCH "https://your-server/api/pipelines/$PIPELINE_ID" \
  -H "Authorization: Bearer $TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "name": "updated-pipeline-name",
    "source_config": {
      "topic": "new-topic",
      "bootstrap_servers": "kafka.company.com:9092",
      "consumer_group_id": "updated-consumer"
    },
    "subscription_backfill": true
  }' -k
```

#### 💡 Update Patterns

**Pattern 1: Update Only Name**

```json theme={null}
{
  "name": "new-pipeline-name"
}
```

**Pattern 2: Switch to Existing Connector**

```json theme={null}
{
  "existing_source_id": "new-connector-uuid",
  "source_name": null,
  "source_type": null,
  "source_config": null
}
```

**Pattern 3: Update Subscription Config**

```json theme={null}
{
  "subscription_target_table_name": "new_table",
  "subscription_backfill": true,
  "subscription_mapper_config": [...]
}
```

#### Response (200 OK)

Returns the updated pipeline with all fields.

***

### Step 8: Pipeline Control

**What:** Start, pause, or check pipeline status
**When:** After pipeline creation and configuration
**States:** `draft` → `building` → `live` → `paused` / `error`

#### 8.1: Start Pipeline

```bash theme={null}
curl -X POST "https://your-server/api/pipelines/$PIPELINE_ID/start" \
  -H "Authorization: Bearer $TOKEN" -k
```

**Response (202 Accepted):**

```json theme={null}
"BUILDING"
```

The pipeline transitions through these states:

1. `draft` - Initial state, configuration in progress
2. `building` - Pipeline is being deployed
3. `live` - Pipeline is running and processing data

#### 8.2: Pause Pipeline

```bash theme={null}
curl -X POST "https://your-server/api/pipelines/$PIPELINE_ID/pause" \
  -H "Authorization: Bearer $TOKEN" -k
```

**Response (202 Accepted):**

```json theme={null}
"PAUSED"
```

#### 8.3: Check Pipeline Status

```bash theme={null}
curl -X GET "https://your-server/api/pipelines/$PIPELINE_ID" \
  -H "Authorization: Bearer $TOKEN" -k
```

**Response (200 OK):**

```json theme={null}
{
  "id": "5efd3d63-a6eb-4b0c-afe9-6ee1d8f5f820",
  "name": "kafka-to-oracle-pipeline",
  "state": "live",
  "version": 1,
  "created_at": "2025-11-19T10:30:00Z",
  "updated_at": "2025-11-19T10:45:00Z"
}
```

#### 8.4: Get Pipeline Logs (WebSocket)

```javascript theme={null}
// WebSocket connection example
const ws = new WebSocket(
  `wss://your-server/api/pipelines/${PIPELINE_ID}/logs`,
  ['Authorization', `Bearer ${TOKEN}`]
);

ws.onmessage = (event) => {
  console.log('Log:', event.data);
};
```

#### Pipeline State Diagram

```
     ┌──────┐
     │DRAFT │ (initial state)
     └───┬──┘
         │ start
         ▼
   ┌─────────┐
   │BUILDING │ (deployment in progress)
   └────┬────┘
        │ (automatic)
        ▼
     ┌──────┐
     │ LIVE │ (processing data)
     └───┬──┘
         │
         │ pause
         ▼
    ┌────────┐
    │ PAUSED │
    └───┬────┘
        │
      resume
        │
        ▼
   ┌─────────┐
   │BUILDING │
   └─────────┘
```

***

## 🛠️ Testing Tools

### Option 1: cURL (Command Line)

**Pros:** Available everywhere, scriptable
**Cons:** Verbose, requires manual token management

```bash theme={null}
# Set variables for easy reuse
export SERVER="https://your-server"
export TOKEN="your-token-here"

# Make a request
curl -X GET "$SERVER/api/pipelines/" \
  -H "Authorization: Bearer $TOKEN" -k
```

### Option 2: HTTPie (Command Line - User Friendly)

**Pros:** Simpler syntax, better output
**Cons:** Requires installation

```bash theme={null}
# Install
pip install httpie

# Use
http GET https://your-server/api/pipelines/ \
  Authorization:"Bearer $TOKEN"
```

### Option 3: Postman (GUI)

**Pros:** Visual interface, request collections
**Cons:** Requires download

1. Download Postman
2. Create a new request
3. Set method (GET, POST, etc.)
4. Enter URL: `https://your-server/api/pipelines/`
5. Add header: `Authorization: Bearer YOUR_TOKEN`
6. Add JSON body for POST/PATCH
7. Click "Send"

### Option 4: Python Script

**Pros:** Full programming capabilities
**Cons:** Requires Python knowledge

```python theme={null}
import requests

# Configuration
SERVER = "https://your-server"
TOKEN = "your-token-here"

headers = {
    "Authorization": f"Bearer {TOKEN}",
    "Content-Type": "application/json"
}

# Get all pipelines
response = requests.get(
    f"{SERVER}/api/pipelines/",
    headers=headers,
    verify=False  # Only for self-signed certificates
)

print(response.json())

# Create a pipeline
pipeline_data = {
    "name": "my-pipeline",
    "team_id": "your-team-id",
    "source_name": "my-source",
    "source_type": "KAFKA_SOURCE",
    "source_config": {...}
}

response = requests.post(
    f"{SERVER}/api/pipelines/",
    headers=headers,
    json=pipeline_data,
    verify=False
)

print(response.json())
```

***

## 📚 API Reference

### Authentication Endpoints

| Method | Endpoint                | Description                |
| ------ | ----------------------- | -------------------------- |
| `POST` | `/auth/register`        | Create a new user account  |
| `POST` | `/auth/jwt/login`       | Login and get access token |
| `POST` | `/auth/jwt/logout`      | Logout (invalidate token)  |
| `POST` | `/auth/forgot-password` | Request password reset     |
| `POST` | `/auth/reset-password`  | Reset password with token  |

### User Endpoints

| Method   | Endpoint                          | Description                    |
| -------- | --------------------------------- | ------------------------------ |
| `GET`    | `/users`                          | List all users (paginated)     |
| `GET`    | `/users/{id}`                     | Get user details               |
| `PATCH`  | `/users/{id}`                     | Update user                    |
| `DELETE` | `/users/{id}`                     | Delete user                    |
| `POST`   | `/users/me/change-password`       | Change current user's password |
| `POST`   | `/users/me/service-account-token` | Create service account token   |
| `GET`    | `/users/export-all`               | Export all data (admin only)   |
| `POST`   | `/users/import-all`               | Import all data (admin only)   |

### Environment Endpoints

| Method   | Endpoint                       | Description                       |
| -------- | ------------------------------ | --------------------------------- |
| `GET`    | `/envs/`                       | List all environments (paginated) |
| `POST`   | `/envs/`                       | Create a new environment          |
| `GET`    | `/envs/{id}`                   | Get environment details           |
| `GET`    | `/envs/filter-one?name={name}` | Get environment by name           |
| `PATCH`  | `/envs/{id}`                   | Update environment                |
| `DELETE` | `/envs/{id}`                   | Delete environment                |
| `POST`   | `/envs/check-byok-credentials` | Check BYOK credentials            |

### Team Endpoints

| Method   | Endpoint                        | Description                |
| -------- | ------------------------------- | -------------------------- |
| `GET`    | `/teams/`                       | List all teams (paginated) |
| `POST`   | `/teams/`                       | Create a new team          |
| `GET`    | `/teams/{id}`                   | Get team details           |
| `GET`    | `/teams/filter-one?name={name}` | Get team by name           |
| `PATCH`  | `/teams/{id}`                   | Update team                |
| `DELETE` | `/teams/{id}`                   | Delete team                |

### Team Member Endpoints

| Method   | Endpoint                        | Description          |
| -------- | ------------------------------- | -------------------- |
| `GET`    | `/teams/{id}/members`           | List team members    |
| `POST`   | `/teams/{id}/members/bulk`      | Add multiple members |
| `DELETE` | `/teams/{id}/members/{user_id}` | Remove a member      |

### Pipeline Endpoints

| Method      | Endpoint                               | Description                          |
| ----------- | -------------------------------------- | ------------------------------------ |
| `GET`       | `/pipelines/`                          | List all pipelines (paginated)       |
| `GET`       | `/pipelines/count-status`              | Count pipelines by status            |
| `POST`      | `/pipelines/`                          | Create a new pipeline                |
| `GET`       | `/pipelines/{id}`                      | Get pipeline details                 |
| `GET`       | `/pipelines/filter-one?name={name}`    | Get pipeline by name                 |
| `PATCH`     | `/pipelines/{id}`                      | Update pipeline                      |
| `DELETE`    | `/pipelines/{id}`                      | Delete pipeline                      |
| `POST`      | `/pipelines/{id}/start`                | Start pipeline (202)                 |
| `POST`      | `/pipelines/{id}/pause`                | Pause pipeline (202)                 |
| `WebSocket` | `/pipelines/{id}/logs`                 | Stream pipeline logs                 |
| `GET`       | `/pipelines/{id}/status`               | Get pipeline status (deprecated)     |
| `GET`       | `/pipelines/{id}/configuration`        | Export pipeline config               |
| `POST`      | `/pipelines/{id}/configuration`        | Import pipeline config               |
| `GET`       | `/pipelines/{id}/worker-config`        | Get worker config                    |
| `GET`       | `/pipelines/status/all-non-draft`      | List non-draft pipelines with status |
| `GET`       | `/pipelines/{id}/latency-informations` | Get pipeline latency info            |

### Connector Endpoints

| Method      | Endpoint                                | Description                  |
| ----------- | --------------------------------------- | ---------------------------- |
| `GET`       | `/connectors/source-config`             | List source configurations   |
| `GET`       | `/connectors/target-config`             | List target configurations   |
| `GET`       | `/connectors/filter-one?name={name}`    | Get connector by name        |
| `GET`       | `/connectors/{id}`                      | Get connector by ID          |
| `GET`       | `/connectors/{id}/source-worker-config` | Get source worker config     |
| `GET`       | `/connectors/{id}/target-worker-config` | Get target worker config     |
| `POST`      | `/connectors/`                          | Create connector             |
| `PATCH`     | `/connectors/{id}`                      | Update connector             |
| `DELETE`    | `/connectors/{id}`                      | Delete connector             |
| `POST`      | `/connectors/{id}/start`                | Start connector worker (202) |
| `POST`      | `/connectors/{id}/stop`                 | Stop connector worker (202)  |
| `WebSocket` | `/connectors/{id}/logs`                 | Stream connector logs        |

### Subscription Endpoints

| Method   | Endpoint                       | Description                    |
| -------- | ------------------------------ | ------------------------------ |
| `GET`    | `/subscriptions/`              | List subscriptions (paginated) |
| `GET`    | `/subscriptions/{id}`          | Get subscription details       |
| `POST`   | `/subscriptions/`              | Create subscription            |
| `POST`   | `/subscriptions/subscribe-all` | Bulk create subscriptions      |
| `PATCH`  | `/subscriptions/{id}`          | Update subscription            |
| `DELETE` | `/subscriptions/{id}`          | Delete subscription            |
| `POST`   | `/subscriptions/{id}/start`    | Start subscription (202)       |
| `POST`   | `/subscriptions/{id}/pause`    | Pause subscription (202)       |

### DataModel Endpoints

| Method   | Endpoint                       | Description                 |
| -------- | ------------------------------ | --------------------------- |
| `GET`    | `/datamodels/`                 | List datamodels (paginated) |
| `GET`    | `/datamodels/{id}`             | Get datamodel by ID         |
| `GET`    | `/datamodels/{id}/monitoring`  | Get datamodel monitoring    |
| `GET`    | `/datamodels/{id}/schema`      | Get datamodel schema        |
| `PATCH`  | `/datamodels/{id}/error-table` | Update error table config   |
| `DELETE` | `/datamodels/{id}`             | Delete datamodel            |

### SMT/Transformation Endpoints

| Method | Endpoint              | Description                   |
| ------ | --------------------- | ----------------------------- |
| `POST` | `/smt/process_mapper` | Process mapper transformation |

### Connector Type Specific Endpoints

#### Kafka Source

| Method | Endpoint                                          | Description                    |
| ------ | ------------------------------------------------- | ------------------------------ |
| `POST` | `/connector-types/kafka-source/check-credentials` | Check Kafka source credentials |
| `POST` | `/connector-types/kafka-source/fetch-messages`    | Fetch messages from Kafka      |
| `POST` | `/connector-types/kafka-source/list-topics`       | List Kafka topics              |

#### Oracle Target

| Method | Endpoint                                           | Description                     |
| ------ | -------------------------------------------------- | ------------------------------- |
| `POST` | `/connector-types/oracle-target/check-credentials` | Check Oracle target credentials |

***

## 🔧 Connector Configurations

### KAFKA\_SOURCE Configuration

```json theme={null}
{
  "topic": "user-events",
  "consumer_group_id": "pipeline-consumer",
  "bootstrap_servers": "kafka1:9092,kafka2:9092",
  "security_protocol": "SASL_SSL",
  "sasl_mechanism": "PLAIN",
  "sasl_username": "kafka_user",
  "sasl_password": "kafka_pass",
  "include_metadata": false
}
```

**Field Descriptions:**

| Field               | Type    | Required | Description                                                        |
| ------------------- | ------- | -------- | ------------------------------------------------------------------ |
| `topic`             | string  | ✅        | Kafka topic to consume from                                        |
| `bootstrap_servers` | string  | ✅        | Comma-separated list of Kafka brokers                              |
| `consumer_group_id` | string  | ⬜        | Consumer group ID (auto-generated if not provided)                 |
| `security_protocol` | string  | ⬜        | `PLAINTEXT`, `SASL_PLAINTEXT`, `SASL_SSL`, `SSL`                   |
| `sasl_mechanism`    | string  | ⬜        | `PLAIN`, `SCRAM-SHA-256`, `SCRAM-SHA-512`, `GSSAPI`, `OAUTHBEARER` |
| `sasl_username`     | string  | ⬜        | SASL username (required if using SASL)                             |
| `sasl_password`     | string  | ⬜        | SASL password (required if using SASL)                             |
| `include_metadata`  | boolean | ⬜        | Include Kafka metadata in messages (default: false)                |

***

### KAFKA\_TARGET Configuration

```json theme={null}
{
  "bootstrap_server": "kafka1:9092",
  "security_protocol": "SASL_SSL",
  "sasl_mechanism": "PLAIN",
  "sasl_username": "kafka_user",
  "sasl_password": "kafka_pass",
  "ca_cert": "",
  "group_id": "consumer-group"
}
```

**Field Descriptions:**

| Field               | Type   | Required | Description                                                        |
| ------------------- | ------ | -------- | ------------------------------------------------------------------ |
| `bootstrap_server`  | string | ✅        | Kafka broker address                                               |
| `security_protocol` | string | ⬜        | `PLAINTEXT`, `SASL_PLAINTEXT`, `SASL_SSL`, `SSL`                   |
| `sasl_mechanism`    | string | ⬜        | `PLAIN`, `SCRAM-SHA-256`, `SCRAM-SHA-512`, `GSSAPI`, `OAUTHBEARER` |
| `sasl_username`     | string | ⬜        | SASL username                                                      |
| `sasl_password`     | string | ⬜        | SASL password                                                      |
| `ca_cert`           | string | ⬜        | CA certificate for SSL                                             |
| `group_id`          | string | ⬜        | Consumer group ID                                                  |

***

### ORACLE\_TARGET Configuration

```json theme={null}
{
  "database": "ORCL",
  "user": "etl_user",
  "password": "etl_pass",
  "host": "oracle.company.com",
  "port": "1521",
  "server_name": "XE",
  "server_id": "oraclesrv01"
}
```

**Field Descriptions:**

| Field         | Type   | Required | Description                    |
| ------------- | ------ | -------- | ------------------------------ |
| `database`    | string | ✅        | Oracle database name           |
| `user`        | string | ✅        | Username for Oracle            |
| `password`    | string | ✅        | Password for Oracle            |
| `host`        | string | ✅        | Hostname or IP address         |
| `port`        | string | ✅        | Port number (typically `1521`) |
| `server_name` | string | ✅        | Service name (e.g., `XE`)      |
| `server_id`   | string | ✅        | Server identifier/SID          |

***

**Mapper Column Config :**

| Field         | Type    | Description                                                           |
| ------------- | ------- | --------------------------------------------------------------------- |
| `key`         | string  | Column name in target                                                 |
| `path`        | string  | JSON path to extract value (e.g., `$.field.nested`)                   |
| `static`      | string  | Static value (alternative to path)                                    |
| `cast`        | string  | Cast to: `string`, `int`, `float`, `bool`, `date`, `datetime`, `time` |
| `cast_format` | string  | Format for casting (e.g., date format)                                |
| `primary_key` | boolean | Is this a primary key column                                          |
| `nullable`    | boolean | Can this column be null                                               |

***

## 🎯 Common Patterns

### Pattern 1: List Resources with Filters

```bash theme={null}
# Get all pipelines in a specific state
curl -X GET "https://your-server/api/pipelines/?state=live" \
  -H "Authorization: Bearer $TOKEN" -k

# Get pipelines for a specific team
curl -X GET "https://your-server/api/pipelines/?team_id=$TEAM_ID" \
  -H "Authorization: Bearer $TOKEN" -k

# Search pipelines by name
curl -X GET "https://your-server/api/pipelines/?search=kafka" \
  -H "Authorization: Bearer $TOKEN" -k
```

### Pattern 2: Pagination

```bash theme={null}
# Get first page (default 50 items)
curl -X GET "https://your-server/api/pipelines/?skip=0&limit=50" \
  -H "Authorization: Bearer $TOKEN" -k

# Get second page
curl -X GET "https://your-server/api/pipelines/?skip=50&limit=50" \
  -H "Authorization: Bearer $TOKEN" -k
```

### Pattern 3: Error Handling

```python theme={null}
import requests

try:
    response = requests.get(
        "https://your-server/api/pipelines/",
        headers={"Authorization": f"Bearer {TOKEN}"},
        verify=False
    )
    response.raise_for_status()  # Raises exception for 4xx/5xx
    data = response.json()
except requests.exceptions.HTTPError as e:
    print(f"HTTP Error: {e}")
    print(f"Response: {e.response.text}")
except requests.exceptions.ConnectionError as e:
    print(f"Connection Error: {e}")
except Exception as e:
    print(f"Error: {e}")
```

### Pattern 4: Bulk Operations

```python theme={null}
# Create multiple pipelines
pipeline_configs = [
    {"name": "pipeline-1", "team_id": TEAM_ID, ...},
    {"name": "pipeline-2", "team_id": TEAM_ID, ...},
    {"name": "pipeline-3", "team_id": TEAM_ID, ...}
]

for config in pipeline_configs:
    response = requests.post(
        f"{SERVER}/api/pipelines/",
        headers=headers,
        json=config
    )
    print(f"Created: {response.json()['id']}")
```

### Pattern 5: Test Connector Credentials Before Creating

```bash theme={null}
# Test Kafka source credentials
curl -X POST "https://your-server/api/connector-types/kafka-source/check-credentials" \
  -H "Authorization: Bearer $TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "bootstrap_servers": "kafka:9092",
    "topic": "test-topic",
    "security_protocol": "PLAINTEXT"
  }' -k

# Test Oracle target credentials
curl -X POST "https://your-server/api/connector-types/oracle-target/check-credentials" \
  -H "Authorization: Bearer $TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "host": "oracle.company.com",
    "port": "1521",
    "database": "ORCL",
    "user": "etl_user",
    "password": "etl_pass",
    "server_name": "XE",
    "server_id": "oraclesrv01"
  }' -k
```

***

## 🐛 Troubleshooting

### Issue: "401 Unauthorized"

**Cause:** Token is missing, expired, or invalid

**Solution:**

1. Check that you included the `Authorization` header
2. Verify the token format: `Bearer YOUR_TOKEN`
3. Log in again to get a fresh token

```bash theme={null}
curl -X POST "https://your-server/api/auth/jwt/login" \
  -H "Content-Type: application/x-www-form-urlencoded" \
  -d 'username=YOUR_EMAIL&password=YOUR_PASSWORD' -k
```

***

### Issue: "403 Forbidden"

**Cause:** You don't have permission for this operation

**Solution:**

1. Verify you're a member of the team
2. Check if you have the required role (owner vs member)
3. Contact the team owner to grant permissions

***

### Issue: "422 Validation Error"

**Cause:** Request data doesn't meet validation requirements

**Solution:**

1. Check the error response for specific field errors
2. Verify all required fields are provided
3. Ensure data types match (UUID, string, boolean, etc.)

**Example Error Response:**

```json theme={null}
{
  "detail": [
    {
      "loc": ["body", "source_name"],
      "msg": "field required",
      "type": "value_error.missing"
    }
  ]
}
```

**Fix:**

```json theme={null}
{
  "name": "my-pipeline",
  "team_id": "uuid-here",
  "source_name": "my-source",
  "source_type": "KAFKA_SOURCE",
  "source_config": {...}
}
```

***

### Issue: Invalid pipeline name

**Cause:** Pipeline name contains invalid characters

**Solution:** Pipeline names must:

* Contain only alphanumeric characters, hyphens (`-`), and underscores (`_`)
* Be maximum 255 characters long
* Not be empty

**Valid names:**

* `kafka-to-oracle-pipeline`
* `user_events_pipeline_v2`
* `Pipeline123`

**Invalid names:**

* `pipeline name` (contains space)
* `pipeline.name` (contains dot)
* `pipeline@name` (contains special character)

***

### Issue: Cannot specify both existing and new connector

**Cause:** You provided both `existing_source_id` AND `source_name/type/config`

**Solution:** Choose ONE approach:

**Option A (Existing):**

```json theme={null}
{
  "existing_source_id": "uuid-here"
}
```

**Option B (New):**

```json theme={null}
{
  "source_name": "my-source",
  "source_type": "KAFKA_SOURCE",
  "source_config": {...}
}
```

***

### Issue: Pipeline stuck in "BUILDING" state

**Cause:** Deployment failed or is taking longer than expected

**Solution:**

1. Check pipeline logs via WebSocket
2. Verify connector configurations are correct
3. Check infrastructure resources (CPU, memory)

```bash theme={null}
# Get detailed status
curl -X GET "https://your-server/api/pipelines/$PIPELINE_ID" \
  -H "Authorization: Bearer $TOKEN" -k
```

***

### Issue: SSL Certificate Error

**Cause:** Self-signed certificate or untrusted CA

**Solution for curl:**

```bash theme={null}
# Add -k or --insecure flag
curl -X GET "https://your-server/api/pipelines/" \
  -H "Authorization: Bearer $TOKEN" -k
```

**Solution for Python:**

```python theme={null}
requests.get(url, verify=False)
```

**Solution for HTTPie:**

```bash theme={null}
http --verify=no GET https://your-server/api/pipelines/
```

***

## 📝 Complete Example Script

Here's a complete Python script that performs all steps:

```python theme={null}
#!/usr/bin/env python3
"""
Complete Popsink API example script
"""
import requests
import json
from typing import Optional

# Configuration
SERVER = "https://your-server"
VERIFY_SSL = False  # Set to True in production

class PopsinkClient:
    def __init__(self, server: str, verify_ssl: bool = True):
        self.server = server.rstrip('/')
        self.verify_ssl = verify_ssl
        self.token: Optional[str] = None

    def register(self, email: str, password: str) -> dict:
        """Register a new user"""
        response = requests.post(
            f"{self.server}/api/auth/register",
            json={
                "email": email,
                "password": password,
                "is_active": True
            },
            verify=self.verify_ssl
        )
        response.raise_for_status()
        return response.json()

    def login(self, email: str, password: str) -> str:
        """Login and get token"""
        response = requests.post(
            f"{self.server}/api/auth/jwt/login",
            data={
                "username": email,
                "password": password
            },
            verify=self.verify_ssl
        )
        response.raise_for_status()
        data = response.json()
        self.token = data["access_token"]
        return self.token

    def _headers(self) -> dict:
        """Get headers with auth token"""
        if not self.token:
            raise ValueError("Not logged in. Call login() first.")
        return {
            "Authorization": f"Bearer {self.token}",
            "Content-Type": "application/json"
        }

    def create_environment(self, name: str, description: str = "") -> dict:
        """Create an environment"""
        response = requests.post(
            f"{self.server}/api/envs/",
            headers=self._headers(),
            json={
                "name": name,
                "description": description,
                "use_retention": True
            },
            verify=self.verify_ssl
        )
        response.raise_for_status()
        return response.json()

    def create_team(self, name: str, description: str = "") -> dict:
        """Create a team"""
        response = requests.post(
            f"{self.server}/api/teams/",
            headers=self._headers(),
            json={
                "name": name,
                "description": description
            },
            verify=self.verify_ssl
        )
        response.raise_for_status()
        return response.json()

    def create_pipeline(self, name: str, team_id: str, config: dict) -> dict:
        """Create a pipeline"""
        pipeline_data = {
            "name": name,
            "team_id": team_id,
            **config
        }
        response = requests.post(
            f"{self.server}/api/pipelines/",
            headers=self._headers(),
            json=pipeline_data,
            verify=self.verify_ssl
        )
        response.raise_for_status()
        return response.json()

    def start_pipeline(self, pipeline_id: str) -> str:
        """Start a pipeline"""
        response = requests.post(
            f"{self.server}/api/pipelines/{pipeline_id}/start",
            headers=self._headers(),
            verify=self.verify_ssl
        )
        response.raise_for_status()
        return response.json()


def main():
    # Initialize client
    client = PopsinkClient(SERVER, verify_ssl=VERIFY_SSL)

    # 1. Register
    print("1. Registering user...")
    user = client.register("demo@example.com", "SecurePassword123!")
    print(f"   ✓ User created: {user['id']}")

    # 2. Login
    print("\n2. Logging in...")
    token = client.login("demo@example.com", "SecurePassword123!")
    print(f"   ✓ Logged in (token: {token[:20]}...)")

    # 3. Create environment
    print("\n3. Creating environment...")
    env = client.create_environment("demo-env", "Demo environment")
    print(f"   ✓ Environment created: {env['id']}")

    # 4. Create team
    print("\n4. Creating team...")
    team = client.create_team("Demo Team", "Demo team for testing")
    print(f"   ✓ Team created: {team['id']}")

    # 5. Create pipeline
    print("\n5. Creating pipeline...")
    pipeline_config = {
        "source_name": "demo-kafka-source",
        "source_type": "KAFKA_SOURCE",
        "source_config": {
            "topic": "demo-topic",
            "bootstrap_servers": "kafka:9092",
            "consumer_group_id": "demo-group",
            "security_protocol": "PLAINTEXT"
        },
        "target_name": "demo-kafka-target",
        "target_type": "KAFKA_TARGET",
        "target_config": {
            "bootstrap_server": "kafka:9092",
            "security_protocol": "PLAINTEXT"
        },
        "subscription_target_table_name": "events",
        "subscription_backfill": False
    }
    pipeline = client.create_pipeline("demo-pipeline", team['id'], pipeline_config)
    print(f"   ✓ Pipeline created: {pipeline['id']}")

    # 6. Start pipeline
    print("\n6. Starting pipeline...")
    state = client.start_pipeline(pipeline['id'])
    print(f"   ✓ Pipeline state: {state}")

    print("\n✓ All steps completed successfully!")


if __name__ == "__main__":
    main()
```

***

## 🎓 Next Steps

Now that you understand the basics:

1. **Explore the API:** Try listing resources, filtering, pagination
2. **Monitor Pipelines:** Use logs and status endpoints
3. **Handle Errors:** Implement proper error handling
4. **Test Credentials:** Use credential check endpoints before creating connectors
5. **Optimize:** Reuse connectors, batch operations
6. **Automate:** Create scripts or CI/CD pipelines

***

## 📞 Support

* **Documentation:** This guide
* **API Schema:** `https://your-server/api/docs` (Swagger UI)
* **OpenAPI Spec:** `https://your-server/api/openapi.json`

***

**Happy Data Processing! 🚀**
