Change Data Capture (CDC) is a design pattern that allows you to track and capture change streams from a source data system so that downstream systems can efficiently process these changes. In contrast to Extract, Transform, and Load (ETL) workflows, it can be used to update data in real-time or near real-time between databases, data warehouses, and other systems. The changes include inserts, updates, and deletes, and are captured and delivered to the target systems without requiring the source database to be queried directly.
In this post, you'll learn how to implement a real-time change data capture (CDC) pipeline on changes in MongoDB, using both Confluent and Tinybird.
How to work with MongoDB change streams in real-time
In this tutorial, I'll use Confluent to capture change streams from MongoDB, and Tinybird to analyze those streams via its native Kafka Connector. Why Tinybird? For real-time analytics on change data, Tinybird serves as an ideal data sink. While MongoDB excels at operational workloads, ClickHouse® (which powers Tinybird) significantly outperforms MongoDB for analytical queries, making CDC pipelines an effective way to combine the strengths of both databases.
This is an alternative to using Debezium, a popular open source change data capture framework with its own MongoDB connector. Debezium is a perfectly viable option, but the approach described here is geared toward real-time analytics use cases, where Tinybird works especially well as the data sink for MongoDB change data.
With Tinybird, you can transform, aggregate, and filter MongoDB changes as they happen and expose them via high-concurrency, low-latency APIs. Using Tinybird with your CDC data offers several benefits:
- Real-Time Analytics: Tinybird processes the stream of changes from MongoDB's oplog to provide real-time analytics on data changes.
- Data Transformation and Aggregation: With SQL Pipes, Tinybird enables real-time shaping and aggregation of the incoming MongoDB change data, a critical feature for handling complex data scenarios.
- High-Concurrency, Low-Latency APIs: Tinybird empowers you to publish your data transformations as APIs that can manage high concurrency with minimal latency, essential for real-time data interaction.
- Operational Intelligence: Real-time data processing allows you to gain operational intelligence, enabling proactive decision-making and immediate response to changing conditions in your applications or services.
- Event-Driven Architecture Support: Tinybird's processing of oplog-derived change data facilitates the creation of event-driven architectures, where MongoDB database changes can trigger business processes or workflows.
- Efficient Data Integration: Rather than batching updates at regular intervals, Tinybird processes and exposes changes as they occur, facilitating downstream system synchronization with the latest data.
- Scalability: Tinybird's ability to handle large data volumes and high query loads ensures that it can scale with your application, enabling the maintenance of real-time analytics even as data volume grows.
How does MongoDB CDC work?
CDC with MongoDB works primarily through the oplog, a special capped collection in MongoDB that logs all operations modifying the data stored in your databases.
When a change event such as an insert, update, or delete occurs in your MongoDB instance, the change is recorded in the oplog. This log is part of MongoDB's built-in replication mechanism and it maintains a rolling record of all data-manipulating operations.
CDC processes monitor this oplog, capturing the changes as they occur. These changes can then be propagated to other systems or databases, ensuring they have near real-time updates of the data.
In the context of MongoDB Atlas and a service like Confluent Kafka, MongoDB Atlas runs as a replica set and is configured to generate an oplog. A connector (like the MongoDB Source Connector) is then used to pull the changes from MongoDB's oplog and stream these changes to Kafka topics. From there, these changes can be further processed or streamed to other downstream systems as per your application requirements.
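For illustration, here's roughly what a change stream event for a single insert looks like. This is a simplified, hypothetical example; the field names (operationType, ns, documentKey, fullDocument) follow MongoDB's change event format, but the exact payload that lands in Kafka depends on how your connector is configured:

```json
{
  "_id": { "_data": "8264F1..." },
  "operationType": "insert",
  "ns": { "db": "app", "coll": "users" },
  "documentKey": { "_id": "507f1f77bcf86cd799439011" },
  "fullDocument": {
    "_id": "507f1f77bcf86cd799439011",
    "name": "John Doe",
    "email": "john.doe@example.com"
  }
}
```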

How to set up CDC with MongoDB, Confluent Connect, and Tinybird
Let's create a CDC pipeline using MongoDB Atlas and Confluent Cloud.
Step 1: Configure MongoDB Atlas
- Create an account with MongoDB Atlas, and create your MongoDB database cluster in MongoDB Atlas if you don't have one yet.
- Ensure that your cluster runs as a replica set if you are running MongoDB locally. MongoDB Atlas clusters are replica sets by default, so if you create a cluster with Atlas, you shouldn’t have to do any extra configuration.
- If you are running MongoDB locally, check that your deployment generates an oplog. The oplog (operations log) is a special capped collection that keeps a rolling record of all operations that modify the data stored in your databases. MongoDB Atlas does this by default, so again, no extra configuration should be required.
Step 2: Set up Confluent Cloud
- Sign up for a Confluent Cloud account if you haven't done so already.
- Create a new environment and then create a new Kafka cluster within that environment.
- Take note of your Cluster ID, API Key, and API Secret. You'll need these later to configure your source and sink connectors.
Step 3: Configure MongoDB Connector for Confluent Cloud
- Install the Confluent Cloud CLI. Instructions for this can be found in the Confluent Cloud documentation.
- Authenticate the Confluent Cloud CLI with your Confluent Cloud account:
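For example, with a recent version of the Confluent CLI:

```bash
# Log in with your Confluent Cloud credentials
confluent login
```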
- Use your Confluent Cloud environment and Kafka cluster:
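A minimal sketch using the IDs you noted in Step 2 ($ENV_ID and $CLUSTER_ID are placeholders):

```bash
# Point subsequent CLI commands at your environment and Kafka cluster
confluent environment use $ENV_ID
confluent kafka cluster use $CLUSTER_ID
```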
For more details on this, check out the Confluent Docs.
- Describe the MongoDB source connector plugin:
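For the fully managed MongoDB Atlas source, the plugin is typically named MongoDbAtlasSource; if your cluster lists it under a different name, use that instead:

```bash
# List available connector plugins, then inspect the MongoDB source plugin's config options
confluent connect plugin list
confluent connect plugin describe MongoDbAtlasSource
```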
- Create the MongoDB source connector with the Confluent CLI. You'll need your MongoDB Atlas connection string for this. Create the connector:
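Here's a hedged sketch. Save a JSON config (for example, mongodb-source.json) and pass it to the CLI. The key names below follow the fully managed MongoDbAtlasSource plugin; confirm them against the describe output above, and note that the host, user, and password are simply the components of your $CONNECTION_URI connection string:

```json
{
  "name": "mongodb-cdc-source",
  "connector.class": "MongoDbAtlasSource",
  "kafka.api.key": "$API_KEY",
  "kafka.api.secret": "$API_SECRET",
  "connection.host": "cluster0.xxxxx.mongodb.net",
  "connection.user": "cdc_user",
  "connection.password": "cdc_password",
  "database": "$DB_NAME",
  "collection": "$COLLECTION",
  "topic.prefix": "$TOPIC_PREFIX",
  "output.data.format": "JSON",
  "tasks.max": "1"
}
```

```bash
# Create the connector from the config file
# (older CLI versions use `confluent connect create --config-file` instead)
confluent connect cluster create --config-file mongodb-source.json
```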
Replace the placeholders with your actual values:
- $ENV_ID: Your Confluent Cloud environment ID
- $CLUSTER_ID: Your Kafka cluster ID
- $API_KEY: Your Confluent Cloud API key
- $API_SECRET: Your Confluent Cloud API secret
- $SR_ARN: Your Schema Registry ARN (if using a dedicated schema registry)
- $BOOTSTRAP_SERVER: The bootstrap server address from your Confluent Cloud cluster settings
- $CONNECTION_URI: Your MongoDB Atlas connection string
- $DB_NAME: Your MongoDB database name
- $COLLECTION: Your MongoDB collection name
- $TOPIC_PREFIX: Prefix for the Kafka topic name
Step 4: Install and Set up Tinybird
Before connecting Confluent to Tinybird, you need to install the Tinybird CLI and authenticate with your account. This guide uses the CLI for a hands-on technical workflow.
Install Tinybird CLI
First, install the Tinybird CLI on your machine:
```bash
curl -L tinybird.co | sh
```

This installs the Tinybird CLI tool and sets up Tinybird Local for local development. For more installation options, see the Tinybird installation guide.
Authenticate with Tinybird
Next, authenticate with your Tinybird account:
```bash
tb login
```

This command opens a browser window where you can sign in to Tinybird Cloud. If you don't have an account yet, you can create one during this process. After signing in, create a new workspace or select an existing one.
For a complete quick start guide, see Get started with Tinybird.
Step 5: Connect Confluent Cloud to Tinybird
With CDC events being published to a Kafka stream in Confluent, your next step is connecting Confluent and Tinybird. This is quite simple using the Tinybird Kafka Connector, which enables Tinybird to securely consume messages from your Confluent topic stream and write them into a Data Source.
The Kafka Connector is fully managed and requires no additional tooling. Simply connect Tinybird to your Confluent Cloud cluster, choose a topic, and Tinybird will automatically begin consuming messages from Confluent Cloud. As part of the ingestion process, Tinybird will extract JSON event objects with attributes that are parsed and stored in its underlying real-time database.
Create a Kafka connection
First, create a connection to your Confluent Cloud Kafka cluster using the Tinybird CLI. You'll need the bootstrap server, API key, and secret that you saved from Step 2.
Run the following command to start the interactive wizard:
```bash
tb connection create kafka
```

The wizard will prompt you to enter:
- A name for your connection (e.g., kafka_connection)
- The bootstrap server address from your Confluent Cloud cluster settings (e.g., pkc-xxxxx.us-east-1.aws.confluent.cloud:9092)
- The API key you created in Step 2
- The API secret you created in Step 2
If your Confluent Cloud cluster uses a CA certificate, the wizard will also prompt you for the certificate path.
Create a Kafka Data Source
Now, create a Data Source that will consume messages from your Kafka topic. You can use the guided CLI process or create the files manually.
Option 1: Use the guided CLI process (recommended)
Run the following command to start the guided process:
```bash
tb datasource create --kafka
```

The CLI will prompt you to:
- Select or enter the connection name (use the name you created above, e.g., kafka_connection)
- Enter the Kafka topic name (this is the topic name from Step 3, with the prefix you configured)
- Enter a consumer group ID (use a unique name, e.g., mongodb_cdc_consumer)
- Choose the offset reset behavior (earliest to read from the beginning, or latest to read only new messages)
Option 2: Manually create the Data Source files
Alternatively, you can manually create a .datasource file. First, create the connection file if you haven't already. Create a file named connections/kafka_connection.connection:
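A minimal sketch of what that file might contain, assuming SASL/PLAIN authentication against Confluent Cloud; the tb_secret references are placeholders for secrets you store in Tinybird rather than hard-coding credentials:

```
TYPE kafka

KAFKA_BOOTSTRAP_SERVERS pkc-xxxxx.us-east-1.aws.confluent.cloud:9092
KAFKA_SECURITY_PROTOCOL SASL_SSL
KAFKA_SASL_MECHANISM PLAIN
KAFKA_KEY {{ tb_secret("CONFLUENT_API_KEY") }}
KAFKA_SECRET {{ tb_secret("CONFLUENT_API_SECRET") }}
```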
Then, create a Data Source file (e.g., datasources/mongodb_cdc.datasource) that references this connection. Here's an example that defines a Tinybird Data Source to hold the change events from your MongoDB collection. In your case, the SCHEMA should match the data in your Kafka topic, which includes the fields from your MongoDB documents. Use JSONPath expressions to extract specific fields from the MongoDB CDC events into separate columns:
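Here's a sketch of such a file, assuming a hypothetical users collection with flat document fields (_id, name, email, age, city, created_at, updated_at) and a connector configured to publish the full document only; if your topic carries full change event envelopes instead, point the JSONPaths at $.fullDocument:

```
SCHEMA >
    `_id` String `json:$._id`,
    `name` String `json:$.name`,
    `email` String `json:$.email`,
    `age` Int32 `json:$.age`,
    `city` String `json:$.city`,
    `created_at` DateTime `json:$.created_at`,
    `updated_at` DateTime `json:$.updated_at`

ENGINE "MergeTree"
ENGINE_SORTING_KEY "_id, updated_at"

KAFKA_CONNECTION_NAME kafka_connection
KAFKA_TOPIC mongodb_cdc.users
KAFKA_GROUP_ID mongodb_cdc_consumer
KAFKA_AUTO_OFFSET_RESET earliest
```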
Replace mongodb_cdc.users with the actual topic name from Step 3 (with the prefix you configured). Adjust the schema fields to match the structure of your MongoDB documents. The __timestamp column is automatically added by Tinybird and represents when the event was ingested.
Deploy the Data Source
After creating your connection and Data Source files, deploy them to Tinybird Cloud:
```bash
tb --cloud deploy
```

You can also validate the setup before deploying by running:

```bash
tb --cloud deploy --check
```

This will verify that Tinybird can connect to your Kafka broker with the provided credentials.
Once deployed, Tinybird will automatically begin consuming messages from your Confluent topic, and you'll start seeing MongoDB change events stream into your Data Source as changes are made to the source data system.
Step 6: Handle Deduplication for CDC at Scale
When implementing CDC at scale, deduplication is essential. MongoDB change streams can produce duplicate events due to network retries, connector restarts, or Kafka consumer rebalancing. Without proper deduplication, you'll have inconsistent analytics and incorrect aggregations.
There are several strategies for handling deduplication in Tinybird:
- ReplacingMergeTree Engine: Use Tinybird's ReplacingMergeTree engine to automatically deduplicate rows based on a primary key. This is ideal when you have a unique identifier (like a document ID) and a timestamp or version field that indicates the latest state.
- Lambda Architecture: Implement a Lambda Architecture pattern where you maintain both a real-time stream and a batch layer. The batch layer provides the source of truth, while the stream layer provides low-latency updates. This approach is particularly effective for handling late-arriving data and ensuring eventual consistency.
- Query-time Deduplication: Use SQL functions like argMax to deduplicate at query time. This approach is flexible but can impact query performance on large datasets.
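As a rough sketch of the first and third strategies, assuming the hypothetical mongodb_cdc Data Source from Step 5: you could swap the engine settings in the .datasource file for a ReplacingMergeTree keyed on _id and versioned by updated_at, and deduplicate at query time with argMax:

```
ENGINE "ReplacingMergeTree"
ENGINE_SORTING_KEY "_id"
ENGINE_VER "updated_at"
```

```sql
-- Query-time deduplication: reconstruct the latest state of each document
SELECT
    _id,
    argMax(name, updated_at)  AS name,
    argMax(email, updated_at) AS email,
    max(updated_at)           AS updated_at
FROM mongodb_cdc
GROUP BY _id
```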
For detailed guidance on implementing these strategies, see:
- Deduplication Strategies: Comprehensive guide on deduplication techniques in Tinybird
- Lambda Architecture: Guide on implementing Lambda Architecture patterns for real-time analytics
Step 7: Start building real-time analytics with Tinybird
Now your CDC data pipeline should be up and running, capturing changes from your MongoDB Atlas database, streaming them into Kafka on Confluent Cloud, and sinking them into an analytical datastore on Tinybird's real-time data platform.
You can now query, shape, join, and enrich your MongoDB CDC data with SQL Pipes and instantly publish your transformations as high-concurrency, low-latency APIs to power your next use case.
For example, create a Pipe file (e.g., pipes/get_mongodb_changes.pipe) to query your MongoDB CDC data:
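Here's a minimal sketch of such a Pipe, again assuming the hypothetical users schema from Step 5; it exposes the most recent changes as an endpoint:

```
NODE recent_changes
SQL >
    SELECT
        _id,
        name,
        email,
        age,
        city,
        created_at,
        updated_at
    FROM mongodb_cdc
    ORDER BY updated_at DESC
    LIMIT 100

TYPE endpoint
```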
Deploy your Pipe to Tinybird Cloud:
```bash
tb --cloud deploy
```

After deployment, Tinybird automatically creates API endpoints for your Pipes. You can access your endpoint using the token you created. Here's an example of how to call the endpoint:

```bash
curl "https://api.tinybird.co/v0/pipes/get_mongodb_changes.json?token=YOUR_TOKEN"
```

The endpoint returns data in JSON format by default. You can also request other formats:
- .csv for CSV format
- .ndjson for newline-delimited JSON
- .parquet for Parquet format
Example response:
```json
{
  "data": [
    {
      "_id": "507f1f77bcf86cd799439011",
      "name": "John Doe",
      "email": "john.doe@example.com",
      "age": 30,
      "city": "San Francisco",
      "created_at": "2023-08-15 10:30:00",
      "updated_at": "2023-08-15 10:30:00"
    }
  ],
  "rows": 1,
  "statistics": {
    "elapsed": 0.001,
    "rows_read": 1,
    "bytes_read": 256
  }
}
```

Wrap up
Change Data Capture (CDC) is a powerful pattern that captures data changes and propagates them in real-time or near real-time between various systems. Using MongoDB as the source, changes are captured through its operations log (oplog) and propagated to systems like Confluent Kafka and Tinybird using a connector.
This setup enhances real-time data processing, reduces load on the source system, and maintains data consistency across platforms, making it vital for modern data-driven applications. The post walked through the steps of setting up a CDC pipeline using MongoDB Atlas, Confluent Cloud, and Tinybird, providing a scalable solution for handling data changes and powering real-time analytics.
Resources:
- MongoDB Atlas Documentation: A comprehensive guide on how to use and configure MongoDB Atlas, including how to set up clusters.
- Confluent Cloud Documentation: Detailed information on using and setting up Confluent Cloud, including setting up Kafka clusters and connectors.
- MongoDB Connector for Apache Kafka: The official page for the MongoDB Connector on the Confluent Hub. Provides in-depth documentation on its usage and configuration.
- Kafka Connector Documentation: Guide on setting up and using Tinybird's Kafka connector to ingest data from Kafka topics.
- Deduplication Strategies: Comprehensive guide on implementing deduplication strategies for real-time data pipelines.
- Lambda Architecture: Guide on implementing Lambda Architecture patterns for handling real-time and batch data processing.
- Tinybird Documentation: A guide on using Tinybird, which provides tools for building real-time analytics APIs.
- Change Data Capture (CDC) Overview: A high-level overview of CDC on Wikipedia, providing a good starting point for understanding the concept.
- Apache Kafka: A Distributed Streaming System: Detailed information about Apache Kafka, a distributed streaming system that's integral to the CDC pipeline discussed in this post.
FAQs
- What is Change Data Capture (CDC)? CDC is a design pattern that captures changes in data so that downstream systems can process these changes in real-time or near real-time. Changes include inserts, updates, and deletes.
- Why is CDC useful? CDC provides several advantages such as enabling real-time data processing, reducing load on source systems, maintaining data consistency across platforms, aiding in data warehousing, supporting audit trails and compliance, and serving as a foundation for event-driven architectures.
- How does CDC with MongoDB work? MongoDB uses an oplog (operations log) to record data manipulations like inserts, updates, and deletes. CDC processes monitor this oplog and capture the changes, which can then be propagated to other systems or databases.
- What is MongoDB Atlas? MongoDB Atlas is a fully managed cloud database service provided by MongoDB. It takes care of the complexities of deploying, managing, and healing your deployments on the cloud service provider of your choice.
- What is Confluent Cloud? Confluent Cloud is a fully managed, event streaming platform powered by Apache Kafka. It provides a serverless experience with elastic scalability and delivers industry-leading, real-time event streaming capabilities with Apache Kafka as-a-service.
- What is Tinybird? Tinybird is a real-time data platform that helps developers and data teams ingest, transform, and expose real-time datasets through APIs at any scale.
- Can I use CDC with other databases besides MongoDB? Yes, CDC can be used with various databases that support this mechanism such as PostgreSQL, MySQL, SQL Server, Oracle Database, and more. The specifics of implementation and configuration may differ based on the database system.
- How secure is data during the CDC process? The security of data during the CDC process depends on the tools and protocols in place. By using secure connections, authenticated sessions, and data encryption, data can be securely transmitted between systems. Both MongoDB Atlas and Confluent Cloud provide various security features to ensure the safety of your data.
