---
title: Deduplicate data in your data source
tags: realtime-cdc
meta:
  description: Learn several strategies for deduplicating data in Tinybird.
---

# Deduplicate data in your data source

Sometimes you might need to deduplicate data, for example when you receive updates or change events from a transactional database through change data capture (CDC). You might want to retrieve only the latest data point for each object, or keep a historical record of how its attributes evolve over time.

Because Tinybird doesn't enforce uniqueness for primary keys when inserting rows, you need to follow different strategies to deduplicate data with minimal side effects.

## Deduplication strategies

You can use one of the following strategies to deduplicate your data.

{% table %}
  * Method
  * When to use
  ---
  * [Deduplicate at query time](#deduplicate-at-query-time)
  * Deduplicate data at query time if you are still prototyping or the data source is small.
  ---
  * [Use ReplacingMergeTree](#use-the-replacingmergetree-engine)
  * Use `ReplacingMergeTree` or `AggregatingMergeTree` for greater performance.
  ---
  * [Snapshot based deduplication](#snapshot-based-deduplication)
  * If data freshness isn't required, generate periodic snapshots of the data and take advantage of subsequent materialized views for rollups.
  ---
  * [Hybrid approach using Lambda architecture](#hybrid-approach-using-lambda-architecture)
  * When you need to overcome engine approach limitations while preserving freshness, combine approaches in a Lambda architecture.
  {% /table %}

{% callout type="tip" %}
For dimensional and small tables, a periodic full replace is usually the best option.
{% /callout %}

## Example case

Consider a dataset from a social media analytics company that wants to track post data over time. You receive an event with the latest info for each post, identified by `post_id`. Three fields, `views`, `likes`, and `tag`, vary from event to event. For example:

```ndjson {% title="post.ndjson" %}
{ "timestamp": "2024-07-02T02:22:17", "post_id": 956, "views": 856875, "likes": 2321, "tag": "Sports" }
```

## Deduplicate at query time

Imagine you're only interested in the latest values for each post. In that case, you can deduplicate data on `post_id` and get the latest values with these strategies:

- Get the max date for each post in a subquery and then filter by its results.
- Group data by `post_id` and use the `argMax` function.
- Use the `LIMIT BY` clause.

Select `Subquery`, `argMax`, or `LIMIT BY` to see the example queries for each.

{% tabs initial="Subquery" %}

{% tab label="Subquery"  %}

```sql {% title="Deduplicating data on post_id using Subquery" %}
SELECT *
FROM posts_info 
WHERE (post_id, timestamp) IN 
(
    SELECT 
        post_id, 
        max(timestamp)
    FROM posts_info 
    GROUP BY post_id
)
```

{% /tab %}

{% tab label="argMax" %}

```sql {% title="Deduplicating data on post_id using argMax" %}
SELECT 
    max(timestamp) last_ts,
    post_id, 
    argMax(views, timestamp) views,
    argMax(likes, timestamp) likes,
    argMax(tag, timestamp) tag
FROM posts_info
GROUP BY post_id
ORDER BY last_ts, post_id
```

{% /tab %}

{% tab label="LIMIT BY" %}

```sql {% title="Deduplicating data on post_id using LIMIT BY" %}
SELECT *
FROM posts_info
ORDER BY post_id, timestamp DESC
LIMIT 1 BY post_id
```

{% /tab %}

{% /tabs %}

Depending on your data and how you define the sorting keys that store it on disk, one approach may be faster than the others.

In general, deduplicating at query time is fine if your data is small. If you have lots of data, use a table engine that takes care of deduplication for you.

## Use the ReplacingMergeTree engine

If you have lots of data and you're interested in the latest insertion for each unique key, use the ReplacingMergeTree engine with the following options: `ENGINE_SORTING_KEY`, `ENGINE_VER`, and `ENGINE_IS_DELETED`.

- Rows with the same `ENGINE_SORTING_KEY` are deduplicated. You can select one or more columns.
- If you specify a column for `ENGINE_VER`, for example a timestamp, the row with the highest `ENGINE_VER` for each unique `ENGINE_SORTING_KEY` is kept.
- `ENGINE_IS_DELETED` is only active if you use `ENGINE_VER`. This column determines whether the row represents a state or a deletion: `1` marks a deleted row, `0` a state row. The type must be `UInt8`.
- If you omit `ENGINE_VER`, the last inserted row for each unique `ENGINE_SORTING_KEY` is kept.
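For instance, assuming a `UInt8` column named `_is_deleted` like the one in the data source definition in this guide, a CDC deletion event for a post could look like this:

```ndjson {% title="post_delete.ndjson" %}
{ "timestamp": "2024-07-02T03:10:45", "post_id": 956, "views": 856875, "likes": 2321, "tag": "Sports", "_is_deleted": 1 }
```

A query with the `FINAL` clause then excludes the deleted post, since this row has the highest `ENGINE_VER` for its sorting key.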

{% callout type="caution" %}
Do not build materialized views with an AggregatingMergeTree on top of a ReplacingMergeTree. The target data source will always contain duplicates due to the incremental nature of materialized views.
{% /callout %}

### Define a data source

Define a data source like the following:

```tb {% title="posts_info_rmt.datasource" %}
DESCRIPTION >
    Data source to store post info, deduplicated by the ReplacingMergeTree engine.

SCHEMA >
    `post_id` Int32 `json:$.post_id`,
    `views` Int32 `json:$.views`,
    `likes` Int32 `json:$.likes`,
    `tag` String `json:$.tag`,
    `timestamp` DateTime `json:$.timestamp`,
    `_is_deleted` UInt8 `json:$._is_deleted`

ENGINE "ReplacingMergeTree"
ENGINE_PARTITION_KEY ""
ENGINE_SORTING_KEY "post_id"
ENGINE_VER "timestamp"
ENGINE_IS_DELETED "_is_deleted"
```

ReplacingMergeTree deduplicates during background merges, and you can't control when merges happen. To deduplicate at query time, add the `FINAL` clause or use one of the alternative techniques. Note that `FINAL` masks duplicate rows rather than removing them.

{% tabs initial="FINAL" %}

{% tab label="FINAL"  %}

```sql {% title="Deduplicating data on post_id using FINAL" %}
SELECT *
FROM posts_info_rmt FINAL
```

{% /tab %}

{% tab label="Subquery"  %}

```sql {% title="Deduplicating data on post_id using Subquery" %}
SELECT *
FROM posts_info_rmt 
WHERE (post_id, timestamp) IN 
(
    SELECT 
        post_id, 
        max(timestamp)
    FROM posts_info_rmt 
    GROUP BY post_id
)
```

{% /tab %}

{% tab label="argMax" %}

```sql {% title="Deduplicating data on post_id using argMax" %}
SELECT 
    max(timestamp) last_ts,
    post_id, 
    argMax(views, timestamp) views,
    argMax(likes, timestamp) likes,
    argMax(tag, timestamp) tag
FROM posts_info_rmt
GROUP BY post_id
ORDER BY last_ts, post_id
```

{% /tab %}

{% tab label="LIMIT BY" %}

```sql {% title="Deduplicating data on post_id using LIMIT BY" %}
SELECT *
FROM posts_info_rmt
ORDER BY post_id, timestamp DESC
LIMIT 1 BY post_id
```

{% /tab %}

{% /tabs %}

You can define `posts_info_rmt` as the landing data source, the one you send events to, or as a materialized view from `posts_info`. You can also create a data source with an `AggregatingMergeTree` engine using `maxState(ts)` and `argMaxState(field, ts)`.
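As a sketch of that `AggregatingMergeTree` alternative, a materializing Pipe could look like the following. The Pipe and target data source names are illustrative, and the target data source would need matching `AggregateFunction` column types:

```tb {% title="posts_info_agg_mv.pipe" %}
NODE materialize_latest
SQL >

    SELECT
        post_id,
        maxState(timestamp) AS last_ts,
        argMaxState(views, timestamp) AS views,
        argMaxState(likes, timestamp) AS likes,
        argMaxState(tag, timestamp) AS tag
    FROM posts_info
    GROUP BY post_id

TYPE materialized
DATASOURCE posts_info_agg
```

At query time, read the states back with `maxMerge(last_ts)` and `argMaxMerge(views)`, grouping by `post_id`.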

## Snapshot based deduplication

Use [Copy Pipes](/classic/work-with-data/process-and-copy/copy-pipes) to take a query result and write it to a new data source in the following situations:

- You need other Sorting Keys that might change with updates.
- You need to do rollups and want to use materialized views.
- Response times are too long with a `ReplacingMergeTree`.

The following is an example snapshot:

```tb {% title="post_generate_snapshot.pipe" %}
NODE gen_snapshot
SQL >

    SELECT
        post_id, 
        argMax(views, timestamp) views,
        argMax(likes, timestamp) likes,
        argMax(tag, timestamp) tag,
        max(timestamp) as ts,
        toStartOfMinute(now()) - INTERVAL 1 MINUTE as snapshot_ts
    FROM posts_info
    WHERE timestamp <= toStartOfMinute(now()) - INTERVAL 1 MINUTE
    GROUP BY post_id

TYPE copy
TARGET_DATASOURCE post_snapshot
COPY_MODE replace
COPY_SCHEDULE 0 * * * *
```

Because the `TARGET_DATASOURCE` engine is a MergeTree, you can use fields that you expect to be updated, such as `tag`, as sorting keys, something you can't safely do in a ReplacingMergeTree.

```tb {% title="post_snapshot.datasource" %}
SCHEMA >
    `post_id` Int32,
    `views` Int32,
    `likes` Int32,
    `tag` String,
    `ts` DateTime,
    `snapshot_ts` DateTime

ENGINE "MergeTree"
ENGINE_PARTITION_KEY ""
ENGINE_SORTING_KEY "tag, post_id"
```

## Hybrid approach using Lambda architecture

Snapshots might decrease data freshness, and running Copy Pipes too frequently might be more expensive than materialized views. A way to mitigate these issues is to combine batch and real-time processing, reading the latest snapshot and incorporating the changes that happened since then.

This pattern is described in the [Lambda architecture](/classic/work-with-data/query/guides/lambda-architecture) guide. See a practical example in the [CDC using Lambda](/classic/work-with-data/query/guides/lambda-example-cdc) guide.

Using the `post_snapshot` data source created before, the real-time Pipe would look like the following:

```tb {% title="latest_values.pipe" %}
NODE get_latest_changes
SQL >

    SELECT 
        max(timestamp) last_ts,
        post_id, 
        argMax(views, timestamp) views,
        argMax(likes, timestamp) likes,
        argMax(tag, timestamp) tag
    FROM posts_info_rmt
    WHERE timestamp > (SELECT max(snapshot_ts) FROM post_snapshot)
    GROUP BY post_id

NODE get_snapshot
SQL >

    SELECT
        ts AS last_ts,
        post_id,
        views,
        likes,
        tag
    FROM post_snapshot
    WHERE snapshot_ts = (SELECT max(snapshot_ts) FROM post_snapshot)
    AND post_id NOT IN (SELECT post_id FROM get_latest_changes)


NODE combine_both
SQL >

    SELECT * FROM get_snapshot
    UNION ALL
    SELECT * FROM get_latest_changes

```

## Using argMax with null values

Here are the definitions of the `argMax` combinators:

* `argMaxState`: writes a continuously updating aggregation state into a materialized view.
* `argMaxMerge`: reads the final value out of that state when querying the materialized view.

__When dealing with null values, `argMax` functions might not behave as you expect.__ In raw queries, you might see a null value for the most recent record. However, when data is piped into an AggregatingMergeTree materialized view using `argMaxState` and later queried using `argMaxMerge`, the result can be different.

Returning to the social media analytics example, imagine you want to track the most recent time a post was flagged.

The raw query would be as follows. It returns `null` if the most recent record's `flaggedAt` value is null:

```tb {% title="get_latest_flagged_at_raw.pipe" %}
NODE get_latest_flagged_at_raw
SQL >

    SELECT flaggedAt
    FROM posts
    WHERE post_id = 'abc123'
    ORDER BY timestamp DESC
    LIMIT 1
```

First, data is aggregated into a materialized view with a pipe that uses `argMaxState`:

```tb {% title="get_latest_flagged_at_mat.pipe" %}
NODE get_latest_flagged_at
SQL >

    SELECT
        post_id,
        argMaxState(flaggedAt, timestamp) AS flaggedAt
    FROM posts
    GROUP BY post_id

TYPE materialized
DATASOURCE post_analytics_mv
```

Later, the aggregated state is merged with `argMaxMerge` when the materialized view is queried:

```tb {% title="get_absolute_latest_flagged_at.pipe" %}
NODE get_absolute_latest_flagged_at
SQL >

    SELECT argMaxMerge(flaggedAt)
    FROM post_analytics_mv
    WHERE post_id = 'abc123'
```

Although you might expect a `null` result because the raw query returned `null`, the merging process prefers any non-null value over a null value, even if its associated timestamp is lower. The result is the most recent non-null `flaggedAt` value.

### Why this happens and the workaround

During the merge process, the materialized view combines candidate states, each a key-value pair; in this example, the key is `timestamp` and the value is `flaggedAt`. If one candidate has a non-null `flaggedAt` and another has a null value, the non-null value wins regardless of its timestamp. This behavior is inherent to the merging logic of `argMaxMerge`, and it explains why an `argMaxMerge` query might not return the value associated with the absolute maximum timestamp.

To prevent the merging behavior from replacing a null with a non-null candidate, handle null values explicitly before they enter the materialized view. One common approach is to transform null values into a known default, often the Unix epoch `1970-01-01 00:00:00`. However, be aware that such conversions produce "fake" values that must be recognized in subsequent processing.

Assume your raw data includes a `flaggedAt` column, which may contain nulls. You can preprocess the data during aggregation as follows:

```tb {% title="get_latest_flagged_at_no_nulls.pipe" %}
NODE get_latest_flagged_at_no_nulls
SQL >

    SELECT
        post_id,
        argMaxState(
            CASE
                WHEN flaggedAt IS NULL
                THEN toDateTime('1970-01-01 00:00:00')
                ELSE flaggedAt
            END,
            timestamp
        ) AS flaggedAt
    FROM posts
    GROUP BY post_id
```

Here, a `CASE` expression converts any null `flaggedAt` values into the default datetime `1970-01-01 00:00:00`. This ensures that during the merge, the aggregation logic processes explicit default values rather than implicitly overriding nulls.

{% callout type="caution" %}
Since `1970-01-01 00:00:00` is used as a default placeholder, ensure that any downstream logic differentiates between genuine datetime values and these default values.
{% /callout %}
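If downstream consumers need nulls back, one option is to convert the placeholder at query time with `nullIf`. A sketch, assuming the materialized view stores the aggregation state in a column named `flaggedAt`:

```sql {% title="Converting the placeholder back to null" %}
SELECT
    post_id,
    nullIf(argMaxMerge(flaggedAt), toDateTime('1970-01-01 00:00:00')) AS flaggedAt
FROM post_analytics_mv
GROUP BY post_id
```

`nullIf` returns `null` whenever the merged value equals the placeholder, so consumers see the same shape as the raw data.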

## Next steps

- Read the [materialized views docs](/classic/work-with-data/process-and-copy/materialized-views#creating-a-materialized-view-in-the-tinybird-ui).
- Read the [Lambda architecture guide](/classic/work-with-data/query/guides/lambda-architecture).
- Visualize your data using [Tinybird Charts](/classic/publish-data/charts).
