System Overview
- Ingestion rate: ~3 million records per hour
- Main table structure:
CREATE TABLE phasor_values (
"timestamp" TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL,
"mRID" UUID NOT NULL DEFAULT uuid_generate_v4(),
producer_id TEXT,
measurement_id TEXT NOT NULL,
magnitude_sensor_accuracy real,
angle_sensor_accuracy real,
magnitude real NOT NULL,
angle real NOT NULL,
estimated BOOLEAN NOT NULL DEFAULT FALSE
);
- Current continuous aggregation:
CREATE MATERIALIZED VIEW phasor_values_medium
WITH (timescaledb.continuous) AS
SELECT
time_bucket('1 second', timestamp) AS timestamp_m,
first("mRID", timestamp) AS "mRID",
measurement_id,
first(producer_id, timestamp) AS producer_id,
first(estimated, timestamp) AS estimated,
first(angle, timestamp) AS angle,
first(magnitude, timestamp) AS magnitude,
first(magnitude_sensor_accuracy, timestamp) AS magnitude_sensor_accuracy,
first(angle_sensor_accuracy, timestamp) AS angle_sensor_accuracy
FROM phasor_values
GROUP BY timestamp_m, measurement_id
WITH NO DATA;
Current Challenges
We run real-time continuous aggregate refreshes with a policy like:
SELECT add_continuous_aggregate_policy('phasor_values_medium',
start_offset => (interval '1 hour' + interval '50 minutes'),
end_offset => interval '1 hour',
schedule_interval => interval '5 minutes');
Our main problem now is migrating existing data from the old system. The plan is to copy the old data into phasor_values and then manually refresh the continuous aggregate, since we can’t COPY FROM STDIN into the cagg directly. We fetch data from the old system in one-hour chunks, copy each chunk into phasor_values, and then manually refresh the continuous aggregate for that chunk.
- Importing one day of data with no manual refresh takes around 8 minutes.
- With manual refresh, it takes around 70 minutes.
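For reference, each per-chunk step looks roughly like the following sketch. The file name and window bounds are illustrative, not our actual values:

-- Sketch of one migration iteration (example values).
-- 1. Copy one hour of old data into the hypertable:
--    \copy phasor_values FROM 'old_system_chunk.csv' WITH (FORMAT csv)
-- 2. Materialize just that hour in the continuous aggregate:
CALL refresh_continuous_aggregate(
    'phasor_values_medium',
    '2024-01-01 00:00:00+00',
    '2024-01-01 01:00:00+00'
);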
We also tried a slimmer continuous aggregate that omits several columns:
CREATE MATERIALIZED VIEW phasor_values_medium
WITH (timescaledb.continuous) AS
SELECT
time_bucket('1 second', timestamp) AS timestamp_m,
measurement_id,
first(angle, timestamp) AS angle,
first(magnitude, timestamp) AS magnitude,
first(magnitude_sensor_accuracy, timestamp) AS magnitude_sensor_accuracy,
first(angle_sensor_accuracy, timestamp) AS angle_sensor_accuracy
FROM phasor_values
GROUP BY timestamp_m, measurement_id
WITH NO DATA;
With this modification, the import process takes around 40 minutes.
We do not have compression enabled.
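For context, if we were to enable it, the configuration would look roughly like this. Segmenting by measurement_id and the 7-day threshold are assumptions based on our grouping, not settings we have tested:

-- Sketch only; compression is not currently enabled in our setup.
ALTER TABLE phasor_values SET (
    timescaledb.compress,
    timescaledb.compress_segmentby = 'measurement_id',
    timescaledb.compress_orderby = '"timestamp" DESC'
);
SELECT add_compression_policy('phasor_values', compress_after => INTERVAL '7 days');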
Questions
- How can we optimize our continuous aggregate for better performance, especially during data migrations?
- Should we consider alternative approaches, such as eliminating continuous aggregates and performing downsampling in our application layer?
- Are there any schema optimizations you’d recommend for our use case?
- Is there a more efficient way to handle the migration of historical data while maintaining the benefits of continuous aggregates?