This guide shows how to monitor Kafka consumption using the ClickHouse® native Kafka engine and system tables. These queries help you track consumer lag, throughput, errors, and performance when using ClickHouse's Kafka engine directly.
The ClickHouse Kafka engine uses a poll-based mechanism and requires active management by an engineering team. It works well in production for teams with ClickHouse expertise, but requires monitoring consumer lag, handling errors, and managing consumer groups manually. For teams looking for a serverless solution with automatic consumer scaling and simplified monitoring, managed connectors like Tinybird's Kafka connector provide equivalent monitoring capabilities through Service Data Sources with reduced operational overhead.
How to Monitor Kafka with ClickHouse System Tables
When using ClickHouse's Kafka engine table, you can monitor consumer lag, throughput, and errors using ClickHouse system tables.
The main system tables for Kafka monitoring include:
- system.kafka_consumers - Information about Kafka consumers
- system.tables - Table metadata (filter by engine = 'Kafka' for Kafka engine tables)
- system.metrics - System metrics, including Kafka-related metrics
- system.events - System events
- system.query_log - Query execution logs
- system.part_log - Part operations log
How to Monitor Kafka Consumer Lag in ClickHouse
Kafka consumer lag is the difference between the latest offset in a partition (the high-water mark) and the last offset committed by your consumer. For example, if the high-water mark is 1,000 and the committed offset is 900, the consumer lag is 100 messages.
With the ClickHouse Kafka engine, you can track current offsets using system.kafka_consumers. Note: the system.kafka_consumers table in ClickHouse 25 doesn't include the high-water mark, so to calculate lag you'll need to either:
- Query Kafka directly for high-water marks, or
- Parse the rdkafka_stat JSON field, which contains librdkafka statistics including high-water mark information
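For example, here is a sketch of extracting the high-water mark and lag from rdkafka_stat with ClickHouse's JSON functions. It assumes the standard librdkafka statistics layout (topics → partitions → per-partition hi_offset, committed_offset, and consumer_lag fields); the topic name and partition id in the JSON path are illustrative placeholders:
-- Pull per-partition high-water mark and lag out of the librdkafka stats JSON.
-- 'your_topic' and '0' are placeholders for your topic name and partition id.
SELECT
    database,
    table,
    consumer_id,
    JSONExtractInt(rdkafka_stat, 'topics', 'your_topic', 'partitions', '0', 'hi_offset') as high_water_mark,
    JSONExtractInt(rdkafka_stat, 'topics', 'your_topic', 'partitions', '0', 'committed_offset') as committed_offset,
    JSONExtractInt(rdkafka_stat, 'topics', 'your_topic', 'partitions', '0', 'consumer_lag') as consumer_lag
FROM system.kafka_consumers
WHERE database = 'your_database'
AND table = 'your_kafka_table'
Keep in mind that rdkafka_stat is only populated when librdkafka statistics are enabled (a non-zero statistics interval), and librdkafka reports -1 for values it hasn't computed yet, so treat negative values as unknown rather than as lag.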
Find current Kafka consumer lag by partition
Query the system.kafka_consumers table to get the current offset for each partition. Note: to calculate lag, you need to fetch the high-water mark from Kafka directly or parse the rdkafka_stat JSON field, since system.kafka_consumers doesn't include it:
SELECT
database,
table,
consumer_id,
assignments.topic as topic,
assignments.partition_id as partition_id,
assignments.current_offset as current_offset,
last_poll_time,
num_messages_read,
rdkafka_stat
FROM system.kafka_consumers
ARRAY JOIN assignments.topic, assignments.partition_id, assignments.current_offset
WHERE database = 'your_database'
AND table = 'your_kafka_table'
ORDER BY current_offset DESC
Kafka consumer offset trend over time
Track offset changes over time by periodically snapshotting consumer state and aggregating the snapshots (see the materialized view sketch after this query).
Note: To calculate lag, you'll also need to join with high-water mark data from Kafka:
SELECT
toStartOfHour(last_poll_time) as hour,
database,
table,
assignments.partition_id as partition_id,
max(assignments.current_offset) as max_offset,
avg(assignments.current_offset) as avg_offset,
sum(num_messages_read) as total_messages_read
FROM system.kafka_consumers
ARRAY JOIN assignments.partition_id, assignments.current_offset
WHERE database = 'your_database'
AND table = 'your_kafka_table'
AND last_poll_time > now() - INTERVAL 24 hour
GROUP BY hour, database, table, partition_id
ORDER BY hour DESC
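Because system.kafka_consumers only reflects current in-memory state, trend queries like the one above work best against periodic snapshots. Here is a minimal sketch using a refreshable materialized view (available in recent ClickHouse releases); the kafka_offsets_history and kafka_offsets_snapshot_mv names are illustrative:
-- History table that accumulates one row per consumer partition per snapshot
CREATE TABLE kafka_offsets_history
(
    snapshot_time DateTime,
    database String,
    `table` String,
    topic String,
    partition_id Int32,
    current_offset Int64,
    num_messages_read UInt64
)
ENGINE = MergeTree
ORDER BY (database, `table`, topic, partition_id, snapshot_time);

-- Refreshable materialized view that snapshots consumer state every minute
CREATE MATERIALIZED VIEW kafka_offsets_snapshot_mv
REFRESH EVERY 1 MINUTE
TO kafka_offsets_history
AS SELECT
    now() as snapshot_time,
    database,
    table,
    assignments.topic as topic,
    assignments.partition_id as partition_id,
    assignments.current_offset as current_offset,
    num_messages_read
FROM system.kafka_consumers
ARRAY JOIN assignments.topic, assignments.partition_id, assignments.current_offset;
You can then run the hourly trend query against kafka_offsets_history, grouping by snapshot_time instead of last_poll_time.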
Find Kafka partitions with low message processing
Identify partitions that may require attention by checking how recently their consumers polled:
SELECT
database,
table,
consumer_id,
assignments.topic as topic,
assignments.partition_id as partition_id,
assignments.current_offset as current_offset,
num_messages_read,
last_poll_time,
now() - last_poll_time as time_since_last_poll
FROM system.kafka_consumers
ARRAY JOIN assignments.topic, assignments.partition_id, assignments.current_offset
WHERE database = 'your_database'
AND table = 'your_kafka_table'
AND last_poll_time < now() - INTERVAL 10 minute
ORDER BY time_since_last_poll DESC
How to Track Kafka Throughput and Performance in ClickHouse
Monitor message processing rates using system tables and query logs.
Messages processed per hour by Kafka topic
Track throughput by analyzing query logs or using metrics.
Note: In ClickHouse 25, system.query_log uses current_database instead of database, and doesn't have a direct table column. Filter by query text or use Kafka consumer metrics instead:
SELECT
toStartOfHour(event_time) as hour,
current_database,
count() as operations,
sum(read_rows) as total_rows_read,
sum(read_bytes) as total_bytes_read
FROM system.query_log
WHERE current_database = 'your_database'
AND query LIKE '%your_kafka_table%'
AND type = 'QueryFinish'
AND event_time > now() - INTERVAL 24 hour
GROUP BY hour, current_database
ORDER BY hour DESC
Kafka partition processing rate comparison
Compare processing rates across partitions:
SELECT
database,
table,
assignments.partition_id as partition_id,
assignments.topic as topic,
count() as consumer_count,
sum(num_messages_read) as total_messages_read,
sum(num_commits) as total_commits,
max(last_poll_time) as last_activity,
max(last_commit_time) as last_commit
FROM system.kafka_consumers
ARRAY JOIN assignments.partition_id, assignments.topic
WHERE database = 'your_database'
AND table = 'your_kafka_table'
GROUP BY database, table, partition_id, topic
ORDER BY total_messages_read DESC
Kafka engine throughput efficiency
Monitor ingestion success by comparing messages processed against messages committed (see the consumer-level sketch after this query). With the ClickHouse Kafka engine, you can also track ingestion output through part operations:
SELECT
database,
table,
count() as total_parts,
sum(rows) as total_rows,
sum(size_in_bytes) as total_bytes,
min(event_time) as first_ingestion,
max(event_time) as last_ingestion
FROM system.part_log
WHERE database = 'your_database'
AND table = 'your_kafka_table'
AND event_type = 'NewPart'
AND event_time > now() - INTERVAL 24 hour
GROUP BY database, table
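As a direct check on processed vs committed, here is a sketch comparing each consumer's read counter with its commit activity in system.kafka_consumers; a consumer that keeps polling but stops committing will reprocess messages after a restart:
-- Compare read and commit counters per consumer; a growing gap between
-- last_poll_time and last_commit_time suggests commits are stalling
SELECT
    database,
    table,
    consumer_id,
    num_messages_read,
    num_commits,
    last_poll_time,
    last_commit_time,
    last_poll_time - last_commit_time as commit_delay_seconds
FROM system.kafka_consumers
WHERE database = 'your_database'
AND table = 'your_kafka_table'
ORDER BY commit_delay_seconds DESC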
How to Monitor Kafka Errors and Warnings in ClickHouse
Track errors and warnings using system tables and query logs.
Recent Kafka engine errors and warnings
Query system.query_log for Kafka-related exceptions:
SELECT
event_time,
current_database,
exception,
query,
query_start_time
FROM system.query_log
WHERE current_database = 'your_database'
AND query LIKE '%your_kafka_table%'
AND type = 'ExceptionWhileProcessing'
AND event_time > now() - INTERVAL 24 hour
ORDER BY event_time DESC
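The Kafka engine also keeps a small buffer of recent exceptions per consumer in system.kafka_consumers itself (the exceptions.time and exceptions.text nested columns), which surfaces consumer-side failures that never appear in system.query_log:
-- Unroll the per-consumer exception buffer into one row per exception
SELECT
    database,
    table,
    consumer_id,
    exceptions.time as exception_time,
    exceptions.text as exception_text
FROM system.kafka_consumers
ARRAY JOIN exceptions.time, exceptions.text
WHERE database = 'your_database'
AND table = 'your_kafka_table'
ORDER BY exception_time DESC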
Error count by Kafka table
Aggregate error counts by database. To filter by a specific table, match on the query text:
SELECT
current_database,
count() as error_count,
uniqExact(exception) as unique_errors
FROM system.query_log
WHERE current_database = 'your_database'
AND query LIKE '%your_kafka_table%'
AND type = 'ExceptionWhileProcessing'
AND event_time > now() - INTERVAL 24 hour
GROUP BY current_database
ORDER BY error_count DESC
Group Kafka errors by exception type
Identify recurring error patterns:
SELECT
exception,
count() as occurrence_count,
count(DISTINCT current_database) as affected_databases,
min(event_time) as first_occurrence,
max(event_time) as last_occurrence
FROM system.query_log
WHERE current_database = 'your_database'
AND query LIKE '%kafka%'
AND type = 'ExceptionWhileProcessing'
AND event_time > now() - INTERVAL 7 day
GROUP BY exception
ORDER BY occurrence_count DESC
How to Perform Kafka Health Checks in ClickHouse
Assess overall health of your Kafka engine tables.
Kafka engine health summary
Get a comprehensive health overview:
SELECT
database,
table,
count(DISTINCT consumer_id) as consumer_count,
length(arrayDistinct(arrayFlatten(groupArray(assignments.partition_id)))) as partition_count,
sum(num_messages_read) as total_messages_read,
sum(num_commits) as total_commits,
min(last_poll_time) as oldest_poll,
max(last_poll_time) as newest_poll,
now() - max(last_poll_time) as time_since_last_poll,
sum(num_rebalance_revocations) as total_revocations,
sum(num_rebalance_assignments) as total_assignments
FROM system.kafka_consumers
WHERE database = 'your_database'
GROUP BY database, table
ORDER BY time_since_last_poll DESC
Find inactive Kafka consumers
Identify consumers that haven't polled recently:
SELECT
database,
table,
consumer_id,
assignments.topic as topic,
assignments.partition_id as partition_id,
last_poll_time,
is_currently_used,
now() - last_poll_time as time_since_last_poll
FROM system.kafka_consumers
ARRAY JOIN assignments.topic, assignments.partition_id
WHERE database = 'your_database'
AND last_poll_time < now() - INTERVAL 1 hour
ORDER BY time_since_last_poll DESC
How to Analyze Kafka Partition Performance in ClickHouse
Analyze partition-level performance metrics.
Kafka partition performance comparison
Compare performance across all partitions:
SELECT
database,
table,
assignments.partition_id as partition_id,
assignments.topic as topic,
max(assignments.current_offset) as max_offset,
min(assignments.current_offset) as min_offset,
sum(num_messages_read) as total_messages_read,
count() as consumer_count,
min(last_poll_time) as first_poll,
max(last_poll_time) as last_poll
FROM system.kafka_consumers
ARRAY JOIN assignments.partition_id, assignments.current_offset, assignments.topic
WHERE database = 'your_database'
AND table = 'your_kafka_table'
GROUP BY database, table, partition_id, topic
ORDER BY total_messages_read DESC
Kafka partition offset distribution analysis
Understand offset distribution across partitions. Note: To calculate lag, you need high water mark from Kafka:
SELECT
database,
table,
quantile(0.5)(assignments.current_offset) as median_offset,
quantile(0.95)(assignments.current_offset) as p95_offset,
quantile(0.99)(assignments.current_offset) as p99_offset,
max(assignments.current_offset) as max_offset,
min(assignments.current_offset) as min_offset,
avg(assignments.current_offset) as avg_offset,
sum(num_messages_read) as total_messages_read
FROM system.kafka_consumers
ARRAY JOIN assignments.current_offset
WHERE database = 'your_database'
AND table = 'your_kafka_table'
GROUP BY database, table
ORDER BY total_messages_read DESC
How to Set Up Kafka Monitoring Alerts in ClickHouse
Set up automated alerts based on these queries.
High Kafka consumer inactivity alert
Alert when consumers haven't polled recently or have low message processing:
SELECT
database,
table,
consumer_id,
assignments.partition_id as partition_id,
assignments.current_offset as current_offset,
num_messages_read,
last_poll_time,
now() - last_poll_time as time_since_last_poll
FROM system.kafka_consumers
ARRAY JOIN assignments.partition_id, assignments.current_offset
WHERE database = 'your_database'
AND (last_poll_time < now() - INTERVAL 10 minute OR num_messages_read = 0)
ORDER BY time_since_last_poll DESC
Kafka engine error rate alert
Alert when error rate is high:
SELECT
current_database,
countIf(type = 'ExceptionWhileProcessing') as error_count,
count() as total_queries,
(countIf(type = 'ExceptionWhileProcessing') * 100.0 / count()) as error_rate_pct
FROM system.query_log
WHERE current_database = 'your_database'
AND query LIKE '%your_kafka_table%'
AND event_time > now() - INTERVAL 15 minute
GROUP BY current_database
HAVING error_rate_pct > 5
ORDER BY error_rate_pct DESC
Kafka processing stall alert
Alert when no messages are being processed:
SELECT
database,
table,
max(last_poll_time) as last_activity,
now() - max(last_poll_time) as time_since_last_activity
FROM system.kafka_consumers
WHERE database = 'your_database'
GROUP BY database, table
HAVING max(last_poll_time) < now() - INTERVAL 10 minute
ORDER BY time_since_last_activity DESC
Combine Kafka monitoring with other system tables
Join Kafka consumer data with other system tables for deeper insights.
Correlate Kafka operations with part operations
Understand the complete pipeline from Kafka consumption to part creation:
SELECT
k.database,
k.table,
k.consumer_id,
k.assignments.partition_id as partition_id,
k.assignments.current_offset as current_offset,
k.num_messages_read,
k.num_commits,
p.rows as rows_in_part,
p.size_in_bytes,
p.event_time as part_creation_time
FROM system.kafka_consumers k
ARRAY JOIN k.assignments.partition_id, k.assignments.current_offset
LEFT JOIN (
SELECT
database,
table,
rows,
size_in_bytes,
event_time
FROM system.part_log
WHERE event_type = 'NewPart'
AND event_time > now() - INTERVAL 1 hour
) p ON k.database = p.database
AND k.table = p.table
WHERE k.database = 'your_database'
AND k.table = 'your_kafka_table'
ORDER BY k.last_poll_time DESC
Analyze ingestion using Kafka table metadata
Use Kafka engine table metadata to analyze ingestion patterns:
SELECT
database,
name as table_name,
engine,
engine_full,
total_rows,
total_bytes,
formatReadableSize(total_bytes) as total_size
FROM system.tables
WHERE engine = 'Kafka'
AND database = 'your_database'
Using Kafka meta columns in your tables
When using ClickHouse Kafka engine, you can access Kafka message metadata through virtual columns or by storing them explicitly in your table schema.
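Here is a minimal sketch of storing the virtual columns explicitly, assuming a Kafka engine source table named kafka_queue and an illustrative payload schema; once persisted, the gap and distribution queries below run against the MergeTree table:
-- Target table persists the payload together with Kafka metadata
CREATE TABLE your_kafka_table
(
    payload String,
    timestamp DateTime,
    _topic String,
    _partition UInt64,
    _offset UInt64,
    _timestamp Nullable(DateTime)
)
ENGINE = MergeTree
ORDER BY (_partition, _offset);

-- The materialized view reads from the Kafka engine table and copies
-- the virtual columns alongside the message fields
CREATE MATERIALIZED VIEW kafka_queue_mv TO your_kafka_table
AS SELECT
    payload,
    timestamp,
    _topic,
    _partition,
    _offset,
    _timestamp
FROM kafka_queue;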
Track message gaps using offsets
If you store _offset in your table, you can detect gaps:
SELECT
_partition,
_topic,
min(_offset) as min_offset,
max(_offset) as max_offset,
count() as message_count,
(max(_offset) - min(_offset) + 1) - count() as missing_messages
FROM your_kafka_table
WHERE timestamp > now() - INTERVAL 1 hour
GROUP BY _partition, _topic
HAVING missing_messages > 0
Analyze partition distribution
Understand how messages are distributed across partitions:
SELECT
_partition,
count() as message_count,
count(DISTINCT _key) as unique_keys,
min(_timestamp) as first_message,
max(_timestamp) as last_message
FROM your_kafka_table
WHERE timestamp > now() - INTERVAL 24 hour
GROUP BY _partition
ORDER BY _partition
Comparison: ClickHouse Kafka engine vs managed connectors
This section compares the native ClickHouse Kafka engine with managed connectors like Tinybird's Kafka connector to help you choose the right approach for your use case.
Pricing and Support
| Feature | Tinybird Kafka Connector | ClickHouse OSS Kafka Engine |
|---|---|---|
| Pricing Model | Included in platform pricing | No additional cost (uses cluster resources) |
| Topic Limit | Unlimited | Unlimited topics (limited by infrastructure capacity) |
| Support | Enterprise support included | Community support (Telegram, GitHub), commercial support available from ClickHouse Inc. |
Scaling & Performance
| Feature | Tinybird Kafka Connector | ClickHouse OSS Kafka Engine |
|---|---|---|
| Deployment Model | Serverless (managed) | Self-hosted (requires cluster management) |
| Consumer scaling | Automatic (fully managed) | Manual (configure kafka_num_consumers per table) |
| Resource isolation | Separate compute for ingestion | Shared cluster resources (ingestion + queries) |
| Ingestion Model | Push-based | Pull-based (polling) |
| Flush latency | Sub-second (on-demand) | Configurable (via kafka_flush_interval_ms, typically 1-10 seconds) |
| Throughput | Scales automatically | High throughput (100K+ rows/sec per consumer with proper tuning) |
Operations & Failure Handling
| Feature | Tinybird Kafka Connector | ClickHouse OSS Kafka Engine |
|---|---|---|
| Circuit breaker & Backpressure | Built-in | Manual (requires monitoring and tuning) |
| Dead letter queue | Automatic quarantine for problematic messages | Manual (can skip broken messages via kafka_skip_broken_messages) |
| Failure recovery | Automated with clear recovery paths | Manual (requires monitoring system tables and manual intervention) |
| Deployment handling | Graceful shutdown during deploys | Coupled to ClickHouse cluster lifecycle |
| High Availability | Built-in (default) | Available via ClickHouse cluster replication (requires cluster setup) |
Developer Experience
| Feature | Tinybird Kafka Connector | ClickHouse OSS Kafka Engine |
|---|---|---|
| Configuration management | CLI + declarative config (data as code) | SQL DDL statements |
| Environment management | Built-in environment branching | Manual (separate databases/clusters for environments) |
| Data type mapping | Assisted (automatic suggestions) | Manual (reference ClickHouse documentation) |
| Schema evolution | Automatic handling | Manual (ALTER TABLE statements, may require table recreation) |
| Learning curve | Lower (managed service) | Higher (requires ClickHouse and Kafka expertise) |
When to use ClickHouse Kafka engine vs managed connectors
Both approaches have their place depending on your team's needs and constraints.
Use ClickHouse Kafka engine when
- You have ClickHouse expertise: Your team is comfortable managing ClickHouse clusters, tuning performance, and monitoring system tables
- Full control is required: You need fine-grained control over consumer configuration, offset management, and resource allocation
- Cost optimization: You want to use existing ClickHouse infrastructure without additional service costs
- Custom requirements: You need specific Kafka consumer configurations that managed services don't support
- Self-hosted infrastructure: You're running ClickHouse on-premises or in your own cloud infrastructure
Consider managed connectors (like Tinybird) when
- Reduced operational overhead: You want to focus on analytics rather than infrastructure management
- Automatic scaling: Your workload varies and you need consumers to scale automatically
- Faster time to production: You want to get started quickly without deep ClickHouse expertise
- Built-in observability: You prefer unified monitoring dashboards over querying multiple system tables
- Error handling: You want automatic dead letter queues and error recovery without manual intervention
Benefits of managed Kafka connectors
Managed connectors like Tinybird's Kafka connector provide:
- Automatic consumer scaling: Consumers scale based on load without manual configuration
- Simplified monitoring: Single Service Data Source (tinybird.kafka_ops_log) instead of querying multiple system tables
- Built-in error handling: Automatic quarantine for problematic messages with clear recovery paths
- Serverless infrastructure: No need to manage ClickHouse servers or consumer processes
- Better developer experience: Declarative configuration, environment branching, and assisted schema mapping
The monitoring queries in this guide help you understand what metrics matter regardless of which approach you choose. For teams using managed connectors, equivalent monitoring is available through Service Data Sources. Learn more about Tinybird's Kafka connector and monitoring guide.
Best practices for ClickHouse Kafka engine monitoring
If you continue using ClickHouse Kafka engine directly, follow these best practices:
- Monitor system.kafka_consumers regularly: Set up scheduled queries to track lag and consumer health
- Use materialized views: Create materialized views to aggregate Kafka metrics over time for historical analysis
- Track query_log exceptions: Monitor system.query_log for Kafka-related errors and exceptions
- Analyze part_log: Use system.part_log to understand part creation patterns and ingestion performance
- Set up alerts: Use the alerting queries above with external monitoring tools
- Monitor system resources: Track memory and CPU usage related to Kafka consumption using system.metrics (see the sketch below)
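For the last item, here is a quick way to snapshot Kafka-related counters; metric and event names differ across ClickHouse versions, so a prefix match is safer than hard-coding exact names:
-- Current gauge values plus cumulative event counters for anything Kafka-related;
-- the cast keeps the two value columns type-compatible for UNION ALL
SELECT source, name, value
FROM
(
    SELECT 'metric' as source, metric as name, value
    FROM system.metrics
    WHERE metric LIKE 'Kafka%'
    UNION ALL
    SELECT 'event' as source, event as name, toInt64(value) as value
    FROM system.events
    WHERE event LIKE 'Kafka%'
)
ORDER BY name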
