Deduplicate the data in your Data Source¶
OLAP databases like ClickHouse® are optimized for fast ingestion and, for that to work, some trade-offs are made. One of these is the lack of unique data constraints: Enforcing them would add a big overhead, and make ingestion speeds too slow for what's expected from this kind of database.
However, there will be times when you either want to get rid of duplicated data, or just access the most recent data point available. There are several options, but the general workflow is ingesting all of the data first (including duplicates) and dealing with them after ingestion.
The problem¶
Typically, there are two scenarios that lead to duplicated data:
- Upserts. An upsert is an operation that inserts new rows into a database table if they do not already exist, or updates them if they do. With databases like Postgres, this can be accomplished with `INSERT ... ON CONFLICT DO UPDATE` clauses. Because ClickHouse does not enforce uniqueness of primary keys, it doesn't support clauses like this. The way to do upserts on ClickHouse is with a ReplacingMergeTree engine. More on this later.
- Constant ingestion and historical data. Imagine you're periodically dumping data from your transactional database to ClickHouse to run analytical queries on it in real time, as described in this Tinybird blog post. You would end up with lots of rows inserted at different times with the same primary key. You may want only the latest data point; in that case you can get rid of the rest and treat them like upserts. But there are also use cases where you want to keep all the data as a historical record of how an object's attributes evolved over time.
Different alternatives based on your requirements¶
- Deduplicate at query time: if you are still prototyping, or the Data Source is not too big (say, below 1M rows), deduplicating at query time is probably still fast enough.
- Engine based: a `ReplacingMergeTree` or `AggregatingMergeTree` engine queried with `FINAL` is the easiest way to deduplicate, and usually the fastest and most efficient. Downside: you can't feed aggregation Materialized Views from these Data Sources.
- Snapshot based: if data freshness is not a hard requirement, you can generate periodic snapshots of the data and also take advantage of subsequent Materialized Views for rollups.
- Hybrid approach: when you need to overcome the engine approach's limitations but also need freshness, you can combine both approaches in a Lambda Architecture.

Note: for small, dimensional tables, a periodic full replace is probably the best alternative.
Example scenario¶
For this guide, imagine a dataset from a social media analytics company that wants to track some data (views, likes, tags) about posts over time.
You receive an event with the latest info for each post, identified by `post_id`. The three fields `views`, `likes`, and `tag` will vary from event to event.
post.ndjson
{ "timestamp": "2024-07-02T02:22:17", "post_id": 956, "views": 856875, "likes": 2321, "tag": "Sports" }
If you want to follow along, you can define the following `.datasource` file locally (but you can also just read through to understand the different options). This file creates the schema to store the data, where each day's total views for each post are appended:
posts_info.datasource
DESCRIPTION >
    Data Source to save post info

SCHEMA >
    `post_id` Int32 `json:$.post_id`,
    `views` Int32 `json:$.views`,
    `likes` Int32 `json:$.likes`,
    `tag` String `json:$.tag`,
    `timestamp` DateTime `json:$.timestamp`

ENGINE "MergeTree"
ENGINE_PARTITION_KEY "toYear(timestamp)"
ENGINE_SORTING_KEY "post_id, timestamp"
Selecting the right sorting key matters. If you're going to do lots of filtering by `post_id`, it's better to sort first by `post_id` and then by `timestamp`, to keep the scanned index range as small as possible. Read this resource or take Tinybird's free 'Principles of Real Time Analytics' course to learn how to better define and sort your indexes.
You can create a Tinybird Data Source by adding this definition file to your Workspace, and append mock data using this data generator in Mockingbird. Just remember to set your Host and a Token with the needed rights.
Deduplicate at query time¶
Imagine you're only interested in the latest value of views for each post. In that case, you can deduplicate data on `post_id` and get the latest value with these strategies:
- Get the max date for each post in a subquery and then filter by its results.
- Group data by `post_id` and use the argMax function.
- Use the LIMIT BY clause.
The example queries below show these approaches.
Deduplicating data on post_id using Subquery
SELECT *
FROM posts_info
WHERE (post_id, timestamp) IN (
    SELECT post_id, max(timestamp)
    FROM posts_info
    GROUP BY post_id
)
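The other two strategies produce the same latest-row-per-post result against the same `posts_info` schema. A sketch of the argMax version, which groups by `post_id` and picks each column's value at the row with the max `timestamp`:

```sql
-- Deduplicating data on post_id using argMax
SELECT
    post_id,
    max(timestamp) AS timestamp,
    argMax(views, timestamp) AS views,
    argMax(likes, timestamp) AS likes,
    argMax(tag, timestamp) AS tag
FROM posts_info
GROUP BY post_id
```

And the LIMIT BY version, which keeps only the first row per `post_id` after sorting newest-first:

```sql
-- Deduplicating data on post_id using LIMIT BY
SELECT *
FROM posts_info
ORDER BY post_id, timestamp DESC
LIMIT 1 BY post_id
```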
Depending on your data and how you define the sorting keys in your Data Sources to store it on disk, one approach will be faster than others.
In general, deduplicating at query time is fine if the size of your data is small. But if you have lots of data, the best option to make your query faster is to use a specific Engine that will take care of deduplication for you.
Use a ReplacingMergeTree Engine¶
If you have lots of data and you only care about the latest insertion for each unique key, you can use a ReplacingMergeTree engine. Three engine options control deduplication in `ReplacingMergeTree` engines: `ENGINE_SORTING_KEY`, `ENGINE_VER`, and `ENGINE_IS_DELETED`.
- Rows with the same `ENGINE_SORTING_KEY` will be deduplicated. You can select one or more columns.
- `ENGINE_VER` can be omitted; in that case, the last inserted row for each unique `ENGINE_SORTING_KEY` is kept. If you specify it (its type has to be `UInt*`, `Date`, or `DateTime`), the row with the highest `ENGINE_VER` for each unique `ENGINE_SORTING_KEY` is kept.
- `ENGINE_IS_DELETED` is only enabled if `ENGINE_VER` is used. This column determines whether the row represents the state or is to be deleted: `1` is a "deleted" row, `0` is a "state" row. The type must be `UInt8`.
Define a Data Source like this:
posts_info_rmt.datasource
DESCRIPTION >
    Data Source to save post info. ReplacingMergeTree Engine.

SCHEMA >
    `post_id` Int32 `json:$.post_id`,
    `views` Int32 `json:$.views`,
    `likes` Int32 `json:$.likes`,
    `tag` String `json:$.tag`,
    `timestamp` DateTime `json:$.timestamp`,
    `_is_deleted` UInt8 `json:$._is_deleted`

ENGINE "ReplacingMergeTree"
ENGINE_PARTITION_KEY ""
ENGINE_SORTING_KEY "post_id"
ENGINE_VER "timestamp"
ENGINE_IS_DELETED "_is_deleted"
You can create the new Data Source by adding this new definition file to your project, with the same methods described above.
An important note from the ClickHouse docs:
"Data deduplication occurs only during a merge. Merging occurs in the background at an unknown time, so you can't plan for it. Some of the data may remain unprocessed. Although you can run an unscheduled merge using the `OPTIMIZE` query, do not count on using it, because the `OPTIMIZE` query will read and write a large amount of data. Thus, `ReplacingMergeTree` is suitable for clearing out duplicate data in the background in order to save space, but it does not guarantee the absence of duplicates."
Merging will happen in the background, most likely every 9-10 minutes, but if ClickHouse considers that you do not have enough data it will not happen as frequently. So doing `SELECT * FROM ...` right after an insertion will most likely give you duplicated rows (even when using a `ReplacingMergeTree` engine), and you'll need an additional strategy to select only the last value at query time.
The easiest way to achieve this deduplication is to add the `FINAL` modifier to the query:
Deduplicating data on post_id using FINAL
SELECT * FROM posts_info_rmt FINAL
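To see how `ENGINE_IS_DELETED` comes into play, here is a sketch in plain ClickHouse SQL (in Tinybird you would send this row through the usual ingestion APIs rather than `INSERT`; the values other than `post_id`, `timestamp`, and `_is_deleted` are placeholders):

```sql
-- Hypothetical deletion event for post 956: same sorting key (post_id),
-- a newer version (timestamp), and _is_deleted set to 1.
INSERT INTO posts_info_rmt (post_id, views, likes, tag, timestamp, _is_deleted)
VALUES (956, 0, 0, '', now(), 1)

-- After this row lands, querying with FINAL should drop post 956
-- entirely instead of returning its latest state.
SELECT * FROM posts_info_rmt FINAL
```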
You can define `posts_info_rmt` as the landing Data Source (the one you send events to) or as a Materialized View from `posts_info`. And, following the Materialized View path, it is possible to create a Data Source with an `AggregatingMergeTree` engine using `maxState(ts)` and `argMaxState(field, ts)`.
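As a sketch of that `AggregatingMergeTree` path (the node and Data Source names here are hypothetical, and the target `posts_info_agg` is assumed to be an `AggregatingMergeTree` sorted by `post_id` with matching `AggregateFunction(...)` columns), the materializing node would store partial states:

```sql
NODE mv_latest_post_info
SQL >
    SELECT
        post_id,
        maxState(timestamp) AS last_ts,
        argMaxState(views, timestamp) AS views,
        argMaxState(likes, timestamp) AS likes,
        argMaxState(tag, timestamp) AS tag
    FROM posts_info
    GROUP BY post_id

TYPE materialized
DATASOURCE posts_info_agg
```

Reading it back requires the matching `-Merge` combinators:

```sql
SELECT
    post_id,
    maxMerge(last_ts) AS last_ts,
    argMaxMerge(views) AS views,
    argMaxMerge(likes) AS likes,
    argMaxMerge(tag) AS tag
FROM posts_info_agg
GROUP BY post_id
```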
Snapshots¶
Sometimes you need other sorting keys that will change with updates, or you need to do rollups with Materialized Views, or response times with a `ReplacingMergeTree` are simply too high. Luckily, you have Copy Pipes, which can take a query result and write it to a new Data Source.
post_generate_snapshot.pipe
NODE gen_snapshot
SQL >
    SELECT
        post_id,
        argMax(views, timestamp) views,
        argMax(likes, timestamp) likes,
        argMax(tag, timestamp) tag,
        max(timestamp) as ts,
        toStartOfMinute(now()) - INTERVAL 1 MINUTE as snapshot_ts
    FROM posts_info
    WHERE timestamp <= toStartOfMinute(now()) - INTERVAL 1 MINUTE
    GROUP BY post_id

TYPE copy
TARGET_DATASOURCE post_snapshot
COPY_MODE replace
COPY_SCHEDULE 0 * * * *
Note the `WHERE` clause and the `snapshot_ts` column, there to stick to best practices.
Here, for example, you can use `tag` as the sorting key, since the `TARGET_DATASOURCE` of the Copy Pipe can be a regular `MergeTree`.
post_snapshot.datasource
SCHEMA >
    `post_id` Int32,
    `views` Int32,
    `likes` Int32,
    `tag` String,
    `ts` DateTime,
    `snapshot_ts` DateTime

ENGINE "MergeTree"
ENGINE_PARTITION_KEY ""
ENGINE_SORTING_KEY "tag, post_id"
Hybrid approach, Lambda architecture¶
Snapshots are great, but you sacrifice some data freshness (and if you run Copy Pipes too frequently, they can be more expensive than Materialized Views). Sometimes you need to combine batch and real-time processing: read the latest snapshot and incorporate the changes that have happened since then.
This pattern is outlined in the Lambda Architecture guide, and you can see a practical example in the CDC using Lambda guide.
Using the `post_snapshot` Data Source created before, the real-time Pipe would be something like:
latest_values.pipe
NODE get_latest_changes
SQL >
    SELECT
        max(timestamp) last_ts,
        post_id,
        argMax(views, timestamp) views,
        argMax(likes, timestamp) likes,
        argMax(tag, timestamp) tag
    FROM posts_info_rmt
    WHERE timestamp > (SELECT max(snapshot_ts) FROM post_snapshot)
    GROUP BY post_id

NODE get_snapshot
SQL >
    SELECT
        ts AS last_ts,
        post_id,
        views,
        likes,
        tag
    FROM post_snapshot
    WHERE snapshot_ts = (SELECT max(snapshot_ts) FROM post_snapshot)
        AND post_id NOT IN (SELECT post_id FROM get_latest_changes)

NODE combine_both
SQL >
    SELECT * FROM get_snapshot
    UNION ALL
    SELECT * FROM get_latest_changes
Next steps¶
- Read the Materialized Views docs.
- Read the Lambda Architecture guide.
- Got your data sorted out? Visualize it fast using Tinybird Charts.