Jan 07, 2025
Posted by
Semab Tariq
The Internet of Things (IoT) is transforming industries by connecting devices and enabling real-time data sharing. For applications such as predictive maintenance, smart cities, and industrial automation, managing IoT data effectively is key to ensuring smooth operations and timely decision-making.
While many organizations manage the deluge of sensor data and power their real-time analytics with a mix of specialized databases, we’ve always favored PostgreSQL. With an extension like TimescaleDB, which is PostgreSQL under the hood and adds automatic data partitioning, always up-to-date materialized views, and a powerful hybrid-row columnar storage engine, the relational database becomes a robust IoT database.
With these additions, PostgreSQL keeps your relational and time-series data in one place as it endlessly grows, simplifies your operations, and provides speed and scale for real-time analytics.
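To make the idea of always up-to-date materialized views concrete, here is a minimal continuous aggregate sketch. It assumes a hypertable named metrics with ts, sensor_id, and value columns, the same shape we use later in this tutorial; the bucket size and refresh intervals are illustrative.
-- Hourly per-sensor averages that TimescaleDB keeps refreshed in the background
CREATE MATERIALIZED VIEW metrics_hourly
WITH (timescaledb.continuous) AS
SELECT
  time_bucket('1 hour', ts) AS bucket,
  sensor_id,
  avg(value) AS avg_value
FROM metrics
GROUP BY bucket, sensor_id;

-- Refresh the most recent hours of data every 30 minutes
SELECT add_continuous_aggregate_policy('metrics_hourly',
  start_offset      => INTERVAL '3 hours',
  end_offset        => INTERVAL '1 hour',
  schedule_interval => INTERVAL '30 minutes');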
To demonstrate these capabilities, in this blog we’ll build an end-to-end IoT pipeline: streaming sensor data into Kafka, ingesting it into TimescaleDB with Kafka Connect, and visualizing it in real time with Grafana.
Before we dive into our tutorial, let's take a moment to explain some key concepts.
The Internet of Things (IoT) is a network of everyday objects—like smartwatches, refrigerators, cars, and industrial sensors—that are connected to the Internet and can collect, share, and act on data. The IoT data collected by devices like fitness trackers and smartwatches presents a number of challenges. Its high volume, velocity, and variety require scalable storage and real-time processing capabilities. Additionally, ensuring data reliability, security, and integration across diverse devices and systems adds significant complexity. To deal with such complexity, we’ll pair the following tools with our supercharged PostgreSQL (TimescaleDB is PostgreSQL under the hood) to build an IoT pipeline and enable real-time monitoring.
Apache Kafka is an open-source distributed event streaming platform for building real-time data pipelines and streaming applications. It is designed to handle large volumes of data in real time and efficiently transmit it between systems. It allows you to publish, subscribe, store, and process streams of records (events) in a fault-tolerant and scalable manner. Kafka is widely used in industries for handling high-throughput data, such as log aggregation, real-time analytics, and stream processing.
Think of Kafka as a messaging app, where messages or data are like text messages, and Kafka is the platform that manages the sending and receiving of those messages.
Grafana is an open-source platform used for monitoring and visualizing data. It helps you understand and analyze your data by turning it into interactive and customizable dashboards. Grafana connects to various data sources, like databases, cloud services, and application logs, and allows you to create graphs, charts, and alerts based on the data you want to monitor.
With Grafana, you can track the health of your systems, view real-time metrics, and easily spot issues. It’s widely used for monitoring infrastructure, application performance, and business metrics and is particularly popular in DevOps and IT operations. Grafana also allows you to set up alerts so you can be notified when certain conditions are met, making it an essential tool for ensuring everything is running smoothly.
The goal here is to stream data into a Kafka topic as a continuous flow of records (or events). This data can represent various types of information generated in real time, such as sensor readings, logs, or transaction data. While the data is being streamed into the Kafka topic, it’s simultaneously ingested into the Timescale database (PostgreSQL) through Kafka Connect.
Kafka Connect is a framework designed to integrate Kafka with different data sources and sinks such as databases or analytics platforms. It streamlines the process of sending data into TimescaleDB without the need for custom code. Kafka Connect automatically pulls data from Kafka topics and writes it to TimescaleDB, ensuring the data is stored and ready for further processing or analysis.
For speed and convenience, we used TimescaleDB in our mature PostgreSQL cloud platform, Timescale Cloud (it’s free for 30 days, no credit card required), but you can always use the open-source extension. Here’s how to install it. To integrate Kafka with Timescale Cloud, you can check out another excellent blog post that provides detailed instructions here.
I’ve used the same example to integrate the Timescale Cloud instance with Kafka. Below, I’ll outline the changes I made during the integration process.
Obtain the latest Kafka source:
sudo mkdir /usr/local/kafka
sudo chown -R $(whoami) /usr/local/kafka
wget https://downloads.apache.org/kafka/3.9.0/kafka_2.13-3.9.0.tgz && tar -xzf kafka_2.13-3.9.0.tgz -C /usr/local/kafka --strip-components=1
And continue with the script used here.
Replace the following line in the timescale-sink.properties file:
"camel.kamelet.postgresql-sink.query":"INSERT INTO accounts (name,city) VALUES (:#name,:#city)",
With this line:
"camel.kamelet.postgresql-sink.query":"INSERT INTO metrics (ts, sensor_id, value) VALUES (CAST(:#ts AS TIMESTAMPTZ), :#sensor_id, :#value)",
This line tells the sink how to save data from a sensor into the metrics table. For every new record, it stores the timestamp (ts), the sensor ID (sensor_id), and the measured value (value).
The :# syntax is used to represent parameters in the query. It’s a way to tell the system that the values for these placeholders will be provided dynamically, typically by the system or application executing the query; in this case, they come from the fields of each Kafka message.
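Conceptually, when a message with the shape of the sample row shown later in this post arrives, the sink ends up executing something like the following (the values are shown inline here for illustration; in practice they are bound as parameters):
INSERT INTO metrics (ts, sensor_id, value)
VALUES (CAST('2023-05-31 21:48:41.234187+00' AS TIMESTAMPTZ), 21, 0.68);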
Also, make sure to insert valid credentials in the following properties:
- "camel.kamelet.postgresql-sink.databaseName":"tsdb",
- "camel.kamelet.postgresql-sink.password":"password",
- "camel.kamelet.postgresql-sink.serverName":"service_id.project_id.tsdb.cloud.timescale.com",
- "camel.kamelet.postgresql-sink.serverPort":"5432",
- "camel.kamelet.postgresql-sink.username":"tsdbadmin"
In this blog, I’ll be using the dataset provided by Timescale, which can be found here.
Inside this dataset, we have a table named metrics. This table is used to store time-series data commonly used in IoT (Internet of Things) or monitoring systems. Here's a breakdown of its structure and the kind of data it holds:
tsdb=> \d metrics
Table "public.metrics"
Column | Type | Collation | Nullable | Default
-----------+--------------------------+-----------+----------+---------
ts | timestamp with time zone | | |
sensor_id | integer | | |
value | numeric | | |
- ts (timestamp with time zone): represents the exact time when the data was recorded.
- sensor_id (integer): identifies the sensor that generated the data.
- value (numeric): represents the reading or measurement taken by the sensor.
tsdb=> select * from metrics limit 1;
              ts               | sensor_id | value
-------------------------------+-----------+-------
 2023-05-31 21:48:41.234187+00 |        21 |  0.68
(1 row)
This record indicates that at the specified timestamp, Sensor 21 recorded a measurement of 0.68.
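If you are creating the schema yourself rather than loading the prebuilt dataset, a minimal sketch looks like this (the NOT NULL constraint on ts and the hypertable call follow standard TimescaleDB practice and are not part of the dump above):
-- Plain PostgreSQL table for the sensor readings
CREATE TABLE metrics (
  ts        TIMESTAMPTZ NOT NULL,
  sensor_id INTEGER,
  value     NUMERIC
);

-- Turn the table into a TimescaleDB hypertable partitioned by time
SELECT create_hypertable('metrics', 'ts');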
Download and decompress the dataset.
wget https://assets.timescale.com/docs/downloads/metrics.csv.gz
gzip -d metrics.csv.gz
Convert the dataset to JSON format, with one JSON object per line, so we can easily stream it to the Kafka topic. (Each line of the file will be sent as a separate Kafka message, so we skip the surrounding array brackets and trailing commas.)
awk -F',' '{print "{\"ts\": \""$1"\", \"sensor_id\": "$2", \"value\": "$3"}"}' metrics.csv > metrics.json
Once the dataset is ready, stream the data to the Kafka topic.
To stream data to the Kafka topic, we are going to use a utility named kcat, formerly known as kafkacat. The kcat utility is a flexible tool that can function as either a producer (sending data) or a consumer (receiving data), depending on its configuration, and it can switch easily between these roles.
In our example, the -P switch configures kcat as a producer that sends data to the Kafka topic specified with the -t switch. Once the data starts appearing in the Kafka topic, tools like Kafka Connect can read it and stream it into the Timescale database for permanent storage.
The -b switch specifies the Kafka broker address. A Kafka broker is like a server that stores and manages messages, which are saved in topic partitions. These partitions act as separate storage areas where the messages are kept in the order they were sent.
When producers, like kcat in our example, want to send data, they send it to the Kafka broker, and the broker stores the data across the topic's partitions. Consumers, such as Kafka Connect, then connect to the Kafka broker and fetch the data from the topics they're interested in. Even in the case of system failures, Kafka brokers ensure that the data remains accessible and available, keeping the system reliable.
kcat -P -b localhost:9092 -t mytopic -l metrics.json
To integrate Timescale with Grafana, I recommend reading the blog post here, which provides a step-by-step guide to the integration process.
After completing the Grafana integration, the next step is to create your first Grafana dashboard. To do this, follow these steps:
Next, choose your PostgreSQL data source.
At this stage, your first dashboard is nearly complete. Simply click on the "Back to Dashboard" option in the top right corner to return to it, where you can begin creating custom variables and queries for data visualization.
A new dashboard is created successfully:
This is how our initial data looks in Grafana:
Custom variables in Grafana are user-defined placeholders that allow dynamic data filtering and visualization based on user inputs, such as dropdown selections. They enhance dashboards by enabling flexible, reusable queries and tailored insights without modifying the underlying queries.
In an IoT use case, custom variables in Grafana can be utilized to monitor specific device locations. For example, a variable can filter temperature data by selecting devices installed in different rooms or floors of a building. This filtering allows the dashboard to display real-time temperature trends for the chosen locations, enabling targeted monitoring and analysis.
To create custom variables, navigate to your dashboard and select the Settings menu from the top right corner.
From Settings, click on the Variables tab and click on Add variable.
On the next screen, we need to add all the required information to create a new variable:
You can leave the rest of the options as the default values.
At the bottom, we can see Grafana provides a preview of the data it successfully fetched from the database table, which will then be used to populate the dropdown.
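For reference, a query variable like this can be backed by a simple SQL statement; a sketch for a sensor_id variable against our metrics table might be:
-- Populate the dropdown with every sensor present in the table
SELECT DISTINCT sensor_id
FROM metrics
ORDER BY sensor_id;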
After creating the custom variables, the next step is setting up the dashboard's monitoring queries for real-time data visualization.
To do this, navigate to the dashboard, click on the three dots in the panel, and select "Edit."
On the next screen, select the "Code" option next to "Run Query." We will use Code mode instead of the "Query Builder" mode, as this allows you to write your own query to generate the visuals.
Retrieving the range of values for a specific sensor over a particular day is useful for detecting anomalies (such as unusually high or low readings), evaluating the sensor's performance within expected limits, and ensuring the sensor is functioning correctly.
SELECT
MIN(value) AS min_value,
MAX(value) AS max_value
FROM metrics
WHERE
ts BETWEEN $__timeFrom() AND $__timeTo()
AND sensor_id = $sensor_id;
The query returns the minimum and maximum readings for the selected sensor and relies on the $__timeFrom(), $__timeTo(), and $sensor_id variables to integrate seamlessly with Grafana dashboards.
The diagram shows a minimum value of 0.265 and a maximum value of 0.999 for sensor ID 23, within the date range of 2023-05-29 to 2023-05-31.
This is an ideal use case for downsampling high-frequency data into fixed time intervals, streamlining trend analysis, and data visualization. It ensures uniform time buckets, prevents graphs from becoming cluttered with excessive data points, and preserves key readings.
SELECT
series.time AS time,
m.value AS value
FROM
generate_series(
$__timeFrom()::timestamp,
$__timeTo()::timestamp,
'10 seconds'::interval
) AS series(time)
LEFT JOIN LATERAL (
SELECT value
FROM metrics
WHERE sensor_id = $sensor_id
AND ts >= series.time
AND ts < series.time + interval '10 seconds'
ORDER BY ts
LIMIT 1
) m ON true
ORDER BY series.time;
The generate_series() call creates a fixed series of timestamps across the dashboard's selected time range ($__timeFrom() to $__timeTo()) in 10-second steps. The LATERAL join then looks up the first reading from the metrics table for the selected sensor_id, ensuring that the data's timestamp (ts) falls within that specific interval. The closest match is selected for each interval.
The diagram above displays readings from sensor ID 4, from 2023-05-29 at 06:07:48 to 2023-05-29 at 06:40:00. The graph shows data in five-minute intervals, where we observe consistent readings from 06:10 to 06:15, followed by a spike from 06:15 to 06:20. The readings stabilize again, with two more spikes occurring at 06:30 and 06:34, before returning to normal. These spikes suggest the sensor experienced three distinct unwanted events during this period.
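As an alternative, TimescaleDB's time_bucket() function gives a similar fixed-interval view with less SQL; note the semantics differ slightly, since this sketch averages the readings in each bucket instead of taking the first one, and buckets with no data are simply omitted rather than returned as NULL rows:
-- Downsample to 10-second buckets, one averaged value per bucket
SELECT
  time_bucket('10 seconds', ts) AS time,
  avg(value) AS value
FROM metrics
WHERE sensor_id = $sensor_id
  AND ts BETWEEN $__timeFrom() AND $__timeTo()
GROUP BY 1
ORDER BY 1;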
Getting the average value for a sensor helps you understand its overall performance or behavior over the course of a day or a month. It is particularly useful for monitoring sensors that are supposed to maintain a certain average range, allowing you to assess the general trend of the data.
SELECT
sensor_id,
AVG(value) AS avg_value
FROM metrics
WHERE ts >= $__timeFrom()
AND ts <= $__timeTo()
AND sensor_id = $sensor_id
GROUP BY sensor_id;
The query calculates the average of the value column for the given sensor_id within the time range defined by Grafana's $__timeFrom() and $__timeTo() variables. It groups the results by sensor_id and retrieves the average reading within the selected time range for that specific sensor.
The diagram above displays the average reading for sensor ID 23 within the selected timeframe, from 2023-05-29 06:10:00 to 2023-05-29 06:40:00.
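If you want to see how the average drifts from day to day rather than as a single number for the whole range, a time_bucket() variant of the same query is one option (it reuses the same $sensor_id and time-range variables):
-- Daily average reading for the selected sensor
SELECT
  time_bucket('1 day', ts) AS day,
  avg(value) AS avg_value
FROM metrics
WHERE sensor_id = $sensor_id
  AND ts BETWEEN $__timeFrom() AND $__timeTo()
GROUP BY 1
ORDER BY 1;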
By comparing the sensor readings side by side, you can identify correlations, trends, or issues that may exist between the two sensors' data, which can be crucial for diagnosing problems or ensuring data consistency across devices.
SELECT
a.ts AS time,
a.value AS sensor_a_value,
b.value AS sensor_b_value
FROM metrics a
JOIN metrics b
ON a.ts = b.ts
WHERE
a.sensor_id = $sensor_a
AND b.sensor_id = $sensor_b
AND a.ts >= $__timeFrom()
AND a.ts <= $__timeTo()
ORDER BY
a.ts;
The query joins the metrics table to itself to pair readings from sensor_a and sensor_b. The join allows comparing data from two different sensors at the same timestamp (a.ts = b.ts). The WHERE clause filters on the sensor_a and sensor_b IDs using Grafana variables, limits the data to the specified time range, and orders the results by timestamp to show the values chronologically.
The diagram above compares two sensors, sensor ID 1 and sensor ID 11, within the specified time interval from 2023-05-29 06:00:00 to 2023-05-29 07:30:00. The graph illustrates value variations for both sensors in five-minute intervals. From the data, it is evident that sensor 1 maintains consistent and steady readings, while sensor 11 exhibits a few spikes during the same timeframe. These spikes indicate that sensor 11 may require further attention or investigation.
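To put a number on how closely the two sensors track each other instead of only eyeballing the lines, you can use PostgreSQL's built-in corr() aggregate; a sketch that reuses the same variables:
-- Pearson correlation: close to 1 means the sensors move together, close to 0 means little linear relationship
SELECT corr(a.value, b.value) AS correlation
FROM metrics a
JOIN metrics b ON a.ts = b.ts
WHERE a.sensor_id = $sensor_a
  AND b.sensor_id = $sensor_b
  AND a.ts BETWEEN $__timeFrom() AND $__timeTo();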
Here is the final look of the dashboard:
A separate dashboard to compare sensors:
The final design of the dashboard ensures all critical information is available in one place. This centralization means you do not have to go to different sections or tools to find the stats you need. With everything neatly organized, users can easily monitor key metrics, spot trends, and make quick decisions, making this PostgreSQL-based IoT pipeline fueled by Kafka and Grafana an ideal solution for predictive maintenance and alerting. Real-time analytics are easily available to inform timely insights and prompt quick actions.
While common in a number of industries, these use cases present different challenges compared to general-purpose analytics.
Unlike more general analytics use cases, where you can wait for the data and perform batch inserts, real-time analytics require high ingestion speed and the ability to make that data available immediately for querying and analysis.
TimescaleDB excels at both thanks to its hybrid-row columnar storage engine. It can ingest and store data in the most efficient format, enabling you to query it transparently across the rowstore and columnstore. This transition happens automatically under the hood and without overhead.
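On self-hosted TimescaleDB, you typically configure this transition with a compression policy; a minimal sketch for our metrics hypertable follows (the segment-by column and the seven-day threshold are illustrative choices):
-- Enable compression, grouping compressed data by sensor for fast per-sensor scans
ALTER TABLE metrics SET (
  timescaledb.compress,
  timescaledb.compress_segmentby = 'sensor_id',
  timescaledb.compress_orderby   = 'ts DESC'
);

-- Automatically compress chunks once they are older than seven days
SELECT add_compression_policy('metrics', INTERVAL '7 days');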
In this blog, we have looked at how easily we can connect Kafka and Kafka Connect to stream IoT data into a PostgreSQL-based TimescaleDB instance and power a real-time analytics dashboard. TimescaleDB is great for handling large amounts of IoT data, thanks to its strong time-series features, ensuring it’s both scalable and efficient. We explore more real-time analytics solutions in another article.
Once your pipeline is running smoothly with Timescale and Kafka, the next thing to focus on is monitoring: Grafana is a great tool for visualizing your data in real time, helping you keep track of performance and make better, data-driven decisions.
If your use case demands real-time insights, like Trebellar’s, try TimescaleDB. You can self-host it or try our managed PostgreSQL option, Timescale Cloud, for free. This will allow you to focus on your app—not your database.