Real-time data pipelines need to handle thousands or millions of events per second without falling behind, and ClickHouse's architecture makes this possible through several ingestion methods that balance throughput with query performance. The challenge is not just writing data fast, but doing so in a way that keeps queries responsive and prevents storage overhead from accumulating.
This guide covers the ingestion patterns that work best for streaming workloads, from Kafka engine tables to HTTP inserts, along with practical examples of schema design, monitoring, and deployment using both open-source ClickHouse and managed platforms like Tinybird.
Understanding real-time ingestion patterns in ClickHouse
Streaming ingestion means writing data to ClickHouse continuously as events arrive, rather than collecting batches and loading them every few hours. ClickHouse handles this through several methods: Kafka engine tables consume from message queues, HTTP inserts accept data over REST APIs, and the native TCP protocol maximizes throughput for client libraries.
The main challenge is balancing write frequency with ClickHouse's internal merge operations. Each insert creates a new "part" on disk, and ClickHouse merges these parts in the background to keep queries fast. Too many small inserts create excessive parts, which slows down queries and increases merge overhead.
ClickHouse's columnar storage gives it an edge for analytical queries because it only reads the columns each query actually needs. This makes aggregations and filtering fast, even when ingesting millions of rows per second. However, the architecture also means insert patterns matter more than they would in row-oriented databases.
Choosing the right table engine for streaming data
The MergeTree family offers different engines for different streaming scenarios. Standard MergeTree handles append-only data where rows never change. ReplacingMergeTree deduplicates rows with the same primary key during merges, which helps when the same event arrives multiple times. CollapsingMergeTree and VersionedCollapsingMergeTree handle updates by storing "cancellation" rows that mark previous versions as obsolete.
For most streaming use cases, start with MergeTree unless you have specific requirements. If your stream contains duplicate events due to at-least-once delivery, ReplacingMergeTree cleans them up automatically. When you need to update or delete rows based on new information, VersionedCollapsingMergeTree handles out-of-order updates correctly.
The engine choice affects query performance and storage efficiency:
- MergeTree: Fastest option because it never checks for duplicates or cancellations
- ReplacingMergeTree: Adds overhead during merges to identify and remove duplicates
- CollapsingMergeTree: Requires queries to sum "sign" columns to get current state, adding SQL complexity
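For example, reading current values from a CollapsingMergeTree table means multiplying by the sign column and filtering out fully cancelled groups. A minimal sketch, where the page_views table and its views column are illustrative:
-- page_views and its columns are hypothetical
SELECT
    page,
    sum(views * sign) AS current_views
FROM page_views
GROUP BY page
HAVING sum(sign) > 0;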
MergeTree variants compared
| Engine | Use Case | Deduplication | Update Support | Query Complexity |
|---|---|---|---|---|
| MergeTree | Append-only logs, events | No | No | Low |
| ReplacingMergeTree | Streams with duplicates | Yes (during merges) | Replace only | Low |
| CollapsingMergeTree | Frequent updates/deletes | No | Yes (with sign column) | Medium |
| VersionedCollapsingMergeTree | Out-of-order updates | No | Yes (with version + sign) | High |
Engine selection cheat-sheet
Choose your table engine based on your streaming pattern:
- High-volume append-only streams: MergeTree provides maximum performance when events never change after arrival
- At-least-once delivery with duplicates: ReplacingMergeTree automatically deduplicates during background merges
- Frequent updates to existing rows: CollapsingMergeTree handles changes by storing cancellation rows alongside new data
- Out-of-order updates with versioning: VersionedCollapsingMergeTree maintains consistency when updates arrive in any order
Kafka engine with materialized views for high-throughput ingest
The Kafka engine in ClickHouse creates a special table that acts as a consumer for Kafka topics. This table doesn't store data permanently; instead, it reads messages from Kafka and makes them available for processing. You then use a materialized view to transform and insert the data into a permanent MergeTree table.
This pattern separates ingestion from storage. The Kafka engine table handles connection management, offset tracking, and message parsing. The materialized view contains your transformation logic and writes to the final table. If the transformation fails or you need to change it, you can drop and recreate the materialized view without losing data in Kafka.
End-to-end data flow
Data flows through three stages:
- Kafka topic contains raw events from your application or data pipeline
- Kafka engine table consumes messages and parses them into columns
- Materialized view transforms the data and inserts it into a MergeTree table
The Kafka engine table runs in the background, continuously polling the topic. When new messages arrive, ClickHouse triggers the materialized view, which processes them and writes to the target table.
Schema evolution strategies
When your Kafka message schema changes, you have several options. Adding new fields is straightforward: add the columns to your MergeTree table with default values, then update the materialized view to map the new fields. Existing data uses the defaults, and new inserts populate the actual values.
Removing fields or changing data types requires more care. You can create a new MergeTree table with the updated schema, then switch the materialized view to write to the new table. Keep the old table for historical queries until you've backfilled or no longer need that data.
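As a hedged sketch of the add-a-field path, using the table names from the steps below and a hypothetical country field (Kafka engine tables generally can't be altered in place, so the consumer and view are dropped and recreated):
-- 1. Add the column to the storage table; existing rows take the default
ALTER TABLE events_final ADD COLUMN country String DEFAULT '';
-- 2. Recreate the consumer and the view so the new field is parsed and mapped
DROP VIEW events_mv;
DROP TABLE events_kafka;
-- ...recreate events_kafka with the extra country column, then:
CREATE MATERIALIZED VIEW events_mv TO events_final AS
SELECT event_id, user_id, event_type, timestamp, properties, country
FROM events_kafka;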
1. Create Kafka engine table
CREATE TABLE events_kafka (
event_id String,
user_id String,
event_type String,
timestamp DateTime64(3),
properties String
) ENGINE = Kafka
SETTINGS
kafka_broker_list = 'localhost:9092',
kafka_topic_list = 'events',
kafka_group_name = 'clickhouse_consumer',
kafka_format = 'JSONEachRow',
kafka_num_consumers = 2;
2. Create materialized view to MergeTree
The materialized view transforms data from the Kafka table and writes it to permanent storage:
CREATE TABLE events_final (
event_id String,
user_id String,
event_type String,
timestamp DateTime64(3),
properties String
) ENGINE = MergeTree()
ORDER BY (event_type, timestamp);
CREATE MATERIALIZED VIEW events_mv TO events_final AS
SELECT
event_id,
user_id,
event_type,
timestamp,
properties
FROM events_kafka;
3. Validate inserts in real time
Monitor ingestion progress by querying the target table and system tables:
-- Check recent insert rate
SELECT
toStartOfMinute(timestamp) AS minute,
count() AS events
FROM events_final
WHERE timestamp > now() - INTERVAL 10 MINUTE
GROUP BY minute
ORDER BY minute DESC;
If the consumer lag grows continuously, ingestion isn't keeping up with the stream. You might need to increase kafka_num_consumers or optimize your materialized view query.
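On recent ClickHouse versions, the system.kafka_consumers table exposes per-consumer state; a query along these lines shows activity and recent parse errors (available columns vary by version):
-- Inspect consumer progress and exceptions
SELECT
    database,
    table,
    num_messages_read,
    last_poll_time,
    exceptions.text AS recent_exceptions
FROM system.kafka_consumers;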
Direct HTTP and native inserts at scale
ClickHouse accepts inserts through its HTTP interface on port 8123. You send a POST request with data in formats like JSON, CSV, or Parquet. This works well for applications that can't use the Kafka engine, such as serverless functions or services that generate events directly.
The native TCP protocol on port 9000 provides higher throughput by using ClickHouse's binary format and compression. Client libraries for Python, Go, Java, and other languages use this protocol by default.
Batch vs. row inserts
Inserting one row at a time creates a new part for each insert, which quickly overwhelms ClickHouse's merge system. Instead, batch rows together before inserting. A good starting point is 10,000-100,000 rows per insert, or inserting every 1-5 seconds, whichever comes first.
-- Bad: one row per insert
INSERT INTO events VALUES ('event1', 'user1', 'click', now());
-- Good: batch multiple rows
INSERT INTO events VALUES
('event1', 'user1', 'click', now()),
('event2', 'user2', 'view', now()),
('event3', 'user3', 'purchase', now());
Client libraries often provide automatic batching. For example, the ClickHouse Python client can buffer rows in memory and flush them periodically.
Compression and block size tuning
ClickHouse compresses data automatically, using LZ4 by default; compression ratios depend heavily on the data, but repetitive analytical columns often compress by an order of magnitude or more. For streaming inserts over HTTP, you can also compress the request body before sending it. This reduces network transfer time, especially for large batches.
The max_insert_block_size setting controls how many rows ClickHouse processes together during an insert. The default is 1,048,576 rows. For streaming workloads with smaller batches, you can lower this to 10,000-100,000 to reduce memory usage.
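As a sketch, the setting can be lowered for a session or applied to a single insert (65,536 is an illustrative starting point, not a recommendation):
-- Session level
SET max_insert_block_size = 65536;
-- Per insert
INSERT INTO events SETTINGS max_insert_block_size = 65536
VALUES ('event1', 'user1', 'click', now());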
Handling late and out-of-order events safely
Streaming systems often deliver events out of order due to network delays, retries, or parallel processing. An event with timestamp 10:00:05 might arrive after an event with timestamp 10:00:10. ClickHouse doesn't enforce ordering during inserts, so you handle this at the schema level.
Late-arriving events can affect aggregations and time-based queries. If you compute hourly metrics and then receive events from a previous hour, those metrics become stale.
VersionedCollapsingMergeTree pattern
VersionedCollapsingMergeTree handles out-of-order updates by using a version number and sign column. When an update arrives, you insert a row with sign = -1 to cancel the previous version, then insert the new version with sign = 1. The version number ensures that even if rows arrive out of order, ClickHouse can correctly determine which version is current.
CREATE TABLE user_state (
user_id String,
state String,
updated_at DateTime64(3),
version UInt64,
sign Int8
) ENGINE = VersionedCollapsingMergeTree(sign, version)
ORDER BY (user_id, updated_at);
Queries against VersionedCollapsingMergeTree tables sum the sign column to get the current state. Rows with sign = -1 cancel out rows with sign = 1, leaving only the latest version.
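One way to read the current state per user is to collapse cancelled pairs at query time and keep the highest surviving version; a sketch against the table above:
SELECT user_id, state, version
FROM user_state
GROUP BY user_id, state, updated_at, version
HAVING sum(sign) > 0
ORDER BY version DESC
LIMIT 1 BY user_id;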
Deduplication keys and watermarks
For deduplication, ReplacingMergeTree uses the ORDER BY key to identify duplicate rows. When multiple rows have the same key, ClickHouse keeps the last one during merges. However, merges happen asynchronously, so duplicates might exist temporarily.
CREATE TABLE events_dedupe (
event_id String,
user_id String,
timestamp DateTime64(3)
) ENGINE = ReplacingMergeTree()
ORDER BY event_id;
To query deduplicated data immediately, use the FINAL modifier. This forces ClickHouse to deduplicate at query time, but it's slower than regular queries.
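For example:
-- Deduplicate at query time; correct, but slower than a plain scan
SELECT event_id, user_id, timestamp
FROM events_dedupe FINAL
WHERE timestamp > now() - INTERVAL 1 HOUR;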
Use Tinybird to stream 10k+ EPS to ClickHouse with HTTP
Tinybird provides a managed ClickHouse platform with a hosted event streaming endpoint, handling the scaling and operations of the streaming infrastructure for you. Here's how to stream events into ClickHouse using Tinybird's Events API.
1. Initialize local container
curl -L tinybird.co | sh
tb login
tb local start
The local environment runs Tinybird's version of ClickHouse in Docker and creates a workspace directory for your project.
2. Define data source with JSONPaths
The Events API accepts JSON or NDJSON payloads, so you can define a schema to receive and store events posted to the API.
SCHEMA >
`event_id` String `json:$.event_id`,
`user_id` String `json:$.user_id`,
`event_type` String `json:$.event_type`,
`timestamp` DateTime64(3) `json:$.timestamp`,
`properties` String `json:$.properties`,
`user_agent` String `json:$.properties.user_agent`
ENGINE "MergeTree"
ENGINE_SORTING_KEY "timestamp, user_id"
Tinybird manages load balancing, backpressure, and replica routing to the database. You define the final schema, and Tinybird handles the ingestion plumbing.
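Once deployed, events can be posted straight to the Events API. A sketch, assuming a data source named events_source, a token in $TB_TOKEN, and the default API host (hosts vary by region):
curl -X POST "https://api.tinybird.co/v0/events?name=events_source" \
  -H "Authorization: Bearer $TB_TOKEN" \
  -d '{"event_id": "evt_1", "user_id": "user_1", "event_type": "click", "timestamp": "2025-01-01 00:00:00.000", "properties": {"user_agent": "Mozilla/5.0"}}'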
3. Write a SQL pipe for transformations
Tinybird pipes are multi-node declarative transformations written in SQL, with optional templating for query parameters and advanced logic.
TOKEN events_api_read READ
NODE events_by_type
SQL >
%
SELECT
event_type,
count() AS event_count
FROM events_source
WHERE timestamp >= now() - INTERVAL {{Int32(hours, 24)}} HOUR
GROUP BY event_type
TYPE endpoint
The {{Int32(hours, 24)}} syntax creates a query parameter with a default value.
4. Deploy to cloud in CI/CD
Deploy your data sources and pipes to Tinybird Cloud, either directly or in a CI/CD pipeline:
tb --cloud deploy
This creates your ClickHouse tables and queries, and publishes your pipes as REST API endpoints.
The whole process simplifies streaming ingestion into ClickHouse by limiting your infrastructure needs and handling the mechanics required to sustain high-throughput writes without impacting reads. Tinybird writes parts to ClickHouse carefully to avoid TOO_MANY_PARTS errors and backed-up merge queues.
Observability for throughput lag and merge debt
Monitoring streaming pipelines requires tracking both ingestion health and ClickHouse internal operations. Key metrics include insert rate, consumer lag, merge activity, and part count.
system.merges and system.parts queries
ClickHouse exposes internal state through system tables. The system.parts table shows all data parts for each table:
-- Check part count per table
SELECT
database,
table,
count() AS part_count,
sum(rows) AS total_rows
FROM system.parts
WHERE active
GROUP BY database, table
ORDER BY part_count DESC;
A high part count (more than 100-200 parts per partition) indicates that inserts are too frequent or too small.
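The system.merges table shows merges currently in flight; long-running entries here, alongside a climbing part count, signal accumulating merge debt:
-- Check in-progress merges
SELECT
    database,
    table,
    elapsed,
    round(progress, 2) AS progress,
    num_parts,
    result_part_name
FROM system.merges;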
Grafana and Tinybird dashboard examples
Key metrics to track:
- Ingestion rate: Rows per second being inserted into each table
- Consumer lag: How far behind the Kafka consumer is from the latest offset
- Part count: Number of active parts per table and partition
- Query latency: P50, P95, and P99 query response times
Tinybird provides built-in observability for pipes and data sources. The Tinybird console shows request rates, latency percentiles, and error rates for each API endpoint.
Benchmarking and tuning for sub-second latency
Performance tuning starts with measurement. Before optimizing, establish baseline metrics for insert throughput and query latency. Common bottlenecks include small insert batches, missing indexes, and inefficient query patterns.
Insert benchmark script
Test insert throughput with a benchmark that measures rows per second (well-tuned production systems can sustain millions of events per second with proper batching):
import clickhouse_connect
import time
from datetime import datetime

client = clickhouse_connect.get_client(host='localhost')

# Build one batch; each tuple must match the target columns
rows = [('event' + str(i), 'user' + str(i % 1000), 'click', datetime.now())
        for i in range(100000)]

start = time.time()
client.insert('events', rows,
              column_names=['event_id', 'user_id', 'event_type', 'timestamp'])
duration = time.time() - start
print(f"Throughput: {len(rows) / duration:.0f} rows/sec")
Run this benchmark with different batch sizes to find the sweet spot for your workload.
Query profiling with system.trace_log
ClickHouse can log detailed execution traces for queries. Enable trace logging to see where queries spend time:
SET send_logs_level = 'trace';
SET log_queries = 1;
SELECT event_type, count()
FROM events
WHERE timestamp > now() - INTERVAL 1 HOUR
GROUP BY event_type;
This shows which operations (reading, decompression, aggregation) consume the most resources.
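For a historical view, system.query_log records finished queries with their timing and read volume (query logging must be enabled, as above):
-- Find the slowest recent queries
SELECT
    query_duration_ms,
    read_rows,
    formatReadableSize(read_bytes) AS read_size,
    substring(query, 1, 80) AS query_start
FROM system.query_log
WHERE type = 'QueryFinish'
ORDER BY query_duration_ms DESC
LIMIT 10;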
Managed versus self-hosted cost and operations trade-offs
Running ClickHouse yourself gives you full control over configuration and infrastructure. You can optimize for your specific workload and choose exactly which hardware to use. However, this comes with operational overhead: provisioning servers, configuring replication, setting up monitoring, and handling upgrades.
Managed services like Tinybird or ClickHouse Cloud handle infrastructure automatically. You focus on schema design and queries while the platform manages scaling and backups.
Infrastructure overhead checklist
Self-hosting ClickHouse requires handling:
- Cluster management: Provisioning nodes, configuring ZooKeeper for replication
- Monitoring setup: Installing Prometheus, Grafana, and alerting rules
- Security configuration: Setting up authentication, network isolation, encryption
- Backup and recovery: Implementing automated backups, testing restore procedures
- Upgrades and patches: Planning version upgrades, testing compatibility
Each area requires specialized knowledge of ClickHouse internals.
Next steps for production-ready streaming analytics
Building a production streaming pipeline involves several key decisions: choosing between Kafka engine and HTTP inserts, selecting the right table engine, tuning batch sizes, and setting up monitoring. Start with simple patterns like MergeTree tables and HTTP inserts, then add complexity when needed.
Test your pipeline under realistic load before going to production. Generate synthetic data at your expected peak rate and measure query performance.
Sign up for a free Tinybird account to start building streaming pipelines without provisioning servers or configuring clusters. The Tinybird CLI and documentation provide step-by-step guides for creating your first data source and API endpoint.
FAQs about ClickHouse streaming ingestion
How do I migrate a table schema without stopping the stream?
Use ALTER TABLE commands to add columns or modify types while maintaining continuous ingestion. ClickHouse supports online schema changes for most operations, though some changes like modifying the ORDER BY key require recreating the table.
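For example (table and column names are illustrative):
-- Metadata-only change; inserts continue while it runs
ALTER TABLE events ADD COLUMN IF NOT EXISTS region String DEFAULT '';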
What happens to poison messages that fail parsing in Kafka engine?
By default, a malformed message raises a parsing error; setting kafka_skip_broken_messages tells the Kafka engine to skip up to that many unparseable messages per block, logging errors while the stream continues processing valid messages. You can query system.kafka_consumers to inspect exceptions and adjust your parsing logic.
How can I secure ingest endpoints with row-level policies?
Configure row-level security policies using CREATE ROW POLICY to restrict data access based on user attributes or query parameters. This lets you share a single table across multiple tenants while ensuring each can only see their own data.
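A minimal sketch, assuming rows carry a tenant_id column and each tenant connects as its own database user (names are hypothetical):
-- Each user sees only rows whose tenant_id matches their username
CREATE ROW POLICY tenant_isolation ON events
FOR SELECT USING tenant_id = currentUser()
TO app_users;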