Deduplication Strategies in ClickHouse
OLAP databases like ClickHouse are optimized for fast ingestion and, for that to work, some trade-offs have to be made. One of them is the lack of unique constraints, since enforcing them would add a big overhead and make ingestion speeds too slow for what’s expected from a database of this kind.
Nonetheless, there will be lots of times when you will have duplicated data and you will want to get rid of it, or just access the latest data point available. There are several ways to go about it, and the general workflow will be ingesting all of the data first, including duplicates, and dealing with them later.
Typically, there are two use-cases where you’ll end up with duplicated data:
Upserts. An upsert is an operation that inserts rows into a database table if they do not already exist, or updates them if they do. On databases like Postgres that’d be accomplished with INSERT ... ON CONFLICT DO UPDATE clauses, but as ClickHouse doesn’t enforce uniqueness of primary keys, such clauses aren’t supported either. The way to do upserts on ClickHouse is with a ReplacingMergeTree engine. More on this later.
Constant ingestions and historical data. Imagine you are periodically dumping data from your transactional database to ClickHouse to run analytical queries on it in real-time, like described here. You’d end up with lots of rows inserted at different times with the same primary key. You may want to get only the latest data point - in this case you can get rid of the rest and treat them like upserts. But there are also use-cases where you want to keep all the data to have a historic record of the evolution of the attributes of an object over time.
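For comparison, this is roughly what the upsert from the first case looks like in Postgres. It is only a sketch: the posts_views table and its columns are hypothetical names chosen for illustration, and the statement relies on a unique constraint, which is exactly what ClickHouse does not enforce.

```sql
-- Postgres, not ClickHouse. Table and column names are hypothetical.
CREATE TABLE posts_views (
    post_id integer PRIMARY KEY,
    views   integer NOT NULL
);

-- Insert the row, or update the existing one when post_id is already present.
INSERT INTO posts_views (post_id, views)
VALUES (1, 120)
ON CONFLICT (post_id) DO UPDATE
SET views = EXCLUDED.views;
```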
A real-world example
We’ve created a dataset that resembles what a social media analytics company would have to track page views of posts over time. It has data for about 10000 posts over every day of a year. It looks like this:
We’ll use the same dataset to explain what’s the best way to deduplicate data by post to do last-point analytics on it, for the two use cases we’ve described before: upserts and historical data.
Let’s first define a Data Source like this to store the data, where every day you’d append the total views for each post for that day.
Selecting the right sorting keys:
If you’re going to do lots of filtering by post_id, it’s better to sort first by post_id and then by date, to keep the scanned index range as small as possible. Read this or take our ‘Principles of Real Time Analytics’ free course to learn more about how to define your sorting keys.
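In plain ClickHouse terms, the sorting key advice above could be sketched as the table below. The table name and the column types are assumptions (the text only mentions post_id, date and views); in Tinybird the same idea would be expressed in the Data Source definition rather than with a CREATE TABLE statement.

```sql
-- Assumed schema: only post_id, date and views are mentioned in the text.
CREATE TABLE posts_views
(
    post_id Int32,
    date    Date,
    views   Int32
)
ENGINE = MergeTree
-- Sort by post_id first so that filters on post_id scan a small index range.
ORDER BY (post_id, date);
```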
Now create the data source and append data to it.
Then, there are basically four strategies to deduplicate data:
Doing it at query time
Using a ReplacingMergeTree engine (you’ll also have to use another strategy on top of it because the deduplication process is asynchronous and there will be duplicated data after insertions)
Using Materialized Views
Using snapshots
These solutions are ordered from the simplest to the most complex.
The bigger your data is, the more likely it is that you’ll need to use a more complex solution in order to have really good performance.
Deduplicating at query time
Imagine you are interested only in the latest value of views for each post. In that case, you can deduplicate data on post_id and get the latest value with these strategies:
Get the max date for each post in a subquery and then filter by its results
Group data by post_id and use the argMax function
Use the LIMIT BY clause
Depending on your data and how you define the sorting keys in your Data Sources to store it on disk, one approach will be faster than others.
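The three query-time strategies listed above can be sketched in ClickHouse SQL as follows. The posts_views table and its column types are assumptions made for this example; adapt the names to your own Data Source.

```sql
-- Assumed table, created here only so the queries are self-contained.
CREATE TABLE IF NOT EXISTS posts_views
(
    post_id Int32,
    date    Date,
    views   Int32
)
ENGINE = MergeTree
ORDER BY (post_id, date);

-- 1. Subquery: find the max date per post, then filter by that result.
SELECT post_id, views
FROM posts_views
WHERE (post_id, date) IN
(
    SELECT post_id, max(date)
    FROM posts_views
    GROUP BY post_id
);

-- 2. argMax: for each post, take the views value of the row with the highest date.
SELECT post_id, argMax(views, date) AS latest_views
FROM posts_views
GROUP BY post_id;

-- 3. LIMIT BY: sort newest first and keep one row per post.
SELECT post_id, views
FROM posts_views
ORDER BY post_id, date DESC
LIMIT 1 BY post_id;
```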
In general, deduplicating at query time will be fine if the size of your data is small. But if you have lots of data, the best option to make your query faster will be using a Materialized View to pre-compute the latest value of views for each post_id. We’ll show you how to do that in the section on Materialized Views below.
If you have lots of data and you only care about the latest insertion for each unique key, you can use a ReplacingMergeTree engine. To use it, you need these two engine options:
engine_sorting_key: rows with the same engine_sorting_key will be deduplicated. You can select one or more columns.
engine_ver: can be omitted, and in that case the last inserted row for each unique engine_sorting_key will be kept. If you specify it (in ClickHouse the version column has to be an unsigned integer, Date, DateTime or DateTime64), the row with the highest engine_ver for each unique engine_sorting_key will be kept.
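As a rough ClickHouse SQL equivalent of those two options, the sorting key maps to ORDER BY and the version column is the argument passed to the engine. The table name posts_views_rmt and the column types are assumptions for illustration, not the original Data Source definition.

```sql
-- Hypothetical table name and column types.
CREATE TABLE posts_views_rmt
(
    post_id Int32,
    date    Date,
    views   Int32
)
-- The engine argument plays the role of engine_ver:
-- on merge, the row with the highest date is kept.
ENGINE = ReplacingMergeTree(date)
-- ORDER BY plays the role of engine_sorting_key:
-- rows with the same post_id are treated as duplicates.
ORDER BY post_id;
```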
Define a data source with these options, then push it to Tinybird with the CLI (using Docker here) and append data to it as we did before.
An important note from the docs:
“Data deduplication occurs only during a merge. Merging occurs in the background at an unknown time, so you can’t plan for it. Some of the data may remain unprocessed. Thus, ReplacingMergeTree is suitable for clearing out duplicate data in the background in order to save space, but it doesn’t guarantee the absence of duplicates.”
Frequency of merges
Merging will happen in the background, most likely every 9-10 minutes, but if ClickHouse considers that you don’t have enough data it won’t happen. So doing select * from ... will most likely give you duplicated rows (even when using a ReplacingMergeTree engine) after an insertion, and you’ll need to use some other strategy to select only the last value at query time.
Note also that we used a new query here, using the FINAL modifier. It merges the data at query time and is not generally recommended because it will usually make queries slower.
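A minimal sketch of what a query with FINAL looks like, assuming a hypothetical posts_views_rmt table that uses ReplacingMergeTree with post_id as the sorting key and date as the version column:

```sql
-- Hypothetical table, created here only so the example is self-contained.
CREATE TABLE IF NOT EXISTS posts_views_rmt
(
    post_id Int32,
    date    Date,
    views   Int32
)
ENGINE = ReplacingMergeTree(date)
ORDER BY post_id;

-- Two separate inserts for the same post create two parts on disk.
INSERT INTO posts_views_rmt VALUES (1, '2021-01-01', 10);
INSERT INTO posts_views_rmt VALUES (1, '2021-01-02', 20);

-- Without FINAL, both rows may be returned until a background merge runs.
SELECT post_id, date, views
FROM posts_views_rmt;

-- With FINAL, rows are merged at query time, so only the row
-- with the highest date is returned for each post_id.
SELECT post_id, date, views
FROM posts_views_rmt FINAL;
```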
By the time you’re reading this, most likely the engine will have already performed the deduplication and the above queries will be very fast. Until that happens, their performance will be the same as if you were using a MergeTree engine with all the duplicated data, as in the first queries. After inserting data, the performance of the query using FINAL will be similar to the one using LIMIT BY, and the one using the subquery will be 5-10x faster, based on our tests.
Using materialized views
You could create a Materialized View (MV) to store the last state of your data if:
Your data is quite big. You’ll get there fast if the resolution of your data is hourly or lower instead of daily and you save data for 10x or 100x the number of posts here.
You also want to keep a historical record, without discarding old data. This is key: if you don’t care about historical data you don’t need the MV at all, and a ReplacingMergeTree is all you need.
You want to be able to make last-point analytics (like we’ve done before) very fast, in the millisecond range.
To create an MV you just need:
an origin Data Source that you’ll use to ingest the data (posts_views_historical.datasource, in this case)
a destination Data Source that partially aggregates the data
a transformation pipe that connects both Data Sources and performs the final aggregation.
posts_views_latest_agg.datasource, the destination data source:
And this is the transformation pipe to connect both data sources:
Push the new data source and the MV like this:
Finally, this query gets the latest data point by computing all the aggregations.
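The whole flow can be sketched in plain ClickHouse SQL as shown below. The table names follow the Data Source names used in this section; the column types, the view name posts_views_latest_mv and the latest_date and latest_views columns are assumptions made for the example, not the original definitions.

```sql
-- Origin table: keeps the full history (assumed schema).
CREATE TABLE posts_views_historical
(
    post_id Int32,
    date    Date,
    views   Int32
)
ENGINE = MergeTree
ORDER BY (post_id, date);

-- Destination table: stores partial aggregation states per post.
CREATE TABLE posts_views_latest_agg
(
    post_id      Int32,
    latest_date  AggregateFunction(max, Date),
    latest_views AggregateFunction(argMax, Int32, Date)
)
ENGINE = AggregatingMergeTree
ORDER BY post_id;

-- Materialized view: runs on every insert into the origin table
-- (rows inserted before the view existed are not processed).
CREATE MATERIALIZED VIEW posts_views_latest_mv TO posts_views_latest_agg AS
SELECT
    post_id,
    maxState(date)           AS latest_date,
    argMaxState(views, date) AS latest_views
FROM posts_views_historical
GROUP BY post_id;

-- Query: merge the stored states to get the latest point per post.
SELECT
    post_id,
    maxMerge(latest_date)     AS date,
    argMaxMerge(latest_views) AS views
FROM posts_views_latest_agg
GROUP BY post_id;
```

The -State and -Merge suffixes are needed because background merges are asynchronous: the destination table can hold several partial states for the same post_id, so the final query still has to group and merge them.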
AggregatingMergeTree will work fine when you can filter the data and, therefore, do not need to read much information. This is not always possible: there are situations where we need to run queries over large amounts of data that must be deduplicated.
Let’s imagine we want to sync our MySQL database with Tinybird in order to speed up the analytical queries and generate some endpoints for our back office. We want to be as real-time as possible, so we follow a change data capture (CDC) approach. To simplify, our syncer will be generating CSVs and pushing them to Tinybird, but we could be using a Debezium + Kafka approach as well.
Once our syncer is integrated, let’s take a look at one of our tables we want to sync, the
In order to generate the same information in Tinybird, we are going to need to create two data sources:
users_landing.datasource, where we append all the changes that are happening in the MySQL table. We will create a row for each insert, update or delete that our MySQL table generates.
users.datasource, where we will be storing every snapshot we take. We will generate a new snapshot by composing all the changes and merging them with the previous snapshot.
users_landing.datasource will have the same fields that we have in our MySQL table with two extra columns:
operation will indicate what kind of operation happened to the row (INSERT, UPDATE, DELETE).
inserted_at will indicate when the row was inserted in the data source. We will use this column to know if it is already inside a snapshot or not.
users.datasource will have the same fields that we have in our MySQL table with an extra column, snapshot_time. In this table we are going to be storing every snapshot, so this column will help us to identify the latest snapshot that we want to use.
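A minimal ClickHouse SQL sketch of the two tables and of how the extra columns are used. The id, name and email columns are hypothetical stand-ins for the fields of the MySQL table, and the engines and sorting keys are assumptions; only operation, inserted_at and snapshot_time come from the description above.

```sql
-- Landing table: one row per change captured from MySQL.
CREATE TABLE users_landing
(
    id          UInt64,                  -- hypothetical MySQL field
    name        String,                  -- hypothetical MySQL field
    email       String,                  -- hypothetical MySQL field
    operation   LowCardinality(String),  -- 'INSERT', 'UPDATE' or 'DELETE'
    inserted_at DateTime DEFAULT now()   -- when the change reached this table
)
ENGINE = MergeTree
ORDER BY (inserted_at, id);

-- Snapshot table: every snapshot is stored, tagged with its snapshot_time.
CREATE TABLE users
(
    id            UInt64,
    name          String,
    email         String,
    snapshot_time DateTime
)
ENGINE = MergeTree
ORDER BY (snapshot_time, id);

-- Read only the latest snapshot.
SELECT id, name, email
FROM users
WHERE snapshot_time = (SELECT max(snapshot_time) FROM users);

-- Changes that are not yet part of any snapshot.
SELECT *
FROM users_landing
WHERE inserted_at > (SELECT max(snapshot_time) FROM users)
ORDER BY inserted_at;
```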
Now that we have both data sources created, we need a materialized view (MV) to connect them.
To learn more about defining MVs with Tinybird, check out our docs.
Other interesting resources:
Everything you should know about Materialized Views, by Denny Crane. An extensive 40-page manual on all the ins and outs of MVs on ClickHouse.
ClickHouse continues to crush time series, by Alexander Zaitsev. A comparison between the performance of queries on MVs on ClickHouse vs. the same queries on time-series specific databases. ClickHouse wins by a big margin.
Doing log analytics at scale on NGINX logs, by Javi Santana. With a section on how to speed up the queries using MVs.