Stream from AWS Kinesis

In this guide you'll learn how to send data from AWS Kinesis to Tinybird.

If you have a Kinesis Data Stream that you want to send to Tinybird, it should be pretty quick thanks to Kinesis Firehose. This page explains how to integrate Kinesis with Tinybird using Firehose.

1. Push messages From Kinesis To Tinybird

Create a token with the right scope

Go to your workspace and create a token with the Create new Data Sources or append data to existing ones scope

Create a token with the right scope

Create a new Data Stream

Start by creating a new Data Stream in AWS Kinesis (see the detailed AWS documentation for more information).

Create a Kinesis Data Stream

Create a Firehose Delivery Stream

Next, create a Kinesis Data Firehose Delivery Stream.

Se the Source to Amazon Kinesis Data Streams and the Destination to HTTP Endpoint.

Create a Delivery Stream

In the Destination Settings, set HTTP Endpoint URL to point to the Tinybird Events API.

https://api.tinybird.co/v0/events?name=<your_datasource_name>&wait=true&token=<your_token_with_DS_rights>

Note the example is for Workspaces in the EU region. Replace https://api.tinybird.co by https://api.us-east.tinybird.co for the ones in US-East. Additionally, note the wait=true parameter. You can learn more about it in the Events API docs.

You don't need to create the Data Source in advance, it will automatically be created for you.

Destination Settings

Send sample messages and check that they arrive to Tinybird

If you don't have any active data stream, follow this python script to generate dummy data.

Back in Tinybird, you should see 3 columns filled with data in your Data Source. timestamp and requestId are self explanatory, and your messages are in records\_\data.

Firehose Data Source

2. Decode message data

Decode message data

The records\_\data column contains an array of encoded messages.

In order to get one row per each element of the array, use the ARRAY JOIN Clause. You'll also need to decode the messages with the base64Decode() function.

Now that the raw JSON is in a column, you can use JSONExtract functions to extract the desired fields:

Decoding messages

Performance optimizations

It is highly recommended to persist the decoded and unrolled result in a different Data Source. You can do it with a Materialized View: A combination of a Pipe and a Data Source that leaves the transformed data into the destination Data Source as soon as new data arrives to the Firehose Data Source.

Don't store what you won't need; In this example, some of the extra columns could be skipped. Add a TTL to the Firehose Data Source to prevent keeping more data than you need.

Another alternative is to create the Firehose DS with a Null Engine. This way, data ingested there can be transformed and fill the destination DS without being persisted in the DS with the Null Engine.

Next steps