Monitor Kafka to ClickHouse® connector and consumer lag¶
Tinybird's Kafka connector automatically consumes data from Kafka topics and ingests it into ClickHouse® Data Sources. The connector features consumer autoscaling and multi-broker connectivity. The tinybird.kafka_ops_log Service Data Source provides operational logs for monitoring your integration, tracking consumer lag, analyzing performance, and optimizing your connector configuration.
Use kafka_ops_log to monitor consumer lag, track throughput, identify errors, and analyze performance across your Data Sources. The connector's serverless architecture automatically scales consumers based on load, and kafka_ops_log provides visibility into this autoscaling behavior along with error handling and debugging information.
Logs are retained for 30 days and include information about each partition, topic, and Data Source in your workspace. This supports analysis of partition assignment, rebalancing, and consumer group scaling behavior.
New to the Kafka connector? See the Kafka connector documentation for setup instructions, configuration options, and troubleshooting. This guide focuses on operational monitoring once your connector is already running.
Kafka meta columns in your Data Sources¶
In addition to monitoring via kafka_ops_log, each Data Source connected to Kafka includes Kafka meta columns that store metadata from Kafka messages. These columns are automatically added to your Data Source schema and can be used for analysis, debugging, and monitoring.
The available meta columns include:
- __topic: The Kafka topic the message was read from
- __partition: The partition number the message was read from
- __offset: The message offset within the partition
- __timestamp: The timestamp of the Kafka message
- __key: The message key (if present)
- __value: The raw message value (if KAFKA_STORE_RAW_VALUE is set to True)
- __headers: Kafka headers as a Map (if KAFKA_STORE_HEADERS is set to True)
Use cases for Kafka meta columns¶
Track message ordering and detect gaps: Use __offset to identify missing messages or gaps in your data ingestion. Compare consecutive offsets to detect when messages are skipped.
Analyze partition distribution: Use __partition to understand how messages are distributed across partitions. This helps verify that your partition key design is working as expected and that messages are evenly distributed.
Correlate with kafka_ops_log: Join your Data Source with kafka_ops_log using __partition and __offset to correlate ingested messages with connector operations, helping you understand ingestion latency and identify bottlenecks.
Message timestamp analysis: Compare __timestamp (Kafka message timestamp) with your Data Source's ingestion timestamp to understand end-to-end latency from message production to ClickHouse ingestion.
Debug with message keys: Use __key to track specific messages or understand how keys map to partitions, which is essential for partition key design optimization.
Leverage headers for routing and tracing: If KAFKA_STORE_HEADERS is set to True, use __headers to access custom metadata, trace IDs, routing information, or other context stored in Kafka headers. This is useful for distributed tracing, A/B testing, or routing logic based on header values.
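Building on the header use case above, the following query is a minimal sketch for filtering messages by a header value, assuming KAFKA_STORE_HEADERS is set to True. The Data Source name your_kafka_datasource and the trace-id header key are placeholders for illustration, not part of any real schema:
SELECT
    __timestamp,
    __partition,
    __offset,
    __key,
    __headers['trace-id'] AS trace_id  -- 'trace-id' is a hypothetical header key; use your own
FROM your_kafka_datasource             -- placeholder Data Source name
WHERE mapContains(__headers, 'trace-id')
ORDER BY __timestamp DESC
LIMIT 100
A query like this is useful for spot-checking that tracing or routing headers arrive as expected before building logic on top of them.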
The tinybird.kafka_ops_log Service Data Source includes operational metrics from your serverless Kafka connector. Each row represents a snapshot of the connector's state for a specific partition at a point in time, providing visibility into autoscaling consumer behavior, error handling, and debugging information.

| Field | Type | Description |
|---|---|---|
| timestamp | DateTime | Date and time when the operation took place. |
| datasource_id | String | ID of your Data Source. The Data Source ID remains the same after rename operations. |
| topic | String | Kafka topic. |
| partition | Int16 | Partition number, or -1 for all partitions. Use this field to analyze partition assignment and identify which partitions are assigned to your consumer. |
| msg_type | String | 'info' for regular messages, 'warning' for issues related to the user's Kafka cluster, deserialization, or Materialized Views, and 'error' for other issues. |
| lag | Int64 | Number of messages behind for the partition, calculated as the difference between the high-water mark and the last committed offset. This is the primary metric for consumer lag monitoring. |
| processed_messages | Int32 | Messages processed for a topic and partition. |
| processed_bytes | Int32 | Number of bytes processed. |
| committed_messages | Int32 | Messages ingested for a topic and partition. |
| msg | String | Details in the case of warnings or errors. Empty otherwise. |
Monitor consumer lag¶
Consumer lag is the difference between the latest offset in a partition (high-water mark) and the last committed offset by your consumer. High lag values indicate that your consumer is falling behind the producer, which can lead to delayed data ingestion. With Tinybird's serverless connector, consumer autoscaling automatically adjusts to handle increased load, but monitoring lag helps you understand when scaling occurs and verify that autoscaling is working effectively.
Monitoring consumer lag is critical for performance tuning. When lag increases, Tinybird's autoscaling infrastructure automatically scales consumers to handle the load. The kafka_ops_log provides visibility into this autoscaling behavior, helping you understand partition assignment patterns and verify that your integration is performing optimally. For more details on consumer configuration, see the Kafka connector documentation.
Find current consumer lag by partition¶
Get the most recent lag measurement for each partition:
SELECT
datasource_id,
topic,
partition,
lag,
timestamp
FROM tinybird.kafka_ops_log
WHERE timestamp > now() - INTERVAL 1 hour
AND partition >= 0
AND msg_type = 'info'
ORDER BY timestamp DESC
LIMIT 1 BY datasource_id, topic, partition
Consumer lag trend over time¶
Track how lag changes over time to identify patterns and trends:
SELECT
toStartOfHour(timestamp) as hour,
datasource_id,
topic,
partition,
max(lag) as max_lag,
avg(lag) as avg_lag
FROM tinybird.kafka_ops_log
WHERE timestamp > now() - INTERVAL 24 hour
AND partition >= 0
AND msg_type = 'info'
GROUP BY hour, datasource_id, topic, partition
ORDER BY hour DESC, max_lag DESC
Rising lag trends may indicate that your current consumer group configuration cannot keep up with the message rate. This is particularly useful for understanding how Tinybird's consumer autoscaling responds to varying load patterns.
Find partitions with high lag¶
Identify partitions that need immediate attention:
SELECT
    datasource_id,
    topic,
    partition,
    max(lag) as max_lag,
    max(timestamp) as last_seen
FROM tinybird.kafka_ops_log
WHERE timestamp > now() - INTERVAL 1 hour
    AND partition >= 0
    AND msg_type = 'info'
GROUP BY datasource_id, topic, partition
HAVING max_lag > 10000
ORDER BY max_lag DESC
High lag on specific partitions may indicate uneven message distribution, where some partitions receive more messages than others. This is valuable information for partition key design and scaling decisions.
Track throughput and performance¶
Throughput monitoring helps you understand how efficiently your connector is processing messages. This is essential for performance tuning and understanding how the autoscaling infrastructure responds to varying message rates.
Messages processed per hour by topic¶
Track message processing rates over time:
SELECT
toStartOfHour(timestamp) as hour,
datasource_id,
topic,
sum(processed_messages) as total_messages,
sum(processed_bytes) as total_bytes,
sum(committed_messages) as total_committed
FROM tinybird.kafka_ops_log
WHERE timestamp > now() - INTERVAL 24 hour
AND msg_type = 'info'
GROUP BY hour, datasource_id, topic
ORDER BY hour DESC
Use this to track throughput trends and identify peak processing periods. This data helps you understand how consumer autoscaling responds to load changes and supports capacity planning.
Processing rate comparison across partitions¶
Compare processing rates to identify bottlenecks:
SELECT
datasource_id,
topic,
partition,
sum(processed_messages) as total_processed,
sum(committed_messages) as total_committed,
count(*) as operation_count,
sum(processed_messages) / count(*) as avg_messages_per_operation
FROM tinybird.kafka_ops_log
WHERE timestamp > now() - INTERVAL 1 hour
AND partition >= 0
AND msg_type = 'info'
GROUP BY datasource_id, topic, partition
ORDER BY total_processed DESC
Uneven processing rates may indicate that you need to review your partition key design or adjust your partition assignment strategy.
Throughput efficiency¶
Compare processed messages to committed messages to understand your ingestion success rate:
SELECT
datasource_id,
topic,
sum(processed_messages) as processed,
sum(committed_messages) as committed,
(sum(committed_messages) * 100.0 / sum(processed_messages)) as success_rate_pct
FROM tinybird.kafka_ops_log
WHERE timestamp > now() - INTERVAL 24 hour
AND msg_type = 'info'
AND processed_messages > 0
GROUP BY datasource_id, topic
ORDER BY success_rate_pct ASC
A low success rate indicates that messages are being processed but not successfully ingested into ClickHouse, which may point to schema issues, data quality problems, or configuration errors. The connector's quarantine feature automatically handles problematic messages, and error details in kafka_ops_log help you debug and resolve issues. Learn more about Quarantine Data Sources.
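When the success rate drops, the quarantine Data Source is usually the fastest place to see why rows were rejected. The query below is a hedged sketch: it assumes a Data Source named your_kafka_datasource, Tinybird's standard _quarantine naming, and the standard quarantine columns (c__error_column, c__error, insertion_date). Verify these against your workspace before relying on it:
SELECT
    c__error_column,                       -- column(s) that failed validation (assumed quarantine schema)
    c__error,                              -- error message(s) for the failed column(s)
    count() AS quarantined_rows,
    max(insertion_date) AS last_seen
FROM your_kafka_datasource_quarantine      -- placeholder name using the _quarantine suffix
WHERE insertion_date > now() - INTERVAL 24 hour
GROUP BY c__error_column, c__error
ORDER BY quarantined_rows DESC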
Monitor errors and warnings¶
Errors and warnings can indicate issues with your Kafka cluster connectivity, deserialization problems, Materialized View errors, or configuration issues. The serverless connector includes comprehensive error handling with quarantine for problematic messages. Regular monitoring of kafka_ops_log helps you catch and resolve issues before they impact your data pipeline. For troubleshooting guidance, see the Kafka connector troubleshooting guide.
Recent errors and warnings¶
View all errors and warnings from your connector operations:
SELECT
timestamp,
datasource_id,
topic,
partition,
msg_type,
msg,
lag
FROM tinybird.kafka_ops_log
WHERE timestamp > now() - INTERVAL 24 hour
AND msg_type IN ('warning', 'error')
ORDER BY timestamp DESC
Review these messages to identify patterns and root causes. Common issues include partition rebalancing events, deserialization failures, and connectivity problems.
Error count by Data Source and topic¶
Identify which Data Sources or topics are experiencing the most issues:
SELECT
datasource_id,
topic,
msg_type,
count(*) as error_count
FROM tinybird.kafka_ops_log
WHERE timestamp > now() - INTERVAL 24 hour
AND msg_type IN ('warning', 'error')
GROUP BY datasource_id, topic, msg_type
ORDER BY error_count DESC
This helps prioritize troubleshooting efforts and identify if specific topics or partitions are problematic.
Group errors by message type¶
Identify recurring issues and patterns:
SELECT
msg,
msg_type,
count(*) as occurrence_count,
count(DISTINCT datasource_id) as affected_datasources
FROM tinybird.kafka_ops_log
WHERE timestamp > now() - INTERVAL 7 day
AND msg_type IN ('warning', 'error')
AND msg != ''
GROUP BY msg, msg_type
ORDER BY occurrence_count DESC
This analysis helps you understand common failure modes in your connector.
Health checks¶
Regular health checks help you assess the overall status of your integration. These queries provide a comprehensive view of connector health, including consumer lag, throughput, and error rates. You can also set up automated health checks for continuous monitoring.
Connector health summary¶
Get a comprehensive health overview of all your connectors:
SELECT
datasource_id,
topic,
max(lag) as max_lag,
sum(processed_messages) as total_processed,
sum(committed_messages) as total_committed,
countIf(msg_type = 'error') as error_count,
countIf(msg_type = 'warning') as warning_count,
max(timestamp) as last_activity
FROM tinybird.kafka_ops_log
WHERE timestamp > now() - INTERVAL 1 hour
GROUP BY datasource_id, topic
ORDER BY max_lag DESC
Use this for a quick assessment of your connector performance and to identify connectors that need attention.
Find inactive connectors¶
Find Data Sources that haven't processed messages recently:
SELECT
    datasource_id,
    topic,
    max(timestamp) as last_activity,
    dateDiff('minute', max(timestamp), now()) as minutes_since_last_activity
FROM tinybird.kafka_ops_log
WHERE timestamp > now() - INTERVAL 7 day
GROUP BY datasource_id, topic
HAVING last_activity < now() - INTERVAL 1 hour
ORDER BY last_activity ASC
Inactive connectors may indicate that the consumer has stopped, the topic has no new messages, or there's a connectivity issue.
Partition-level analysis¶
Partition analysis helps you understand how your partition assignment strategy is working and identify partitions that may need attention. This analysis helps with scaling decisions and optimizing your partition key design.
Partition performance comparison¶
Compare performance metrics across all partitions:
SELECT
datasource_id,
topic,
partition,
max(lag) as max_lag,
avg(lag) as avg_lag,
sum(processed_messages) as total_messages,
countIf(msg_type = 'error') as errors,
countIf(msg_type = 'warning') as warnings
FROM tinybird.kafka_ops_log
WHERE timestamp > now() - INTERVAL 24 hour
AND partition >= 0
GROUP BY datasource_id, topic, partition
ORDER BY max_lag DESC, errors DESC
This helps you understand if your partition assignment is balanced and whether you need to adjust your setup.
Lag distribution analysis¶
Understand the distribution of consumer lag across partitions:
SELECT
datasource_id,
topic,
quantile(0.5)(lag) as median_lag,
quantile(0.95)(lag) as p95_lag,
quantile(0.99)(lag) as p99_lag,
max(lag) as max_lag,
min(lag) as min_lag
FROM tinybird.kafka_ops_log
WHERE timestamp > now() - INTERVAL 1 hour
AND partition >= 0
AND msg_type = 'info'
GROUP BY datasource_id, topic
ORDER BY max_lag DESC
This helps identify if lag is evenly distributed or concentrated in specific partitions. Uneven lag distribution may indicate that you need to review your partition key design or consider rebalancing.
Alerting queries¶
Set up automated alerts based on these queries to catch issues early. These queries are designed to trigger alerts when critical conditions are met, helping you maintain reliable data pipelines. Combine these with health checks for comprehensive monitoring.
Connect to monitoring tools: You can connect these monitoring queries to external alerting tools like Grafana, Datadog, PagerDuty, and Slack. Query the ClickHouse HTTP interface directly, or create API endpoints from these queries. For Prometheus-compatible tools, you can export queries in Prometheus format. Configure your monitoring tools to poll these queries periodically and trigger alerts when thresholds are exceeded.
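As one way to publish a reusable alert endpoint, you can wrap a monitoring query in a Pipe node using Tinybird's template syntax. The sketch below is illustrative rather than a drop-in Pipe: the parameter names window_minutes and lag_threshold and their defaults are assumptions to adapt to your own alerting policy.
%
-- Hypothetical parameterized node: window_minutes and lag_threshold are illustrative query parameters
SELECT
    datasource_id,
    topic,
    max(lag) AS max_lag
FROM tinybird.kafka_ops_log
WHERE timestamp > now() - INTERVAL {{Int32(window_minutes, 5)}} minute
    AND partition >= 0
    AND msg_type = 'info'
GROUP BY datasource_id, topic
HAVING max_lag > {{Int32(lag_threshold, 50000)}}
ORDER BY max_lag DESC
A monitoring tool can then poll the published endpoint and trigger an alert whenever the response is non-empty.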
High consumer lag alert¶
Alert when consumer lag exceeds a threshold:
SELECT
datasource_id,
topic,
partition,
lag,
timestamp
FROM tinybird.kafka_ops_log
WHERE timestamp > now() - INTERVAL 5 minute
AND partition >= 0
AND msg_type = 'info'
AND lag > 50000
ORDER BY timestamp DESC
LIMIT 1 BY datasource_id, topic, partition
High lag indicates that your consumer is falling behind and may need scaling. With Tinybird's consumer autoscaling, the infrastructure automatically handles increased load, but this alert helps you verify autoscaling effectiveness.
Error rate alert¶
Alert when the error rate exceeds a threshold:
SELECT
datasource_id,
topic,
countIf(msg_type = 'error') as error_count,
count(*) as total_operations,
(countIf(msg_type = 'error') * 100.0 / count(*)) as error_rate_pct
FROM tinybird.kafka_ops_log
WHERE timestamp > now() - INTERVAL 15 minute
GROUP BY datasource_id, topic
HAVING error_rate_pct > 5
ORDER BY error_rate_pct DESC
High error rates indicate problems with your connector configuration, cluster connectivity, or data quality issues that need immediate attention.
Processing stall alert¶
Alert when no messages are being processed:
SELECT
datasource_id,
topic,
max(timestamp) as last_activity,
sum(processed_messages) as messages_in_window
FROM tinybird.kafka_ops_log
WHERE timestamp > now() - INTERVAL 10 minute
GROUP BY datasource_id, topic
HAVING messages_in_window = 0
ORDER BY last_activity DESC
This indicates that your consumer may have stopped, there's a connectivity issue, or the topic has no new messages.
Combine with other Service Data Sources¶
You can join tinybird.kafka_ops_log with other Service Data Sources for deeper analysis. This supports comprehensive analysis that correlates connector operations with Data Source ingestion metrics. Learn more about all available Service Data Sources.
Correlate with Data Source operations¶
Join Kafka operations log with Data Source operations log to understand the complete pipeline from message consumption to ClickHouse ingestion:
SELECT
k.timestamp,
k.datasource_id,
k.topic,
k.partition,
k.processed_messages,
k.committed_messages,
d.rows as rows_in_datasource,
d.elapsed_time
FROM tinybird.kafka_ops_log k
LEFT JOIN (
SELECT
datasource_id,
rows,
elapsed_time,
timestamp
FROM tinybird.datasources_ops_log
WHERE event_type = 'append-kafka'
AND timestamp > now() - INTERVAL 1 hour
) d ON k.datasource_id = d.datasource_id
AND toStartOfMinute(k.timestamp) = toStartOfMinute(d.timestamp)
WHERE k.timestamp > now() - INTERVAL 1 hour
AND k.msg_type = 'info'
ORDER BY k.timestamp DESC
This helps you identify bottlenecks in the ingestion process and optimize your connector performance.
Analyze ingestion using Kafka meta columns¶
Use the Kafka meta columns in your Data Sources to analyze ingestion patterns. For example, track message gaps by analyzing offset sequences:
SELECT
    __partition,
    __topic,
    min(__offset) as min_offset,
    max(__offset) as max_offset,
    count(*) as message_count,
    max(__offset) - min(__offset) + 1 as expected_count,
    (max(__offset) - min(__offset) + 1) - count(*) as missing_messages
FROM your_kafka_datasource
WHERE __timestamp > now() - INTERVAL 1 hour
GROUP BY __partition, __topic
HAVING missing_messages > 0
This helps identify if any messages were skipped during ingestion. You can also analyze distribution to verify your partition key design:
SELECT
    __partition,
    count(*) as message_count,
    count(DISTINCT __key) as unique_keys,
    min(__timestamp) as first_message,
    max(__timestamp) as last_message
FROM your_kafka_datasource
WHERE __timestamp > now() - INTERVAL 24 hour
GROUP BY __partition
ORDER BY __partition
This shows how messages are distributed across partitions, helping you understand if your partition assignment is balanced.
Schema evolution monitoring¶
Monitor schema changes and their impact on ingestion:
SELECT
    datasource_id,
    topic,
    msg,
    count(*) as occurrence_count,
    max(timestamp) as last_seen
FROM tinybird.kafka_ops_log
WHERE msg_type = 'warning'
    AND (msg LIKE '%schema%' OR msg LIKE '%deserialization%')
    AND timestamp > now() - INTERVAL 24 hour
GROUP BY datasource_id, topic, msg
ORDER BY occurrence_count DESC
This helps identify schema evolution issues and deserialization problems that may affect ingestion performance.
Performance tuning best practices¶
Based on monitoring data from tinybird.kafka_ops_log, follow these best practices:
Monitor consumer lag regularly: Set up alerts for high lag values to catch issues early. With Tinybird's serverless architecture, consumer autoscaling automatically handles increased load, but monitoring lag helps you verify that autoscaling is working effectively.
Track error patterns: Review error messages periodically to identify recurring issues. Common problems include partition assignment issues, deserialization failures, and connectivity problems.
Compare partition performance: If one partition has significantly higher lag than others, investigate the cause. This may indicate uneven message distribution and the need to review your partition key design.
Monitor throughput trends: Track processing rates over time to understand how the autoscaling infrastructure responds to varying message rates.
Use time windows: Filter queries by recent time windows (for example, last hour, last 24 hours) to focus on current state and avoid analyzing stale data.
Analyze partition assignment: Use partition-level metrics to understand if your partition assignment strategy is working effectively. Uneven partition loads may require rebalancing or adjustments to your setup.
Optimize partition key design: If you see uneven lag distribution across partitions, consider reviewing your partition key design to ensure messages are distributed evenly.
For detailed performance optimization strategies, see the performance optimization guide.
Next steps¶
Kafka connector documentation¶
- Kafka connector guide - Complete setup, configuration, and troubleshooting guide
- Troubleshooting guide - Common errors and solutions
- Performance optimization - Throughput tuning and best practices
- Partitioning strategies - Partition optimization
- Schema management - Schema evolution and data type mapping
Monitoring and observability¶
- Service Data Sources - Explore other monitoring options
- Health checks - Automated monitoring setup
- Data Source operations - Monitor ingestion performance
- Workspace jobs - Additional pipeline observability
Integrate with monitoring tools¶
Connect monitoring queries to Grafana, Datadog, PagerDuty, Slack, and other alerting systems by querying the ClickHouse HTTP interface directly. You can also create API endpoints from these queries, or export them in Prometheus format for Prometheus-compatible tools.