Python SDK resources¶
Use the Python SDK to define Tinybird resources as code. You can define Data Sources, Pipes, Endpoints, Materialized Views, Copy Pipes, Sink Pipes, and Connections in Python and sync them to Tinybird.
Supported Python versions for Python SDK workflows are 3.11, 3.12, and 3.13. Python 3.14 is not supported yet because these workflows rely on the Tinybird CLI.
Project layout¶
A Python SDK project usually includes:
- tinybird.config.json
- tb_project/resources.py
- tb_project/client.py
The include setting in tinybird.config.json points at the folder or files where you define resources.
Configure credentials¶
Create a .env.local file in your project root:
TINYBIRD_TOKEN=p.your_token_here
TINYBIRD_URL=https://api.tinybird.co
Tokens¶
Use define_token to declare reusable token names, then attach them to Data Sources or Pipes through each resource's tokens option.
from tinybird_sdk import define_datasource, define_endpoint, define_token, engine, node, t
# Declare named tokens once so several resources can share them.
events_append = define_token("events_append")
events_read = define_token("events_read")

# Data Source with both tokens attached: one scoped for ingestion,
# one scoped for reads.
events = define_datasource("events", {
    "schema": {
        "timestamp": t.date_time(),
        "payload": t.string(),
    },
    "engine": engine.merge_tree({"sorting_key": ["timestamp"]}),
    "tokens": [
        # Data Source token bindings accept the READ or APPEND scope.
        {"token": events_append, "scope": "APPEND"},
        {"token": events_read, "scope": "READ"},
    ],
})
# Endpoint exposing the 100 most recent events; readable with events_read.
events_api = define_endpoint("events_api", {
    "nodes": [
        node({
            "name": "latest",
            "sql": "SELECT timestamp, payload FROM events ORDER BY timestamp DESC LIMIT 100",
        }),
    ],
    # Declared result columns of the endpoint.
    "output": {
        "timestamp": t.date_time(),
        "payload": t.string(),
    },
    "tokens": [
        # Endpoint token bindings only take the READ scope.
        {"token": events_read, "scope": "READ"},
    ],
})
Token names must start with a letter or underscore and then use only alphanumeric characters or underscores. In Data Source token bindings, use READ or APPEND scopes. In Pipe or Endpoint token bindings, use READ.
Datasources¶
Define Data Sources with define_datasource:
from tinybird_sdk import define_datasource, engine, t
# Data Source holding raw page-view tracking rows.
page_views = define_datasource("page_views", {
    "description": "Page view tracking data",
    "schema": {
        "timestamp": t.date_time(),
        "pathname": t.string(),
        "session_id": t.string(),
        # LowCardinality + Nullable: country may be absent.
        "country": t.string().low_cardinality().nullable(),
    },
    "engine": engine.merge_tree({
        # Rows ordered by pathname, then timestamp.
        "sorting_key": ["pathname", "timestamp"],
    }),
})
Column JSON paths¶
Use column() when you need custom JSONPath mappings:
from tinybird_sdk import column, define_datasource, engine, t
# Map incoming JSON fields to columns with explicit JSONPath expressions
# (here the payload keys "$.id" / "$.ts" differ from the column names).
events = define_datasource("events", {
    "schema": {
        "event_id": column(t.string(), {"json_path": "$.id"}),
        "timestamp": column(t.date_time(), {"json_path": "$.ts"}),
    },
    "engine": engine.merge_tree({"sorting_key": ["event_id"]}),
})
Endpoints¶
Define API endpoints with define_endpoint:
from tinybird_sdk import define_endpoint, node, p, t
# Parameterized endpoint: most-visited pages within a date range.
top_pages = define_endpoint("top_pages", {
"description": "Get the most visited pages",
"params": {
"start_date": p.date_time(),
"end_date": p.date_time(),
# Optional query parameter with a default value of 10.
"limit": p.int32().optional(10),
},
"nodes": [
node({
"name": "aggregated",
# Parameters are referenced in SQL as {{Type(name[, default])}} templates.
"sql": """
SELECT pathname, count() AS views
FROM page_views
WHERE timestamp >= {{DateTime(start_date)}}
AND timestamp <= {{DateTime(end_date)}}
GROUP BY pathname
ORDER BY views DESC
LIMIT {{Int32(limit, 10)}}
""",
}),
],
# Declared result columns of the endpoint.
"output": {
"pathname": t.string(),
"views": t.uint64(),
},
})
Pipes¶
Define internal pipes with define_pipe:
from tinybird_sdk import define_pipe, node, p
# Internal Pipe (no published endpoint) filtering events by a date range.
filtered_events = define_pipe("filtered_events", {
"params": {
"start_date": p.date_time(),
"end_date": p.date_time(),
},
"nodes": [
node({
"name": "filtered",
# Parameters are interpolated into the SQL via {{DateTime(name)}} templates.
"sql": """
SELECT *
FROM events
WHERE timestamp >= {{DateTime(start_date)}}
AND timestamp <= {{DateTime(end_date)}}
""",
}),
],
})
Materialized views¶
Use define_materialized_view to populate derived Data Sources:
from tinybird_sdk import define_datasource, define_materialized_view, engine, node, t
# Target Data Source for a materialized view: per-day, per-path view counts.
daily_stats = define_datasource("daily_stats", {
    "schema": {
        "date": t.date(),
        "pathname": t.string(),
        # SimpleAggregateFunction(sum) column paired with the
        # AggregatingMergeTree engine below.
        "views": t.simple_aggregate_function("sum", t.uint64()),
    },
    "engine": engine.aggregating_merge_tree({
        "sorting_key": ["date", "pathname"],
    }),
})
# Materialized view populating daily_stats from page_views.
daily_stats_mv = define_materialized_view("daily_stats_mv", {
# Target Data Source the materialization writes into.
"datasource": daily_stats,
"nodes": [
node({
"name": "aggregate",
"sql": """
SELECT toDate(timestamp) AS date, pathname, count() AS views
FROM page_views
GROUP BY date, pathname
""",
}),
],
})
Copy pipes¶
Use define_copy_pipe for scheduled snapshots:
from tinybird_sdk import define_copy_pipe, node
# Copy Pipe appending yesterday's daily_stats rows into the same Data Source.
daily_snapshot = define_copy_pipe("daily_snapshot", {
    "datasource": daily_stats,
    # Cron expression: run at 00:00 every day.
    "copy_schedule": "0 0 * * *",
    "copy_mode": "append",
    "nodes": [
        node({
            "name": "snapshot",
            "sql": "SELECT * FROM daily_stats WHERE date = today() - 1",
        }),
    ],
})
Sink pipes¶
Use define_sink_pipe to publish query results to Kafka or S3:
from tinybird_sdk import define_sink_pipe, node
# Sink Pipe publishing query results to a Kafka topic, triggered on demand.
kafka_events_sink = define_sink_pipe("kafka_events_sink", {
    "sink": {
        "connection": events_kafka,
        "topic": "events_export",
        "schedule": "@on-demand",
    },
    "nodes": [
        node({
            "name": "publish",
            "sql": "SELECT timestamp, payload FROM kafka_events",
        }),
    ],
})
# Sink Pipe exporting CSV results to an S3 bucket, run a single time ("@once").
s3_events_sink = define_sink_pipe("s3_events_sink", {
    "sink": {
        "connection": landing_s3,
        "bucket_uri": "s3://my-bucket/exports/",
        # NOTE(review): {date} is presumably substituted into the generated
        # file name — confirm against the sink pipe docs.
        "file_template": "events_{date}",
        "format": "csv",
        "schedule": "@once",
    },
    "nodes": [
        node({
            "name": "export",
            "sql": "SELECT timestamp, session_id FROM events",
        }),
    ],
})
Connections¶
Define connector connections with define_kafka_connection, define_s3_connection, and define_gcs_connection:
from tinybird_sdk import (
define_gcs_connection,
define_kafka_connection,
define_s3_connection,
secret,
)
# Kafka connection; credentials are supplied through secret() references
# rather than inline literals.
events_kafka = define_kafka_connection("events_kafka", {
    "bootstrap_servers": "kafka.example.com:9092",
    "security_protocol": "SASL_SSL",
    "sasl_mechanism": "PLAIN",
    "key": secret("KAFKA_KEY"),
    "secret": secret("KAFKA_SECRET"),
})

# S3 connection authenticated via an IAM role ARN secret.
landing_s3 = define_s3_connection("landing_s3", {
    "region": "eu-west-1",
    "arn": secret("AWS_ROLE_ARN"),
})

# GCS connection using service-account credentials JSON held in a secret.
landing_gcs = define_gcs_connection("landing_gcs", {
    "service_account_credentials_json": secret("GCS_SERVICE_ACCOUNT_CREDENTIALS_JSON"),
})
Datasource ingestion config¶
You can attach Kafka, S3, or GCS ingestion directly to a Data Source:
from tinybird_sdk import define_datasource, engine, t
# Data Source with Kafka ingestion attached directly via the "kafka" key.
kafka_events = define_datasource("kafka_events", {
    "schema": {
        "timestamp": t.date_time(),
        "payload": t.string(),
    },
    "engine": engine.merge_tree({"sorting_key": ["timestamp"]}),
    "kafka": {
        # Reuses the Kafka connection defined with define_kafka_connection.
        "connection": events_kafka,
        "topic": "events",
        "group_id": "events-consumer",
    },
})
Runtime client¶
Create a typed runtime client with Tinybird:
from tinybird_sdk import Tinybird
from .resources import page_views, top_pages
# Typed runtime client: register resources under the attribute names
# used at call sites (tinybird.page_views, tinybird.top_pages).
tinybird = Tinybird({
    "datasources": {"page_views": page_views},
    "pipes": {"top_pages": top_pages},
})
Then ingest and query:
# Append a single row to the page_views Data Source.
tinybird.page_views.ingest({
    "timestamp": "2026-01-01 10:00:00",
    "pathname": "/home",
    "session_id": "abc123",
})

# Query the top_pages endpoint, supplying its declared parameters.
result = tinybird.top_pages.query({
    "start_date": "2026-01-01 00:00:00",
    "end_date": "2026-01-02 00:00:00",
    "limit": 5,
})
Next steps¶
- Review CLI commands: Python SDK CLI commands
- Follow the setup and deployment flow: Quick start: Python SDK