Ingest from AWS Kinesis

Intermediate

In this guide you'll learn how to send data from AWS Kinesis to Tinybird.

Introduction

If you have an AWS Kinesis Data Stream that you want to send to Tinybird, you can connect the two quickly using Kinesis Data Firehose.

Push Messages From Kinesis To Tinybird

The following sections show you how to integrate Kinesis with Tinybird using Firehose.

Step 1. Create a token with the right scope

Go to your Workspace and create a token with the "Create new Data Sources or append data to existing ones" scope.

Create a token with the right scope

Step 2. Create a new Data Stream

Start by creating a new Data Stream in AWS Kinesis. See the AWS Kinesis documentation for detailed instructions.

Create a Kinesis Data Stream

Step 3. Create a Firehose Delivery Stream

Next, create a Kinesis Data Firehose Delivery Stream.

Set the Source to Amazon Kinesis Data Streams and the Destination to HTTP Endpoint.

Create a Delivery Stream

In the Destination Settings, set HTTP Endpoint URL to point to the Tinybird Events API.

https://api.tinybird.co/v0/events?name=<your_datasource_name>&wait=true&token=<your_token_with_DS_rights>

Note that the example URL is for Workspaces in the EU region. For Workspaces in the US-East region, replace https://api.tinybird.co with https://api.us-east.tinybird.co.
Also note that we include the wait=true parameter; see the Events API documentation to learn more about it.

You don't need to create the Data Source in advance; it is created automatically the first time data arrives.
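Before wiring up Firehose, it can help to verify the endpoint URL and token directly. The sketch below (using only the Python standard library; the Data Source name and token are placeholders you must supply) builds the Events API URL and sends a single test event:

```python
import json
import urllib.parse
import urllib.request

# Base URL for EU-region Workspaces; use https://api.us-east.tinybird.co for US-East.
TINYBIRD_HOST = "https://api.tinybird.co"

def events_url(datasource: str, token: str, wait: bool = True) -> str:
    """Build the Events API URL that Firehose will POST to."""
    params = {"name": datasource, "wait": str(wait).lower(), "token": token}
    return f"{TINYBIRD_HOST}/v0/events?{urllib.parse.urlencode(params)}"

def send_test_event(datasource: str, token: str, event: dict) -> int:
    """POST a single JSON event and return the HTTP status code."""
    req = urllib.request.Request(
        events_url(datasource, token),
        data=json.dumps(event).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(req) as resp:
        return resp.status
```

A 200 or 202 response confirms the token scope and URL are correct before you paste the same URL into the Firehose Destination Settings.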

Destination Settings

Step 4. Send sample messages and check that they arrive in Tinybird

If you don't have an active data stream, you can use a Python script to generate dummy data.
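A minimal producer sketch is shown below, assuming you have boto3 installed and AWS credentials configured; the stream name and event fields are illustrative, not prescribed by Tinybird:

```python
import json
import random
import time
from datetime import datetime, timezone

def make_dummy_event() -> dict:
    """Build one fake event; the field names here are arbitrary examples."""
    return {
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "city": random.choice(["Madrid", "New York", "London"]),
        "value": random.randint(0, 100),
    }

def produce(stream_name: str, n: int = 100) -> None:
    """Put n JSON records onto a Kinesis Data Stream."""
    import boto3  # imported here so make_dummy_event() works without AWS deps

    kinesis = boto3.client("kinesis")
    for _ in range(n):
        event = make_dummy_event()
        kinesis.put_record(
            StreamName=stream_name,
            Data=json.dumps(event),
            PartitionKey=event["city"],
        )
        time.sleep(0.1)  # spread records out so Firehose batches several of them
```

Run produce("your_stream_name") and, after the Firehose buffering interval elapses, the records should appear in your Tinybird Data Source.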

Back in Tinybird, you should see three columns filled with data in your Data Source. timestamp and requestId are self-explanatory, and your messages are in records__data.

Firehose Data Source

Step 5. Decode message data

The records__data column contains an array of encoded messages.

To get one row per element of the array, use the ARRAY JOIN clause, and decode each message with the base64Decode() function.
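Conceptually, the transformation the Pipe performs is equivalent to this Python sketch (the field names are illustrative):

```python
import base64
import json

def unroll_and_decode(records_data: list) -> list:
    """Mimic ARRAY JOIN + base64Decode(): one decoded JSON object per array element."""
    rows = []
    for encoded in records_data:  # ARRAY JOIN: one output row per array element
        raw = base64.b64decode(encoded).decode("utf-8")  # base64Decode()
        rows.append(json.loads(raw))  # JSONExtract*-style field access on the raw JSON
    return rows
```

Each element of records__data becomes its own row, holding the decoded JSON payload that the original producer wrote to the stream.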

Now that the raw JSON is in a column, you can use JSONExtract functions to extract the desired fields:

Decoding messages

Some performance tips

We highly recommend persisting the decoded and unrolled result in a different Data Source. You can do this with a Materialized View: a combination of a Pipe and a Data Source that writes the transformed data into the destination Data Source as soon as new data arrives at the Firehose Data Source.

We also encourage you not to store data you won't need; in this example, some of the extra columns can be skipped.

The easiest way to avoid keeping more data than you need is to add a TTL to the Firehose Data Source.

Alternatively, you can create the Firehose Data Source with a Null engine. Data ingested there can be transformed and written to the destination Data Source without ever being persisted in the Null-engine Data Source itself.