Best Practices for Copy Pipes

Round datetime filters to your schedule

Queries in Copy Pipes should always have a time window filter that aligns with the execution schedule. For example, a Copy Pipe that runs once a day typically filters for yesterday's data, while an hourly schedule usually means a filter for the previous hour's results.

It is important to remember that a Copy Pipe job is not guaranteed to run exactly at the scheduled time. For example, if a Copy Pipe is scheduled to run at 16:00:00, the job could run at 16:00:01 or even 16:00:05. To account for a potential delay, we recommend that you round your time window filter to align with the schedule window.

For example, if your Copy Pipe is scheduled hourly, instead of writing:

SELECT *
FROM datasource
WHERE datetime >= now() - interval 1 hour
    AND datetime < now()

You should use toStartOfHour() to round the time filter to the hour:

SELECT *
FROM datasource
WHERE datetime >= toStartOfHour(now()) - interval 1 hour
    AND datetime < toStartOfHour(now())

With this approach, even if the Copy Pipe's execution is delayed, perhaps triggering at 16:00:01, 17:00:30 and 18:00:02, you will still maintain consistent copies of data regardless of the delay, with no gaps or overlaps, because each delayed trigger time rounds down to the same hour boundary.
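
You can verify the rounding behavior directly. The following is a quick sketch that substitutes fixed timestamps for now(), showing that each delayed trigger time normalizes to its hour boundary:

SELECT
    toStartOfHour(toDateTime('2024-01-01 16:00:01')) AS run_1, -- 2024-01-01 16:00:00
    toStartOfHour(toDateTime('2024-01-01 17:00:30')) AS run_2, -- 2024-01-01 17:00:00
    toStartOfHour(toDateTime('2024-01-01 18:00:02')) AS run_3  -- 2024-01-01 18:00:00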

Account for late data in your schedule

There are many reasons why you might have late-arriving data, such as system downtime in your message queues or network outages. These are largely unavoidable and will occur at some point in your streaming journey. You should account for potential delays ahead of time.

When using Copy Pipes, we recommend that you give some headroom in your schedule to allow for any late data. How much headroom you give is up to you, so carefully consider the SLAs both upstream and downstream of Tinybird. For example, if your streaming pipeline has a 5-minute downtime SLA, then most of your late data should be less than 5 minutes late. Similarly, consider whether you have any SLAs with data consumers who expect timely data in Tinybird.

For example, if you schedule a Copy Pipe every 5 minutes (16:00, 16:05, 16:10...), there could be events with a timestamp of 15:59:59 that do not arrive in Tinybird until 16:00:01 (2 seconds late!). If the Copy Pipe executes at exactly 16:00:00, these events could be lost.

There are two ways to add headroom to your schedule.

Option 1: Delay the execution

The first option is to simply delay the schedule.

For example, if you want a Copy Pipe that creates 5-minute snapshots, you could delay the schedule by 1 minute, so that instead of running at 17:00, 17:05, 17:10, etc., it runs at 17:01, 17:06, 17:11.

To achieve this, you could use this cron expression: 1-59/5 * * * *

If you use this method, you must combine it with the first tip in this guide, Round datetime filters to your schedule. For example:

SELECT *
FROM datasource
WHERE datetime >= toStartOfFiveMinutes(now()) - interval 5 minute
    AND datetime < toStartOfFiveMinutes(now())
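
Putting this together in a Copy Pipe datafile, the cron expression goes in the COPY_SCHEDULE setting. A minimal sketch, assuming a Node named five_minute_snapshot and a target Data Source named snapshots (both names are placeholders):

NODE five_minute_snapshot
SQL >
    SELECT *
    FROM datasource
    WHERE datetime >= toStartOfFiveMinutes(now()) - interval 5 minute
        AND datetime < toStartOfFiveMinutes(now())

TYPE COPY
TARGET_DATASOURCE snapshots
COPY_SCHEDULE 1-59/5 * * * *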

Option 2: Filter the result with headroom

Another strategy is to keep your schedule as desired (17:00, 17:05, 17:10, etc.) but apply a filter in the query to add some headroom.

For example, you can shift the copy window back by 15 seconds:

WITH (SELECT toStartOfFiveMinutes(now()) - interval 15 second) AS snapshot
SELECT snapshot, *
FROM datasource
WHERE timestamp >= snapshot - interval 5 minute
    AND timestamp < snapshot

With this method, a Copy Pipe that executes at 17:00 will copy data from 16:54:45 to 16:59:45. At 17:05, it would copy data from 16:59:45 to 17:04:45, and so on.

It is worth noting that this can be confusing to data consumers who might notice that the data timestamps don't perfectly align with the schedule, so consider whether you'll need extra documentation.
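
To sanity-check the window math, you can evaluate the bounds for a fixed execution time. A quick sketch that substitutes a literal timestamp for now(), relying on ClickHouse's ability to re-use a SELECT alias within the same query:

SELECT
    toStartOfFiveMinutes(toDateTime('2024-01-01 17:00:00')) - interval 15 second AS upper_bound, -- 16:59:45
    upper_bound - interval 5 minute AS lower_bound                                               -- 16:54:45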

Write a snapshot timestamp

Capturing a snapshot timestamp documents when a particular row was written. This lets you identify which execution of the Copy Pipe is responsible for which row, which is useful for debugging or auditing.

For example:

SELECT toStartOfHour(now()) as snapshot_id, *
FROM datasource
WHERE timestamp >= snapshot_id - interval 1 hour
    AND timestamp < snapshot_id

In this example, you're adding a new column at the start of the result that contains the rounded timestamp of the execution time. Because ClickHouse lets you re-use a SELECT alias elsewhere in the query, you can reference snapshot_id in the WHERE clause as your rounded datetime filter, saving you a bit of typing.

Use parameters in your Copy Pipe

Copy Pipes can be executed following a schedule or on-demand. All the previous best practices in this guide are focused on scheduled executions.

But what happens if you want to use the same Copy Pipe to do a backfill? For example, you might want to execute the Copy Pipe only on data from last year, to fill a gap behind your fresh data.

To do this, you can parameterize the filters. When you run an on-demand Copy Pipe with parameters, you can modify the values of the parameters before execution. Scheduled Copy Pipes with parameters will use the default value for any parameters. This means you can simply re-use the same Copy Pipe for your fresh, scheduled runs as well as any ad-hoc backfills.

The following example creates a Pipe with two Nodes that break the Copy Pipe logic into more readable pieces.

In the first Node, called date_params:

%
{% if defined(snapshot_id) %}
    SELECT parseDateTimeBestEffort({{String(snapshot_id)}}) as snapshot_id,
    snapshot_id - interval 5 minute as start
{% else %}
    SELECT toStartOfFiveMinutes(now()) as snapshot_id,
    snapshot_id - interval 5 minute as start
{% end %}

The date_params Node looks for a parameter called snapshot_id. If this parameter is present, the execution is on-demand, because a scheduled Copy Pipe is never passed any parameters by the scheduler. A scheduled execution of the Copy Pipe creates a time filter based on now(), while an on-demand execution uses the snapshot_id parameter to create a dynamic time filter. In both cases, the Node returns both bounds of the time window: snapshot_id and start.

In a second Node:

SELECT (SELECT snapshot_id FROM date_params) as snapshot_id, *
FROM datasource
WHERE timestamp >= (SELECT start FROM date_params)
    AND timestamp < snapshot_id

First, you select the snapshot_id from the previous date_params Node. You do not need to worry about whether this is a scheduled or on-demand execution here, as that has already been handled by the previous Node.

The query also retrieves the lower bound of the time window from the first Node with (SELECT start FROM date_params). This is not strictly needed, but it means you don't have to write - interval 5 minute in multiple places, making it easier to update in the future.

With this, the same Copy Pipe can perform both functions: being executed on a regular schedule to keep up with fresh data, and being executed on-demand when needed.
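
To run the backfill, trigger the Copy Pipe on demand and pass the parameter in the request. A sketch using the Copy Pipe API, assuming a Pipe named backfill_copy, the default api.tinybird.co host, and a Token with the appropriate scope (all placeholders):

curl -X POST \
    "https://api.tinybird.co/v0/pipes/backfill_copy/copy?snapshot_id=2023-01-01T00:05:00" \
    -H "Authorization: Bearer $TB_TOKEN"

Because parseDateTimeBestEffort accepts ISO 8601 strings, using the T separator in the timestamp avoids having to URL-encode a space.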