In my previous post, I showed you how to build CI/CD pipelines for real-time analytics projects with Kafka and Tinybird. One of the most common pieces of feedback was that it lacked details about how to test with a local Kafka. In this post, I'll show you an example of a full local development environment, plus a custom CI that tests not only your SQL logic but also the Kafka - Tinybird connection.
## Quick refresher on building analytics with Kafka and Tinybird
Here's the complete workflow to connect a Kafka topic to a Tinybird data source and build analytical features on top of the topic data, from local development to production:
```bash
# Install Tinybird CLI and log in
curl https://tinybird.co | sh
tb login

# Start Tinybird Local
tb local start

# Follow along to connect a Kafka topic to a Tinybird data source
tb connection create kafka

# Create analytical endpoints
tb create --prompt "an endpoint that returns latest value and today's average of the selected sensor"

# Deploy to Tinybird Cloud
tb --cloud deploy
```
That's it! Your real-time analytics pipeline is now running, processing data from Kafka and exposing it through API endpoints. Keep reading to learn how to set up the complete local development environment with Docker Compose and add proper testing.
## What you'll learn
In this post, we'll explore how to set up a complete local development environment for real-time analytics using Kafka (via Redpanda) and Tinybird. You'll learn how to:
- Set up a Docker Compose environment with Redpanda and Tinybird Local
- Configure Kafka connections that work seamlessly in both local and production environments
- Create a script that ensures your services are ready before testing
- Write and run tests that validate both your SQL logic and Kafka-Tinybird connection
- Enhance your CI/CD pipeline to include connection testing
By the end, you'll have a development workflow that allows for rapid iteration while maintaining production-grade quality assurance.
## Why local-first development matters
As developers, we're used to running our entire application stack locally. Why should analytics be any different? When building real-time analytics features, waiting for deployments to test changes can be frustrating. With Tinybird Local and Kafka, you can:
- Test changes instantly without waiting for deployments
- Debug data pipelines in real-time
- Iterate on your analytics features as fast as you do on your application code
- Validate your changes before pushing to production
This approach follows the same principles you use in your application development:
- Fast feedback loops
- Local testing
- CI/CD integration
- Version control
## Enhance your existing project with local development
Starting with the water meters project from the previous post, we'll add Docker Compose to run the entire stack locally. I'm using Redpanda because it's lighter than Kafka and fully compatible, but `apache/kafka:latest` works as well.
### Add Docker Compose for local development
Create a `docker-compose.yml` that spins up Redpanda and Tinybird Local:
```yaml
networks:
  redpanda_network:
    driver: bridge

volumes:
  redpanda-0:

services:
  redpanda:
    image: docker.redpanda.com/redpandadata/redpanda:v25.1.4
    container_name: redpanda
    platform: linux/amd64
    command:
      - redpanda
      - start
      - --kafka-addr=internal://0.0.0.0:9092,external://0.0.0.0:19092
      - --advertise-kafka-addr=internal://redpanda:9092,external://localhost:19092
      - --pandaproxy-addr=internal://0.0.0.0:8082,external://0.0.0.0:18082
      - --advertise-pandaproxy-addr=internal://redpanda:8082,external://localhost:18082
      - --schema-registry-addr=internal://0.0.0.0:8081,external://0.0.0.0:18081
      - --rpc-addr=redpanda:33145
      - --advertise-rpc-addr=redpanda:33145
      - --mode=dev-container
      - --smp=1
      - --default-log-level=info
    environment:
      RP_BOOTSTRAP_USER: "superuser:secretpassword"
    ports:
      - "18081:18081"
      - "18082:18082"
      - "19092:19092"
      - "19644:9644"
    volumes:
      - redpanda-0:/var/lib/redpanda/data
    networks:
      - redpanda_network
    healthcheck:
      test: ["CMD", "rpk", "cluster", "info", "-X", "user=superuser", "-X", "pass=secretpassword"]
      interval: 10s
      timeout: 15s
      retries: 10

  tinybird-local:
    image: tinybirdco/tinybird-local:latest
    container_name: tinybird-local-rp
    platform: linux/amd64
    ports:
      - "7181:7181"
    networks:
      - redpanda_network
    depends_on:
      - redpanda
```
Let's break down the key components of this Docker Compose configuration:
**Networks and Volumes:**

- `redpanda_network`: a bridge network that allows containers to communicate with each other
- `redpanda-0`: a persistent volume to store Redpanda's data
**Redpanda Service:**

- Uses Redpanda v25.1.4, a Kafka-compatible streaming platform
- Configured with both internal and external addresses for:
  - Kafka (9092/19092): for message streaming
  - Pandaproxy (8082/18082): for REST API access
  - Schema Registry (8081/18081): for schema management
- Runs in development mode with minimal resources (`--smp=1`)
- Includes basic authentication with `superuser:secretpassword`
- Exposes ports for external access (18081, 18082, 19092, 19644)
- Has a healthcheck to ensure the service is ready
**Tinybird Local Service:**
- Uses the official Tinybird Local image
- Exposes port 7181 for API access
- Connects to the same network as Redpanda
- Depends on Redpanda, ensuring proper startup order
This setup creates a fully functional local development environment where Redpanda acts as your Kafka-compatible message broker and Tinybird Local processes the streaming data. The services are configured to work together seamlessly while being accessible from your host machine.
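Before wiring anything else up, it's worth a quick smoke test. Here's a minimal sketch, assuming the stack was started with `docker compose up -d`: it reuses the healthcheck command from the compose file and the `/tokens` endpoint that Tinybird Local exposes.

```bash
docker compose up -d

# Ask Redpanda for cluster info, using the credentials from the compose file
docker exec redpanda rpk cluster info -X user=superuser -X pass=secretpassword

# Tinybird Local listens on port 7181; /tokens returns 200 once it's up
curl -s -o /dev/null -w "%{http_code}\n" http://localhost:7181/tokens
```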
### Update your Kafka connection for local development
Modify your `connections/my_kafka_conn.connection` to work with both local and production environments:
```
TYPE kafka
KAFKA_BOOTSTRAP_SERVERS {{ tb_secret("KAFKA_PROD_SERVER", "redpanda:9092") }}
KAFKA_SECURITY_PROTOCOL {{ tb_secret("KAFKA_PROD_SECURITY_PROTOCOL", "PLAINTEXT") }}
KAFKA_SASL_MECHANISM {{ tb_secret("KAFKA_PROD_SASL_MECHANISM", "PLAIN") }}
KAFKA_KEY {{ tb_secret("KAFKA_KEY", "superuser") }}
KAFKA_SECRET {{ tb_secret("KAFKA_SECRET", "secretpassword") }}
```
Note that the bootstrap servers address is the internal one (`redpanda:9092`, from `--advertise-kafka-addr=internal://redpanda:9092,external://localhost:19092`), because Tinybird Local runs on the same Docker network as Redpanda.
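To see the two listeners in action: anything inside the Compose network uses the internal address, while your host machine uses the external one. A sketch, assuming you have `rpk` installed on your host (otherwise keep using `docker exec`):

```bash
# From the host: the external advertised listener
rpk topic list -X brokers=localhost:19092

# From inside the Docker network (this is what Tinybird Local sees)
docker exec redpanda rpk topic list -X brokers=redpanda:9092
```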
In Tinybird you can have different secrets depending on the environment. In this case:

- Local development uses Redpanda with simple auth, relying on the default values of `tb_secret`
- Production uses your production Kafka credentials, set with `tb --cloud secret set`
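For production, that might look like the sketch below. The secret names come from the connection file above; the values are placeholders, and you should check `tb secret --help` for the exact syntax in your CLI version.

```bash
# Placeholders: swap in your real broker address and credentials
tb --cloud secret set KAFKA_PROD_SERVER "your-broker.example.com:9092"
tb --cloud secret set KAFKA_PROD_SECURITY_PROTOCOL "SASL_SSL"
tb --cloud secret set KAFKA_PROD_SASL_MECHANISM "PLAIN"
tb --cloud secret set KAFKA_KEY "your-api-key"
tb --cloud secret set KAFKA_SECRET "your-api-secret"
```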
### Create a setup script for local development
Create a `setup-local.sh` script that waits for the services and loads your fixture data:
```bash
#!/bin/bash

# Function to check if Redpanda is ready
check_redpanda_health() {
    local health_output
    health_output=$(docker exec -i redpanda rpk cluster health --format json)
    if [[ $health_output == *"\"is_healthy\":true"* ]]; then
        return 0
    else
        return 1
    fi
}

# Function to check if Tinybird is ready
check_tinybird_status() {
    local status_output
    status_output=$(tb local status)
    if [[ $status_output == *"✓ Tinybird Local is ready!"* ]]; then
        return 0
    else
        return 1
    fi
}

# Maximum number of retries
MAX_RETRIES=30
RETRY_INTERVAL=5

# Wait for both Redpanda and Tinybird to be ready
echo "Waiting for both Redpanda and Tinybird Local to be ready..."

# Try until success or max retries reached
for ((i=1; i<=MAX_RETRIES; i++)); do
    if check_redpanda_health && check_tinybird_status; then
        echo "✓ Both Redpanda and Tinybird Local are ready!"
        echo "Proceeding with deployment..."

        # Create topic in Redpanda
        docker exec -i redpanda rpk topic create water_metrics_demo -X brokers=redpanda:9092

        # Deploy to Tinybird
        tb deploy

        # Produce to Redpanda topic
        cat fixtures/kafka_water_meters.ndjson | docker exec -i redpanda rpk topic produce water_metrics_demo -X brokers=redpanda:9092

        # Execute SQL query in Tinybird
        tb sql "select meter_id, timestamp, flow_rate, temperature from kafka_water_meters"

        echo -e "\n✅ Setup completed successfully!"
        exit 0
    else
        echo "Attempt $i/$MAX_RETRIES: Services not ready yet. Waiting ${RETRY_INTERVAL}s..."
        sleep $RETRY_INTERVAL
    fi
done

echo "❌ Failed to connect to services after $MAX_RETRIES attempts"
exit 1
```
Make it executable and run your local stack:
```bash
chmod +x setup-local.sh
docker-compose up -d
./setup-local.sh
```
### Test your endpoints without `tb test`
With your local stack running, you can test your endpoints instantly:
```bash
tb token copy "admin local_testing@tinybird.co" && TB_LOCAL_TOKEN=$(pbpaste)
curl -H "Authorization: Bearer $TB_LOCAL_TOKEN" -X GET "http://localhost:7181/v0/pipes/meter_measurements.json"
```
You also have `tb test`, but due to its nature (it spins up a new workspace, separate from the one you have running locally, appends fixtures, and runs tests) the connections are not really exercised. We created `/tests` and `/fixtures` in the first post, though, so let's add a Python script that reuses them and tests the connections (there's an example test case file right after the script):
```python
import os
import subprocess
import time
import requests
import yaml
import pytest
import json
import glob

# Get the tb command from environment variable or use default
TB_COMMAND = os.getenv("TB_COMMAND", os.path.expanduser("~/.local/bin/tb"))


def check_local_server():
    """Check if the local Tinybird server is running."""
    try:
        response = requests.get("http://localhost:7181/tokens")
        return response.status_code == 200
    except requests.exceptions.ConnectionError:
        return False


def load_test_cases(yaml_file):
    """Load test cases from a YAML file."""
    with open(yaml_file, 'r') as f:
        return yaml.safe_load(f)


def run_command(command, input_data=None):
    """Run a command with detailed logging."""
    print(f"Running command: {' '.join(command)}")
    try:
        result = subprocess.run(
            command,
            capture_output=True,
            text=True,
            check=True,
            input=input_data
        )
        print(f"Command output: {result.stdout}")
        return result.stdout.strip()
    except subprocess.CalledProcessError as e:
        print(f"Command failed with exit code {e.returncode}: {' '.join(command)}")
        print(f"Error output: {e.stderr}")
        raise
    except Exception as e:
        print(f"Unexpected error running command {' '.join(command)}: {str(e)}")
        raise


def get_token_and_host():
    """Get the token and host for API authentication."""
    print("\nGetting API token and host...")
    result = run_command([TB_COMMAND, "--output", "json", "info"])
    data = json.loads(result)
    token = data["local"]["token"]
    host = data["local"]["api"]
    print(f"Got token: {token[:10]}...")
    print(f"Got host: {host}")
    return token, host


def run_test_cases(endpoint, test_cases, base_url, token):
    """Run test cases for a specific endpoint."""
    print(f"\nTesting {endpoint} endpoint...")
    url = f"{base_url}/{endpoint}.ndjson"
    for test_case in test_cases:
        print(f"\nRunning test case: {test_case['name']}")

        # Prepare parameters
        params = {"token": token}
        if test_case.get("parameters"):
            params.update(dict(param.split("=") for param in test_case["parameters"].split("&")))

        # Make request
        print(f"Making request to {url} with params: {params}")
        response = requests.get(url, params=params)

        # Check HTTP status
        expected_status = test_case.get("expected_http_status", 200)
        assert response.status_code == expected_status, \
            f"Test '{test_case['name']}' failed: Expected status {expected_status}, got {response.status_code}"

        # Check response content
        if test_case["expected_result"]:
            if expected_status == 400:
                # For error responses, check the error message directly
                error_data = response.json()
                assert error_data["error"] == test_case["expected_result"], \
                    f"Test '{test_case['name']}' failed: Expected error '{test_case['expected_result']}', got '{error_data['error']}'"
            else:
                # For successful responses, parse as NDJSON
                expected_data = [json.loads(line) for line in test_case["expected_result"].strip().split('\n') if line.strip()]
                actual_data = [json.loads(line) for line in response.text.strip().split('\n') if line.strip()]
                print(f"Expected data: {expected_data}")
                print(f"Actual data: {actual_data}")
                assert actual_data == expected_data, \
                    f"Test '{test_case['name']}' failed: Expected {expected_data}, got {actual_data}"


def test_all_endpoints():
    """Test all endpoints using their respective YAML test files."""
    try:
        print("\n=== Setting up test environment ===")

        # Check if local server is running
        print("\nChecking local Tinybird server...")
        if not check_local_server():
            raise Exception("Local Tinybird server is not running. Please start it with 'docker compose up -d'")

        # Clear workspace
        print("\nClearing workspace...")
        try:
            run_command([TB_COMMAND, "workspace", "clear", "--yes"])
        except subprocess.CalledProcessError as e:
            print(f"Warning: Failed to clear workspace: {e.stderr}")
            print("Continuing with test execution...")

        # Deploy the project
        print("\nDeploying project...")
        run_command([TB_COMMAND, "deploy"])

        # Send test data to Redpanda
        print("\nSending test data to Redpanda...")
        with open("fixtures/kafka_water_meters.ndjson", "r") as f:
            data = f.read()
        run_command(["docker", "exec", "-i", "redpanda", "rpk", "topic", "produce", "water_metrics_demo", "-X", "brokers=redpanda:9092"], input_data=data)

        # Wait for data to be ingested
        print("\nWaiting for data ingestion...")
        time.sleep(45)  # Increased wait time

        print("\n=== Starting endpoint tests ===")

        # Get API token and host
        token, host = get_token_and_host()
        base_url = f"{host}/v0/pipes"

        # Find all YAML test files in the tests directory
        test_files = glob.glob("tests/*.yaml")

        # Run tests for each YAML file
        for test_file in test_files:
            # Extract endpoint name from filename (remove .yaml extension)
            endpoint = os.path.splitext(os.path.basename(test_file))[0]
            test_cases = load_test_cases(test_file)
            run_test_cases(endpoint, test_cases, base_url, token)
    finally:
        # Cleanup
        print("\n=== Cleaning up test environment ===")
        run_command([TB_COMMAND, "workspace", "clear", "--yes"])
```
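For reference, here's a sketch of what one of those `tests/*.yaml` files could look like if you were writing one from scratch. The field names (`name`, `parameters`, `expected_http_status`, `expected_result`) match what the script reads; the file name and values are illustrative.

```bash
cat > tests/meter_measurements.yaml <<'EOF'
- name: returns_data_for_selected_meter
  parameters: meter_id=1234
  expected_http_status: 200
  expected_result: |
    {"meter_id": "1234", "flow_rate": 2.5, "temperature": 18.0}
EOF

# Run the suite against your local stack
pytest tests/tests.py -v -s
```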
### Enhance your CI/CD pipeline with local testing
To make the CI exercise the real connection, update the existing workflow in `.github/workflows/tinybird-ci.yml`:
```yaml
name: Tinybird - Kafka full CI Workflow

on:
  push:
    branches: [ main ]
  pull_request:
    branches: [ main ]

env:
  TINYBIRD_HOST: ${{ secrets.TINYBIRD_HOST }}
  TINYBIRD_TOKEN: ${{ secrets.TINYBIRD_TOKEN }}

jobs:
  ci:
    runs-on: ubuntu-latest
    defaults:
      run:
        working-directory: '.'
    steps:
      - uses: actions/checkout@v4

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'

      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
          pip install pytest requests pyyaml

      - name: Install Tinybird CLI
        run: |
          curl https://tinybird.co | sh

      - name: Set up Docker Compose
        run: |
          docker compose up -d
          # Wait for services to be ready
          sleep 30
          # Check if services are running
          docker compose ps
          # Check Redpanda logs
          docker compose logs redpanda
          # Check Tinybird logs
          docker compose logs tinybird-local

      - name: tb info
        run: |
          tb local status

      - name: create topic
        run: |
          docker exec -i redpanda rpk topic create water_metrics_demo -X brokers=redpanda:9092

      - name: Run tests
        env:
          TB_COMMAND: "tb"
          DOCKER_HOST: "unix:///var/run/docker.sock"
        run: |
          pytest tests/tests.py -v -s

      - name: Deployment check
        run: tb --cloud --host ${{ env.TINYBIRD_HOST }} --token ${{ env.TINYBIRD_TOKEN }} deploy --check
```
## Wrapping up
We've covered how to set up a complete local development environment for real-time analytics with Kafka and Tinybird. The key takeaways are:
- **Local-first development**: By running your entire analytics stack locally, you can iterate faster and test changes instantly without waiting for deployments.
- **Docker Compose setup**: Using Redpanda (a lightweight Kafka-compatible broker) and Tinybird Local, you can create a development environment that mirrors production while being much faster to work with.
- **End-to-end testing**: The custom CI workflow ensures that not only your SQL logic but also your Kafka-Tinybird connection is working correctly.
This setup is particularly valuable for product engineers who need to build and test real-time analytics features quickly. By following these practices, you can maintain the same development velocity you're used to in your application code while building robust analytics features.
Remember, the goal is to make analytics development feel as natural and fast as your regular application development workflow. With this setup, you're well-equipped to build and test real-time analytics features with confidence.