How we processed 12 trillion rows during Black Friday
By Javi Santana
One month before Black Friday, one of our clients in the retail space asked us to host the real-time analytics API that feeds the dashboards everyone in the company uses to see what’s going on (and, of course, to make decisions based on it).
We knew it was tight, but we specialize in running real-time stuff at scale: while at Carto, for example, we served the Wall Street Journal front-page map during the 2012 election night, and we hosted a website linked from google.com’s home page (and a bunch of other countries’ home pages) for 24 hours, among other high-load events.
In an ideal world you would set up a Kafka cluster, a streaming system to materialize some metrics in a fast key-value store like Redis, and a lightweight backend to serve them through an API. With that you would likely get thousands of requests per second without much hardware.
And while we can easily ingest Kafka streams, not every company works with streams, not all analytics can be precalculated easily and not all dashboards are simple counters.
In this case we had to deal with data from a transactional database. And the analytics needed to be flexible due to filtering requirements as well as different configurations that needed to be changed on the fly and reflected instantly on the dashboard.
The data input
In order to “stream” data from the transactional database to our system, the client set up 5 parallel threads sending, once per second, all the rows that had changed during the previous second. Sounds easy, but it gets complicated - wait for it.
That means that even when just one column changes in an existing row, the whole row needs to be sent again. And in a system like this, rows change quite a lot. Of course there are better ways to do this, but for this use case it was good enough and allowed them to have high-quality data (one of the requirements of the project).
We used our product for the setup. It looks like this:
- An Nginx load balancer
- A Varnish behind Nginx
- Our Tinybird backend. It runs the API endpoints, the load balancing for Clickhouse replicas and also the ingestion part. It’s a Python application with small bits of C++ for critical request paths.
- A Clickhouse cluster
- A Zookeeper cluster for data replication within Clickhouse
It’s pretty simple (because we like to keep things as simple as possible) and built from components we know pretty well. To be clear, in this case the QPS was not going to be crazy high (less than 200 QPS), so Nginx and Varnish acted mostly as routing tools without much load (more on this later), but any other load balancer would work pretty well too.
The initial numbers
The plan was to handle between 50 and 100 QPS (with peaks of 120 QPS) with a response time under 150ms, over datasets with some millions of rows (trying not to be too specific here on purpose). The queries were not simple: they needed several joins with tables that changed quite often, so we couldn’t precalculate too many things.
Around 5 ingestions happened per second with millions of rows ingested per minute.
Every hour, many millions of rows would be imported to resync the last hours’ data with changes that might have been missed in the original database.
The best way to run any analytics service is to serve static data. That’s the perfect world because it is inexpensive and fast as hell.
With dynamic data, what you want to do is to convert the data as it’s ingested into static data so that you only process the data once. That’s possible when the amount of combinations you have to generate is low. A static blog generator with a few posts a day is one example of this.
But when the number of combinations is high, you have to process the data as it comes in as much as you can so it’s fast to query, while keeping the processing time low, because you don’t want to add lag (otherwise it would not be real-time).
Our product uses Clickhouse, and that’s what we used for data storage and queries. It’s almost perfect for this use case: it runs queries over large amounts of data pretty fast, its latency is low, and it scales horizontally and vertically.
So the plan was easy: use Clickhouse materialized views to cover the most frequent use cases. Take into account that Clickhouse matviews are incremental - you don’t need to recalculate the whole view when new data is added. If you use Clickhouse and you are not using them, you should take a look; you may save several orders of magnitude in query time.
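As a sketch of what that looks like (the table and column names here are illustrative, not the client’s actual schema), an incremental matview in Clickhouse can be defined like this:

```sql
-- Hypothetical raw sales table.
CREATE TABLE sales
(
    ts       DateTime,
    store_id UInt32,
    amount   Float64
)
ENGINE = MergeTree()
ORDER BY (store_id, ts);

-- Incremental materialized view: Clickhouse updates it on every insert
-- into `sales`, so per-store hourly totals never need a full recompute.
CREATE MATERIALIZED VIEW sales_per_store_hourly
ENGINE = SummingMergeTree()
ORDER BY (store_id, hour)
AS SELECT
    store_id,
    toStartOfHour(ts) AS hour,
    sum(amount) AS total_amount
FROM sales
GROUP BY store_id, hour;
```

Queries then hit `sales_per_store_hourly` instead of scanning the raw table, which is where the orders-of-magnitude savings come from.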
But there is one big problem: as I said we need to run upserts because the data comes from a transactional database. And Clickhouse (like other databases in the same space) doesn’t deal with upserts in real time.
So what we did was split the main table, the one with all the sales, into two: one for the latest 30 minutes (we call it RT) and another one for the rest (HISTORIC). Each table has the same materialized views but, obviously, different data.
The RT one is really easy to work with because the number of rows in it is never going to be crazy, so we can recalculate it completely to run the upserts in real time. It takes less than 1 second to run all the upserts and recalculate all the matviews for that “small” table of several million rows.
The HISTORIC one works as an incremental table and data older than 30 minutes is appended to the table. We don’t expect too many changes in rows after 30 minutes, that’s why we move data to this table.
For rows that change after 30 minutes we run an hourly process to update parts of the HISTORIC table and recalculate all the materialized views for that table, only for data that has changed.
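A minimal sketch of the two-table scheme, with hypothetical names and a simplified mover (in production the cutoff timestamp would be computed once and shared, and every step would be replication-aware):

```sql
-- Two tables with identical schemas (names are illustrative):
--   sales_rt       -> the latest ~30 minutes, small enough to rebuild fully
--   sales_historic -> everything older, append-only

-- Upserts hit sales_rt by rebuilding it: load the fresh state into a
-- staging table, then swap it in with an atomic rename.
RENAME TABLE sales_rt TO sales_rt_old, sales_rt_staging TO sales_rt;

-- Periodically, rows older than 30 minutes move to the historic table.
INSERT INTO sales_historic
SELECT * FROM sales_rt WHERE ts < now() - INTERVAL 30 MINUTE;

ALTER TABLE sales_rt DELETE WHERE ts < now() - INTERVAL 30 MINUTE;
```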
The important part: we run this in several machines but taking into account we can never return inconsistent results, every operation needs to be aware of replication lag (pretty low I have to say) and run all the changes in an atomic way. This sounds easy but it isn’t; however, it is something that comes out of the box with Tinybird so we didn’t have to pay too much attention to it.
For the table schema we tried to use the smallest possible data types to save as much space as possible, since everything should be in memory to be fast. We also used LZ4 compression (the default for Clickhouse) since it provides a good balance between compression ratio and decoding speed, saving quite a lot of memory bandwidth.
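For illustration, here is what “smallest possible types” means in practice (all column names and sizes are made up for the example; LZ4 is applied by default):

```sql
CREATE TABLE sales_rt
(
    ts       DateTime CODEC(DoubleDelta, LZ4), -- timestamps compress well as deltas
    store_id UInt16,                   -- not UInt64: a few thousand stores at most
    product  LowCardinality(String),   -- dictionary-encodes repeated values
    units    UInt16,
    amount   Decimal(18, 2)
)
ENGINE = MergeTree()
ORDER BY (store_id, ts);
```

Every byte shaved off a column is a byte of memory bandwidth saved on every scan.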
To optimize the ingestion process we also used one of the recently added features in Clickhouse, in-memory data parts, which let you insert rows into a MergeTree table without immediately syncing them to disk, improving performance. All the tables were replicated (multi-master), so in case of a problem with the machine running the ingestion processes, we could switch to a different one without downtime. Data loss was really unlikely since the ingestion process would send the rows again in case of a failed insert.
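If I recall the knobs correctly (they may differ between Clickhouse versions, so treat this as an assumption rather than a recipe), in-memory parts are controlled with MergeTree settings along these lines:

```sql
-- Parts smaller than these thresholds are kept in RAM (backed by a
-- write-ahead log) instead of being flushed to disk on every insert.
ALTER TABLE sales_rt MODIFY SETTING
    min_rows_for_compact_part = 1000000,
    min_bytes_for_compact_part = 104857600,
    in_memory_parts_enable_wal = 1;
```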
Once you have the materialized views, the queries are easy: just regular SQL following the real-time rules:
- The best data is the data you don’t have to read. Using Clickhouse indexing to skip unneeded data is key.
- Lightweight operations first: filtering always goes first.
- Replacing joins with “WHERE column IN (SELECT …)” when possible; it’s generally much faster. Using the transform function when possible.
- Monitoring and optimizing query times, but also minimizing the number of bytes scanned. We always aim to scan as few bytes as possible in the first steps of the query and then try to optimize to reduce the times. We split queries into several chunks so we know how many resources each part of the query takes.
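To make the third rule concrete, here is a hypothetical example (the `campaigns` table and columns are made up) of replacing a join with IN, plus transform():

```sql
-- Join version: Clickhouse has to build and probe a hash table.
SELECT store_id, sum(amount)
FROM sales AS s
INNER JOIN campaigns AS c ON s.campaign_id = c.id
WHERE c.active = 1
GROUP BY store_id;

-- IN version: the subquery becomes a set lookup, usually much faster here.
SELECT store_id, sum(amount)
FROM sales
WHERE campaign_id IN (SELECT id FROM campaigns WHERE active = 1)
GROUP BY store_id;

-- transform() maps values inline, avoiding a dictionary join entirely.
SELECT transform(country_id, [1, 2, 3], ['ES', 'US', 'FR'], '??') AS country,
       sum(amount)
FROM sales
GROUP BY country;
```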
Also to be able to run queries over two tables without overlapping data, we used a simple but powerful technique: using a table to store the split date.
We have to run the Common Table Expression twice because its scope only covers one select, something that is being improved in Clickhouse as I write this.
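A sketch of the technique, with hypothetical names: a one-row `split` table stores the boundary timestamp, and each SELECT in the union repeats the CTE because of the scope limitation just mentioned:

```sql
SELECT store_id, sum(amount) AS total
FROM
(
    -- Historic side: everything strictly before the split date.
    WITH (SELECT split_date FROM split) AS cut
    SELECT store_id, amount
    FROM sales_historic
    WHERE ts < cut

    UNION ALL

    -- RT side: everything at or after it. The CTE is repeated on purpose.
    WITH (SELECT split_date FROM split) AS cut
    SELECT store_id, amount
    FROM sales_rt
    WHERE ts >= cut
)
GROUP BY store_id;
```

Updating the single row in `split` atomically moves the boundary for every query at once, so the two tables can never overlap in results.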
The objective was to run all the queries under 150ms with just one core. This is important because there was a lot of concurrency, and we also had to leave some spare cores for the ingestion process. In our standard setup the load balancer routes more read requests to the machines not ingesting data at that moment, so query time percentiles stay low.
For the read load balancer we used a round-robin policy (all the requests take more or less the same time), but ideally the load balancer should try to route requests that use the same part of the data to the same machines to make the most of caching. In this case it wasn’t critical because the tables we were querying were not huge and we were running on large machines (32 cores, 64 GB of RAM), so the tables being read fit in memory.
We also used Varnish to avoid duplicate requests. When running dashboards, it’s pretty common for many browsers to request the same URL at the same time, so using Varnish’s grace mode we avoided extra work in the database.
Varnish was also ready for plan B: in case of an unexpected load peak well outside of our predictions, we could enable caching with a small TTL to save some requests to the database.
One final comment: we optimized queries and materialized views taking into account the distribution of calls across endpoints. It’s much better to speed up an endpoint that serves 80% of the calls than another one which is 2x slower but rarely called.
This sounds obvious, but developers sometimes forget two things: basic math and the big picture. If you are interested, we published a blog post on how we optimize our endpoints based on basic statistics.
Final numbers and results
Numbers after Black Friday days (4 in our case):
- We ingested over 650 billion rows.
- All the queries together read a total of 12,213,675,632,435 rows - an average of 35M rows read per second. You can see how many rows a query reads using the system.query_log table in Clickhouse. A tip about system tables: don’t forget to remove old data from them, because under high load you could run out of disk pretty easily. If you don’t need system.query_thread_log, just disable it.
- Median of 50QPS with peaks of 300QPS
- Avg API response time: 600ms. A little over what we planned, mainly because there was a query where we miscalculated the number of times it was going to be called and we didn’t optimize it properly.
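The row counts above come straight out of the query log; a query along these lines (the exact filters are illustrative) produces the totals, and old partitions can be dropped to keep the log from filling the disk:

```sql
SELECT
    count() AS queries,
    sum(read_rows) AS total_rows_read,
    avg(query_duration_ms) AS avg_duration_ms
FROM system.query_log
WHERE type = 'QueryFinish'
  AND event_date >= today() - 4;

-- system.query_log is partitioned by month; drop old partitions
-- before they eat the disk.
ALTER TABLE system.query_log DROP PARTITION '202011';
```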
But not everything went smoothly (I hate when technical people write in their company blogs about just the good parts). In this case we had a small problem that led to downtime: during one of the traffic peaks I broke one of the running scripts because I had two terminals open with vim, and I made some test changes in what I thought was my local file and saved. It was not my local machine. 😭
We use Ansible for this kind of thing but I like to ssh into the servers and run htop and some other monitoring tools.
But overall the system performed well all the time and the data architecture enabled us to scale pretty well without sharding or anything more complex.
We also tested other ways to handle the data stream but as you might guess I’m not going to write about them now. So you’ll have to subscribe to our mailing list and you’ll get notified when we publish it.