Monitor Kafka to ClickHouse® connector and consumer lag

Tinybird's Kafka connector automatically consumes data from Kafka topics and ingests it into ClickHouse® Data Sources. The connector features consumer autoscaling and multi-broker connectivity. The tinybird.kafka_ops_log Service Data Source provides operational logs for monitoring your integration, tracking consumer lag, analyzing performance, and optimizing your connector configuration.

Use kafka_ops_log to monitor consumer lag, track throughput, identify errors, and analyze performance across your Data Sources. The connector's serverless architecture automatically scales consumers based on load, and kafka_ops_log provides visibility into this autoscaling behavior along with error handling and debugging information.

Logs are retained for 30 days and include information about each partition, topic, and Data Source in your workspace. This supports analysis of partition assignment, rebalancing, and consumer group scaling behavior.

New to the Kafka connector? If you're setting up the Kafka connector for the first time, see the Kafka connector documentation for setup instructions, configuration options, and troubleshooting. This monitoring guide focuses on operational monitoring once your connector is running.

Kafka meta columns in your Data Sources

In addition to monitoring via kafka_ops_log, each Data Source connected to Kafka includes Kafka meta columns that store metadata from Kafka messages. These columns are automatically added to your Data Source schema and can be used for analysis, debugging, and monitoring.

The available meta columns include:

  • __topic: The Kafka topic the message was read from
  • __partition: The partition number the message was read from
  • __offset: The message offset within the partition
  • __timestamp: The timestamp of the Kafka message
  • __key: The message key (if present)
  • __value: The raw message value (if KAFKA_STORE_RAW_VALUE is set to True)
  • __headers: Kafka headers as a Map (if KAFKA_STORE_HEADERS is set to True)

Use cases for Kafka meta columns

Track message ordering and detect gaps: Use __offset to identify missing messages or gaps in your data ingestion. Compare consecutive offsets to detect when messages are skipped.

Analyze distribution: Use __partition to understand how messages are distributed across partitions. This helps verify that your partition key design is working as expected and that messages are evenly distributed.

Correlate with kafka_ops_log: Join your Data Source with kafka_ops_log on topic and partition (__topic and __partition), aligning rows by time window because kafka_ops_log records no per-message offsets. This correlates ingested messages with connector operations, helping you understand ingestion latency and identify bottlenecks.

Message timestamp analysis: Compare __timestamp (Kafka message timestamp) with your Data Source's ingestion timestamp to understand end-to-end latency from message production to ClickHouse ingestion.

Debug with message keys: Use __key to track specific messages or understand how keys map to partitions, which is essential for partition key design optimization.

Leverage headers for routing and tracing: If KAFKA_STORE_HEADERS is set to True, use __headers to access custom metadata, trace IDs, routing information, or other context stored in Kafka headers. This is useful for distributed tracing, A/B testing, or routing logic based on header values.
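The consecutive-offset comparison described under "Track message ordering and detect gaps" can also be done client-side. The following is a minimal Python sketch, assuming you have already fetched the __offset values for a single topic and partition. find_offset_gaps is a hypothetical helper written for illustration, not part of any Tinybird SDK.

```python
from typing import Iterable, List, Tuple


def find_offset_gaps(offsets: Iterable[int]) -> List[Tuple[int, int]]:
    """Return (first_missing, last_missing) ranges between consecutive offsets."""
    ordered = sorted(set(offsets))  # de-duplicate and sort before comparing neighbours
    gaps = []
    for prev, curr in zip(ordered, ordered[1:]):
        if curr - prev > 1:
            gaps.append((prev + 1, curr - 1))
    return gaps


# Example: offsets 103 and 104 are absent for this partition.
print(find_offset_gaps([100, 101, 102, 105, 106]))  # [(103, 104)]
```

Unlike an aggregate count of missing messages, this reports where each gap starts and ends, which makes it easier to look up the affected range in the source topic.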

| Field | Type | Description |
| --- | --- | --- |
| timestamp | DateTime | Date and time when the operation took place. |
| datasource_id | String | ID of your Data Source. The Data Source ID is consistent after renaming operations. |
| topic | String | Kafka topic. |
| partition | Int16 | Partition number, or -1 for all partitions. Use this field to analyze partition assignment and identify which partitions are assigned to your consumer. |
| msg_type | String | 'info' for regular messages, 'warning' for issues related to the user's Kafka cluster, deserialization or Materialized Views, and 'error' for other issues. |
| lag | Int64 | Number of messages behind for the partition. This is the difference between the high-water mark and the last commit offset. This is the primary metric for consumer lag monitoring. |
| processed_messages | Int32 | Messages processed for a topic and partition. |
| processed_bytes | Int32 | Amount of bytes processed. |
| committed_messages | Int32 | Messages ingested for a topic and partition. |
| msg | String | Information in the case of warnings or errors. Empty otherwise. |

The tinybird.kafka_ops_log Service Data Source includes operational metrics from your serverless Kafka connector. Each row represents a snapshot of the connector's state for a specific partition at a point in time, providing visibility into autoscaling consumer behavior, error handling, and debugging information.

Monitor consumer lag

Consumer lag is the difference between the latest offset in a partition (high-water mark) and the last committed offset by your consumer. High lag values indicate that your consumer is falling behind the producer, which can lead to delayed data ingestion. With Tinybird's serverless connector, consumer autoscaling automatically adjusts to handle increased load, but monitoring lag helps you understand when scaling occurs and verify that autoscaling is working effectively.
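As a worked example of that definition, the sketch below computes lag per partition from a high-water mark and a committed offset. The snapshot values are made up for illustration, and clamping at zero is an assumption of this sketch (it guards against the two values being sampled at slightly different moments), not documented connector behavior.

```python
def consumer_lag(high_water_mark: int, committed_offset: int) -> int:
    """Messages the consumer still has to commit for one partition."""
    # Clamp at zero in case the two values were sampled at different times.
    return max(high_water_mark - committed_offset, 0)


# Hypothetical snapshot: partition -> (high-water mark, last committed offset)
snapshot = {0: (1_500, 1_200), 1: (900, 900), 2: (2_000, 1_250)}

lag_by_partition = {
    partition: consumer_lag(hwm, committed)
    for partition, (hwm, committed) in snapshot.items()
}
print(lag_by_partition)                # {0: 300, 1: 0, 2: 750}
print(sum(lag_by_partition.values()))  # 1050
```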

Monitoring consumer lag is critical for performance tuning. When lag increases, Tinybird's autoscaling infrastructure automatically scales consumers to handle the load. The kafka_ops_log provides visibility into this autoscaling behavior, helping you understand partition assignment patterns and verify that your integration is performing optimally. For more details on consumer configuration, see the Kafka connector documentation.

Find current consumer lag by partition

Get the most recent lag measurement for each partition:

SELECT
    datasource_id,
    topic,
    partition,
    lag,
    timestamp
FROM tinybird.kafka_ops_log
WHERE timestamp > now() - INTERVAL 1 hour
  AND partition >= 0
  AND msg_type = 'info'
ORDER BY timestamp DESC
LIMIT 1 BY datasource_id, topic, partition

Consumer lag trend over time

Track how lag changes over time to identify patterns and trends:

SELECT
    toStartOfHour(timestamp) as hour,
    datasource_id,
    topic,
    partition,
    max(lag) as max_lag,
    avg(lag) as avg_lag
FROM tinybird.kafka_ops_log
WHERE timestamp > now() - INTERVAL 24 hour
  AND partition >= 0
  AND msg_type = 'info'
GROUP BY hour, datasource_id, topic, partition
ORDER BY hour DESC, max_lag DESC

Rising lag trends may indicate that your current consumer group configuration cannot keep up with the message rate. Watching the trend across several hours also shows how Tinybird's consumer autoscaling responds to varying load patterns.
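To turn the hourly trend into a yes/no signal, one option is to check whether max_lag grew in each of the last few hourly buckets. This is a minimal Python sketch, assuming the max_lag values from the query above were fetched oldest-first for a single partition. is_lag_rising and the three-bucket default are illustrative choices, not Tinybird defaults.

```python
from typing import Sequence


def is_lag_rising(max_lag_by_hour: Sequence[int], buckets: int = 3) -> bool:
    """True if max lag strictly increased across the last `buckets` hourly values."""
    if buckets < 2:
        raise ValueError("need at least two buckets to compare")
    recent = list(max_lag_by_hour)[-buckets:]
    if len(recent) < buckets:
        return False  # not enough history to call it a trend
    return all(later > earlier for earlier, later in zip(recent, recent[1:]))


print(is_lag_rising([120, 90, 400, 2_500, 9_000]))  # True
print(is_lag_rising([120, 90, 400, 2_500, 1_000]))  # False
```

A strict-increase check is deliberately simple. It ignores how fast lag grows, so you may prefer a slope or percentage threshold for noisy topics.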

Find partitions with high lag

Identify partitions that need immediate attention:

SELECT
    datasource_id,
    topic,
    partition,
    -- argMax returns the lag recorded at the most recent timestamp
    argMax(lag, timestamp) as current_lag,
    max(timestamp) as last_measured
FROM tinybird.kafka_ops_log
WHERE timestamp > now() - INTERVAL 1 hour
  AND partition >= 0
  AND msg_type = 'info'
GROUP BY datasource_id, topic, partition
HAVING current_lag > 10000
ORDER BY current_lag DESC

High lag on specific partitions may indicate uneven message distribution, where some partitions receive more messages than others. This is valuable information for partition key design and scaling decisions.

Track throughput and performance

Throughput monitoring helps you understand how efficiently your connector is processing messages. This is essential for performance tuning and understanding how the autoscaling infrastructure responds to varying message rates.

Messages processed per hour by topic

Track message processing rates over time:

SELECT
    toStartOfHour(timestamp) as hour,
    datasource_id,
    topic,
    sum(processed_messages) as total_messages,
    sum(processed_bytes) as total_bytes,
    sum(committed_messages) as total_committed
FROM tinybird.kafka_ops_log
WHERE timestamp > now() - INTERVAL 24 hour
  AND msg_type = 'info'
GROUP BY hour, datasource_id, topic
ORDER BY hour DESC

Use this to track throughput trends and identify peak processing periods. This data helps you understand how consumer autoscaling responds to load changes and supports capacity planning.

Processing rate comparison across partitions

Compare processing rates to identify bottlenecks:

SELECT
    datasource_id,
    topic,
    partition,
    sum(processed_messages) as total_processed,
    sum(committed_messages) as total_committed,
    count(*) as operation_count,
    sum(processed_messages) / count(*) as avg_messages_per_operation
FROM tinybird.kafka_ops_log
WHERE timestamp > now() - INTERVAL 1 hour
  AND partition >= 0
  AND msg_type = 'info'
GROUP BY datasource_id, topic, partition
ORDER BY total_processed DESC

Uneven processing rates may indicate that you need to review your partition key design or adjust your partition assignment strategy.
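One way to make "uneven" concrete is to compare each partition's processed total with an even share of the topic's total. The sketch below assumes you have the total_processed value per partition from the query above. overloaded_partitions and the factor of 2.0 are hypothetical, so tune the factor to your workload.

```python
from typing import Dict, List


def overloaded_partitions(processed: Dict[int, int], factor: float = 2.0) -> List[int]:
    """Partitions whose processed count exceeds `factor` times an even share."""
    total = sum(processed.values())
    if total == 0:
        return []  # nothing processed (or no partitions), so nothing to flag
    even_share = total / len(processed)
    return sorted(p for p, n in processed.items() if n > factor * even_share)


# total_processed per partition for one topic
print(overloaded_partitions({0: 1_000, 1: 1_100, 2: 900, 3: 9_000}))  # [3]
```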

Throughput efficiency

Compare processed messages to committed messages to understand your ingestion success rate:

SELECT
    datasource_id,
    topic,
    sum(processed_messages) as processed,
    sum(committed_messages) as committed,
    (sum(committed_messages) * 100.0 / sum(processed_messages)) as success_rate_pct
FROM tinybird.kafka_ops_log
WHERE timestamp > now() - INTERVAL 24 hour
  AND msg_type = 'info'
  AND processed_messages > 0
GROUP BY datasource_id, topic
ORDER BY success_rate_pct ASC

A low success rate indicates that messages are being processed but not successfully ingested into ClickHouse, which may point to schema issues, data quality problems, or configuration errors. The connector's quarantine feature automatically handles problematic messages, and error details in kafka_ops_log help you debug and resolve issues. Learn more about Quarantine Data Sources.

Monitor errors and warnings

Errors and warnings can indicate issues with your Kafka cluster connectivity, deserialization problems, Materialized View errors, or configuration issues. The serverless connector includes comprehensive error handling with quarantine for problematic messages. Regular monitoring of kafka_ops_log helps you catch and resolve issues before they impact your data pipeline. For troubleshooting guidance, see the Kafka connector troubleshooting guide.

Recent errors and warnings

View all errors and warnings from your connector operations:

SELECT
    timestamp,
    datasource_id,
    topic,
    partition,
    msg_type,
    msg,
    lag
FROM tinybird.kafka_ops_log
WHERE timestamp > now() - INTERVAL 24 hour
  AND msg_type IN ('warning', 'error')
ORDER BY timestamp DESC

Review these messages to identify patterns and root causes. Common issues include partition rebalancing events, deserialization failures, and connectivity problems.

Error count by Data Source and topic

Identify which Data Sources or topics are experiencing the most issues:

SELECT
    datasource_id,
    topic,
    msg_type,
    count(*) as error_count
FROM tinybird.kafka_ops_log
WHERE timestamp > now() - INTERVAL 24 hour
  AND msg_type IN ('warning', 'error')
GROUP BY datasource_id, topic, msg_type
ORDER BY error_count DESC

This helps prioritize troubleshooting efforts and identify whether specific Data Sources or topics are problematic. Add partition to the SELECT and GROUP BY clauses if you need a per-partition breakdown.

Group errors by message type

Identify recurring issues and patterns:

SELECT
    msg,
    msg_type,
    count(*) as occurrence_count,
    count(DISTINCT datasource_id) as affected_datasources
FROM tinybird.kafka_ops_log
WHERE timestamp > now() - INTERVAL 7 day
  AND msg_type IN ('warning', 'error')
  AND msg != ''
GROUP BY msg, msg_type
ORDER BY occurrence_count DESC

This analysis helps you understand common failure modes in your connector.

Health checks

Regular health checks help you assess the overall status of your integration. These queries provide a comprehensive view of connector health, including consumer lag, throughput, and error rates. You can also set up automated health checks for continuous monitoring.

Connector health summary

Get a comprehensive health overview of all your connectors:

SELECT
    datasource_id,
    topic,
    max(lag) as max_lag,
    sum(processed_messages) as total_processed,
    sum(committed_messages) as total_committed,
    countIf(msg_type = 'error') as error_count,
    countIf(msg_type = 'warning') as warning_count,
    max(timestamp) as last_activity
FROM tinybird.kafka_ops_log
WHERE timestamp > now() - INTERVAL 1 hour
GROUP BY datasource_id, topic
ORDER BY max_lag DESC

Use this for a quick assessment of your connector performance and to identify connectors that need attention.

Find inactive connectors

Find Data Sources that haven't processed messages recently:

SELECT
    datasource_id,
    topic,
    max(timestamp) as last_activity,
    dateDiff('second', max(timestamp), now()) as seconds_since_last_activity
FROM tinybird.kafka_ops_log
WHERE timestamp > now() - INTERVAL 7 day
GROUP BY datasource_id, topic
HAVING last_activity < now() - INTERVAL 1 hour
ORDER BY seconds_since_last_activity DESC

Inactive connectors may indicate that the consumer has stopped, the topic has no new messages, or there's a connectivity issue.

Partition-level analysis

Partition analysis helps you understand how your partition assignment strategy is working and identify partitions that may need attention. This analysis helps with scaling decisions and optimizing your partition key design.

Partition performance comparison

Compare performance metrics across all partitions:

SELECT
    datasource_id,
    topic,
    partition,
    max(lag) as max_lag,
    avg(lag) as avg_lag,
    sum(processed_messages) as total_messages,
    countIf(msg_type = 'error') as errors,
    countIf(msg_type = 'warning') as warnings
FROM tinybird.kafka_ops_log
WHERE timestamp > now() - INTERVAL 24 hour
  AND partition >= 0
GROUP BY datasource_id, topic, partition
ORDER BY max_lag DESC, errors DESC

This helps you understand if your partition assignment is balanced and whether you need to adjust your setup.

Lag distribution analysis

Understand the distribution of consumer lag across partitions:

SELECT
    datasource_id,
    topic,
    quantile(0.5)(lag) as median_lag,
    quantile(0.95)(lag) as p95_lag,
    quantile(0.99)(lag) as p99_lag,
    max(lag) as max_lag,
    min(lag) as min_lag
FROM tinybird.kafka_ops_log
WHERE timestamp > now() - INTERVAL 1 hour
  AND partition >= 0
  AND msg_type = 'info'
GROUP BY datasource_id, topic
ORDER BY max_lag DESC

This helps identify if lag is evenly distributed or concentrated in specific partitions. Uneven lag distribution may indicate that you need to review your partition key design or consider rebalancing.

Alerting queries

Set up automated alerts based on these queries to catch issues early. These queries are designed to trigger alerts when critical conditions are met, helping you maintain reliable data pipelines. Combine these with health checks for comprehensive monitoring.

Connect to monitoring tools: You can connect these monitoring queries to external alerting tools like Grafana, Datadog, PagerDuty, and Slack. Query the ClickHouse HTTP interface directly, or create API endpoints from these queries. For Prometheus-compatible tools, you can export queries in Prometheus format. Configure your monitoring tools to poll these queries periodically and trigger alerts when thresholds are exceeded.
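As a rough illustration of the polling approach, the Python sketch below builds an authenticated query request and filters the result against a lag threshold. The base URL, the /v0/sql path, the Bearer token header, and the top-level "data" key in the response are assumptions of this sketch. Confirm them against the Tinybird API documentation for your region before relying on it. build_request, rows_over_threshold, and check_lag are hypothetical helper names.

```python
import json
import urllib.parse
import urllib.request
from typing import Any, Dict, List

# Assumption: placeholder base URL; use the API host for your workspace region.
BASE_URL = "https://api.tinybird.co"

LAG_SQL = """
SELECT datasource_id, topic, partition, lag
FROM tinybird.kafka_ops_log
WHERE timestamp > now() - INTERVAL 5 minute
  AND partition >= 0
  AND msg_type = 'info'
ORDER BY timestamp DESC
LIMIT 1 BY datasource_id, topic, partition
FORMAT JSON
"""


def build_request(sql: str, token: str, base_url: str = BASE_URL) -> urllib.request.Request:
    """Build (but do not send) an authenticated GET request for a SQL query."""
    url = f"{base_url}/v0/sql?{urllib.parse.urlencode({'q': sql})}"
    return urllib.request.Request(url, headers={"Authorization": f"Bearer {token}"})


def rows_over_threshold(payload: Dict[str, Any], threshold: int) -> List[Dict[str, Any]]:
    """Keep rows whose lag exceeds the threshold (rows assumed under a "data" key)."""
    # int() because 64-bit integers may be serialized as strings in JSON output.
    return [row for row in payload.get("data", []) if int(row["lag"]) > threshold]


def check_lag(token: str, threshold: int = 50_000) -> List[Dict[str, Any]]:
    """Fetch current lag and return the partitions that should trigger an alert."""
    with urllib.request.urlopen(build_request(LAG_SQL, token), timeout=10) as resp:
        return rows_over_threshold(json.load(resp), threshold)
```

Call check_lag from a scheduler or monitoring agent and forward any non-empty result to your alerting tool. Keeping the threshold in the client rather than in the SQL lets you adjust it without redeploying the query.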

High consumer lag alert

Alert when consumer lag exceeds a threshold:

SELECT
    datasource_id,
    topic,
    partition,
    lag,
    timestamp
FROM tinybird.kafka_ops_log
WHERE timestamp > now() - INTERVAL 5 minute
  AND partition >= 0
  AND msg_type = 'info'
  AND lag > 50000
ORDER BY timestamp DESC
LIMIT 1 BY datasource_id, topic, partition

High lag indicates that your consumer is falling behind and may need scaling. With Tinybird's consumer autoscaling, the infrastructure automatically handles increased load, but this alert helps you verify autoscaling effectiveness.

Error rate alert

Alert when the error rate exceeds a threshold:

SELECT
    datasource_id,
    topic,
    countIf(msg_type = 'error') as error_count,
    count(*) as total_operations,
    (countIf(msg_type = 'error') * 100.0 / count(*)) as error_rate_pct
FROM tinybird.kafka_ops_log
WHERE timestamp > now() - INTERVAL 15 minute
GROUP BY datasource_id, topic
HAVING error_rate_pct > 5
ORDER BY error_rate_pct DESC

High error rates indicate problems with your connector configuration, cluster connectivity, or data quality issues that need immediate attention.

Processing stall alert

Alert when no messages are being processed:

SELECT
    datasource_id,
    topic,
    max(timestamp) as last_activity,
    sum(processed_messages) as messages_in_window
FROM tinybird.kafka_ops_log
WHERE timestamp > now() - INTERVAL 10 minute
GROUP BY datasource_id, topic
HAVING messages_in_window = 0
ORDER BY last_activity DESC

This indicates that your consumer may have stopped, there's a connectivity issue, or the topic has no new messages.
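A connector that writes no log rows at all during the window will not appear in a windowed result, so it can help to reconcile the result against a list of connectors you expect to be active. This is a minimal Python sketch, assuming the rows come from the query above with the HAVING clause removed and that you maintain the expected list yourself. stalled_connectors is a hypothetical helper.

```python
from typing import Any, Dict, Iterable, List, Tuple

Connector = Tuple[str, str]  # (datasource_id, topic)


def stalled_connectors(
    expected: Iterable[Connector], rows: Iterable[Dict[str, Any]]
) -> List[Connector]:
    """Connectors with no rows in the window, or with zero processed messages."""
    processed: Dict[Connector, int] = {}
    for row in rows:
        key = (str(row["datasource_id"]), str(row["topic"]))
        processed[key] = processed.get(key, 0) + int(row["messages_in_window"])
    # A connector missing from `processed` is treated the same as one with 0 messages.
    return sorted(c for c in set(expected) if processed.get(c, 0) == 0)


expected = [("ds_orders", "orders"), ("ds_clicks", "clicks"), ("ds_users", "users")]
rows = [
    {"datasource_id": "ds_orders", "topic": "orders", "messages_in_window": 1200},
    {"datasource_id": "ds_clicks", "topic": "clicks", "messages_in_window": 0},
]
print(stalled_connectors(expected, rows))
# [('ds_clicks', 'clicks'), ('ds_users', 'users')]
```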

Combine with other Service Data Sources

You can join tinybird.kafka_ops_log with other Service Data Sources for deeper analysis. This supports comprehensive analysis that correlates connector operations with Data Source ingestion metrics. Learn more about all available Service Data Sources.

Correlate with Data Source operations

Join Kafka operations log with Data Source operations log to understand the complete pipeline from message consumption to ClickHouse ingestion:

SELECT
    k.timestamp,
    k.datasource_id,
    k.topic,
    k.partition,
    k.processed_messages,
    k.committed_messages,
    d.rows as rows_in_datasource,
    d.elapsed_time
FROM tinybird.kafka_ops_log k
LEFT JOIN (
    SELECT
        datasource_id,
        rows,
        elapsed_time,
        timestamp
    FROM tinybird.datasources_ops_log
    WHERE event_type = 'append-kafka'
      AND timestamp > now() - INTERVAL 1 hour
) d ON k.datasource_id = d.datasource_id
    AND toStartOfMinute(k.timestamp) = toStartOfMinute(d.timestamp)
WHERE k.timestamp > now() - INTERVAL 1 hour
  AND k.msg_type = 'info'
ORDER BY k.timestamp DESC

This helps you identify bottlenecks in the ingestion process and optimize your connector performance.

Analyze ingestion using Kafka meta columns

Use the Kafka meta columns in your Data Sources to analyze ingestion patterns. For example, track message gaps by analyzing offset sequences:

SELECT
    __partition,
    __topic,
    min(__offset) as min_offset,
    max(__offset) as max_offset,
    count(*) as message_count,
    max(__offset) - min(__offset) + 1 as expected_count,
    (max(__offset) - min(__offset) + 1) - count(*) as missing_messages
FROM your_kafka_datasource
WHERE __timestamp > now() - INTERVAL 1 hour
GROUP BY __partition, __topic
HAVING missing_messages > 0

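This helps identify if any messages were skipped during ingestion. Keep in mind that Kafka offsets aren't always contiguous: compacted topics and transactional producers leave gaps in the offset sequence, so treat a non-zero count as a prompt to investigate rather than proof of loss. You can also analyze distribution to verify your partition key design: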

SELECT
    __partition,
    count(*) as message_count,
    count(DISTINCT __key) as unique_keys,
    min(__timestamp) as first_message,
    max(__timestamp) as last_message
FROM your_kafka_datasource
WHERE __timestamp > now() - INTERVAL 24 hour
GROUP BY __partition
ORDER BY __partition

This shows how messages are distributed across partitions, helping you understand if your partition assignment is balanced.

Schema evolution monitoring

Monitor schema changes and their impact on ingestion:

SELECT
    toStartOfHour(timestamp) as hour,
    datasource_id,
    topic,
    msg,
    count(*) as occurrence_count
FROM tinybird.kafka_ops_log
WHERE msg_type = 'warning'
  AND (msg LIKE '%schema%' OR msg LIKE '%deserialization%')
  AND timestamp > now() - INTERVAL 24 hour
GROUP BY hour, datasource_id, topic, msg
ORDER BY occurrence_count DESC

This helps identify schema evolution issues and deserialization problems that may affect ingestion performance.

Performance tuning best practices

Based on monitoring data from tinybird.kafka_ops_log, follow these best practices:

  • Monitor consumer lag regularly: Set up alerts for high lag values to catch issues early. With Tinybird's serverless architecture, consumer autoscaling automatically handles increased load, but monitoring lag helps you verify that autoscaling is working effectively.

  • Track error patterns: Review error messages periodically to identify recurring issues. Common problems include partition assignment issues, deserialization failures, and connectivity problems.

  • Compare partition performance: If one partition has significantly higher lag than others, investigate the cause. This may indicate uneven message distribution and the need to review your partition key design.

  • Monitor throughput trends: Track processing rates over time to understand how the autoscaling infrastructure responds to varying message rates.

  • Use time windows: Filter queries by recent time windows (for example, last hour, last 24 hours) to focus on current state and avoid analyzing stale data.

  • Analyze partition assignment: Use partition-level metrics to understand if your partition assignment strategy is working effectively. Uneven partition loads may require rebalancing or adjustments to your setup.

  • Optimize partition key design: If you see uneven lag distribution across partitions, consider reviewing your partition key design to ensure messages are distributed evenly.

For detailed performance optimization strategies, see the performance optimization guide.

Next steps

  • Kafka connector documentation
  • Monitoring and observability
  • Integrate with monitoring tools: Connect monitoring queries to Grafana, Datadog, PagerDuty, Slack, and other alerting systems by querying the ClickHouse HTTP interface directly. You can also create API endpoints from these queries, or export them in Prometheus format for Prometheus-compatible tools.
