Hi,
Usually I'd just run it like this in a query: CALL decompress_backfill(staging_table=>'backfill_event_data', destination_hypertable=>'event_data');
When adding the job, I do it like this: SELECT add_job('decompress_backfill', 'x days', '{"staging_table":"backfill_event_data", "destination_hypertable":"event_data"}', NOW());
When I try to run the job, I do it like this, where n is the job number/index: CALL run_job(n);
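For the job number/index I use the id that add_job returns; it can also be looked up from timescaledb_information.jobs. A quick sketch (1005 below is just a placeholder for whatever job_id add_job actually returned):

-- Find the job id for the scheduled backfill job
SELECT job_id, proc_name, schedule_interval, config
FROM timescaledb_information.jobs
WHERE proc_name = 'decompress_backfill';

-- Run it right away; replace 1005 with the job_id returned above
CALL run_job(1005);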
Below is the function documentation.
NOTICE: CREATE OR REPLACE PROCEDURE public.decompress_backfill(staging_table regclass, destination_hypertable regclass, on_conflict_action text DEFAULT 'NOTHING'::text, delete_from_staging boolean DEFAULT true, compression_job_push_interval interval DEFAULT '1 day'::interval, on_conflict_update_columns text[] DEFAULT '{}'::text[], skip_empty_ranges boolean DEFAULT false)
LANGUAGE plpgsql
AS $procedure$
DECLARE
source text := staging_table::text; -- Forms a properly quoted table name from our regclass
dest text := destination_hypertable::text;
dest_nspname name;
dest_relname name;
hypertable_row _timescaledb_catalog.hypertable;
dimension_row _timescaledb_catalog.dimension;
dimension_slice_row _timescaledb_catalog.dimension_slice;
min_time_internal bigint;
max_time_internal bigint;
unformatted_move_stmt text ;
on_conflict_clause text := '';
r_start text := NULL;
r_end text := NULL;
r_end_prev text := NULL;
affected bigint;
old_compression_job_time timestamptz;
chunks_decompressed bool;
current_slice_has_rows boolean := true;
BEGIN
SELECT (get_schema_and_table_name(destination_hypertable)).* INTO STRICT dest_nspname, dest_relname;
--This should throw an error if we can't cast the staging table's type into the hypertable's type, which means the inserts won't work.
EXECUTE FORMAT('SELECT row(h.*)::%1$s FROM %2$s AS h LIMIT 1', source, dest);
-- Make sure our source table has been analyzed so our selects are better later
EXECUTE FORMAT('ANALYZE %s', source);
--Get our hypertable
SELECT h.* INTO STRICT hypertable_row FROM _timescaledb_catalog.hypertable h
WHERE table_name = dest_relname AND schema_name = dest_nspname ;
--And our time dimension, which is always the first dimension
SELECT d.* INTO STRICT dimension_row FROM _timescaledb_catalog.dimension d WHERE hypertable_id = hypertable_row.id ORDER BY id LIMIT 1 ;
-- Push the compression job out for some period of time so we don't end up compressing a decompressed chunk
-- Don't disable completely because at least then if we fail and fail to move it back things won't get completely weird
SELECT move_compression_job(hypertable_row.id, hypertable_row.schema_name, hypertable_row.table_name, now() + compression_job_push_interval) INTO old_compression_job_time;
--Get the min and max times in timescale internal format from the source table, this will tell us which chunks we need to decompress
EXECUTE FORMAT($$SELECT _timescaledb_internal.time_to_internal(min(%1$I)) ,
_timescaledb_internal.time_to_internal(max(%1$I))
FROM %2$s $$, dimension_row.column_name, source)
INTO STRICT min_time_internal, max_time_internal;
--Set up our move statement to be used with the right formatting in each of the loop executions
-- Note that the table names and literal time values are properly formatted outside and so are
-- passed in as raw strings. We cannot re-format as they will then have extra quotes.
IF delete_from_staging THEN
unformatted_move_stmt = $$
WITH to_insert AS (DELETE
FROM %1$s --source table
WHERE %2$I >= %3$s -- time column >= range start
AND %2$I < %4$s -- time column < range end
RETURNING * )
INSERT INTO %5$s
SELECT * FROM to_insert
%6$s -- ON CONFLICT CLAUSE if it exists
$$;
ELSE
unformatted_move_stmt = $$
WITH to_insert AS (SELECT *
FROM %1$s --source table
WHERE %2$I >= %3$s -- time column >= range start
AND %2$I < %4$s) -- time column < range end)
INSERT INTO %5$s
SELECT * FROM to_insert
%6$s -- ON CONFLICT CLAUSE if it exists
$$;
END IF;
IF UPPER(on_conflict_action) = 'NOTHING' THEN
on_conflict_clause = 'ON CONFLICT DO NOTHING';
ELSEIF UPPER(on_conflict_action) = 'UPDATE' THEN
SELECT 'ON CONFLICT DO UPDATE SET ' || STRING_AGG(FORMAT('%1$I = EXCLUDED.%1$I', on_conflict_update_column), ', ')
FROM UNNEST(on_conflict_update_columns) AS on_conflict_update_column INTO on_conflict_clause;
END IF;
--Loop through the dimension slices that are impacted
FOR dimension_slice_row IN
SELECT ds.*
FROM _timescaledb_catalog.dimension_slice ds
WHERE dimension_id = dimension_row.id
-- find the dimension slices that overlap with the data in our staging table
-- the range_ends are non inclusive, the range_starts are inclusive
AND max_time_internal >= ds.range_start AND min_time_internal < ds.range_end
ORDER BY ds.range_end
LOOP
--Set the previous r_end, so that we can insert from the previous (or the min) to
--the start, this will catch any rows that are in the source table for which we
--haven't yet made a chunk in the dest hypertable.
r_end_prev = COALESCE(r_end, _timescaledb_internal.time_literal_sql(min_time_internal, dimension_row.column_type));
-- now actually move rows
r_start = _timescaledb_internal.time_literal_sql(dimension_slice_row.range_start, dimension_row.column_type);
r_end = _timescaledb_internal.time_literal_sql(dimension_slice_row.range_end, dimension_row.column_type);
-- catch any stray rows that fall into a chunk that doesn't exist yet by expanding
-- our range to the lower of r_end_prev and r_start, there is a case where r_start
can be lower, which is if r_end_prev was actually the minimum in the
-- source table. We won't compress the new chunks that are created, the
-- compression job will pick those up when we re-activate it.
r_start = LEAST(r_end_prev, r_start);
-- check if the current slice contains data that needs to be moved
IF skip_empty_ranges THEN
EXECUTE FORMAT(
'SELECT count(*) > 0 FROM %1$s WHERE %2$s >= %3$s AND %2$s < %4$s LIMIT 1',
source, dimension_row.column_name, r_start, r_end)
INTO current_slice_has_rows;
END IF;
-- skip if there is nothing to move and the flag is set
CONTINUE WHEN skip_empty_ranges AND NOT current_slice_has_rows;
-- decompress the chunks in the dimension slice, committing transactions after each decompress
CALL decompress_dimension_slice(dimension_slice_row, chunks_decompressed);
EXECUTE FORMAT(unformatted_move_stmt
, source
, dimension_row.column_name
, r_start
, r_end
, dest
, on_conflict_clause
);
GET DIAGNOSTICS affected = ROW_COUNT;
RAISE NOTICE '% rows moved in range % to %', affected, r_start, r_end ;
COMMIT;
-- recompress the chunks in the dimension slice, committing transactions after each recompress
IF chunks_decompressed THEN
CALL compress_dimension_slice(dimension_slice_row);
END IF;
END LOOP;
-- catch any stray rows that fall into new chunks that need to be created between our
-- final chunk and the max in the source table, We won't compress the new chunks that are
-- created, the job will pick those up when we re-activate it.
r_start = COALESCE(r_end, _timescaledb_internal.time_literal_sql(min_time_internal, dimension_row.column_type)); --if there were no rows inserted into a chunk, r_end wouldn't be defined.
r_end = _timescaledb_internal.time_literal_sql(max_time_internal+1, dimension_row.column_type); -- add one here, so that we can still use < rather than <= (our internal representation is a bigint)
EXECUTE FORMAT(unformatted_move_stmt
, source
, dimension_row.column_name
, r_start
, r_end
, dest
, on_conflict_clause
);
GET DIAGNOSTICS affected = ROW_COUNT;
RAISE NOTICE '% rows moved in range % to %', affected, r_start, r_end ;
COMMIT;
--Move our job back to where it was
SELECT move_compression_job(hypertable_row.id, hypertable_row.schema_name, hypertable_row.table_name, old_compression_job_time) INTO old_compression_job_time;
COMMIT;
END;
$procedure$