---
title: "Sinks: Export your data to S3, GCS, and Kafka"
excerpt: "Export processed data from Tinybird pipes to S3, GCS, or Kafka -- on a schedule or on demand, in any format, with full control."
authors: "Joe Krawiec"
categories: "Product updates"
createdOn: "2025-06-25 10:00:00"
publishedOn: "2025-06-26 10:00:00"
updatedOn: "2025-06-26 10:00:00"
status: "published"
---

Tinybird Forward now includes sinks for Amazon S3, Google Cloud Storage, and Kafka. Sinks let you export processed data from your pipes to external destinations on a schedule or on demand.

Tinybird Classic users will already be familiar with S3 sinks. Forward adds two more destinations, GCS and Kafka, along with enhanced partitioning, multiple file formats, and local development support.

{% html %}
<iframe width="560" height="315" src="https://www.youtube.com/embed/BxK0J6sZbbM?si=2Fl9tMVcIiklUQQr" title="YouTube video player" frameborder="0" allow="accelerometer; autoplay; clipboard-write; encrypted-media; gyroscope; picture-in-picture; web-share" referrerpolicy="strict-origin-when-cross-origin" allowfullscreen></iframe>
{% /html %}

## What are sinks?

Sinks export the results of your SQL queries to external systems. They work as an extension of Tinybird pipes: instead of exposing an API endpoint, a sink pipe writes query results to a file or stream.

Here's a basic sink that exports daily user metrics to S3:

```sql
-- daily_metrics.pipe
NODE user_summary
SQL >
    SELECT 
        date,
        user_id,
        count(*) as page_views,
        sum(session_duration) as total_session_time
    FROM web_analytics
    WHERE date = yesterday()
    GROUP BY date, user_id

TYPE sink
EXPORT_CONNECTION_NAME "s3_warehouse"
EXPORT_BUCKET_URI "s3://analytics-bucket/daily-exports/"
EXPORT_FILE_TEMPLATE "user_metrics_{date}.csv"
EXPORT_FORMAT "csv"
EXPORT_SCHEDULE "0 2 * * *"
```

This sink runs every day at 2 AM, queries yesterday's web analytics data, and exports it as a CSV file to S3.

## File partitioning and templates

Sinks control how output is split into files through template variables in `EXPORT_FILE_TEMPLATE`. You can partition by any column in your query results:

```sql
-- Partition by customer and date
EXPORT_FILE_TEMPLATE "exports/{customer_id}/metrics_{date}.parquet"

-- Time-based partitioning with custom formatting
EXPORT_FILE_TEMPLATE "data/{timestamp, '%Y/%m/%d'}/hourly_{timestamp, '%H'}.json"

-- Split large datasets into a fixed number of files (8 here)
EXPORT_FILE_TEMPLATE "bulk_export/part_{8}.csv"
```

For timestamp columns, format strings prevent creating too many files. Without formatting, a timestamp column could create thousands of files (one per second or millisecond). With `{timestamp, '%Y-%m-%d'}`, you get one file per day instead.
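
To make the effect of format strings concrete, here is a minimal Python sketch of how a template placeholder might expand. The `render` helper is hypothetical: it only mimics the `{column}` and `{column, 'format'}` syntax shown above using `strftime`, and it is not Tinybird's implementation.

```python
import re
from datetime import datetime, timedelta

# Matches {column} or {column, 'strftime-format'} (illustrative syntax only).
PLACEHOLDER = re.compile(r"\{(\w+)(?:,\s*'([^']*)')?\}")

def render(template: str, row: dict) -> str:
    """Expand placeholders in `template` using the values in `row`."""
    def expand(match):
        column, fmt = match.group(1), match.group(2)
        value = row[column]
        # With a format string, datetimes are truncated to that granularity.
        return value.strftime(fmt) if fmt else str(value)
    return PLACEHOLDER.sub(expand, template)

# One row per second for a full day: 86,400 distinct timestamps.
start = datetime(2025, 6, 25)
rows = [{"timestamp": start + timedelta(seconds=s)} for s in range(86_400)]

unformatted = {render("events_{timestamp}.csv", r) for r in rows}
per_day = {render("events_{timestamp, '%Y-%m-%d'}.csv", r) for r in rows}

print(len(unformatted))  # 86400 file names, one per second
print(len(per_day))      # 1 file name for the whole day
```

The coarser the format string, the fewer distinct file names the same rows produce.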

The partitioning also supports combining strategies:

```sql
EXPORT_FILE_TEMPLATE "data/customer_tier={customer_tier}/date={date}/hour={timestamp, '%H'}/part_{4}.parquet"
```

This creates a Hive-style partitioned structure that works well with tools like Athena, BigQuery, or Spark.
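
Hive-style layouts encode partition values as `key=value` directory names, which is what lets query engines prune files by path. The Python sketch below shows how a downstream reader could recover those values. The `hive_partitions` helper and the example object key are made up for illustration.

```python
from pathlib import PurePosixPath

def hive_partitions(object_key: str) -> dict:
    """Return the key=value directory segments of a path as a dict."""
    parts = PurePosixPath(object_key).parts[:-1]  # drop the file name
    return dict(p.split("=", 1) for p in parts if "=" in p)

# Hypothetical object key following a key=value directory layout.
key = "data/customer_tier=enterprise/date=2025-06-25/hour=14/part_0.parquet"
print(hive_partitions(key))
```

Segments without an `=`, such as `data`, are skipped, so only the `key=value` directories end up as partition columns.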

## Supported formats and compression

Forward sinks support three file formats:

- **CSV** - Standard comma-separated values with gzip compression
- **NDJSON** - Newline-delimited JSON with gzip compression
- **Parquet** - Columnar format with snappy, gzip, lzo, brotli, lz4, or zstd compression

Format choice depends on your downstream systems. Parquet works well for data warehouses and analytics tools, while NDJSON is useful for event streaming and log processing.
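
As a rough illustration of why NDJSON suits line-oriented processing, the Python sketch below writes a few made-up records as gzip-compressed NDJSON in memory and reads them back one line at a time. The records and field names are invented. A real consumer would read the exported object from the bucket instead of an in-memory buffer.

```python
import gzip
import io
import json

records = [
    {"user_id": "u1", "page_views": 3},
    {"user_id": "u2", "page_views": 7},
]

# Write: one JSON document per line, gzip-compressed.
buffer = io.BytesIO()
with gzip.open(buffer, "wt", encoding="utf-8") as f:
    for record in records:
        f.write(json.dumps(record) + "\n")

# Read: decode line by line instead of parsing one large document.
buffer.seek(0)
with gzip.open(buffer, "rt", encoding="utf-8") as f:
    decoded = [json.loads(line) for line in f]

print(decoded == records)  # True
```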

## Amazon S3 sinks

S3 sinks require an IAM role with permissions to write to your bucket:

```bash
# s3_warehouse.connection
TYPE s3
S3_REGION "us-east-1"
S3_ARN "arn:aws:iam::123456789012:role/TinybirdExportRole"
```

The IAM role needs these permissions:

- `s3:PutObject` and `s3:PutObjectAcl` for writing files
- `s3:ListBucket` and `s3:GetBucketLocation` for bucket access
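
One way to express the permissions above is an IAM policy document with separate statements for object-level and bucket-level actions. The Python sketch below builds such a document. The `sink_policy` helper is hypothetical, the bucket name is borrowed from the earlier example, and the role's trust policy is not covered here, so treat the Tinybird documentation as the authority on the exact policy.

```python
import json

def sink_policy(bucket: str) -> dict:
    """Build an IAM policy document granting the permissions listed above."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {   # Object-level actions apply to keys inside the bucket.
                "Effect": "Allow",
                "Action": ["s3:PutObject", "s3:PutObjectAcl"],
                "Resource": f"arn:aws:s3:::{bucket}/*",
            },
            {   # Bucket-level actions apply to the bucket itself.
                "Effect": "Allow",
                "Action": ["s3:ListBucket", "s3:GetBucketLocation"],
                "Resource": f"arn:aws:s3:::{bucket}",
            },
        ],
    }

print(json.dumps(sink_policy("analytics-bucket"), indent=2))
```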

Here's a complete example that exports e-commerce order data with customer partitioning:

```sql
-- ecommerce_orders.pipe
NODE daily_orders
SQL >
    SELECT 
        order_date,
        customer_id,
        order_id,
        product_category,
        revenue,
        items_count
    FROM orders
    WHERE order_date = yesterday()
    AND status = 'completed'

TYPE sink
EXPORT_CONNECTION_NAME "s3_warehouse"
EXPORT_BUCKET_URI "s3://ecommerce-data-lake/orders/"
EXPORT_FILE_TEMPLATE "year={order_date, '%Y'}/month={order_date, '%m'}/customer_{customer_id}.parquet"
EXPORT_FORMAT "parquet"
EXPORT_COMPRESSION "snappy"
EXPORT_SCHEDULE "0 1 * * *"
```

## Google Cloud Storage sinks

GCS sinks work similarly to S3 but use Service Account authentication:

```bash
# gcs_warehouse.connection
TYPE gcs
GCS_SERVICE_ACCOUNT_CREDENTIALS_JSON {{ tb_secret("GCS_SERVICE_ACCOUNT") }}
```

The Service Account needs these roles:

- `roles/storage.objectCreator` - Create objects
- `roles/storage.objectViewer` - Read objects and metadata
- `roles/storage.legacyBucketReader` - List bucket contents

Here's an example that exports IoT sensor data to BigQuery-compatible partitions:

```sql
-- sensor_data_export.pipe
NODE sensor_readings
SQL >
    SELECT 
        device_id,
        sensor_type,
        reading_value,
        temperature,
        humidity,
        timestamp
    FROM iot_sensors
    WHERE timestamp >= subtractHours(now(), 1)

TYPE sink
EXPORT_CONNECTION_NAME "gcs_warehouse"
EXPORT_BUCKET_URI "gs://iot-analytics-bucket/sensor-data/"
EXPORT_FILE_TEMPLATE "device_type={sensor_type}/date={timestamp, '%Y-%m-%d'}/hour={timestamp, '%H'}/readings.parquet"
EXPORT_FORMAT "parquet"
EXPORT_COMPRESSION "gzip"
EXPORT_SCHEDULE "0 * * * *"
```

## Kafka sinks

Kafka sinks write query results to a Kafka topic, which brings streaming-style analytics to event-driven architectures. Start with a connection:

```bash
# kafka_events.connection
TYPE kafka
KAFKA_BOOTSTRAP_SERVERS "broker1:9092,broker2:9092"
KAFKA_SECURITY_PROTOCOL "SASL_SSL"
KAFKA_SASL_MECHANISM "PLAIN"
KAFKA_KEY {{ tb_secret("KAFKA_KEY") }}
KAFKA_SECRET {{ tb_secret("KAFKA_SECRET") }}
```

Kafka sinks are particularly useful for triggering downstream processes. Here's an example that streams fraud alerts:

```sql
-- fraud_alerts.pipe
NODE suspicious_transactions
SQL >
    SELECT 
        transaction_id,
        user_id,
        amount,
        risk_score,
        flagged_reasons,
        transaction_timestamp
    FROM transactions_with_risk_score
    WHERE risk_score > 0.8
    AND transaction_timestamp >= subtractMinutes(now(), 1)

TYPE sink
EXPORT_CONNECTION_NAME "kafka_events"
EXPORT_KAFKA_TOPIC "fraud_detection_alerts"
EXPORT_SCHEDULE "*/1 * * * *"
```

This sink runs every minute, finds high-risk transactions, and sends them to a Kafka topic where fraud detection services can consume them.
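
Because the query selects a rolling one-minute window on every run, a consumer may want to guard against handling the same transaction twice, for example if the windows of two runs overlap slightly. Here is a minimal Python sketch of that guard. It assumes each message is a JSON object containing the selected columns, which is an assumption and not confirmed here, so check the sinks documentation for the actual message format. The `AlertDeduplicator` class and the sample messages are made up.

```python
import json

class AlertDeduplicator:
    """Remembers transaction IDs so each alert is handled at most once."""

    def __init__(self):
        self._seen = set()

    def accept(self, raw_message: bytes):
        """Decode a message; return the alert, or None if already seen."""
        alert = json.loads(raw_message)  # assumes a JSON-encoded payload
        transaction_id = alert["transaction_id"]
        if transaction_id in self._seen:
            return None
        self._seen.add(transaction_id)
        return alert

dedup = AlertDeduplicator()
messages = [
    b'{"transaction_id": "t1", "risk_score": 0.93}',
    b'{"transaction_id": "t1", "risk_score": 0.93}',  # repeated alert
    b'{"transaction_id": "t2", "risk_score": 0.85}',
]
handled = [a for a in (dedup.accept(m) for m in messages) if a is not None]
print([a["transaction_id"] for a in handled])  # ['t1', 't2']
```

The in-memory set grows without bound, so a long-running service would more likely use a store with expiry.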

## Real-world use cases

### Data warehouse loading

Export processed analytics data to your data warehouse on a schedule. This can take the place of a more complex ETL pipeline:

```sql
-- warehouse_daily_load.pipe
NODE daily_summary
SQL >
    SELECT 
        date,
        product_id,
        sum(revenue) as daily_revenue,
        count(distinct user_id) as unique_customers,
        sum(quantity) as items_sold
    FROM sales_events
    WHERE date = yesterday()
    GROUP BY date, product_id

TYPE sink
EXPORT_CONNECTION_NAME "snowflake_s3"
EXPORT_BUCKET_URI "s3://warehouse-staging/daily-loads/"
EXPORT_FILE_TEMPLATE "sales_summary/{date}/products.parquet"
EXPORT_FORMAT "parquet"
EXPORT_SCHEDULE "0 3 * * *"
```

### Customer data exports

Provide customers with their usage data for compliance or transparency:

```sql
-- customer_data_export.pipe
NODE customer_usage
SQL >
    SELECT *
    FROM customer_analytics_view
    WHERE customer_id = {{ String(customer_id, required=True) }}
    AND date >= {{ Date(start_date, required=True) }}
    AND date <= {{ Date(end_date, required=True) }}

TYPE sink
EXPORT_CONNECTION_NAME "s3_customer_exports"
EXPORT_BUCKET_URI "s3://customer-data-exports/"
EXPORT_FILE_TEMPLATE "customer_{customer_id}/usage_{start_date}_to_{end_date}.csv"
EXPORT_FORMAT "csv"
EXPORT_SCHEDULE "@on-demand"
```

### Event streaming for microservices

Stream processed events to other systems using Kafka:

```sql
-- inventory_alerts.pipe
NODE low_stock_items
SQL >
    SELECT 
        product_id,
        current_stock,
        reorder_threshold,
        warehouse_location,
        supplier_id
    FROM inventory_levels
    WHERE current_stock <= reorder_threshold

TYPE sink
EXPORT_CONNECTION_NAME "kafka_inventory"
EXPORT_KAFKA_TOPIC "inventory_alerts"
EXPORT_SCHEDULE "*/15 * * * *"
```

## Scheduling and execution

Sinks support cron expressions for scheduling or `@on-demand` for manual execution:

```bash
# Run every hour
EXPORT_SCHEDULE "0 * * * *"

# Run daily at 2:30 AM
EXPORT_SCHEDULE "30 2 * * *"

# Run every 15 minutes
EXPORT_SCHEDULE "*/15 * * * *"

# Manual execution only
EXPORT_SCHEDULE "@on-demand"
```
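
The cron expressions above follow the standard five-field layout: minute, hour, day of month, month, and day of week. The small Python sketch below labels each field, which can help when reading an unfamiliar schedule. The `cron_fields` helper is hypothetical and does not validate field values.

```python
FIELDS = ("minute", "hour", "day_of_month", "month", "day_of_week")

def cron_fields(expression: str) -> dict:
    """Label the five space-separated fields of a cron expression."""
    values = expression.split()
    if len(values) != len(FIELDS):
        raise ValueError(f"expected 5 fields, got {len(values)}")
    return dict(zip(FIELDS, values))

print(cron_fields("30 2 * * *"))
```

Here `30 2 * * *` reads as minute 30 of hour 2, every day. `@on-demand` is not a cron expression, so this helper would reject it.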

You can trigger on-demand sinks manually:

```bash
# Using the CLI
tb sink run customer_data_export --customer_id=123 --start_date=2024-01-01 --end_date=2024-01-31
```

```bash
# Using the Sink Pipes API
curl \
  -X POST "https://api.tinybird.co/v0/pipes/p_test/sink" \
  -H "Authorization: Bearer <PIPES:READ token>" \
  -d "file_template=export_file" \
  -d "format=csv" \
  -d "compression=gz" \
  -d "write_strategy=truncate" \
  -d {key}={val}
```
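
For callers that prefer a script over curl, the same request can be assembled with Python's standard library. This is a sketch that mirrors the curl example: the endpoint and form fields are copied from it, the token is a placeholder, and the `build_sink_request` helper is hypothetical. The request is built but not sent.

```python
from urllib.parse import urlencode
from urllib.request import Request

def build_sink_request(pipe: str, token: str, params: dict) -> Request:
    """Build (but do not send) a POST request that triggers a sink run."""
    url = f"https://api.tinybird.co/v0/pipes/{pipe}/sink"
    body = urlencode(params).encode("utf-8")  # form-encoded, like curl -d
    return Request(
        url,
        data=body,
        headers={"Authorization": f"Bearer {token}"},
        method="POST",
    )

request = build_sink_request(
    "p_test",
    "<PIPES:READ token>",  # placeholder, as in the curl example
    {"file_template": "export_file", "format": "csv",
     "compression": "gz", "write_strategy": "truncate"},
)
print(request.get_method(), request.full_url)
print(request.data.decode())
# Send with urllib.request.urlopen(request) once a real token is in place.
```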

Note that in local environments, scheduling is not supported; you can only run sinks on demand.

## Observability

All sink operations are logged to Service Data Sources for monitoring:

```sql
-- Monitor sink performance
SELECT 
    pipe_id,
    job_status,
    duration_ms,
    bytes_processed
FROM tinybird.sinks_ops_log
WHERE created_at >= yesterday()
ORDER BY created_at DESC

-- Track data transfer costs
SELECT 
    sum(bytes) / 1024 / 1024 / 1024 as gb_transferred,
    sum(cost_usd) as total_cost
FROM tinybird.data_transfer
WHERE service = 'sink'
AND date >= startOfMonth(today())
```

## Limits

Sinks are subject to limits on execution time, run frequency, and memory usage. See the [Sinks limits documentation](https://www.tinybird.co/docs/forward/pricing/limits#sink-pipe-limits) for the limits that apply to your workspace.

## Getting started

Sinks are available today in Developer and Enterprise plans. To create your first sink:

1. Set up a connection using `tb connection create s3|gcs|kafka`
2. Create a `.pipe` file with `TYPE sink` and export configuration
3. Deploy with `tb deploy`
4. Monitor execution in the Tinybird UI or via Service Data Sources

Check the [sinks documentation](https://www.tinybird.co/docs/forward/work-with-data/publish-data/sinks) for detailed setup guides for each destination type and additional info about observability, billing, and limits.

### Have questions or feedback?

Let us know if you get stuck or have feedback, including new sink destinations you'd like us to support. You can post feedback in the #feedback channel of our [Slack community](https://www.tinybird.co/docs/community).
