Schema management and evolution¶
This guide covers managing schemas for Kafka messages in Tinybird, including adding and modifying fields, handling nullable types, data type mapping between Kafka and ClickHouse®, and implementing schema evolution strategies.
Prerequisites¶
- Understanding of Kafka message formats (JSON, Avro)
- Basic knowledge of ClickHouse® data types
- Access to Schema Registry (if using Avro or JSON Schema)
Supported serialization formats¶
Tinybird supports the following serialization formats:
- JSON (json_without_schema) - Plain JSON messages
- JSON Schema (json_with_schema) - JSON with Schema Registry
- Avro (avro) - Avro with Schema Registry
Adding and modifying fields¶
Adding new fields¶
When adding new fields to your Kafka messages, the key is making them backward compatible. Old messages (without the field) should still work.
The pattern:
- Update your Data Source schema to include the new field
- Make it nullable if the field may be missing in older messages
- Provide default values in JSONPath expressions if needed
Example:
datasources/events_updated.datasource
SCHEMA >
`order_id` String `json:$.order_id`,
`customer_id` String `json:$.customer_id`,
`order_total` Float64 `json:$.order_total`,
`payment_method` Nullable(String) `json:$.payment_method`, -- New field, nullable
`data` String `json:$`
Why nullable? Old messages don't have payment_method, so it will be null. After all messages include it, you can make it non-nullable if needed.
For Avro/JSON Schema (see the sketch after this list):
- Add fields as optional (not in the required array)
- Provide default values in Schema Registry
- Use nullable types in ClickHouse
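For example, an Avro value schema with a new optional field could look like the following. This is a minimal sketch: the record name and existing fields are placeholders, not taken from this guide.
{
  "type": "record",
  "name": "Order",
  "fields": [
    { "name": "order_id", "type": "string" },
    { "name": "order_total", "type": "double" },
    { "name": "payment_method", "type": ["null", "string"], "default": null }
  ]
}
Because the new field is a union with null and has a default, readers using the new schema can still decode old messages, and consumers that only know the old schema keep working.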
Modifying existing fields¶
Modifying existing fields requires more care. Some changes are safe, others require migration.
Safe modifications:
- Adding default values to existing fields
- Making fields nullable (if they weren't before)
- Widening types (Int32 → Int64, String → Nullable(String))
Example of safe modification:
-- Before: String field
`status` String `json:$.status`

-- After: still String, but with a default
`status` String `json:$.status` DEFAULT 'pending'
Unsafe modifications (require FORWARD_QUERY):
- Changing field types (String → Int32)
- Narrowing types (Int64 → Int32)
- Removing fields
Example of type change:
-- Before
`count` Int32 `json:$.count`

-- After: changed to Int64
`count` Int64 `json:$.count`
Use FORWARD_QUERY to convert existing data:
FORWARD_QUERY >
SELECT
* REPLACE (toInt64(count) AS count) -- Convert existing Int32 values to Int64
FROM previous_datasource
This migrates existing data while new messages use the new type.
Deploying changes:
- Test locally: tb deploy
- Deploy to production: tb --cloud deploy
- Always test schema changes in development first
Nullable vs non-nullable¶
When to use nullable¶
Use Nullable() types when:
- Fields may be missing in some messages
- Fields can have null values
- You're adding new optional fields
- You want to handle schema evolution gracefully
Example:
SCHEMA >
`order_id` String `json:$.order_id`, -- Required
`email` Nullable(String) `json:$.email`, -- Optional
`preferences` Nullable(String) `json:$.preferences`, -- Optional
`data` String `json:$`
When to use non-nullable¶
Use non-nullable types when:
- Fields are always present in messages
- You want to enforce data quality
- Fields are required for your use case
Example:
SCHEMA >
`order_id` String `json:$.order_id`, -- Always present
`timestamp` DateTime `json:$.timestamp`, -- Always present
`event_type` String `json:$.event_type`, -- Always present
`data` String `json:$`
Handling missing fields¶
If a field might be missing, provide a default value:
`status` String `json:$.status` DEFAULT 'unknown'
`count` Int32 `json:$.count` DEFAULT 0
`is_active` UInt8 `json:$.is_active` DEFAULT 0
Defaults handle missing fields gracefully without requiring nullable types.
Data type mapping¶
Kafka to ClickHouse® mapping¶
| Kafka/JSON Type | ClickHouse Type | Notes |
|---|---|---|
| string | String | Text data |
| number (integer) | Int32, Int64 | Use Int64 for large numbers |
| number (float) | Float32, Float64 | Use Float64 for precision |
| boolean | UInt8, Boolean | 0 or 1, or true/false |
| null | Nullable(T) | Wrap in Nullable |
| array | Array(T) | Specify element type |
| object | String (JSON) or Map(K, V) | Store as JSON string or Map |
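To make the mapping concrete, here is a sketch of a Data Source schema that combines several of these types. The field names and JSONPaths are illustrative only.
SCHEMA >
`event_id` String `json:$.event_id`,
`count` Int64 `json:$.count`,
`price` Float64 `json:$.price`,
`is_active` UInt8 `json:$.is_active`,
`tags` Array(String) `json:$.tags[:]`,
`attributes` Map(String, String) `json:$.attributes`,
`data` String `json:$`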
Common type conversions¶
String to DateTime:
`timestamp` DateTime `json:$.timestamp` -- Assumes ISO 8601 format: "2024-01-01T00:00:00Z"
String to number:
`price` Float64 `json:$.price` -- JSON numbers are automatically converted
Array handling:
`tags` Array(String) `json:$.tags[:]` -- For JSON arrays of strings
Nested objects:
`metadata` String `json:$.metadata` -- Store nested object as JSON string, extract later with JSONExtract
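As an example, a nested object stored this way can be unpacked later with ClickHouse JSON functions. The keys and table name below are assumptions for illustration.
SELECT
JSONExtractString(metadata, 'source') AS source,
JSONExtractInt(metadata, 'retry_count') AS retry_count
FROM events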
Nested fields:
For nested JSON structures, use JSONPath to extract specific fields:
SCHEMA >
`customer_id` String `json:$.customer.id`,
`customer_email` String `json:$.customer.email`,
`order_data` String `json:$.order.data`,
`data` String `json:$`
Maps:
Use Map type for key-value structures:
`headers` Map(String, String) `json:$.headers` -- For Kafka headers or metadata maps
Schema Registry integration¶
Avro schemas¶
When using Avro with Schema Registry:
- Register schema in Schema Registry
- Set format to avro in Data Source
- Configure Schema Registry URL in connection
KAFKA_VALUE_FORMAT avro
KAFKA_SCHEMA_REGISTRY_URL https://registry.example.com
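Putting this together, a Kafka-backed Data Source using Avro might look roughly like the sketch below. The connection name, topic, and consumer group are placeholders; keep whatever KAFKA_* settings your existing connection already uses. Column paths follow the same json:$ syntax used elsewhere in this guide.
SCHEMA >
`order_id` String `json:$.order_id`,
`order_total` Float64 `json:$.order_total`,
`data` String `json:$`

KAFKA_CONNECTION_NAME my_kafka_connection
KAFKA_TOPIC orders
KAFKA_GROUP_ID orders_consumer
KAFKA_VALUE_FORMAT avro
KAFKA_SCHEMA_REGISTRY_URL https://registry.example.com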
Schema evolution rules:
- Add optional fields (with defaults)
- Remove fields (backward compatible)
- Change field types only if compatible
- Don't change required fields to optional without defaults
JSON Schema¶
When using JSON Schema with Schema Registry:
KAFKA_VALUE_FORMAT json_with_schema
KAFKA_SCHEMA_REGISTRY_URL https://registry.example.com
Best practices:
- Use schema versioning
- Test schema changes in development
- Monitor schema compatibility
- Add new properties without adding them to required (see the sketch after this list)
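As a sketch, adding an optional property without listing it in required could look like this; the property names are illustrative.
{
  "type": "object",
  "properties": {
    "order_id": { "type": "string" },
    "payment_method": { "type": "string" }
  },
  "required": ["order_id"]
}
Because payment_method is not in required, validators accept both old messages (without it) and new messages (with it).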
Schema evolution strategies¶
Backward compatibility¶
Ensure schema changes are backward compatible:
Safe changes:
- Adding optional fields
- Removing fields
- Making fields nullable
- Adding default values
- Widening types (Int32 → Int64)
Breaking changes:
- Removing required fields
- Changing field types incompatibly
- Renaming fields
- Narrowing types (Int64 → Int32)
Versioning strategy¶
For breaking changes, use topic versioning:
orders-v1
orders-v2
orders-v3
Create separate Data Sources for each version, then merge in queries if needed. This is safer than trying to migrate in place.
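For example, a Pipe node could merge the versioned Data Sources with UNION ALL. The Data Source names and shared columns below are assumptions for illustration.
SELECT order_id, customer_id, order_total FROM orders_v1
UNION ALL
SELECT order_id, customer_id, order_total FROM orders_v2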
Gradual rollout¶
When adding new fields:
- Add new field as nullable in Data Source
- Update producers to include new field
- Monitor for any issues
- Make field required once all messages include it (optional)
This approach minimizes risk and allows you to roll back if needed. The sketch below shows the optional final step of tightening the column.
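Making the field non-nullable once every message includes it can be handled with the same FORWARD_QUERY mechanism shown earlier. A sketch, reusing the payment_method example from above; the fallback value is an assumption:
SCHEMA >
`order_id` String `json:$.order_id`,
`payment_method` String `json:$.payment_method`,
`data` String `json:$`

FORWARD_QUERY >
SELECT
* REPLACE (coalesce(payment_method, 'unknown') AS payment_method)
FROM previous_datasource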
Monitoring schema changes¶
Detect schema mismatches¶
Query kafka_ops_log for deserialization errors:
SELECT
timestamp,
datasource_id,
topic,
msg,
count(*) as error_count
FROM tinybird.kafka_ops_log
WHERE msg_type = 'warning'
AND msg LIKE '%schema%'
AND timestamp > now() - INTERVAL 1 hour
GROUP BY timestamp, datasource_id, topic, msg
ORDER BY error_count DESC
This helps you catch schema evolution issues early.
Monitor quarantined messages¶
Check Quarantine Data Source for schema-related issues:
SELECT
timestamp,
count(*) as quarantined_count
FROM your_datasource_quarantine
WHERE timestamp > now() - INTERVAL 1 hour
GROUP BY timestamp
ORDER BY timestamp DESC
Quarantined messages often indicate schema mismatches or data quality issues.
Common pitfalls¶
Changing field types incompatibly¶
Problem: Changing count from String to Int32 breaks old messages.
Solution: Use FORWARD_QUERY to convert types, or add a new field and migrate gradually.
Removing required fields¶
Problem: Removing a field that old messages require causes deserialization failures.
Solution: Make the field optional first, wait for all consumers to update, then remove it.
Not handling nullable fields¶
Problem: Adding a new field as non-nullable breaks old messages that don't have it.
Solution: Always make new fields nullable initially. After all messages include it, you can make it non-nullable.
Ignoring Schema Registry compatibility¶
Problem: Schema changes that violate compatibility settings cause failures.
Solution: Check your compatibility mode (BACKWARD, FORWARD, FULL) and test schema changes before deploying.
Best practices¶
- Use nullable types for optional fields
- Provide default values for missing fields
- Test schema changes in development first with tb deploy
- Deploy to production using tb --cloud deploy after testing
- Monitor schema evolution using kafka_ops_log
- Use Schema Registry for Avro and JSON Schema
- Document schema changes within your team
- Version schemas for breaking changes
- Keep schemas backward compatible when possible
- Start with nullable when adding new fields
Troubleshooting schema issues¶
Type conversion failed¶
Solutions:
- Verify JSONPath expressions match message structure
- Check data types are compatible
- Use type conversion functions if needed (see the sketch after this list)
- Make fields nullable if data might be missing
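A sketch of defensive conversion using ClickHouse's OrNull function variants, so malformed values become null instead of failing the row; the column and table names are illustrative:
SELECT
toInt64OrNull(raw_count) AS count,
toFloat64OrNull(raw_price) AS price,
parseDateTimeBestEffortOrNull(raw_timestamp) AS timestamp
FROM events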
Schema mismatch¶
Solutions:
- Review Quarantine Data Source for actual message format
- Update Data Source schema to match messages
- Check Schema Registry if using Avro/JSON Schema
- Verify schema evolution is backward compatible
Missing fields¶
Solutions:
- Make fields nullable
- Provide default values
- Update producers to include required fields
- Handle missing fields in queries, as in the sketch below
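For example, null or missing values can be normalized at query time; the column names are assumptions:
SELECT
order_id,
ifNull(email, 'unknown') AS email
FROM your_datasource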
Related documentation¶
- Kafka connector documentation - Main setup and configuration guide
- Troubleshooting guide - Schema-related error troubleshooting
- Evolve Data Sources - Managing schema changes
- ClickHouse® data types - Available data types