Kafka Direct is for pulling data into Streamkap from an external Kafka cluster. If you need to push data to Streamkap’s internal Kafka, see Kafka (Writing). If you need to replicate topics from Streamkap to another Kafka cluster, see Kafka (Push).
Prerequisites
- An external Kafka cluster accessible from Streamkap
- Kafka bootstrap server URLs (external listener addresses, not internal), e.g. `broker-1.mycompany.com:9094,broker-2.mycompany.com:9094`
- The names of the Kafka topics you want to consume
- Configure one of the Connection Options to ensure Streamkap can reach your Kafka cluster
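Before creating the source, it can help to confirm that each bootstrap server is reachable over TCP from a network position comparable to Streamkap's (for example, inside your VPC if you use a VPN or PrivateLink). A minimal sketch using only the Python standard library; the broker hostnames are placeholders:

```python
# Quick TCP reachability check for external Kafka listeners.
# The hostnames below are placeholders -- substitute your own brokers.
import socket

bootstrap_servers = "broker-1.mycompany.com:9094,broker-2.mycompany.com:9094"

def check_reachable(servers: str, timeout: float = 5.0) -> dict:
    """Return a {host:port -> bool} map of TCP connectivity."""
    results = {}
    for entry in servers.split(","):
        entry = entry.strip()
        host, _, port = entry.rpartition(":")
        try:
            with socket.create_connection((host, int(port)), timeout=timeout):
                results[entry] = True
        except OSError:
            results[entry] = False
    return results

for server, ok in check_reachable(bootstrap_servers).items():
    print(f"{server}: {'reachable' if ok else 'UNREACHABLE'}")
```

A successful TCP connection does not prove authentication will succeed, but an unreachable broker rules out everything downstream.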
Supported Platforms
Kafka Direct works with any Kafka-compatible cluster, including:

| Platform | Notes |
|---|---|
| Apache Kafka | Open-source Apache Kafka clusters (self-hosted or managed) |
| Confluent Cloud / Confluent Platform | Fully compatible; use SASL_SSL authentication with API key and secret |
| Amazon MSK | Amazon Managed Streaming for Apache Kafka; supports IAM and SASL_SSL authentication |
| Azure Event Hubs (Kafka protocol) | Azure Event Hubs exposes a Kafka-compatible endpoint; use the Event Hubs connection string as the SASL password |
| Redpanda | Kafka API-compatible streaming platform; works with standard Kafka authentication methods |
Authentication
Kafka Direct supports several authentication mechanisms to connect securely to your external Kafka cluster. Configure authentication in the connector settings when creating or editing your Kafka Direct source.
SASL_PLAIN
Username and password authentication transmitted in plaintext. Suitable when the connection is already secured by a private network or VPN.
- Security protocol: `SASL_PLAINTEXT`
- SASL mechanism: `PLAIN`
- Provide the username and password in the connector settings
SASL_SSL
Username and password authentication over a TLS-encrypted connection. This is the recommended authentication method for most managed Kafka services (Confluent Cloud, Amazon MSK, etc.).
- Security protocol: `SASL_SSL`
- SASL mechanism: `PLAIN`, `SCRAM-SHA-256`, or `SCRAM-SHA-512`, depending on your cluster configuration
- Provide the username and password in the connector settings
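For reference, these settings correspond to the following standard Kafka client properties (a sketch with placeholder credentials, shown here with the `PLAIN` mechanism as used by Confluent Cloud API keys; Streamkap collects the same values through its connector form, so you do not supply a properties file directly):

```properties
# Standard Kafka client equivalent of the SASL_SSL settings above.
# username/password values are placeholders.
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
  username="<api-key>" \
  password="<api-secret>";
```

For `SCRAM-SHA-256` or `SCRAM-SHA-512`, the equivalent login module is `org.apache.kafka.common.security.scram.ScramLoginModule`.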
SSL / mTLS (Mutual TLS)
Client certificate authentication where both the client and broker verify each other’s identity using TLS certificates. Use this when your Kafka cluster requires certificate-based authentication.
- Security protocol: `SSL`
- Provide the client certificate, client key, and CA certificate in the connector settings
For detailed information on network connectivity options (SSH tunnels, VPN, AWS PrivateLink, IP allowlisting), see Connection Options.
Key Differences from CDC Sources
Kafka Direct behaves differently from CDC-based sources (such as PostgreSQL, MySQL, or MongoDB):
- No transaction log — Kafka Direct reads messages directly from Kafka topics rather than tailing a database transaction log.
- No snapshots — There is no initial snapshot or backfill mechanism. Consumption starts based on the configured offset policy.
- No heartbeats — Heartbeat monitoring is not applicable to Kafka Direct sources.
- Topic naming — Kafka Direct uses the user-defined `topic.prefix` value (not the `source_{id}` prefix used by CDC sources).
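As a sketch of the naming difference, assuming the common Kafka Connect convention of joining prefix and topic with a dot (the separator is an assumption here; verify against the topic names in your own pipeline):

```python
# Illustrative only: a user-defined topic.prefix identifies Kafka Direct
# topics, whereas CDC sources use a generated source_{id} prefix.
# The "." separator is an assumption, not confirmed Streamkap behaviour.
def streamkap_topic(prefix: str, external_topic: str) -> str:
    return f"{prefix}.{external_topic}"

print(streamkap_topic("ext_kafka", "orders"))  # ext_kafka.orders
```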
Streamkap Setup
Follow these steps to configure your new connector:
1. Create the Source
- Navigate to Add Connectors.
- Choose Kafka Direct.
2. Connection Settings
- Name: Enter a unique and memorable name for your connector.
- Kafka Bootstrap Servers: A comma-separated list of `host:port` pairs for the external Kafka brokers, such as `broker1.kafka.company.com:9092,broker2.kafka.company.com:9092`.
- Topic Prefix: A prefix applied to topic names inside Streamkap. This helps organise and identify topics originating from this source within your Streamkap pipelines.
3. Topic Selection
- Topics to include: A comma-separated list of the external Kafka topic names you want to consume, such as `orders,events,user-activity`.
4. Format Settings
- Format: The message format of the external Kafka topics.
- `string` (default) — Messages are treated as plain strings.
- `json` — Messages are parsed as JSON.
- Schemas Enable (only when Format is set to json): Toggle this to enable schema support. When enabled, Streamkap uses its internal Schema Registry to validate and enforce message structure.
Enabling schema support is recommended when your JSON messages have a consistent structure. Streamkap’s internal Schema Registry handles schema management automatically — you do not need to provide an external Schema Registry URL.
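Before enabling schema support, you can sanity-check locally that a sample of messages shares one consistent top-level JSON structure. A minimal standard-library sketch (not a Streamkap tool; sample the messages however you prefer, e.g. with a console consumer):

```python
# Check whether sampled raw messages all parse as JSON objects with the
# same set of top-level keys -- a reasonable proxy for "consistent
# structure" before enabling schema support.
import json

def consistent_structure(raw_messages: list[str]) -> bool:
    """True if every message is a JSON object with identical top-level keys."""
    key_sets = set()
    for raw in raw_messages:
        try:
            parsed = json.loads(raw)
        except json.JSONDecodeError:
            return False          # not valid JSON at all
        if not isinstance(parsed, dict):
            return False          # arrays/scalars have no key structure
        key_sets.add(frozenset(parsed))
    return len(key_sets) <= 1

sample = ['{"id": 1, "event": "click"}', '{"id": 2, "event": "view"}']
print(consistent_structure(sample))  # True
```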
Consumer Group Configuration
Streamkap manages consumer groups automatically for Kafka Direct sources. Each Kafka Direct connector uses a dedicated consumer group to track its position (offset) in each topic partition.
The consumer group ID is managed by Streamkap and is tied to the connector instance. If you delete and recreate a Kafka Direct source, a new consumer group is created and consumption restarts according to the configured offset policy. If you need to reset offsets for an existing connector, contact Streamkap support.
- Offset policy: When the connector starts for the first time (or when the consumer group has no committed offsets), it uses the configured offset policy to determine where to begin reading. Typical options are `earliest` (read from the beginning of the topic) and `latest` (read only new messages).
- Parallel consumption: Streamkap assigns consumer group members to topic partitions automatically. The level of parallelism is determined by the number of partitions in the source topic.
- Rebalancing: If partitions are added to a source topic, the consumer group rebalances automatically to include the new partitions.
Adding Topics Programmatically
You can add topics to an existing Kafka Direct source via the API or Terraform using the `topic.include.list.user.defined` parameter (Terraform: `topic_include_list_user_defined`).
This lets you append topics to the source without modifying the original topic list configured through the UI.
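For illustration, a Terraform fragment might look like the following. The resource type and the `name` attribute are assumptions made for this sketch (check the Streamkap Terraform provider for the real names); only `topic_include_list_user_defined` comes from the documentation above:

```hcl
# Sketch only: resource type and "name" attribute are assumed, not
# taken from the provider schema. topic_include_list_user_defined is
# the documented parameter for appending topics programmatically.
resource "streamkap_source_kafka_direct" "example" {
  name                            = "external-kafka"
  topic_include_list_user_defined = "refunds,shipments"
}
```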
Troubleshooting
Connection refused or timeout
Symptoms: The connector fails to start or reports connection errors referencing broker addresses.
Possible Causes:
- Firewall rules are blocking traffic from Streamkap to the Kafka brokers
- The broker address or port is incorrect (e.g., using an internal DNS name instead of an external listener)
- The Kafka broker port is not open or the broker is not running
Resolution:
- Verify that the bootstrap server addresses use externally reachable hostnames and the correct listener port
- Ensure your firewall or security group allows inbound connections from Streamkap’s IP addresses (see Connection Options)
- Test connectivity to the broker address and port from a machine on the same network as Streamkap
Authentication failure
Symptoms: The connector fails with SASL authentication errors or SSL handshake failures.
Possible Causes:
- Incorrect username or password
- Wrong SASL mechanism selected (e.g., using `PLAIN` when the broker expects `SCRAM-SHA-256`)
- Expired or invalid client certificates (for mTLS)
- The security protocol does not match the broker’s listener configuration
Resolution:
- Verify that the username, password, and SASL mechanism match the broker configuration
- For mTLS, ensure the client certificate and key are valid and not expired
- Confirm the security protocol (`SASL_PLAINTEXT`, `SASL_SSL`, or `SSL`) matches the Kafka listener you are connecting to
No messages received
Symptoms: The connector is running but no data appears in the pipeline or destination.
Possible Causes:
- The topic name is incorrect or misspelled
- The topic has no new messages and the offset policy is set to `latest`
- The consumer group has already consumed all available messages
- The topic has no partitions with data
Resolution:
- Verify the topic names in the connector configuration match the exact topic names on the Kafka cluster
- Check the offset policy — if set to `latest`, only new messages produced after the connector started will be consumed
- Confirm that the source Kafka cluster is actively producing messages to the configured topics
- Inspect the topic partitions on the source cluster to verify they contain data
Schema or format errors
Symptoms: Messages are routed to the DLQ or the connector reports deserialization errors.
Possible Causes:
- The selected format (string or JSON) does not match the actual message format on the topic
- Schema support is enabled but messages do not conform to a consistent JSON structure
- Messages contain unsupported or malformed data
Resolution:
- Verify that the Format setting matches the actual format of messages on the source topic
- If using JSON with schema support enabled, ensure all messages on the topic share a consistent structure
- Inspect a sample of messages on the source topic to confirm the format before configuring the connector
- Check the DLQ for detailed error information on failed messages
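When inspecting sampled messages, a quick local check can tell you whether the `json` format setting would deserialize all of them or whether `string` is the safer choice. A standard-library sketch (not a Streamkap tool):

```python
# Given raw messages sampled from the source topic, suggest a Format
# setting: "json" only if every sampled message parses as JSON.
import json

def suggest_format(raw_messages: list[str]) -> str:
    for raw in raw_messages:
        try:
            json.loads(raw)
        except json.JSONDecodeError:
            return "string"   # at least one message is not valid JSON
    return "json"

print(suggest_format(['{"ok": true}', '{"n": 2}']))        # json
print(suggest_format(['{"ok": true}', 'plain text line']))  # string
```

A sample only proves what it contains, so treat a `json` suggestion as evidence, not a guarantee that every future message will parse.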