Tuesday, September 17, 2024

Stream data with Amazon DocumentDB and Amazon MSK using a Kafka connector

A common trend in modern application development and data processing is the use of Apache Kafka as a standard delivery mechanism for your data pipeline and fan-out approach. Amazon Managed Streaming for Apache Kafka (Amazon MSK) is a fully managed, highly available, and secure service that makes it simple for developers and DevOps managers to run applications on Apache Kafka in AWS without needing Apache Kafka infrastructure management expertise.

Document databases like Amazon DocumentDB (with MongoDB compatibility) are increasing in usage as developers and application owners use JSON-based datasets with their modern applications. Amazon DocumentDB is a scalable, durable, and fully managed database service for operating mission-critical MongoDB workloads. Increasingly, customers are using Amazon MSK with Amazon DocumentDB for various use cases.

In this post, we discuss how to run and configure the MongoDB Kafka connector to move data between Amazon DocumentDB and Amazon MSK for sink and source use cases.

Solution overview

Amazon DocumentDB can act as both the data source and data sink to Amazon MSK. The MongoDB Kafka connector can act in either use case to transfer data between Amazon DocumentDB and Amazon MSK.

Kafka Connect is an open-source component of Apache Kafka that solves the problem of connecting Apache Kafka to data stores. It provides a framework for deploying connectors such as the MongoDB Kafka connector for connecting with external systems such as databases, key-value stores, search indexes, and file systems.

Kafka Connect currently supports two modes:

Standalone – Work is performed in a single process
Distributed – Multiple workers, automatic balancing, and dynamic scaling of tasks

The balancing and scaling of distributed mode provide fault tolerance for active tasks, configuration, and offset commit data, which standalone mode doesn’t offer. In this post, we configure and run the connector in distributed mode. In distributed mode, Kafka Connect also exposes a REST API for managing connectors, which we use throughout this post.

The following are example use cases in which you can use Amazon DocumentDB as a data store behind Amazon MSK:

In a large video streaming or flash sale event, the data generated relating to viewers, reactions, or a buyer’s clickstream can be fed to Amazon MSK as raw data. You can further stream this data to Amazon DocumentDB for downstream processing and aggregation.
For streaming of telemetry data from IoT devices, website hit data, or meteorological data, the data can be streamed into Amazon DocumentDB using the connector and then processed (such as aggregation or min/max calculation).
For any record replay or application recovery in the Amazon DocumentDB cluster, rather than restoring the whole backup, the application can replay specific item-level changes from Amazon MSK to the Amazon DocumentDB collection.

The following are example use cases in which you can send Amazon DocumentDB change streams to Amazon MSK:

In case of selective replication of collections from one Amazon DocumentDB cluster to another cluster or other data stores, you can use Amazon MSK as an intermediate layer.
Amazon DocumentDB offers a rich aggregation framework, but for advanced analytics and machine learning, you can create a data pipeline from Amazon DocumentDB to various other data stores. You can use Amazon MSK as an intermediate layer to modify and filter change events before loading them to the target data store.

In both use cases, you can use the Kafka connector to move the change streams from Amazon DocumentDB to Amazon MSK.

We divided this post into two main sections:

Amazon DocumentDB as a sink – In the first half of this post, we discuss data delivery to Amazon DocumentDB via Amazon MSK using the connector.
Amazon DocumentDB as a source – In the second half of this post, we cover pulling data from Amazon DocumentDB using the same connector and publishing it to a Kafka topic for a downstream Kafka consumer.

We also discuss automatic balancing and fault tolerance for active connector tasks, which distributed mode offers.

The following diagram illustrates the architecture and data flow.

Prerequisites

To follow along with this post, you need the following resources:

An Amazon DocumentDB cluster – You can use an existing cluster or create a new cluster. If creating a new cluster, verify that your instances are deployed to multiple Availability Zones through subnet group settings.

An Amazon MSK cluster – You can use an existing cluster or create a new one with the custom create method. The Amazon MSK cluster type should be provisioned. The cluster should be deployed to the same VPC as your Amazon DocumentDB cluster and configured with the same security group used for Amazon DocumentDB. Your cluster should also have the following configurations:
Create a custom configuration with auto.create.topics.enable=true. The following screenshot shows an example of a custom configuration called production-config.

Create three brokers (minimum) while configuring your Amazon MSK cluster, as shown in the following screenshot.

Use the custom config during your Amazon MSK cluster configuration.

Configure AWS Identity and Access Management (IAM) role-based authentication (with SSL). With IAM role-based authentication, TLS is enabled automatically.

An Amazon EC2 instance – You can choose an Amazon Elastic Compute Cloud (Amazon EC2) instance or configure a new one. We use this EC2 instance for running containers and testing purposes. As a production best practice, you can deploy containers on Amazon Elastic Container Service (Amazon ECS), Amazon Elastic Kubernetes Service (Amazon EKS), or AWS Fargate for effectively managing the containers. Your instance should have the following configurations:
Instance class minimum t3.large.
Instance storage at least 10 GB.
Deployed in the same VPC of your Amazon DocumentDB cluster and Amazon MSK cluster with the same security group.
The instance security group should be configured to connect to and from the Amazon MSK cluster (port 9098) and Amazon DocumentDB cluster (port 27017).

A customer managed policy – Create a customer managed policy for the Amazon MSK cluster using the following document. Update the Region and account ID in the policy. The Region should be the same one in which you provisioned your Amazon DocumentDB cluster, Amazon MSK cluster, and EC2 instance.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "VisualEditor0",
            "Effect": "Allow",
            "Action": "kafka-cluster:*",
            "Resource": "arn:aws:kafka:<region>:<account id>:*/*/*"
        }
    ]
}

An IAM role – Create an IAM role with the preceding policy and assign it to the EC2 instance.
Mongo shell to connect to the Amazon DocumentDB cluster – You can install the mongo shell on the EC2 instance. For instructions, refer to Install the mongo shell.
Packages to run Docker containers – Log in to the EC2 instance and run the following commands to install the Java, Docker, and docker-compose packages that you need to run Docker containers:

sudo yum install docker -y
sudo usermod -a -G docker ec2-user
newgrp docker
sudo systemctl enable docker.service
sudo systemctl start docker.service
sudo curl -L https://github.com/docker/compose/releases/latest/download/docker-compose-$(uname -s)-$(uname -m) -o /usr/local/bin/docker-compose
sudo chmod +x /usr/local/bin/docker-compose
sudo yum install -y java-11-amazon-corretto-headless
sudo yum install -y jq

A trust store for JVM – The Amazon DocumentDB cluster has SSL/TLS enabled by default, and the Kafka connector runs on the Java Virtual Machine (JVM), so you need to create a trust store with a password. For instructions, refer to Connecting Programmatically to Amazon DocumentDB. Create a local directory and copy your trust store file (called rds-truststore.jks) into it. If you followed the steps to create the trust store correctly, the file is located in /tmp/certs.

mkdir -p ~/local_kafka/; cd ~/local_kafka/;cp /tmp/certs/rds-truststore.jks .

You will incur costs in your account related to the Amazon DocumentDB, Amazon MSK, and Amazon EC2 resources. You can use the AWS Pricing Calculator to estimate the cost.

Amazon DocumentDB as a sink

In this part of the post, we focus on the sink use case, as shown in the following diagram. We discuss how to create and run the connector (using Docker containers) and use Amazon DocumentDB as a sink database to move data from the Amazon MSK Kafka topic, which is generated by a Kafka producer.

Build and run the connector Docker container

To build and run our connector Docker container, complete the following steps:

On the Amazon MSK console, choose Clusters in the navigation pane.
Open your cluster.
Choose View client information.
Copy the private endpoint of the Amazon MSK bootstrap servers.
In your EC2 instance, create a new Dockerfile with the following contents using the vi editor. Update the Amazon MSK bootstrap servers and trust store password.

FROM amazonlinux:latest
ENV KAFKA_HOME /usr/local/kafka
ENV DOCDBJKSPASS <truststore_password>
ENV KAFKA_OPTS " -Djavax.net.ssl.trustStore=/usr/local/kafka/rds-truststore.jks -Djavax.net.ssl.trustStorePassword=${DOCDBJKSPASS}"
ENV BOOTSTRAP_SERVER <kafka_bootstrap_servers_with_ports>
ENV CONNECT_CLUSTER_GROUP_NAME docdb-kafka-connect-cluster1
EXPOSE 8083
USER root

# Install Java, Kafka Connect, the MongoDB Kafka connector, and the MSK IAM auth library
RUN echo "Installing Java..." \
    && yum update -y \
    && yum install -y java-11-amazon-corretto-headless \
    && yum install wget tar -y -q \
    && echo "Installing Kafka Connect..." \
    && wget https://dlcdn.apache.org/kafka/3.2.3/kafka_2.13-3.2.3.tgz -q \
    && tar -xzf kafka_2.13-3.2.3.tgz \
    && mv kafka_2.13-3.2.3 ${KAFKA_HOME} \
    && cd ${KAFKA_HOME} \
    && echo "Installing MongoDB Kafka Connector jar for DocumentDB..." \
    && wget https://repo1.maven.org/maven2/org/mongodb/kafka/mongo-kafka-connect/1.7.0/mongo-kafka-connect-1.7.0-all.jar -q \
    && cp mongo-kafka-connect-1.7.0-all.jar /usr/local/kafka/libs/ \
    && echo "Installing MSK IAM Authentication jar..." \
    && wget https://github.com/aws/aws-msk-iam-auth/releases/download/v1.1.3/aws-msk-iam-auth-1.1.3-all.jar -q \
    && cp aws-msk-iam-auth-1.1.3-all.jar /usr/local/kafka/libs/

# Point the distributed worker at the MSK brokers and set replication factors to match three brokers
RUN echo "Configuring Kafka Connect.." \
    && useradd -ms /bin/bash conuser \
    && chown -R conuser:conuser $KAFKA_HOME \
    && sed -i s/localhost:9092/${BOOTSTRAP_SERVER}/g ${KAFKA_HOME}/config/connect-distributed.properties \
    && sed -i s/offset.storage.replication.factor=1/offset.storage.replication.factor=3/g ${KAFKA_HOME}/config/connect-distributed.properties \
    && sed -i s/status.storage.replication.factor=1/status.storage.replication.factor=3/g ${KAFKA_HOME}/config/connect-distributed.properties \
    && sed -i s/config.storage.replication.factor=1/config.storage.replication.factor=3/g ${KAFKA_HOME}/config/connect-distributed.properties \
    && sed -i s/group.id=connect-cluster/group.id=${CONNECT_CLUSTER_GROUP_NAME}/g ${KAFKA_HOME}/config/connect-distributed.properties \
    && echo "topic.creation.enable=true" >> ${KAFKA_HOME}/config/connect-distributed.properties \
    && cp /usr/lib/jvm/java-11-amazon-corretto.x86_64/lib/security/cacerts ${KAFKA_HOME}/kafka_iam_truststore.jks

# Trust store used for the TLS connection to Amazon DocumentDB
RUN echo "Configuring SSL for DocumentDB..."
COPY rds-truststore.jks ${KAFKA_HOME}/rds-truststore.jks

# IAM (SASL_SSL) settings for the worker itself and for its embedded producers and consumers
RUN echo "Configuring SSL and IAM for MSK..." \
    && echo "ssl.truststore.location=${KAFKA_HOME}/kafka_iam_truststore.jks" >> ${KAFKA_HOME}/config/connect-distributed.properties \
    && echo "security.protocol=SASL_SSL" >> ${KAFKA_HOME}/config/connect-distributed.properties \
    && echo "sasl.mechanism=AWS_MSK_IAM" >> ${KAFKA_HOME}/config/connect-distributed.properties \
    && echo "sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;" >> ${KAFKA_HOME}/config/connect-distributed.properties \
    && echo "sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler" >> ${KAFKA_HOME}/config/connect-distributed.properties \
    && echo "producer.ssl.truststore.location=${KAFKA_HOME}/kafka_iam_truststore.jks" >> ${KAFKA_HOME}/config/connect-distributed.properties \
    && echo "producer.security.protocol=SASL_SSL" >> ${KAFKA_HOME}/config/connect-distributed.properties \
    && echo "producer.sasl.mechanism=AWS_MSK_IAM" >> ${KAFKA_HOME}/config/connect-distributed.properties \
    && echo "producer.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;" >> ${KAFKA_HOME}/config/connect-distributed.properties \
    && echo "producer.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler" >> ${KAFKA_HOME}/config/connect-distributed.properties \
    && echo "consumer.ssl.truststore.location=${KAFKA_HOME}/kafka_iam_truststore.jks" >> ${KAFKA_HOME}/config/connect-distributed.properties \
    && echo "consumer.security.protocol=SASL_SSL" >> ${KAFKA_HOME}/config/connect-distributed.properties \
    && echo "consumer.sasl.mechanism=AWS_MSK_IAM" >> ${KAFKA_HOME}/config/connect-distributed.properties \
    && echo "consumer.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;" >> ${KAFKA_HOME}/config/connect-distributed.properties \
    && echo "consumer.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler" >> ${KAFKA_HOME}/config/connect-distributed.properties

USER conuser

CMD ${KAFKA_HOME}/bin/connect-distributed.sh ${KAFKA_HOME}/config/connect-distributed.properties

In the Dockerfile, you build the connector from scratch (installing Java, installing Kafka Connect with the connector JAR, configuring IAM authentication, and so on).

We’re running the connector in distributed mode to provide fault tolerance. In distributed mode, you can start multiple worker processes using the same group.id config, and they automatically coordinate to schedule running the connectors and tasks across the available workers. In the preceding Dockerfile, group.id is defined as docdb-kafka-connect-cluster1.

Create a Docker image:

docker build . -t docdbkafkaconnet:latest

For this post, we run the containers with Docker Compose, a container orchestration framework that lets you define and run multiple containers on a single host machine.

Create the Docker Compose file docker-compose.yaml with the following content:

version: "3.7"
services:
  docdb_kafka_connect_worker:
    image: docdbkafkaconnet

Run two containers of the docdbkafkaconnet image (the name used in the build command and the Compose file) using Docker Compose:

docker-compose up -d --scale docdb_kafka_connect_worker=2

We get the following output:

[+] Running 2/2
⠿ Container local_kafka-docdb_kafka_connect_worker-1 Started 0.6s
⠿ Container local_kafka-docdb_kafka_connect_worker-2 Started 0.6s

These two containers run the connector worker in distributed mode. You can verify the containers’ running status using the following command:

docker-compose ps

We get the following output:

NAME COMMAND SERVICE STATUS PORTS
local_kafka-docdb_kafka_connect_worker-1 "/bin/sh -c '${KAFKA…" docdb_kafka_connect_worker running 8083/tcp
local_kafka-docdb_kafka_connect_worker-2 "/bin/sh -c '${KAFKA…" docdb_kafka_connect_worker running 8083/tcp

Get the IP address of these two running containers using the docker inspect command. You can get the container name from the previous step’s output.

docker inspect -f '{{range.NetworkSettings.Networks}}{{.IPAddress}}{{end}}' <container name>

We get the following output:

docker inspect -f '{{range.NetworkSettings.Networks}}{{.IPAddress}}{{end}}' local_kafka-docdb_kafka_connect_worker-1
172.XX.XX.3
docker inspect -f '{{range.NetworkSettings.Networks}}{{.IPAddress}}{{end}}' local_kafka-docdb_kafka_connect_worker-2
172.XX.XX.2

Define two environment variables CONTAINER_IP1 and CONTAINER_IP2 to store the IP addresses of these running containers:

export CONTAINER_IP1=<ip_address_of_first_container>
export CONTAINER_IP2=<ip_address_of_second_container>

Use the REST API to check the health of the connectors. Kafka Connect supports a REST API interface for managing connectors. By default, this service runs on port 8083. You can use any container IP address as follows:

curl -X GET http://${CONTAINER_IP1}:8083/

This GET API call provides basic information about the Kafka Connect cluster, such as the version of the Kafka Connect worker that serves the REST request (including the git commit ID of the source code) and the Kafka cluster ID that it’s connected to. If you get error 404, it means the connector is still starting. You should wait until it returns the required information.
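The wait described above can be automated. The following is a minimal Python sketch, not part of the original setup, that polls the worker's root endpoint until it answers with HTTP 200. The function name wait_for_connect, the retry count, and the delay are illustrative assumptions; only the root endpoint on port 8083 comes from this post.

```python
import time
import urllib.error
import urllib.request


def wait_for_connect(base_url, attempts=30, delay=5.0, opener=urllib.request.urlopen):
    """Poll the Kafka Connect REST root until it returns HTTP 200.

    Hypothetical helper: returns True once the worker answers, False if it
    never does within `attempts` tries. `opener` is injectable for testing.
    """
    for _ in range(attempts):
        try:
            with opener(base_url.rstrip("/") + "/", timeout=5) as resp:
                if resp.status == 200:
                    return True
        except (urllib.error.URLError, OSError):
            # Covers connection refused as well as HTTP errors such as 404
            # (urllib raises HTTPError, a URLError subclass, for 4xx/5xx).
            pass
        time.sleep(delay)
    return False
```

Calling wait_for_connect with http://<container IP>:8083 as the argument blocks until that worker is reachable or the attempts run out, which is handy before scripting the connector configuration calls that follow.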

Because no connector is configured yet, the following REST API call to the connectors endpoint returns an empty list ([]):

curl -X GET http://${CONTAINER_IP1}:8083/connectors

Configure the Amazon DocumentDB sink connector

Now you need to configure the connector to read the data from the Amazon MSK topic and sync that to the target Amazon DocumentDB database.

Connector configurations are key-value mappings. In distributed mode, these are included in the JSON payload for the request that creates and configures the connector. You need to update the Amazon DocumentDB login name, password, cluster endpoint, and port of your cluster. You can get these values on the Connectivity & security tab on the Amazon DocumentDB console.

You can use any connector IP address for the REST API call. See the following code:

curl -X POST \
    -H "Content-Type: application/json" \
    --data '
    {"name": "documentdb-sink",
     "config": {
       "connector.class":"com.mongodb.kafka.connect.MongoSinkConnector",
       "tasks.max":1,
       "topics":"documentdb_topic",
       "key.converter": "org.apache.kafka.connect.storage.StringConverter",
       "value.converter": "org.apache.kafka.connect.json.JsonConverter",
       "value.converter.schemas.enable": false,
       "connection.uri":"mongodb://<docdbloginname>:<docdbpassword>@<docdbclusterendpoint>:<docdbportnumber>/?ssl=true&replicaSet=rs0&readPreference=secondaryPreferred&retryWrites=false",
       "database":"sinkdatabase",
       "collection":"sinkcollection"
     }
    }' \
    http://${CONTAINER_IP1}:8083/connectors -w "\n"

The preceding data payload contains the following connector configuration details in JSON format:

name – The unique name for the connector. The connector name is documentdb-sink for this configuration.
connector.class – The Java class for the connector. It’s the class responsible for moving data from Kafka.
tasks.max – The maximum number of tasks that should be created for this connector.
topics – The list of Kafka topics that this sink connector watches. The topic name is documentdb_topic.
key.converter – The converter class that instructs the connector how to translate the key from Kafka serialized format. We use the string class converter.
value.converter – The converter class that instructs the connector how to translate the value from Kafka serialized format. We have JSON data in our Kafka topic, so we configure Kafka Connect to use the JSON converter.
value.converter.schemas.enable – By default, the JSON converter is going to expect a JSON schema, but we set it as false because there isn’t any schema.
connection.uri – Defines the endpoint to connect to the Amazon DocumentDB cluster. We use an endpoint with the SSL option.
database – The target Amazon DocumentDB database. We use the database name sinkdatabase.
collection – The collection name in the database to push the changes. The collection name is sinkcollection.

For complete details of the configurations, refer to All Sink Connector Configuration Properties.
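The sink payload embeds the login name and password directly in connection.uri. If either contains characters such as @, : or /, they need to be percent-encoded for the connection string to parse. The following Python sketch builds the same payload with encoded credentials; the function name sink_connector_payload and the sample credentials and endpoint are hypothetical, while the configuration keys and values are the ones shown in the request above.

```python
import json
from urllib.parse import quote_plus


def sink_connector_payload(user, password, endpoint, port=27017):
    """Build the documentdb-sink request body with percent-encoded credentials."""
    uri = (
        f"mongodb://{quote_plus(user)}:{quote_plus(password)}@{endpoint}:{port}"
        "/?ssl=true&replicaSet=rs0&readPreference=secondaryPreferred&retryWrites=false"
    )
    return {
        "name": "documentdb-sink",
        "config": {
            "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
            "tasks.max": 1,
            "topics": "documentdb_topic",
            "key.converter": "org.apache.kafka.connect.storage.StringConverter",
            "value.converter": "org.apache.kafka.connect.json.JsonConverter",
            "value.converter.schemas.enable": False,
            "connection.uri": uri,
            "database": "sinkdatabase",
            "collection": "sinkcollection",
        },
    }


# Placeholder credentials and endpoint, for illustration only.
body = json.dumps(sink_connector_payload("docdbuser", "p@ss:w/rd", "docdb.example.internal"))
```

The resulting string can be saved to a file and sent with the same curl command by passing --data @<file> instead of the inline JSON.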

Now you can check for the configured connector details using the REST API call, which returns the connector name documentdb-sink:

curl -X GET http://${CONTAINER_IP1}:8083/connectors

You can also check the documentdb-sink connector status using the following REST API call (the jq command pretty-prints the output):

curl -X GET http://${CONTAINER_IP1}:8083/connectors/documentdb-sink/status |jq

We get the following output; the state shows as RUNNING:

{
  "name": "documentdb-sink",
  "connector": {
    "state": "RUNNING",
    "worker_id": "172.XX.XX.2:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "RUNNING",
      "worker_id": "172.XX.XX.2:8083"
    }
  ],
  "type": "sink"
}

In this output, the sink connector is running on a container with IP address 172.XX.XX.2.
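When you script this check, it helps to look at the task states as well as the connector state, because a connector can report RUNNING while one of its tasks has stopped. A minimal Python sketch over the status document shown above follows; the helper name not_running is an assumption for illustration.

```python
import json


def not_running(status):
    """Return (label, state) pairs for the connector or tasks not in RUNNING state."""
    problems = []
    if status["connector"]["state"] != "RUNNING":
        problems.append(("connector", status["connector"]["state"]))
    for task in status["tasks"]:
        if task["state"] != "RUNNING":
            problems.append((f"task {task['id']}", task["state"]))
    return problems


# The status document returned for documentdb-sink in this post.
sample = json.loads("""
{
  "name": "documentdb-sink",
  "connector": {"state": "RUNNING", "worker_id": "172.XX.XX.2:8083"},
  "tasks": [{"id": 0, "state": "RUNNING", "worker_id": "172.XX.XX.2:8083"}],
  "type": "sink"
}
""")
print(not_running(sample))  # prints []
```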

To check the config for a running documentdb-sink connector, use the following code:

curl -X GET http://${CONTAINER_IP1}:8083/connectors/documentdb-sink/config|jq

We get the following output:

{
  "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
  "database": "sinkdatabase",
  "tasks.max": "1",
  "topics": "documentdb_topic",
  "value.converter.schemas.enable": "false",
  "connection.uri": "mongodb://XXX:XXX@XXX:XXX/?ssl=true&replicaSet=rs0&readPreference=secondaryPreferred&retryWrites=false",
  "name": "documentdb-sink",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "collection": "sinkcollection",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter"
}

Test the MongoDB Kafka connector with Amazon DocumentDB as sink

To test the connector, start a Kafka producer to push the changes to the Kafka topic documentdb_topic. The Kafka connector reads the details from this topic and puts the details in Amazon DocumentDB based on the configuration.

To run the local Kafka producer, you need to download the binary distribution of Apache Kafka and extract the archive in the local_kafka directory on the EC2 instance:

cd ~/local_kafka/
cp /usr/lib/jvm/java-11-amazon-corretto.x86_64/lib/security/cacerts kafka_iam_truststore.jks
wget https://dlcdn.apache.org/kafka/3.2.3/kafka_2.13-3.2.3.tgz
tar -xzf kafka_2.13-3.2.3.tgz
ln -sfn kafka_2.13-3.2.3 kafka

To use IAM to authenticate with the MSK cluster, download the Amazon MSK Library for IAM and copy to the local Kafka library directory as shown in the following code. For complete instructions, refer to Configure clients for IAM access control.

wget https://github.com/aws/aws-msk-iam-auth/releases/download/v1.1.3/aws-msk-iam-auth-1.1.3-all.jar
cp aws-msk-iam-auth-1.1.3-all.jar kafka/libs

We use the latest version of Kafka as of the publishing of this post, 3.2.3.

In the ~/local_kafka/kafka/config/ directory, create a client-config.properties file to configure a Kafka client to use IAM authentication for the Kafka console producer and consumers:

ssl.truststore.location=/home/ec2-user/local_kafka/kafka_iam_truststore.jks
security.protocol=SASL_SSL
sasl.mechanism=AWS_MSK_IAM
sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler

Define the BOOTSTRAP_SERVERS environment variable to store the bootstrap servers of the Amazon MSK cluster, and add the bin directory of the local Kafka installation to the PATH environment variable:

export BOOTSTRAP_SERVERS=<kafka_bootstrap_servers_with_ports>
export PATH=$PATH:/home/ec2-user/local_kafka/kafka_2.13-3.2.3/bin

Run the Kafka console producer to write into the Amazon MSK topic documentdb_topic and submit the valid JSON documents {"name":"DocumentDB NoSQL"} and {"test":"DocumentDB Sink Connector"}:

cd ~/local_kafka/kafka/config
kafka-console-producer.sh --bootstrap-server $BOOTSTRAP_SERVERS --producer.config client-config.properties --topic documentdb_topic
{"name":"DocumentDB NoSQL"}
{"test":"DocumentDB Sink Connector"}

Open a second terminal and connect to the Amazon DocumentDB cluster using the mongo shell. The preceding two JSON documents should be part of the sinkcollection collection in sinkdatabase:

use sinkdatabase
db.sinkcollection.find()

We get the following output:

{ "_id" : ObjectId("62c3cf2ec3d9010274c7a37e"), "name" : "DocumentDB NoSQL" }
{ "_id" : ObjectId("62c3d048c3d9010274c7a37f"), "test" : "DocumentDB Sink Connector" }

You should see both JSON documents that we pushed using the console producer.

Amazon DocumentDB as the source

In this section, we discuss how to create and run the connector (using Docker containers) with the Kafka Connect framework, and use Amazon DocumentDB as the source database to move the collection changes to the Amazon MSK Kafka topic.

The following diagram illustrates this data flow.

We use the connector containers that we set up earlier.

Configure Amazon DocumentDB for a change stream

The connector reads changes from the source collection through a change stream cursor. The change streams feature in Amazon DocumentDB provides a time-ordered sequence of change events that occur within your collections.

For this post, we use the collection sourcecollection in the sourcedatabase database in our Amazon DocumentDB cluster.

Connect to the Amazon DocumentDB cluster and enable the change stream for collection sourcecollection:

use sourcedatabase
db.createCollection("sourcecollection")
db.adminCommand({modifyChangeStreams: 1, database: "sourcedatabase", collection: "sourcecollection", enable: true});

Configure the connector as an Amazon DocumentDB source connector

Now we need to configure the source connector to read the changes in the Amazon DocumentDB collection and store those changes in the Amazon MSK topic. The connector reads these changes from the Amazon DocumentDB change stream that we configured.

The connector configurations are key-value mappings. In distributed mode, these are included in the JSON payload for the request that creates and configures the connector. You need to update the Amazon DocumentDB login name, password, cluster endpoint, and port of your cluster. You can use any container IP address for the following REST API call.

Note that connection.uri is different than the previous sink use case. Don’t include the read preference setting as secondary in connection.uri, because Amazon DocumentDB only supports a change stream on the primary instance.

You can open a new terminal, or stop the process running in one of the terminals you opened earlier, to run the following command:

curl -X POST \
    -H "Content-Type: application/json" \
    --data '
    {"name": "documentdb-source",
     "config": {
       "connector.class":"com.mongodb.kafka.connect.MongoSourceConnector",
       "connection.uri":"mongodb://<docdbloginname>:<docdbpassword>@<docdbclusterendpoint>:<docdbportnumber>/?ssl=true&replicaSet=rs0",
       "database":"sourcedatabase",
       "collection":"sourcecollection",
       "pipeline":"[{\"$match\": {\"operationType\": \"insert\"}}, {$addFields : {\"fullDocument.newfield\":\"Testing DocumentDB Kafka Source Connecter\"}}]"
     }
    }' \
    http://${CONTAINER_IP1}:8083/connectors -w "\n"

The preceding data payload contains the connector type and its properties:

name – The unique name for the connector. The connector name is documentdb-source for this configuration.
connector.class – The Java class for the connector. It’s the class responsible for moving data from the Amazon DocumentDB collection to the Amazon MSK topic.
tasks.max – The maximum number of tasks that should be created for this connector. It isn’t set in this payload, so the Kafka Connect default of 1 applies.
connection.uri – The Amazon DocumentDB endpoint to connect to the Amazon DocumentDB cluster. We use an endpoint with the SSL option.
database – The source database. In this case, the database name is sourcedatabase.
collection – The collection in the database to watch the changes. The collection name is sourcecollection.
pipeline – The aggregation pipeline to add new fields in the document. With this config, we’re adding a field in the document, but it isn’t mandatory.

For complete details of the configurations, refer to All Source Connector Configuration Properties.
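The pipeline property is a JSON array carried inside a JSON string, so every inner double quote must be escaped in the request body, which is easy to get wrong by hand. The following Python sketch produces the body by serializing twice. The function name source_connector_payload is hypothetical, and unlike the request above it quotes the $addFields key, which is equivalent strict JSON.

```python
import json

# The aggregation pipeline from this post, written as plain Python data.
PIPELINE = [
    {"$match": {"operationType": "insert"}},
    {"$addFields": {"fullDocument.newfield": "Testing DocumentDB Kafka Source Connecter"}},
]


def source_connector_payload(connection_uri):
    """Build the documentdb-source request body for the given connection URI."""
    return {
        "name": "documentdb-source",
        "config": {
            "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
            "connection.uri": connection_uri,
            "database": "sourcedatabase",
            "collection": "sourcecollection",
            # The pipeline is passed as a string, so serialize it on its own;
            # the outer json.dumps below then escapes the inner quotes.
            "pipeline": json.dumps(PIPELINE),
        },
    }


# Placeholders are left as-is; substitute your own cluster details.
body = json.dumps(
    source_connector_payload("mongodb://<user>:<password>@<endpoint>:<port>/?ssl=true&replicaSet=rs0")
)
```

Decoding body and then decoding its pipeline value returns the original list, which is a quick way to confirm the escaping before posting it to the REST API.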

Check the configured connector details using the REST API call; it returns all the configured connectors including this new one. You can use any container IP address for the REST API call:

curl -X GET http://${CONTAINER_IP1}:8083/connectors

You will see a connector with the name documentdb-source in addition to documentdb-sink.

You can also check the documentdb-source connector status using the following REST API call:

curl -X GET http://${CONTAINER_IP1}:8083/connectors/documentdb-source/status|jq

We get the following output; the state shows as RUNNING:

{
  "name": "documentdb-source",
  "connector": {
    "state": "RUNNING",
    "worker_id": "172.XX.XX.3:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "RUNNING",
      "worker_id": "172.XX.XX.3:8083"
    }
  ],
  "type": "source"
}

As already mentioned, the sink connector is running on a container with IP address 172.XX.XX.2. Now the source connector is on a different container with IP 172.XX.XX.3. In distributed mode, Kafka Connect automatically load balances tasks across different available containers (connector workers). In case of failures with the container, it automatically moves running tasks to another available container.
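To see how tasks are spread across workers at a glance, you can group the task entries from the status responses by worker_id. The following Python sketch does this for the two status documents shown in this post; the helper name tasks_by_worker and the connector/task label format are assumptions for illustration.

```python
from collections import defaultdict


def tasks_by_worker(statuses):
    """Map each worker_id to the list of 'connector/task-id' labels it runs."""
    placement = defaultdict(list)
    for status in statuses:
        for task in status["tasks"]:
            placement[task["worker_id"]].append(f"{status['name']}/{task['id']}")
    return dict(placement)


# Task sections of the two status responses shown in this post.
statuses = [
    {"name": "documentdb-sink",
     "tasks": [{"id": 0, "state": "RUNNING", "worker_id": "172.XX.XX.2:8083"}]},
    {"name": "documentdb-source",
     "tasks": [{"id": 0, "state": "RUNNING", "worker_id": "172.XX.XX.3:8083"}]},
]
placement = tasks_by_worker(statuses)
# placement maps 172.XX.XX.2:8083 to the sink task and 172.XX.XX.3:8083 to the source task
```

If you stop one container and query the statuses again, the same grouping should show both tasks under the remaining worker once Kafka Connect has rebalanced.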

To check the config for a running documentdb-source connector, use the following code:

curl -X GET http://${CONTAINER_IP1}:8083/connectors/documentdb-source/config|jq

We get the following output:

{
  "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
  "pipeline": "[{\"$match\": {\"operationType\": \"insert\"}}, {$addFields : {\"fullDocument.newfield\":\"Testing DocumentDB Kafka Source Connecter\"}}]",
  "database": "sourcedatabase",
  "connection.uri": "mongodb://XXX:XXX@XXX:XXX/?ssl=true&replicaSet=rs0",
  "name": "documentdb-source",
  "collection": "sourcecollection"
}

Test the connector with Amazon DocumentDB as source

To test the connector, we insert data in the Amazon DocumentDB collection. The Kafka connector reads the inserted data using the collection change stream and writes that to the Kafka topic.

Open a new terminal or use an existing one and run the Kafka console consumer to read from the sourcedatabase.sourcecollection Kafka topic. If you run it in a new terminal, make sure to define the BOOTSTRAP_SERVERS environment variable there.

cd ~/local_kafka/kafka/config
kafka-console-consumer.sh --bootstrap-server $BOOTSTRAP_SERVERS --consumer.config client-config.properties --topic sourcedatabase.sourcecollection --from-beginning

You get the following warning because the console consumer command creates a new topic called sourcedatabase.sourcecollection:

WARN [Consumer clientId=console-consumer, groupId=console-consumer-32474] Error while fetching metadata with correlation id 2 :
{sourcedatabase.sourcecollection=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)

In a second terminal, add a record to sourcedatabase.sourcecollection in your Amazon DocumentDB cluster:

use sourcedatabase
db.sourcecollection.insert({"name":"Amazon DocumentDB"})

Return to the first terminal, where the console consumer is reading from the Amazon MSK topic:

kafka-console-consumer.sh --bootstrap-server $BOOTSTRAP_SERVERS --consumer.config client-config.properties --topic sourcedatabase.sourcecollection --from-beginning
{"schema":{"type":"string","optional":false},"payload":"{\"_id\": {\"_data\": \"0162c3ff0400000001010000000100006039\"}, \"operationType\": \"insert\", \"clusterTime\": {\"$timestamp\": {\"t\": 1657011972, \"i\": 1}}, \"ns\": {\"db\": \"sourcedatabase\", \"coll\": \"sourcecollection\"}, \"documentKey\": {\"_id\": {\"$oid\": \"62c3ff045986fd12df47f0e6\"}}, \"fullDocument\": {\"_id\": {\"$oid\": \"62c3ff045986fd12df47f0e6\"}, \"name\": \"Amazon DocumentDB\", \"newfield\": \"Testing DocumentDB Kafka Source Connecter\"}}"}

We can observe the insert operation made on the Amazon DocumentDB collection is available on the console consumer. Additionally, a new field has been added, with newfield as the key and Testing DocumentDB Kafka Source Connecter as the value.
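In the consumer output, the payload field is itself a JSON string that holds the change event, so a downstream consumer has to decode the record twice. A minimal Python sketch of that step follows; the function name decode_change_event is hypothetical, and the sample record is a shortened version of the output above.

```python
import json


def decode_change_event(record_value):
    """Decode the outer schema/payload envelope, then the change event inside it."""
    envelope = json.loads(record_value)
    return json.loads(envelope["payload"])


# Sample record, shortened from the consumer output in this post.
change_event = {
    "operationType": "insert",
    "ns": {"db": "sourcedatabase", "coll": "sourcecollection"},
    "fullDocument": {
        "name": "Amazon DocumentDB",
        "newfield": "Testing DocumentDB Kafka Source Connecter",
    },
}
record = json.dumps(
    {"schema": {"type": "string", "optional": False}, "payload": json.dumps(change_event)}
)

event = decode_change_event(record)
print(event["fullDocument"]["name"])  # prints Amazon DocumentDB
```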

We’re now able to capture changes in Amazon DocumentDB as the source database using the MongoDB Kafka connector by running connectors with Docker containers.

Cleanup

To clean up the resources you used in your account, delete them in the following order:

EC2 instance
IAM role and customer managed policy
Amazon MSK Kafka cluster
Amazon DocumentDB cluster

Conclusion

In this post, we discussed how to run and configure the MongoDB Kafka connector to move data between Amazon DocumentDB and Amazon MSK for different sink and source use cases. You can use this solution for a variety of use cases, such as creating pipelines for large video streaming or flash sale events, streaming telemetry data from IoT devices, collecting website hit data, replicating collections from Amazon DocumentDB to other data stores, and moving data for advanced analytics and machine learning.

We first showed you how to use the connector to stream data from Amazon MSK to Amazon DocumentDB, where Amazon DocumentDB acts as a sink. We also showed how to build and configure a connector Docker image, and run the connector containers in distributed mode. In the second half of this post, we showed you how to stream data from Amazon DocumentDB to Amazon MSK where Amazon DocumentDB acts as the source. By querying the connector status, we showed how in distributed mode, connectors provide automatic balancing and fault tolerance. We also discussed various configurations available with both use cases that you can adjust for your specific use case or workload requirement.

Do you have follow-up questions or feedback? Leave a comment. We’d love to hear your thoughts and suggestions.

About the Author

Anshu Vajpayee is a Senior DocumentDB Specialist Solutions Architect at Amazon Web Services (AWS). He has been helping customers to adopt NoSQL databases and modernize applications leveraging Amazon DocumentDB. Before joining AWS, he worked extensively with relational and NoSQL databases.

Read more: AWS Database Blog
