Iceberg table function

The Tinybird iceberg() table function allows you to read data from your existing Apache Iceberg database in S3 into Tinybird, then schedule a regular copy pipe to orchestrate synchronization. You can load full tables, and every run performs a full replace on the data source.

To use it, define a node using standard SQL and the iceberg function keyword, then publish the node as a copy pipe that does a sync on every run. See Table functions for general information and tips.

Additionally you can use the iceberg table function in API endpoints.

Setting secrets

Table functions require authentication credentials that must be stored securely. In Tinybird Forward, manage those credentials with tb secret:

tb secret set AWS_ACCESS_KEY_ID <access_key>
tb secret set AWS_SECRET_ACCESS_KEY <secret_key>

Set the secret in each environment where the copy pipe runs. For example, use tb --cloud secret set for Cloud and tb --branch=<branch_name> secret set for a branch. Then reference the secrets in SQL with tb_secret().

In Tinybird Classic, use the Environment Variables API to create the same secret values.

For more details, see tb secret.

Syntax

Create a new pipe node. Call the iceberg table function and pass the AWS access key and secret as Tinybird secrets:

Example query logic
SELECT *
FROM iceberg(
  's3://your_bucket/iceberg/db/table',
  {{ tb_secret("AWS_ACCESS_KEY_ID") }},
  {{ tb_secret("AWS_SECRET_ACCESS_KEY") }}
)

Publish this node as a copy pipe. You can choose to append only new data or replace all data.

Check a full working example in this GitHub repository

Example: sync an Iceberg table from S3

The following example copies an Apache Iceberg table stored in S3 into a Tinybird Data Source every hour.

First, define the target Data Source. Then define the copy pipe that reads from Iceberg.

datasources/iceberg_orders.datasource
SCHEMA >
    `order_id` UInt64,
    `customer_id` UInt64,
    `status` String,
    `amount` Float64,
    `updated_at` DateTime

ENGINE "ReplacingMergeTree(updated_at)"
ENGINE_SORTING_KEY "order_id"
pipes/iceberg_orders_sync.pipe
NODE iceberg_orders
SQL >
    %
    SELECT
        order_id,
        customer_id,
        status,
        amount,
        updated_at
    FROM iceberg(
        's3://my-lakehouse/warehouse/orders',
        {{ tb_secret("AWS_ACCESS_KEY_ID") }},
        {{ tb_secret("AWS_SECRET_ACCESS_KEY") }}
    )

TYPE copy
TARGET_DATASOURCE iceberg_orders
COPY_MODE replace
COPY_SCHEDULE 0 * * * *

Use COPY_MODE replace when the Iceberg table is the source of truth and the full table is small enough to refresh on the schedule. For larger tables, filter by an update timestamp and use COPY_MODE append with a deduplicating engine.

See also

Updated