---
title: "Python SDK resources"
meta:
    description: Define Tinybird resources in Python with the Tinybird SDK.
---

# Python SDK resources

Use the Python SDK to define Tinybird resources as code. You can define Data Sources, Pipes, Endpoints, Materialized Views, Copy Pipes, Sink Pipes, and Connections in Python and sync them to Tinybird.

{% callout type="note" %}
Python SDK workflows support Python 3.11, 3.12, and 3.13. Python 3.14 is not supported yet because these workflows rely on the Tinybird CLI, which doesn't support Python 3.14 yet.
{% /callout %}

## Project layout

A Python SDK project usually includes:

- `tinybird.config.json`
- `tb_project/resources.py`
- `tb_project/client.py`

The `include` setting in `tinybird.config.json` points to the folders or files where you define resources.

## Configure credentials

Create a `.env.local` file in your project root:

```shell
TINYBIRD_TOKEN=p.your_token_here
TINYBIRD_URL={% user("apiHost", "https://api.tinybird.co") %}
```

## Tokens

Use `define_token` to declare reusable token names, then attach them to Data Sources or Pipes through each resource's `tokens` option.

```python
from tinybird_sdk import define_datasource, define_endpoint, define_token, engine, node, t

events_append = define_token("events_append")
events_read = define_token("events_read")

events = define_datasource("events", {
    "schema": {
        "timestamp": t.date_time(),
        "payload": t.string(),
    },
    "engine": engine.merge_tree({"sorting_key": ["timestamp"]}),
    "tokens": [
        {"token": events_append, "scope": "APPEND"},
        {"token": events_read, "scope": "READ"},
    ],
})

events_api = define_endpoint("events_api", {
    "nodes": [
        node({
            "name": "latest",
            "sql": "SELECT timestamp, payload FROM events ORDER BY timestamp DESC LIMIT 100",
        }),
    ],
    "output": {
        "timestamp": t.date_time(),
        "payload": t.string(),
    },
    "tokens": [
        {"token": events_read, "scope": "READ"},
    ],
})
```

Token names must start with a letter or underscore, followed only by alphanumeric characters or underscores. In Data Source token bindings, use the `READ` or `APPEND` scope. In Pipe or Endpoint token bindings, use the `READ` scope.

## Datasources

Define Data Sources with `define_datasource`:

```python
from tinybird_sdk import define_datasource, engine, t

page_views = define_datasource("page_views", {
    "description": "Page view tracking data",
    "schema": {
        "timestamp": t.date_time(),
        "pathname": t.string(),
        "session_id": t.string(),
        "country": t.string().low_cardinality().nullable(),
    },
    "engine": engine.merge_tree({
        "sorting_key": ["pathname", "timestamp"],
    }),
})
```

### Column JSON paths

Use `column()` when you need custom JSONPath mappings:

```python
from tinybird_sdk import column, define_datasource, engine, t

events = define_datasource("events", {
    "schema": {
        "event_id": column(t.string(), {"json_path": "$.id"}),
        "timestamp": column(t.date_time(), {"json_path": "$.ts"}),
    },
    "engine": engine.merge_tree({"sorting_key": ["event_id"]}),
})
```

## Endpoints

Define API endpoints with `define_endpoint`:

```python
from tinybird_sdk import define_endpoint, node, p, t

top_pages = define_endpoint("top_pages", {
    "description": "Get the most visited pages",
    "params": {
        "start_date": p.date_time(),
        "end_date": p.date_time(),
        "limit": p.int32().optional(10),
    },
    "nodes": [
        node({
            "name": "aggregated",
            "sql": """
                SELECT pathname, count() AS views
                FROM page_views
                WHERE timestamp >= {{DateTime(start_date)}}
                  AND timestamp <= {{DateTime(end_date)}}
                GROUP BY pathname
                ORDER BY views DESC
                LIMIT {{Int32(limit, 10)}}
            """,
        }),
    ],
    "output": {
        "pathname": t.string(),
        "views": t.uint64(),
    },
})
```

## Pipes

Define internal pipes with `define_pipe`:

```python
from tinybird_sdk import define_pipe, node, p

filtered_events = define_pipe("filtered_events", {
    "params": {
        "start_date": p.date_time(),
        "end_date": p.date_time(),
    },
    "nodes": [
        node({
            "name": "filtered",
            "sql": """
                SELECT *
                FROM events
                WHERE timestamp >= {{DateTime(start_date)}}
                  AND timestamp <= {{DateTime(end_date)}}
            """,
        }),
    ],
})
```

## Materialized views

Use `define_materialized_view` to populate derived Data Sources:

```python
from tinybird_sdk import define_datasource, define_materialized_view, engine, node, t

daily_stats = define_datasource("daily_stats", {
    "schema": {
        "date": t.date(),
        "pathname": t.string(),
        "views": t.simple_aggregate_function("sum", t.uint64()),
    },
    "engine": engine.aggregating_merge_tree({
        "sorting_key": ["date", "pathname"],
    }),
})

daily_stats_mv = define_materialized_view("daily_stats_mv", {
    "datasource": daily_stats,
    "nodes": [
        node({
            "name": "aggregate",
            "sql": """
                SELECT toDate(timestamp) AS date, pathname, count() AS views
                FROM page_views
                GROUP BY date, pathname
            """,
        }),
    ],
})
```

## Copy pipes

Use `define_copy_pipe` for scheduled snapshots. This example reuses the `daily_stats` Data Source defined in the Materialized views section:

```python
from tinybird_sdk import define_copy_pipe, node

daily_snapshot = define_copy_pipe("daily_snapshot", {
    "datasource": daily_stats,
    "copy_schedule": "0 0 * * *",
    "copy_mode": "append",
    "nodes": [
        node({
            "name": "snapshot",
            "sql": "SELECT * FROM daily_stats WHERE date = today() - 1",
        }),
    ],
})
```

## Sink pipes

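Use `define_sink_pipe` to publish query results to Kafka or S3. These examples reference the `events_kafka` and `landing_s3` connections from the Connections section and the `kafka_events` Data Source from the Datasource ingestion config section, both later on this page: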

```python
from tinybird_sdk import define_sink_pipe, node

kafka_events_sink = define_sink_pipe("kafka_events_sink", {
    "sink": {
        "connection": events_kafka,
        "topic": "events_export",
        "schedule": "@on-demand",
    },
    "nodes": [
        node({
            "name": "publish",
            "sql": "SELECT timestamp, payload FROM kafka_events",
        }),
    ],
})

s3_events_sink = define_sink_pipe("s3_events_sink", {
    "sink": {
        "connection": landing_s3,
        "bucket_uri": "s3://my-bucket/exports/",
        "file_template": "events_{date}",
        "format": "csv",
        "schedule": "@once",
    },
    "nodes": [
        node({
            "name": "export",
            "sql": "SELECT timestamp, session_id FROM events",
        }),
    ],
})
```

## Connections

Define connections to Kafka, S3, and GCS with `define_kafka_connection`, `define_s3_connection`, and `define_gcs_connection`:

```python
from tinybird_sdk import (
    define_gcs_connection,
    define_kafka_connection,
    define_s3_connection,
    secret,
)

events_kafka = define_kafka_connection("events_kafka", {
    "bootstrap_servers": "kafka.example.com:9092",
    "security_protocol": "SASL_SSL",
    "sasl_mechanism": "PLAIN",
    "key": secret("KAFKA_KEY"),
    "secret": secret("KAFKA_SECRET"),
})

landing_s3 = define_s3_connection("landing_s3", {
    "region": "eu-west-1",
    "arn": secret("AWS_ROLE_ARN"),
})

landing_gcs = define_gcs_connection("landing_gcs", {
    "service_account_credentials_json": secret("GCS_SERVICE_ACCOUNT_CREDENTIALS_JSON"),
})
```

## Datasource ingestion config

You can attach Kafka, S3, or GCS ingestion directly to a Data Source. This example uses the `events_kafka` connection defined in the Connections section:

```python
from tinybird_sdk import define_datasource, engine, t

kafka_events = define_datasource("kafka_events", {
    "schema": {
        "timestamp": t.date_time(),
        "payload": t.string(),
    },
    "engine": engine.merge_tree({"sorting_key": ["timestamp"]}),
    "kafka": {
        "connection": events_kafka,
        "topic": "events",
        "group_id": "events-consumer",
    },
})
```

## Runtime client

Create a typed runtime client with `Tinybird`, for example in `tb_project/client.py`:

```python
from tinybird_sdk import Tinybird
from .resources import page_views, top_pages

tinybird = Tinybird({
    "datasources": {"page_views": page_views},
    "pipes": {"top_pages": top_pages},
})
```

Then ingest and query:

```python
tinybird.page_views.ingest({
    "timestamp": "2026-01-01 10:00:00",
    "pathname": "/home",
    "session_id": "abc123",
})

result = tinybird.top_pages.query({
    "start_date": "2026-01-01 00:00:00",
    "end_date": "2026-01-02 00:00:00",
    "limit": 5,
})
```

## Next steps

- Review CLI commands: [Python SDK CLI commands](/forward/dev-reference/commands/python-sdk-cli)
- Follow the setup and deployment flow: [Quick start: Python SDK](/forward/get-started/quick-start-python-sdk)
