Connect ClickHouse Python Client to Tinybird¶
The official ClickHouse Python client, clickhouse-connect, can connect to Tinybird through Tinybird's ClickHouse® HTTP protocol compatibility. This lets you query your Tinybird data sources programmatically from Python and work with the results using the rest of the Python data ecosystem, such as pandas.
The ClickHouse® connection to Tinybird is read-only. You can use it to query and analyze data from your Tinybird data sources, but you cannot modify data through this connection.
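Once you have a client configured (see Configuration below), you can confirm this behavior: statements that write or modify data are rejected by the server. A minimal sketch, where your_data_source_name is a placeholder and the exact error text depends on the server:

try:
    # Writes fail because the connection is read-only; the error message may vary
    client.command("INSERT INTO your_data_source_name VALUES (1)")
except Exception as error:
    print(f"Write rejected: {error}")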
Prerequisites¶
- Python 3.8 or later
- A Tinybird workspace with data sources
- A Tinybird Auth Token with read permissions for the workspace data sources
Installation¶
Install the official ClickHouse Python client:
pip install clickhouse-connect
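Several examples below also convert query results to pandas DataFrames. If you plan to run those, install pandas alongside the client:

pip install clickhouse-connect pandas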
Configuration¶
Create a client instance with the following configuration:
import clickhouse_connect

client = clickhouse_connect.get_client(
    interface='https',
    host='clickhouse.tinybird.co',
    username='<WORKSPACE_NAME>',  # Optional, for identification
    password='<TOKEN>',  # Your Tinybird Auth Token
    port=443,
)
See the list of ClickHouse hosts to find the correct one for your region.
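Hardcoding credentials is fine for a quick test, but you may prefer to read them from environment variables. A minimal sketch, where TINYBIRD_HOST, TINYBIRD_WORKSPACE, and TINYBIRD_TOKEN are example variable names you define yourself:

import os

import clickhouse_connect

# TINYBIRD_HOST, TINYBIRD_WORKSPACE, and TINYBIRD_TOKEN are illustrative names,
# not variables defined by Tinybird or clickhouse-connect
client = clickhouse_connect.get_client(
    interface='https',
    host=os.environ.get('TINYBIRD_HOST', 'clickhouse.tinybird.co'),
    username=os.environ.get('TINYBIRD_WORKSPACE', ''),
    password=os.environ['TINYBIRD_TOKEN'],
    port=443,
)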
Test the connection¶
Create a simple query to verify your connection:
def test_connection():
    try:
        result = client.query('SELECT database, name, engine FROM system.tables LIMIT 5')
        print("Connection successful!")

        # Print column names
        print(" | ".join(result.column_names))

        # Print data rows
        for row in result.result_rows:
            print(" | ".join(str(cell) for cell in row))
    except Exception as error:
        print(f"Connection failed: {error}")

test_connection()
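For a quicker smoke test, client.command runs a single statement and returns a scalar value, so you can check connectivity in one line. A minimal sketch, assuming the endpoint exposes the standard version() function:

# command() returns a single value for single-cell results
version = client.command('SELECT version()')
print(f"Connected, server version: {version}")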
Query your data¶
Once connected, you can query your Tinybird data sources:
# Query a specific data source
def query_data_source():
    result = client.query('SELECT * FROM your_data_source_name LIMIT 10')
    return result.result_rows

# Query with parameters
def query_with_params(start_date, limit):
    result = client.query(
        """
        SELECT timestamp, user_id, event_name
        FROM events
        WHERE timestamp >= %(start_date)s
        LIMIT %(limit)s
        """,
        parameters={
            'start_date': start_date,
            'limit': limit
        }
    )
    return result.result_rows

# Convert to pandas DataFrame
def query_to_dataframe():
    import pandas as pd

    result = client.query('SELECT * FROM your_data_source_name LIMIT 100')

    # Create DataFrame from result
    df = pd.DataFrame(result.result_rows, columns=result.column_names)
    return df
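If pandas is installed, clickhouse-connect can also build the DataFrame for you with client.query_df, which avoids the manual conversion shown above:

# query_df returns a pandas DataFrame with columns named after the result columns
def query_to_dataframe_direct():
    return client.query_df('SELECT * FROM your_data_source_name LIMIT 100')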
Observability¶
You can also explore Tinybird's observability data from internal service data sources:
# View recent API endpoint usage and performance
def query_recent_usage():
    result = client.query(
        """
        SELECT
            start_datetime,
            pipe_name,
            duration,
            result_rows,
            read_bytes,
            status_code
        FROM tinybird.pipe_stats_rt
        WHERE start_datetime >= now() - INTERVAL 1 DAY
        ORDER BY start_datetime DESC
        LIMIT 10
        """
    )

    print("Recent API endpoint usage:")
    for row in result.result_rows:
        start_time, pipe_name, duration, result_rows, read_bytes, status_code = row
        duration_ms = duration * 1000
        print(f"[{start_time}] {pipe_name} - {duration_ms:.2f}ms, {result_rows} rows, status {status_code}")

# Analyze endpoint performance metrics
def query_performance_metrics():
    result = client.query(
        """
        SELECT
            pipe_name,
            count() as request_count,
            avg(duration) as avg_duration,
            avg(result_rows) as avg_result_rows,
            sum(read_bytes) as total_bytes_read
        FROM tinybird.pipe_stats_rt
        WHERE start_datetime >= now() - INTERVAL 1 HOUR
        GROUP BY pipe_name
        ORDER BY request_count DESC
        """
    )

    print("\nEndpoint performance metrics (last hour):")
    for row in result.result_rows:
        pipe_name, request_count, avg_duration, avg_result_rows, total_bytes_read = row
        avg_duration_ms = avg_duration * 1000
        print(f"{pipe_name}: {request_count} requests, {avg_duration_ms:.2f}ms avg, {avg_result_rows:.0f} avg rows, {total_bytes_read:,} bytes")

# Convert to pandas DataFrame for analysis
def pipe_stats_to_dataframe():
    import pandas as pd

    result = client.query(
        """
        SELECT
            start_datetime,
            pipe_name,
            duration,
            result_rows,
            read_bytes,
            status_code
        FROM tinybird.pipe_stats_rt
        WHERE start_datetime >= now() - INTERVAL 1 DAY
        ORDER BY start_datetime DESC
        """
    )

    df = pd.DataFrame(result.result_rows, columns=result.column_names)
    return df
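The same data source can also answer reliability questions, for example how many requests per endpoint returned an error recently. A minimal sketch that filters on status_code:

# Count error responses per endpoint over the last 24 hours
def query_error_rates():
    result = client.query(
        """
        SELECT
            pipe_name,
            countIf(status_code >= 400) AS error_count,
            count() AS total_count
        FROM tinybird.pipe_stats_rt
        WHERE start_datetime >= now() - INTERVAL 1 DAY
        GROUP BY pipe_name
        HAVING error_count > 0
        ORDER BY error_count DESC
        """
    )
    for pipe_name, error_count, total_count in result.result_rows:
        print(f"{pipe_name}: {error_count}/{total_count} requests failed")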
Working with data types¶
Handle various ClickHouse data types:
def handle_query_results():
    result = client.query(
        """
        SELECT
            toUInt32(42) as number,
            'hello world' as text,
            now() as timestamp,
            [1, 2, 3] as array_col,
            map('key1', 'value1', 'key2', 'value2') as map_col
        """
    )

    for row in result.result_rows:
        number, text, timestamp, array_col, map_col = row
        print(f"Number: {number} (type: {type(number)})")
        print(f"Text: {text} (type: {type(text)})")
        print(f"Timestamp: {timestamp} (type: {type(timestamp)})")
        print(f"Array: {array_col} (type: {type(array_col)})")
        print(f"Map: {map_col} (type: {type(map_col)})")
Error handling¶
Handle connection and query errors appropriately:
from clickhouse_connect.driver.exceptions import ClickHouseError

def safe_query(query, parameters=None):
    try:
        result = client.query(query, parameters=parameters)
        return result
    except ClickHouseError as error:
        print(f"ClickHouse error: {error}")
        raise
    except Exception as error:
        print(f"Connection error: {error}")
        raise

# Example usage
try:
    result = safe_query('SELECT COUNT(*) FROM your_data_source')
    count = result.result_rows[0][0]
    print(f"Total rows: {count:,}")
except Exception:
    print("Query failed")
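For transient network problems, you may also want to retry a failed query a few times before giving up. A minimal sketch with exponential backoff; the attempt count and delays are arbitrary choices, not library defaults:

import time

def query_with_retries(query, parameters=None, max_attempts=3):
    # Backoff delays of 1s, 2s, 4s, ... are illustrative; tune them for your workload
    for attempt in range(max_attempts):
        try:
            return client.query(query, parameters=parameters)
        except Exception as error:
            if attempt == max_attempts - 1:
                raise
            delay = 2 ** attempt
            print(f"Attempt {attempt + 1} failed ({error}), retrying in {delay}s")
            time.sleep(delay)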
Advanced usage¶
Streaming large results¶
For large datasets, use streaming to avoid memory issues:
def stream_large_query():
    # Stream results in blocks to avoid loading everything into memory
    with client.query_row_block_stream(
        'SELECT * FROM large_data_source',
        settings={'max_block_size': 10000}
    ) as stream:
        for block in stream:
            # Process each block of rows
            for row in block:
                process_row(row)

def process_row(row):
    # Your row processing logic here
    pass
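clickhouse-connect can also stream results as pandas DataFrame blocks with client.query_df_stream, which is convenient when each block feeds further analysis. A minimal sketch; like the row-block stream, it is used as a context manager:

def stream_to_dataframes():
    # Each item yielded by the stream is a pandas DataFrame block
    with client.query_df_stream('SELECT * FROM large_data_source') as stream:
        for df_block in stream:
            print(f"Received block with {len(df_block)} rows")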
Custom settings¶
Pass ClickHouse settings for query optimization:
# Query with custom settings
result = client.query(
    'SELECT * FROM system.numbers LIMIT 5',
    settings={
        'max_result_rows': 5,
        'max_execution_time': 10
    }
)
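Settings can also be applied once when the client is created, so every query issued through it inherits them. A minimal sketch using the settings argument of get_client:

import clickhouse_connect

# These settings apply to every query issued through this client
client = clickhouse_connect.get_client(
    interface='https',
    host='clickhouse.tinybird.co',
    username='<WORKSPACE_NAME>',
    password='<TOKEN>',
    port=443,
    settings={'max_execution_time': 10},
)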