Tinybird Forward now includes sinks for Amazon S3, Google Cloud Storage, and Kafka. Sinks let you export processed data from your pipes to external destinations on a schedule or on-demand.
Tinybird Classic users will already be familiar with S3 sinks; Forward expands on them with two additional destinations, enhanced partitioning, multiple file formats, and local development support.
What are sinks?
Sinks export the results of your SQL queries to external systems. They work as an extension of Tinybird pipes - instead of exposing an API endpoint, a sink pipe writes query results to a file or stream.
Here's a basic sink that exports daily user metrics to S3:
-- daily_metrics.pipe
NODE user_summary
SQL >
    SELECT
        date,
        user_id,
        count(*) as page_views,
        sum(session_duration) as total_session_time
    FROM web_analytics
    WHERE date = yesterday()
    GROUP BY date, user_id

TYPE sink
EXPORT_CONNECTION_NAME "s3_warehouse"
EXPORT_BUCKET_URI "s3://analytics-bucket/daily-exports/"
EXPORT_FILE_TEMPLATE "user_metrics_{date}.csv"
EXPORT_FORMAT "csv"
EXPORT_SCHEDULE "0 2 * * *"
This sink runs every day at 2 AM, queries yesterday's web analytics data, and exports it as a CSV file to S3.
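For example, assuming the job runs on 2024-06-02 (a hypothetical date), the {date} value in the template resolves to yesterday's date and the export lands at an object key like:

s3://analytics-bucket/daily-exports/user_metrics_2024-06-01.csv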
File partitioning and templates
Sinks support sophisticated file partitioning through template variables. You can partition by any column in your query results:
-- Partition by customer and date
EXPORT_FILE_TEMPLATE "exports/{customer_id}/metrics_{date}.parquet"
-- Time-based partitioning with custom formatting
EXPORT_FILE_TEMPLATE "data/{timestamp, '%Y/%m/%d'}/hourly_{timestamp, '%H'}.json"
-- Split large datasets into multiple files
EXPORT_FILE_TEMPLATE "bulk_export/part_{8}.csv"
For timestamp columns, format strings keep the number of files under control. Without formatting, a timestamp column could create thousands of files (one per second or millisecond); with {timestamp, '%Y-%m-%d'}, you get one file per day instead.
You can also combine partitioning strategies:
EXPORT_FILE_TEMPLATE "data/{customer_tier}/date={date}/hour={timestamp, '%H'}/part_{4}.parquet"
This creates a Hive-style partitioned structure that works well with tools like Athena, BigQuery, or Spark.
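For example, with hypothetical values for customer_tier and timestamp, and illustrative part numbering, that template could produce object keys such as:

data/enterprise/date=2024-06-01/hour=14/part_0.parquet
data/enterprise/date=2024-06-01/hour=15/part_0.parquet
data/free/date=2024-06-01/hour=14/part_1.parquet

Query engines can then prune on the customer_tier, date, and hour keys without scanning every file.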
Supported formats and compression
Forward sinks support three file formats:
- CSV - Standard comma-separated values with gzip compression
- NDJSON - Newline-delimited JSON with gzip compression
- Parquet - Columnar format with snappy, gzip, lzo, brotli, lz4, or zstd compression
Format choice depends on your downstream systems. Parquet works well for data warehouses and analytics tools, while NDJSON is useful for event streaming and log processing.
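For example, a log-processing sink might export NDJSON instead of Parquet. Here's a minimal sketch of the relevant settings, assuming "ndjson" and "gz" are the accepted value spellings (check the sinks docs for your version):

EXPORT_FORMAT "ndjson"
EXPORT_COMPRESSION "gz"
EXPORT_FILE_TEMPLATE "logs/{timestamp, '%Y-%m-%d'}/events.ndjson"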
Amazon S3 sinks
S3 sinks require an IAM role with permissions to write to your bucket:
# s3_warehouse.connection
TYPE s3
S3_REGION "us-east-1"
S3_ARN "arn:aws:iam::123456789012:role/TinybirdExportRole"
The IAM role needs these permissions:
- s3:PutObject and s3:PutObjectAcl for writing files
- s3:ListBucket and s3:GetBucketLocation for bucket access
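Put together, a minimal IAM policy for that role might look like the sketch below; the bucket name reuses the analytics-bucket example from earlier, so swap in your own resources:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "TinybirdSinkWriteObjects",
      "Effect": "Allow",
      "Action": ["s3:PutObject", "s3:PutObjectAcl"],
      "Resource": "arn:aws:s3:::analytics-bucket/*"
    },
    {
      "Sid": "TinybirdSinkBucketAccess",
      "Effect": "Allow",
      "Action": ["s3:ListBucket", "s3:GetBucketLocation"],
      "Resource": "arn:aws:s3:::analytics-bucket"
    }
  ]
}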
Here's a complete example that exports e-commerce order data with customer partitioning:
-- ecommerce_orders.pipe
NODE daily_orders
SQL >
    SELECT
        order_date,
        customer_id,
        order_id,
        product_category,
        revenue,
        items_count
    FROM orders
    WHERE order_date = yesterday()
        AND status = 'completed'

TYPE sink
EXPORT_CONNECTION_NAME "s3_warehouse"
EXPORT_BUCKET_URI "s3://ecommerce-data-lake/orders/"
EXPORT_FILE_TEMPLATE "year={order_date, '%Y'}/month={order_date, '%m'}/customer_{customer_id}.parquet"
EXPORT_FORMAT "parquet"
EXPORT_COMPRESSION "snappy"
EXPORT_SCHEDULE "0 1 * * *"
Google Cloud Storage sinks
GCS sinks work similarly to S3 but use Service Account authentication:
# gcs_warehouse.connection
TYPE gcs
GCS_SERVICE_ACCOUNT_CREDENTIALS_JSON {{ tb_secret("GCS_SERVICE_ACCOUNT") }}
The Service Account needs these roles:
- roles/storage.objectCreator - Create objects
- roles/storage.objectViewer - Read objects and metadata
- roles/storage.legacyBucketReader - List bucket contents
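If you manage access with the gcloud CLI, granting those roles on the bucket looks roughly like this; the bucket matches the example below, while the service account email is a placeholder:

# Grant the sink's Service Account the three roles on the target bucket
for role in roles/storage.objectCreator roles/storage.objectViewer roles/storage.legacyBucketReader; do
  gcloud storage buckets add-iam-policy-binding gs://iot-analytics-bucket \
    --member="serviceAccount:tinybird-sink@my-project.iam.gserviceaccount.com" \
    --role="$role"
done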
Here's an example that exports IoT sensor data to BigQuery-compatible partitions:
-- sensor_data_export.pipe
NODE sensor_readings
SQL >
    SELECT
        device_id,
        sensor_type,
        reading_value,
        temperature,
        humidity,
        timestamp
    FROM iot_sensors
    WHERE timestamp >= subtractHours(now(), 1)

TYPE sink
EXPORT_CONNECTION_NAME "gcs_warehouse"
EXPORT_BUCKET_URI "gs://iot-analytics-bucket/sensor-data/"
EXPORT_FILE_TEMPLATE "device_type={sensor_type}/date={timestamp, '%Y-%m-%d'}/hour={timestamp, '%H'}/readings.parquet"
EXPORT_FORMAT "parquet"
EXPORT_COMPRESSION "gzip"
EXPORT_SCHEDULE "0 * * * *"
Kafka sinks
Kafka sinks push query results to a Kafka topic, enabling streaming-style integrations in event-driven architectures:
# kafka_alerts.connection
TYPE kafka
KAFKA_BOOTSTRAP_SERVERS "broker1:9092,broker2:9092"
KAFKA_SECURITY_PROTOCOL "SASL_SSL"
KAFKA_SASL_MECHANISM "PLAIN"
KAFKA_KEY {{ tb_secret("KAFKA_KEY") }}
KAFKA_SECRET {{ tb_secret("KAFKA_SECRET") }}
Kafka sinks are particularly useful for triggering downstream processes. Here's an example that streams fraud alerts:
-- fraud_alerts.pipe
NODE suspicious_transactions
SQL >
    SELECT
        transaction_id,
        user_id,
        amount,
        risk_score,
        flagged_reasons,
        transaction_timestamp
    FROM transactions_with_risk_score
    WHERE risk_score > 0.8
        AND transaction_timestamp >= subtractMinutes(now(), 1)

TYPE sink
EXPORT_CONNECTION_NAME "kafka_alerts"
EXPORT_KAFKA_TOPIC "fraud_detection_alerts"
EXPORT_SCHEDULE "*/1 * * * *"
This sink runs every minute, finds high-risk transactions, and sends them to a Kafka topic where fraud detection services can consume them.
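Any standard Kafka consumer can pick these alerts up. For a quick check from a terminal, the stock console consumer works; the broker address is reused from the connection example above, and the properties file is a placeholder carrying the same SASL_SSL settings:

# Tail the fraud alerts topic from the command line
kafka-console-consumer.sh \
  --bootstrap-server broker1:9092 \
  --topic fraud_detection_alerts \
  --consumer.config client-sasl.properties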
Real-world use cases
Data warehouse loading
Export processed analytics data to your data warehouse on a schedule. Many companies use this to replace complex ETL pipelines:
-- warehouse_daily_load.pipe
NODE daily_summary
SQL >
    SELECT
        date,
        product_id,
        sum(revenue) as daily_revenue,
        count(distinct user_id) as unique_customers,
        sum(quantity) as items_sold
    FROM sales_events
    WHERE date = yesterday()
    GROUP BY date, product_id

TYPE sink
EXPORT_CONNECTION_NAME "snowflake_s3"
EXPORT_BUCKET_URI "s3://warehouse-staging/daily-loads/"
EXPORT_FILE_TEMPLATE "sales_summary/{date}/products.parquet"
EXPORT_FORMAT "parquet"
EXPORT_SCHEDULE "0 3 * * *"
Customer data exports
Provide customers with their usage data for compliance or transparency:
-- customer_data_export.pipe
NODE customer_usage
SQL >
    %
    SELECT *
    FROM customer_analytics_view
    WHERE customer_id = {{ String(customer_id, required=True) }}
        AND date >= {{ Date(start_date, required=True) }}
        AND date <= {{ Date(end_date, required=True) }}

TYPE sink
EXPORT_CONNECTION_NAME "s3_customer_exports"
EXPORT_BUCKET_URI "s3://customer-data-exports/"
EXPORT_FILE_TEMPLATE "customer_{customer_id}/usage_{start_date}_to_{end_date}.csv"
EXPORT_FORMAT "csv"
EXPORT_SCHEDULE "@on-demand"
Event streaming for microservices
Stream processed events to other systems using Kafka:
-- inventory_alerts.pipe
NODE low_stock_items
SQL >
    SELECT
        product_id,
        current_stock,
        reorder_threshold,
        warehouse_location,
        supplier_id
    FROM inventory_levels
    WHERE current_stock <= reorder_threshold

TYPE sink
EXPORT_CONNECTION_NAME "kafka_inventory"
EXPORT_KAFKA_TOPIC "inventory_alerts"
EXPORT_SCHEDULE "*/15 * * * *"
Scheduling and execution
Sinks support cron expressions for scheduling, or @on-demand for manual execution:
# Run every hour
EXPORT_SCHEDULE "0 * * * *"
# Run daily at 2:30 AM
EXPORT_SCHEDULE "30 2 * * *"
# Run every 15 minutes
EXPORT_SCHEDULE "*/15 * * * *"
# Manual execution only
EXPORT_SCHEDULE "@on-demand"
You can trigger on-demand sinks manually:
# Using the CLI
tb sink run customer_data_export --customer_id=123 --start_date=2024-01-01 --end_date=2024-01-31
# Using the Sink Pipes API
curl \
    -X POST "https://api.tinybird.co/v0/pipes/p_test/sink" \
    -H "Authorization: Bearer <PIPES:READ token>" \
    -d "file_template=export_file" \
    -d "format=csv" \
    -d "compression=gz" \
    -d "write_strategy=truncate" \
    -d "{key}={val}"
Note that in local environments, scheduling is not supported; you can only run sinks on demand.
Observability
All sink operations are logged to Service Data Sources for monitoring:
-- Monitor sink performance
SELECT
    pipe_id,
    job_status,
    duration_ms,
    bytes_processed
FROM tinybird.sinks_ops_log
WHERE created_at >= yesterday()
ORDER BY created_at DESC

-- Track data transfer costs
SELECT
    sum(bytes) / 1024 / 1024 / 1024 as gb_transferred,
    sum(cost_usd) as total_cost
FROM tinybird.data_transfer
WHERE service = 'sink'
  AND date >= startOfMonth(today())
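The same log works for alerting on failures. Here's a quick check for runs that didn't succeed, reusing the columns from the query above; verify the exact column names and status values against the Service Data Sources docs:

-- Find sink jobs that did not complete successfully (the status value is illustrative)
SELECT
    pipe_id,
    job_status,
    created_at
FROM tinybird.sinks_ops_log
WHERE created_at >= yesterday()
  AND job_status != 'done'
ORDER BY created_at DESC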
Limits
Sinks have limits on execution time, frequency, and memory. Refer to the sinks limits documentation for the limits that apply to your workspace.
Getting started
Sinks are available today in Developer and Enterprise plans. To create your first sink:
- Set up a connection using tb connection create s3|gcs|kafka
- Create a .pipe file with TYPE sink and export configuration
- Deploy with tb deploy
- Monitor execution in the Tinybird UI or via Service Data Sources
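As a rough end-to-end sketch in the terminal (the pipe file name is illustrative):

# 1. Create and configure the destination connection
tb connection create s3

# 2. Add a sink pipe such as daily_metrics.pipe with TYPE sink and the EXPORT_* settings

# 3. Deploy the project so the sink starts running on its schedule
tb deploy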
Check the sinks documentation for detailed setup guides for each destination type and additional info about observability, billing, and limits.
Have questions or feedback?
Let us know if you get stuck or have feedback, including new sink destinations you'd like us to support. You can post feedback in the #feedback channel of our Slack community.