Nov 28, 2024

Building a Better Ruby ORM for Time Series and Analytics

Rails developers know the joy of working with ActiveRecord. DHH didn’t just give us a framework; he gave us a philosophy, an intuitive way to manage data that feels delightful. But when it comes to time-series data, think metrics, logs, or events, ActiveRecord can start to feel a little stretched. Handling huge volumes of time-stamped data efficiently for analytics? That’s a challenge it wasn’t designed to solve (and neither was PostgreSQL).

This is where TimescaleDB comes in. Built on PostgreSQL (it’s an extension), TimescaleDB is purpose-built for time series and other demanding workloads, and thanks to the timescaledb gem, it integrates seamlessly into Rails. You don’t have to leave behind the conventions or patterns you love; it just works alongside them.

One of TimescaleDB’s standout features is continuous aggregates. Think of them as an upgrade to materialized views, automatically refreshing in the background so your data is always up-to-date and fast to query. With the new continuous_aggregates macro in the timescaledb gem, you can define hierarchical time-based summaries in a single line of Ruby. It even reuses your existing ActiveRecord scopes, so you’re not duplicating logic you’ve already written.

Now, your Rails app can effortlessly handle real-time analytics dashboards or historical reports, scaling your time-series workloads while staying true to the Rails philosophy.

Better Time-Series Data Aggregations Using Ruby: The Inspiration

The following code snippet highlights the real-life use case that inspired me to build a continuous aggregates macro for better time-series data aggregations. It’s part of a RubyGems contribution I made, and it’s still a work in progress. However, it’s worth validating how this idea can reduce the Ruby code you’ll have to maintain.

Example model

class Download < ActiveRecord::Base
  extend Timescaledb::ActsAsHypertable
  include Timescaledb::ContinuousAggregatesHelper

  acts_as_hypertable time_column: 'ts'

  scope :total_downloads, -> { select("count(*) as total") }
  scope :downloads_by_gem, -> { select("gem_name, count(*) as total").group(:gem_name) }
  scope :downloads_by_version, -> { select("gem_name, gem_version, count(*) as total").group(:gem_name, :gem_version) }

  continuous_aggregates(
    timeframes: [:minute, :hour, :day, :month],
    scopes: [:total_downloads, :downloads_by_gem, :downloads_by_version],
    refresh_policy: {
      minute: { start_offset: "10 minutes", end_offset: "1 minute", schedule_interval: "1 minute" },
      hour:   { start_offset: "4 hour",     end_offset: "1 hour",   schedule_interval: "1 hour" },
      day:    { start_offset: "3 day",      end_offset: "1 day",    schedule_interval: "1 day" },
      month:  { start_offset: "3 month",    end_offset: "1 day",    schedule_interval: "1 day" }
    }
  )
end

The refresh_policy applies to every scope at the given timeframe; it is optional and can be skipped. Keep in mind that declaring the macro in the model has almost no effect on the database by itself. The continuous aggregates are only created when a database migration calls the migration helpers that read this metadata. Let’s take a look at those helpers.

The migration helpers

The macro only registers the continuous aggregate definitions in the model. For the migration, it can generate the SQL for every view by iterating over each timeframe and scope you specify.

The create_continuous_aggregates and drop_continuous_aggregates methods are designed to be invoked during the database migration step.

So, after saving your model with the new continuous_aggregates definition, you can call the create_continuous_aggregates method to create all the materialized views in the database. If you use refresh_policy, it will also add the policies along with each aggregate. Here’s what a migration file would look like:

class SetupMyAmazingCaggsMigration < ActiveRecord::Migration[7.0]
  def up
    Download.create_continuous_aggregates
  end

  def down
    Download.drop_continuous_aggregates
  end
end

It will automatically create all the continuous aggregates for all timeframes and scopes in the right dependency order. When create_continuous_aggregates is called, 12 continuous aggregates will be created (3 scopes × 4 timeframes), with each scope going from minute to month.
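To make that count concrete, here is a minimal plain-Ruby sketch of how three scopes and four timeframes expand into view names. It only mirrors the naming visible in the generated SQL; the view_names helper is illustrative and not part of the gem’s API.

```ruby
# Values copied from the Download model above.
timeframes = %i[minute hour day month]
scopes = %i[total_downloads downloads_by_gem downloads_by_version]

# Hypothetical helper: lists view names scope by scope, smallest timeframe
# first, so each view appears after the view it reads from.
def view_names(scopes, timeframes)
  scopes.flat_map do |scope|
    timeframes.map { |timeframe| "#{scope}_per_#{timeframe}" }
  end
end

names = view_names(scopes, timeframes)
puts names.size # 12
puts names
```

Listing the names scope by scope keeps each chain (minute, hour, day, month) contiguous, which is the order the migration output follows.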

The migration output

Let’s take a closer look at the SQL generated behind the scenes when create_continuous_aggregates is called. Starting with the first scope, it builds the per-minute continuous aggregate from the raw data.

CREATE MATERIALIZED VIEW IF NOT EXISTS total_downloads_per_minute
WITH (timescaledb.continuous) AS
SELECT time_bucket('1 minute', ts) as ts, count(*) as total
FROM "downloads"
GROUP BY 1
WITH NO DATA;

Each view is materialized independently, and a refresh policy is needed for that to happen automatically. Because the policy was specified per timeframe, the minute policy is applied to this view.

SELECT add_continuous_aggregate_policy('total_downloads_per_minute',
  start_offset => INTERVAL '10 minutes',
  end_offset =>  INTERVAL '1 minute',
  schedule_interval => INTERVAL '1 minute');
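As a rough illustration of how one refresh_policy entry maps onto this call, the sketch below builds the statement from a Ruby hash. The policy_sql helper is hypothetical and only formats a string; the gem’s internals may differ.

```ruby
# Hypothetical helper: turns one refresh_policy entry into the
# add_continuous_aggregate_policy call shown above.
def policy_sql(view_name, policy)
  options = policy.map { |key, interval| "#{key} => INTERVAL '#{interval}'" }
  "SELECT add_continuous_aggregate_policy('#{view_name}',\n  #{options.join(",\n  ")});"
end

# Entry copied from the minute timeframe of the Download model.
minute_policy = {
  start_offset: "10 minutes",
  end_offset: "1 minute",
  schedule_interval: "1 minute"
}

sql = policy_sql("total_downloads_per_minute", minute_policy)
puts sql
```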

Next comes the hourly level, which already reuses the data from the previous materialized view.

CREATE MATERIALIZED VIEW IF NOT EXISTS total_downloads_per_hour
WITH (timescaledb.continuous) AS
SELECT time_bucket('1 hour', ts) as ts, sum(total) as total FROM "total_downloads_per_minute" 
GROUP BY 1
WITH NO DATA;

An hourly policy is also added to guarantee that the view refreshes automatically. The same steps are repeated for the daily and monthly timeframes, and then the whole process repeats for the other scopes.

SELECT add_continuous_aggregate_policy('total_downloads_per_hour',
  start_offset => INTERVAL '4 hour',
  end_offset =>  INTERVAL '1 hour',
  schedule_interval => INTERVAL '1 hour');

CREATE MATERIALIZED VIEW IF NOT EXISTS total_downloads_per_day
WITH (timescaledb.continuous) AS
SELECT time_bucket('1 day', ts) as ts, sum(total) as total FROM "total_downloads_per_hour" GROUP BY 1
WITH NO DATA;

SELECT add_continuous_aggregate_policy('total_downloads_per_day',
  start_offset => INTERVAL '3 day',
  end_offset =>  INTERVAL '1 day',
  schedule_interval => INTERVAL '1 day');

CREATE MATERIALIZED VIEW IF NOT EXISTS total_downloads_per_month
WITH (timescaledb.continuous) AS
SELECT time_bucket('1 month', ts) as ts, sum(total) as total FROM "total_downloads_per_day" GROUP BY 1
WITH NO DATA;

SELECT add_continuous_aggregate_policy('total_downloads_per_month',
  start_offset => INTERVAL '3 month',
  end_offset =>  INTERVAL '1 day',
  schedule_interval => INTERVAL '1 day');

CREATE MATERIALIZED VIEW IF NOT EXISTS downloads_by_gem_per_minute
WITH (timescaledb.continuous) AS
SELECT time_bucket('1 minute', ts) as ts, gem_name, count(*) as total FROM "downloads" GROUP BY 1, gem_name
WITH NO DATA;

SELECT add_continuous_aggregate_policy('downloads_by_gem_per_minute',
  start_offset => INTERVAL '10 minutes',
  end_offset =>  INTERVAL '1 minute',
  schedule_interval => INTERVAL '1 minute');

CREATE MATERIALIZED VIEW IF NOT EXISTS downloads_by_gem_per_hour
WITH (timescaledb.continuous) AS
SELECT time_bucket('1 hour', ts) as ts, gem_name, sum(total) as total FROM "downloads_by_gem_per_minute" GROUP BY 1, gem_name
WITH NO DATA;

SELECT add_continuous_aggregate_policy('downloads_by_gem_per_hour',
  start_offset => INTERVAL '4 hour',
  end_offset =>  INTERVAL '1 hour',
  schedule_interval => INTERVAL '1 hour');

CREATE MATERIALIZED VIEW IF NOT EXISTS downloads_by_gem_per_day
WITH (timescaledb.continuous) AS
SELECT time_bucket('1 day', ts) as ts, gem_name, sum(total) as total FROM "downloads_by_gem_per_hour" GROUP BY 1, gem_name
WITH NO DATA;

SELECT add_continuous_aggregate_policy('downloads_by_gem_per_day',
  start_offset => INTERVAL '3 day',
  end_offset =>  INTERVAL '1 day',
  schedule_interval => INTERVAL '1 day');

CREATE MATERIALIZED VIEW IF NOT EXISTS downloads_by_gem_per_month
WITH (timescaledb.continuous) AS
SELECT time_bucket('1 month', ts) as ts, gem_name, sum(total) as total FROM "downloads_by_gem_per_day" GROUP BY 1, gem_name
WITH NO DATA;

SELECT add_continuous_aggregate_policy('downloads_by_gem_per_month',
  start_offset => INTERVAL '3 month',
  end_offset =>  INTERVAL '1 day',
  schedule_interval => INTERVAL '1 day');

CREATE MATERIALIZED VIEW IF NOT EXISTS downloads_by_version_per_minute
WITH (timescaledb.continuous) AS
SELECT time_bucket('1 minute', ts) as ts, gem_name, gem_version, count(*) as total FROM "downloads" GROUP BY 1, gem_name, gem_version
WITH NO DATA;

SELECT add_continuous_aggregate_policy('downloads_by_version_per_minute',
  start_offset => INTERVAL '10 minutes',
  end_offset =>  INTERVAL '1 minute',
  schedule_interval => INTERVAL '1 minute');

CREATE MATERIALIZED VIEW IF NOT EXISTS downloads_by_version_per_hour
WITH (timescaledb.continuous) AS
SELECT time_bucket('1 hour', ts) as ts, gem_name, gem_version, sum(total) as total FROM "downloads_by_version_per_minute" GROUP BY 1, gem_name, gem_version
WITH NO DATA;

SELECT add_continuous_aggregate_policy('downloads_by_version_per_hour',
  start_offset => INTERVAL '4 hour',
  end_offset =>  INTERVAL '1 hour',
  schedule_interval => INTERVAL '1 hour');

CREATE MATERIALIZED VIEW IF NOT EXISTS downloads_by_version_per_day
WITH (timescaledb.continuous) AS
SELECT time_bucket('1 day', ts) as ts, gem_name, gem_version, sum(total) as total FROM "downloads_by_version_per_hour" GROUP BY 1, gem_name, gem_version
WITH NO DATA;

SELECT add_continuous_aggregate_policy('downloads_by_version_per_day',
  start_offset => INTERVAL '3 day',
  end_offset =>  INTERVAL '1 day',
  schedule_interval => INTERVAL '1 day');

CREATE MATERIALIZED VIEW IF NOT EXISTS downloads_by_version_per_month
WITH (timescaledb.continuous) AS
SELECT time_bucket('1 month', ts) as ts, gem_name, gem_version, sum(total) as total FROM "downloads_by_version_per_day" GROUP BY 1, gem_name, gem_version
WITH NO DATA;

SELECT add_continuous_aggregate_policy('downloads_by_version_per_month',
  start_offset => INTERVAL '3 month',
  end_offset =>  INTERVAL '1 day',
  schedule_interval => INTERVAL '1 day');

That’s massive, right?! It’s probably too boring to read in full because the structure is almost entirely repetitive. The continuous_aggregates macro generates all of it by iterating over every scope and, within each scope, over every timeframe. It reuses the minute data in the hourly view and applies the same technique from hour to day and from day to month.
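The iteration can be sketched in a few lines of plain Ruby. This is not the gem’s implementation: create_view_statements and its keyword arguments are hypothetical, and the sketch only handles a count(*) scope. It reproduces the chaining where the first timeframe reads the raw table and every later one reads the previous view.

```ruby
# Hypothetical generator: builds the CREATE statements for one scope,
# counting raw rows at the first timeframe and summing counters afterwards.
def create_view_statements(scope_name, table:, time_column:, group_columns:, timeframes:)
  source = table
  timeframes.map.with_index do |timeframe, index|
    view = "#{scope_name}_per_#{timeframe}"
    aggregate = index.zero? ? "count(*)" : "sum(total)"
    bucket = "time_bucket('1 #{timeframe}', #{time_column}) as #{time_column}"
    columns = [bucket, *group_columns, "#{aggregate} as total"].join(", ")
    group_by = [1, *group_columns].join(", ")
    sql = <<~SQL
      CREATE MATERIALIZED VIEW IF NOT EXISTS #{view}
      WITH (timescaledb.continuous) AS
      SELECT #{columns} FROM "#{source}" GROUP BY #{group_by}
      WITH NO DATA;
    SQL
    source = view # the next timeframe reads from the view just defined
    sql
  end
end

statements = create_view_statements(
  "downloads_by_gem",
  table: "downloads",
  time_column: "ts",
  group_columns: ["gem_name"],
  timeframes: %i[minute hour day month]
)
puts statements
```

Tracking the previous view name in a single variable is what keeps the dependency order correct without any extra bookkeeping.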

Written by hand, this chain of dependent aggregations would be tedious and error-prone. The Model.drop_continuous_aggregates method follows the dependency path in reverse, dropping the materialized views from month down to minute.
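A small sketch of why the reverse order matters, using one scope. The exact DROP statement the gem emits is an assumption here; only the ordering is taken from the text.

```ruby
# Creation order for one scope, smallest timeframe first.
created = %w[minute hour day month].map { |timeframe| "total_downloads_per_#{timeframe}" }

# Dropping in reverse removes each view before the view it reads from,
# so no view is dropped while a larger timeframe still depends on it.
drop_statements = created.reverse.map do |view|
  "DROP MATERIALIZED VIEW IF EXISTS #{view};"
end

puts drop_statements
```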

Continuously aggregating statistics in the database can replace dozens of background jobs in your application. It avoids serialization and deserialization work, as well as the bandwidth, I/O, and general resource overhead of moving raw data around.

Reusing the previous timeframe makes each refresh fast and lightweight for the database. Hierarchical processing also keeps the processing speed predictable, because the number of rows each level reads is fixed and depends only on the cardinality of the data.

Processing aggregations in the database means the work happens between the database and its disk. The application no longer has to pull raw rows over the network just to aggregate them in background jobs.

Now, let’s take a look at how the rollup works.

Hyperfunctions Integration for Faster Time-Series Analysis

Timescale also built a specialized extension for time-series data processing, the timescaledb-toolkit. It helps improve the developer experience and query performance, and most of its functions are called hyperfunctions.

Hyperfunctions are designed to make statistics on hypertables fast and reusable, allowing you to roll up granular aggregations into bigger timeframes. The Ruby library aims to work well with both regular aggregate functions and the rollup of the hyperfunctions already available.

The most important part of using multiple timeframes and scopes is to understand how the rollup scope works. 

For example, if you have a scope called total_downloads and a timeframe of day, the rollup will rewrite the query to group by the day.

-- Original query
SELECT count(*) FROM downloads;

-- Rolled up query
SELECT time_bucket('1 day', ts) AS day, count(*) FROM downloads GROUP BY day;

In Ruby, the rollup method will help to roll up such queries in a more efficient way. Let’s consider the total_downloads scope as an example:

Download.total_downloads.map(&:attributes) # => [{"total"=>6175}]
# SELECT count(*) as total FROM "downloads"

The rollup scope will help to group data by a specific timeframe. Let’s start with one minute:

Download.total_downloads.rollup("'1 min'").map(&:attributes)
# SELECT time_bucket('1 min', ts) as ts, count(*) as total FROM "downloads" GROUP BY 1
=> [{"ts"=>2024-04-26 00:10:00 UTC, "total"=>110},
 {"ts"=>2024-04-26 00:11:00 UTC, "total"=>1322},
 {"ts"=>2024-04-26 00:12:00 UTC, "total"=>1461},
 {"ts"=>2024-04-26 00:13:00 UTC, "total"=>1150},
 {"ts"=>2024-04-26 00:14:00 UTC, "total"=>1127},
 {"ts"=>2024-04-26 00:15:00 UTC, "total"=>1005}]

As you can see, the time_bucket function is introduced, and a group by clause is also added.
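To build intuition for what time_bucket plus GROUP BY does, here is a plain-Ruby approximation for fixed-width buckets. The three timestamps are made-up sample data, and this time_bucket method is a local stand-in, not a binding to the SQL function.

```ruby
# Local stand-in for fixed-width buckets: floor each timestamp to the
# start of the bucket it falls into.
def time_bucket(width_in_seconds, time)
  Time.at(time.to_i - (time.to_i % width_in_seconds)).utc
end

# Made-up sample events.
events = [
  Time.utc(2024, 4, 26, 0, 10, 5),
  Time.utc(2024, 4, 26, 0, 10, 42),
  Time.utc(2024, 4, 26, 0, 11, 17)
]

# Similar in spirit to:
#   SELECT time_bucket('1 min', ts) as ts, count(*) as total ... GROUP BY 1
totals = events.group_by { |ts| time_bucket(60, ts) }.transform_values(&:count)
totals.each { |bucket, total| puts "#{bucket} => #{total}" }
```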

If the current query uses a hyperfunction aggregate such as candlestick_agg, the scope can call the rollup SQL function on it, and that’s where the name of the scope comes from.

What if I want to sum the counters from the materialized view behind the scenes and roll up to a bigger frame? That’s when the aggregated classes join the game.

Continuous aggregates are backed by hypertables. They’re materialized views that are periodically updated in the background according to the refresh policy. Every aggregation can be accessed and refreshed independently.

Aggregates classes

In the previous example, the rollup was done directly on the raw data. Now, let’s explore how the continuous_aggregates macro creates a class for each aggregated view in the database. These classes are nested inside the model and also inherit from it, since they fully depend on it.

So, to access the materialized data, instead of building the query from raw data, nested classes are created with the Model::ScopeNamePerTimeframe naming convention.
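The naming convention can be reproduced with a short helper. This is only a sketch of the convention described above; aggregate_class_name is a hypothetical name and the gem may build its class names differently.

```ruby
# Hypothetical helper: builds the nested class name from a scope and a
# timeframe by camelizing "<scope>_per_<timeframe>".
def aggregate_class_name(model, scope, timeframe)
  camelized = "#{scope}_per_#{timeframe}".split("_").map(&:capitalize).join
  "#{model}::#{camelized}"
end

puts aggregate_class_name("Download", :total_downloads, :minute)
puts aggregate_class_name("Download", :downloads_by_gem, :hour)
```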

Download::TotalDownloadsPerMinute.all.map(&:attributes)
# SELECT "total_downloads_per_minute".* FROM "total_downloads_per_minute"
=> [{"ts"=>2024-04-26 00:10:00 UTC, "total"=>110},
 {"ts"=>2024-04-26 00:11:00 UTC, "total"=>1322},
 {"ts"=>2024-04-26 00:12:00 UTC, "total"=>1461},
 {"ts"=>2024-04-26 00:13:00 UTC, "total"=>1150},
 {"ts"=>2024-04-26 00:14:00 UTC, "total"=>1127},
 {"ts"=>2024-04-26 00:15:00 UTC, "total"=>1005}]

To roll up from the materialized data, we need to consider how the data was built. So, to have the counter, we need to count rows from the hypertable raw data, but for bigger timeframes, we can just sum the counters. Here’s what it looks like if you need to roll up any scope to other timeframes:

Download::TotalDownloadsPerMinute.select("sum(total) as total").rollup("'2 min'").map(&:attributes)
# SELECT time_bucket('2 min', ts) as ts, sum(total) as total FROM "total_downloads_per_minute" GROUP BY 1
=> [{"ts"=>2024-04-26 00:12:00 UTC, "total"=>2611}, {"ts"=>2024-04-26 00:14:00 UTC, "total"=>2132}, {"ts"=>2024-04-26 00:10:00 UTC, "total"=>1432}]
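The arithmetic behind that result can be checked in plain Ruby. The sketch below copies the per-minute totals from the Download::TotalDownloadsPerMinute output and sums them into two-minute buckets, which is what sum(total) does in the rolled-up query; no database is involved.

```ruby
# Per-minute totals copied from the TotalDownloadsPerMinute output above.
per_minute = {
  Time.utc(2024, 4, 26, 0, 10) => 110,
  Time.utc(2024, 4, 26, 0, 11) => 1322,
  Time.utc(2024, 4, 26, 0, 12) => 1461,
  Time.utc(2024, 4, 26, 0, 13) => 1150,
  Time.utc(2024, 4, 26, 0, 14) => 1127,
  Time.utc(2024, 4, 26, 0, 15) => 1005
}

# Sum the counters into two-minute (120 second) buckets.
per_two_minutes = Hash.new(0)
per_minute.each do |ts, total|
  bucket = Time.at(ts.to_i - (ts.to_i % 120)).utc
  per_two_minutes[bucket] += total
end

per_two_minutes.each { |bucket, total| puts "#{bucket.strftime('%H:%M')} => #{total}" }
```

The three sums match the rolled-up rows, and together they add up to the 6175 downloads returned by the total_downloads scope.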

With the rollup scope, you can easily build custom scopes and regroup as you need. On rollup, it supports a few statistical scenarios: it automatically detects SQL statements that contain count(*) as total and transforms them into sum(total) as total. It can also take the min of min or max of max values when rolling up into larger timeframes.
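One way to picture that transformation is a pair of string rewrites. The rules below are a simplified assumption, not the gem’s implementation, and the size column in the second example is hypothetical.

```ruby
# Hypothetical rewrite rules: each raw aggregate maps to the aggregate that
# combines rows which are already aggregated.
def rollup_select(select_clause)
  select_clause
    .gsub(/count\(\*\)\s+as\s+(\w+)/i) { "sum(#{$1}) as #{$1}" }
    .gsub(/\b(min|max)\((\w+)\)\s+as\s+(\w+)/i) { "#{$1}(#{$3}) as #{$3}" }
end

puts rollup_select("count(*) as total")
puts rollup_select("gem_name, count(*) as total, max(size) as biggest")
```

Counts become sums of the stored counter, while min and max are applied again to the stored column, since the minimum of per-bucket minimums is still the overall minimum.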

Refresh aggregates

If you need to refresh all aggregates manually in the right order, you can also use the refresh_aggregates method:

Download.refresh_aggregates

Next steps

That’s all, folks! I posted a few more details in my blog during the development phase. If you have any questions or feedback, join the #ruby channel on the TimescaleDB Slack. Also, GitHub ⭐s for our Ruby library are very much welcome!

To give it a try and use the continuous_aggregates macro on your project, install the timescaledb gem. Happy coding—but write fewer lines of code.

Originally posted

Nov 27, 2024