How to stream Kafka topics to ClickHouse® in real-time
Apache Kafka excels at moving high-volume event streams between systems, while ClickHouse excels at analyzing those events with sub-second query latency. Connecting the two creates a real-time analytics pipeline that can ingest millions of events per second and make that data immediately queryable.
That said, streaming ingestion into ClickHouse requires some expertise: you have to balance ingestion throughput against ClickHouse's background merge process and parts management, and understand how writes affect query performance.
This guide covers three integration patterns for streaming Kafka topics into ClickHouse, from setup and configuration through production optimization and security. You'll learn when to use the native Kafka table engine versus Kafka Connect versus managed services, how to handle different message formats, and how to troubleshoot common issues that affect throughput and latency.
Choosing the right Kafka to ClickHouse integration pattern
Three main approaches exist for streaming Kafka topics into ClickHouse: the Kafka table engine, the official Kafka Connect sink connector, and managed services like Tinybird. Each pattern offers different tradeoffs in setup complexity, operational overhead, and developer experience.
The native Kafka table engine runs directly inside ClickHouse and automatically consumes messages from specified topics. This approach requires no external components but means your ClickHouse cluster handles both ingestion and query workloads. The Kafka Connect sink connector runs as a separate service and writes data into ClickHouse tables, separating ingestion from query processing but adding another component to manage. Managed services like Tinybird abstract away infrastructure entirely, handling cluster management, ingestion load balancing and back pressure, compute scaling, and monitoring while providing features like API generation and CI/CD integration.
| Integration Pattern | Setup Complexity | Operational Overhead | Best For |
|---|---|---|---|
| Native Kafka Engine | Medium (SQL configuration) | High (cluster tuning, monitoring) | Self-hosted deployments with existing ClickHouse expertise |
| Kafka Connect Sink | High (connector deployment, configuration) | High (separate service management) | Enterprise data pipelines with existing Kafka Connect infrastructure |
| Managed Service (Tinybird) | Low (CLI commands, declarative config) | Low (fully managed) | Application developers who want to ship features quickly |
Prerequisites for a successful Kafka to ClickHouse integration
Before connecting Kafka to ClickHouse, you'll need access to a running Kafka cluster and either a ClickHouse server or a managed ClickHouse service. A few key Kafka concepts help with configuration and troubleshooting.
Kafka cluster requirements
Your Kafka cluster should run version 0.10 or later for compatibility with ClickHouse integrations. You'll need the broker addresses (host and port), the topic names you want to consume, and authentication details if your cluster requires SASL or SSL.
ClickHouse server setup
ClickHouse version 20.3 or later supports the Kafka table engine and can parse common message formats like JSON, Avro, CSV, and Protobuf. For Kafka Connect, any recent ClickHouse version works since the connector writes data using standard SQL inserts.
Network and security considerations
The ClickHouse server or Kafka Connect workers need network access to your Kafka brokers, typically on port 9092 for plaintext or 9093 for SSL. If your Kafka cluster uses authentication, you'll need credentials ready before starting configuration.
Step-by-step streaming with the native Kafka table engine
The native Kafka table engine in ClickHouse creates a direct connection between a Kafka topic and a ClickHouse table. Messages consumed from Kafka flow through this engine table into a materialized view, which transforms and stores the data permanently.
1. Create the Kafka engine table
CREATE TABLE kafka_queue (
user_id String,
event_type String,
timestamp DateTime,
properties String
)
ENGINE = Kafka('localhost:9092', 'events', 'clickhouse_group', 'JSONEachRow');
The JSONEachRow format expects one JSON object per line. Other formats like Avro or Protobuf work if you specify the appropriate format name and schema; well-tuned production deployments can sustain throughput on the order of 100,000 rows per second.
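The same table can also be declared with named settings instead of positional engine arguments, which is easier to extend when you add format- or consumer-related options later:
CREATE TABLE kafka_queue (
    user_id String,
    event_type String,
    timestamp DateTime,
    properties String
)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'localhost:9092',
         kafka_topic_list = 'events',
         kafka_group_name = 'clickhouse_group',
         kafka_format = 'JSONEachRow';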
2. Build a materialized view to a MergeTree table
CREATE TABLE events_storage (
user_id String,
event_type String,
timestamp DateTime,
properties String
)
ENGINE = MergeTree()
ORDER BY (user_id, timestamp);
CREATE MATERIALIZED VIEW events_mv TO events_storage AS
SELECT
user_id,
event_type,
timestamp,
properties
FROM kafka_queue;
Once created, the materialized view automatically processes new messages as they arrive. You can query events_storage like any other ClickHouse table.
3. Verify offsets and commit strategy
ClickHouse commits Kafka offsets after successfully writing each batch to the target table. You can check consumer progress by querying system tables:
SELECT * FROM system.kafka_consumers;
The kafka_num_consumers setting controls parallelism, while kafka_max_block_size determines batch sizes.
Step-by-step streaming with the Kafka Connect sink
The official ClickHouse Kafka Connect sink connector runs as part of a Kafka Connect cluster and writes data from Kafka topics into ClickHouse tables. This approach works well when you already have Kafka Connect infrastructure or want to separate ingestion from query workloads.
1. Install the connector
Download the ClickHouse Kafka Connect JAR file from the official GitHub releases and place it in your Kafka Connect plugins directory. After restarting Kafka Connect, the connector appears in the available plugins list.
2. Configure topic-table mapping
{
  "name": "clickhouse-sink",
  "config": {
    "connector.class": "com.clickhouse.kafka.connect.ClickHouseSinkConnector",
    "tasks.max": "1",
    "topics": "events",
    "hostname": "localhost",
    "port": "8123",
    "database": "default",
    "username": "default",
    "password": "",
    "ssl": "false",
    "auto.create.tables": "true",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false"
  }
}
3. Launch and validate the task
Submit the connector configuration to your Kafka Connect cluster using the REST API:
curl -X POST http://localhost:8083/connectors \
-H "Content-Type: application/json" \
-d @clickhouse-sink-config.json
Check the connector status to confirm it's running and consuming messages:
curl http://localhost:8083/connectors/clickhouse-sink/status
The connector batches messages and inserts them into ClickHouse at regular intervals.
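A quick way to confirm end-to-end delivery is to count rows in the target table from ClickHouse; this sketch assumes the connector wrote to a table named events in the default database (the actual name depends on your topic-to-table mapping):
-- Confirm rows are arriving and check how fresh the latest event is
SELECT
    count() AS total_rows,
    max(timestamp) AS latest_event
FROM default.events;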
Step-by-step streaming with a managed service like Tinybird
Tinybird is a managed ClickHouse platform that simplifies Kafka integration by handling infrastructure, providing built-in connectors for Kafka-compatible services (including Confluent, Redpanda, and Estuary), and generating APIs from your SQL queries. This approach works well for application developers who want to integrate real-time analytics without managing clusters.
Before you start: Consider HTTP streaming
If you're not yet committed to Kafka for streaming, you might consider Tinybird's Events API. It's a lightweight HTTP endpoint that accepts JSON payloads at up to 1,000 RPS, with support for microbatching to allow ingestion rates of up to 100k+ events per second. Many developers use the Events API to stream directly from the application backend (or client) without needing to set up additional streaming infrastructure.
1. Create a Tinybird workspace and data source
Install the Tinybird CLI and authenticate to your workspace:
curl -L tinybird.co | sh
tb login
Create a data source that connects to your Kafka cluster using a declarative .datasource file format:
SCHEMA >
`user_id` String,
`event_type` String,
`timestamp` DateTime,
`properties` String
ENGINE "MergeTree"
ENGINE_SORTING_KEY "user_id, timestamp"
KAFKA_CONNECTION_NAME my_kafka_connection
KAFKA_TOPIC events
KAFKA_GROUP_ID tinybird_events_consumer
KAFKA_AUTO_OFFSET_RESET earliest
2. Use the Tinybird Kafka connector token
Tinybird manages Kafka authentication through connection objects that you create once and reference in multiple data sources. Create a connection in the Tinybird UI or CLI with your broker addresses and credentials, then reference it by name in your .datasource file using KAFKA_CONNECTION_NAME. This approach keeps credentials out of your data source definitions.
More info is available in the Tinybird Kafka Connector docs.
3. Query and publish a low-latency API
Once data flows into your Tinybird data source, you can query it using SQL and publish the query as an API endpoint. Create a .pipe file that defines your query logic:
TOKEN read_token READ
NODE events_by_type
SQL >
%
SELECT
event_type,
count() as event_count,
max(timestamp) as last_seen
FROM events
WHERE timestamp > now() - INTERVAL {{Int32(hours, 24)}} HOUR
GROUP BY event_type
ORDER BY event_count DESC
TYPE endpoint
Deploy the pipe to create a hosted API:
tb --cloud deploy
Tinybird generates a URL and authentication token for your API. You can call it from any application and the API runs your SQL query against live data, returning results in milliseconds.
Sample API usage and response
Once deployed, your API endpoint is ready to use. Here's how to call it:
curl "https://api.tinybird.co/v0/pipes/events_by_type.json?hours=24" \
-H "Authorization: Bearer $READ_TOKEN"
The API returns JSON data with your query results:
{
  ...
  "data": [
    {
      "event_type": "page_view",
      "event_count": 15420,
      "last_seen": "2025-01-10 14:32:15"
    },
    {
      "event_type": "click",
      "event_count": 8932,
      "last_seen": "2025-01-10 14:31:58"
    },
    {
      "event_type": "signup",
      "event_count": 234,
      "last_seen": "2025-01-10 14:30:12"
    }
  ],
  "rows": 3,
  "statistics": {
    "elapsed": 0.001234,
    "rows_read": 15420,
    "bytes_read": 1234567
  }
}
You can also use the API in your application code:
JavaScript:
// JavaScript/Node.js example
const response = await fetch('https://api.tinybird.co/v0/pipes/events_by_type.json?hours=24', {
  headers: {
    'Authorization': 'Bearer YOUR_TOKEN'
  }
});
const data = await response.json();
console.log(`Total events: ${data.data.reduce((sum, row) => sum + row.event_count, 0)}`);
Python:
# Python example
import requests
response = requests.get(
    'https://api.tinybird.co/v0/pipes/events_by_type.json',
    params={'hours': 24},
    headers={'Authorization': 'Bearer YOUR_TOKEN'}
)
data = response.json()
print(f"Total events: {sum(row['event_count'] for row in data['data'])}")
Mapping Kafka schemas to ClickHouse tables
Kafka messages come in different formats, and each format requires specific handling when creating ClickHouse tables. The three most common formats are JSON, Avro, and Protobuf.
JSON and nested objects
JSON messages work well with ClickHouse's JSONEachRow format, which expects one JSON object per line. For nested objects, you can either store them as String columns and parse them in queries, or use ClickHouse's Nested data type:
CREATE TABLE events (
user_id String,
event_type String,
properties Nested(
key String,
value String
)
)
ENGINE = MergeTree()
ORDER BY user_id;
The Nested type creates arrays of keys and values that you can query using array functions.
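For example, ARRAY JOIN unfolds the parallel key and value arrays into one row per property:
-- One row per (user_id, key, value) pair
SELECT
    user_id,
    properties.key AS property_key,
    properties.value AS property_value
FROM events
ARRAY JOIN properties;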
Avro and Confluent Schema Registry
Avro messages produced with a schema registry carry a schema ID that ClickHouse resolves to map fields to columns automatically. When using the Kafka table engine with registry-encoded messages, specify the AvroConfluent format and provide the schema registry URL:
CREATE TABLE kafka_avro_queue (
user_id String,
event_type String,
timestamp DateTime
)
ENGINE = Kafka('localhost:9092', 'events', 'clickhouse_group', 'AvroConfluent')
SETTINGS format_avro_schema_registry_url = 'http://localhost:8081';
ClickHouse fetches the schema from the registry and uses it to decode and validate incoming messages, a pattern that works particularly well with Confluent-managed Kafka and Schema Registry.
Protobuf and optional fields
Protocol Buffer messages require a .proto schema file. ClickHouse can parse Protobuf messages when you specify the format as Protobuf and provide the schema. Optional fields in Protobuf map to Nullable columns in ClickHouse.
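A minimal sketch of a Protobuf-backed Kafka engine table; the schema file name and message type below are placeholders, and the .proto file is expected in the directory configured by format_schema_path:
CREATE TABLE kafka_proto_queue (
    user_id String,
    event_type String,
    timestamp DateTime
)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'localhost:9092',
         kafka_topic_list = 'events',
         kafka_group_name = 'clickhouse_group',
         kafka_format = 'Protobuf',
         -- schema file and message name are assumptions for this example
         kafka_schema = 'event.proto:Event';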
Configuration cheat sheet for high-throughput ingestion
Optimizing Kafka to ClickHouse ingestion involves tuning settings on both the Kafka consumer side and the ClickHouse storage side. The settings below balance throughput, latency, and resource usage.
Batch size and poll interval
The kafka_max_block_size setting controls how many messages ClickHouse reads from Kafka before writing to storage:
CREATE TABLE kafka_queue (...)
ENGINE = Kafka(...)
SETTINGS kafka_max_block_size = 65536;
Larger values like 65536 improve throughput but increase latency. The kafka_poll_timeout_ms setting determines how long the consumer waits for new messages.
Insert block size and compression
ClickHouse batches inserts into blocks before writing to disk. The min_insert_block_size_rows and min_insert_block_size_bytes settings control when blocks get flushed:
SET min_insert_block_size_rows = 1048576;
SET min_insert_block_size_bytes = 268435456;
Compression reduces storage size and improves query performance by reducing I/O. The ZSTD codec offers good compression ratios with reasonable CPU overhead.
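Codecs are declared per column, so heavier compression can be reserved for wide, rarely filtered columns. A sketch of the earlier storage table with explicit codecs (the specific choices are illustrative):
CREATE TABLE events_storage_compressed (
    user_id String CODEC(ZSTD(1)),
    event_type String CODEC(ZSTD(1)),
    -- Delta encoding helps for mostly increasing timestamps
    timestamp DateTime CODEC(Delta, ZSTD(1)),
    -- free-form JSON payloads usually benefit from a higher ZSTD level
    properties String CODEC(ZSTD(3))
)
ENGINE = MergeTree()
ORDER BY (user_id, timestamp);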
Parallel consumers and sharding
The kafka_num_consumers setting controls how many consumer threads ClickHouse spawns per Kafka table:
CREATE TABLE kafka_queue (...)
ENGINE = Kafka(...)
SETTINGS kafka_num_consumers = 4;
More consumers increase parallelism but also increase resource usage. For very high throughput, you can create multiple Kafka engine tables that consume from different partitions of the same topic.
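As a rough sketch, two engine tables that join the same consumer group split the topic's partitions between them, and each feeds the same storage table through its own materialized view (table names here are arbitrary):
CREATE TABLE kafka_queue_a (
    user_id String,
    event_type String,
    timestamp DateTime,
    properties String
)
ENGINE = Kafka('localhost:9092', 'events', 'clickhouse_group', 'JSONEachRow')
SETTINGS kafka_num_consumers = 2;

CREATE TABLE kafka_queue_b (
    user_id String,
    event_type String,
    timestamp DateTime,
    properties String
)
ENGINE = Kafka('localhost:9092', 'events', 'clickhouse_group', 'JSONEachRow')
SETTINGS kafka_num_consumers = 2;

-- Both views write into the same MergeTree table
CREATE MATERIALIZED VIEW events_mv_a TO events_storage AS SELECT * FROM kafka_queue_a;
CREATE MATERIALIZED VIEW events_mv_b TO events_storage AS SELECT * FROM kafka_queue_b;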
Monitoring, alerting, and troubleshooting lag
Production Kafka to ClickHouse pipelines need observability to catch issues before they affect users. Consumer lag is the primary metric to watch.
Tracking offsets and consumer lag
Consumer lag measures how far behind the consumer is from the latest message in each Kafka partition. High lag indicates the consumer can't keep up with the message rate, though properly tuned pipelines can handle 48+ billion records per day. For the native Kafka engine, query the system tables:
SELECT
    database,
    table,
    assignments.topic,
    assignments.partition_id,
    assignments.current_offset
FROM system.kafka_consumers;
The system table reports each consumer's current offsets rather than lag itself, so compare those offsets with the latest offsets on the topic (for example, with kafka-consumer-groups.sh --describe) to compute lag. For Kafka Connect, use the Kafka Connect REST API or monitor the consumer group directly using Kafka tools. Set up alerts when lag exceeds a threshold that matters for your use case.
Common error messages and fixes
Connection errors like "Connection refused" or "Unknown host" indicate network issues or incorrect broker addresses. Verify the broker addresses and check firewall rules.
Authentication errors like "SASL authentication failed" mean your credentials are incorrect or your Kafka cluster requires different authentication settings. Parse errors like "Cannot parse input" suggest a mismatch between your message format and the format specified in ClickHouse.
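If only the occasional message is malformed, the engine can skip a bounded number of bad messages per block instead of stalling the consumer; a sketch using the kafka_skip_broken_messages setting:
CREATE TABLE kafka_queue (
    user_id String,
    event_type String,
    timestamp DateTime,
    properties String
)
ENGINE = Kafka('localhost:9092', 'events', 'clickhouse_group', 'JSONEachRow')
SETTINGS kafka_skip_broken_messages = 10;  -- tolerate up to 10 unparseable messages per block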
Observability dashboard examples
Effective dashboards track both Kafka metrics and ClickHouse metrics. Key metrics to monitor include:
- Consumer lag by partition: Identifies if specific partitions fall behind
- Messages consumed per second: Shows ingestion throughput
- Insert queries per second: Indicates write load on ClickHouse
- Failed inserts: Catches data quality or schema issues
- Query latency p95: Measures user-facing performance

Tools like Grafana work well for visualizing these metrics when connected to Prometheus or another monitoring backend, and ClickHouse's own system tables can feed a dashboard directly, as in the query sketched below.
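For the insert-rate and failed-insert metrics, ClickHouse's query log is a convenient source when query logging is enabled; this sketch summarizes insert activity per minute (the one-hour window is arbitrary):
-- Insert activity over the last hour, from the query log
SELECT
    toStartOfMinute(event_time) AS minute,
    countIf(type = 'QueryFinish') AS successful_inserts,
    countIf(type = 'ExceptionWhileProcessing') AS failed_inserts,
    sum(written_rows) AS rows_written
FROM system.query_log
WHERE query_kind = 'Insert'
  AND event_time > now() - INTERVAL 1 HOUR
GROUP BY minute
ORDER BY minute;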
Securing the pipeline end-to-end
Production data pipelines handle sensitive information and need proper security controls. Both Kafka and ClickHouse offer authentication and encryption features.
TLS and SASL configuration
Enable TLS to encrypt data in transit between Kafka and ClickHouse. For the native Kafka engine, encryption and authentication options are passed to the underlying librdkafka client through the <kafka> section of the server configuration rather than in the table definition:
<clickhouse>
    <kafka>
        <security_protocol>ssl</security_protocol>
        <ssl_ca_location>/path/to/ca-cert.pem</ssl_ca_location>
        <ssl_certificate_location>/path/to/client-cert.pem</ssl_certificate_location>
        <ssl_key_location>/path/to/client-key.pem</ssl_key_location>
    </kafka>
</clickhouse>
SASL authentication adds options such as sasl_mechanism, sasl_username, and sasl_password in the same section, with security_protocol set to sasl_ssl.
RBAC in ClickHouse and Tinybird
ClickHouse supports role-based access control to limit which users can read or write specific tables:
CREATE ROLE kafka_writer;
GRANT INSERT ON events TO kafka_writer;
CREATE USER kafka_user IDENTIFIED BY 'password';
GRANT kafka_writer TO kafka_user;
Tinybird handles RBAC through workspace permissions and API tokens. You can create read-only tokens for querying data and write tokens for ingestion.
Token management and rotation
API tokens and credentials eventually need rotation for security compliance. When rotating tokens, create new credentials before revoking old ones to avoid downtime. Tinybird simplifies token rotation by allowing you to create new tokens and revoke old ones without redeploying data sources or pipes.
Frequently asked questions about Kafka to ClickHouse streaming
What is the difference between at-least-once and exactly-once delivery when streaming to ClickHouse?
At-least-once delivery means ClickHouse might process the same Kafka message multiple times if a consumer restarts before offset commits complete, which can create duplicate rows in your tables. Exactly-once semantics require either idempotent writes, for example with ReplacingMergeTree, or explicit deduplication logic. Most production pipelines accept at-least-once delivery and deduplicate in queries using DISTINCT or GROUP BY.
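A sketch of merge-time and query-time deduplication with ReplacingMergeTree, assuming each event carries a unique event_id (not part of the schema used earlier in this guide):
CREATE TABLE events_dedup (
    event_id String,
    user_id String,
    event_type String,
    timestamp DateTime
)
ENGINE = ReplacingMergeTree()
ORDER BY event_id;

-- Duplicates collapse during background merges; FINAL forces deduplication at read time
SELECT count() FROM events_dedup FINAL;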
Can I backfill historical Kafka data while streaming new events?
Yes. Configure the consumer to start from an earlier offset by setting auto.offset.reset to earliest (for example, KAFKA_AUTO_OFFSET_RESET earliest in a Tinybird data source, or the auto_offset_reset option in the native engine's Kafka configuration), or manually seek to a specific offset. This lets you reprocess historical messages while new messages continue arriving. Be aware that backfilling can temporarily increase consumer lag.
How do I partition ClickHouse tables when ingesting high-volume Kafka topics?
Partition by time intervals that align with your query patterns and data retention policies. Daily partitions using PARTITION BY toYYYYMMDD(timestamp) work well for most use cases, letting you drop old partitions efficiently and improving query performance when filtering by date. Hourly partitions make sense for very high-volume streams where you need fine-grained data lifecycle management.
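A sketch combining daily partitions with a TTL for automatic expiry (the 90-day retention is an arbitrary example):
CREATE TABLE events_partitioned (
    user_id String,
    event_type String,
    timestamp DateTime,
    properties String
)
ENGINE = MergeTree()
PARTITION BY toYYYYMMDD(timestamp)
ORDER BY (user_id, timestamp)
TTL timestamp + INTERVAL 90 DAY;

-- Dropping a whole day is a cheap metadata-only operation
ALTER TABLE events_partitioned DROP PARTITION 20250101;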
Does column compression affect Kafka consumer throughput in ClickHouse?
Compression adds CPU overhead during ingestion but reduces disk I/O and storage costs. The net effect is usually positive for throughput since modern CPUs compress data faster than disks can write uncompressed data. The LZ4 codec offers good compression with minimal CPU cost, while ZSTD provides better compression ratios at the cost of more CPU usage.
Build real-time APIs on your Kafka data today
Managed ClickHouse platforms like Tinybird handle the infrastructure complexity of running production Kafka to ClickHouse pipelines. This lets you focus on building features and shipping analytics capabilities rather than tuning consumer lag and managing cluster scaling. Tinybird provides built-in Kafka connectors, automatic API generation from SQL queries, and deployment workflows that integrate with your existing development tools. The platform handles performance optimization, monitoring, and scaling automatically. Sign up for a free Tinybird plan to start streaming Kafka topics into ClickHouse and building real-time APIs in minutes rather than weeks.