This PR provides a PostgreSQL output plugin.
This is a continuation of #3428, but massively reworked. There are a lot of breaking changes to address issues and limitations, as well as performance optimizations.
The performance optimizations are not minor either. The original code from #3428 benchmarked at around 250 points per second (using real production data). It's been a while since I last ran a benchmark, but the last run handled around 60,000 points per second.
closes #3408, closes #3428
Major changes from #3428:
* Deterministic tag IDs when using `tags_as_foreign_keys`.
Previous code used an auto-generated serial to obtain the tag ID for a set of tags. This was a severe bottleneck, as the tag ID had to be looked up from the database for every single series. The new code hashes the tag keys & values (within telegraf) to generate the tag ID deterministically (a sketch follows this list).
* Connection pooling & concurrency.
The plugin allows multiple simultaneous inserts into the database. Each batch is split into a separate insert (COPY) per measurement/table, and these run in parallel. If batches arrive faster than a single connection can write, multiple batches are also inserted in parallel (see the fan-out sketch after this list).
* Use of `numeric` data type for `uint64`.
Previous code used `bigint` for `uint64`. Since `bigint` is a signed 64-bit integer, inserting values larger than an `int64` caused overflow errors. `numeric` is an arbitrary-precision exact numeric data type. It is less performant than `bigint`, but it's the only data type that can hold the full `uint64` range (see the overflow demo after this list).
* More powerful schema modification template statements.
For table creation & column addition, a more powerful mechanism was implemented. The new template statements use golang's text/template library, with lots of provided variables, methods, & functions, to allow for virtually any use case. Some example configs are provided below to demonstrate. Comprehensive documentation on this functionality will be added.
* Better error handling.
The old code didn't use transactions and would infinitely retry the entire batch on error, which resulted in things like duplicate inserts. As mentioned earlier, each measurement is now a separate sub-batch, which narrows the scope of errors. Each sub-batch is inserted in a transaction, so there's no risk of duplicates. In addition, the plugin can distinguish between temporary and permanent errors. A permanent error is something like bad SQL that will always fail no matter how many times you retry it; a temporary error is something like a failed connection, where a retry may succeed. Temporary errors are retried indefinitely (with incremental backoff), while permanent errors cause the sub-batch to be discarded (see the retry sketch after this list).
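To make the first change concrete, here is a minimal sketch of deterministic tag-ID generation, assuming an FNV-1a hash over the sorted tag set; the hash function and details are illustrative, not the plugin's exact implementation:
```go
package main

import (
	"fmt"
	"hash/fnv"
	"sort"
)

// tagID deterministically derives a signed 64-bit ID from a tag set,
// so no database round-trip is needed to resolve the ID for a series.
func tagID(tags map[string]string) int64 {
	keys := make([]string, 0, len(tags))
	for k := range tags {
		keys = append(keys, k)
	}
	sort.Strings(keys) // order-independent: same tag set -> same ID

	h := fnv.New64a()
	for _, k := range keys {
		h.Write([]byte(k))
		h.Write([]byte{0}) // separator to avoid ambiguous concatenations
		h.Write([]byte(tags[k]))
		h.Write([]byte{0})
	}
	return int64(h.Sum64()) // fits a bigint tag_id column
}

func main() {
	fmt.Println(tagID(map[string]string{"host": "db01", "region": "us-east"}))
}
```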
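The per-measurement fan-out, roughly (using a stand-in `insert` function where the real plugin issues a COPY per table over pooled connections):
```go
package main

import (
	"fmt"
	"sync"
)

type Metric struct {
	Measurement string
	Fields      map[string]interface{}
}

// writeBatch groups metrics by measurement and inserts each group
// concurrently, mirroring the one-COPY-per-table approach.
func writeBatch(batch []Metric, insert func(table string, rows []Metric) error) error {
	groups := make(map[string][]Metric)
	for _, m := range batch {
		groups[m.Measurement] = append(groups[m.Measurement], m)
	}

	var wg sync.WaitGroup
	errs := make(chan error, len(groups))
	for table, rows := range groups {
		wg.Add(1)
		go func(table string, rows []Metric) {
			defer wg.Done()
			if err := insert(table, rows); err != nil {
				errs <- fmt.Errorf("%s: %w", table, err)
			}
		}(table, rows)
	}
	wg.Wait()
	close(errs)
	return <-errs // first error, or nil if every insert succeeded
}

func main() {
	batch := []Metric{{Measurement: "cpu"}, {Measurement: "mem"}, {Measurement: "cpu"}}
	err := writeBatch(batch, func(table string, rows []Metric) error {
		fmt.Printf("COPY %d rows into %s\n", len(rows), table)
		return nil
	})
	fmt.Println("err:", err)
}
```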
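And the `uint64` problem in miniature: values above the `int64` maximum can't be represented by a signed 64-bit `bigint`, so a cast corrupts them and an insert raises an overflow error:
```go
package main

import (
	"fmt"
	"math"
	"strconv"
)

func main() {
	v := uint64(math.MaxUint64)            // 18446744073709551615
	fmt.Println(int64(v))                  // -1: a signed 64-bit cast corrupts the value
	fmt.Println(strconv.FormatUint(v, 10)) // lossless; numeric accepts the full range
}
```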
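Finally, a sketch of the temporary-vs-permanent error split; the classification test and backoff values here are placeholders (the real plugin inspects the PostgreSQL error):
```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errPermanent marks failures that can never succeed on retry (e.g. bad SQL).
var errPermanent = errors.New("permanent error")

// insertWithRetry retries temporary failures with an increasing backoff,
// and gives up (discarding the sub-batch) on a permanent failure.
func insertWithRetry(insert func() error) error {
	backoff := 250 * time.Millisecond
	const maxBackoff = 15 * time.Second
	for {
		err := insert()
		if err == nil {
			return nil
		}
		if errors.Is(err, errPermanent) {
			return fmt.Errorf("discarding sub-batch: %w", err)
		}
		// Temporary error (e.g. lost connection): wait and retry.
		time.Sleep(backoff)
		backoff *= 2
		if backoff > maxBackoff {
			backoff = maxBackoff
		}
	}
}

func main() {
	attempts := 0
	err := insertWithRetry(func() error {
		attempts++
		if attempts < 3 {
			return errors.New("connection refused") // treated as temporary
		}
		return nil
	})
	fmt.Println("attempts:", attempts, "err:", err)
}
```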
Note that there are breaking changes in the db schema (a hypothetical migration sketch follows this list):
* If using `tags_as_foreign_keys`, the `tag_id` column type is now `bigint`. If the existing column is converted to `bigint` with records preserved, the plugin will generate new `tag_id` values and insert new records, but joins should remain functional.
* If a field's type is `uint64`, the column type is now `numeric` instead of `bigint`. Leaving an existing column as `bigint` should still work, unless values exceed the maximum of `bigint`.
* Tag columns, when not using `tags_as_foreign_keys`, must be [commented](https://www.postgresql.org/docs/12/sql-comment.html) with a string beginning with `tag`. Failure to do so will result in some of the add-column template functionality not working properly.
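A hypothetical migration covering these changes; the table names (`cpu`, `cpu_tag`), column names, and driver choice are illustrative, so adjust them to your schema:
```go
package main

import (
	"database/sql"
	"log"

	_ "github.com/lib/pq" // any PostgreSQL driver works; lib/pq is just an example
)

func main() {
	db, err := sql.Open("postgres", "postgres://localhost/telegraf?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	for _, stmt := range []string{
		// tags_as_foreign_keys: widen the old serial tag_id columns to bigint.
		`ALTER TABLE cpu ALTER COLUMN tag_id TYPE bigint`,
		`ALTER TABLE cpu_tag ALTER COLUMN tag_id TYPE bigint`,
		// uint64 fields: widen the bigint column to numeric.
		`ALTER TABLE cpu ALTER COLUMN some_uint64_field TYPE numeric`,
		// without tags_as_foreign_keys: tag columns need a comment
		// beginning with the string "tag".
		`COMMENT ON COLUMN cpu.host IS 'tag'`,
	} {
		if _, err := db.Exec(stmt); err != nil {
			log.Fatal(err)
		}
	}
}
```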
Right now this PR is a draft, as I want to run it in an environment with TimescaleDB similar to how we plan on deploying it to production. However, there are several blocking issues with TimescaleDB preventing me from doing this. All the tests are still for the old code and have not been updated (and thus won't even compile). There are also a few more minor changes to make, as well as general cleanup. So I'm taking this opportunity to put it out for preview so that feedback can be gathered and any architectural changes can be made.
I have not done exhaustive testing, so there may be bugs. I do not know of any right now, so if any are found, please raise them.
### Example templating
Below are some example templates I've been using in my testing. The scenario is for use with TimescaleDB. The templates create the tables in the `telegraf` schema, plus a view in the `public` schema which joins the tag table and the data table, making it easier to work with. In addition, since you cannot add columns to a TimescaleDB hypertable with compression enabled, adding a column creates a new table, plus another view which `UNION`s the old and new tables.
This is probably one of the most complex use cases possible, but it demonstrates the power and flexibility of the templating.
```toml
tags_as_foreign_keys = true
schema = 'telegraf'
create_templates = [
  '''CREATE TABLE {{ .table }} ({{ .allColumns }})''',
  '''SELECT create_hypertable({{ .table|quoteLiteral }}, 'time', chunk_time_interval => INTERVAL '1h')''',
  '''ALTER TABLE {{ .table }} SET (timescaledb.compress, timescaledb.compress_segmentby = 'tag_id')''',
  '''SELECT add_compression_policy({{ .table|quoteLiteral }}, INTERVAL '2h')''',
  '''CREATE VIEW {{ .table.WithSuffix "_data" }} AS SELECT {{ .allColumns.Selectors | join "," }} FROM {{ .table }}''',
  '''CREATE VIEW {{ .table.WithSchema "public" }} AS SELECT time, {{ (.tagTable.Columns.Tags.Concat .allColumns.Fields).Identifiers | join "," }} FROM {{ .table.WithSuffix "_data" }} t, {{ .tagTable }} tt WHERE t.tag_id = tt.tag_id''',
]
add_column_templates = [
  '''ALTER TABLE {{ .table }} RENAME TO {{ (.table.WithSuffix "_" .table.Columns.Hash).WithSchema "" }}''',
  '''ALTER VIEW {{ .table.WithSuffix "_data" }} RENAME TO {{ (.table.WithSuffix "_" .table.Columns.Hash "_data").WithSchema "" }}''',
  '''DROP VIEW {{ .table.WithSchema "public" }}''',
  '''CREATE TABLE {{ .table }} ({{ .allColumns }})''',
  '''SELECT create_hypertable({{ .table|quoteLiteral }}, 'time', chunk_time_interval => INTERVAL '1h')''',
  '''ALTER TABLE {{ .table }} SET (timescaledb.compress, timescaledb.compress_segmentby = 'tag_id')''',
  '''SELECT add_compression_policy({{ .table|quoteLiteral }}, INTERVAL '2h')''',
  '''CREATE VIEW {{ .table.WithSuffix "_data" }} AS SELECT {{ .allColumns.Selectors | join "," }} FROM {{ .table }} UNION ALL SELECT {{ (.allColumns.Union .table.Columns).Selectors | join "," }} FROM {{ .table.WithSuffix "_" .table.Columns.Hash "_data" }}''',
  '''CREATE VIEW {{ .table.WithSchema "public" }} AS SELECT time, {{ (.tagTable.Columns.Tags.Concat .allColumns.Fields).Identifiers | join "," }} FROM {{ .table.WithSuffix "_data" }} t, {{ .tagTable }} tt WHERE t.tag_id = tt.tag_id''',
]
```
### Required for all PRs:
- [x] Signed [CLA](https://influxdata.com/community/cla/).
- [x] Associated README.md updated.
- [x] Has appropriate unit tests.