---
title: "How to Achieve Real-Time Exactly-Once Ingestion from Kafka to ClickHouse"
excerpt: "Exactly-once delivery from Kafka to ClickHouse isn't magic. It's architecture. Here's the pattern that guarantees no duplicates."
authors: "Gonzalo Gomez"
categories: "The Data Base"
createdOn: "2025-12-16 00:00:00"
publishedOn: "2025-12-16 00:00:00"
updatedOn: "2025-12-16 00:00:00"
status: "published"
---

Achieving real-time exactly-once ingestion from Kafka to ClickHouse{% sup %}®{% /sup %} is a common challenge when building data pipelines.

This guide shows how to achieve exactly-once semantics using Tinybird's Kafka connector, which automatically tracks offsets per message and enables real-time detection of missing messages.

## Understanding Kafka Topics and Partitions

Before diving into exactly-once semantics, it's important to understand the basic building blocks of Kafka.

### What is a Topic?

A **topic** is a category or feed name to which messages are published. Think of it as a stream of related messages. For example, you might have topics like `user_events`, `order_updates`, or `sensor_readings`.

### What is a Partition?

A **partition** is a division of a topic. Topics are split into one or more partitions to enable parallelism and scalability. Each partition is an ordered, immutable sequence of messages.

Partitions allow Kafka to:

- **Scale horizontally**: Distribute data across multiple brokers
- **Enable parallelism**: Multiple consumers can read from different partitions simultaneously
- **Maintain ordering**: Messages within a partition are ordered, but ordering across partitions isn't guaranteed

For example, a topic with 6 partitions can be consumed by up to 6 consumers in parallel, with each consumer handling one partition.
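Which partition a message lands in usually depends on its key: Kafka's default partitioner hashes the key and takes the result modulo the partition count. A rough Python sketch of the idea (using md5 instead of Kafka's actual murmur2 hash, purely to keep the example deterministic):

```python
import hashlib

NUM_PARTITIONS = 6

def partition_for(key: str, num_partitions: int = NUM_PARTITIONS) -> int:
    """Illustrative stand-in for Kafka's default partitioner: hash the
    key, then take it modulo the partition count. (Kafka itself uses
    murmur2; md5 here just makes the sketch deterministic.)"""
    digest = hashlib.md5(key.encode()).digest()
    return int.from_bytes(digest[:4], "big") % num_partitions

# Every message with the same key maps to the same partition,
# which is how Kafka preserves per-key ordering.
assert partition_for("user-42") == partition_for("user-42")
assert 0 <= partition_for("order-7") < NUM_PARTITIONS
```

Keyless messages are instead spread across partitions (round-robin or sticky batching, depending on the producer version).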

## Understanding Kafka Consumer Groups and Offset Commits

Now that we understand topics and partitions, let's see how Kafka tracks message consumption.

### What is a Consumer Group?

A **consumer group** is a set of consumers that work together to consume messages from one or more Kafka topics. Each consumer in the group is identified by a unique member ID, which Kafka's group coordinator assigns when the consumer joins the group.

When multiple consumers are part of the same consumer group, Kafka automatically distributes partitions across them. For example, if you have a topic with 6 partitions and 3 consumers in a group, each consumer typically handles 2 partitions.
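That distribution step can be sketched as a simple round-robin assignment. Kafka's real assignors (range, round-robin, cooperative-sticky) are more involved; this sketch only shows the shape of the result:

```python
def assign_partitions(partitions: list[int], consumers: list[str]) -> dict[str, list[int]]:
    """Toy round-robin partition assignment, similar in spirit to
    Kafka's RoundRobinAssignor (the real assignors handle rebalances,
    multiple topics, and stickiness)."""
    assignment: dict[str, list[int]] = {c: [] for c in consumers}
    for i, p in enumerate(partitions):
        # Deal partitions out like cards, one consumer at a time
        assignment[consumers[i % len(consumers)]].append(p)
    return assignment

print(assign_partitions(list(range(6)), ["c1", "c2", "c3"]))
# {'c1': [0, 3], 'c2': [1, 4], 'c3': [2, 5]}
```

With 6 partitions and 3 consumers, each consumer ends up with 2 partitions, matching the example above.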

### How Offset Commits Work

Kafka uses **offsets** to track the position of each consumer within a partition. An offset is a unique, sequential identifier for each message in a partition, starting from 0.

When a consumer reads messages from Kafka, it processes them and then **commits** the offset back to Kafka. This commit tells Kafka: "I've successfully processed all messages up to this offset."

The commit process works like this:

1. Consumer reads messages from partition (e.g., offsets 100-199)
2. Consumer processes the messages
3. Consumer commits offset 200 to Kafka (indicating it's ready for the next batch starting at 200)

If a consumer crashes after processing messages but before committing, Kafka will resume from the last committed offset when the consumer restarts. This means messages might be reprocessed, leading to duplicates.
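A toy Python simulation makes the failure mode concrete: when the consumer crashes after processing a batch but before committing, the restarted consumer resumes from the last committed offset and the batch is processed twice. All names here are illustrative:

```python
class ToyConsumer:
    """Simulates at-least-once consumption of a single partition."""

    def __init__(self, offsets: list[int]):
        self.partition = offsets       # offsets available in the partition
        self.committed = offsets[0]    # last committed position
        self.processed: list[int] = [] # side effects (e.g., writes) land here

    def consume(self, batch_size: int, crash_before_commit: bool) -> None:
        batch = [o for o in self.partition if o >= self.committed][:batch_size]
        self.processed.extend(batch)   # processing happens first...
        if crash_before_commit:
            return                     # ...crash: the commit never happens
        self.committed = batch[-1] + 1 # commit the next offset to read

consumer = ToyConsumer(list(range(100, 110)))
consumer.consume(5, crash_before_commit=True)   # processes 100-104, no commit
consumer.consume(5, crash_before_commit=False)  # restart: re-reads 100-104
assert consumer.processed.count(100) == 2       # duplicates!
```

This is exactly the "at-least-once" behavior that exactly-once pipelines have to detect and deduplicate.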

### The Exactly-Once Challenge

Exactly-once semantics means each message is processed exactly once—no duplicates, no missing messages. This is difficult to achieve because:

- **Network failures**: A consumer might process a message but fail before committing the offset
- **Application crashes**: The consumer might crash between processing and committing
- **Distributed systems**: Multiple consumers or instances might process the same message

## How Tinybird Tracks Offsets Per Message

Tinybird's Kafka connector automatically stores Kafka message metadata in your Data Sources, including the offset for each message. This enables real-time detection of missing messages.

### Kafka Meta Columns in Tinybird

When you create a Data Source connected to Kafka, Tinybird automatically includes these meta columns:

- `__topic` - The Kafka topic name
- `__partition` - The partition number
- `__offset` - The message offset within the partition
- `__timestamp` - The Kafka message timestamp
- `__key` - The message key (if present)
- `__value` - The raw message value

The `__offset` column is particularly important for exactly-once detection because it provides a sequential identifier for each message within a partition.

### Viewing Offsets in Your Data Source

You can query the `__offset` column directly to see the offset for each message:

```sql
SELECT
    __topic,
    __partition,
    __offset,
    __timestamp,
    your_data_columns
FROM your_kafka_data_source
WHERE __timestamp > now() - INTERVAL 1 hour
ORDER BY __partition, __offset
```

This query shows you the offset sequence for each partition, making it easy to spot gaps. Remember that offsets are per partition.
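Within one partition, spotting a gap reduces to comparing consecutive offsets; a minimal Python sketch of that logic (the function name is illustrative):

```python
def find_offset_gaps(offsets: list[int]) -> list[tuple[int, int]]:
    """Return (start, end) ranges of missing offsets in one partition."""
    ordered = sorted(offsets)
    gaps = []
    for prev, cur in zip(ordered, ordered[1:]):
        if cur - prev > 1:                 # a jump bigger than 1 is a gap
            gaps.append((prev + 1, cur - 1))
    return gaps

# Offsets 103-104 and 107 never arrived:
print(find_offset_gaps([100, 101, 102, 105, 106, 108]))
# [(103, 104), (107, 107)]
```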

## Detecting Missing Offsets

To achieve exactly-once semantics, you need to detect when offsets are missing—indicating that messages were skipped during ingestion.

### Check for Offset Gaps by Partition

The following query detects gaps in the offset sequence for each partition:

```sql
SELECT
    __topic,
    __partition,
    min(__offset) as min_offset,
    max(__offset) as max_offset,
    count() as message_count,
    (max(__offset) - min(__offset) + 1) - count() as missing_messages
FROM your_kafka_data_source
WHERE __timestamp > now() - INTERVAL 1 hour
GROUP BY __topic, __partition
HAVING missing_messages > 0
```

This query calculates the expected number of messages (based on the offset range) and compares it to the actual count. If there's a difference, messages are missing.

Note: If you're using Kafka tombstones (messages with a null value), Tinybird sends them to quarantine, so when this query reports missing offsets, check whether those offsets in your topic correspond to tombstones.

### Detect Consecutive Offset Gaps

For more precise detection, you can identify specific missing offsets by comparing consecutive offsets:

```sql
WITH offset_sequence AS (
    SELECT
        __topic,
        __partition,
        __offset,
        -- toNullable so the first row in each partition yields NULL
        -- (lagInFrame on a plain integer column returns 0 instead)
        lagInFrame(toNullable(__offset)) OVER (
            PARTITION BY __topic, __partition 
            ORDER BY __offset
        ) as prev_offset
    FROM your_kafka_data_source
    WHERE __timestamp > now() - INTERVAL 1 hour
)
SELECT
    __topic,
    __partition,
    prev_offset + 1 as expected_offset,
    __offset as actual_offset,
    __offset - prev_offset - 1 as gap_size
FROM offset_sequence
WHERE prev_offset IS NOT NULL
  AND __offset - prev_offset > 1
ORDER BY __topic, __partition, expected_offset
```

This query identifies specific missing offsets and the size of each gap.
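The two detection methods should agree on the total: the aggregate query derives missing messages from the offset range, while the window query sums the individual gaps. A quick illustrative cross-check in Python:

```python
offsets = [100, 101, 102, 105, 106, 108]

# Aggregate approach: expected range size minus actual count
missing_total = (max(offsets) - min(offsets) + 1) - len(offsets)

# Window approach: sum of gap sizes between consecutive offsets
ordered = sorted(offsets)
gap_total = sum(cur - prev - 1 for prev, cur in zip(ordered, ordered[1:]))

assert missing_total == gap_total == 3  # gaps at 103-104 and 107
```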

## Creating a Pipe for Missing Offset Alerts

To monitor for missing offsets in real-time, create a Tinybird Pipe that runs periodically and alerts when gaps are detected.

### Step 1: Create the Alert Detection Endpoint

Create a new Pipe file (e.g., `endpoints/missing_offsets_alert.pipe`) that detects offset gaps:

```sql
TOKEN monitoring READ

NODE check_missing_offsets
SQL >
    SELECT
        __topic,
        __partition,
        min(__offset) as min_offset,
        max(__offset) as max_offset,
        count() as message_count,
        (max(__offset) - min(__offset) + 1) - count() as missing_messages,
        min(__timestamp) as first_message_time,
        max(__timestamp) as last_message_time
    FROM your_kafka_data_source
    WHERE __timestamp > now() - INTERVAL 15 minute
    GROUP BY __topic, __partition
    HAVING missing_messages > 0

NODE alert_details
SQL >
    SELECT
        __topic,
        __partition,
        missing_messages,
        min_offset,
        max_offset,
        message_count,
        first_message_time,
        last_message_time,
        now() as alert_time
    FROM check_missing_offsets
    ORDER BY missing_messages DESC

TYPE ENDPOINT
```

This endpoint can be called by your monitoring system (e.g., PagerDuty, Datadog, or a custom webhook) to trigger alerts when missing offsets are detected.
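A minimal poller along those lines might look like this in Python. The endpoint URL matches the Pipe above; the webhook URL is a placeholder, and the message format is an assumption to adapt to your alerting system:

```python
import json
import urllib.request

# Placeholders: substitute your region's API host, token, and webhook.
TINYBIRD_URL = "https://api.tinybird.co/v0/pipes/missing_offsets_alert.json"
WEBHOOK_URL = "https://example.com/alerts"  # e.g., a Slack/PagerDuty webhook

def format_alert(row: dict) -> str:
    """Turn one endpoint result row into a human-readable alert message."""
    return (f"Missing offsets in {row['__topic']}/partition "
            f"{row['__partition']}: {row['missing_messages']} messages "
            f"(offsets {row['min_offset']}-{row['max_offset']})")

def check_and_alert(token: str) -> None:
    # Call the Tinybird endpoint; .json endpoints return {"data": [...]}
    req = urllib.request.Request(
        TINYBIRD_URL, headers={"Authorization": f"Bearer {token}"})
    with urllib.request.urlopen(req) as resp:
        rows = json.load(resp)["data"]
    for row in rows:  # one row per partition with detected gaps
        body = json.dumps({"text": format_alert(row)}).encode()
        urllib.request.urlopen(urllib.request.Request(
            WEBHOOK_URL, data=body,
            headers={"Content-Type": "application/json"}))
```

An empty `data` array means no gaps were detected in the window, so no alert fires.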

### Step 2: Schedule the Pipe

Configure the Pipe to run automatically at regular intervals. You can use Tinybird's scheduling feature or call the endpoint from an external scheduler:

```bash
# Example: call the endpoint every 5 minutes from cron
# (crontab entries must fit on a single line)
*/5 * * * * curl -s -H "Authorization: Bearer YOUR_TOKEN" "https://api.tinybird.co/v0/pipes/missing_offsets_alert.json"
```

### Step 3: Enhanced Alerting with Gap Details

For more detailed alerts, create an Endpoint that identifies specific missing offsets:

```sql
TOKEN monitoring READ

NODE find_gaps
SQL >
    WITH offset_data AS (
        SELECT
            __topic,
            __partition,
            __offset,
            -- toNullable so the first row in each partition yields NULL
            lagInFrame(toNullable(__offset)) OVER (
                PARTITION BY __topic, __partition 
                ORDER BY __offset
            ) as prev_offset,
            __timestamp
        FROM your_kafka_data_source
        WHERE __timestamp > now() - INTERVAL 15 minute
    )
    SELECT
        __topic,
        __partition,
        prev_offset + 1 as missing_offset_start,
        __offset - 1 as missing_offset_end,
        __offset - prev_offset - 1 as gap_size,
        __timestamp as detected_at
    FROM offset_data
    WHERE prev_offset IS NOT NULL
      AND __offset - prev_offset > 1
    ORDER BY __topic, __partition, missing_offset_start

TYPE ENDPOINT
```

This Pipe provides specific offset ranges that are missing, making it easier to investigate and potentially reprocess missing messages.

## Real-Time Offset Monitoring Dashboard

You can also create a monitoring dashboard by aggregating offset statistics over time:

```sql
SELECT
    toStartOfMinute(__timestamp) as minute,
    __topic,
    __partition,
    min(__offset) as min_offset,
    max(__offset) as max_offset,
    count() as messages_received,
    max(__offset) - min(__offset) + 1 as expected_messages,
    (max(__offset) - min(__offset) + 1) - count() as missing_messages
FROM your_kafka_data_source
WHERE __timestamp > now() - INTERVAL 1 hour
GROUP BY minute, __topic, __partition
ORDER BY minute DESC, __topic, __partition
```

This query shows offset statistics per minute, helping you identify when gaps occur and track ingestion health over time.

## Best Practices for Exactly-Once Ingestion

When building exactly-once ingestion pipelines with Tinybird:

1. **Check offset columns**: Your Data Source schema includes `__offset` and `__partition` columns, useful for gap detection
2. **Monitor regularly**: Set up the alerting Pipe to run frequently (every 1-5 minutes) to catch issues quickly
3. **Track per partition**: Monitor each partition separately, as gaps might only affect specific partitions
4. **Set up alerts**: Integrate the alerting Pipe with your monitoring system to get notified immediately
5. **Investigate gaps promptly**: When gaps are detected, investigate the root cause (network issues, consumer failures, etc.)

## Conclusion

Tinybird's Kafka connector automatically tracks offsets per message, enabling real-time gap detection and alerting. By creating monitoring Pipes that detect missing offsets, you can ensure data integrity and catch ingestion issues immediately.

Learn more about [Tinybird's Kafka connector](https://www.tinybird.co/docs/forward/get-data-in/connectors/kafka) and see the [Kafka monitoring guide](https://www.tinybird.co/docs/forward/monitoring/kafka-clickhouse-monitoring) for additional monitoring queries.
