Python SDK resources

Use the Python SDK to define Tinybird resources as code. You can define Data Sources, Pipes, Endpoints, Materialized Views, Copy Pipes, Sink Pipes, and Connections in Python and sync them to Tinybird.

Supported Python versions for Python SDK workflows are 3.11, 3.12, and 3.13. Python 3.14 is not supported yet because these workflows rely on the Tinybird CLI.

Project layout

A Python SDK project usually includes:

  • tinybird.config.json
  • tb_project/resources.py
  • tb_project/client.py

The include field in tinybird.config.json points at the folder or files where you define resources.

Configure credentials

Create a .env.local file in your project root:

TINYBIRD_TOKEN=p.your_token_here
TINYBIRD_URL=https://api.tinybird.co

Tokens

Use define_token to declare reusable token names, then attach them to Data Sources or Pipes through each resource's tokens option.

from tinybird_sdk import define_datasource, define_endpoint, define_token, engine, node, t

events_append = define_token("events_append")
events_read = define_token("events_read")

events = define_datasource("events", {
    "schema": {
        "timestamp": t.date_time(),
        "payload": t.string(),
    },
    "engine": engine.merge_tree({"sorting_key": ["timestamp"]}),
    "tokens": [
        {"token": events_append, "scope": "APPEND"},
        {"token": events_read, "scope": "READ"},
    ],
})

events_api = define_endpoint("events_api", {
    "nodes": [
        node({
            "name": "latest",
            "sql": "SELECT timestamp, payload FROM events ORDER BY timestamp DESC LIMIT 100",
        }),
    ],
    "output": {
        "timestamp": t.date_time(),
        "payload": t.string(),
    },
    "tokens": [
        {"token": events_read, "scope": "READ"},
    ],
})

Token names must start with a letter or underscore and then use only alphanumeric characters or underscores. In Data Source token bindings, use READ or APPEND scopes. In Pipe or Endpoint token bindings, use READ.

Data Sources

Define Data Sources with define_datasource:

from tinybird_sdk import define_datasource, engine, t

page_views = define_datasource("page_views", {
    "description": "Page view tracking data",
    "schema": {
        "timestamp": t.date_time(),
        "pathname": t.string(),
        "session_id": t.string(),
        "country": t.string().low_cardinality().nullable(),
    },
    "engine": engine.merge_tree({
        "sorting_key": ["pathname", "timestamp"],
    }),
})

Column JSON paths

Use column() when you need custom JSONPath mappings:

from tinybird_sdk import column, define_datasource, engine, t

events = define_datasource("events", {
    "schema": {
        "event_id": column(t.string(), {"json_path": "$.id"}),
        "timestamp": column(t.date_time(), {"json_path": "$.ts"}),
    },
    "engine": engine.merge_tree({"sorting_key": ["event_id"]}),
})

Endpoints

Define API endpoints with define_endpoint:

from tinybird_sdk import define_endpoint, node, p, t

top_pages = define_endpoint("top_pages", {
    "description": "Get the most visited pages",
    "params": {
        "start_date": p.date_time(),
        "end_date": p.date_time(),
        "limit": p.int32().optional(10),
    },
    "nodes": [
        node({
            "name": "aggregated",
            "sql": """
                SELECT pathname, count() AS views
                FROM page_views
                WHERE timestamp >= {{DateTime(start_date)}}
                  AND timestamp <= {{DateTime(end_date)}}
                GROUP BY pathname
                ORDER BY views DESC
                LIMIT {{Int32(limit, 10)}}
            """,
        }),
    ],
    "output": {
        "pathname": t.string(),
        "views": t.uint64(),
    },
})

Pipes

Define internal pipes with define_pipe:

from tinybird_sdk import define_pipe, node, p

filtered_events = define_pipe("filtered_events", {
    "params": {
        "start_date": p.date_time(),
        "end_date": p.date_time(),
    },
    "nodes": [
        node({
            "name": "filtered",
            "sql": """
                SELECT *
                FROM events
                WHERE timestamp >= {{DateTime(start_date)}}
                  AND timestamp <= {{DateTime(end_date)}}
            """,
        }),
    ],
})

Materialized views

Use define_materialized_view to populate derived Data Sources:

from tinybird_sdk import define_datasource, define_materialized_view, engine, node, t

daily_stats = define_datasource("daily_stats", {
    "schema": {
        "date": t.date(),
        "pathname": t.string(),
        "views": t.simple_aggregate_function("sum", t.uint64()),
    },
    "engine": engine.aggregating_merge_tree({
        "sorting_key": ["date", "pathname"],
    }),
})

daily_stats_mv = define_materialized_view("daily_stats_mv", {
    "datasource": daily_stats,
    "nodes": [
        node({
            "name": "aggregate",
            "sql": """
                SELECT toDate(timestamp) AS date, pathname, count() AS views
                FROM page_views
                GROUP BY date, pathname
            """,
        }),
    ],
})

Copy pipes

Use define_copy_pipe for scheduled snapshots:

from tinybird_sdk import define_copy_pipe, node

daily_snapshot = define_copy_pipe("daily_snapshot", {
    "datasource": daily_stats,
    "copy_schedule": "0 0 * * *",
    "copy_mode": "append",
    "nodes": [
        node({
            "name": "snapshot",
            "sql": "SELECT * FROM daily_stats WHERE date = today() - 1",
        }),
    ],
})

Sink pipes

Use define_sink_pipe to publish query results to Kafka or S3:

from tinybird_sdk import define_sink_pipe, node

kafka_events_sink = define_sink_pipe("kafka_events_sink", {
    "sink": {
        "connection": events_kafka,
        "topic": "events_export",
        "schedule": "@on-demand",
    },
    "nodes": [
        node({
            "name": "publish",
            "sql": "SELECT timestamp, payload FROM kafka_events",
        }),
    ],
})

s3_events_sink = define_sink_pipe("s3_events_sink", {
    "sink": {
        "connection": landing_s3,
        "bucket_uri": "s3://my-bucket/exports/",
        "file_template": "events_{date}",
        "format": "csv",
        "schedule": "@once",
    },
    "nodes": [
        node({
            "name": "export",
            "sql": "SELECT timestamp, session_id FROM events",
        }),
    ],
})

Connections

Define connector connections with define_kafka_connection, define_s3_connection, and define_gcs_connection:

from tinybird_sdk import (
    define_gcs_connection,
    define_kafka_connection,
    define_s3_connection,
    secret,
)

events_kafka = define_kafka_connection("events_kafka", {
    "bootstrap_servers": "kafka.example.com:9092",
    "security_protocol": "SASL_SSL",
    "sasl_mechanism": "PLAIN",
    "key": secret("KAFKA_KEY"),
    "secret": secret("KAFKA_SECRET"),
})

landing_s3 = define_s3_connection("landing_s3", {
    "region": "eu-west-1",
    "arn": secret("AWS_ROLE_ARN"),
})

landing_gcs = define_gcs_connection("landing_gcs", {
    "service_account_credentials_json": secret("GCS_SERVICE_ACCOUNT_CREDENTIALS_JSON"),
})

Data Source ingestion config

You can attach Kafka, S3, or GCS ingestion directly to a Data Source:

from tinybird_sdk import define_datasource, engine, t

kafka_events = define_datasource("kafka_events", {
    "schema": {
        "timestamp": t.date_time(),
        "payload": t.string(),
    },
    "engine": engine.merge_tree({"sorting_key": ["timestamp"]}),
    "kafka": {
        "connection": events_kafka,
        "topic": "events",
        "group_id": "events-consumer",
    },
})

Runtime client

Create a typed runtime client with Tinybird:

from tinybird_sdk import Tinybird
from .resources import page_views, top_pages

tinybird = Tinybird({
    "datasources": {"page_views": page_views},
    "pipes": {"top_pages": top_pages},
})

Then ingest and query:

tinybird.page_views.ingest({
    "timestamp": "2026-01-01 10:00:00",
    "pathname": "/home",
    "session_id": "abc123",
})

result = tinybird.top_pages.query({
    "start_date": "2026-01-01 00:00:00",
    "end_date": "2026-01-02 00:00:00",
    "limit": 5,
})

Next steps

Updated