Schema management and evolution

This guide covers managing schemas for Kafka messages in Tinybird, including adding and modifying fields, handling nullable types, data type mapping between Kafka and ClickHouse®, and implementing schema evolution strategies.

Prerequisites

  • Understanding of Kafka message formats (JSON, Avro)
  • Basic knowledge of ClickHouse® data types
  • Access to Schema Registry (if using Avro or JSON Schema)

Supported serialization formats

Tinybird supports the following serialization formats:

  • JSON (json_without_schema) - Plain JSON messages
  • JSON Schema (json_with_schema) - JSON with Schema Registry
  • Avro (avro) - Avro with Schema Registry

Adding and modifying fields

Adding new fields

When adding new fields to your Kafka messages, the key is making them backward compatible. Old messages (without the field) should still work.

The pattern:

  1. Update your Data Source schema to include the new field
  2. Make it nullable if the field may be missing in older messages
  3. Provide default values in JSONPath expressions if needed

Example:

datasources/events_updated.datasource
SCHEMA >
    `order_id` String `json:$.order_id`,
    `customer_id` String `json:$.customer_id`,
    `order_total` Float64 `json:$.order_total`,
    `payment_method` Nullable(String) `json:$.payment_method`,  -- New field, nullable
    `data` String `json:$`

Why nullable? Old messages don't have payment_method, so it will be null. After all messages include it, you can make it non-nullable if needed.

For Avro/JSON Schema:

  • Add fields as optional (not in required array)
  • Provide default values in Schema Registry
  • Use nullable types in ClickHouse
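
For example, the first two points translate into an Avro field declared as a union with null and given a default, so older messages that omit it still deserialize (the field name mirrors payment_method from the earlier example):

{"name": "payment_method", "type": ["null", "string"], "default": null}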

Modifying existing fields

Modifying existing fields requires more care. Some changes are safe; others require a migration.

Safe modifications:

  • Adding default values to existing fields
  • Making fields nullable (if they weren't before)
  • Widening types (Int32 → Int64, String → Nullable(String))

Example of safe modification:

-- Before: String field
`status` String `json:$.status`

-- After: Still String, but with default
`status` String `json:$.status DEFAULT 'pending'`

Unsafe modifications (require FORWARD_QUERY):

  • Changing field types (String → Int32)
  • Narrowing types (Int64 → Int32)
  • Removing fields

Example of type change:

-- Before
`count` Int32 `json:$.count`

-- After: Changed to Int64
`count` Int64 `json:$.count`

Use FORWARD_QUERY to convert existing data:

FORWARD_QUERY >
    SELECT
        * EXCEPT (count),
        toInt64(count) as count  -- Convert Int32 to Int64
    FROM previous_datasource

This migrates existing data while new messages use the new type.

Deploying changes:

  • Test locally: tb deploy
  • Deploy to production: tb --cloud deploy
  • Always test schema changes in development first

Nullable vs non-nullable

When to use nullable

Use Nullable() types when:

  • Fields may be missing in some messages
  • Fields can have null values
  • You're adding new optional fields
  • You want to handle schema evolution gracefully

Example:

SCHEMA >
    `order_id` String `json:$.order_id`,                    -- Required
    `email` Nullable(String) `json:$.email`,              -- Optional
    `preferences` Nullable(String) `json:$.preferences`, -- Optional
    `data` String `json:$`

When to use non-nullable

Use non-nullable types when:

  • Fields are always present in messages
  • You want to enforce data quality
  • Fields are required for your use case

Example:

SCHEMA >
    `order_id` String `json:$.order_id`,        -- Always present
    `timestamp` DateTime `json:$.timestamp`,  -- Always present
    `event_type` String `json:$.event_type`,  -- Always present
    `data` String `json:$`

Handling missing fields

If a field might be missing, provide a default value:

`status` String `json:$.status DEFAULT 'unknown'`
`count` Int32 `json:$.count DEFAULT 0`
`is_active` UInt8 `json:$.is_active DEFAULT 0`

Defaults handle missing fields gracefully without requiring nullable types.

Data type mapping

Kafka to ClickHouse® mapping

Kafka/JSON Type    | ClickHouse Type             | Notes
string             | String                      | Text data
number (integer)   | Int32, Int64                | Use Int64 for large numbers
number (float)     | Float32, Float64            | Use Float64 for precision
boolean            | UInt8, Boolean              | 0 or 1, or true/false
null               | Nullable(T)                 | Wrap in Nullable
array              | Array(T)                    | Specify element type
object             | String (JSON) or Map(K, V)  | Store as JSON string or Map
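
As an illustration, a schema that exercises several of these mappings might look like the following sketch; the field names are placeholders, so adapt the JSONPaths to your message structure:

SCHEMA >
    `user_id` String `json:$.user_id`,
    `quantity` Int64 `json:$.quantity`,
    `price` Float64 `json:$.price`,
    `is_active` UInt8 `json:$.is_active`,
    `tags` Array(String) `json:$.tags[:]`,
    `data` String `json:$`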

Common type conversions

String to DateTime:

`timestamp` DateTime `json:$.timestamp`
-- Assumes ISO 8601 format: "2024-01-01T00:00:00Z"

String to number:

`price` Float64 `json:$.price`
-- JSON numbers are automatically converted

Array handling:

`tags` Array(String) `json:$.tags[:]`
-- For JSON arrays of strings

Nested objects:

`metadata` String `json:$.metadata`
-- Store nested object as JSON string, extract later with JSONExtract
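
Individual keys can later be pulled out of the stored JSON string in a query (the metadata keys and the events Data Source name are illustrative):

SELECT
    JSONExtractString(metadata, 'source') AS source,
    JSONExtractInt(metadata, 'retry_count') AS retry_count
FROM events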

Nested fields:

For nested JSON structures, use JSONPath to extract specific fields:

SCHEMA >
    `customer_id` String `json:$.customer.id`,
    `customer_email` String `json:$.customer.email`,
    `order_data` String `json:$.order.data`,
    `data` String `json:$`

Maps:

Use Map type for key-value structures:

`headers` Map(String, String) `json:$.headers`
-- For Kafka headers or metadata maps
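
Keys can then be read directly in queries with bracket syntax (the header key and Data Source name are illustrative):

SELECT
    headers['content-type'] AS content_type,
    count(*) AS messages
FROM events
GROUP BY content_type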

Schema Registry integration

Avro schemas

When using Avro with Schema Registry:

  1. Register schema in Schema Registry
  2. Set format to avro in Data Source
  3. Configure the Schema Registry URL in the connection

KAFKA_VALUE_FORMAT avro
KAFKA_SCHEMA_REGISTRY_URL https://registry.example.com
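
A minimal Data Source sketch tying these settings together might look like this; the connection name, topic, and schema fields are placeholders, and the Schema Registry URL itself is configured on the connection as noted above:

SCHEMA >
    `order_id` String `json:$.order_id`,
    `order_total` Float64 `json:$.order_total`,
    `data` String `json:$`

KAFKA_CONNECTION_NAME my_kafka_connection
KAFKA_TOPIC orders
KAFKA_VALUE_FORMAT avro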

Schema evolution rules:

  • Add optional fields (with defaults)
  • Remove fields (backward compatible)
  • Change field types only if compatible
  • Don't change required fields to optional without defaults

JSON Schema

When using JSON Schema with Schema Registry:

KAFKA_VALUE_FORMAT json_with_schema
KAFKA_SCHEMA_REGISTRY_URL https://registry.example.com

Best practices:

  • Use schema versioning
  • Test schema changes in development
  • Monitor schema compatibility
  • Add new properties without adding them to required
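
As a sketch of that last point, a new optional property can be declared without listing it under required, so producers that omit it remain valid (property names reuse the earlier order example):

{
  "type": "object",
  "properties": {
    "order_id": {"type": "string"},
    "payment_method": {"type": "string"}
  },
  "required": ["order_id"]
}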

Schema evolution strategies

Backward compatibility

Ensure schema changes are backward compatible:

Safe changes:

  • Adding optional fields
  • Removing fields
  • Making fields nullable
  • Adding default values
  • Widening types (Int32 → Int64)

Breaking changes:

  • Removing required fields
  • Changing field types incompatibly
  • Renaming fields
  • Narrowing types (Int64 → Int32)

Versioning strategy

For breaking changes, use topic versioning:

orders-v1
orders-v2
orders-v3

Create separate Data Sources for each version, then merge in queries if needed. This is safer than trying to migrate in place.
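
A sketch of merging two versioned Data Sources in a query; the Data Source names and columns are illustrative:

SELECT order_id, order_total, 'v1' AS schema_version
FROM orders_v1
UNION ALL
SELECT order_id, order_total, 'v2' AS schema_version
FROM orders_v2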

Gradual rollout

When adding new fields:

  1. Add new field as nullable in Data Source
  2. Update producers to include new field
  3. Monitor for any issues
  4. Optionally, make the field non-nullable once all messages include it

This approach minimizes risk and allows you to roll back if needed.

Monitoring schema changes

Detect schema mismatches

Query kafka_ops_log for deserialization errors:

SELECT
    datasource_id,
    topic,
    msg,
    count(*) as error_count
FROM tinybird.kafka_ops_log
WHERE msg_type = 'warning'
  AND msg LIKE '%schema%'
  AND timestamp > now() - INTERVAL 1 hour
GROUP BY datasource_id, topic, msg
ORDER BY error_count DESC

This helps you catch schema evolution issues early.

Monitor quarantined messages

Check Quarantine Data Source for schema-related issues:

SELECT
    timestamp,
    count(*) as quarantined_count
FROM your_datasource_quarantine
WHERE timestamp > now() - INTERVAL 1 hour
GROUP BY timestamp
ORDER BY timestamp DESC

Quarantined messages often indicate schema mismatches or data quality issues.

Common pitfalls

Changing field types incompatibly

Problem: Changing count from String to Int32 breaks old messages.

Solution: Use FORWARD_QUERY to convert types, or add a new field and migrate gradually.
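
A sketch of the FORWARD_QUERY approach for this case, following the pattern shown earlier; toInt32OrZero maps non-numeric strings to 0, so check whether that default suits your data:

FORWARD_QUERY >
    SELECT
        * EXCEPT (count),
        toInt32OrZero(count) AS count  -- convert stored String values to Int32
    FROM previous_datasource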

Removing required fields

Problem: Removing a field that old messages require causes deserialization failures.

Solution: Make the field optional first, wait for all consumers to update, then remove it.

Not handling nullable fields

Problem: Adding a new field as non-nullable breaks old messages that don't have it.

Solution: Always make new fields nullable initially. After all messages include it, you can make it non-nullable.

Ignoring Schema Registry compatibility

Problem: Schema changes that violate compatibility settings cause failures.

Solution: Check your compatibility mode (BACKWARD, FORWARD, FULL) and test schema changes before deploying.
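
If your registry exposes the Confluent Schema Registry REST API, the global compatibility mode can be checked with a simple request (the URL matches the placeholder used above):

curl https://registry.example.com/config
# Returns, for example: {"compatibilityLevel": "BACKWARD"}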

Best practices

  1. Use nullable types for optional fields
  2. Provide default values for missing fields
  3. Test schema changes in development first with tb deploy
  4. Deploy to production using tb --cloud deploy after testing
  5. Monitor schema evolution using kafka_ops_log
  6. Use Schema Registry for Avro and JSON Schema
  7. Document schema changes in your team
  8. Version schemas for breaking changes
  9. Keep schemas backward compatible when possible
  10. Start with nullable when adding new fields

Troubleshooting schema issues

Type conversion failed

Solutions:

  1. Verify JSONPath expressions match message structure
  2. Check data types are compatible
  3. Use type conversion functions if needed (see the sketch after this list)
  4. Make fields nullable if data might be missing
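
For example, ClickHouse's OrNull conversion variants return NULL instead of failing on malformed values (the column and Data Source names are illustrative):

SELECT
    toInt64OrNull(raw_count) AS count,
    parseDateTimeBestEffortOrNull(raw_timestamp) AS parsed_timestamp
FROM events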

Schema mismatch

Solutions:

  1. Review Quarantine Data Source for actual message format
  2. Update Data Source schema to match messages
  3. Check Schema Registry if using Avro/JSON Schema
  4. Verify schema evolution is backward compatible

Missing fields

Solutions:

  1. Make fields nullable
  2. Provide default values
  3. Update producers to include required fields
  4. Handle missing fields in queries