Recover data from Quarantine

In this guide you'll learn the basics about the Quarantine Data Source, and how to use it to detect and fix errors on any of your Data Sources.

The Quarantine Data Source

If you are not familiar with the concept, check the Quarantine Data Source docs for more information.

The Quarantine Data Source is named {datasource_name}_quarantine and can be queried using Pipes like a regular Data Source.

A practical example

This guide uses the Tinybird CLI, but all steps can be performed in the UI as well.

For this guide, we will use an NDJSON Data Source, with data that looks like this:

  "store_id": 1,
  "purchase": { "product_name": "shoes", "datetime": "2022-01-05 12:13:14" }

We'll start by generating a Data Source file from this JSON snippet, push the Data Source to Tinybird, and ingest the JSON as a single row.
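Assuming the snippet is saved as ndjson_ds.ndjson and the Data Source ends up named ndjson_ds (both names are illustrative), those steps might look like this with the CLI:

```shell
# Save the sample row as an NDJSON file (one JSON object per line)
echo '{"store_id": 1, "purchase": {"product_name": "shoes", "datetime": "2022-01-05 12:13:14"}}' > ndjson_ds.ndjson

# Generate a .datasource file from the sample, push it, and append the row
tb datasource generate ndjson_ds.ndjson
tb push datasources/ndjson_ds.datasource
tb datasource append ndjson_ds ndjson_ds.ndjson
```

The exact path of the generated .datasource file may differ depending on your project layout.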

The schema that is generated from the JSON will look like this:
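Based on the column names and types used later in this guide, the generated .datasource file should contain a schema along these lines (treat this as a sketch rather than the exact generator output):

```
SCHEMA >
    `store_id` Int16 `json:$.store_id`,
    `purchase_product_name` String `json:$.purchase.product_name`,
    `purchase_datetime` DateTime `json:$.purchase.datetime`
```

Note how the nested purchase object is flattened into purchase_product_name and purchase_datetime columns via JSONPaths.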

You can check in the UI and see that your Data Source has been created and the row ingested.

Data Source details can be accessed from your Sidebar

Now, we will try to append some rows that don't match the Data Source schema. These rows will end up in the quarantine Data Source.
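For example, appending two rows like these (a String store_id and a null product name, matching the errors discussed later in this guide) sends both to quarantine:

```shell
echo '{"store_id": "one", "purchase": {"product_name": "shoes", "datetime": "2022-01-05 12:13:14"}}
{"store_id": 2, "purchase": {"product_name": null, "datetime": "2022-01-05 12:13:14"}}' > bad_rows.ndjson

tb datasource append ndjson_ds bad_rows.ndjson
```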

You can check in the UI and see that there is a notification warning you about quarantined rows.

Quarantine Data Source is always accessible (if it contains any rows) from the Data Source modal window

Check out the Log tab

In the Data Source view you'll find the Log tab, which shows you details about all operations performed on a Data Source. If you're following the steps of this guide, you should see a row with event_type as append and written_rows_quarantine as 2.
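You can query the same information from the tinybird.datasources_ops_log Service Data Source. A query along these lines (column names as used elsewhere in this guide) shows the append operation and its quarantined row count:

```sql
SELECT timestamp, event_type, rows, written_rows_quarantine
FROM tinybird.datasources_ops_log
WHERE datasource_name = 'ndjson_ds'
ORDER BY timestamp DESC
```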

In the UI, you can review the rows that have been sent to quarantine by navigating to the quarantine Data Source page from the quarantine warning notification.

Within the quarantine view you can see both a summary of errors and the rows that have failed.

In the Errors view you will see a summary of all the errors and the number of occurrences for each of those, so you can prioritize fixing the most common ones. In the Rows view, you will see all the rows that have failed, so you can further investigate why.
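The Errors summary can be reproduced with SQL. A sketch, assuming the quarantine error metadata is stored as Array(String) columns (one quarantined row can carry several errors):

```sql
SELECT
    arrayJoin(c__error) AS error,
    count() AS occurrences
FROM ndjson_ds_quarantine
GROUP BY error
ORDER BY occurrences DESC
```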

Fixing quarantine errors

There are generally three ways of fixing quarantine errors:

Modify your data producer

Usually, the best solution is to fix the problem at the source. This means updating the applications or systems that are producing the data, before they send it to Tinybird.

The benefit of this is that you don't need to do additional processing to normalize the data after it has been ingested, which helps to save cost and reduce overall latency. However, it can come at the cost of having to push changes into a production application, which can be complex or have side effects on other systems.

Modify the Data Source schema

Often, the issue that causes a row to end up in quarantine is a mismatch of data types.

A simple solution is to modify the Data Source schema to accept the new type.

For example, if an application is starting to send integers that are too large for Int8, you might update the schema to use Int16.

Always avoid Nullable columns

Nullable columns can have significantly worse performance. Instead, send alternative values like 0 for any Int type, or an empty string for a String type.
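If you control the producer, you can apply these defaults before rows are sent to Tinybird. A minimal sketch in Python (the field names and defaults are illustrative, not part of any Tinybird SDK):

```python
def apply_defaults(row, defaults):
    """Replace missing or None values with type-appropriate defaults,
    so the destination Data Source never needs a Nullable column."""
    clean = dict(defaults)  # start from the defaults
    clean.update({k: v for k, v in row.items() if v is not None})
    return clean

defaults = {"store_id": 0, "purchase_product_name": ""}
print(apply_defaults({"store_id": None, "purchase_product_name": "shoes"}, defaults))
# {'store_id': 0, 'purchase_product_name': 'shoes'}
```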

Transform data with Pipes and Materialized Views

This is one of the most powerful capabilities of Tinybird.

If you are not able to modify the data producer, you can apply a transformation to the erroring columns at ingestion time and materialize the result into another Data Source.
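As a sketch, a Pipe that casts an erroring column at ingestion time and materializes into a clean Data Source might look like this (Pipe, node, and Data Source names are illustrative, and it assumes the landing Data Source stores store_id as String):

```
NODE clean_events
SQL >
    SELECT
        toInt16OrZero(store_id) AS store_id,
        purchase_product_name,
        purchase_datetime
    FROM ndjson_ds_landing

TYPE materialized
DATASOURCE ndjson_ds_clean
```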

You can read more about this in the Materialized Views docs.

Recovering rows from quarantine

The quickest way to recover rows from quarantine is to fix the cause of the errors and then re-ingest the data. However, that is not always possible.

You can recover rows from the quarantine using a Pipe and the Tinybird API:

Create a recovery pipe

You can create a Pipe to select the rows from the quarantine Data Source and transform them into the appropriate schema.

In the previous example, we received some rows where the purchase_product_name contained null or the store_id contained a String rather than an Int16.

Remember that quarantined columns are Nullable(String)

All columns in a quarantine Data Source are Nullable(), which means that you must use the coalesce() function if you want to transform them into a non-nullable type. In this example, we will use coalesce to set a default value of toDateTime(0), '', or 0 for the DateTime, String, and Int16 types respectively.

Additionally, all columns in a quarantine Data Source are stored as String. This means that you must explicitly transform any non-String column into its desired type as part of the recovery Pipe. In this example, we want to transform the purchase_datetime and store_id columns to DateTime and Int16 types respectively.

Lastly, any columns sent to quarantine that were already a String will be encased in double quotes "". This means that you may need to remove double quotes around them using substring().
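The substring(col, 2, length(col) - 2) pattern simply drops the first and last characters. In Python terms (a plain illustration, unrelated to any Tinybird API):

```python
def unquote(value):
    """Mimic substring(col, 2, length(col) - 2): strip the double quotes
    that wrap String values in a quarantine Data Source."""
    if value is not None and len(value) >= 2 and value.startswith('"') and value.endswith('"'):
        return value[1:-1]
    return value

print(unquote('"shoes"'))  # shoes
print(unquote(None))       # None
```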

The quarantine Data Source contains additional meta-columns c__error_column, c__error, c__import_id, and insertion_date with information about the errors and the rows, so you should not use SELECT * to recover rows from quarantine.

The following SQL transforms the quarantined rows from this example into the original Data Source schema:

SELECT
   coalesce(
      toDateTime(
         substring(purchase_datetime, 2, length(purchase_datetime) - 2)
      ),
      toDateTime(0)
   ) as purchase_datetime,
   coalesce(
      substring(purchase_product_name, 2, length(purchase_product_name) - 2),
      ''
   ) as purchase_product_name,
   coalesce(
      toInt16OrNull(substring(store_id, 2, length(store_id) - 2)),
      0
   ) as store_id
FROM ndjson_ds_quarantine

Recover endpoint

Just like with any other Pipe, you can publish the results of this recovery Pipe as an API Endpoint.

Ingest the fixed rows and truncate quarantine

You can then use the Tinybird CLI to append the fixed data back into the original Data Source, by hitting the API Endpoint published from the recovery Pipe.

The command will look like the following:

tb datasource append <datasource_name> <url>

To avoid dealing with JSONPaths, you can hit the recovery Pipe's CSV endpoint.

Here is an example of the complete command:

tb datasource append ndjson_ds https://api.tinybird.co/v0/pipes/&lt;recovery_pipe_name&gt;.csv?token=&lt;your_api_token&gt;

You should check that your Data Source now has the fixed rows, either in the UI, or from the CLI using:

tb sql "select * from ndjson_ds"

Finally, you can truncate the quarantine Data Source to clear out the recovered rows, either in the UI, or from the CLI using:

tb datasource truncate ndjson_ds_quarantine --yes

You should see that your Data Source now has all of the rows, and the quarantine notification has disappeared.

Data Source with the recovered rows and truncated quarantine

If your quarantine has too many rows, you may need to add pagination based on the insertion_date and/or c__import_id columns.
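As a sketch, pagination could rely on Tinybird's query templating in the recovery Pipe (the parameter names last_seen and page_size are illustrative):

```sql
SELECT insertion_date, store_id, purchase_product_name, purchase_datetime
FROM ndjson_ds_quarantine
WHERE insertion_date > {{DateTime(last_seen, '1970-01-01 00:00:00')}}
ORDER BY insertion_date
LIMIT {{Int32(page_size, 1000)}}
```

Each call passes the insertion_date of the last recovered row as last_seen until the endpoint returns no rows.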

If you are using a Kafka Data Source, do not forget to add the Kafka metadata columns.

Recovering rows from quarantine with CI/CD

When you connect your Workspace to Git, it becomes read-only and all your workflows should go through CI/CD. This is how you recover rows from quarantine in your data project using Git, automating the workflow.

Prototype the process in a Branch

This step is optional, but when you need to make a change to a read-only data project, you usually create a new Branch, prototype the changes there, and later bring them to Git.

The steps are:

  • Create a Branch
  • Ingest some file so there are rows in quarantine
  • Prototype a copy Pipe
  • Run it
  • Validate data is recovered
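Using the CLI, that prototyping loop might look like this (the Branch, fixture, and Pipe names are illustrative, taken from the example below):

```shell
# Create a Branch to prototype in
tb branch create quarantine_recovery

# Reproduce the problem: append rows that will land in quarantine
tb datasource append analytics_events datasources/fixtures/analytics_events_errors.ndjson

# Deploy and run the copy Pipe prototype
tb push pipes/analytics_events_quarantine_to_final.pipe
tb pipe copy run analytics_events_quarantine_to_final --wait

# Validate the data was recovered
tb sql "SELECT count() FROM analytics_events"
```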

A practical example with Git

In this guide you can find a practical example on how to recover quarantine rows from Git using CI/CD.

In this case the data project is the Web Analytics Starter Kit. When some of the rows go to quarantine, you receive an e-mail like this:

The issue is that the timestamp column is a String containing a Unix timestamp instead of a DateTime, so rows cannot be ingested properly.

{"timestamp":"1697393030","session_id":"b7b1965c-620a-402a-afe5-2d0eea0f9a34","action":"page_hit","version":"1","payload":"{ \"user-agent\":\"Mozilla\/5.0 (Linux; Android 13; SM-A102U) AppleWebKit\/537.36 (KHTML, like Gecko) Chrome\/106.0.5249.118 Mobile Safari\/537.36\", \"locale\":\"en-US\", \"location\":\"FR\", \"referrer\":\"https:\/\/\", \"pathname\":\"\/pricing\", \"href\":\"https:\/\/\/pricing\"}"}

To properly convert the timestamp values in quarantine to a DateTime you can build a copy Pipe like this:

NODE copy_quarantine
SQL >
   SELECT
      toDateTime(fromUnixTimestamp64Milli(toUInt64(replaceAll(assumeNotNull(timestamp), '"', '')) * 1000)) timestamp,
      -- fields in an NDJSON Data Source are saved in quarantine with double quotes; we need to get rid of them
      replaceAll(assumeNotNull(session_id), '"', '') session_id,
      replaceAll(assumeNotNull(action), '"', '') action,
      replaceAll(assumeNotNull(version), '"', '') version,
      replaceAll(assumeNotNull(payload), '"', '') payload
   FROM analytics_events_quarantine

TYPE copy
TARGET_DATASOURCE analytics_events

To test the changes you need to do a custom deployment like this:

# use set -e to raise errors for any of the commands below and make the CI pipeline fail
set -e

tb datasource append analytics_events datasources/fixtures/analytics_events_errors.ndjson
tb deploy
tb pipe copy run analytics_events_quarantine_to_final --wait --yes
sleep 10

First you append a sample of the quarantined rows, then deploy the copy Pipe and finally run the copy operation.

Once changes have been deployed in a test Branch you have to write some data quality tests to validate the rows are effectively being copied:

- analytics_events_quarantine:
    max_bytes_read: null
    max_time: null
    sql: |
      SELECT count() as c
      FROM analytics_events_quarantine
      HAVING c <= 0

- copy_is_executed:
    max_bytes_read: null
    max_time: null
    sql: |
      SELECT count() c, sum(rows) rows
      FROM tinybird.datasources_ops_log
      WHERE datasource_name = 'analytics_events'
      AND event_type = 'copy'
      HAVING rows != 74 and c = 1

The analytics_events_quarantine test checks that some rows are indeed in quarantine, while copy_is_executed checks that the rows in quarantine have been copied to the analytics_events Data Source.

For the last step, you need to deploy the Branch:

# use set -e to raise errors for any of the commands below and make the CI pipeline fail
set -e

tb deploy
tb pipe copy run analytics_events_quarantine_to_final --wait

You can now merge the Pull Request; the copy Pipe will be deployed to the Workspace and the copy operation will be executed, ingesting all rows in quarantine.

After that, you can optionally truncate the quarantine Data Source with tb datasource truncate analytics_events_quarantine.

This is a working Pull Request with all the steps mentioned above.