Hi everybody,
Is there any way to insert historical data into a continuous aggregate?
I have two separate problems that such inserts could solve:
- I have a per-day summation: create materialized view data_1day ... as select time_bucket('1 day', ts), sum(value). But now I also have some historical data. I can push it into a separate table data_1day_extra and select a union of it and data_1day (see the sketch after this list), but that looks a bit hacky.
- I need to add more columns to continuous aggregate. From another topic (Unified view across more continuous aggregates) I understood that the only way is to recreate the whole aggregate. But I need to keep and extend data from the old one, because raw data is long gone.
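Roughly what I have in mind (data_1day_all is just a name I made up here for the combined view):
-- separate plain table holding the historical daily sums, same shape as the cagg
CREATE TABLE data_1day_extra (
    ts    TIMESTAMPTZ NOT NULL,
    value BIGINT
);

-- union view that queries would hit instead of data_1day directly
CREATE VIEW data_1day_all AS
SELECT ts, value FROM data_1day
UNION ALL
SELECT ts, value FROM data_1day_extra;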
Thanks!
Hey @flapenguin , in this case, just create the new one and merge the data into another, bigger view that can simplify your calls.
You can insert data directly into the cagg, as it's backed by a hypertable. Take a look at the metadata offered by the timescaledb_information.continuous_aggregates view; you just need to find the hypertable name in the catalog. Give it a try, and just be careful not to refresh the window that you have backfilled.
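For instance, something along these lines (the dates are only placeholders for whatever range you backfill):
-- find the materialization hypertable behind the cagg
SELECT materialization_hypertable_schema, materialization_hypertable_name
FROM timescaledb_information.continuous_aggregates
WHERE view_name = 'data_1day';

-- later, keep any refresh window away from the backfilled range,
-- otherwise the refresh may overwrite the rows you inserted manually
CALL refresh_continuous_aggregate('data_1day', '2024-01-01', '2024-02-01');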
I can’t figure out how to insert data into the underlying hypertable.
It looks like the actual data I want to insert (value) is stored in some serialized way (agg_2_2 | bytea), and therefore a simple insert doesn’t work.
toydb=# CREATE MATERIALIZED VIEW data_1day WITH (
timescaledb.continuous,
timescaledb.materialized_only = TRUE,
timescaledb.create_group_indexes = TRUE
) AS
SELECT
time_bucket('1 day'::INTERVAL, ts) AS ts,
SUM(value)::BIGINT AS value
FROM data_raw
GROUP BY 1
WITH NO DATA;
CREATE MATERIALIZED VIEW
toydb=# SELECT format('%I.%I', materialization_hypertable_schema, materialization_hypertable_name) AS materialization_hypertable
FROM timescaledb_information.continuous_aggregates
WHERE view_name = 'data_1day';
materialization_hypertable
--------------------------------------------------
_timescaledb_internal._materialized_hypertable_8
(1 row)
toydb=# insert into _timescaledb_internal._materialized_hypertable_8 (ts, value) values (now(), 42);
ERROR: column "value" of relation "_materialized_hypertable_8" does not exist
LINE 1: ...escaledb_internal._materialized_hypertable_8 (ts, value) val...
toydb=# \d+ data_1day
View "public.data_1day"
Column | Type | Collation | Nullable | Default | Storage | Description
--------+--------------------------+-----------+----------+---------+---------+-------------
ts | timestamp with time zone | | | | plain |
value | bigint | | | | plain |
View definition:
SELECT _materialized_hypertable_8.ts,
_timescaledb_internal.finalize_agg('pg_catalog.sum(bigint)'::text, NULL::name, NULL::name, '{{pg_catalog,int8}}'::name[], _materialized_hypertable_8.agg_2_2, NULL::numeric)::bigint AS value
FROM _timescaledb_internal._materialized_hypertable_8
GROUP BY _materialized_hypertable_8.ts;
toydb=# \d+ _timescaledb_internal._materialized_hypertable_8
Table "_timescaledb_internal._materialized_hypertable_8"
Column | Type | Collation | Nullable | Default | Storage | Stats target | Description
----------+--------------------------+-----------+----------+---------+----------+--------------+-------------
ts | timestamp with time zone | | not null | | plain | |
agg_2_2 | bytea | | | | extended | |
chunk_id | integer | | | | plain | |
Indexes:
"_materialized_hypertable_8_ts_idx" btree (ts DESC)
Triggers:
ts_insert_blocker BEFORE INSERT ON _timescaledb_internal._materialized_hypertable_8 FOR EACH ROW EXECUTE FUNCTION _timescaledb_internal.insert_blocker()
Access method: heap
It seems your sum(value) won’t allow it, because internally the cagg stores a partial aggregate state (the agg_2_2 bytea column) rather than the finished value.
The simplest solution is to just insert into the raw hypertable and let the continuous aggregate recompute the sum.
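For example, something like this (reusing the data_raw / data_1day names from your post; the values and dates are just placeholders):
-- backfill the historical points into the raw hypertable
INSERT INTO data_raw (ts, value)
VALUES ('2020-01-01 00:00:00+00', 42);

-- then refresh the cagg over that range so the daily sums are rebuilt
CALL refresh_continuous_aggregate('data_1day', '2020-01-01', '2020-01-02');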
Another approach is to make another table with the backfill data and then union all the data from the cagg with the data from your table:
create table my_old_cagg as select * from old_cagg;
create view my_cagg as select * from my_old_cagg union all select * from my_new_cagg;
We’re revisiting this today in the Community Slack and working on a new test: it turns out that inserting directly into the continuous aggregate by name just works. Here’s a POC:
CREATE MATERIALIZED VIEW event_per_day WITH (
timescaledb.continuous,
timescaledb.materialized_only = TRUE
) AS
SELECT
time_bucket('1 day'::INTERVAL, created_at) AS ts,
count(*) AS count
FROM events
GROUP BY 1
WITH NO DATA;
CREATE MATERIALIZED VIEW
Insert directly into the materialized view:
insert into event_per_day (ts, count) values (now() - interval '5 years', 2);
INSERT 0 1
Checking the raw data from the hypertable:
tsdb=> SELECT
time_bucket('1 day'::INTERVAL, created_at) AS ts,
count(*) AS count
FROM events
GROUP BY 1;
ts | count
---------------------+-------
2024-09-30 00:00:00 | 570
2024-10-01 00:00:00 | 2997
2024-10-02 00:00:00 | 2980
2024-10-03 00:00:00 | 2911
2024-10-04 00:00:00 | 542
2024-10-30 00:00:00 | 5
(6 rows)
Now, let’s refresh the continuous aggregate to combine the older range and the new one:
tsdb=> call refresh_continuous_aggregate('event_per_day', '2024-09-30','2024-10-30');
CALL
Check the merged results:
tsdb=> table event_per_day ;
ts | count
----------------------------+-------
2019-11-22 23:50:25.041064 | 2
2024-09-30 00:00:00 | 570
2024-10-03 00:00:00 | 2911
2024-10-01 00:00:00 | 2997
2024-10-02 00:00:00 | 2980
2024-10-04 00:00:00 | 542
(6 rows)
Hypertable structure:
tsdb=> \d events
Table "public.events"
Column | Type | Collation | Nullable | Default
------------+--------------------------------+-----------+----------+---------
identifier | character varying | | not null |
payload | jsonb | | |
created_at | timestamp(6) without time zone | | not null |
updated_at | timestamp(6) without time zone | | not null |
Indexes:
"events_created_at_idx" btree (created_at DESC)
Triggers:
ts_cagg_invalidation_trigger AFTER INSERT OR DELETE OR UPDATE ON events FOR EACH ROW EXECUTE FUNCTION _timescaledb_functions.continuous_agg_invalidation_trigger('341')
ts_insert_blocker BEFORE INSERT ON events FOR EACH ROW EXECUTE FUNCTION _timescaledb_functions.insert_blocker()
Number of child tables: 6 (Use \d+ to list them.)