Achieving real-time exactly-once ingestion from Kafka to ClickHouse® is a common challenge when building data pipelines.
This guide shows how to achieve exactly-once semantics using Tinybird's Kafka connector, which automatically tracks offsets per message and enables real-time detection of missing messages.
Understanding Kafka Topics and Partitions
Before diving into exactly-once semantics, it's important to understand the basic building blocks of Kafka.
What is a Topic?
A topic is a category or feed name to which messages are published. Think of it as a stream of related messages. For example, you might have topics like user_events, order_updates, or sensor_readings.
What is a Partition?
A partition is a division of a topic. Topics are split into one or more partitions to enable parallelism and scalability. Each partition is an ordered, immutable sequence of messages.
Partitions allow Kafka to:
- Scale horizontally: Distribute data across multiple brokers
- Enable parallelism: Multiple consumers can read from different partitions simultaneously
- Maintain ordering: Messages within a partition are ordered, but ordering across partitions isn't guaranteed
For example, a topic with 6 partitions can be consumed by up to 6 consumers in parallel, with each consumer handling one partition.
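To make this concrete, the following sketch (using the kafka-python library; the broker address and topic name are placeholders) creates a topic with 6 partitions:

```python
# A minimal sketch, assuming a broker at localhost:9092 and the
# kafka-python library; creates a topic with 6 partitions so up to
# 6 consumers in a group can read it in parallel.
from kafka.admin import KafkaAdminClient, NewTopic

admin = KafkaAdminClient(bootstrap_servers="localhost:9092")
admin.create_topics([
    NewTopic(name="user_events", num_partitions=6, replication_factor=1)
])
admin.close()
```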
Understanding Kafka Consumer Groups and Offset Commits
Now that we understand topics and partitions, let's see how Kafka tracks message consumption.
What is a Consumer Group?
A consumer group is a set of consumers that work together to consume messages from one or more Kafka topics. Each consumer in the group is identified by a unique member ID within the group.
When multiple consumers are part of the same consumer group, Kafka automatically distributes partitions across them. For example, if you have a topic with 6 partitions and 3 consumers in a group, each consumer typically handles 2 partitions.
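Here's a minimal consumer sketch (again assuming kafka-python and a local broker; the topic and group names are placeholders). Running several copies of this script with the same group_id forms one consumer group, and Kafka splits the partitions among them:

```python
# A minimal sketch: every process that joins with the same group_id
# shares the topic's partitions with the other members of the group.
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "user_events",
    bootstrap_servers="localhost:9092",
    group_id="order-processing-service",  # same group_id = shared work
)
for message in consumer:
    print(f"partition={message.partition} offset={message.offset}")
```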
How Offset Commits Work
Kafka uses offsets to track the position of each consumer within a partition. An offset is a unique, sequential identifier for each message in a partition, starting from 0.
When a consumer reads messages from Kafka, it processes them and then commits the offset back to Kafka. This commit tells Kafka: "I've successfully processed all messages up to this offset."
The commit process works like this:
- Consumer reads messages from partition (e.g., offsets 100-199)
- Consumer processes the messages
- Consumer commits offset 200 to Kafka (indicating it's ready for the next batch starting at 200)
If a consumer crashes after processing messages but before committing, Kafka will resume from the last committed offset when the consumer restarts. This means messages might be reprocessed, leading to duplicates.
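Here is a sketch of that read-process-commit loop (kafka-python again; the broker, topic, group, and processing logic are placeholders). Notice the window between processing and committing where a crash causes redelivery:

```python
# A minimal sketch of read-process-commit. Auto-commit is disabled so
# offsets are committed only after processing; a crash between
# process() and commit() means those messages are redelivered on
# restart (at-least-once delivery, hence possible duplicates).
from kafka import KafkaConsumer

def process(message):
    # Placeholder for your real processing logic.
    print(f"processing partition={message.partition} offset={message.offset}")

consumer = KafkaConsumer(
    "user_events",
    bootstrap_servers="localhost:9092",
    group_id="order-processing-service",
    enable_auto_commit=False,  # we decide when offsets are committed
)
for message in consumer:
    process(message)
    consumer.commit()  # commits message.offset + 1 back to Kafka
```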
The Exactly-Once Challenge
Exactly-once semantics means each message is processed exactly once—no duplicates, no missing messages. This is difficult to achieve because:
- Network failures: A consumer might process a message but fail before committing the offset
- Application crashes: The consumer might crash between processing and committing
- Distributed systems: Multiple consumers or instances might process the same message
How Tinybird Tracks Offsets Per Message
Tinybird's Kafka connector automatically stores Kafka message metadata in your Data Sources, including the offset for each message. This enables real-time detection of missing messages.
Kafka Meta Columns in Tinybird
When you create a Data Source connected to Kafka, Tinybird automatically includes these meta columns:
- `__topic` - The Kafka topic name
- `__partition` - The partition number
- `__offset` - The message offset within the partition
- `__timestamp` - The Kafka message timestamp
- `__key` - The message key (if present)
- `__value` - The raw message value
The __offset column is particularly important for exactly-once detection because it provides a sequential identifier for each message within a partition.
Viewing Offsets in Your Data Source
You can query the __offset column directly to see the offset for each message:
SELECT
    __topic,
    __partition,
    __offset,
    __timestamp,
    your_data_columns
FROM your_kafka_data_source
WHERE __timestamp > now() - INTERVAL 1 hour
ORDER BY __partition, __offset
This query shows you the offset sequence for each partition, making it easy to spot gaps. Remember that offsets are per partition.
Detecting Missing Offsets
To achieve exactly-once semantics, you need to detect when offsets are missing—indicating that messages were skipped during ingestion.
Check for Offset Gaps by Partition
The following query detects gaps in the offset sequence for each partition:
SELECT
    __topic,
    __partition,
    min(__offset) as min_offset,
    max(__offset) as max_offset,
    count() as message_count,
    (max(__offset) - min(__offset) + 1) - count() as missing_messages
FROM your_kafka_data_source
WHERE __timestamp > now() - INTERVAL 1 hour
GROUP BY __topic, __partition
HAVING missing_messages > 0
This query calculates the expected number of messages (based on the offset range) and compares it to the actual count. If there's a difference, messages are missing.
Note: If you're using Kafka tombstones, Tinybird sends them to quarantine, so check whether the missing offsets in each partition correspond to tombstones in your topic.
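To verify from the Kafka side, a sketch like this one (kafka-python; the broker, topic, partition, and offset values are placeholders to be filled in from the gap query) seeks to a reported missing offset and checks whether it's a tombstone, i.e., a message with a null value:

```python
# A minimal sketch: seek to a missing offset reported by the gap query
# and check whether the message at that offset has a null value (a
# Kafka tombstone). Topic, partition, and offset are placeholders.
from kafka import KafkaConsumer, TopicPartition

tp = TopicPartition("user_events", 3)
consumer = KafkaConsumer(bootstrap_servers="localhost:9092")
consumer.assign([tp])
consumer.seek(tp, 1042)  # a missing offset from the gap query

records = consumer.poll(timeout_ms=5000)
for message in records.get(tp, []):
    if message.offset == 1042:
        print("tombstone" if message.value is None else "regular message")
```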
Detect Consecutive Offset Gaps
For more precise detection, you can identify specific missing offsets by comparing consecutive offsets:
WITH offset_sequence AS (
    SELECT
        __topic,
        __partition,
        __offset,
        -- toNullable so the first row in each partition yields NULL
        -- instead of 0, which would produce a false gap
        lagInFrame(toNullable(__offset)) OVER (
            PARTITION BY __topic, __partition
            ORDER BY __offset
        ) as prev_offset
    FROM your_kafka_data_source
    WHERE __timestamp > now() - INTERVAL 1 hour
)
SELECT
    __topic,
    __partition,
    prev_offset + 1 as expected_offset,
    __offset as actual_offset,
    __offset - prev_offset - 1 as gap_size
FROM offset_sequence
WHERE prev_offset IS NOT NULL
AND __offset - prev_offset > 1
ORDER BY __topic, __partition, expected_offset
This query identifies specific missing offsets and the size of each gap.
Creating a Pipe for Missing Offset Alerts
To monitor for missing offsets in real-time, create a Tinybird Pipe that runs periodically and alerts when gaps are detected.
Step 1: Create the Alert Detection Endpoint
Create a new Pipe file (e.g., endpoints/missing_offsets_alert.pipe) that detects offset gaps:
TOKEN monitoring READ

NODE check_missing_offsets
SQL >
    SELECT
        __topic,
        __partition,
        min(__offset) as min_offset,
        max(__offset) as max_offset,
        count() as message_count,
        (max(__offset) - min(__offset) + 1) - count() as missing_messages,
        min(__timestamp) as first_message_time,
        max(__timestamp) as last_message_time
    FROM your_kafka_data_source
    WHERE __timestamp > now() - INTERVAL 15 minute
    GROUP BY __topic, __partition
    HAVING missing_messages > 0

NODE alert_details
SQL >
    SELECT
        __topic,
        __partition,
        missing_messages,
        min_offset,
        max_offset,
        message_count,
        first_message_time,
        last_message_time,
        now() as alert_time
    FROM check_missing_offsets
    ORDER BY missing_messages DESC
TYPE endpoint
This endpoint can be called by your monitoring system (e.g., PagerDuty, Datadog, or a custom webhook) to trigger alerts when missing offsets are detected.
Step 2: Schedule the Pipe
Configure the Pipe to run automatically at regular intervals. You can use Tinybird's scheduling feature or call the endpoint from an external scheduler:
# Example: Call the endpoint every 5 minutes using cron
*/5 * * * * curl -X GET "https://api.tinybird.co/v0/pipes/missing_offsets_alert.json" -H "Authorization: Bearer YOUR_TOKEN"
Step 3: Enhanced Alerting with Gap Details
For more detailed alerts, create an Endpoint that identifies specific missing offsets:
TOKEN monitoring READ

NODE find_gaps
SQL >
    WITH offset_data AS (
        SELECT
            __topic,
            __partition,
            __offset,
            -- toNullable so the first row in each partition yields
            -- NULL instead of 0, which would produce a false gap
            lagInFrame(toNullable(__offset)) OVER (
                PARTITION BY __topic, __partition
                ORDER BY __offset
            ) as prev_offset,
            __timestamp
        FROM your_kafka_data_source
        WHERE __timestamp > now() - INTERVAL 15 minute
    )
    SELECT
        __topic,
        __partition,
        prev_offset + 1 as missing_offset_start,
        __offset - 1 as missing_offset_end,
        __offset - prev_offset - 1 as gap_size,
        __timestamp as detected_at
    FROM offset_data
    WHERE prev_offset IS NOT NULL
    AND __offset - prev_offset > 1
    ORDER BY __topic, __partition, missing_offset_start
TYPE endpoint
This Pipe provides specific offset ranges that are missing, making it easier to investigate and potentially reprocess missing messages.
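As a starting point for that investigation, here is a sketch (assuming the Pipe above is saved as find_gaps.pipe, so its endpoint is published under that name; broker address and token are placeholders) that pulls the reported gap ranges and seeks to them in Kafka to confirm whether the messages still exist in the topic:

```python
# A minimal sketch: fetch gap ranges from the endpoint above, then seek
# to each range in Kafka to see whether the messages are still in the
# topic (for example, before deciding how to re-ingest them).
import requests
from kafka import KafkaConsumer, TopicPartition

resp = requests.get(
    "https://api.tinybird.co/v0/pipes/find_gaps.json",  # assumed Pipe name
    headers={"Authorization": "Bearer YOUR_TOKEN"},
)
resp.raise_for_status()
gaps = resp.json()["data"]

consumer = KafkaConsumer(bootstrap_servers="localhost:9092")
for gap in gaps:
    tp = TopicPartition(gap["__topic"], gap["__partition"])
    consumer.assign([tp])
    consumer.seek(tp, gap["missing_offset_start"])
    for message in consumer.poll(timeout_ms=5000).get(tp, []):
        if message.offset > gap["missing_offset_end"]:
            break
        print(f"offset {message.offset} still exists in {tp.topic}/{tp.partition}")
```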
Real-Time Offset Monitoring Dashboard
You can also create a monitoring dashboard by aggregating offset statistics over time:
SELECT
    toStartOfMinute(__timestamp) as minute,
    __topic,
    __partition,
    min(__offset) as min_offset,
    max(__offset) as max_offset,
    count() as messages_received,
    max(__offset) - min(__offset) + 1 as expected_messages,
    (max(__offset) - min(__offset) + 1) - count() as missing_messages
FROM your_kafka_data_source
WHERE __timestamp > now() - INTERVAL 1 hour
GROUP BY minute, __topic, __partition
ORDER BY minute DESC, __topic, __partition
This query shows offset statistics per minute, helping you identify when gaps occur and track ingestion health over time.
Best Practices for Exactly-Once Ingestion
When building exactly-once ingestion pipelines with Tinybird:
- Check offset columns: Your Data Source schema includes `__offset` and `__partition` columns, useful for gap detection
- Monitor regularly: Set up the alerting Pipe to run frequently (every 1-5 minutes) to catch issues quickly
- Track per partition: Monitor each partition separately, as gaps might only affect specific partitions
- Set up alerts: Integrate the alerting Pipe with your monitoring system to get notified immediately
- Investigate gaps promptly: When gaps are detected, investigate the root cause (network issues, consumer failures, etc.)
Conclusion
Tinybird's Kafka connector automatically tracks offsets per message, enabling real-time gap detection and alerting. By creating monitoring Pipes that detect missing offsets, you can ensure data integrity and catch ingestion issues immediately.
Learn more about Tinybird's Kafka connector and see the Kafka monitoring guide for additional monitoring queries.
