Build a Data Pipeline With Apache Kafka and TimescaleDB
Introduction
In this post, we will cover how to set up TimescaleDB (hosted on Managed Service for TimescaleDB) as a Kafka consumer (via the JDBC sink connector) with the Confluent Platform for the streaming data. We will start with a pre-created topic and a schema that has already been defined in the Confluent platform, and we will cover ways to create the schema.
Whether you are generating system infrastructure readings (for DevOps / IT monitoring) or capturing edge device measurements (for IoT), TimescaleDB can be used as a centralized repository for time-series data.
To ingest this data into TimescaleDB, our users often create a data pipeline that includes Apache Kafka. For those who are not familiar, Kafka is a distributed streaming platform that allows systems to publish data that can be read by a number of different systems in a resilient and scalable way. A popular way to use Kafka is via Confluent, which is a comprehensive platform for event streaming.
Why Apache Kafka, Confluent, and TimescaleDB?
Kafka is arguably the most popular open-source, reliable, and scalable streaming messaging platform. It provides an easy yet robust way to share data between upstream and downstream systems. The Confluent Platform glues together the pieces needed to run Kafka and the connector. And TimescaleDB is an open-source database built for analyzing time-series data with the power and convenience of SQL. Essentially, if you are collecting time-series data and looking for an easy streaming messaging platform to handle the routing of that data to TimescaleDB, you should consider this setup.
Convinced? Ready to get started?
Before you start
Install TimescaleDB
For the purposes of this demonstration, we are going to use TimescaleDB hosted on Managed Service for TimescaleDB. You can sign up for Managed Service for TimescaleDB here for a 30-day free trial.
Install Confluent
Next, you will need to set up Confluent. You can get started with Confluent (if you are not already running the platform) via their quick start guide.
Understand the data pipeline
Now, let's take a high-level view of the setup:
From the diagram above, you can see we are ingesting data into Kafka from upstream data sources (e.g. servers, edge devices). This data will pass through a Kafka topic that is subscribed to via the Kafka Connect JDBC sink connector, which inserts that data into TimescaleDB for storage and processing.
Set up the JDBC sink connector
Since TimescaleDB is built on top of PostgreSQL, any tools or extensions that work with PostgreSQL work with TimescaleDB. This means we can use the readily available JDBC sink connector to make the Kafka-to-TimescaleDB connection.
Let's start by setting up our TimescaleDB instance as a consumer of a Kafka topic. For this we are going to use the JDBCSinkConnector accessed through the Confluent UI.
Here we are accessing the Kafka Connect functions in Confluent to set up the JDBC sink connector and subscribe our TimescaleDB instance to a topic. The next set of configuration steps is form driven in the Confluent platform. The platform provides tooltips for each setting; in the next few paragraphs we will cover the most relevant items.
The first few sections of the setup form are straightforward. First we are going to associate the connection with a topic or set of topics (you will want to have a topic set up at this point).
The next field is preselected for you and it asks how you want Kafka to connect to your data (in this case JDBC sink connector is pre-selected). Next, we want to give our connection a name.
The following section of the setup process covers some common settings. For the purposes of this demo, we were able to accept the defaults; however, your setup might require some configuration in this section. Let's quickly go over the settings:
- Tasks max: The maximum number of parallel tasks the connector will attempt to run.
- Key converter class: The converter used to translate between the Kafka Connect format and the serialized form written to Kafka, for message keys. The setting here is driven by how upstream data is brought in. Popular options are JSON and Avro.
- Topics regex: Fill this out if you want to associate this connection with a series of topics. (Using the regex saves you from listing them all and ensures that future topics matching the regex are subscribed to.)
- Value converter class: Similar to key converter class, only this is the message value converter. Like the key converter class, it is driven by how upstream data is brought into the topic.
- Header converter class: Similar to the key and value converter classes, but for the message headers.
- Reload action: The action that Connect should take when changes in external configuration providers result in changes in the connector configuration.
- Transforms: Here you can select any transformations you may have defined (a list of supported transformation types is here).
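The converter settings above decide how each message is read, and the JDBC sink connector needs a schema alongside the data to know the column types. As a rough illustration, assuming the JSON converter is used with schemas enabled, a message value carries a "schema" and a "payload" section. The reading and its field names below are hypothetical.

```python
import json

# Hypothetical reading; the field names are illustrative, not from this post.
reading = {"time": 1700000000000, "device_id": "sensor-1", "temperature": 21.5}

# Map Python types to Kafka Connect JSON schema type names.
CONNECT_TYPES = {int: "int64", float: "double", str: "string", bool: "boolean"}


def with_schema(payload):
    """Wrap a flat record in the schema/payload envelope read by the
    JSON converter when schemas are enabled."""
    fields = [
        {"field": name, "type": CONNECT_TYPES[type(value)], "optional": False}
        for name, value in payload.items()
    ]
    return {
        "schema": {"type": "struct", "fields": fields, "optional": False},
        "payload": payload,
    }


message = json.dumps(with_schema(reading))
print(message)
```

With Avro, the schema typically lives in a schema registry instead of inside each message, so the envelope above would not be needed.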
The last section that’s important to note is error handling. Most settings are self-explanatory, but let's cover two of the more important items.
- Retry timeout for errors: The default is zero, which means no operation will be retried on error. Set this in accordance with your environment's error handling policy.
- Error tolerance: This controls what happens when a message fails to process. The default is to stop the connector; setting this to “All” skips the problematic record and continues.
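For readers who prefer to see the form fields as connector properties, here is a minimal sketch of the common and error-handling settings as a Python dictionary. The property names are standard Kafka Connect settings; the topic name, converter choices, and schema registry URL are assumptions for illustration only.

```python
# Common settings from the form, expressed as Kafka Connect properties.
# All values are placeholders; "metrics" is a hypothetical topic name.
common_settings = {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "topics": "metrics",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    # Assumed schema registry location, needed by the Avro converter.
    "value.converter.schema.registry.url": "http://localhost:8081",
    "errors.retry.timeout": "0",   # 0 = do not retry failed operations
    "errors.tolerance": "none",    # "none" stops on error, "all" skips the record
}


def validate(settings):
    """Return a list of problems found in a sink connector configuration."""
    problems = []
    # A sink connector takes either an explicit topic list or a regex, not both.
    if ("topics" in settings) == ("topics.regex" in settings):
        problems.append("set exactly one of 'topics' or 'topics.regex'")
    if settings.get("errors.tolerance", "none") not in ("none", "all"):
        problems.append("'errors.tolerance' must be 'none' or 'all'")
    return problems


print(validate(common_settings))
```

The small `validate` helper is not part of Kafka Connect; it only captures two rules that are easy to get wrong when filling in the form.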
Configure connection parameters
After you determine your settings, it’s time to connect to TimescaleDB:
Connection:
- JDBC URL: This is straightforward and should look something like this:
jdbc:postgresql://<hostname>:<port>/<database>
- JDBC User: User for connection.
- JDBC Password: Password for connection.
- Database Dialect: This will be inferred from the JDBC connection string; this field can serve as an override.
Writes:
- Insert Mode: How do you want messages written to the database? In the case of time-series data, we are going to choose INSERT, as this will be our primary operation (vs. update).
- Batch Size: How many records do you want to batch together for insertion into the destination table?
You can obtain your TimescaleDB instance connection information from the Managed Service for TimescaleDB portal.
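The connection and write fields above correspond to a handful of JDBC sink connector properties. The sketch below assembles them from connection details; the host, port, database, and credentials shown are placeholders, and the batch size is only an example value.

```python
def connection_settings(host, port, database, user, password, batch_size=3000):
    """Build the connection and write properties for the JDBC sink connector."""
    return {
        "connection.url": f"jdbc:postgresql://{host}:{port}/{database}",
        "connection.user": user,
        "connection.password": password,
        # Optional override; normally inferred from the JDBC URL.
        "dialect.name": "PostgreSqlDatabaseDialect",
        "insert.mode": "insert",
        "batch.size": str(batch_size),
    }


# Placeholder values; use the details from your own service portal.
settings = connection_settings(
    "example-host.example.com", 5432, "defaultdb", "tsdbadmin", "secret"
)
print(settings["connection.url"])
```

Depending on how your service is configured, the JDBC URL may also need extra parameters (for example, SSL options); check the connection information in the portal.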
Configure data mapping
The next set of fields tells the connector how you would like the data mapped once it lands in TimescaleDB. Let’s have a look at the fields:
- Table Name Format: This allows you to define a table naming convention (it is also used to check whether the table already exists). For example, to name the tables kafka_<topic_name>, you would enter kafka_${topic}
- Fields Whitelist: List of comma separated field names to be used. If left empty, it will use all fields.
- DB Time Zone: Name of the time zone the JDBC connector uses for timestamp-related data.
The JDBC sink connector will auto create the table that it needs, and will also auto evolve the table if the schema changes.
- Auto-Create: Will auto create the table if it does not exist.
- Auto-Evolve: Will alter the table automatically if the schema should change.
We recommend setting Auto-Create to false. If you would like to save some keystrokes, you can allow the Auto-Create to create the base table in your test environment, stop the data flow, and truncate the table and convert the table to a hypertable. Once that is done, you can turn the data flow back on. If you are going to create the table yourself based on the message schema, be sure to follow the naming convention (by default it will look to name the table based on the topic name) so the connector finds the table when the sink connector is turned on.
We recommend setting Auto-Evolve to false. The schema should be considered end-to-end and a change made upstream that is pushed downstream may create performance issues.
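As a sketch of how these mapping fields look as connector properties, the dictionary below follows the recommendations above (auto-create and auto-evolve off). The helper that resolves the table name is only an illustration of how the `${topic}` placeholder expands; it is not part of the connector.

```python
# Data mapping fields as JDBC sink connector properties.
mapping_settings = {
    "table.name.format": "kafka_${topic}",
    "fields.whitelist": "",     # empty = use all fields
    "db.timezone": "UTC",
    "auto.create": "false",
    "auto.evolve": "false",
}


def table_name_for(topic, name_format):
    """Show which table name a given topic resolves to (illustrative helper)."""
    return name_format.replace("${topic}", topic)


# "metrics" is a hypothetical topic name.
print(table_name_for("metrics", mapping_settings["table.name.format"]))
```

Knowing the resolved name matters when you create the table yourself, since the connector looks for exactly that table.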
Set up TimescaleDB
Next, we need to configure the database to ingest the data arriving from Kafka through the JDBC sink connector. That means creating the table that will receive the data from the Kafka topic. You have two options: leverage auto create or manually create the table.
Option 1: Leverage Auto Create
As part of the JDBC sink connector process, you have the ability to let the connector create the table. While there is not an option to allow this to create a hypertable, you can turn on the connector and allow it to generate the table structure for you. Once this is done, turn the connector back off, truncate your table, and follow the instructions here to convert the table to a hypertable.
Once this is completed, you will be ready to turn the connector back on and allow the data to flow from the JDBC connector into your newly created hypertable.
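The truncate-and-convert step can be sketched as two SQL statements. The small Python helper below only builds the statements; run them with whatever PostgreSQL client you prefer while the connector is off. The table name `kafka_metrics` and time column `time` are assumptions, and the helper does no escaping, so use it only with names you trust.

```python
from typing import List


def hypertable_statements(table: str, time_column: str) -> List[str]:
    """Build the SQL to empty an auto-created table and convert it
    into a TimescaleDB hypertable partitioned on time_column."""
    return [
        f"TRUNCATE TABLE {table};",
        f"SELECT create_hypertable('{table}', '{time_column}');",
    ]


# Hypothetical table and column names.
for statement in hypertable_statements("kafka_metrics", "time"):
    print(statement)
```

The time column has to be a timestamp, date, or integer type for the conversion to succeed, so it is worth checking the column types the connector generated before converting.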
Option 2: Manually create the table
The other option is to manually create the table based on your knowledge of the topic schema. Here you will want to make sure you name the table based on the topic name (accounting for any changes to the default you may have set while setting up the JDBC sink connector). Once you have created the table, you can follow the same instructions here to convert this to a hypertable.
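As a rough illustration of the manual route, the helper below builds a CREATE TABLE statement followed by the hypertable conversion. The table name, columns, and types are hypothetical; in practice they should mirror the fields of your topic schema and the table name the connector expects.

```python
def create_table_sql(table, columns, time_column):
    """Build SQL that creates a table and converts it into a hypertable.

    columns is a list of (name, sql_type) pairs matching the topic schema.
    """
    column_lines = ",\n  ".join(f"{name} {sql_type}" for name, sql_type in columns)
    return (
        f"CREATE TABLE {table} (\n  {column_lines}\n);\n"
        f"SELECT create_hypertable('{table}', '{time_column}');"
    )


# Hypothetical schema for a topic named "metrics".
sql = create_table_sql(
    "metrics",
    [
        ("time", "TIMESTAMPTZ NOT NULL"),
        ("device_id", "TEXT"),
        ("temperature", "DOUBLE PRECISION"),
    ],
    "time",
)
print(sql)
```

The generated SQL can be pasted into psql or any other PostgreSQL client.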
After you have created the hypertable, and assuming you have your JDBC sink connector working and associated with a working topic, you can now turn on the JDBC sink connector and you should start to see data flow from the Kafka topic through the connector and into the TimescaleDB hypertable.
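This post uses the Confluent UI to turn the connector on and off. If you would rather script it, Kafka Connect also exposes a REST API. The sketch below only builds the requests, without sending them; the endpoint URL and connector name are assumptions for a local setup.

```python
import json
import urllib.request

# Assumed Kafka Connect REST endpoint; adjust for your environment.
CONNECT_URL = "http://localhost:8083"


def create_connector_request(name, config):
    """Build a POST request that registers a connector with Kafka Connect."""
    body = json.dumps({"name": name, "config": config}).encode("utf-8")
    return urllib.request.Request(
        f"{CONNECT_URL}/connectors",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def state_request(name, action):
    """Build a PUT request that pauses or resumes an existing connector."""
    if action not in ("pause", "resume"):
        raise ValueError("action must be 'pause' or 'resume'")
    return urllib.request.Request(
        f"{CONNECT_URL}/connectors/{name}/{action}", method="PUT"
    )


# "timescale-sink" is a hypothetical connector name.
request = state_request("timescale-sink", "resume")
print(request.get_method(), request.full_url)
# To actually send it: urllib.request.urlopen(request)
```

Pausing the connector before truncating or converting the table, and resuming it afterwards, mirrors the off/on steps described above.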
Another use case: data generation
So far, we’ve assumed that you are preparing to ingest data from an existing topic, or a topic that you will make available as you bring TimescaleDB online. If you are simply looking to do a PoC or other test that exercises the data pipeline, the Confluent platform also includes a data generation connector. That connector ships with a number of predefined schemas and uses them to generate random data that is published to a Kafka topic, picked up by the JDBC sink connector, and written to the TimescaleDB hypertable you have configured.
This is a great option if you just want to run a small proof of concept, or would like to do some stress testing with the entire stack that you have set up. Confluent has a number of quick start guides that will help you generate data quickly.
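As an illustration, a data generation connector configuration might look like the sketch below. It assumes the Confluent Datagen connector is installed; the topic name, chosen quickstart schema, and rates are example values only.

```python
import json

# Example settings for the Datagen source connector (values are placeholders).
datagen_settings = {
    "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
    "kafka.topic": "pageviews",   # topic the random records are published to
    "quickstart": "pageviews",    # one of the predefined schemas
    "max.interval": "1000",       # maximum milliseconds between records
    "iterations": "10000",        # number of records to generate
    "tasks.max": "1",
}

print(json.dumps(datagen_settings, indent=2))
```

Pointing the JDBC sink connector at the same topic name completes the test pipeline.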
Next steps
We hope this post helped you get TimescaleDB up and running with Kafka and Confluent. As a reminder, if you are looking to connect a technology with TimescaleDB that has an existing connection to Postgres, it will work!
We encourage you to visit our Managed Service for TimescaleDB documentation page and sign up for a free trial. If you prefer to deploy on-premises, you can explore the available installation options here.