Nov 28, 2024
Posted by
Mathis Van Eetvelde
Apache Kafka Connect is an ecosystem of pre-written and maintained Kafka Producers (source connectors) and Kafka Consumers (sink connectors) for various other data products and platforms like databases and message brokers. This allows you to easily build data pipelines without having to write and test your own consumers and producers.
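There are two distinct types of connectors: source connectors, which act as producers and publish data from an external system onto a Kafka topic, and sink connectors, which act as consumers and write data from a Kafka topic to an external system.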
But the real benefit comes from the fact that you are not limited to a one-to-one relationship between producers and consumers. Multiple connectors can act on the same message. Maybe your messages flow between microservices, but you also want to store them in S3 and in a database, and forward them to another message broker. The sky's the limit when it comes to building data pipelines using Apache Kafka Connect.
When I first started learning about Kafka and Kafka Connect, my biggest grievance was that there were almost no beginner-friendly end-to-end tutorials on properly setting up Kafka Connect for connectors that were more complicated than a local file sink. Because I had very limited Java experience, the ecosystem was quite daunting to wrap my head around, which made understanding and installing plugins all the more difficult.
My goal for this tutorial is to clearly explain every step to set up a JDBC Sink connector that streams data from a Kafka topic into a Timescale database without any guesswork. If you aren’t fond of blog posts and would rather just dive into the code, you can find the full shell script with all the necessary steps and commands here.
This short tutorial will show you how to set up Kafka and Kafka Connect to stream data from a Kafka topic into a Timescale database.
Because Kafka runs on the Java Virtual Machine (it is written in Java and Scala), you need to install a Java runtime. The headless package is sufficient.
I am using Ubuntu 20.04, so throughout this blog post, I will use the apt package manager to install packages.
If you are on a different operating system or Linux distribution, I recommend reading the official Java/Kafka documentation for installation instructions.
sudo apt update
sudo apt install openjdk-11-jre-headless
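Before moving on, it can help to confirm that the tools used throughout this post are actually on your PATH. The helper below is only a minimal sketch: the function name require_cmd is my own and not part of any package.

```shell
# require_cmd: hypothetical helper that reports which of the given commands
# are missing from PATH. Returns non-zero if at least one is missing.
require_cmd() {
  missing=0
  for cmd in "$@"; do
    if ! command -v "$cmd" >/dev/null 2>&1; then
      echo "missing: $cmd" >&2
      missing=1
    fi
  done
  return "$missing"
}

# Example: check the tools used in this post, then print the Java version.
# require_cmd java wget tar curl && java -version
```

If anything is reported as missing, install it with apt before continuing.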
After installing a Java runtime, it’s time to install Kafka!
Download the compressed archive from the official Apache repository using wget.
When the download finishes, create a directory called /usr/local/kafka. This is where we will store everything related to Kafka.
Use the chown command to change the directory ownership to you. If you plan on running this in production, it is worth creating a separate kafka user to launch the binaries and scripts from for added security.
Lastly, untar the compressed archive into the /usr/local/kafka directory.
wget https://downloads.apache.org/kafka/3.5.1/kafka_2.13-3.5.1.tgz
sudo mkdir /usr/local/kafka
sudo chown -R $(whoami) /usr/local/kafka
tar \
-xzf kafka_2.13-3.5.1.tgz \
-C /usr/local/kafka \
--strip-components=1
KIP-500 introduced KRaft, a Raft-based consensus protocol built into Kafka, removing the need to run a separate ZooKeeper process alongside Kafka. And because I am all about simplicity, I will be using KRaft! To do so, you need to generate a random cluster UUID and format the storage directory with it.
Then, all that is left for us to do is start Kafka using the kafka-server-start.sh script. Use the -daemon flag to run this process in the background. If you are having issues later down the line, I recommend omitting the -daemon flag and using tmux to temporarily run Kafka in a separate terminal.
The second argument in the “start” command is the configuration file, which, in this case, is the default configuration file used when running Kafka with KRaft. This configuration will suffice for this tutorial, but I encourage you to take a look at the file to see what you can change within Kafka.
export uuid=$(/usr/local/kafka/bin/kafka-storage.sh random-uuid)
/usr/local/kafka/bin/kafka-storage.sh format \
-t $uuid \
-c /usr/local/kafka/config/kraft/server.properties
/usr/local/kafka/bin/kafka-server-start.sh \
-daemon \
/usr/local/kafka/config/kraft/server.properties
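Because the -daemon flag returns right away, the broker may not be accepting connections yet when the next command runs. Here is a rough sketch of a helper that waits for the port to open. The name wait_for_port is made up for this post, and it relies on bash's /dev/tcp feature, so it assumes you run it with bash.

```shell
#!/usr/bin/env bash
# wait_for_port: hypothetical helper that polls a TCP port until it accepts
# connections. Uses bash's /dev/tcp pseudo-device, so it needs bash.
wait_for_port() {
  wfp_host=$1
  wfp_port=$2
  wfp_tries=${3:-30}   # number of attempts, roughly one second apart
  wfp_i=0
  while [ "$wfp_i" -lt "$wfp_tries" ]; do
    # The subshell opens a connection on fd 3 and closes it when it exits.
    if (exec 3<>"/dev/tcp/$wfp_host/$wfp_port") 2>/dev/null; then
      return 0
    fi
    wfp_i=$((wfp_i + 1))
    sleep 1
  done
  echo "timed out waiting for $wfp_host:$wfp_port" >&2
  return 1
}

# Example: block until the broker started above is reachable.
# wait_for_port localhost 9092 && echo "Kafka is up"
```

An open port only tells you that something is listening on 9092, not that the broker is fully healthy, so treat it as a quick sanity check.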
Once your Kafka server has started, use the kafka-topics.sh script to create two topics:
mytopic: the topic we will publish JSON messages onto. Our sink connector consumes these messages and inserts them into the Timescale database.
deadletter: the topic we will use as a dead letter queue (duh). A dead letter queue stores messages that your Kafka Connect workers couldn’t process. The main benefit of this is that you can see what messages are causing errors, which will allow you to detect, understand, and solve problems faster!
/usr/local/kafka/bin/kafka-topics.sh \
--create \
--topic mytopic \
--bootstrap-server localhost:9092 \
--partitions 10
/usr/local/kafka/bin/kafka-topics.sh \
--create \
--topic deadletter \
--bootstrap-server localhost:9092 \
--partitions 10
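If you expect to rerun these steps, the two commands can be folded into a loop that skips topics that already exist. This is just a sketch: create_topics and the KAFKA_HOME variable are names I picked for illustration, while --if-not-exists is a standard kafka-topics.sh option.

```shell
# Assumed install location from earlier in this post; override if yours differs.
KAFKA_HOME=${KAFKA_HOME:-/usr/local/kafka}

# create_topics: hypothetical helper that creates each named topic with
# 10 partitions, skipping any topic that already exists.
create_topics() {
  for topic in "$@"; do
    "$KAFKA_HOME/bin/kafka-topics.sh" \
      --create \
      --if-not-exists \
      --topic "$topic" \
      --bootstrap-server localhost:9092 \
      --partitions 10 || return 1
  done
}

# Example:
# create_topics mytopic deadletter
# "$KAFKA_HOME/bin/kafka-topics.sh" --describe --topic mytopic --bootstrap-server localhost:9092
```

The --describe call in the example prints the partition layout, which is a quick way to confirm that the topics were created.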
Before we start setting up our Kafka Connect workers, we need to create a Timescale database to store our messages in.
Without going too in-depth, create a service inside the Timescale Console.
Check out our Docs if you’d like to know more about creating a Timescale database.
After creating your service, copy the psql connection command and connect to it.
Once connected to your database, execute the following SQL statements:
CREATE TABLE accounts (
    created_at TIMESTAMPTZ DEFAULT NOW(),
    name TEXT,
    city TEXT
);
SELECT create_hypertable('accounts', 'created_at');
Now that your database is ready to accept messages, let’s set up the Kafka Connect server, plugins, drivers, and connectors!
Start off by making the /usr/local/kafka/plugins directory, along with a subdirectory for the connector we are about to install.
Add a single configuration line to the config/connect-distributed.properties file. This line simply points the Kafka Connect process to the directory of plugins it can use.
mkdir /usr/local/kafka/plugins \
/usr/local/kafka/plugins/camel-postgresql-sink-kafka-connector
echo "plugin.path=/usr/local/kafka/plugins" >> /usr/local/kafka/config/connect-distributed.properties
Next up, download the Camel PostgreSQL sink connector and extract it into the connector subdirectory of the plugins directory you created earlier.
wget https://repo.maven.apache.org/maven2/org/apache/camel/kafkaconnector/camel-postgresql-sink-kafka-connector/3.18.2/camel-postgresql-sink-kafka-connector-3.18.2-package.tar.gz
tar \
-xzf camel-postgresql-sink-kafka-connector-3.18.2-package.tar.gz \
-C /usr/local/kafka/plugins/camel-postgresql-sink-kafka-connector \
--strip-components=1
Then, download the PostgreSQL driver and move it to the plugins directory.
wget https://jdbc.postgresql.org/download/postgresql-42.6.0.jar
mv postgresql-42.6.0.jar \
/usr/local/kafka/plugins/camel-postgresql-sink-kafka-connector
When that’s done, start the Kafka Connect process. Much like the Kafka process, if things aren’t going to plan, omit the -daemon flag and start the process in a tmux window to see the output.
/usr/local/kafka/bin/connect-distributed.sh \
-daemon \
/usr/local/kafka/config/connect-distributed.properties
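Kafka Connect can take a little while to start and scan the plugin directory. Once it is up, its REST API lists the installed plugins under /connector-plugins, which is a handy way to confirm that the Camel connector was picked up. The sketch below wraps that check. The function name plugin_loaded and the CONNECT_URL variable are my own.

```shell
# Assumed default address of the Kafka Connect REST API.
CONNECT_URL=${CONNECT_URL:-http://localhost:8083}

# plugin_loaded: hypothetical helper that succeeds if the given connector
# class appears in the worker's list of installed plugins.
plugin_loaded() {
  # -F treats the class name as a fixed string, so its dots are literal.
  curl -s "$CONNECT_URL/connector-plugins" | grep -qF "$1"
}

# Example:
# plugin_loaded org.apache.camel.kafkaconnector.postgresqlsink.CamelPostgresqlsinkSinkConnector \
#   && echo "Camel PostgreSQL sink is available"
```

If the check fails, double-check the plugin.path line and the contents of the plugin directory, then restart Kafka Connect.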
Now that Kafka, Kafka Connect, and your Timescale database are all running, you’re ready to create your first Timescale sink.
The way you add a Kafka connector is by sending a POST request to the Kafka Connect REST API on port 8083. The body of this POST request contains your connector configuration in JSON format.
Some interesting fields in this configuration are:
connector.class: describes the connector class, which can be found in the plugin directory we configured earlier.
errors.tolerance: by default, a connector fails when it encounters an erroneous message. In many cases, this is very desirable behavior, as you do not want the connector to propagate or process these messages. Setting this field to all, as we do here, makes the connector skip such messages and keep running.
errors.deadletterqueue.topic.name: describes the topic on which the erroneous messages are published.
tasks.max: describes the maximum number of tasks (workers). If set to 10, Kafka Connect will, at most, run 10 concurrent PostgreSQL sinks. Keep in mind that each topic partition is consumed by only one task at a time, so setting tasks.max to 100 while you only have 20 partitions on that topic will be no faster than setting tasks.max to 20.
topics: contains a list of topics the connector will consume from. In this case, the connector will only consume messages from the mytopic topic.
Don’t forget to modify the camel.kamelet.postgresql-sink fields with your database password, serverName (URL), and serverPort (port).
echo '{
  "name": "timescale-sink",
  "config": {
    "connector.class": "org.apache.camel.kafkaconnector.postgresqlsink.CamelPostgresqlsinkSinkConnector",
    "errors.tolerance": "all",
    "errors.deadletterqueue.topic.name": "deadletter",
    "tasks.max": 10,
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "topics": "mytopic",
    "camel.kamelet.postgresql-sink.databaseName": "tsdb",
    "camel.kamelet.postgresql-sink.username": "tsdbadmin",
    "camel.kamelet.postgresql-sink.password": "password",
    "camel.kamelet.postgresql-sink.serverName": "service_id.project_id.tsdb.cloud.timescale.com",
    "camel.kamelet.postgresql-sink.serverPort": "5432",
    "camel.kamelet.postgresql-sink.query": "INSERT INTO accounts (name,city) VALUES (:#name,:#city)"
  }
}' > timescale-sink.properties
To make working with this REST API a little easier, I prefer to write the JSON configuration to a file, use the cat command, and pipe it into the curl command. This is what that command looks like:
cat timescale-sink.properties | curl -X POST -d @- http://localhost:8083/connectors -H "Content-Type: application/json"
To validate whether or not our connector was added successfully, we can query the /connectors route in a GET request, and our sink should show up.
curl -X GET http://localhost:8083/connectors
#["timescale-sink"]
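Showing up in the list only means the connector was registered. To see whether it and its tasks are actually running, you can query the status route of the REST API, /connectors/<name>/status. The following is a rough sketch that pulls just the state values out of that response with grep and sed. The name connector_states is hypothetical, and if you have jq installed, it is a more robust choice for parsing the JSON.

```shell
# Assumed default address of the Kafka Connect REST API.
CONNECT_URL=${CONNECT_URL:-http://localhost:8083}

# connector_states: hypothetical helper that prints the state of the
# connector and of each of its tasks, one per line (e.g. RUNNING or FAILED).
connector_states() {
  curl -s "$CONNECT_URL/connectors/$1/status" |
    grep -o '"state" *: *"[A-Z_]*"' |
    sed 's/.*"\([A-Z_]*\)"$/\1/'
}

# Example: count how often each state occurs.
# connector_states timescale-sink | sort | uniq -c
```

Any FAILED entry is worth investigating. The full status response includes a trace field for failed tasks that usually points at the cause.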
To validate our installation, we want to send some messages onto the `mytopic` topic. We can do this very easily using the kafkacat utility (renamed kcat in newer releases).
sudo apt install kafkacat
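The kafkacat flags we will be using are:
-P: run kafkacat in producer mode.
-b: the Kafka broker to connect to.
-t: the topic to publish to.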
Then, we pipe a JSON string containing a name and city into kafkacat.
echo '{"name":"Mathis","city":"Salt Lake City"}' | kafkacat -P -b localhost:9092 -t mytopic
echo '{"name":"Oliver","city":"Moab"}' | kafkacat -P -b localhost:9092 -t mytopic
echo '{"name":"Lauren","city":"Park City"}' | kafkacat -P -b localhost:9092 -t mytopic
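If you want to publish more than a handful of test messages, typing every JSON string by hand gets old quickly. Below is a small sketch that turns name,city lines into messages of the same shape. Both function names are made up for this post, and no JSON escaping is done, so it assumes the values contain no double quotes, backslashes, or commas.

```shell
# account_json: hypothetical helper that prints one JSON message in the shape
# the sink expects. No escaping is performed on the values.
account_json() {
  printf '{"name":"%s","city":"%s"}\n' "$1" "$2"
}

# csv_to_messages: hypothetical helper that reads "name,city" lines from
# stdin and prints one JSON message per line.
csv_to_messages() {
  while IFS=, read -r name city; do
    account_json "$name" "$city"
  done
}

# Example: kafkacat in producer mode sends each input line as one message.
# printf 'Mathis,Salt Lake City\nOliver,Moab\n' |
#   csv_to_messages |
#   kafkacat -P -b localhost:9092 -t mytopic
```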
If we then query our database for all rows in the accounts table, we can see that all three messages appear.
tsdb=> SELECT * FROM accounts;
          created_at           |  name  |      city
-------------------------------+--------+----------------
 2023-08-23 18:04:51.101906+00 | Mathis | Salt Lake City
 2023-08-23 18:04:54.856341+00 | Oliver | Moab
 2023-08-23 18:04:58.217255+00 | Lauren | Park City
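If a row you expected is missing, the deadletter topic we created earlier is a good first place to look. Here is a minimal sketch for dumping its contents with kafkacat in consumer mode. The function name dump_dead_letters is my own, and the flags used are standard kafkacat options.

```shell
# dump_dead_letters: hypothetical helper that prints every message currently
# on the dead letter topic (default: deadletter) and then exits.
dump_dead_letters() {
  # -C: consumer mode, -o beginning: start at the oldest offset,
  # -e: exit once the end of the topic has been reached.
  kafkacat -C -b localhost:9092 -t "${1:-deadletter}" -o beginning -e
}

# Example:
# dump_dead_letters
```

An empty result means no messages have been routed to the dead letter queue so far.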
You might also be wondering how the created_at column was populated. If you look back at our table creation statement, we added the created_at column of type TIMESTAMPTZ with the default value set to NOW(). This means that the timestamp is added at the beginning of the transaction.
While this might be an easy way to add timestamps to messages or events, it is highly recommended to add the timestamp at the origin, before the message is sent. Publishing to a Kafka topic, waiting for a Kafka connector to consume the message, and inserting it into the database can take quite a while.
Depending on the batching settings in your pipeline (for example, the producer settings batch.size and linger.ms), it may take several seconds before a message is consumed and inserted, because messages are held back in an attempt to batch them together.
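To illustrate what adding the timestamp at the origin could look like, here is a sketch of a producer-side helper that stamps each message with the current UTC time. The function name is hypothetical and no JSON escaping is done. Keep in mind that the sink query in this post only inserts name and city, so the connector's INSERT statement would also need to reference the new field, which is not shown here.

```shell
# account_json_with_ts: hypothetical helper that adds an ISO 8601 UTC
# timestamp to the message at the moment it is produced.
account_json_with_ts() {
  printf '{"created_at":"%s","name":"%s","city":"%s"}\n' \
    "$(date -u +%Y-%m-%dT%H:%M:%SZ)" "$1" "$2"
}

# Example:
# account_json_with_ts Oliver Moab | kafkacat -P -b localhost:9092 -t mytopic
```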
Congratulations! You’ve successfully built a data pipeline that streams data from a Kafka topic (or shell command that publishes data onto a topic) into a Timescale database.
You can read how other Timescale users are integrating Kafka into their data pipelines:
Or try it out for yourself: sign up for a free trial of Timescale for 30 days, no credit card required.