Connect ClickHouse Python Client to Tinybird

The official ClickHouse Python client, clickhouse-connect, can connect to Tinybird through its ClickHouse®-compatible HTTP interface. This lets you query your Tinybird data sources programmatically from Python and work with the results across the Python data ecosystem.

The ClickHouse® connection to Tinybird is read-only. You can use it to query and analyze data from your Tinybird data sources, but you cannot modify data through this connection.

Prerequisites

  • Python 3.8 or later
  • A Tinybird workspace with data sources
  • A Tinybird Auth Token with read permissions for the workspace data sources

Installation

Install the official ClickHouse Python client:

pip install clickhouse-connect

Configuration

Create a client instance with the following configuration:

import clickhouse_connect

client = clickhouse_connect.get_client(
    interface='https',
    host='clickhouse.tinybird.co',
    username='<WORKSPACE_NAME>',  # Optional, for identification
    password='<TOKEN>',  # Your Tinybird Auth Token
    port=443,
)

See the list of ClickHouse hosts to find the correct one for your region.
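To keep credentials out of source code, you can read the connection details from environment variables instead. This is a minimal sketch; the TINYBIRD_HOST, TINYBIRD_WORKSPACE, and TINYBIRD_TOKEN variable names are illustrative, not a Tinybird convention:

import os

import clickhouse_connect

client = clickhouse_connect.get_client(
    interface='https',
    # Hypothetical variable names; point them at your region's host,
    # your workspace name, and your Auth Token
    host=os.environ.get('TINYBIRD_HOST', 'clickhouse.tinybird.co'),
    username=os.environ.get('TINYBIRD_WORKSPACE', ''),
    password=os.environ['TINYBIRD_TOKEN'],
    port=443,
)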

Test the connection

Create a simple query to verify your connection:

def test_connection():
    try:
        result = client.query('SELECT database, name, engine FROM system.tables LIMIT 5')
        print("Connection successful!")

        # Print column names
        print(" | ".join(result.column_names))

        # Print data rows
        for row in result.result_rows:
            print(" | ".join(str(cell) for cell in row))

    except Exception as error:
        print(f"Connection failed: {error}")

test_connection()
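For a lighter-weight liveness check that doesn't run a query, clickhouse-connect also exposes ping():

# ping() returns True if the server responds to a simple HTTP check
if client.ping():
    print("Server is reachable")
else:
    print("Server did not respond")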

Query your data

Once connected, you can query your Tinybird data sources:

# Query a specific data source
def query_data_source():
    result = client.query('SELECT * FROM your_data_source_name LIMIT 10')
    return result.result_rows

# Query with parameters
def query_with_params(start_date, limit):
    result = client.query(
        """
        SELECT timestamp, user_id, event_name
        FROM events
        WHERE timestamp >= %(start_date)s
        LIMIT %(limit)s
        """,
        parameters={
            'start_date': start_date,
            'limit': limit
        }
    )
    return result.result_rows

# Convert to pandas DataFrame
def query_to_dataframe():
    import pandas as pd

    result = client.query('SELECT * FROM your_data_source_name LIMIT 100')

    # Create DataFrame from result
    df = pd.DataFrame(result.result_rows, columns=result.column_names)
    return df
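If pandas is installed, clickhouse-connect can also build the DataFrame for you with query_df, which is equivalent to the helper above:

# query_df returns a pandas DataFrame directly (requires pandas)
def query_df_example():
    df = client.query_df('SELECT * FROM your_data_source_name LIMIT 100')
    print(df.dtypes)
    return df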

Observability

You can also explore Tinybird's observability data through its internal service data sources, such as tinybird.pipe_stats_rt:

# View recent API endpoint usage and performance
def query_recent_usage():
    result = client.query(
        """
        SELECT
            start_datetime,
            pipe_name,
            duration,
            result_rows,
            read_bytes,
            status_code
        FROM tinybird.pipe_stats_rt
        WHERE start_datetime >= now() - INTERVAL 1 DAY
        ORDER BY start_datetime DESC
        LIMIT 10
        """
    )

    print("Recent API endpoint usage:")
    for row in result.result_rows:
        start_time, pipe_name, duration, result_rows, read_bytes, status_code = row
        duration_ms = duration * 1000
        print(f"[{start_time}] {pipe_name} - {duration_ms:.2f}ms, {result_rows} rows, status {status_code}")

# Analyze endpoint performance metrics
def query_performance_metrics():
    result = client.query(
        """
        SELECT
            pipe_name,
            count() as request_count,
            avg(duration) as avg_duration,  -- duration is reported in seconds
            avg(result_rows) as avg_result_rows,
            sum(read_bytes) as total_bytes_read
        FROM tinybird.pipe_stats_rt
        WHERE start_datetime >= now() - INTERVAL 1 HOUR
        GROUP BY pipe_name
        ORDER BY request_count DESC
        """
    )

    print("\nEndpoint performance metrics (last hour):")
    for row in result.result_rows:
        pipe_name, request_count, avg_duration, avg_result_rows, total_bytes_read = row
        avg_duration_ms = avg_duration * 1000
        print(f"{pipe_name}: {request_count} requests, {avg_duration_ms:.2f}ms avg, {avg_result_rows:.0f} avg rows, {total_bytes_read:,} bytes")

# Convert pipe stats to a pandas DataFrame for analysis
def pipe_stats_to_dataframe():
    import pandas as pd

    result = client.query(
        """
        SELECT
            start_datetime,
            pipe_name,
            duration,
            result_rows,
            read_bytes,
            status_code
        FROM tinybird.pipe_stats_rt
        WHERE start_datetime >= now() - INTERVAL 1 DAY
        ORDER BY start_datetime DESC
        """
    )

    df = pd.DataFrame(result.result_rows, columns=result.column_names)
    return df
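As one example of what the DataFrame enables, this sketch computes the share of error responses (status 400 and above) per pipe from the columns returned above; it assumes status_code is numeric, as in tinybird.pipe_stats_rt:

def error_rate_by_pipe():
    df = pipe_stats_to_dataframe()
    # Flag responses with status codes of 400 or above, then average per pipe
    df['is_error'] = df['status_code'] >= 400
    return (
        df.groupby('pipe_name')['is_error']
        .mean()
        .sort_values(ascending=False)
    )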

Working with data types

clickhouse-connect maps ClickHouse data types to native Python types:

def handle_query_results():
    result = client.query(
        """
        SELECT
            toUInt32(42) as number,
            'hello world' as text,
            now() as timestamp,
            [1, 2, 3] as array_col,
            map('key1', 'value1', 'key2', 'value2') as map_col
        """
    )

    for row in result.result_rows:
        number, text, timestamp, array_col, map_col = row
        print(f"Number: {number} (type: {type(number)})")
        print(f"Text: {text} (type: {type(text)})")
        print(f"Timestamp: {timestamp} (type: {type(timestamp)})")
        print(f"Array: {array_col} (type: {type(array_col)})")
        print(f"Map: {map_col} (type: {type(map_col)})")

Error handling

Handle connection and query errors appropriately:

from clickhouse_connect.driver.exceptions import ClickHouseError

def safe_query(query, parameters=None):
    try:
        result = client.query(query, parameters=parameters)
        return result
    except ClickHouseError as error:
        # The server's error message includes the ClickHouse error code
        print(f"ClickHouse error: {error}")
        raise
    except Exception as error:
        print(f"Connection error: {error}")
        raise

# Example usage
try:
    result = safe_query('SELECT COUNT(*) FROM your_data_source')
    count = result.result_rows[0][0]
    print(f"Total rows: {count:,}")
except Exception:
    print("Query failed")

Advanced usage

Streaming large results

For large datasets, use streaming to avoid memory issues:

def stream_large_query():
    # Stream results in blocks to avoid loading the full result set into memory.
    # The returned stream must be consumed inside a with statement.
    with client.query_row_block_stream(
        'SELECT * FROM large_data_source',
        settings={'max_block_size': 10000}
    ) as stream:
        for block in stream:
            # Process each block of rows
            for row in block:
                process_row(row)

def process_row(row):
    # Your row processing logic here
    pass
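If you work with pandas, query_df_stream follows the same context-manager pattern and yields one DataFrame per block:

def stream_to_dataframes():
    # Each item in the stream is a pandas DataFrame holding one block of rows
    with client.query_df_stream('SELECT * FROM large_data_source') as stream:
        for df_block in stream:
            print(f"Received a block of {len(df_block)} rows")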

Custom settings

Pass ClickHouse settings for query optimization:

# Query with custom settings
result = client.query(
    'SELECT * FROM system.numbers LIMIT 5',
    settings={
        'max_result_rows': 5,
        'max_execution_time': 10
    }
)
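Settings can also be applied to every query issued by a client by passing a settings dictionary to get_client:

# Settings passed at client creation apply to all subsequent queries
client = clickhouse_connect.get_client(
    interface='https',
    host='clickhouse.tinybird.co',
    password='<TOKEN>',
    port=443,
    settings={'max_execution_time': 10},
)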
