Apr 11, 2024
Posted by
Samuel Van Ackere
This guest blog post was originally published on the author’s blog. Thank you so much to Samuel for sending it to us for publication on the Timescale Blog.
Linked Data Event Streams (LDES) represent and share fast and slow-moving data on the Web using the Resource Description Framework (RDF), which allows data to be linked and connected to other data sources using unique identifiers (URIs).
A Linked Data Event Stream is a data event stream of a group of immutable objects described as machine-readable RDF (such as sensor observations, address registers, or financial data).
LDES streams provide a flexible and interoperable way of describing and exchanging events as Linked Data, enabling different systems and applications to easily consume and act on data streams in a consistent and standardized way.
This article shows how to effortlessly insert sensor data in the form of an LDES into a Timescale database.
Timescale is an open-source database for storing and querying large amounts of time-series data. It extends PostgreSQL with time-series support, offering features such as fast ingestion and querying of large datasets, flexible data granularity, and long-term data storage.
It is well suited to storing and querying time-series data at scale and is widely used in applications such as IoT, finance, and telemetry.
First, you need to configure a data flow to ingest a Linked Data Event Stream into PostgreSQL. You can do this in an Apache NiFi environment.
Apache NiFi is an open-source data processing and integration platform designed to automate data flows between systems. It provides a web-based user interface for creating, managing, and monitoring data flows, along with a range of pre-built connectors and processors for data processing tasks.
To consume an LDES, the Apache NiFi flow needs an LDES client processor. An LDES client is a component capable of consuming, processing, and analyzing the events in the stream.
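The output of the LDES client for a single member of the water quality observations LDES (https://iow.smartdataspace.beta-vlaanderen.be/water-quality-observations) looks like this: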
_:B5edf92f59913b8d14f45818d5bde1d51 <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <http://def.isotc211.org/iso19156/2011/Measurement#OM_Measurement> .
_:B5edf92f59913b8d14f45818d5bde1d51 <http://def.isotc211.org/iso19156/2011/Observation#OM_Observation.observedProperty> <https://data.vmm.be/concept/waterkwaliteitparameter/conductiviteit> .
_:B5edf92f59913b8d14f45818d5bde1d51 <http://def.isotc211.org/iso19156/2011/Observation#OM_Observation.phenomenonTime> "2022-11-09T20:30:00.000Z"^^<http://www.w3.org/2001/XMLSchema#datetime> .
_:B5edf92f59913b8d14f45818d5bde1d51 <http://def.isotc211.org/iso19156/2011/Observation#OM_Observation.result> _:B727a9b2b6a6311cc83b677d439f4cf68 .
_:B5edf92f59913b8d14f45818d5bde1d51 <http://www.w3.org/ns/sosa/madeBySensor> <urn:ngsi-v2:cot-imec-be:Device:dwg-iow-csHTUVdGuPYK89L34yi88j> .
_:B6003104aae33209ee0e6a26e14ba38cb <https://schema.org/value> "1.152E1"^^<http://www.w3.org/2001/XMLSchema#double> .
_:Bec20a7e293639ee5a985d621d7b2d6d5 <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <http://def.isotc211.org/iso19156/2011/Measurement#OM_Measurement> .
_:Bec20a7e293639ee5a985d621d7b2d6d5 <http://def.isotc211.org/iso19156/2011/Observation#OM_Observation.observedProperty> <https://data.vmm.be/concept/waterkwaliteitparameter/temperatuur> .
_:Bec20a7e293639ee5a985d621d7b2d6d5 <http://def.isotc211.org/iso19156/2011/Observation#OM_Observation.phenomenonTime> "2022-11-09T20:30:00.000Z"^^<http://www.w3.org/2001/XMLSchema#datetime> .
_:Bec20a7e293639ee5a985d621d7b2d6d5 <http://def.isotc211.org/iso19156/2011/Observation#OM_Observation.result> _:Bdd20913615f9b4c0169c7770bdc770d4 .
_:Bec20a7e293639ee5a985d621d7b2d6d5 <http://www.w3.org/ns/sosa/madeBySensor> <urn:ngsi-v2:cot-imec-be:Device:dwg-iow-csHTUVdGuPYK89L34yi88j> .
_:Bdd20913615f9b4c0169c7770bdc770d4 <http://def.isotc211.org/iso19103/2005/UnitsOfMeasure#Measure.value> _:B6003104aae33209ee0e6a26e14ba38cb .
<urn:ngsi-v2:cot-imec-be:WaterQualityObserved:dwg-iow-9NQCQNb4dJZ5J8kTACzdVn/2022-11-09T20:30:00.000Z> <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <https://www.w3.org/TR/vocab-ssn-ext/#sosa:ObservationCollection> .
<urn:ngsi-v2:cot-imec-be:WaterQualityObserved:dwg-iow-9NQCQNb4dJZ5J8kTACzdVn/2022-11-09T20:30:00.000Z> <http://def.isotc211.org/iso19156/2011/SamplingFeature#SF_SamplingFeatureCollection.member> _:B5edf92f59913b8d14f45818d5bde1d51 .
<urn:ngsi-v2:cot-imec-be:WaterQualityObserved:dwg-iow-9NQCQNb4dJZ5J8kTACzdVn/2022-11-09T20:30:00.000Z> <http://def.isotc211.org/iso19156/2011/SamplingFeature#SF_SamplingFeatureCollection.member> _:B3105a27f0059867877fb28653f5a6abc .
<urn:ngsi-v2:cot-imec-be:WaterQualityObserved:dwg-iow-9NQCQNb4dJZ5J8kTACzdVn/2022-11-09T20:30:00.000Z> <http://def.isotc211.org/iso19156/2011/SamplingFeature#SF_SamplingFeatureCollection.member> _:Bec20a7e293639ee5a985d621d7b2d6d5 .
<urn:ngsi-v2:cot-imec-be:WaterQualityObserved:dwg-iow-9NQCQNb4dJZ5J8kTACzdVn/2022-11-09T20:30:00.000Z> <http://purl.org/dc/terms/isVersionOf> <urn:ngsi-v2:cot-imec-be:WaterQualityObserved:dwg-iow-9NQCQNb4dJZ5J8kTACzdVn> .
<urn:ngsi-v2:cot-imec-be:WaterQualityObserved:dwg-iow-9NQCQNb4dJZ5J8kTACzdVn/2022-11-09T20:30:00.000Z> <http://www.w3.org/ns/prov#generatedAtTime> "2022-11-09T20:30:00.000Z"^^<http://www.w3.org/2001/XMLSchema#dateTime> .
<urn:ngsi-v2:cot-imec-be:WaterQualityObserved:dwg-iow-9NQCQNb4dJZ5J8kTACzdVn/2022-11-09T20:30:00.000Z> <http://www.w3.org/ns/sosa/hasFeatureOfInterest> "spt-00029-97" .
_:B3105a27f0059867877fb28653f5a6abc <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <http://def.isotc211.org/iso19156/2011/Measurement#OM_Measurement> .
_:B3105a27f0059867877fb28653f5a6abc <http://def.isotc211.org/iso19156/2011/Observation#OM_Observation.observedProperty> <https://data.vmm.be/concept/observatieparameter/hydrostatische-druk> .
_:B3105a27f0059867877fb28653f5a6abc <http://def.isotc211.org/iso19156/2011/Observation#OM_Observation.phenomenonTime> "2022-11-09T20:30:00.000Z"^^<http://www.w3.org/2001/XMLSchema#datetime> .
_:B3105a27f0059867877fb28653f5a6abc <http://def.isotc211.org/iso19156/2011/Observation#OM_Observation.result> _:B2598b988faa2635d5dd520f59f376b8e .
_:B3105a27f0059867877fb28653f5a6abc <http://www.w3.org/ns/sosa/madeBySensor> <urn:ngsi-v2:cot-imec-be:Device:dwg-iow-csHTUVdGuPYK89L34yi88j> .
<https://iow.smartdataspace.beta-vlaanderen.be/water-quality-observations> <https://w3id.org/tree#member> <urn:ngsi-v2:cot-imec-be:WaterQualityObserved:dwg-iow-9NQCQNb4dJZ5J8kTACzdVn/2022-11-09T20:30:00.000Z> .
_:B727a9b2b6a6311cc83b677d439f4cf68 <http://def.isotc211.org/iso19103/2005/UnitsOfMeasure#Measure.value> _:B9776c69148d2ef600aed62aa2c85bd40 .
_:B2598b988faa2635d5dd520f59f376b8e <http://def.isotc211.org/iso19103/2005/UnitsOfMeasure#Measure.value> _:B436b5950c7725d6d9a38aaf6ca88b00f .
_:B9776c69148d2ef600aed62aa2c85bd40 <https://schema.org/value> "1120"^^<http://www.w3.org/2001/XMLSchema#integer> .
_:B436b5950c7725d6d9a38aaf6ca88b00f <https://schema.org/value> "673"^^<http://www.w3.org/2001/XMLSchema#integer> .
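Note that a measured value is not attached to its observation directly: the observation points to a result node, which points to a measure node, which finally carries the `schema:value` literal. The following is a minimal Python sketch of walking that chain, using four triples copied from the output above. It is not part of the NiFi flow; the regular expression is a simplification that only handles statements shaped like the ones shown here, and the function names are illustrative.

```python
import re

# Simplified pattern for one N-Triples statement: subject, predicate, object, final dot.
TRIPLE = re.compile(r'^(\S+)\s+(\S+)\s+(.+?)\s*\.\s*$')

OBS = "<http://def.isotc211.org/iso19156/2011/Observation#OM_Observation."
PROPERTY = OBS + "observedProperty>"
RESULT = OBS + "result>"
MEASURE = "<http://def.isotc211.org/iso19103/2005/UnitsOfMeasure#Measure.value>"
VALUE = "<https://schema.org/value>"

SAMPLE = """
_:Bec20a7e293639ee5a985d621d7b2d6d5 <http://def.isotc211.org/iso19156/2011/Observation#OM_Observation.observedProperty> <https://data.vmm.be/concept/waterkwaliteitparameter/temperatuur> .
_:Bec20a7e293639ee5a985d621d7b2d6d5 <http://def.isotc211.org/iso19156/2011/Observation#OM_Observation.result> _:Bdd20913615f9b4c0169c7770bdc770d4 .
_:Bdd20913615f9b4c0169c7770bdc770d4 <http://def.isotc211.org/iso19103/2005/UnitsOfMeasure#Measure.value> _:B6003104aae33209ee0e6a26e14ba38cb .
_:B6003104aae33209ee0e6a26e14ba38cb <https://schema.org/value> "1.152E1"^^<http://www.w3.org/2001/XMLSchema#double> .
"""

def parse(ntriples):
    """Index triples as {subject: {predicate: object}}; one object per predicate suffices here."""
    index = {}
    for line in ntriples.splitlines():
        match = TRIPLE.match(line.strip())
        if match:
            subject, predicate, obj = match.groups()
            index.setdefault(subject, {})[predicate] = obj
    return index

def observed_values(index):
    """Follow observation -> result -> measure -> schema:value for every observation."""
    values = {}
    for props in index.values():
        if RESULT not in props:
            continue  # not an observation node
        measure = index[props[RESULT]][MEASURE]
        literal = index[measure][VALUE]       # e.g. "1.152E1"^^<...#double>
        lexical = literal.split('"')[1]       # text between the first pair of quotes
        values[props[PROPERTY]] = float(lexical)
    return values

values = observed_values(parse(SAMPLE))
```

For anything beyond a quick look at the data, a real RDF library is a safer choice than a hand-written pattern.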
Next in the Apache NiFi flow is a version materialization component. "Version materialization" is the process of removing version information from an LDES member and reverting it to a "state" object that reflects only the current state of the member, without the history of changes.
Performing version materialization on an LDES member removes all information about previous changes. This step is needed here because the consumer does not want to store every version in the database.
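As a rough illustration of the idea (not the implementation of the NiFi component), version materialization can be sketched in Python as rewriting every reference to the versioned identifier into the identifier it points to through `dcterms:isVersionOf`, and dropping the version link itself. The `materialize` function and the tuple representation of triples are assumptions made for this sketch; the identifiers come from the LDES client output shown earlier.

```python
IS_VERSION_OF = "http://purl.org/dc/terms/isVersionOf"

STATE = "urn:ngsi-v2:cot-imec-be:WaterQualityObserved:dwg-iow-9NQCQNb4dJZ5J8kTACzdVn"
VERSION = STATE + "/2022-11-09T20:30:00.000Z"
LDES = "https://iow.smartdataspace.beta-vlaanderen.be/water-quality-observations"

def materialize(triples):
    """Turn versioned members into state objects.

    `triples` is a list of (subject, predicate, object) string tuples.
    """
    # Map each version identifier to the entity it is a version of.
    version_to_state = {s: o for s, p, o in triples if p == IS_VERSION_OF}
    materialized = []
    for s, p, o in triples:
        if p == IS_VERSION_OF:
            continue  # the version link carries no state, so it is dropped
        # Rewrite the identifier wherever it appears as subject or object.
        materialized.append((version_to_state.get(s, s), p, version_to_state.get(o, o)))
    return materialized

member = [
    (VERSION, IS_VERSION_OF, STATE),
    (VERSION, "http://www.w3.org/ns/sosa/hasFeatureOfInterest", "spt-00029-97"),
    (LDES, "https://w3id.org/tree#member", VERSION),
]
state_object = materialize(member)
```

After this step, two versions of the same member collapse onto the same identifier, which is why the history is no longer recoverable.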
The data is then converted to JSON-LD, which makes it easier to map to a tabular structure. The JSON-LD document for a member looks like this (the example shows a different member than the LDES client output above):
{
  "@id" : "urn:ngsi-v2:cot-imec-be:WaterQualityObserved:dwg-iow-g9kEXxUeP28TNc6wJvFsau",
  "@type" : "Observatieverzameling",
  "Bemonsteringsobjectverzameling.lid" : [ {
    "@id" : "_:b8",
    "@type" : "Meting",
    "Observatie.geobserveerdKenmerk" : "https://data.vmm.be/concept/waterkwaliteitparameter/temperatuur",
    "Observatie:.phenomenonTime" : {
      "@type" : "http://www.w3.org/2001/XMLSchema#datetime",
      "@value" : "2022-11-09T19:30:00.000Z"
    },
    "Observatie.resultaat" : {
      "@id" : "_:b6",
      "Maat.maat" : {
        "@id" : "_:b1",
        "https://schema.org/value" : {
          "@type" : "http://www.w3.org/2001/XMLSchema#double",
          "@value" : "1.153E1"
        }
      }
    },
    "Observatie.uitgevoerdMetSensor" : "urn:ngsi-v2:cot-imec-be:Device:dwg-iow-8JgF3vxoYKoXzwWGUiX3Yc"
  }, {
    "@id" : "_:b3",
    "@type" : "Meting",
    "Observatie.geobserveerdKenmerk" : "https://data.vmm.be/concept/waterkwaliteitparameter/conductiviteit",
    "Observatie:.phenomenonTime" : {
      "@type" : "http://www.w3.org/2001/XMLSchema#datetime",
      "@value" : "2022-11-09T19:30:00.000Z"
    },
    "Observatie.resultaat" : {
      "@id" : "_:b4",
      "Maat.maat" : {
        "@id" : "_:b5",
        "https://schema.org/value" : {
          "@type" : "http://www.w3.org/2001/XMLSchema#integer",
          "@value" : "920"
        }
      }
    },
    "Observatie.uitgevoerdMetSensor" : "urn:ngsi-v2:cot-imec-be:Device:dwg-iow-8JgF3vxoYKoXzwWGUiX3Yc"
  }, {
    "@id" : "_:b7",
    "@type" : "Meting",
    "Observatie.geobserveerdKenmerk" : "https://data.vmm.be/concept/observatieparameter/hydrostatische-druk",
    "Observatie:.phenomenonTime" : {
      "@type" : "http://www.w3.org/2001/XMLSchema#datetime",
      "@value" : "2022-11-09T19:30:00.000Z"
    },
    "Observatie.resultaat" : {
      "@id" : "_:b2",
      "Maat.maat" : {
        "@id" : "_:b0",
        "https://schema.org/value" : {
          "@type" : "http://www.w3.org/2001/XMLSchema#integer",
          "@value" : "11133"
        }
      }
    },
    "Observatie.uitgevoerdMetSensor" : "urn:ngsi-v2:cot-imec-be:Device:dwg-iow-8JgF3vxoYKoXzwWGUiX3Yc"
  } ],
  "http://www.w3.org/ns/prov#generatedAtTime" : {
    "@type" : "http://www.w3.org/2001/XMLSchema#dateTime",
    "@value" : "2022-11-09T19:30:00.000Z"
  },
  "http://www.w3.org/ns/sosa/hasFeatureOfInterest" : "spt-00027-06"
}
A JOLT transformation then selects the relevant parameters and places them in a flat JSON object (see the output below).
{
  "id": "urn:ngsi-v2:cot-imec-be:WaterQualityObserved:dwg-iow-9NQCQNb4dJZ5J8kTACzdVn",
  "temperature_value": "1.152E1",
  "temperature_date": "2022-11-09T20:30:00.000Z",
  "temperature_sensor": "urn:ngsi-v2:cot-imec-be:Device:dwg-iow-csHTUVdGuPYK89L34yi88j",
  "conductivity_value": "1120",
  "conductivity_date": "2022-11-09T20:30:00.000Z",
  "conductivity_sensor": "urn:ngsi-v2:cot-imec-be:Device:dwg-iow-csHTUVdGuPYK89L34yi88j",
  "hydro_pressure_value": "673",
  "hydro_pressure_date": "2022-11-09T20:30:00.000Z",
  "hydro_pressure_sensor": "urn:ngsi-v2:cot-imec-be:Device:dwg-iow-csHTUVdGuPYK89L34yi88j"
}
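To make the mapping from the nested JSON-LD to this flat record concrete, here is a Python sketch of an equivalent flattening step. It is not the JOLT specification used in the flow (that lives in the repository); the `PREFIXES` table and the `flatten` function are assumptions for illustration, and the sample input is a trimmed copy of the temperature observation from the JSON-LD document above.

```python
# Hypothetical mapping from observed-property URI to the column prefix in the flat record.
PREFIXES = {
    "https://data.vmm.be/concept/waterkwaliteitparameter/temperatuur": "temperature",
    "https://data.vmm.be/concept/waterkwaliteitparameter/conductiviteit": "conductivity",
    "https://data.vmm.be/concept/observatieparameter/hydrostatische-druk": "hydro_pressure",
}

def flatten(member):
    """Flatten one materialized JSON-LD member into a single-level record."""
    record = {"id": member["@id"]}
    for obs in member["Bemonsteringsobjectverzameling.lid"]:
        prefix = PREFIXES.get(obs["Observatie.geobserveerdKenmerk"])
        if prefix is None:
            continue  # skip properties the table does not store
        measure = obs["Observatie.resultaat"]["Maat.maat"]
        record[prefix + "_value"] = measure["https://schema.org/value"]["@value"]
        record[prefix + "_date"] = obs["Observatie:.phenomenonTime"]["@value"]
        record[prefix + "_sensor"] = obs["Observatie.uitgevoerdMetSensor"]
    return record

# Trimmed copy of one observation from the JSON-LD example.
member = {
    "@id": "urn:ngsi-v2:cot-imec-be:WaterQualityObserved:dwg-iow-g9kEXxUeP28TNc6wJvFsau",
    "Bemonsteringsobjectverzameling.lid": [{
        "Observatie.geobserveerdKenmerk": "https://data.vmm.be/concept/waterkwaliteitparameter/temperatuur",
        "Observatie:.phenomenonTime": {"@value": "2022-11-09T19:30:00.000Z"},
        "Observatie.resultaat": {"Maat.maat": {"https://schema.org/value": {"@value": "1.153E1"}}},
        "Observatie.uitgevoerdMetSensor": "urn:ngsi-v2:cot-imec-be:Device:dwg-iow-8JgF3vxoYKoXzwWGUiX3Yc",
    }],
}
record = flatten(member)
```

The values stay strings at this stage, as in the JOLT output; converting them to numbers and timestamps can be left to the database write.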
After this transformation, the data can be written to a PostgreSQL or Timescale database. The analysis you can do with data in Timescale includes but is not limited to time-series visualization, anomaly detection, forecasting, and aggregation of time-series data over various time intervals.
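The write itself can be sketched as follows. This is an assumption-laden example, not the schema or processor configuration from the repository: the `water_quality` table, its columns, and the helper functions are hypothetical, the record's temperature timestamp is used as the row time because all three timestamps coincide in the sample, and `conn` is assumed to be a DB-API connection such as one opened with psycopg2. `create_hypertable` and `time_bucket` are TimescaleDB functions.

```python
from datetime import datetime, timezone

# Hypothetical table layout for the flat record.
CREATE_TABLE = """
CREATE TABLE IF NOT EXISTS water_quality (
    time            TIMESTAMPTZ NOT NULL,
    id              TEXT NOT NULL,
    temperature     DOUBLE PRECISION,
    conductivity    DOUBLE PRECISION,
    hydro_pressure  DOUBLE PRECISION
);
"""
# Turn the plain PostgreSQL table into a hypertable partitioned on the time column.
CREATE_HYPERTABLE = "SELECT create_hypertable('water_quality', 'time', if_not_exists => TRUE);"
INSERT = (
    "INSERT INTO water_quality (time, id, temperature, conductivity, hydro_pressure) "
    "VALUES (%s, %s, %s, %s, %s);"
)
# Example aggregation: average temperature per hour.
HOURLY_AVG = (
    "SELECT time_bucket('1 hour', time) AS bucket, avg(temperature) "
    "FROM water_quality GROUP BY bucket ORDER BY bucket;"
)

def to_row(record):
    """Convert the string-valued flat record into typed column values."""
    ts = datetime.strptime(record["temperature_date"], "%Y-%m-%dT%H:%M:%S.%fZ")
    return (
        ts.replace(tzinfo=timezone.utc),  # the trailing Z means UTC
        record["id"],
        float(record["temperature_value"]),
        float(record["conductivity_value"]),
        float(record["hydro_pressure_value"]),
    )

def write_record(conn, record):
    """Create the hypertable if needed and insert one record (conn: DB-API connection)."""
    with conn.cursor() as cur:
        cur.execute(CREATE_TABLE)
        cur.execute(CREATE_HYPERTABLE)
        cur.execute(INSERT, to_row(record))
    conn.commit()

record = {
    "id": "urn:ngsi-v2:cot-imec-be:WaterQualityObserved:dwg-iow-9NQCQNb4dJZ5J8kTACzdVn",
    "temperature_value": "1.152E1",
    "temperature_date": "2022-11-09T20:30:00.000Z",
    "conductivity_value": "1120",
    "hydro_pressure_value": "673",
}
row = to_row(record)
```

In the NiFi flow the same job is done by a database processor rather than hand-written code; the sketch only shows what ends up in the table and how an aggregation query over it might look.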
To replicate the data flow in this article, please go to the LDES2TimescaleDB GitHub repository. It describes how to set up the dockerized Timescale and Apache NiFi, after which the data flow can be started using the supplied Apache NiFi setup file.
A Linked Data Event Stream is the core API for fast and slow-moving data. It is a data event stream of a group of immutable objects described as machine-readable RDF (such as sensor observations, address registers, or financial data).
To write an LDES to a PostgreSQL or Timescale database, a data conversion flow is configured in Apache NiFi. Using this article as a guideline, you should be in good shape to write Linked Data Event Streams to a Timescale database.
Contributors to this article are Dwight Van Lancker (ddvlanck on GitHub) and Sander Van Dooren (sandervd on GitHub) at Flanders Smart Data Space (Digital Flanders, Belgium). In a rapidly changing society, governments need to be more agile and resilient than ever. Digital Flanders realizes and supervises digital transformation projects for Flemish and local governments.