"Big data" means different things in different contexts. For this post, it means datasets large enough that a single server can't hold them in memory, query patterns that scan billions to hundreds of billions of rows at a time, and ingestion rates that sustain millions of rows per second continuously.
ClickHouse® is purpose-built for this workload. The ClickHouse academic overview describes it as "a popular open-source OLAP database designed for high-performance analytics over petabyte-scale datasets with high ingestion rates." At large scale, ClickHouse combines a columnar storage format based on LSM trees with a vectorized query engine and a distributed execution model that scales horizontally across nodes.
This post covers the architecture decisions, schema patterns, and query techniques that matter specifically at scale, from hundreds of billions of rows to petabytes.
How ClickHouse scales
Compute-storage separation
In ClickHouse Cloud, table data lives in shared object storage (S3-compatible). Stateless compute nodes access this shared storage without fixed shard ownership. Scaling out adds more compute without resharding or migrating data.
According to ClickHouse's top cloud data warehouses comparison, this architecture lets ClickHouse Cloud deliver "sub-second responses at 1,000+ queries per second" while handling warehouse-scale workloads. It also provides a cleaner path between development and production than traditional sharded deployments, where shard count decisions made early are difficult to undo later without a full table rebuild.
Index sharding at petabyte scale
At truly massive scale, the primary key index itself becomes a memory problem. As detailed in the ClickHouse index sharding announcement, at hundreds of billions of rows, the primary key alone can reach 100-400 GiB per replica. With secondary indexes (bloom filters, full-text, vector search) on top, memory that could be used for query execution is consumed by redundant index copies across every replica.
Index sharding distributes the analysis of primary and secondary indexes across replicas rather than loading the full index on each node. Benchmarks on a 50 billion row table show 4.3x faster primary key range queries and 7.7x faster bloom filter lookups. It activates automatically once indexes exceed ~1GB, requiring no configuration changes.
Multi-stage distributed execution
For join-heavy queries and high-cardinality aggregations across very large tables, ClickHouse Cloud introduced multi-stage distributed query execution. Rather than the classic single-node aggregation plus distributed fan-out, it splits the query plan into stages running in parallel across worker nodes. Between stages, exchange operators repartition intermediate results.
Early TPC-H benchmarks at scale factor 100 show 2x speedup on 8 nodes vs. 1 node, with 7.4x near-linear aggregation scaling. For queries that previously required manual sharding key design, multi-stage execution handles the distribution automatically.
Schema design at scale
Partition and sort key selection
At petabyte scale, the partition and sort key choices determine whether queries scan gigabytes or petabytes. The rule is: put your most selective filter column first in ORDER BY, then time, then secondary filters:
CREATE TABLE events
(
    tenant_id   String,
    timestamp   DateTime64(3),
    event_type  LowCardinality(String),
    user_id     String,
    session_id  String,
    properties  String
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/events', '{replica}')
PARTITION BY toYYYYMM(timestamp)
ORDER BY (tenant_id, toStartOfHour(timestamp), event_type, user_id);
A query filtering on tenant_id and a time range touches only the rows for that tenant within the matching partitions. At petabyte scale this is the difference between scanning 100MB and scanning 10TB. The sort key selectivity is what makes ClickHouse fast at scale. A sort key that doesn't match your query patterns means full partition scans regardless of how fast the engine is.
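One way to check that a query actually benefits from the sort key is to ask ClickHouse which parts and granules it would read. The sketch below runs against the events table above; the tenant id 'acme' and the date range are placeholder values, not taken from any real dataset.

```sql
-- Show partition, min-max, and primary key pruning for a tenant + time filter.
-- 'acme' and the dates are hypothetical; substitute your own values.
EXPLAIN indexes = 1
SELECT
    event_type,
    count() AS events
FROM events
WHERE tenant_id = 'acme'
  AND timestamp >= toDateTime64('2026-01-01 00:00:00', 3)
  AND timestamp <  toDateTime64('2026-01-08 00:00:00', 3)
GROUP BY event_type;
```

In the output, the index sections report how many parts and granules remain after each pruning step. If the selected granule count is close to the total, the filter is not lining up with the sort key.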
Secondary indexes for high-cardinality lookups
For columns not in the sort key but frequently filtered, bloom filter indexes can skip granules efficiently:
ALTER TABLE events ADD INDEX idx_session_id session_id TYPE bloom_filter GRANULARITY 1;
Only add secondary indexes where you have a concrete query pattern that benefits from them. Bloom filters on low-selectivity filters or columns already in the sort key add storage overhead without benefit.
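Note that ADD INDEX only affects parts written after the statement runs. To build the index for data that already exists, a follow-up along these lines is usually needed. It runs as a mutation, so on a large table it can take a while:

```sql
-- Build the skip index for parts that existed before ADD INDEX.
ALTER TABLE events MATERIALIZE INDEX idx_session_id;

-- Track the mutation until no parts remain to be processed.
SELECT command, parts_to_do, is_done
FROM system.mutations
WHERE table = 'events' AND is_done = 0;
```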
Tiered storage with TTL
Most big data workloads have hot and cold access patterns: recent data is queried often, older data rarely. ClickHouse's TTL rules move data between storage tiers automatically:
ALTER TABLE events
    MODIFY TTL
        timestamp + INTERVAL 30 DAY TO VOLUME 'warm',
        timestamp + INTERVAL 1 YEAR TO VOLUME 'cold',
        timestamp + INTERVAL 5 YEAR DELETE;
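Recent data stays on the fastest volume (NVMe in a typical setup). Data older than 30 days moves to the 'warm' volume, data older than a year moves to 'cold' (usually higher-latency object storage), and data older than 5 years is deleted. The volume names must be defined in the table's storage policy. No cron jobs, no manual archiving, no risk of forgetting to clean up old partitions.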
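On a self-managed cluster, it is worth confirming that the volumes exist and that parts are really moving between them. A minimal sketch, assuming the table uses a storage policy that defines volumes named 'warm' and 'cold':

```sql
-- List the volumes and disks each storage policy defines.
SELECT policy_name, volume_name, disks
FROM system.storage_policies;

-- See which disk each partition's active parts currently live on.
SELECT
    partition,
    disk_name,
    count() AS parts,
    formatReadableSize(sum(bytes_on_disk)) AS size
FROM system.parts
WHERE table = 'events' AND active
GROUP BY partition, disk_name
ORDER BY partition;
```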
Sampling for interactive exploration
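At hundreds of billions of rows, even a fast analytical query can take seconds. Use SAMPLE to get representative subsets for exploratory queries. SAMPLE only works on MergeTree tables that declare a SAMPLE BY expression, and that expression must be part of the primary key: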
-- Sample 0.1% for fast exploration
SELECT
    event_type,
    count() * 1000 AS estimated_total,
    uniq(user_id) AS approx_unique_users
FROM events SAMPLE 0.001
WHERE timestamp >= now() - INTERVAL 90 DAY
GROUP BY event_type
ORDER BY estimated_total DESC;
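Additive aggregates such as count() and sum() scale linearly: multiply them by the inverse of the sampling ratio (1000 here). ClickHouse does not apply this correction automatically, and distinct counts such as uniq(user_id) scale the same way only when the table is sampled by a hash of that column. This pattern is how you explore a 500-billion-row dataset in under a second before deciding which query to run against the full table. For dashboards where approximate counts are acceptable, sample-based queries can be 100x faster than full scans.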
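The events table defined earlier has no SAMPLE BY clause, so it would reject a SAMPLE query as written. A sampling-enabled variant could look like the sketch below. The table name events_sampled and the choice of cityHash64(user_id) as the sampling key are assumptions for illustration, not the original schema.

```sql
-- Hypothetical sampling-enabled variant of the events table.
-- The sampling expression must be an unsigned integer and part of the primary key.
CREATE TABLE events_sampled
(
    tenant_id   String,
    timestamp   DateTime64(3),
    event_type  LowCardinality(String),
    user_id     String
)
ENGINE = MergeTree
PARTITION BY toYYYYMM(timestamp)
ORDER BY (tenant_id, toStartOfHour(timestamp), cityHash64(user_id))
SAMPLE BY cityHash64(user_id);

-- Sampling by a hash of user_id keeps all rows for roughly 0.1% of users,
-- so both the row count and the distinct-user count are scaled by 1000.
SELECT
    event_type,
    count() * 1000         AS estimated_total,
    uniq(user_id) * 1000   AS estimated_unique_users
FROM events_sampled SAMPLE 0.001
WHERE timestamp >= now() - INTERVAL 90 DAY
GROUP BY event_type
ORDER BY estimated_total DESC;
```

Sampling by user keeps each sampled user's full history together, which suits per-user analysis. The trade-off is that row-count estimates are noisier when a few users generate most of the events.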
Ingestion at scale
ClickHouse performs best with inserts of 1,000 to 100,000 rows per batch. Inserting row by row creates a new data part for every insert, which overwhelms the background merge scheduler and causes "too many parts" errors that block further inserts.
For large-scale streaming ingestion, the Kafka engine is the standard pattern. As documented in how to stream Kafka to ClickHouse, the three-part setup (Kafka engine table, materialized view, MergeTree destination) means ClickHouse consumes from Kafka automatically with no application-side batching code required. The Kafka engine handles consumer group management, offset tracking, and retry logic.
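The three-part setup can be sketched as follows, with the events table from earlier as the destination. The broker address, topic, consumer group, and the names events_queue and events_consumer are placeholders, and the JSONEachRow format is an assumption about how messages are encoded.

```sql
-- 1. Kafka engine table: a consumer, not a storage table.
CREATE TABLE events_queue
(
    tenant_id   String,
    timestamp   DateTime64(3),
    event_type  String,
    user_id     String,
    session_id  String,
    properties  String
)
ENGINE = Kafka
SETTINGS
    kafka_broker_list = 'kafka:9092',          -- hypothetical broker
    kafka_topic_list  = 'events',              -- hypothetical topic
    kafka_group_name  = 'clickhouse_events',   -- hypothetical consumer group
    kafka_format      = 'JSONEachRow';

-- 2. Materialized view: moves each consumed block into the destination.
-- 3. The destination is the MergeTree-family events table defined earlier.
CREATE MATERIALIZED VIEW events_consumer TO events AS
SELECT tenant_id, timestamp, event_type, user_id, session_id, properties
FROM events_queue;
```

Consumption starts once the materialized view exists. Detaching the view pauses ingestion without losing the consumer group's committed offsets.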
For bulk loading from object storage, the S3 table function reads Parquet or CSV files directly:
INSERT INTO events
SELECT *
FROM s3(
    'https://s3.us-east-1.amazonaws.com/data-lake/events/2026/*.parquet',
    'AWSAccessKey',
    'AWSSecret',
    'Parquet'
);
This is how you backfill historical data into ClickHouse without intermediate staging. For petabyte-scale backfills, run this in parallel across date partitions using multiple INSERT statements, each targeting a specific date range.
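A date-scoped backfill statement might look like the sketch below. It reuses the bucket path and placeholder credentials from the example above and assumes the Parquet files carry the same timestamp column as the events table. Each statement covers one month, so several can run concurrently from separate sessions.

```sql
-- Backfill January only; run one statement per month in parallel sessions.
INSERT INTO events
SELECT *
FROM s3(
    'https://s3.us-east-1.amazonaws.com/data-lake/events/2026/*.parquet',
    'AWSAccessKey',
    'AWSSecret',
    'Parquet'
)
WHERE timestamp >= toDateTime64('2026-01-01 00:00:00', 3)
  AND timestamp <  toDateTime64('2026-02-01 00:00:00', 3);
```

If the files are laid out by date in the bucket, narrowing the glob to the matching prefix is preferable to filtering in WHERE, since it avoids opening files that cannot contain matching rows.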
Query patterns for billion-row tables
Approximate aggregations
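At scale, exact distinct counts are expensive. uniq() (an approximate distinct count based on adaptive sampling of hash values) and quantile() (approximate percentiles computed by reservoir sampling) cover most analytics use cases at a fraction of the cost. HyperLogLog and t-digest variants are available as uniqHLL12() or uniqCombined() and quantileTDigest(). A typical daily summary: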
-- Assumes events also has a numeric duration_ms column,
-- which is not part of the schema shown earlier.
SELECT
    toStartOfDay(timestamp) AS day,
    event_type,
    count() AS events,
    uniq(user_id) AS approx_unique_users,
    quantile(0.50)(duration_ms) AS p50_duration,
    quantile(0.95)(duration_ms) AS p95_duration,
    quantile(0.99)(duration_ms) AS p99_duration
FROM events
WHERE timestamp >= now() - INTERVAL 30 DAY
GROUP BY day, event_type
ORDER BY day;
For billing or compliance where exactness is required, use uniqExact() and quantileExact(), and expect the query to take significantly longer. Most product analytics and operational monitoring use cases are well-served by the approximate versions.
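To judge whether the approximate version is good enough for a given column, one option is to compare both functions on a small time slice where the exact count is still affordable. A minimal sketch against the events table:

```sql
-- Compare approximate and exact distinct counts on one day of data.
SELECT
    uniq(user_id)      AS approx_users,
    uniqExact(user_id) AS exact_users,
    abs(toInt64(approx_users) - toInt64(exact_users)) / exact_users AS relative_error
FROM events
WHERE timestamp >= now() - INTERVAL 1 DAY;
```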
Pre-aggregated rollup tables
For dashboards and APIs serving large numbers of queries, pre-aggregate at insert time rather than computing at query time. Hourly and daily rollup tables fed by materialized views mean dashboard queries scan pre-aggregated rows regardless of the underlying table size:
CREATE TABLE events_daily
(
    day           Date,
    tenant_id     String,
    event_type    LowCardinality(String),
    -- Must be summed on merge; a plain UInt64 would keep an arbitrary row's value.
    events        SimpleAggregateFunction(sum, UInt64),
    unique_users  AggregateFunction(uniq, String),
    revenue       AggregateFunction(sum, Float64)
)
ENGINE = AggregatingMergeTree
PARTITION BY toYYYYMM(day)
ORDER BY (tenant_id, day, event_type);
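Once background merges have collapsed the rollup, a 90-day dashboard query reads at most 90 rows per tenant/event-type combination, regardless of whether the underlying event table has 1 billion or 1 trillion rows. Until then, a key can have several partially aggregated rows, so queries should still GROUP BY the key and finalize the states with the -Merge combinators (uniqMerge, sumMerge). The rollup table size stays bounded by the number of distinct keys even as raw data grows indefinitely.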
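A materialized view that feeds this rollup, plus a matching dashboard query, could look like the sketch below. The view name events_daily_mv and the tenant id 'acme' are placeholders. The events schema shown earlier has no revenue column, so the sketch assumes revenue is stored as a numeric field inside the JSON properties string.

```sql
-- Populate the rollup at insert time. Aliases must match events_daily's columns.
CREATE MATERIALIZED VIEW events_daily_mv TO events_daily AS
SELECT
    toDate(timestamp)   AS day,
    tenant_id,
    event_type,
    count()             AS events,
    uniqState(user_id)  AS unique_users,
    -- Assumption: revenue lives in the JSON properties blob.
    sumState(JSONExtractFloat(properties, 'revenue')) AS revenue
FROM events
GROUP BY day, tenant_id, event_type;

-- Dashboard query: finalize the aggregate states with -Merge combinators.
SELECT
    day,
    event_type,
    sum(events)              AS total_events,
    uniqMerge(unique_users)  AS users,
    sumMerge(revenue)        AS total_revenue
FROM events_daily
WHERE tenant_id = 'acme'
  AND day >= today() - 90
GROUP BY day, event_type
ORDER BY day;
```

The view only sees rows inserted after it is created. Historical data needs a one-off INSERT INTO events_daily using the same SELECT.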
Using PREWHERE at scale
At petabyte scale, PREWHERE makes a measurable difference. It reads the filter column first, identifies matching rows, then reads remaining columns only for those rows:
SELECT tenant_id, count(), uniq(user_id)
FROM events
PREWHERE event_type = 'conversion'
WHERE timestamp >= now() - INTERVAL 7 DAY
GROUP BY tenant_id;
On a wide event table with a selective event_type filter, PREWHERE can reduce I/O by 80% or more compared to WHERE alone, because the wide columns are read only for matching rows. ClickHouse moves suitable conditions to PREWHERE automatically in many cases, but writing PREWHERE explicitly guarantees it is used when you have a highly selective filter on a column that is not a leading part of the sort key.
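The actual saving depends on the table, so it is worth measuring. One approach, sketched here, is to tag two variants of the query with log_comment and compare what each read according to system.query_log. The comment labels are arbitrary, and on a multi-node deployment the log may need to be queried on the node that ran the queries.

```sql
-- Variant 1: explicit PREWHERE.
SELECT tenant_id, count(), uniq(user_id)
FROM events
PREWHERE event_type = 'conversion'
WHERE timestamp >= now() - INTERVAL 7 DAY
GROUP BY tenant_id
SETTINGS log_comment = 'prewhere_explicit';

-- Variant 2: plain WHERE with the automatic move to PREWHERE disabled.
SELECT tenant_id, count(), uniq(user_id)
FROM events
WHERE event_type = 'conversion'
  AND timestamp >= now() - INTERVAL 7 DAY
GROUP BY tenant_id
SETTINGS optimize_move_to_prewhere = 0, log_comment = 'where_only';

-- Make sure both queries have been written to the log, then compare.
SYSTEM FLUSH LOGS;

SELECT
    log_comment,
    read_rows,
    formatReadableSize(read_bytes) AS bytes_read,
    query_duration_ms
FROM system.query_log
WHERE type = 'QueryFinish'
  AND log_comment IN ('prewhere_explicit', 'where_only')
  AND event_time >= now() - INTERVAL 1 HOUR
ORDER BY event_time;
```

Bytes read is the more reliable number to compare. Duration is affected by caching, so the second run often looks faster than it should.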
Monitoring merge health at scale
A healthy ClickHouse deployment keeps the number of active parts per partition low. Too many parts means the merge scheduler is falling behind ingestion, which degrades both insert and query performance. Monitor with:
SELECT
    database,
    table,
    partition,
    count() AS part_count,
    sum(rows) AS total_rows,
    sum(bytes_on_disk) / 1e9 AS size_gb
FROM system.parts
WHERE active = 1
GROUP BY database, table, partition
HAVING part_count > 100
ORDER BY part_count DESC;
The query flags partitions with more than 100 active parts as an early warning. If any partition shows more than 300 active parts, you have a merge backlog. Reduce insert frequency, increase batch size, or check system.merges to see if merges are stalled. At petabyte scale, merge backlog is one of the most common performance degradation patterns.
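A quick look at the merges currently running can help tell a slow merge from a stalled one. A minimal sketch:

```sql
-- Long-running merges with little progress are the ones to investigate.
SELECT
    database,
    table,
    round(elapsed)             AS elapsed_s,
    round(progress * 100, 1)   AS pct_done,
    num_parts,
    formatReadableSize(total_size_bytes_compressed) AS size,
    result_part_name
FROM system.merges
ORDER BY elapsed DESC;
```

An empty result while part counts keep climbing suggests merges are not being scheduled at all, which points at resource limits or configuration rather than slow disks.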
When to use managed ClickHouse
Running ClickHouse at petabyte scale in production means: provisioning and sizing clusters, configuring replication and ZooKeeper/Keeper quorums, managing rolling upgrades without downtime, monitoring merge backlogs, and tuning memory limits. Self-hosted ClickHouse cost grows fast when you factor in the dedicated engineering time.
Tinybird is managed ClickHouse with analytics at scale built in. The Events API ingests at millions of rows per second without batching code on your side. SQL Pipes become parameterized HTTP endpoints. Tinybird handles cluster scaling, replication, and upgrades. You write SQL and ship.
