A few weeks ago, I saw a talk from Tesla claiming they were ingesting 1B rows per second using ClickHouse. I'm a petrolhead but I don't have any reason to think they are lying :). One (American) billion rows per second might feel like a lot, so let me try to explain how you can achieve that using ClickHouse. I'm not sure what ClickHouse flavor Tesla uses, but I don't think that's really important. I'm going to use the open source ClickHouse version for these tests.
Let me first do a super quick intro to ClickHouse architecture:
- ClickHouse clusters are made up of nodes which can be replicas and shards.
- Each shard stores a portion of the data. Sharding can be random or using any kind of rule (e.g., split by customer)
- Each shard has N replicas, a "copy" of the data in each node
- Coordination is done using Zookeeper (the original Zookeeper or ClickHouse Keeper).
So data is sent to any of the replicas in a shard, and ClickHouse replicates it to the rest of the replicas in that shard. When querying, the query is distributed to one replica in each shard.
If you are familiar with ClickHouse you most likely already know all this. If not, you'll need to do some back and forth with ChatGPT, but the important part is that you have buckets and you put your data in any of them.
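If you want to see what that layout looks like on a real cluster, any node will show it in the system.clusters table (a standard system table, so this works on any setup):

SELECT cluster, shard_num, replica_num, host_name
FROM system.clusters

Each row is one node: same shard_num means copies of the same data, different shard_num means different buckets.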
How do we ingest 1B rows per second?
It's actually "easy": check how many rows a single node can ingest, then divide 1B by that number and you know how many nodes you need.
Let's do a quick test. I'm not focusing on performance (it doesn't matter yet), but just so you know, I'm running all of this on my laptop (MacBook M4 Pro).
1. Create a simple table
I asked ChatGPT to create a sample schema:
CREATE TABLE otel_vehicle_metrics
(
time DateTime64(9),
resource LowCardinality(String),
scope LowCardinality(String),
metric_name LowCardinality(String),
metric_type LowCardinality(String),
value Float64,
attributes Map(LowCardinality(String), String)
)
ENGINE = MergeTree
ORDER BY (metric_name, time)
2. Generate 10M rows with sample data
Again, SQL generated by ChatGPT. I'll be using the Native format:
clickhouse client -q "SELECT
now64(9) - toIntervalSecond(rand() % 3600) AS time,
concat('VIN', toString(rand() % 10000)) AS resource,
'vehicle.telemetry' AS scope,
['vehicle_speed_kmh', 'battery_soc_percent', 'battery_temperature_celsius', 'motor_rpm'][1 + (rand() % 4)] AS metric_name,
'gauge' AS metric_type,
multiIf(metric_name = 'vehicle_speed_kmh', randUniform(0., 250.), metric_name = 'battery_soc_percent', randUniform(0., 100.), metric_name = 'battery_temperature_celsius', randUniform(-10., 60.), metric_name = 'motor_rpm', randUniform(0., 18000.), NULL) AS value,
map('drive_mode', ['autopilot', 'manual', 'park'][1 + (rand() % 3)], 'motor_id', ['front', 'rear'][1 + (rand() % 2)], 'battery_pack', ['main', 'aux'][1 + (rand() % 2)]) AS attributes
FROM numbers(10000000) format Native" > /tmp/10M.native
3. Insert it into the table
cat /tmp/10M.native | clickhouse client -t -q "insert into otel_vehicle_metrics format Native"
4.642
10M rows in 4.6 seconds, that's about 2.1M rows/s.
So if you want to get to 1B rows per second, you just need a cluster with ~500 shards. Ingestion scales linearly: add one more shard and you get another 2.1M rows/s of capacity. If you optimize the ingested rows per second you can of course use way fewer nodes. Let's do it.
Sort the data before pushing it to ClickHouse
clickhouse client -q "SELECT ... FROM numbers(10000000) order by metric_name, time format Native" > /tmp/10M.sorted.native
cat /tmp/10M.sorted.native | clickhouse client -t -q "insert into otel_vehicle_metrics format Native"
1.788
Now it's 1.788 seconds so 5.6M rows/s.
If we tune some settings, like more max_insert_threads, bigger parts, and so on, we get a few more million rows per second:
cat /tmp/10M.sorted.native | clickhouse client -t -q "insert into otel_vehicle_metrics settings min_insert_block_size_rows=150000,max_insert_threads=8 format Native"
1.128
So now it's 8.8M rows/s.
So hardware matters, but so do data preparation and settings. As I mentioned earlier, I'm running all of this on my M4 Pro, so ~113 shards with 16 CPUs each would do the trick, which is pretty good for 1B rows/s. Remember these shards are totally independent; there is no coordination, so it scales linearly... on writes.
But what happens on reads?
It depends on how you distributed the data.
- If you did random distribution you need to query all the shards (and that has an overhead)
- If you sharded by something, for example metric_name in this case, you can pick just a few shards depending on the query you are running, so you don't have any coordination problem (see the sketch below)
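As an illustration of that second option, this is roughly what a Distributed table with a sharding key looks like (the cluster name here is made up; the rest matches the table we created earlier):

CREATE TABLE otel_vehicle_metrics_dist AS otel_vehicle_metrics
ENGINE = Distributed('my_cluster', currentDatabase(), 'otel_vehicle_metrics', cityHash64(metric_name))

Inserts through it get routed to a shard by hashing metric_name, and with the optimize_skip_unused_shards setting enabled, a query filtering on metric_name only touches the shards that can hold it.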
You have to take into account that the row size here is pretty small (~126 bytes avg in my example data), and that makes pushing more rows per second way easier. Reality is usually different, especially if you are ingesting logs or other kinds of wide rows. The good news is you can always add more shards. You can also try to be smart about picking efficient data types, something we forgot about since JSON became popular.
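By the way, that ~126 bytes figure comes straight from the system tables; you can check your own average row size with something like:

SELECT formatReadableSize(sum(data_uncompressed_bytes) / sum(rows)) AS avg_row_size
FROM system.parts
WHERE active AND (table = 'otel_vehicle_metrics')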
How to do this for real
The way I'm doing this is:
- Set up a 51-machine ClickHouse cluster in GCP (50 ClickHouse servers and 1 Zookeeper)
- Create a replicated database and a table inside it
- Run an insert into all the databases at the same time
- Measure
Setting up the cluster
I guess you can do the same in AWS with similar commands; I'm using GCP because it's cheaper than AWS for us.
First thing is to create the minimum config ClickHouse needs to run. It's basically 2 files, config.xml and users.xml. You can go super barebones with these, but I added some extra config (I'll explain why). In the file I'm using, I'm basically using S3 to store data (you can use local disks as well), /mnt/data to store logs and metadata, and connecting to a ZK instance called tb-zk-000. If you want to use it, just set the bucket path and credentials.
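I won't reproduce the whole file here, but the relevant sections look roughly like this (a sketch: the endpoint, credentials, and disk name are placeholders you'd replace with your own):

<clickhouse>
    <!-- coordination: every node points at the keeper instance -->
    <zookeeper>
        <node>
            <host>tb-zk-000</host>
            <port>9181</port>
        </node>
    </zookeeper>
    <!-- table data goes to object storage -->
    <storage_configuration>
        <disks>
            <blob>
                <type>s3</type>
                <endpoint>https://storage.googleapis.com/YOUR_BUCKET/data/</endpoint>
                <access_key_id>YOUR_KEY</access_key_id>
                <secret_access_key>YOUR_SECRET</secret_access_key>
            </blob>
        </disks>
        <policies>
            <s3_main>
                <volumes>
                    <main>
                        <disk>blob</disk>
                    </main>
                </volumes>
            </s3_main>
        </policies>
    </storage_configuration>
</clickhouse>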
You can test this locally with Docker:
Create a network:
docker network create ch_net
Run Zookeeper (using ClickHouse Keeper, which is handier than having to install a JVM and everything):
docker run -it --name tb-zk-000 --network ch_net -v $(pwd):/cfg -e KEEPER_ID=1 -e KEEPER_HOST=tb-zk-000 clickhouse/clickhouse-server:25.6-alpine clickhouse keeper --config-file /cfg/config.xml
Run an instance (you can run many; just remove the -p so ports don't collide, and change the name to tch_XXX):
mkdir -p replicas/tch_000
docker run -d \
--name tch_000 \
-p 8123:8123 \
-e KEEPER_ID=0 \
-e KEEPER_HOST=tch_000 \
-e CLICKHOUSE_CONFIG=/cfg/config.xml \
-v $(pwd):/cfg \
-v $(pwd)/replicas/tch_000:/mnt/disks/nvme \
--network ch_net \
clickhouse/clickhouse-server:25.6-alpine
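A quick way to check the instance came up:

# the HTTP interface listens on 8123; a healthy server answers "Ok."
curl http://localhost:8123/ping
# and you can run queries through the container directly
docker exec tch_000 clickhouse-client -q "SELECT version()"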
If it works locally, there is a high probability it works in GCP. To run it in GCP...
Run zookeeper:
gcloud compute instances create tb-zk-000 --project=$PROJECT \
--zone=$ZONE \
--machine-type=c2d-highcpu-16 \
--local-ssd=interface=nvme \
--image-family=debian-11 \
--image-project=debian-cloud --boot-disk-type=pd-balanced \
--subnet=default \
--metadata-from-file=startup-script=./startup.sh,ch_config=../config.xml,ch_users=../users.xml \
--metadata keeper_id=1,bucket=tb_ch_storage
and the database cluster:
gcloud compute instances bulk create --project=$PROJECT \
--name-pattern=tch### \
--count=50 \
--zone=$ZONE \
--machine-type=c2d-highcpu-32 \
--local-ssd=interface=nvme \
--local-ssd=interface=nvme \
--image-family=debian-11 \
--image-project=debian-cloud --boot-disk-type=pd-balanced \
--subnet=default \
--metadata-from-file=startup-script=./startup.sh,ch_config=../config.xml,ch_users=../users.xml \
--metadata keeper_id=0,bucket=tb_ch_storage
The important parts:
- The script uses a startup.sh that's executed when the virtual machine wakes up
- I'm setting the config files in the VM metadata
- keeper_id=0 and bucket=tb_ch_storage variables are set and used inside startup.sh. When keeper_id == 1, the script runs ClickHouse Keeper instead of the ClickHouse server
Let's analyze the startup.sh script:
#!/bin/bash
# get metadata variables
BUCKET=$(curl http://metadata/computeMetadata/v1/instance/attributes/bucket -H "Metadata-Flavor: Google")
KEEPER_ID=$(curl http://metadata/computeMetadata/v1/instance/attributes/keeper_id -H "Metadata-Flavor: Google")
CH_CONFIG=$(curl http://metadata/computeMetadata/v1/instance/attributes/ch_config -H "Metadata-Flavor: Google")
CH_USERS=$(curl http://metadata/computeMetadata/v1/instance/attributes/ch_users -H "Metadata-Flavor: Google")
HOSTNAME=$(curl http://metadata/computeMetadata/v1/instance/hostname -H "Metadata-Flavor: Google")
# mount local nvme disk
mkfs.ext4 -m 0 -E lazy_itable_init=0,lazy_journal_init=0,discard /dev/nvme0n1
mkdir -p /mnt/disks/nvme
mount -o discard,defaults /dev/nvme0n1 /mnt/disks/nvme
# create clickhouse dirs
mkdir -p /mnt/disks/nvme/data
mkdir -p /mnt/disks/nvme/tmp
mkdir -p /mnt/disks/nvme/logs
mkdir -p /mnt/disks/nvme/caches
mkdir -p /mnt/disks/nvme/misc
# write the config files fetched from metadata (quotes preserve newlines)
echo "$CH_CONFIG" > config.xml
echo "$CH_USERS" > users.xml
mkdir -p config.d
# we used our Tinybird fork binary here, but you can use the standard clickhouse binary
curl https://clickhouse.com | sh
chmod +x clickhouse
# sanity check: make sure the binary runs (prints the hostname)
./clickhouse local -q "select hostname()"
# run keeper or server in daemon
if [[ "$KEEPER_ID" -eq 0 ]]; then
KEEPER_ID=$KEEPER_ID HOSTNAME=$HOSTNAME ./clickhouse server --config=/config.xml --daemon
else
KEEPER_ID=$KEEPER_ID HOSTNAME=$HOSTNAME ./clickhouse keeper --config=/config.xml --daemon
fi
After running this you should see 51 machines running by executing gcloud compute instances list
Creating the replicated database and the table
For this I created a small script called run to help me run the same SQL on all the nodes. It's a for loop over all of them; the script was vibe coded but it's good enough. It basically does this:
for host in "${HOSTS[@]}"; do
run_query "$host" "$sql_query" &
done
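Fleshed out, a minimal version of it looks something like this (the host names match the GCP instances above; the real script also collects output and errors):

#!/bin/bash
# run the same SQL on every node, in parallel
HOSTS=($(seq -f "tch%03g" 1 50))   # tch001..tch050

run_query() {
    local host=$1 sql=$2
    ./clickhouse client --host "$host" -q "$sql"
}

sql_query="$1"
for host in "${HOSTS[@]}"; do
    run_query "$host" "$sql_query" &
done
wait   # don't exit until every node has answered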
After that, you can run
./run "create database flock Engine=Replicated('/flock', 'shard_name', concat('00',hostname()))"
This creates the replicated database on all the machines. If you don't want to do this, you'd need to define a cluster inside config.xml, and that's a pretty static thing (and boring). I don't like config files; I prefer things to be as dynamic as possible.
After that, you can connect to one of the replicas and create the table; it'll be created in all of them:
./clickhouse client --host tch001 -q "CREATE TABLE flock.otel_vehicle_metrics
(
time DateTime64(9),
resource LowCardinality(String),
scope LowCardinality(String),
metric_name LowCardinality(String),
metric_type LowCardinality(String),
value Float64,
attributes Map(LowCardinality(String), String)
)
ENGINE = MergeTree
ORDER BY (metric_name, time) settings index_granularity=32000"
You don't need to run things with ON CLUSTER flock as it's being run in a replicated database.
Inserting data
With our friend run we do a local insert. Yes, I know I'm cheating a little bit as there is no network involved, but we'll talk about that later.
time ./run "insert into flock.otel_vehicle_metrics SELECT
now64(9) - toIntervalMillisecond(number*100) AS time,
concat('VIN', toString(rand() % 10000)) AS resource,
'vehicle.telemetry' AS scope,
['vehicle_speed_kmh', 'battery_soc_percent', 'battery_temperature_celsius', 'motor_rpm'][1 + (rand() % 4)] AS metric_name,
'gauge' AS metric_type,
multiIf(metric_name = 'vehicle_speed_kmh', randUniform(0., 250.), metric_name = 'battery_soc_percent', randUniform(0., 100.), metric_name = 'battery_temperature_celsius', randUniform(-10., 60.), metric_name = 'motor_rpm', randUniform(0., 18000.), NULL) AS value,
map('drive_mode', ['autopilot', 'manual', 'park'][1 + (rand() % 3)], 'motor_id', ['front', 'rear'][1 + (rand() % 2)], 'battery_pack', ['main', 'aux'][1 + (rand() % 2)]) AS attributes
FROM numbers_mt(1000000000) settings min_insert_block_size_rows=10000000,max_insert_threads=16,max_threads=32"
And that generates a few billion rows per second at peak. You can run this query to check how many parts (and rows) were generated over 5-second intervals:
SELECT
toStartOfInterval(event_time, toIntervalSecond(5)) AS d,
formatReadableQuantity(sum(rows) / 5) AS rows_per_sec,
count() AS parts,
formatReadableQuantity(avg(rows)),
formatReadableSize(avg(bytes_uncompressed) AS c),
formatReadableSize(avg(size_in_bytes) AS u),
u / c AS ratio
FROM clusterAllReplicas('flock', system.part_log)
WHERE (`table` = 'otel_vehicle_metrics') AND (event_type = 'NewPart') AND (event_time > (now() - toIntervalMinute(15)))
GROUP BY d
ORDER BY d ASC
Query id: cf2c8529-88f1-4b49-b51c-e1f1a5ee970d
┌───────────────────d─┬─rows_per_sec───┬─parts─┬─formatReadab⋯(avg(rows))─┬─formatReadableSize(c)─┬─formatReadableSize(u)─┬───────────────ratio─┐
1. │ 2025-08-11 10:35:05 │ 278.33 million │ 394 │ 3.53 million │ 174.25 MiB │ 66.73 MiB │ 0.38293680156486515 │
2. │ 2025-08-11 10:35:10 │ 1.72 billion │ 2456 │ 3.51 million │ 172.91 MiB │ 66.21 MiB │ 0.38293604303851353 │
3. │ 2025-08-11 10:36:10 │ 1.58 billion │ 2234 │ 3.53 million │ 174.13 MiB │ 66.68 MiB │ 0.38293606257890894 │
4. │ 2025-08-11 10:36:15 │ 422.93 million │ 616 │ 3.43 million │ 169.35 MiB │ 64.85 MiB │ 0.38293641615781243 │
5. │ 2025-08-11 10:36:20 │ 248.66 million │ 352 │ 3.53 million │ 174.25 MiB │ 66.73 MiB │ 0.3829371637324703 │
6. │ 2025-08-11 10:36:25 │ 1.75 billion │ 2495 │ 3.51 million │ 172.93 MiB │ 66.22 MiB │ 0.38293613719074504 │
7. │ 2025-08-11 10:36:30 │ 2.12 million │ 3 │ 3.53 million │ 174.25 MiB │ 66.73 MiB │ 0.38294509835704965 │
8. │ 2025-08-11 10:36:35 │ 713.48 million │ 1010 │ 3.53 million │ 174.25 MiB │ 66.73 MiB │ 0.3829364075866938 │
9. │ 2025-08-11 10:36:40 │ 1.29 billion │ 1840 │ 3.50 million │ 172.47 MiB │ 66.04 MiB │ 0.38293654601840244 │
10. │ 2025-08-11 10:36:45 │ 330.60 million │ 468 │ 3.53 million │ 174.25 MiB │ 66.73 MiB │ 0.38293514124856226 │
11. │ 2025-08-11 10:36:50 │ 1.67 billion │ 2382 │ 3.50 million │ 172.87 MiB │ 66.20 MiB │ 0.38293538090566165 │
12. │ 2025-08-11 10:45:15 │ 1.20 billion │ 599 │ 10.01 million │ 493.65 MiB │ 189.03 MiB │ 0.3829176197112358 │
13. │ 2025-08-11 10:45:20 │ 801.09 million │ 401 │ 9.99 million │ 492.72 MiB │ 188.67 MiB │ 0.38291742453346606 │
14. │ 2025-08-11 10:45:25 │ 54.04 million │ 27 │ 10.01 million │ 493.65 MiB │ 189.03 MiB │ 0.38291605040348353 │
15. │ 2025-08-11 10:45:30 │ 1.95 billion │ 973 │ 10.00 million │ 493.27 MiB │ 188.88 MiB │ 0.3829176763096312 │
16. │ 2025-08-11 10:46:50 │ 1.29 billion │ 528 │ 12.20 million │ 601.81 MiB │ 230.44 MiB │ 0.3829154180964474 │
17. │ 2025-08-11 10:46:55 │ 711.65 million │ 272 │ 13.08 million │ 645.29 MiB │ 247.09 MiB │ 0.3829150648068673 │
└─────────────────────┴────────────────┴───────┴──────────────────────────┴───────────────────────┴───────────────────────┴─────────────────────┘
There you have it: 1B+ rows per second. It's not stable, but you get the point.
You can now query the data:
SELECT count()
FROM flock.otel_vehicle_metrics
┌────count()─┐
1. │ 1000000000 │ -- 1.00 billion
└────────────┘
But that's just one "shard". If you want to actually query all the tables you'd need a proper cluster with shards and everything, but you can use two tricks: remote or clusterAllReplicas. If I do it:
SELECT count()
FROM clusterAllReplicas('flock', flock.otel_vehicle_metrics)
Query id: 49c66fc1-dc0d-4ec5-a23b-a1b965e2720f
┌─────count()─┐
1. │ 50000000000 │ -- 50.00 billion
└─────────────┘
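The remote() trick is equivalent; it expands address patterns, so with my host naming it would be something like:

SELECT count()
FROM remote('tch{001..050}', flock.otel_vehicle_metrics)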
A more complex query:
SELECT
metric_name,
count()
FROM clusterAllReplicas('flock', flock.otel_vehicle_metrics)
GROUP BY ALL
SETTINGS max_threads = 32
┌─metric_name─────────────────┬─────count()─┐
1. │ battery_soc_percent │ 12499942489 │
2. │ motor_rpm │ 12500036073 │
3. │ battery_temperature_celsius │ 12499969770 │
4. │ vehicle_speed_kmh │ 12500051668 │
└─────────────────────────────┴─────────────┘
And if you want to take everything down:
gcloud compute instances stop tb-zk-000 --project=$PROJECT --zone=$ZONE --discard-local-ssd=true --quiet
gcloud compute instances delete $(seq -f "tch%03g" 1 50) --project=$PROJECT --zone=$ZONE --quiet
Is that everything?
No, actually what I showed here is the easy part. There are a few things you need to design the cluster for.
Real-time ingestion
First, real-time data ingestion is hard, because you need to take into account many things:
- failures: inserts fail because of node overload, networking issues, or Zookeeper problems, and you have to handle data the database does not accept (that happens more often than you think) with a dead letter queue or something similar.
- retries: when something fails you need to send the data again, but you need to be sure the last failure didn't write data so you don't duplicate it (see the sketch after this list).
- deploys: handle database upgrades and so on.
- sharding: you need to split the data in different buckets.
- optimization: sorting the data before pushing it into the database, or converting from JSON/Avro/whatever to a format ClickHouse is less likely to choke on.
So you need an external system to manage that; the database is not doing that for you (and it shouldn't).
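For the retries part, ClickHouse gives you something to build on: inserts into replicated tables are deduplicated by block, and you can control it explicitly with insert_deduplication_token. A sketch (the token value here is made up; the ETL would assign a stable one per batch, and for non-replicated tables you'd also need a non-zero non_replicated_deduplication_window):

-- retrying this exact insert with the same token is a no-op instead of a duplicate
INSERT INTO flock.otel_vehicle_metrics
SETTINGS insert_deduplication_token = 'vin-batch-000042'
FORMAT Native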
Balancing throughput, parts, and merges
The second thing, and this is ClickHouse-specific: you need to find a balance when pushing data into the database. Maybe you can ingest 10-20M rows/s at peak, but ClickHouse needs to do some background work after ingestion, and the main piece is the merge process. When you do an insert, ClickHouse writes a part. Those parts could be small, maybe 100-200MB, and you don't want 10,000 parts lying around when running a query because it gets slow. The merge process happens all the time, and it takes a lot of resources. If you push too often, you generate a lot of parts, the node may not have enough time to merge them, and you end up with a TOO_MANY_PARTS error.
To avoid that, you need to balance the throughput, part size, and number of merge threads according to the hardware you have.
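A simple way to watch that balance is counting active parts per node; if this number keeps growing, merges are not keeping up and you are heading towards parts_to_delay_insert and, eventually, TOO_MANY_PARTS:

SELECT
    hostName() AS host,
    count() AS active_parts
FROM clusterAllReplicas('flock', system.parts)
WHERE active AND (table = 'otel_vehicle_metrics')
GROUP BY host
ORDER BY active_parts DESC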
For example, the 50-machine setup I built can keep up at 400-500M rows/s. Above that, it starts to have a hard time, and inserts eventually fail because it can't merge everything in time. This is how the cluster load looks after 15 minutes:
SELECT avg(value)
FROM clusterAllReplicas('flock', system.asynchronous_metrics)
WHERE metric = 'LoadAverage15'
Query id: 74132959-1bc9-45f1-8652-56b7aae573ac
┌─────────avg(value)─┐
1. │ 57.925200000000004 │
└────────────────────┘
As you can see, that's not good for 32-core machines. It's interesting that the number of rows per second stabilizes even if I try to insert more rows... I guess insert throttling kicks in (due to parts_to_delay_insert) and inserts get slowed down. So realistically, to get to a stable 1B rows/s, I'd need 40-50 more machines, and likely another 10-20 if I want to be able to run some queries and have some room for sudden peaks.
Is this setup enough to ingest 1B rows per second, assuming you add 40-50 more machines? Yes, though not reliably. But I think there's much more to it, so let's dig deeper into how I think Tesla is doing it.
How I think Tesla is doing it
People usually put something before the database, and this is what the Tesla talk is about: they have Kafka and an ETL. They don't mention what it actually does in detail, but Alon says "consume from Kafka, batches and push to ClickHouse". I guess on top of that, they probably split data for the shards (I do think the data is likely sharded already in Kafka), sort and transform the data so it's faster to ingest in ClickHouse, push one partition at a time, handle retries to avoid duplicated data, handle backpressure, and so on. The fact that they managed to push a stable 1B rows/s for days means they have done a pretty good job here.
It's likely they are spending more money on the Kafka + ETL than the actual ClickHouse cluster. I don't know anything about their stack, but Redpanda seems like the ideal candidate for the buffering layer (more cost effective than other Kafka implementations).
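Just to make the idea concrete, the world's simplest version of that pipeline is piping a topic straight into clickhouse client (kcat, the broker address, and the topic name are illustrative; a real ETL batches, sorts, shards, and retries instead of streaming rows one by one):

kcat -C -b broker:9092 -t vehicle_metrics -e | \
  clickhouse client -q "insert into otel_vehicle_metrics format JSONEachRow"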
The ClickHouse cluster is probably divided into multiple shards, each shard with its own Zookeeper cluster and multiple replicas. Most likely there's a load balancer in front of the replicas to route read queries, plus some more advanced system on top of it so each query is routed to the right cluster (or even mixes different clusters if needed, using remote/distributed). So, for example, when someone uses their fleet API it can go to the right shard(s) (not sure if they use that cluster for that, but it could perfectly well be).
If they want to be cost effective, they should be using hot/cold storage: local SSD disks for short-term data and merges, and cloud storage for historical data with zero-copy replication (which is not supported anymore in the open source version). Looking at the greetings in the talk, most of the people there are ClickHouse core members, so I guess they are using the ClickHouse Cloud internal fork with a lot of tweaks.
This would be the architecture diagram:
│
│
│ WRITES
│
│
│
│
┌──────────────────────────────────────────────▼─────────────────────────────────────────────────┐
│ Kafka │
│ │
│ ┌──────────────────────────┐ ┌──────────────────────────┐ ┌──────────────────────────┐ │
│ │ │ │ │ │ │ │
│ │ │ │ │ │ │ │
│ │ topics shard 1 │ │ topics shard 2 │ │ topics shard N │ │
│ │ │ │ │ │ │ │
│ │ │ │ │ │ │ │
│ └────────┬─────────────────┘ └─────────┬────────────────┘ └───────────────┬──────────┘ │
│ │ │ │ │
└───────────┼────────────────────────────────┼──────────────────────────────────────┼────────────┘
│ │ │
│ │ │
│ │ │
│ │ │
│ │ │
│ │ │
│ │ │
│ │ │
┌───────────▼──────────┐   ┌──────────▼───────────┐   ┌──────────▼───────────┐
│ │ │ │ │ │
│ │ │ │ │ │
│ Real time ETL │ │ Real time ETL │ │ Real time ETL │
│ │ │ │ │ │
│ │ │ │ │ │
└───────────┬──────────┘ └──────────┬───────────┘ └──────────┬───────────┘
│ │ │
│ │ │
│ │ │
│ │ │
┌───────────▼──────────┐ ┌──────────▼───────────┐ ┌───────────▼──────────┐
│ shard 1 │ │ shard 2 │ │ shard N │
│ │ │ │ │ │
│ ┌──────────────┐ │ │ ┌──────────────┐ │ │ ┌──────────────┐ │
│ │ │ │ │ │ │ │ │ │ │ │
│ │ replica 1 │ │ │ │ replica 1 │ │ │ │ replica 1 │ │
│ │ │ │ │ │ │ │ │ │ │ │
│ └──────────────┘ │ │ └──────────────┘ │ │ └──────────────┘ │
│ │ │ │ │ │
│ │ │ │ │ │
│ ┌──────────────┐ │ │ ┌──────────────┐ │ │ ┌──────────────┐ │
│ │ │ │ │ │ │ │ │ │ │ │
│ │ replica 2 │ │ │ │ replica 2 │ │ │ │ replica 2 │ │
│ │ │ │ │ │ │ │ │ │ │ │
│ └──────────────┘ │ │ └──────────────┘ │ │ └──────────────┘ │
│ │ │ │ │ │
│ │ │ │ │ │
│ ┌──────────────┐ │ │ ┌──────────────┐ │ │ ┌──────────────┐ │
│ │ │ │ │ │ │ │ │ │ │ │
│ │ replica N │ │ │ │ replica N │ │ │ │ replica N │ │
│ │ │ │ │ │ │ │ │ │ │ │
│ └──────────────┘ ◄───┐ ┌───► └──────────────┘ │ ┌─────► └──────────────┘ │
│ │ │ │ │ │ │ │ │
│ │ │ │ │ │ │ │ │
└──────────────────────┘ │ │ └──────────────────────┘ │ └──────────────────────┘
│ │ │ │ │ │
┌──────────────────────┐ │ │ ┌──────────────────────┐ │ ┌──────────────────────┐
│ │ │ │ │ │ │ │ │
│  zookeeper cluster   │  │   │  │ zookeeper cluster    │   │  │  zookeeper cluster  │
│ │ │ │ │ │ │ │ │
│ │ │ │ │ │ │ │ │
└──────────────────────┘ │ │ └──────────────────────┘ │ └──────────────────────┘
│ │ │
│ │ │
│ │ │
│ │ │
│ │ │
│ │ │
│ │ │
│ │ ┌─────────────────────┐ │
│ │ │ │ │
│ │ │ │ │
│ └───┼ │ │
└──────┤ Load balancer ├─────────┘
│ │
│ │
└──────────▲──────────┘
│
│
│
│ READS
│
│
│
Is that everything (again)?
No. While doing this test, I wanted to check if our current architecture at Tinybird (N replicas, 1 shard) would work to ingest 1B rows/s. So I tried it, and while there are a few problems with this approach at that scale, I think it's worth exploring because it shows you some of the bottlenecks when scaling ClickHouse, and it's still a pretty good architecture for loads under 100M rows/s.
I'll follow up in a different post so we can go into detail on cluster setup, configuration, and problems. These kinds of setups have benefits over the sharded one (though I'd stick to the sharded architecture for 1B rows/s). If you want to get notified when I publish the next one, you should sign up for our newsletter.