Stream from Google Pub/Sub

In this guide you'll learn how to send data from Google Pub/Sub to Tinybird.


Tinybird is a Google Cloud partner and supports integrating with Google Cloud services.

Google Pub/Sub is often used as messaging middleware that decouples event stream sources from their end destinations. Pub/Sub streams are usually consumed by Google's Dataflow, which can send events on to destinations such as BigQuery, Bigtable, or Google Cloud Storage.

This Dataflow pattern works with Tinybird too. However, Pub/Sub also has a feature called push subscriptions, which can forward messages directly from Pub/Sub to Tinybird. The steps in this guide use the push subscription approach.

Push messages from Pub/Sub to Tinybird

1. Create a Pub/Sub topic

Start by creating a topic in Google Pub/Sub following the Google Pub/Sub documentation.

2. Create a push subscription

Next, create a Push subscription in Pub/Sub.

Set the Delivery Type to Push. In the Endpoint URL field, use the following snippet (which uses the Tinybird Events API) and pass your own Token, which you can find in your Workspace > Tokens:

Endpoint URL<your-token>

You don't need to create the Data Source in advance; it is created automatically for you. This snippet also includes the wait=true parameter, which is explained in the Events API docs.

Add Push subscription to Pub/Sub topic
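
If you prefer to script this step rather than use the console, the subscription can be sketched with the `google-cloud-pubsub` Python client. This is a minimal sketch, not the only way to do it: it assumes that library is installed and authenticated, the project, topic, and subscription IDs are hypothetical placeholders, and `endpoint_url` is whatever you would otherwise paste into the Endpoint URL field.

```python
def build_push_subscription_request(project_id, topic_id, subscription_id, endpoint_url):
    """Build the request body for a Pub/Sub push subscription.

    Resource names follow Pub/Sub's projects/<project>/... naming format.
    All IDs passed in are placeholders chosen by the caller.
    """
    return {
        "name": f"projects/{project_id}/subscriptions/{subscription_id}",
        "topic": f"projects/{project_id}/topics/{topic_id}",
        # A push_config with a push_endpoint makes this a push subscription.
        "push_config": {"push_endpoint": endpoint_url},
    }


def create_push_subscription(request):
    """Create the subscription (requires google-cloud-pubsub and credentials)."""
    # Imported lazily so the request builder above works without the library.
    from google.cloud import pubsub_v1

    subscriber = pubsub_v1.SubscriberClient()
    with subscriber:
        return subscriber.create_subscription(request=request)


# Example usage with hypothetical names:
# request = build_push_subscription_request(
#     "my-project", "events", "events-to-tinybird", "<your endpoint URL>"
# )
# create_push_subscription(request)
```

Keeping the request builder separate from the API call makes it easy to inspect the subscription settings before anything is created.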

3. Send sample messages

Generate and send some sample messages to test your connection. If you don't have your own messages to test, use this script.
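
As a rough illustration of what such a test script might look like, the sketch below generates random events and publishes them to a topic. It is not the script referenced above. The field names match the ones extracted by the Pipe later in this guide; the event names, browsers, operating systems, URL, and project and topic IDs are made-up placeholders, and publishing assumes the `google-cloud-pubsub` Python client is installed and authenticated.

```python
import json
import random
import uuid
from datetime import datetime, timezone

# Hypothetical sample values, used only to produce varied test data.
EVENTS = ["view", "add_to_cart", "checkout"]
BROWSERS = ["Chrome", "Firefox", "Safari"]
OPERATING_SYSTEMS = ["Windows", "macOS", "Linux"]


def make_sample_event(rng=random):
    """Return one fake event as a JSON-serializable dict."""
    product_id = rng.randint(1, 1000)
    return {
        "timestamp": datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S"),
        "event": rng.choice(EVENTS),
        "product_id": product_id,
        "url": f"https://example.com/products/{product_id}",
        "browser": rng.choice(BROWSERS),
        "OS": rng.choice(OPERATING_SYSTEMS),
        "cart_id": str(uuid.uuid4()),
    }


def publish_samples(project_id, topic_id, count=10):
    """Publish `count` sample events (requires google-cloud-pubsub)."""
    # Imported lazily so make_sample_event() works without the library.
    from google.cloud import pubsub_v1

    publisher = pubsub_v1.PublisherClient()
    topic_path = publisher.topic_path(project_id, topic_id)
    for _ in range(count):
        data = json.dumps(make_sample_event()).encode("utf-8")
        # publish() returns a future; result() blocks until the message is sent.
        publisher.publish(topic_path, data).result()


# Example usage with hypothetical names:
# publish_samples("my-project", "events", count=100)
```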

4. Decode message data

You'll start to see messages arrive in the Data Source in Tinybird.

By default, a Pub/Sub Push subscription encodes the message body in base64.

Raw Pub/Sub messages

To decode the message body, you can use the base64Decode() function. You can then start to extract values from the JSON.
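
To build intuition for what is being decoded, here is a small Python sketch of the same idea outside Tinybird. Pub/Sub push requests wrap each message in a JSON envelope and carry the payload base64-encoded under `message.data`, which is why the corresponding column is named `message_data`. The sample payload and IDs below are invented for illustration.

```python
import base64
import json


def decode_push_envelope(envelope):
    """Return (message_id, payload) from a Pub/Sub push envelope.

    Mirrors base64Decode() followed by JSON extraction: the data field
    is base64 text that decodes to the original JSON message body.
    """
    message = envelope["message"]
    raw = base64.b64decode(message["data"])
    payload = json.loads(raw)
    return message.get("messageId"), payload


# A hypothetical envelope, shaped like the body of a push request.
sample_payload = {"event": "view", "product_id": 42}
envelope = {
    "message": {
        "data": base64.b64encode(
            json.dumps(sample_payload).encode("utf-8")
        ).decode("ascii"),
        "messageId": "1234567890",
        "publishTime": "2024-01-01T00:00:00Z",
    },
    "subscription": "projects/my-project/subscriptions/my-subscription",
}

message_id, payload = decode_push_envelope(envelope)
print(message_id, payload["event"], payload["product_id"])
```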

Below is an example Tinybird Pipe that decodes the message body, extracts values from the JSON message, and writes to a Materialized View:

Decode message and parse JSON with Materialized View
NODE events_payload_decoded
SQL >
    SELECT
      message_message_id as message_id,
      -- Carried through because the engine settings below reference it;
      -- assumes the Data Source has a message_publish_time column.
      message_publish_time,
      base64Decode(message_data) as message_data
    FROM events_demo

NODE events_payload_1
SQL >
    SELECT
      message_publish_time,
      CAST(JSONExtractString(message_data, 'timestamp'), 'DateTime64') as payload_timestamp,
      JSONExtractString(message_data, 'event') as payload__event,
      JSONExtractInt(message_data, 'product_id') as payload__product_id,
      JSONExtractString(message_data, 'url') as payload__url,
      JSONExtractString(message_data, 'browser') as payload__browser,
      JSONExtractString(message_data, 'OS') as payload__os,
      JSONExtractString(message_data, 'cart_id') as payload__cart_id
    FROM events_payload_decoded

TYPE materialized
DATASOURCE events_payload_1_mv
ENGINE "MergeTree"
ENGINE_PARTITION_KEY "toYYYYMM(message_publish_time)"
ENGINE_SORTING_KEY "message_publish_time, payload__browser, payload__os, payload__cart_id"

And that's it!

Next steps