---
title: Schema management and evolution
meta:
  description: Complete guide to managing Kafka message schemas, including adding fields, handling nullable types, data type mapping, and schema evolution strategies.
---

# Schema management and evolution

This guide covers managing schemas for Kafka messages in Tinybird, including adding and modifying fields, handling nullable types, data type mapping between Kafka and ClickHouse®, and implementing schema evolution strategies.

## Prerequisites

- Understanding of Kafka message formats (JSON, Avro)
- Basic knowledge of ClickHouse® data types
- Access to Schema Registry (if using Avro or JSON Schema)

## Supported serialization formats

Tinybird supports the following serialization formats:

- **JSON** (`json_without_schema`) - Plain JSON messages
- **JSON Schema** (`json_with_schema`) - JSON with Schema Registry
- **Avro** (`avro`) - Avro with Schema Registry

## Adding and modifying fields

### Adding new fields

When adding new fields to your Kafka messages, the key is making them backward compatible. Old messages (without the field) should still work.

**The pattern:**

1. Update your Data Source schema to include the new field
2. Make it nullable if the field may be missing in older messages
3. Provide default values in JSONPath expressions if needed

**Example:**

```tb {% title="datasources/events_updated.datasource" %}
SCHEMA >
    `order_id` String `json:$.order_id`,
    `customer_id` String `json:$.customer_id`,
    `order_total` Float64 `json:$.order_total`,
    `payment_method` Nullable(String) `json:$.payment_method`,  -- New field, nullable
    `data` String `json:$`
```

Why nullable? Old messages don't have `payment_method`, so it will be `null`. After all messages include it, you can make it non-nullable if needed.
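
If you later want the column to be non-nullable, tighten it with a `FORWARD_QUERY` that backfills the existing nulls. A minimal sketch, assuming `'unknown'` is an acceptable fallback value:

```tb
SCHEMA >
    `payment_method` String `json:$.payment_method`

FORWARD_QUERY >
    SELECT * EXCEPT (payment_method), coalesce(payment_method, 'unknown') AS payment_method
```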

**For Avro/JSON Schema:**

- Add fields as optional (not in `required` array)
- Provide default values in Schema Registry
- Use nullable types in ClickHouse
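
As an illustration, a hypothetical Avro value schema that adds `payment_method` as an optional field (a union with `null`, and a `null` default so older messages without the field still deserialize):

```json
{
  "type": "record",
  "name": "Order",
  "fields": [
    {"name": "order_id", "type": "string"},
    {"name": "order_total", "type": "double"},
    {"name": "payment_method", "type": ["null", "string"], "default": null}
  ]
}
```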

### Modifying existing fields

Modifying existing fields requires more care. Some changes are safe; others require migration.

**Safe modifications:**

- Adding default values to existing fields
- Making fields nullable (if they weren't before)
- Widening types (Int32 → Int64, String → Nullable(String))

**Example of safe modification:**

```tb
-- Before: String field
`status` String `json:$.status`

-- After: Still String, but with default
`status` String `json:$.status DEFAULT 'pending'`
```

**Unsafe modifications (require FORWARD_QUERY):**

- Changing field types (String → Int32)
- Narrowing types (Int64 → Int32)
- Removing fields

**Example of type change:**

```tb
-- Before
`count` Int32 `json:$.count`

-- After: Changed to Int64
`count` Int64 `json:$.count`
```

Use FORWARD_QUERY to convert existing data:

```tb
FORWARD_QUERY >
    SELECT * EXCEPT (count), toInt64(count) AS count  -- Convert Int32 to Int64
```

This migrates existing data while new messages use the new type.

**Deploying changes:**

- Test locally: `tb deploy`
- Deploy to production: `tb --cloud deploy`
- Always test schema changes in development first

## Nullable vs non-nullable

### When to use nullable

Use `Nullable()` types when:

- Fields may be missing in some messages
- Fields can have `null` values
- You're adding new optional fields
- You want to handle schema evolution gracefully

**Example:**

```tb
SCHEMA >
    `order_id` String `json:$.order_id`,                    -- Required
    `email` Nullable(String) `json:$.email`,              -- Optional
    `preferences` Nullable(String) `json:$.preferences`, -- Optional
    `data` String `json:$`
```

### When to use non-nullable

Use non-nullable types when:

- Fields are always present in messages
- You want to enforce data quality
- Fields are required for your use case

**Example:**

```tb
SCHEMA >
    `order_id` String `json:$.order_id`,        -- Always present
    `timestamp` DateTime `json:$.timestamp`,  -- Always present
    `event_type` String `json:$.event_type`,  -- Always present
    `data` String `json:$`
```

### Handling missing fields

If a field might be missing, provide a default value:

```tb
`status` String `json:$.status DEFAULT 'unknown'`
`count` Int32 `json:$.count DEFAULT 0`
`is_active` UInt8 `json:$.is_active DEFAULT 0`
```

Defaults handle missing fields gracefully without requiring nullable types.

## Data type mapping

### Kafka to ClickHouse® mapping

| Kafka/JSON Type | ClickHouse Type | Notes |
|----------------|-----------------|-------|
| `string` | `String` | Text data |
| `number` (integer) | `Int32`, `Int64` | Use Int64 for large numbers |
| `number` (float) | `Float32`, `Float64` | Use Float64 for precision |
| `boolean` | `UInt8`, `Boolean` | 0 or 1, or true/false |
| `null` | `Nullable(T)` | Wrap in Nullable |
| `array` | `Array(T)` | Specify element type |
| `object` | `String` (JSON) or `Map(K, V)` | Store as JSON string or Map |

### Common type conversions

**String to DateTime:**

```tb
`timestamp` DateTime `json:$.timestamp`
-- Assumes ISO 8601 format: "2024-01-01T00:00:00Z"
```

**String to number:**

```tb
`price` Float64 `json:$.price`
-- JSON numbers are automatically converted
```

**Array handling:**

```tb
`tags` Array(String) `json:$.tags[:]`
-- For JSON arrays of strings
```

**Nested objects:**

```tb
`metadata` String `json:$.metadata`
-- Store nested object as JSON string, extract later with JSONExtract
```
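
For example, assuming a Data Source named `events` whose `metadata` column holds an object such as `{"plan": "pro"}`, you can pull out individual keys at query time:

```sql
SELECT
    order_id,
    JSONExtractString(metadata, 'plan') AS plan
FROM events
```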

**Nested fields:**

For nested JSON structures, use JSONPath to extract specific fields:

```tb
SCHEMA >
    `customer_id` String `json:$.customer.id`,
    `customer_email` String `json:$.customer.email`,
    `order_data` String `json:$.order.data`,
    `data` String `json:$`
```

**Maps:**

Use Map type for key-value structures:

```tb
`headers` Map(String, String) `json:$.headers`
-- For Kafka headers or metadata maps
```

## Schema Registry integration

### Avro schemas

When using Avro with Schema Registry:

1. Register schema in Schema Registry
2. Set format to `avro` in Data Source
3. Configure Schema Registry URL in connection

```tb
KAFKA_VALUE_FORMAT avro
KAFKA_SCHEMA_REGISTRY_URL https://registry.example.com
```

**Schema evolution rules:**

- Add optional fields (with defaults)
- Remove fields (backward compatible)
- Change field types only if compatible
- Don't change required fields to optional without defaults

### JSON Schema

When using JSON Schema with Schema Registry:

```tb
KAFKA_VALUE_FORMAT json_with_schema
KAFKA_SCHEMA_REGISTRY_URL https://registry.example.com
```

**Best practices:**

- Use schema versioning
- Test schema changes in development
- Monitor schema compatibility
- Add new properties without adding them to `required`
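
As an illustration, a hypothetical JSON Schema that adds `payment_method` as a new property while leaving `required` untouched, so older messages still validate:

```json
{
  "type": "object",
  "properties": {
    "order_id": { "type": "string" },
    "order_total": { "type": "number" },
    "payment_method": { "type": "string" }
  },
  "required": ["order_id", "order_total"]
}
```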

## Schema evolution strategies

### Backward compatibility

Ensure schema changes are backward compatible:

**Safe changes:**

- Adding optional fields
- Removing optional fields
- Making fields nullable
- Adding default values
- Widening types (Int32 → Int64)

**Breaking changes:**

- Removing required fields
- Changing field types incompatibly
- Renaming fields
- Narrowing types (Int64 → Int32)

### Versioning strategy

For breaking changes, use topic versioning:

```
orders-v1
orders-v2
orders-v3
```

Create separate Data Sources for each version, then merge in queries if needed. This is safer than trying to migrate in place.
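
A sketch of such a merge in a Pipe node, assuming `orders_v1` and `orders_v2` Data Sources that share the columns being selected:

```sql
SELECT order_id, order_total, 'v1' AS schema_version
FROM orders_v1
UNION ALL
SELECT order_id, order_total, 'v2' AS schema_version
FROM orders_v2
```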

### Gradual rollout

When adding new fields:

1. Add new field as nullable in Data Source
2. Update producers to include new field
3. Monitor for any issues
4. Optionally, make the field non-nullable once every message includes it

This approach minimizes risk and allows you to roll back if needed.

## Monitoring schema changes

### Detect schema mismatches

Query `kafka_ops_log` for deserialization errors:

```sql
SELECT
    datasource_id,
    topic,
    msg,
    count() AS error_count,
    max(timestamp) AS last_seen
FROM tinybird.kafka_ops_log
WHERE msg_type = 'warning'
  AND msg LIKE '%schema%'
  AND timestamp > now() - INTERVAL 1 HOUR
GROUP BY datasource_id, topic, msg
ORDER BY error_count DESC
```

This helps you catch schema evolution issues early.

### Monitor quarantined messages

Check Quarantine Data Source for schema-related issues:

```sql
SELECT
    toStartOfMinute(insertion_date) AS minute,
    count() AS quarantined_count
FROM your_datasource_quarantine
WHERE insertion_date > now() - INTERVAL 1 HOUR
GROUP BY minute
ORDER BY minute DESC
```

Quarantined messages often indicate schema mismatches or data quality issues.
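
To see which columns are failing, group by the quarantine meta-columns (assuming the standard `c__error_column` and `c__error` fields that Tinybird adds to quarantine Data Sources):

```sql
SELECT
    c__error_column,
    c__error,
    count() AS errors
FROM your_datasource_quarantine
WHERE insertion_date > now() - INTERVAL 1 HOUR
GROUP BY c__error_column, c__error
ORDER BY errors DESC
```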

## Common pitfalls

### Changing field types incompatibly

**Problem:** Changing `count` from `String` to `Int32` breaks old messages.

**Solution:** Use `FORWARD_QUERY` to convert types, or add a new field and migrate gradually.

### Removing required fields

**Problem:** Removing a field that old messages require causes deserialization failures.

**Solution:** Make the field optional first, wait for all consumers to update, then remove it.

### Not handling nullable fields

**Problem:** Adding a new field as non-nullable breaks old messages that don't have it.

**Solution:** Always make new fields nullable initially. After all messages include it, you can make it non-nullable.

### Ignoring Schema Registry compatibility

**Problem:** Schema changes that violate compatibility settings cause failures.

**Solution:** Check your compatibility mode (`BACKWARD`, `FORWARD`, `FULL`) and test schema changes before deploying.

## Best practices

1. **Use nullable types** for optional fields
2. **Provide default values** for missing fields
3. **Test schema changes** in development first with `tb deploy`
4. **Deploy to production** using `tb --cloud deploy` after testing
5. **Monitor schema evolution** using `kafka_ops_log`
6. **Use Schema Registry** for Avro and JSON Schema
7. **Document schema changes** in your team
8. **Version schemas** for breaking changes
9. **Keep schemas backward compatible** when possible
10. **Start with nullable** when adding new fields

## Troubleshooting schema issues

### Type conversion failed

**Solutions:**

1. Verify JSONPath expressions match message structure
2. Check data types are compatible
3. Use type conversion functions if needed
4. Make fields nullable if data might be missing

### Schema mismatch

**Solutions:**

1. Review Quarantine Data Source for actual message format
2. Update Data Source schema to match messages
3. Check Schema Registry if using Avro/JSON Schema
4. Verify schema evolution is backward compatible

### Missing fields

**Solutions:**

1. Make fields nullable
2. Provide default values
3. Update producers to include required fields
4. Handle missing fields in queries

## Related documentation

- [Kafka connector documentation](/forward/get-data-in/connectors/kafka) - Main setup and configuration guide
- [Troubleshooting guide](../troubleshooting) - Schema-related error troubleshooting
- [Evolve Data Sources](/forward/test-and-deploy/evolve-data-source) - Managing schema changes
- [ClickHouse® data types](/sql-reference/data-types) - Available data types
