Prerequisites
A Snowflake account granted the ACCOUNTADMIN system-defined role, or a custom role with privileges to:
- CREATE WAREHOUSE, DATABASE, SCHEMA
- CREATE ROLE, USER
- CREATE NETWORK POLICY
Snowflake Setup
It’s recommended to create a separate user and role for Streamkap to access your Snowflake database. Below is an example script that does that.

We do not use CREATE OR REPLACE in our scripts. This is to avoid destroying something by mistake that already exists in your Snowflake account.

Key Pair Authentication
The connector relies on an RSA key pair for authentication, which you can generate using OpenSSL. Below are example scripts that do that. You can modify them to suit your security policies, but please ensure the key pair meets these minimum requirements:
- RSA 2048-bit
- PKCS#8 key format
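For example, a minimal sketch using OpenSSL. The file names and the passphrase are placeholders; adjust them to your security policies (in particular, avoid passing real passphrases on the command line if your policies forbid it).

```shell
# Generate a 2048-bit RSA private key and convert it to encrypted
# PKCS#8 format (Snowflake's minimum requirements).
openssl genrsa 2048 | openssl pkcs8 -topk8 -v2 aes-256-cbc \
  -inform PEM -out rsa_key.p8 -passout pass:MyPassphrase

# Derive the matching public key from the private key.
openssl rsa -in rsa_key.p8 -passin pass:MyPassphrase -pubout -out rsa_key.pub
```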
SSH key generation on Windows

Snowflake does not support keys generated by PuTTY Key Generator.

One of the easiest and quickest ways to generate a valid OpenSSL key is via Git Bash, which is installed by default with Git for Windows. After installation, you can open a Git Bash prompt by Left Shift + Right-clicking on your Desktop, choosing “Open Git Bash here”, and then executing the OpenSSL commands below. If you have any issues following these instructions or are unable to install Git for Windows, please contact us.
The commands generate two files: one private (usually with the extension .p8) and the other public (usually with the extension .pub). Store both files in a secure place.
Once generated, the public key needs to be assigned to the Snowflake database user created for Streamkap earlier.
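As an illustration, the dedicated role and user recommended in Snowflake Setup, with the public key assigned, might look like the following sketch. All object names, warehouse sizing, and the key value are placeholders; adapt them to your environment.

```sql
-- Placeholders throughout: substitute your own object names and key.
USE ROLE ACCOUNTADMIN;

CREATE WAREHOUSE IF NOT EXISTS STREAMKAP_WH
  WAREHOUSE_SIZE = 'XSMALL' AUTO_SUSPEND = 60 AUTO_RESUME = TRUE;
CREATE DATABASE IF NOT EXISTS STREAMKAP_DB;
CREATE SCHEMA IF NOT EXISTS STREAMKAP_DB.STREAMKAP_SCHEMA;

CREATE ROLE IF NOT EXISTS STREAMKAP_ROLE;
GRANT USAGE ON WAREHOUSE STREAMKAP_WH TO ROLE STREAMKAP_ROLE;
GRANT USAGE ON DATABASE STREAMKAP_DB TO ROLE STREAMKAP_ROLE;
GRANT ALL ON SCHEMA STREAMKAP_DB.STREAMKAP_SCHEMA TO ROLE STREAMKAP_ROLE;

CREATE USER IF NOT EXISTS STREAMKAP_USER
  DEFAULT_ROLE = STREAMKAP_ROLE
  DEFAULT_WAREHOUSE = STREAMKAP_WH
  RSA_PUBLIC_KEY = 'MIIBIjANBgkq...';  -- public key body, without PEM delimiters
GRANT ROLE STREAMKAP_ROLE TO USER STREAMKAP_USER;
```

Note the use of IF NOT EXISTS rather than CREATE OR REPLACE, in line with the caution above about not destroying existing objects.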
The command below copies the public key you generated to your clipboard.
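One possible version of that step: the snippet below strips the PEM delimiters and line breaks so the key body can be pasted into ALTER USER ... SET RSA_PUBLIC_KEY, then piped to your platform's clipboard tool. The key generation lines are included only to keep the example self-contained; skip them if you already have rsa_key.pub.

```shell
# Generate a key pair (only needed if you don't already have rsa_key.pub).
openssl genrsa 2048 | openssl pkcs8 -topk8 -nocrypt -out rsa_key.p8
openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub

# Strip the PEM header/footer and line breaks; the result is the key body
# Snowflake expects. Pipe it to your clipboard tool:
#   Windows (Git Bash): ... | clip
#   macOS:              ... | pbcopy
#   Linux:              ... | xclip -selection clipboard
grep -v "PUBLIC KEY" rsa_key.pub | tr -d '\n' > rsa_key_oneline.txt
```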
Streamkap Setup
Follow these steps to configure your new connector:

1. Create the Destination
- Navigate to Add Connectors.
- Choose Snowflake.
2. Connection Settings
- Name: Enter a name for your connector.
- Snowflake URL: The URL for accessing your Snowflake account. This URL must include your account identifier. Note that the protocol (https://) and port number are optional.
- Username: User login name for the Snowflake account (case sensitive).
- Private Key: Provide the private key you generated in the Key Pair Authentication step.
- Key secured with passphrase?: If checked (default), provide your SSH key’s passphrase; otherwise, uncheck for SSH keys without a passphrase.
- Private Key Passphrase: The passphrase used to decrypt the private key.
- Database Name: The name of the database to use (case sensitive).
- Schema Name: The name of the schema where tables will be created (case sensitive).
- Snowflake Role: The name of an existing role with the necessary privileges (for Streamkap) assigned to the user specified by Username (case sensitive).
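If you saved your key as rsa_key.p8 during Key Pair Authentication, you can print it and paste its contents into the Private Key field. The generation line below is included only so the example is self-contained; skip it if you already have a key.

```shell
# Generate a key only if one doesn't exist yet (unencrypted for brevity).
openssl genrsa 2048 | openssl pkcs8 -topk8 -nocrypt -out rsa_key.p8

# Print the PKCS#8 private key so it can be copied into the Private Key field.
cat rsa_key.p8
```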
3. Ingestion Settings
- Ingestion Mode: How the connector loads data into the Snowflake tables. See Upsert mode for further details.
Changing ingestion mode

append and upsert modes use different, incompatible methods for loading data into the Snowflake tables. If, for whatever reason, you want to change modes for an existing Snowflake Connector, please create a new Snowflake Destination instead, i.e. a separate destination for append and another for upsert.

- append mode:
  - Use Dynamic Tables: Specifies whether the connector should create Dynamic Tables & Cleanup Tasks. See Dynamic Tables.
  - Custom SQL Template - Dynamic Table Creation: These template queries run for each table the first time a record is streamed for them.
  - Custom SQL Template - Dynamic Table Name: Can be used as {{dynamicTableName}} in dynamic table creation SQL. It can use input JSON data for more complex mappings and logic.
  - Custom SQL Template - Input JSON data: Use {"TABLE_DATA": {"{table_name}": {"{key}": "{value}"}, ...}, ...} to set table-specific data. This data will be available in the custom SQL templates, e.g. SELECT {{key}}.
  - Auto QA Deduplication Table Mapping: Mapping between the tables that store append-only data and the deduplicated tables. The dedupeTable in the mapping will be used for QA scripts. If dedupeSchema is not specified, the deduplicated table will be created in the same schema as the raw table.
- upsert mode:
  - Delete Mode: Specifies whether the connector processes deletions (or tombstone events) and removes the corresponding row from the database.
  - Use Hybrid Tables: Specifies whether the connector should create Hybrid Tables.
Troubleshooting
Dynamic Tables
Snowflake Dynamic Tables are materialized views which consist of the latest records inserted into Snowflake. Streamkap’s Snowflake Connector creates them, if enabled, for each table the first time a record is streamed for it. A Snowflake Task is also created for each dynamic table to clean up older entries periodically. The default template is shown in the Streamkap UI; you can modify it there to suit your requirements.

Offset Management (Append Mode)
Streamkap retains topic data based on your service’s retention policy (typically 7 days by default). You can only replay messages that are still within the retention window.
Understanding Dual Offset Tracking
When using append mode with Snowflake destinations, there are two separate offset systems to manage:

- Consumer Group Offsets (connect-{connector-id})
  - Tracks which messages the connector has consumed from the source topic
  - Visible in Streamkap UI under Consumer Groups
- Snowflake Channel Offsets
  - Tracks which messages have been successfully ingested into Snowflake via Snowpipe Streaming
  - Each topic partition creates a Snowflake channel (e.g., TOPIC_0 for partition 0)
  - Managed within Snowflake using SYSTEM$SNOWPIPE_STREAMING_UPDATE_CHANNEL_OFFSET_TOKEN
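To inspect channel state on the Snowflake side, a query along these lines may help. The table name is a placeholder, and SHOW CHANNELS availability depends on your Snowflake release; check the Snowflake documentation for your account.

```sql
-- List Snowpipe Streaming channels for a table, including their
-- latest committed offset tokens (table name is a placeholder).
SHOW CHANNELS IN TABLE STREAMKAP_DB.STREAMKAP_SCHEMA.MY_TABLE;
```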
Replaying Messages to Snowflake
To replay messages (resend data to Snowflake), you must coordinate resets across both offset systems.

Before you start:
- Ensure you have the required Snowflake privileges (e.g., OWNER on the table)
- Note the offset position you want to replay to (earliest, specific offset, or timestamp)
- Plan for the replay window within your topic retention period

Steps:
1. Go to Consumer Groups
2. Find the consumer group for the destination connector (e.g. connect-{connector-id})
3. Follow the Consumer Groups Reset Procedure to reset to your desired position
4. Set the Snowflake channel offset token to -1. This tells Snowflake to defer to the Consumer Group offset position.

To verify the replay succeeded, check that:
- Connector status shows as active/running
- Consumer lag decreases as messages are re-ingested
- Data appears in the destination table
Offset Reset Strategies
The offset position is controlled by the Consumer Group. When resetting offsets, choose one of the following strategies in the Streamkap UI (see Consumer Groups Reset Procedure):

| Strategy | Description | Use Case |
|---|---|---|
| Earliest | Reset to the beginning of the partition | Replay all available messages within retention window |
| Latest | Reset to the end of the partition | Skip all existing messages and start fresh |
| Specific Timestamp | Reset to the first offset after a given timestamp | Replay messages from a specific point in time |
| Specific Offset | Set a custom offset position | Precise control over where to resume |
After resetting the Consumer Group, set the Snowflake channel offset token to -1 so it defers to the Consumer Group position:
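A sketch of that call, with placeholder table and channel names; verify the function's exact signature against the Snowflake documentation for your release.

```sql
-- '-1' tells Snowflake to defer to the Consumer Group offset position.
-- Use your fully qualified table name and the channel for each partition.
SELECT SYSTEM$SNOWPIPE_STREAMING_UPDATE_CHANNEL_OFFSET_TOKEN(
  'STREAMKAP_DB.STREAMKAP_SCHEMA.MY_TABLE',  -- fully qualified table
  'TOPIC_0',                                 -- channel (topic_partition)
  '-1'                                       -- offset token
);
```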
Upsert mode
The Snowflake destination connector can run in upsert mode. This mode switches off Snowpipe Streaming; instead, the connector uses periodic MERGE INTO statements to upsert data into the target Snowflake tables. Dynamic tables or other de-duplication mechanisms are not necessary when using upsert mode.
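For illustration only, the kind of statement involved might look like the sketch below, with hypothetical table and column names; the connector's actual generated SQL will differ.

```sql
-- Simplified illustration: merge staged change rows into the target
-- table by primary key, honoring delete events.
MERGE INTO STREAMKAP_SCHEMA.CUSTOMERS AS t
USING STREAMKAP_SCHEMA.CUSTOMERS_STAGING AS s
  ON t.ID = s.ID
WHEN MATCHED AND s.__DELETED = TRUE THEN DELETE
WHEN MATCHED THEN UPDATE SET t.NAME = s.NAME, t.EMAIL = s.EMAIL
WHEN NOT MATCHED THEN INSERT (ID, NAME, EMAIL) VALUES (s.ID, s.NAME, s.EMAIL);
```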
Snowflake costs

Currently, upsert mode requires a warehouse to be running, so overall costs will be higher compared to append mode, which uses Snowpipe Streaming.
Getting the Snowflake URL
You can also run the script below in a Snowflake worksheet to return the Snowflake URL. You need to be logged into Snowflake with an account granted the ORGADMIN system-defined role to run this script.
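One way to retrieve it, assuming the ORGADMIN role, is the command below; its output includes an account_url column containing the Snowflake URL.

```sql
-- Run with the ORGADMIN role; the account_url column holds the URL.
SHOW ORGANIZATION ACCOUNTS;
```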
Snowflake Setup scripts failing
There can be many reasons for them to fail, but the scripts below can help you diagnose the issues. You need to be logged into Snowflake with an account granted the ACCOUNTADMIN system-defined role, or a custom role with equivalent privileges, to run these scripts.

Copy and paste the scripts below into Snowflake worksheets. Change the object names at the top as required and run all queries.
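For example, a minimal diagnostic sheet might look like the following. The object names are placeholders; change them to the ones used in your setup.

```sql
-- Placeholders: change the object names to match your setup.
DESC WAREHOUSE STREAMKAP_WH;
DESC DATABASE STREAMKAP_DB;
DESC SCHEMA STREAMKAP_DB.STREAMKAP_SCHEMA;
DESC USER STREAMKAP_USER;
SHOW GRANTS TO ROLE STREAMKAP_ROLE;
SHOW GRANTS TO USER STREAMKAP_USER;
```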
- Check in the top right corner of the Snowflake Worksheet (next to the Share and Run buttons) that the role is set to ACCOUNTADMIN, or a custom role with equivalent privileges
- Depending on which query failed or returned no results, check that the object names at the top of the script are correct
- If a query returns an "Object does not exist or is not authorized" error, go to the Snowsight UI Admin page and see if the object is showing there. For example, if DESC WAREHOUSE ... failed, go to Admin -> Warehouses and check whether the warehouse is shown on that page
Troubleshooting Common Issues
503 / NullPointerException errors
Transient Snowflake Streaming API overload. Do NOT restart the connector; the built-in retry mechanism with exponential backoff handles recovery automatically, typically within 5-30 minutes. If the error persists beyond 30 minutes, contact Streamkap support.
Records going to DLQ (16 MB limit)
Snowflake has a maximum record size of 16 MB. Records exceeding this limit are routed to the dead letter queue (DLQ).

Resolution:
- Identify the oversized columns in the DLQ message payload
- Exclude large columns from replication if they are not needed at the destination
- Add a transform to truncate large fields before they reach the destination
VARCHAR overflow — column value exceeds size
When a source column value exceeds the VARCHAR size defined at the destination, the record is routed to the DLQ.

Resolution:
- Increase the column size at the destination: ALTER TABLE ... ALTER COLUMN ... TYPE VARCHAR(n)
- Configure a transform (SMT) to truncate values before delivery if increasing the column size is not feasible
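A sketch of the first option with hypothetical table and column names; note that Snowflake permits increasing, but not decreasing, a VARCHAR column's length in place.

```sql
-- Hypothetical names; 16777216 is Snowflake's maximum VARCHAR length.
ALTER TABLE STREAMKAP_DB.STREAMKAP_SCHEMA.CUSTOMERS
  ALTER COLUMN NOTES SET DATA TYPE VARCHAR(16777216);
```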
Schema Evolution Permissions
Streamkap’s Snowflake connector supports schema evolution: automatically adding new columns to destination tables when the source schema changes. For this to work, the Snowflake role used by the connector must have the correct privileges on the target tables.

Related Documentation
- Ingestion Modes - Understand insert vs. upsert ingestion modes
- Delete Handling - Configure how delete events are processed at the destination