📋 Table of Contents

  1. Quick Start
  2. Prerequisites
  3. API Overview
  4. Authentication Flow
  5. Step-by-Step Tutorial
  6. Testing Tools
  7. API Reference
  8. Connector Configurations
  9. Common Patterns
  10. Troubleshooting

🚀 Quick Start

For the impatient, here's the minimum you need to know:
# 1. Create a user
curl -X POST "https://your-server/api/auth/register" \
  -H "Content-Type: application/json" \
  -d '{"email":"user@example.com","password":"secure123","is_active":true}' -k

# 2. Login to get your token
curl -X POST "https://your-server/api/auth/jwt/login" \
  -H "Content-Type: application/x-www-form-urlencoded" \
  -d 'username=user@example.com&password=secure123' -k

# 3. Use the token in all subsequent requests
curl -X GET "https://your-server/api/pipelines/" \
  -H "Authorization: Bearer YOUR_TOKEN_HERE" -k

✅ Prerequisites

What You Need

  • Access to a Popsink server (e.g., https://popsink.your-company.com)
  • A terminal or API client (curl, Postman, HTTPie, etc.)
  • 5-15 minutes to complete this guide

Basic Concepts

| Concept | Description |
|---|---|
| Environment | A workspace that contains teams and their resources |
| Team | A group of users working together on pipelines |
| Pipeline | A data processing flow from source to target |
| Connector | A connection to a data source or destination |
| Subscription | Configuration for how data flows through a pipeline |
| DataModel | Schema and configuration for data transformation |

🔍 API Overview

Base URL Structure

https://your-server/api/

Authentication

All API endpoints (except registration and login) require a Bearer token in the Authorization header:
Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...

Response Format

All responses are in JSON format:
{
  "id": "uuid-here",
  "name": "resource-name",
  "created_at": "2025-11-19T10:00:00Z"
}

HTTP Status Codes

| Code | Meaning | When You'll See It |
|---|---|---|
| 200 | Success | GET, PATCH operations |
| 201 | Created | POST operations that create resources |
| 202 | Accepted | Async operations (start/pause pipeline) |
| 204 | No Content | DELETE operations |
| 400 | Bad Request | Invalid input data |
| 401 | Unauthorized | Missing or invalid token |
| 403 | Forbidden | Insufficient permissions |
| 404 | Not Found | Resource doesn't exist |
| 422 | Validation Error | Data doesn't meet requirements |
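The table above can be folded into a small client-side dispatcher. A minimal sketch (the function name and the action labels are our own, not part of the Popsink API):

```python
# Illustrative helper: map an HTTP status code from the table above
# to a coarse client-side action. Labels are this guide's own invention.
def classify_status(code: int) -> str:
    if code in (200, 201, 202, 204):
        return "success"
    if code == 401:
        return "reauthenticate"    # token missing or expired: log in again
    if code == 403:
        return "check-permissions"
    if code in (400, 404, 422):
        return "fix-request"       # client-side input problem
    return "retry-or-report"       # 5xx and anything unexpected

print(classify_status(202))  # accepted async operation counts as success
```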

🔐 Authentication Flow

┌─────────────┐
│   Register  │  POST /auth/register
└──────┬──────┘
       │
       ▼
┌─────────────┐
│    Login    │  POST /auth/jwt/login
└──────┬──────┘
       │
       ▼
┌─────────────┐
│  Get Token  │  Save this token!
└──────┬──────┘
       │
       ▼
┌─────────────┐
│  Use Token  │  All API requests
└─────────────┘

📖 Step-by-Step Tutorial

Step 1: User Registration

What: Create your user account
When: First time using the API
Required: Email and password

Request

curl -X POST "https://your-server/api/auth/register" \
  -H "Content-Type: application/json" \
  -d '{
    "email": "john.doe@company.com",
    "password": "MySecurePassword123!",
    "is_active": true,
    "is_superuser": false,
    "is_verified": false
  }' -k

Field Descriptions

| Field | Type | Required | Description |
|---|---|---|---|
| email | string | Yes | Your email address (must be unique) |
| password | string | Yes | Your password (min 8 characters recommended) |
| is_active | boolean | No | Set to true to activate the account immediately |
| is_superuser | boolean | No | Admin privileges (default: false) |
| is_verified | boolean | No | Email verification status (default: false) |

Response (201 Created)

{
  "id": "3b0c298a-062e-42d1-a01b-277fa66908da",
  "email": "john.doe@company.com",
  "is_active": true,
  "is_superuser": false,
  "is_verified": false,
  "active_env_id": null
}

📝 Save This

USER_ID=3b0c298a-062e-42d1-a01b-277fa66908da
EMAIL=john.doe@company.com
PASSWORD=MySecurePassword123!

Step 2: User Login

What: Authenticate and get your access token
When: Before making any authenticated API calls
Token Lifetime: Configurable (typically 24 hours)

Request

curl -X POST "https://your-server/api/auth/jwt/login" \
  -H "Content-Type: application/x-www-form-urlencoded" \
  -d 'username=john.doe@company.com&password=MySecurePassword123!' -k

Response (200 OK)

{
  "access_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIzYjBjMjk4YS0wNjJlLTQyZDEtYTAxYi0yNzdmYTY2OTA4ZGEiLCJhdWQiOlsiZmFzdGFwaS11c2VyczphdXRoIl0sImV4cCI6MTcwMDAwMDAwMH0.signature",
  "token_type": "bearer"
}

📝 Save This

TOKEN=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...

Using Your Token

From now on, include this header in every request:
-H "Authorization: Bearer $TOKEN"
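If you script the login, the token can be pulled out of the JSON response and turned into this header programmatically. A minimal sketch (the response literal below stands in for the real call to /api/auth/jwt/login):

```python
import json

# Stand-in for the body returned by POST /api/auth/jwt/login;
# in real use, replace with response.text from your HTTP client.
raw = '{"access_token": "eyJhbGciOiJIUzI1NiJ9.payload.sig", "token_type": "bearer"}'

token = json.loads(raw)["access_token"]
headers = {"Authorization": f"Bearer {token}"}
print(headers["Authorization"])
```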

Step 3: Environment Setup

What: Create a workspace for your teams and pipelines
When: After logging in, before creating teams
Permissions: Any authenticated user can create an environment

Request

curl -X POST "https://your-server/api/envs/" \
  -H "Authorization: Bearer $TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "name": "production-environment",
    "description": "Production data processing environment",
    "use_retention": true
  }' -k

Field Descriptions

| Field | Type | Required | Description |
|---|---|---|---|
| name | string | Yes | Environment name (must be unique) |
| description | string | No | Description of the environment's purpose |
| use_retention | boolean | No | Enable data retention policies (default: false) |

Response (201 Created)

{
  "id": "d455a981-be17-455e-a03c-4d9c3224db72",
  "name": "production-environment",
  "description": "Production data processing environment",
  "use_retention": true,
  "retention_configuration": null
}

📝 Save This

ENV_ID=d455a981-be17-455e-a03c-4d9c3224db72

Step 4: Team Creation

What: Create a team within your environment
When: After creating an environment
Ownership: The creator automatically becomes a team owner

Request

curl -X POST "https://your-server/api/teams/" \
  -H "Authorization: Bearer $TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "name": "Data Engineering Team",
    "description": "Team responsible for ETL pipelines"
  }' -k

Field Descriptions

| Field | Type | Required | Description |
|---|---|---|---|
| name | string | Yes | Team name (unique within environment) |
| description | string | No | Purpose and responsibilities of the team |

Response (201 Created)

{
  "id": "881e0fce-5caa-4f5c-82c6-5eb883c47100",
  "name": "Data Engineering Team",
  "description": "Team responsible for ETL pipelines",
  "env_id": "d455a981-be17-455e-a03c-4d9c3224db72"
}

📝 Save This

TEAM_ID=881e0fce-5caa-4f5c-82c6-5eb883c47100

Step 5: Team Member Management

What: Add users to your team
When: After creating a team
Permissions: Only team owners can add members

5.1: Create Additional Users (Optional)

If you need to invite teammates, first create their accounts:
curl -X POST "https://your-server/api/auth/register" \
  -H "Content-Type: application/json" \
  -d '{
    "email": "jane.smith@company.com",
    "password": "SecurePass456!",
    "is_active": true
  }' -k
Response:
{
  "id": "32d4b09d-4527-40d5-923b-e166d85cb459",
  "email": "jane.smith@company.com",
  "is_active": true
}

5.2: Add Members to Team

curl -X POST "https://your-server/api/teams/$TEAM_ID/members/bulk" \
  -H "Authorization: Bearer $TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "members": [
      "32d4b09d-4527-40d5-923b-e166d85cb459"
    ]
  }' -k

Response (204 No Content)

Success! The members have been added to your team.

5.3: List Team Members

curl -X GET "https://your-server/api/teams/$TEAM_ID/members" \
  -H "Authorization: Bearer $TOKEN" -k
Response:
[
  {
    "id": "3b0c298a-062e-42d1-a01b-277fa66908da",
    "email": "john.doe@company.com",
    "role": "owner"
  },
  {
    "id": "32d4b09d-4527-40d5-923b-e166d85cb459",
    "email": "jane.smith@company.com",
    "role": "member"
  }
]

Step 6: Pipeline Creation

What: Create a data pipeline with source and target connectors
When: After team setup
Permissions: Team members with write access

🎯 Understanding Pipeline Structure (V2 - Flattened)

The new API uses a flattened structure instead of nested json_configuration. You can:
  1. Use existing connectors by specifying their IDs
  2. Create new connectors by providing name, type, and config
  3. Mix both (e.g., existing source + new target)

Option A: Create Pipeline with New Connectors

curl -X POST "https://your-server/api/pipelines/" \
  -H "Authorization: Bearer $TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "name": "kafka-to-oracle-pipeline",
    "team_id": "'$TEAM_ID'",

    "source_name": "kafka-events-source",
    "source_type": "KAFKA_SOURCE",
    "source_config": {
      "topic": "user-events",
      "bootstrap_servers": "kafka.company.com:9092",
      "consumer_group_id": "pipeline-consumer",
      "sasl_username": "kafka_user",
      "sasl_password": "kafka_pass",
      "sasl_mechanism": "PLAIN",
      "security_protocol": "SASL_SSL"
    },

    "target_name": "oracle-warehouse",
    "target_type": "ORACLE_TARGET",
    "target_config": {
      "host": "oracle.company.com",
      "port": "1521",
      "database": "ORCL",
      "server_name": "XE",
      "server_id": "oraclesrv01",
      "user": "etl_user",
      "password": "etl_pass"
    },

    "subscription_target_table_name": "events",
    "subscription_backfill": false,
    "subscription_mapper_config": [
      {
        "key": "user_id",
        "path": "$.user_id",
        "cast": "string",
        "primary_key": true,
        "nullable": false
      },
      {
        "key": "event_type",
        "path": "$.event_type",
        "cast": "string",
        "primary_key": false,
        "nullable": false
      }
    ]
  }' -k

Option B: Create Pipeline with Existing Connectors

curl -X POST "https://your-server/api/pipelines/" \
  -H "Authorization: Bearer $TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "name": "reuse-connectors-pipeline",
    "team_id": "'$TEAM_ID'",

    "existing_source_id": "a1b2c3d4-e5f6-4a5b-8c9d-0e1f2a3b4c5d",
    "existing_target_id": "f6e5d4c3-b2a1-4c5d-9e8f-5d4c3b2a1f0e",

    "subscription_target_table_name": "events_copy",
    "subscription_backfill": true
  }' -k

Pipeline Configuration Fields

Core Fields
| Field | Type | Required | Description |
|---|---|---|---|
| name | string | Yes | Pipeline name (unique within team, alphanumeric with - and _ only, max 255 chars) |
| team_id | UUID | Yes | Team that owns this pipeline |

Source Connector (Choose ONE approach)

Approach 1: Use Existing Connector

| Field | Type | Description |
|---|---|---|
| existing_source_id | UUID | ID of an existing source connector |

Approach 2: Create New Connector

| Field | Type | Description |
|---|---|---|
| source_name | string | Name for the new source connector |
| source_type | string | Connector type: KAFKA_SOURCE |
| source_config | object | Configuration specific to the connector type |

Target Connector (Choose ONE approach)

Approach 1: Use Existing Connector

| Field | Type | Description |
|---|---|---|
| existing_target_id | UUID | ID of an existing target connector |

Approach 2: Create New Connector

| Field | Type | Description |
|---|---|---|
| target_name | string | Name for the new target connector |
| target_type | string | Connector type: KAFKA_TARGET, ORACLE_TARGET |
| target_config | object | Configuration specific to the connector type |

Data Model Configuration (Optional)

| Field | Type | Description |
|---|---|---|
| datamodel_source_topic | string | Source topic for the data model |
| datamodel_target_topic | string | Target topic for the data model |
| datamodel_error_table_enabled | boolean | Enable error table for the data model |
| datamodel_error_table_name | string | Name of the error table |
| datamodel_error_table_target_id | UUID | Target connector for the error table |

Subscription Configuration (Optional)

| Field | Type | Description |
|---|---|---|
| subscription_target_table_name | string | Target table name in the destination |
| subscription_backfill | boolean | Whether to backfill historical data |
| subscription_mapper_config | array | Column mapping configuration (see Mapper Configuration below) |
| subscription_consumer_id | string | Consumer ID (auto-generated if not provided) |
| subscription_error_table_enabled | boolean | Enable error table for the subscription |
| subscription_error_table_name | string | Name of the subscription error table |
| subscription_error_table_target_id | UUID | Target connector for subscription errors |

Mapper Configuration

Each mapper config entry:

| Field | Type | Description |
|---|---|---|
| key | string | Column name |
| path | string | JSON path to extract the value (e.g., $.field.nested) |
| static | string | Static value (alternative to path) |
| cast | string | Data type: string, int, float, bool, date, datetime, time |
| cast_format | string | Format for casting (e.g., a date format) |
| primary_key | boolean | Whether this column is a primary key |
| nullable | boolean | Whether this column can be null |
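To make the mapper semantics concrete, here is a rough client-side simulation of what one mapper entry does to a single event. This is our own sketch of the documented fields, not the server's actual implementation; only simple $.a.b paths and two casts are handled:

```python
# Rough simulation of subscription_mapper_config semantics:
# each entry produces one output column from a JSON event.
def apply_mapper(event: dict, mapper_config: list) -> dict:
    row = {}
    for entry in mapper_config:
        if "static" in entry:                  # static value instead of a path
            value = entry["static"]
        else:                                  # walk a simple $.a.b JSON path
            value = event
            for part in entry["path"].lstrip("$.").split("."):
                value = value[part]
        if entry.get("cast") == "int":         # only two casts sketched here
            value = int(value)
        elif entry.get("cast") == "string":
            value = str(value)
        row[entry["key"]] = value
    return row

event = {"user_id": 42, "event_type": "click"}
mapper = [
    {"key": "user_id", "path": "$.user_id", "cast": "string", "primary_key": True},
    {"key": "event_type", "path": "$.event_type", "cast": "string"},
]
print(apply_mapper(event, mapper))  # {'user_id': '42', 'event_type': 'click'}
```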

Response (201 Created)

{
  "id": "5efd3d63-a6eb-4b0c-afe9-6ee1d8f5f820",
  "name": "kafka-to-oracle-pipeline",
  "state": "draft",
  "version": 1,
  "team_id": "881e0fce-5caa-4f5c-82c6-5eb883c47100",
  "team_name": "Data Engineering Team",

  "existing_source_id": null,
  "source_name": "kafka-events-source",
  "source_type": "KAFKA_SOURCE",
  "source_config": { "topic": "user-events", ... },

  "existing_target_id": null,
  "target_name": "oracle-warehouse",
  "target_type": "ORACLE_TARGET",
  "target_config": { "host": "oracle.company.com", ... },

  "subscription_target_table_name": "events",
  "subscription_backfill": false,
  "subscription_mapper_config": [...],

  "created_at": "2025-11-19T10:30:00Z",
  "updated_at": "2025-11-19T10:30:00Z"
}

📝 Save This

PIPELINE_ID=5efd3d63-a6eb-4b0c-afe9-6ee1d8f5f820

🔍 Available Connector Types

Sources:
  • KAFKA_SOURCE - Apache Kafka
Targets:
  • KAFKA_TARGET - Apache Kafka
  • ORACLE_TARGET - Oracle database
Jobs:
  • JOB_SMT - Single Message Transform (for data transformations)

Step 7: Pipeline Updates

What: Modify an existing pipeline
When: Need to change configuration or settings
Permissions: Team members with write access

Update Pipeline Configuration

curl -X PATCH "https://your-server/api/pipelines/$PIPELINE_ID" \
  -H "Authorization: Bearer $TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "name": "updated-pipeline-name",
    "source_config": {
      "topic": "new-topic",
      "bootstrap_servers": "kafka.company.com:9092",
      "consumer_group_id": "updated-consumer"
    },
    "subscription_backfill": true
  }' -k

💡 Update Patterns

Pattern 1: Update Only Name
{
  "name": "new-pipeline-name"
}
Pattern 2: Switch to Existing Connector
{
  "existing_source_id": "new-connector-uuid",
  "source_name": null,
  "source_type": null,
  "source_config": null
}
Pattern 3: Update Subscription Config
{
  "subscription_target_table_name": "new_table",
  "subscription_backfill": true,
  "subscription_mapper_config": [...]
}

Response (200 OK)

Returns the updated pipeline with all fields.

Step 8: Pipeline Control

What: Start, pause, or check pipeline status
When: After pipeline creation and configuration
States: draft → building → live → paused / error

8.1: Start Pipeline

curl -X POST "https://your-server/api/pipelines/$PIPELINE_ID/start" \
  -H "Authorization: Bearer $TOKEN" -k
Response (202 Accepted):
"BUILDING"
The pipeline transitions through these states:
  1. draft - Initial state, configuration in progress
  2. building - Pipeline is being deployed
  3. live - Pipeline is running and processing data

8.2: Pause Pipeline

curl -X POST "https://your-server/api/pipelines/$PIPELINE_ID/pause" \
  -H "Authorization: Bearer $TOKEN" -k
Response (202 Accepted):
"PAUSED"

8.3: Check Pipeline Status

curl -X GET "https://your-server/api/pipelines/$PIPELINE_ID" \
  -H "Authorization: Bearer $TOKEN" -k
Response (200 OK):
{
  "id": "5efd3d63-a6eb-4b0c-afe9-6ee1d8f5f820",
  "name": "kafka-to-oracle-pipeline",
  "state": "live",
  "version": 1,
  "created_at": "2025-11-19T10:30:00Z",
  "updated_at": "2025-11-19T10:45:00Z"
}
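Because start returns 202 and the build is asynchronous, a client typically polls this status endpoint until the pipeline leaves building. A sketch with an injectable fetch function, so the loop itself can be exercised without a server (in real use, get_pipeline would wrap the GET request above):

```python
import time

def wait_for_live(get_pipeline, timeout_s: float = 300, poll_s: float = 1.0) -> str:
    """Poll until the pipeline reaches live, paused, or error.

    get_pipeline: callable returning the pipeline JSON as a dict,
    e.g. lambda: requests.get(f"{SERVER}/api/pipelines/{pid}", headers=headers).json()
    """
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        state = get_pipeline()["state"]
        if state in ("live", "paused", "error"):
            return state
        time.sleep(poll_s)
    raise TimeoutError("pipeline did not leave the building state in time")

# Demo with a stub that reports "building" twice, then "live"
states = iter(["building", "building", "live"])
print(wait_for_live(lambda: {"state": next(states)}, poll_s=0.01))  # live
```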

8.4: Get Pipeline Logs (WebSocket)

// WebSocket connection example
const ws = new WebSocket(
  `wss://your-server/api/pipelines/${PIPELINE_ID}/logs`,
  ['Authorization', `Bearer ${TOKEN}`]
);

ws.onmessage = (event) => {
  console.log('Log:', event.data);
};

Pipeline State Diagram

     ┌──────┐
     │DRAFT │ (initial state)
     └───┬──┘
         │ start
         ▼
   ┌─────────┐
   │BUILDING │ (deployment in progress)
   └────┬────┘
        │ (automatic)
        ▼
     ┌──────┐
     │ LIVE │ (processing data)
     └───┬──┘
         │ pause
         ▼
    ┌────────┐
    │ PAUSED │
    └───┬────┘
        │ resume
        ▼
   ┌─────────┐
   │BUILDING │
   └─────────┘
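The diagram boils down to a small transition table, which a client can use to validate an action before calling the API. This is our own sketch derived from the documented states; the "deployed" action label is invented to represent the automatic building-to-live transition:

```python
# Allowed pipeline state transitions, per the state diagram above.
# ("building", "deployed") is automatic on the server; the label is ours.
TRANSITIONS = {
    ("draft", "start"): "building",
    ("building", "deployed"): "live",
    ("live", "pause"): "paused",
    ("paused", "resume"): "building",
}

def next_state(state: str, action: str) -> str:
    try:
        return TRANSITIONS[(state, action)]
    except KeyError:
        raise ValueError(f"cannot {action!r} from state {state!r}") from None

print(next_state("draft", "start"))    # building
print(next_state("paused", "resume"))  # building
```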

🛠️ Testing Tools

Option 1: cURL (Command Line)

Pros: Available everywhere, scriptable
Cons: Verbose, requires manual token management
# Set variables for easy reuse
export SERVER="https://your-server"
export TOKEN="your-token-here"

# Make a request
curl -X GET "$SERVER/api/pipelines/" \
  -H "Authorization: Bearer $TOKEN" -k

Option 2: HTTPie (Command Line - User Friendly)

Pros: Simpler syntax, better output
Cons: Requires installation
# Install
pip install httpie

# Use
http GET https://your-server/api/pipelines/ \
  Authorization:"Bearer $TOKEN"

Option 3: Postman (GUI)

Pros: Visual interface, request collections
Cons: Requires download
  1. Download Postman
  2. Create a new request
  3. Set method (GET, POST, etc.)
  4. Enter URL: https://your-server/api/pipelines/
  5. Add header: Authorization: Bearer YOUR_TOKEN
  6. Add JSON body for POST/PATCH
  7. Click “Send”

Option 4: Python Script

Pros: Full programming capabilities
Cons: Requires Python knowledge
import requests

# Configuration
SERVER = "https://your-server"
TOKEN = "your-token-here"

headers = {
    "Authorization": f"Bearer {TOKEN}",
    "Content-Type": "application/json"
}

# Get all pipelines
response = requests.get(
    f"{SERVER}/api/pipelines/",
    headers=headers,
    verify=False  # Only for self-signed certificates
)

print(response.json())

# Create a pipeline
pipeline_data = {
    "name": "my-pipeline",
    "team_id": "your-team-id",
    "source_name": "my-source",
    "source_type": "KAFKA_SOURCE",
    "source_config": {
        "topic": "my-topic",
        "bootstrap_servers": "kafka:9092",
        "security_protocol": "PLAINTEXT"
    }
}

response = requests.post(
    f"{SERVER}/api/pipelines/",
    headers=headers,
    json=pipeline_data,
    verify=False
)

print(response.json())

📚 API Reference

Authentication Endpoints

| Method | Endpoint | Description |
|---|---|---|
| POST | /auth/register | Create a new user account |
| POST | /auth/jwt/login | Login and get access token |
| POST | /auth/jwt/logout | Logout (invalidate token) |
| POST | /auth/forgot-password | Request password reset |
| POST | /auth/reset-password | Reset password with token |

User Endpoints

| Method | Endpoint | Description |
|---|---|---|
| GET | /users | List all users (paginated) |
| GET | /users/{id} | Get user details |
| PATCH | /users/{id} | Update user |
| DELETE | /users/{id} | Delete user |
| POST | /users/me/change-password | Change current user's password |
| POST | /users/me/service-account-token | Create service account token |
| GET | /users/export-all | Export all data (admin only) |
| POST | /users/import-all | Import all data (admin only) |

Environment Endpoints

| Method | Endpoint | Description |
|---|---|---|
| GET | /envs/ | List all environments (paginated) |
| POST | /envs/ | Create a new environment |
| GET | /envs/{id} | Get environment details |
| GET | /envs/filter-one?name={name} | Get environment by name |
| PATCH | /envs/{id} | Update environment |
| DELETE | /envs/{id} | Delete environment |
| POST | /envs/check-byok-credentials | Check BYOK credentials |

Team Endpoints

| Method | Endpoint | Description |
|---|---|---|
| GET | /teams/ | List all teams (paginated) |
| POST | /teams/ | Create a new team |
| GET | /teams/{id} | Get team details |
| GET | /teams/filter-one?name={name} | Get team by name |
| PATCH | /teams/{id} | Update team |
| DELETE | /teams/{id} | Delete team |

Team Member Endpoints

| Method | Endpoint | Description |
|---|---|---|
| GET | /teams/{id}/members | List team members |
| POST | /teams/{id}/members/bulk | Add multiple members |
| DELETE | /teams/{id}/members/{user_id} | Remove a member |

Pipeline Endpoints

| Method | Endpoint | Description |
|---|---|---|
| GET | /pipelines/ | List all pipelines (paginated) |
| GET | /pipelines/count-status | Count pipelines by status |
| POST | /pipelines/ | Create a new pipeline |
| GET | /pipelines/{id} | Get pipeline details |
| GET | /pipelines/filter-one?name={name} | Get pipeline by name |
| PATCH | /pipelines/{id} | Update pipeline |
| DELETE | /pipelines/{id} | Delete pipeline |
| POST | /pipelines/{id}/start | Start pipeline (202) |
| POST | /pipelines/{id}/pause | Pause pipeline (202) |
| WebSocket | /pipelines/{id}/logs | Stream pipeline logs |
| GET | /pipelines/{id}/status | Get pipeline status (deprecated) |
| GET | /pipelines/{id}/configuration | Export pipeline config |
| POST | /pipelines/{id}/configuration | Import pipeline config |
| GET | /pipelines/{id}/worker-config | Get worker config |
| GET | /pipelines/status/all-non-draft | List non-draft pipelines with status |
| GET | /pipelines/{id}/latency-informations | Get pipeline latency info |

Connector Endpoints

| Method | Endpoint | Description |
|---|---|---|
| GET | /connectors/source-config | List source configurations |
| GET | /connectors/target-config | List target configurations |
| GET | /connectors/filter-one?name={name} | Get connector by name |
| GET | /connectors/{id} | Get connector by ID |
| GET | /connectors/{id}/source-worker-config | Get source worker config |
| GET | /connectors/{id}/target-worker-config | Get target worker config |
| POST | /connectors/ | Create connector |
| PATCH | /connectors/{id} | Update connector |
| DELETE | /connectors/{id} | Delete connector |
| POST | /connectors/{id}/start | Start connector worker (202) |
| POST | /connectors/{id}/stop | Stop connector worker (202) |
| WebSocket | /connectors/{id}/logs | Stream connector logs |

Subscription Endpoints

| Method | Endpoint | Description |
|---|---|---|
| GET | /subscriptions/ | List subscriptions (paginated) |
| GET | /subscriptions/{id} | Get subscription details |
| POST | /subscriptions/ | Create subscription |
| POST | /subscriptions/subscribe-all | Bulk create subscriptions |
| PATCH | /subscriptions/{id} | Update subscription |
| DELETE | /subscriptions/{id} | Delete subscription |
| POST | /subscriptions/{id}/start | Start subscription (202) |
| POST | /subscriptions/{id}/pause | Pause subscription (202) |

DataModel Endpoints

| Method | Endpoint | Description |
|---|---|---|
| GET | /datamodels/ | List datamodels (paginated) |
| GET | /datamodels/{id} | Get datamodel by ID |
| GET | /datamodels/{id}/monitoring | Get datamodel monitoring |
| GET | /datamodels/{id}/schema | Get datamodel schema |
| PATCH | /datamodels/{id}/error-table | Update error table config |
| DELETE | /datamodels/{id} | Delete datamodel |

SMT/Transformation Endpoints

| Method | Endpoint | Description |
|---|---|---|
| POST | /smt/process_mapper | Process mapper transformation |

Connector Type Specific Endpoints

Kafka Source

| Method | Endpoint | Description |
|---|---|---|
| POST | /connector-types/kafka-source/check-credentials | Check Kafka source credentials |
| POST | /connector-types/kafka-source/fetch-messages | Fetch messages from Kafka |
| POST | /connector-types/kafka-source/list-topics | List Kafka topics |

Oracle Target

| Method | Endpoint | Description |
|---|---|---|
| POST | /connector-types/oracle-target/check-credentials | Check Oracle target credentials |

🔧 Connector Configurations

KAFKA_SOURCE Configuration

{
  "topic": "user-events",
  "consumer_group_id": "pipeline-consumer",
  "bootstrap_servers": "kafka1:9092,kafka2:9092",
  "security_protocol": "SASL_SSL",
  "sasl_mechanism": "PLAIN",
  "sasl_username": "kafka_user",
  "sasl_password": "kafka_pass",
  "include_metadata": false
}
Field Descriptions:
| Field | Type | Required | Description |
|---|---|---|---|
| topic | string | Yes | Kafka topic to consume from |
| bootstrap_servers | string | Yes | Comma-separated list of Kafka brokers |
| consumer_group_id | string | No | Consumer group ID (auto-generated if not provided) |
| security_protocol | string | No | PLAINTEXT, SASL_PLAINTEXT, SASL_SSL, SSL |
| sasl_mechanism | string | If SASL | PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, GSSAPI, OAUTHBEARER |
| sasl_username | string | If SASL | SASL username (required if using SASL) |
| sasl_password | string | If SASL | SASL password (required if using SASL) |
| include_metadata | boolean | No | Include Kafka metadata in messages (default: false) |
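The conditional requirements above (SASL credentials only matter when a SASL protocol is selected) can be checked client-side before submitting the config. A small sketch under that assumption; the helper name is ours:

```python
def check_kafka_source_config(config: dict) -> list:
    """Return a list of problems with a KAFKA_SOURCE config (client-side sanity check)."""
    problems = []
    for field in ("topic", "bootstrap_servers"):
        if not config.get(field):
            problems.append(f"missing required field: {field}")
    # SASL credentials are only required when a SASL_* protocol is chosen
    if config.get("security_protocol", "").startswith("SASL"):
        for field in ("sasl_mechanism", "sasl_username", "sasl_password"):
            if not config.get(field):
                problems.append(f"{field} is required when using SASL")
    return problems

print(check_kafka_source_config({
    "topic": "user-events",
    "bootstrap_servers": "kafka:9092",
    "security_protocol": "SASL_SSL",
}))
```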

KAFKA_TARGET Configuration

{
  "bootstrap_server": "kafka1:9092",
  "security_protocol": "SASL_SSL",
  "sasl_mechanism": "PLAIN",
  "sasl_username": "kafka_user",
  "sasl_password": "kafka_pass",
  "ca_cert": "",
  "group_id": "consumer-group"
}
Field Descriptions:
| Field | Type | Required | Description |
|---|---|---|---|
| bootstrap_server | string | Yes | Kafka broker address |
| security_protocol | string | No | PLAINTEXT, SASL_PLAINTEXT, SASL_SSL, SSL |
| sasl_mechanism | string | If SASL | PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, GSSAPI, OAUTHBEARER |
| sasl_username | string | If SASL | SASL username |
| sasl_password | string | If SASL | SASL password |
| ca_cert | string | No | CA certificate for SSL |
| group_id | string | No | Consumer group ID |

ORACLE_TARGET Configuration

{
  "database": "ORCL",
  "user": "etl_user",
  "password": "etl_pass",
  "host": "oracle.company.com",
  "port": "1521",
  "server_name": "XE",
  "server_id": "oraclesrv01"
}
Field Descriptions:
| Field | Type | Required | Description |
|---|---|---|---|
| database | string | Yes | Oracle database name |
| user | string | Yes | Username for Oracle |
| password | string | Yes | Password for Oracle |
| host | string | Yes | Hostname or IP address |
| port | string | Yes | Port number (typically 1521) |
| server_name | string | Yes | Service name (e.g., XE) |
| server_id | string | Yes | Server identifier/SID |

Mapper Column Config:

| Field | Type | Description |
|---|---|---|
| key | string | Column name in target |
| path | string | JSON path to extract the value (e.g., $.field.nested) |
| static | string | Static value (alternative to path) |
| cast | string | Cast to: string, int, float, bool, date, datetime, time |
| cast_format | string | Format for casting (e.g., a date format) |
| primary_key | boolean | Whether this column is a primary key |
| nullable | boolean | Whether this column can be null |

🎯 Common Patterns

Pattern 1: List Resources with Filters

# Get all pipelines in a specific state
curl -X GET "https://your-server/api/pipelines/?state=live" \
  -H "Authorization: Bearer $TOKEN" -k

# Get pipelines for a specific team
curl -X GET "https://your-server/api/pipelines/?team_id=$TEAM_ID" \
  -H "Authorization: Bearer $TOKEN" -k

# Search pipelines by name
curl -X GET "https://your-server/api/pipelines/?search=kafka" \
  -H "Authorization: Bearer $TOKEN" -k

Pattern 2: Pagination

# Get first page (default 50 items)
curl -X GET "https://your-server/api/pipelines/?skip=0&limit=50" \
  -H "Authorization: Bearer $TOKEN" -k

# Get second page
curl -X GET "https://your-server/api/pipelines/?skip=50&limit=50" \
  -H "Authorization: Bearer $TOKEN" -k
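The skip/limit pattern generalizes to a small paging helper. A sketch assuming the list endpoints return a plain JSON array (adjust if your server wraps results in an envelope); the fetch function is injectable so the loop can be exercised without a server:

```python
def iter_pages(fetch_page, limit: int = 50):
    """Yield items across all pages of a skip/limit-paginated endpoint.

    fetch_page(skip, limit) returns one page as a list, e.g.
    requests.get(f"{SERVER}/api/pipelines/?skip={skip}&limit={limit}",
                 headers=headers).json()
    """
    skip = 0
    while True:
        page = fetch_page(skip, limit)
        yield from page
        if len(page) < limit:   # short page means this was the last page
            return
        skip += limit

# Demo with a stub standing in for the HTTP call
data = [f"pipeline-{i}" for i in range(7)]
stub = lambda skip, limit: data[skip:skip + limit]
print(list(iter_pages(stub, limit=3)))  # all 7 items, fetched in 3 pages
```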

Pattern 3: Error Handling

import requests

try:
    response = requests.get(
        "https://your-server/api/pipelines/",
        headers={"Authorization": f"Bearer {TOKEN}"},
        verify=False
    )
    response.raise_for_status()  # Raises exception for 4xx/5xx
    data = response.json()
except requests.exceptions.HTTPError as e:
    print(f"HTTP Error: {e}")
    print(f"Response: {e.response.text}")
except requests.exceptions.ConnectionError as e:
    print(f"Connection Error: {e}")
except Exception as e:
    print(f"Error: {e}")

Pattern 4: Bulk Operations

# Create multiple pipelines
# Each dict also needs the source/target fields shown in Step 6 (omitted here)
pipeline_configs = [
    {"name": "pipeline-1", "team_id": TEAM_ID},
    {"name": "pipeline-2", "team_id": TEAM_ID},
    {"name": "pipeline-3", "team_id": TEAM_ID},
]

for config in pipeline_configs:
    response = requests.post(
        f"{SERVER}/api/pipelines/",
        headers=headers,
        json=config
    )
    print(f"Created: {response.json()['id']}")

Pattern 5: Test Connector Credentials Before Creating

# Test Kafka source credentials
curl -X POST "https://your-server/api/connector-types/kafka-source/check-credentials" \
  -H "Authorization: Bearer $TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "bootstrap_servers": "kafka:9092",
    "topic": "test-topic",
    "security_protocol": "PLAINTEXT"
  }' -k

# Test Oracle target credentials
curl -X POST "https://your-server/api/connector-types/oracle-target/check-credentials" \
  -H "Authorization: Bearer $TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "host": "oracle.company.com",
    "port": "1521",
    "database": "ORCL",
    "user": "etl_user",
    "password": "etl_pass",
    "server_name": "XE",
    "server_id": "oraclesrv01"
  }' -k

🐛 Troubleshooting

Issue: “401 Unauthorized”

Cause: Token is missing, expired, or invalid
Solution:
  1. Check that you included the Authorization header
  2. Verify the token format: Bearer YOUR_TOKEN
  3. Log in again to get a fresh token
curl -X POST "https://your-server/api/auth/jwt/login" \
  -H "Content-Type: application/x-www-form-urlencoded" \
  -d 'username=YOUR_EMAIL&password=YOUR_PASSWORD' -k

Issue: “403 Forbidden”

Cause: You don’t have permission for this operation
Solution:
  1. Verify you’re a member of the team
  2. Check if you have the required role (owner vs member)
  3. Contact the team owner to grant permissions

Issue: “422 Validation Error”

Cause: Request data doesn’t meet validation requirements
Solution:
  1. Check the error response for specific field errors
  2. Verify all required fields are provided
  3. Ensure data types match (UUID, string, boolean, etc.)
Example Error Response:
{
  "detail": [
    {
      "loc": ["body", "source_name"],
      "msg": "field required",
      "type": "value_error.missing"
    }
  ]
}
Fix:
{
  "name": "my-pipeline",
  "team_id": "uuid-here",
  "source_name": "my-source",
  "source_type": "KAFKA_SOURCE",
  "source_config": {...}
}

Issue: Invalid pipeline name

Cause: Pipeline name contains invalid characters
Solution: Pipeline names must:
  • Contain only alphanumeric characters, hyphens (-), and underscores (_)
  • Be maximum 255 characters long
  • Not be empty
Valid names:
  • kafka-to-oracle-pipeline
  • user_events_pipeline_v2
  • Pipeline123
Invalid names:
  • pipeline name (contains space)
  • pipeline.name (contains dot)
  • pipeline@name (contains special character)
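The naming rules above amount to one regular expression, so a client can pre-check names before the POST:

```python
import re

# Pipeline names: alphanumeric plus - and _, 1 to 255 characters.
PIPELINE_NAME_RE = re.compile(r"^[A-Za-z0-9_-]{1,255}$")

def is_valid_pipeline_name(name: str) -> bool:
    return bool(PIPELINE_NAME_RE.fullmatch(name))

print(is_valid_pipeline_name("kafka-to-oracle-pipeline"))  # True
print(is_valid_pipeline_name("pipeline name"))             # False (contains a space)
```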

Issue: Cannot specify both existing and new connector

Cause: You provided both existing_source_id AND source_name/type/config
Solution: Choose ONE approach.
Option A (Existing):
{
  "existing_source_id": "uuid-here"
}
Option B (New):
{
  "source_name": "my-source",
  "source_type": "KAFKA_SOURCE",
  "source_config": {...}
}
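This exclusivity rule is easy to pre-check before the POST. A sketch using the field names from this guide (the helper itself is our own):

```python
def source_spec_error(body: dict):
    """Return an error message if the source specification is ambiguous, else None."""
    has_existing = body.get("existing_source_id") is not None
    has_new = any(body.get(f) is not None
                  for f in ("source_name", "source_type", "source_config"))
    if has_existing and has_new:
        return "specify either existing_source_id OR source_name/type/config, not both"
    if not has_existing and not has_new:
        return "a source connector is required"
    return None

print(source_spec_error({"existing_source_id": "uuid-here"}))  # None
```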

Issue: Pipeline stuck in “BUILDING” state

Cause: Deployment failed or is taking longer than expected
Solution:
  1. Check pipeline logs via WebSocket
  2. Verify connector configurations are correct
  3. Check infrastructure resources (CPU, memory)
# Get detailed status
curl -X GET "https://your-server/api/pipelines/$PIPELINE_ID" \
  -H "Authorization: Bearer $TOKEN" -k

Issue: SSL Certificate Error

Cause: Self-signed certificate or untrusted CA
Solution for curl:
# Add -k or --insecure flag
curl -X GET "https://your-server/api/pipelines/" \
  -H "Authorization: Bearer $TOKEN" -k
Solution for Python:
requests.get(url, verify=False)
Solution for HTTPie:
http --verify=no GET https://your-server/api/pipelines/

📝 Complete Example Script

Here’s a complete Python script that performs all steps:
#!/usr/bin/env python3
"""
Complete Popsink API example script
"""
import requests
import json
from typing import Optional

# Configuration
SERVER = "https://your-server"
VERIFY_SSL = False  # Set to True in production

class PopsinkClient:
    def __init__(self, server: str, verify_ssl: bool = True):
        self.server = server.rstrip('/')
        self.verify_ssl = verify_ssl
        self.token: Optional[str] = None

    def register(self, email: str, password: str) -> dict:
        """Register a new user"""
        response = requests.post(
            f"{self.server}/api/auth/register",
            json={
                "email": email,
                "password": password,
                "is_active": True
            },
            verify=self.verify_ssl
        )
        response.raise_for_status()
        return response.json()

    def login(self, email: str, password: str) -> str:
        """Login and get token"""
        response = requests.post(
            f"{self.server}/api/auth/jwt/login",
            data={
                "username": email,
                "password": password
            },
            verify=self.verify_ssl
        )
        response.raise_for_status()
        data = response.json()
        self.token = data["access_token"]
        return self.token

    def _headers(self) -> dict:
        """Get headers with auth token"""
        if not self.token:
            raise ValueError("Not logged in. Call login() first.")
        return {
            "Authorization": f"Bearer {self.token}",
            "Content-Type": "application/json"
        }

    def create_environment(self, name: str, description: str = "") -> dict:
        """Create an environment"""
        response = requests.post(
            f"{self.server}/api/envs/",
            headers=self._headers(),
            json={
                "name": name,
                "description": description,
                "use_retention": True
            },
            verify=self.verify_ssl
        )
        response.raise_for_status()
        return response.json()

    def create_team(self, name: str, description: str = "") -> dict:
        """Create a team"""
        response = requests.post(
            f"{self.server}/api/teams/",
            headers=self._headers(),
            json={
                "name": name,
                "description": description
            },
            verify=self.verify_ssl
        )
        response.raise_for_status()
        return response.json()

    def create_pipeline(self, name: str, team_id: str, config: dict) -> dict:
        """Create a pipeline"""
        pipeline_data = {
            "name": name,
            "team_id": team_id,
            **config
        }
        response = requests.post(
            f"{self.server}/api/pipelines/",
            headers=self._headers(),
            json=pipeline_data,
            verify=self.verify_ssl
        )
        response.raise_for_status()
        return response.json()

    def start_pipeline(self, pipeline_id: str) -> str:
        """Start a pipeline"""
        response = requests.post(
            f"{self.server}/api/pipelines/{pipeline_id}/start",
            headers=self._headers(),
            verify=self.verify_ssl
        )
        response.raise_for_status()
        return response.json()


def main():
    # Initialize client
    client = PopsinkClient(SERVER, verify_ssl=VERIFY_SSL)

    # 1. Register
    print("1. Registering user...")
    user = client.register("demo@example.com", "SecurePassword123!")
    print(f"   ✓ User created: {user['id']}")

    # 2. Login
    print("\n2. Logging in...")
    token = client.login("demo@example.com", "SecurePassword123!")
    print(f"   ✓ Logged in (token: {token[:20]}...)")

    # 3. Create environment
    print("\n3. Creating environment...")
    env = client.create_environment("demo-env", "Demo environment")
    print(f"   ✓ Environment created: {env['id']}")

    # 4. Create team
    print("\n4. Creating team...")
    team = client.create_team("Demo Team", "Demo team for testing")
    print(f"   ✓ Team created: {team['id']}")

    # 5. Create pipeline
    print("\n5. Creating pipeline...")
    pipeline_config = {
        "source_name": "demo-kafka-source",
        "source_type": "KAFKA_SOURCE",
        "source_config": {
            "topic": "demo-topic",
            "bootstrap_servers": "kafka:9092",
            "consumer_group_id": "demo-group",
            "security_protocol": "PLAINTEXT"
        },
        "target_name": "demo-kafka-target",
        "target_type": "KAFKA_TARGET",
        "target_config": {
            "bootstrap_server": "kafka:9092",
            "security_protocol": "PLAINTEXT"
        },
        "subscription_target_table_name": "events",
        "subscription_backfill": False
    }
    pipeline = client.create_pipeline("demo-pipeline", team['id'], pipeline_config)
    print(f"   ✓ Pipeline created: {pipeline['id']}")

    # 6. Start pipeline
    print("\n6. Starting pipeline...")
    state = client.start_pipeline(pipeline['id'])
    print(f"   ✓ Pipeline state: {state}")

    print("\n✓ All steps completed successfully!")


if __name__ == "__main__":
    main()

🎓 Next Steps

Now that you understand the basics:
  1. Explore the API: Try listing resources, filtering, pagination
  2. Monitor Pipelines: Use logs and status endpoints
  3. Handle Errors: Implement proper error handling
  4. Test Credentials: Use credential check endpoints before creating connectors
  5. Optimize: Reuse connectors, batch operations
  6. Automate: Create scripts or CI/CD pipelines

📞 Support

  • Documentation: This guide
  • API Schema: https://your-server/api/docs (Swagger UI)
  • OpenAPI Spec: https://your-server/api/openapi.json

Happy Data Processing! 🚀