📋 Table of Contents
- Quick Start
- Prerequisites
- API Overview
- Authentication Flow
- Step-by-Step Tutorial
- Testing Tools
- API Reference
- Connector Configurations
- Common Patterns
- Troubleshooting
🚀 Quick Start
For the impatient, here’s the minimum you need to know:
✅ Prerequisites
What You Need
- Access to a Popsink server (e.g., https://popsink.your-company.com)
- A terminal or API client (curl, Postman, HTTPie, etc.)
- 5-15 minutes to complete this guide
Basic Concepts
| Concept | Description |
|---|---|
| Environment | A workspace that contains teams and their resources |
| Team | A group of users working together on pipelines |
| Pipeline | A data processing flow from source to target |
| Connector | A connection to a data source or destination |
| Subscription | Configuration for how data flows through a pipeline |
| DataModel | Schema and configuration for data transformation |
🔍 API Overview
Base URL Structure
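Endpoints in this guide live under an /api prefix on your server, for example:

```
https://your-server/api/{resource}/
```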
Authentication
All API endpoints (except registration and login) require a Bearer token in the Authorization header:
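For example:

```
Authorization: Bearer YOUR_TOKEN
```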
Response Format
All responses are in JSON format.
HTTP Status Codes
| Code | Meaning | When You’ll See It |
|---|---|---|
200 | Success | GET, PATCH operations |
201 | Created | POST operations that create resources |
202 | Accepted | Async operations (start/pause pipeline) |
204 | No Content | DELETE operations |
400 | Bad Request | Invalid input data |
401 | Unauthorized | Missing or invalid token |
403 | Forbidden | Insufficient permissions |
404 | Not Found | Resource doesn’t exist |
422 | Validation Error | Data doesn’t meet requirements |
🔐 Authentication Flow
📖 Step-by-Step Tutorial
Step 1: User Registration
What: Create your user account When: First time using the API Required: Email and password
Request
Field Descriptions
| Field | Type | Required | Description |
|---|---|---|---|
email | string | ✅ | Your email address (must be unique) |
password | string | ✅ | Your password (min 8 characters recommended) |
is_active | boolean | ✅ | Set to true to activate the account immediately |
is_superuser | boolean | ⬜ | Admin privileges (default: false) |
is_verified | boolean | ⬜ | Email verification status (default: false) |
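As a sketch, the registration request can be built with Python’s stdlib urllib; the server URL is an assumption, so point BASE_URL at your own instance:

```python
import json
import urllib.request

# Assumption: BASE_URL is your Popsink server's API root.
BASE_URL = "https://popsink.your-company.com/api"

def build_register_request(email: str, password: str) -> urllib.request.Request:
    """Build the POST /auth/register request; send it with urllib.request.urlopen."""
    body = json.dumps({
        "email": email,
        "password": password,
        "is_active": True,      # activate the account immediately
        "is_superuser": False,  # default
        "is_verified": False,   # default
    }).encode()
    return urllib.request.Request(
        f"{BASE_URL}/auth/register",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )
```

Sending the request requires a reachable server; on success the API answers 201 Created with the new user’s details.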
Response (201 Created)
📝 Save This
Step 2: User Login
What: Authenticate and get your access token When: Before making any authenticated API calls Token Lifetime: Configurable (typically 24 hours)
Request
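A comparable sketch for login; the form-encoded username field is an assumption based on how FastAPI-style JWT login endpoints commonly work, so verify against your server’s /api/docs:

```python
import urllib.parse
import urllib.request

BASE_URL = "https://popsink.your-company.com/api"  # assumption: your server

def build_login_request(email: str, password: str) -> urllib.request.Request:
    # Assumption: /auth/jwt/login takes form-encoded credentials with a
    # "username" field, as FastAPI-style JWT login endpoints typically do.
    body = urllib.parse.urlencode({"username": email, "password": password}).encode()
    return urllib.request.Request(
        f"{BASE_URL}/auth/jwt/login",
        data=body,
        headers={"Content-Type": "application/x-www-form-urlencoded"},
        method="POST",
    )

def auth_header(token: str) -> dict:
    """The header to attach to every authenticated request."""
    return {"Authorization": f"Bearer {token}"}
```

On success the response JSON contains the access token to pass to auth_header.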
Response (200 OK)
📝 Save This
Using Your Token
From now on, include the Authorization header with your Bearer token in every request.
Step 3: Environment Setup
What: Create a workspace for your teams and pipelines When: After logging in, before creating teams Permissions: Any authenticated user can create an environment
Request
Field Descriptions
| Field | Type | Required | Description |
|---|---|---|---|
name | string | ✅ | Environment name (must be unique) |
description | string | ⬜ | Description of the environment’s purpose |
use_retention | boolean | ⬜ | Enable data retention policies (default: false) |
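An illustrative body for POST /envs/ (values are placeholders):

```json
{
  "name": "production",
  "description": "Production data pipelines",
  "use_retention": false
}
```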
Response (201 Created)
📝 Save This
Step 4: Team Creation
What: Create a team within your environment When: After creating an environment Ownership: The creator automatically becomes a team owner
Request
Field Descriptions
| Field | Type | Required | Description |
|---|---|---|---|
name | string | ✅ | Team name (unique within environment) |
description | string | ⬜ | Purpose and responsibilities of the team |
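An illustrative body for POST /teams/ (values are placeholders; an environment association may also be required, so check /api/docs):

```json
{
  "name": "data-platform",
  "description": "Team owning the ingestion pipelines"
}
```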
Response (201 Created)
📝 Save This
Step 5: Team Member Management
What: Add users to your team When: After creating a team Permissions: Only team owners can add members
5.1: Create Additional Users (Optional)
If you need to invite teammates, first create their accounts by repeating the registration call from Step 1.
5.2: Add Members to Team
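Assuming POST /teams/{id}/members/bulk accepts a list of user IDs (verify the exact body shape in /api/docs):

```json
{
  "user_ids": [
    "11111111-1111-1111-1111-111111111111",
    "22222222-2222-2222-2222-222222222222"
  ]
}
```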
Response (204 No Content)
Success! The members have been added to your team.
5.3: List Team Members
Step 6: Pipeline Creation
What: Create a data pipeline with source and target connectors When: After team setup Permissions: Team members with write access
🎯 Understanding Pipeline Structure (V2 - Flattened)
The new API uses a flattened structure instead of a nested json_configuration. You can:
- Use existing connectors by specifying their IDs
- Create new connectors by providing name, type, and config
- Mix both (e.g., existing source + new target)
Option A: Create Pipeline with New Connectors
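An illustrative body combining the documented flattened fields (all values are placeholders):

```json
{
  "name": "kafka-to-oracle-pipeline",
  "team_id": "YOUR_TEAM_ID",
  "source_name": "orders-kafka",
  "source_type": "KAFKA_SOURCE",
  "source_config": {
    "topic": "orders",
    "bootstrap_servers": "kafka-1:9092,kafka-2:9092"
  },
  "target_name": "orders-oracle",
  "target_type": "ORACLE_TARGET",
  "target_config": {
    "database": "ORCL",
    "user": "app",
    "password": "YOUR_PASSWORD",
    "host": "oracle.internal",
    "port": "1521",
    "server_name": "XE",
    "server_id": "XE"
  }
}
```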
Option B: Create Pipeline with Existing Connectors
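An illustrative body reusing connectors by ID (UUIDs are placeholders):

```json
{
  "name": "reuse-connectors-pipeline",
  "team_id": "YOUR_TEAM_ID",
  "existing_source_id": "SOURCE_CONNECTOR_UUID",
  "existing_target_id": "TARGET_CONNECTOR_UUID"
}
```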
Pipeline Configuration Fields
Core Fields
| Field | Type | Required | Description |
|---|---|---|---|
name | string | ✅ | Pipeline name (unique within team, alphanumeric with - and _ only, max 255 chars) |
team_id | UUID | ✅ | Team that owns this pipeline |
Source Connector (Choose ONE approach)
Approach 1: Use Existing Connector
| Field | Type | Description |
|---|---|---|
existing_source_id | UUID | ID of an existing source connector |
Approach 2: Create a New Connector
| Field | Type | Description |
|---|---|---|
source_name | string | Name for the new source connector |
source_type | string | Connector type: KAFKA_SOURCE |
source_config | object | Configuration specific to the connector type |
Target Connector (Choose ONE approach)
Approach 1: Use Existing Connector
| Field | Type | Description |
|---|---|---|
existing_target_id | UUID | ID of an existing target connector |
Approach 2: Create a New Connector
| Field | Type | Description |
|---|---|---|
target_name | string | Name for the new target connector |
target_type | string | Connector type: KAFKA_TARGET, ORACLE_TARGET |
target_config | object | Configuration specific to the connector type |
Data Model Configuration (Optional)
| Field | Type | Description |
|---|---|---|
datamodel_source_topic | string | Source topic for the data model |
datamodel_target_topic | string | Target topic for the data model |
datamodel_error_table_enabled | boolean | Enable error table for data model |
datamodel_error_table_name | string | Name of the error table |
datamodel_error_table_target_id | UUID | Target connector for error table |
Subscription Configuration (Optional)
| Field | Type | Description |
|---|---|---|
subscription_target_table_name | string | Target table name in the destination |
subscription_backfill | boolean | Whether to backfill historical data |
subscription_mapper_config | array | Column mapping configuration (see Mapper Config below) |
subscription_consumer_id | string | Consumer ID (auto-generated if not provided) |
subscription_error_table_enabled | boolean | Enable error table for subscription |
subscription_error_table_name | string | Name of the subscription error table |
subscription_error_table_target_id | UUID | Target connector for subscription errors |
Mapper Configuration
Each mapper config entry:
| Field | Type | Description |
|---|---|---|
key | string | Column name |
path | string | JSON path to extract value (e.g., $.field.nested) |
static | string | Static value (alternative to path) |
cast | string | Data type: string, int, float, bool, date, datetime, time |
cast_format | string | Format for casting (e.g., date format) |
primary_key | boolean | Is this a primary key column |
nullable | boolean | Can this column be null |
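An illustrative subscription_mapper_config list (field names from the table above; the cast_format value is an assumption):

```json
[
  {
    "key": "order_id",
    "path": "$.order.id",
    "cast": "int",
    "primary_key": true,
    "nullable": false
  },
  {
    "key": "created_at",
    "path": "$.order.created_at",
    "cast": "datetime",
    "cast_format": "%Y-%m-%dT%H:%M:%S",
    "nullable": true
  }
]
```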
Response (201 Created)
📝 Save This
🔍 Available Connector Types
Sources:
- KAFKA_SOURCE - Apache Kafka
Targets:
- KAFKA_TARGET - Apache Kafka
- ORACLE_TARGET - Oracle database
Transformations:
- JOB_SMT - Single Message Transform (for data transformations)
Step 7: Pipeline Updates
What: Modify an existing pipeline When: Need to change configuration or settings Permissions: Team members with write access
Update Pipeline Configuration
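A partial update sends only the fields you want to change to PATCH /pipelines/{id}; for example, an illustrative rename-only body:

```json
{
  "name": "kafka-to-oracle-pipeline-v2"
}
```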
💡 Update Patterns
Pattern 1: Update Only Name
Response (200 OK)
Returns the updated pipeline with all fields.
Step 8: Pipeline Control
What: Start, pause, or check pipeline status When: After pipeline creation and configuration States: draft → building → live → paused / error
8.1: Start Pipeline
- draft - Initial state, configuration in progress
- building - Pipeline is being deployed
- live - Pipeline is running and processing data
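A sketch of the start and pause calls using stdlib urllib (BASE_URL is an assumption; point it at your server):

```python
import urllib.request

BASE_URL = "https://popsink.your-company.com/api"  # assumption: your server

def build_control_request(pipeline_id: str, action: str, token: str) -> urllib.request.Request:
    """Build POST /pipelines/{id}/start or /pipelines/{id}/pause (both return 202)."""
    if action not in ("start", "pause"):
        raise ValueError("action must be 'start' or 'pause'")
    return urllib.request.Request(
        f"{BASE_URL}/pipelines/{pipeline_id}/{action}",
        headers={"Authorization": f"Bearer {token}"},
        method="POST",
    )

# To check status afterwards, GET /pipelines/{id} and inspect the returned pipeline.
```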
8.2: Pause Pipeline
8.3: Check Pipeline Status
8.4: Get Pipeline Logs (WebSocket)
Pipeline State Diagram
🛠️ Testing Tools
Option 1: cURL (Command Line)
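A minimal authenticated request looks like this (YOUR_TOKEN comes from the login step):

```
curl -H "Authorization: Bearer YOUR_TOKEN" https://your-server/api/pipelines/
```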
Pros: Available everywhere, scriptable Cons: Verbose, requires manual token management
Option 2: HTTPie (Command Line - User Friendly)
Pros: Simpler syntax, better output Cons: Requires installation
Option 3: Postman (GUI)
Pros: Visual interface, request collections Cons: Requires download
- Download Postman
- Create a new request
- Set method (GET, POST, etc.)
- Enter URL: https://your-server/api/pipelines/
- Add header: Authorization: Bearer YOUR_TOKEN
- Add JSON body for POST/PATCH
- Click “Send”
Option 4: Python Script
Pros: Full programming capabilities Cons: Requires Python knowledge
📚 API Reference
Authentication Endpoints
| Method | Endpoint | Description |
|---|---|---|
POST | /auth/register | Create a new user account |
POST | /auth/jwt/login | Login and get access token |
POST | /auth/jwt/logout | Logout (invalidate token) |
POST | /auth/forgot-password | Request password reset |
POST | /auth/reset-password | Reset password with token |
User Endpoints
| Method | Endpoint | Description |
|---|---|---|
GET | /users | List all users (paginated) |
GET | /users/{id} | Get user details |
PATCH | /users/{id} | Update user |
DELETE | /users/{id} | Delete user |
POST | /users/me/change-password | Change current user’s password |
POST | /users/me/service-account-token | Create service account token |
GET | /users/export-all | Export all data (admin only) |
POST | /users/import-all | Import all data (admin only) |
Environment Endpoints
| Method | Endpoint | Description |
|---|---|---|
GET | /envs/ | List all environments (paginated) |
POST | /envs/ | Create a new environment |
GET | /envs/{id} | Get environment details |
GET | /envs/filter-one?name={name} | Get environment by name |
PATCH | /envs/{id} | Update environment |
DELETE | /envs/{id} | Delete environment |
POST | /envs/check-byok-credentials | Check BYOK credentials |
Team Endpoints
| Method | Endpoint | Description |
|---|---|---|
GET | /teams/ | List all teams (paginated) |
POST | /teams/ | Create a new team |
GET | /teams/{id} | Get team details |
GET | /teams/filter-one?name={name} | Get team by name |
PATCH | /teams/{id} | Update team |
DELETE | /teams/{id} | Delete team |
Team Member Endpoints
| Method | Endpoint | Description |
|---|---|---|
GET | /teams/{id}/members | List team members |
POST | /teams/{id}/members/bulk | Add multiple members |
DELETE | /teams/{id}/members/{user_id} | Remove a member |
Pipeline Endpoints
| Method | Endpoint | Description |
|---|---|---|
GET | /pipelines/ | List all pipelines (paginated) |
GET | /pipelines/count-status | Count pipelines by status |
POST | /pipelines/ | Create a new pipeline |
GET | /pipelines/{id} | Get pipeline details |
GET | /pipelines/filter-one?name={name} | Get pipeline by name |
PATCH | /pipelines/{id} | Update pipeline |
DELETE | /pipelines/{id} | Delete pipeline |
POST | /pipelines/{id}/start | Start pipeline (202) |
POST | /pipelines/{id}/pause | Pause pipeline (202) |
WebSocket | /pipelines/{id}/logs | Stream pipeline logs |
GET | /pipelines/{id}/status | Get pipeline status (deprecated) |
GET | /pipelines/{id}/configuration | Export pipeline config |
POST | /pipelines/{id}/configuration | Import pipeline config |
GET | /pipelines/{id}/worker-config | Get worker config |
GET | /pipelines/status/all-non-draft | List non-draft pipelines with status |
GET | /pipelines/{id}/latency-informations | Get pipeline latency info |
Connector Endpoints
| Method | Endpoint | Description |
|---|---|---|
GET | /connectors/source-config | List source configurations |
GET | /connectors/target-config | List target configurations |
GET | /connectors/filter-one?name={name} | Get connector by name |
GET | /connectors/{id} | Get connector by ID |
GET | /connectors/{id}/source-worker-config | Get source worker config |
GET | /connectors/{id}/target-worker-config | Get target worker config |
POST | /connectors/ | Create connector |
PATCH | /connectors/{id} | Update connector |
DELETE | /connectors/{id} | Delete connector |
POST | /connectors/{id}/start | Start connector worker (202) |
POST | /connectors/{id}/stop | Stop connector worker (202) |
WebSocket | /connectors/{id}/logs | Stream connector logs |
Subscription Endpoints
| Method | Endpoint | Description |
|---|---|---|
GET | /subscriptions/ | List subscriptions (paginated) |
GET | /subscriptions/{id} | Get subscription details |
POST | /subscriptions/ | Create subscription |
POST | /subscriptions/subscribe-all | Bulk create subscriptions |
PATCH | /subscriptions/{id} | Update subscription |
DELETE | /subscriptions/{id} | Delete subscription |
POST | /subscriptions/{id}/start | Start subscription (202) |
POST | /subscriptions/{id}/pause | Pause subscription (202) |
DataModel Endpoints
| Method | Endpoint | Description |
|---|---|---|
GET | /datamodels/ | List datamodels (paginated) |
GET | /datamodels/{id} | Get datamodel by ID |
GET | /datamodels/{id}/monitoring | Get datamodel monitoring |
GET | /datamodels/{id}/schema | Get datamodel schema |
PATCH | /datamodels/{id}/error-table | Update error table config |
DELETE | /datamodels/{id} | Delete datamodel |
SMT/Transformation Endpoints
| Method | Endpoint | Description |
|---|---|---|
POST | /smt/process_mapper | Process mapper transformation |
Connector Type Specific Endpoints
Kafka Source
| Method | Endpoint | Description |
|---|---|---|
POST | /connector-types/kafka-source/check-credentials | Check Kafka source credentials |
POST | /connector-types/kafka-source/fetch-messages | Fetch messages from Kafka |
POST | /connector-types/kafka-source/list-topics | List Kafka topics |
Oracle Target
| Method | Endpoint | Description |
|---|---|---|
POST | /connector-types/oracle-target/check-credentials | Check Oracle target credentials |
🔧 Connector Configurations
KAFKA_SOURCE Configuration
| Field | Type | Required | Description |
|---|---|---|---|
topic | string | ✅ | Kafka topic to consume from |
bootstrap_servers | string | ✅ | Comma-separated list of Kafka brokers |
consumer_group_id | string | ⬜ | Consumer group ID (auto-generated if not provided) |
security_protocol | string | ⬜ | PLAINTEXT, SASL_PLAINTEXT, SASL_SSL, SSL |
sasl_mechanism | string | ⬜ | PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, GSSAPI, OAUTHBEARER |
sasl_username | string | ⬜ | SASL username (required if using SASL) |
sasl_password | string | ⬜ | SASL password (required if using SASL) |
include_metadata | boolean | ⬜ | Include Kafka metadata in messages (default: false) |
KAFKA_TARGET Configuration
| Field | Type | Required | Description |
|---|---|---|---|
bootstrap_server | string | ✅ | Kafka broker address |
security_protocol | string | ⬜ | PLAINTEXT, SASL_PLAINTEXT, SASL_SSL, SSL |
sasl_mechanism | string | ⬜ | PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, GSSAPI, OAUTHBEARER |
sasl_username | string | ⬜ | SASL username |
sasl_password | string | ⬜ | SASL password |
ca_cert | string | ⬜ | CA certificate for SSL |
group_id | string | ⬜ | Consumer group ID |
ORACLE_TARGET Configuration
| Field | Type | Required | Description |
|---|---|---|---|
database | string | ✅ | Oracle database name |
user | string | ✅ | Username for Oracle |
password | string | ✅ | Password for Oracle |
host | string | ✅ | Hostname or IP address |
port | string | ✅ | Port number (typically 1521) |
server_name | string | ✅ | Service name (e.g., XE) |
server_id | string | ✅ | Server identifier/SID |
Mapper Column Config:
| Field | Type | Description |
|---|---|---|
key | string | Column name in target |
path | string | JSON path to extract value (e.g., $.field.nested) |
static | string | Static value (alternative to path) |
cast | string | Cast to: string, int, float, bool, date, datetime, time |
cast_format | string | Format for casting (e.g., date format) |
primary_key | boolean | Is this a primary key column |
nullable | boolean | Can this column be null |
🎯 Common Patterns
Pattern 1: List Resources with Filters
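For example, fetch a single resource by name with the filter-one endpoints; the team_id query filter on list endpoints is an assumption, so check /api/docs:

```
GET /pipelines/filter-one?name=kafka-to-oracle-pipeline
GET /pipelines/?team_id=YOUR_TEAM_ID
```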
Pattern 2: Pagination
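One way to sketch page URLs; the skip/limit parameter names are an assumption, so confirm them in your server’s /api/docs:

```python
import urllib.parse

def page_url(base_url: str, resource: str, page: int, page_size: int = 50) -> str:
    # Assumption: the API paginates with skip/limit query parameters.
    query = urllib.parse.urlencode({"skip": page * page_size, "limit": page_size})
    return f"{base_url}/{resource}/?{query}"

# Walk pages until a request returns fewer than page_size items.
```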
Pattern 3: Error Handling
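A sketch of a send helper that maps the status codes from the table in the API overview to readable errors:

```python
import json
import urllib.error
import urllib.request

# Meanings taken from the HTTP status code table in the API overview.
STATUS_MEANINGS = {
    400: "Bad Request: invalid input data",
    401: "Unauthorized: missing or invalid token",
    403: "Forbidden: insufficient permissions",
    404: "Not Found: resource doesn't exist",
    422: "Validation Error: data doesn't meet requirements",
}

def send(req: urllib.request.Request) -> dict:
    """Send a request, returning parsed JSON or raising a readable error."""
    try:
        with urllib.request.urlopen(req) as resp:
            return json.loads(resp.read() or b"{}")
    except urllib.error.HTTPError as exc:
        detail = exc.read().decode(errors="replace")
        meaning = STATUS_MEANINGS.get(exc.code, "Unexpected error")
        raise RuntimeError(f"{exc.code} {meaning} - {detail}") from exc
```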
Pattern 4: Bulk Operations
Pattern 5: Test Connector Credentials Before Creating
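For a Kafka source, POST the same config you would give the connector to /connector-types/kafka-source/check-credentials; the exact body shape is an assumption, so verify it in /api/docs:

```json
{
  "topic": "orders",
  "bootstrap_servers": "kafka-1:9092,kafka-2:9092"
}
```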
🐛 Troubleshooting
Issue: “401 Unauthorized”
Cause: Token is missing, expired, or invalid
Solution:
- Check that you included the Authorization header
- Verify the token format: Bearer YOUR_TOKEN
- Log in again to get a fresh token
Issue: “403 Forbidden”
Cause: You don’t have permission for this operation
Solution:
- Verify you’re a member of the team
- Check if you have the required role (owner vs member)
- Contact the team owner to grant permissions
Issue: “422 Validation Error”
Cause: Request data doesn’t meet validation requirements
Solution:
- Check the error response for specific field errors
- Verify all required fields are provided
- Ensure data types match (UUID, string, boolean, etc.)
Issue: Invalid pipeline name
Cause: Pipeline name contains invalid characters
Solution: Pipeline names must:
- Contain only alphanumeric characters, hyphens (-), and underscores (_)
- Be at most 255 characters long
- Not be empty
Valid examples:
- kafka-to-oracle-pipeline
- user_events_pipeline_v2
- Pipeline123
Invalid examples:
- pipeline name (contains a space)
- pipeline.name (contains a dot)
- pipeline@name (contains a special character)
Issue: Cannot specify both existing and new connector
Cause: You provided both existing_source_id and source_name/source_type/source_config
Solution: Choose ONE approach:
Option A (Existing):
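Illustrative request fragments (IDs and names are placeholders):

```json
{ "existing_source_id": "SOURCE_CONNECTOR_UUID" }
```

Option B (New):

```json
{
  "source_name": "orders-kafka",
  "source_type": "KAFKA_SOURCE",
  "source_config": { "topic": "orders", "bootstrap_servers": "kafka-1:9092" }
}
```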
Issue: Pipeline stuck in “BUILDING” state
Cause: Deployment failed or is taking longer than expected
Solution:
- Check pipeline logs via WebSocket
- Verify connector configurations are correct
- Check infrastructure resources (CPU, memory)
Issue: SSL Certificate Error
Cause: Self-signed certificate or untrusted CA
Solution for curl: pass --cacert with your CA bundle, or use -k/--insecure for local testing only.
📝 Complete Example Script
Here’s a complete Python script that performs all steps:
🎓 Next Steps
Now that you understand the basics:
- Explore the API: Try listing resources, filtering, pagination
- Monitor Pipelines: Use logs and status endpoints
- Handle Errors: Implement proper error handling
- Test Credentials: Use credential check endpoints before creating connectors
- Optimize: Reuse connectors, batch operations
- Automate: Create scripts or CI/CD pipelines
📞 Support
- Documentation: This guide
- API Schema: https://your-server/api/docs (Swagger UI)
- OpenAPI Spec: https://your-server/api/openapi.json
Happy Data Processing! 🚀