Real time aggregation of historical blockchain trades

Say I have a table with a bunch of swap events (token in/out, USD in/out). What would be the best way to get real-time aggregates on the data (PnL, win rate, etc.)? I tried writing continuous aggregates, but it's difficult to write a FIFO SQL query without using CTEs.

Hey @wozhendeai, that looks like a great challenge. I was playing around here; let’s explore a possible solution:

1. Base Hypertable Structure

CREATE TABLE swap_events (
  id SERIAL,
  time TIMESTAMPTZ NOT NULL,
  token_address TEXT NOT NULL,
  token_in NUMERIC,
  token_out NUMERIC,
  usd_in NUMERIC,
  usd_out NUMERIC,
  wallet_address TEXT,
  PRIMARY KEY (id, time)  -- Include time in the primary key for TimescaleDB
);

-- Convert to a TimescaleDB hypertable
SELECT create_hypertable('swap_events', by_range('time', INTERVAL '1 week'));
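
To make the examples below easy to play with, here are a few hypothetical rows; the wallet and token addresses and amounts are made up:

```sql
-- Hypothetical sample trades: two buys and one sell for the same wallet/token
INSERT INTO swap_events (time, token_address, wallet_address,
                         token_in, token_out, usd_in, usd_out)
VALUES
  (now() - INTERVAL '3 hours', '0xtokenA', '0xuser1', 100, 0, 50, 0),   -- buy
  (now() - INTERVAL '2 hours', '0xtokenA', '0xuser1', 100, 0, 70, 0),   -- buy
  (now() - INTERVAL '1 hour',  '0xtokenA', '0xuser1', 0, 150, 0, 120);  -- sell
```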

For straightforward metrics that don’t require FIFO accounting, continuous aggregates work perfectly:

CREATE MATERIALIZED VIEW swap_events_hourly WITH (timescaledb.continuous) AS
SELECT
  time_bucket('1 hour', time) AS bucket,
  wallet_address,
  token_address,
  SUM(usd_in) AS total_usd_in,
  SUM(usd_out) AS total_usd_out,
  SUM(token_in) AS total_token_in,
  SUM(token_out) AS total_token_out,
  COUNT(*) AS swap_count,
  COUNT(CASE WHEN token_out > 0 THEN 1 END) AS sell_count,
  COUNT(CASE WHEN token_in > 0 THEN 1 END) AS buy_count
FROM swap_events
GROUP BY bucket, wallet_address, token_address;

-- Set refresh policy
SELECT add_continuous_aggregate_policy('swap_events_hourly',
  start_offset => INTERVAL '1 day',
  end_offset => INTERVAL '1 hour',
  schedule_interval => INTERVAL '1 hour');
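
Once the policy is in place you can query the aggregate directly, and (depending on your TimescaleDB version and the `materialized_only` setting) the not-yet-materialized tail is computed in real time on top of the materialized data. For example, a daily rollup from the hourly buckets — just a sketch:

```sql
-- Daily volume per wallet, rolled up from the hourly continuous aggregate
SELECT
  time_bucket('1 day', bucket) AS day,
  wallet_address,
  SUM(total_usd_in)  AS usd_in,
  SUM(total_usd_out) AS usd_out,
  SUM(swap_count)    AS swaps
FROM swap_events_hourly
GROUP BY day, wallet_address
ORDER BY day DESC;
```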

Since continuous aggregates don't support the window functions needed for FIFO-style accounting, we use a regular view instead:

CREATE OR REPLACE VIEW swap_fifo_pnl AS
WITH token_queue AS (
  SELECT
    time,
    id,
    token_address,
    wallet_address,
    token_in,
    token_out,
    usd_in,
    usd_out,
    SUM(token_in) OVER (
      PARTITION BY wallet_address, token_address
      ORDER BY time, id
      ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
    ) - SUM(token_out) OVER (
      PARTITION BY wallet_address, token_address
      ORDER BY time, id
      ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
    ) AS token_balance,
    SUM(token_in) OVER (
      PARTITION BY wallet_address, token_address
      ORDER BY time, id
    ) AS cumulative_token_in,
    SUM(token_out) OVER (
      PARTITION BY wallet_address, token_address
      ORDER BY time, id
    ) AS cumulative_token_out,
    SUM(usd_in) OVER (
      PARTITION BY wallet_address, token_address
      ORDER BY time, id
    ) AS cumulative_usd_in
  FROM swap_events
),
fifo_calcs AS (
  SELECT
    time,
    id,
    token_address,
    wallet_address,
    token_in,
    token_out,
    usd_in,
    usd_out,
    token_balance,
    cumulative_token_in,
    cumulative_token_out,
    cumulative_usd_in,
    CASE
      WHEN token_out > 0 THEN
        -- Approximate the cost basis of the tokens being sold using the
        -- cumulative average cost of all prior buys (a simplification of
        -- strict FIFO). LAG(..., 1, 1) defaults to 1 to avoid division
        -- by zero when there are no prior buys.
        usd_out - (token_out * 
          (LAG(cumulative_usd_in, 1, 0) OVER (PARTITION BY wallet_address, token_address ORDER BY time, id) / 
           LAG(cumulative_token_in, 1, 1) OVER (PARTITION BY wallet_address, token_address ORDER BY time, id)))
      ELSE 0
    END AS realized_pnl
  FROM token_queue
)
SELECT
  time,
  wallet_address,
  token_address,
  token_in,
  token_out,
  usd_in,
  usd_out,
  token_balance,
  realized_pnl,
  SUM(realized_pnl) OVER (
    PARTITION BY wallet_address, token_address
    ORDER BY time, id
    ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
  ) AS cumulative_pnl
FROM fifo_calcs;
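
A quick way to read the current position and running PnL per wallet/token out of this view (just a sketch; `DISTINCT ON` picks the latest row per pair):

```sql
-- Latest balance and cumulative PnL snapshot per wallet/token
SELECT DISTINCT ON (wallet_address, token_address)
  wallet_address,
  token_address,
  token_balance,
  cumulative_pnl
FROM swap_fifo_pnl
ORDER BY wallet_address, token_address, time DESC;
```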

Let’s also compute the win rate:

-- Query for performance metrics (win rate, etc.)
SELECT
  wallet_address,
  token_address,
  COUNT(*) AS total_trades,
  COUNT(CASE WHEN realized_pnl > 0 THEN 1 END) AS winning_trades,
  ROUND(COUNT(CASE WHEN realized_pnl > 0 THEN 1 END)::numeric / NULLIF(COUNT(*), 0) * 100, 2) AS win_rate,
  SUM(realized_pnl) AS total_pnl
FROM swap_fifo_pnl
WHERE token_out > 0
GROUP BY wallet_address, token_address;
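
The same calculation can be restricted to a recent window, which is usually what you want on a dashboard. Here I use the `FILTER` clause, which is equivalent to the `COUNT(CASE ...)` form above:

```sql
-- Win rate per wallet over the last 7 days only
SELECT
  wallet_address,
  ROUND(COUNT(*) FILTER (WHERE realized_pnl > 0)::numeric
        / NULLIF(COUNT(*), 0) * 100, 2) AS win_rate_7d
FROM swap_fifo_pnl
WHERE token_out > 0
  AND time > now() - INTERVAL '7 days'
GROUP BY wallet_address;
```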

Use continuous aggregates for simple metrics and views for complex calculations.

For frequently accessed FIFO calculations, you can also use a regular materialized view that you refresh on a schedule:

CREATE MATERIALIZED VIEW fifo_pnl_daily AS
SELECT * FROM swap_fifo_pnl WHERE time > now() - INTERVAL '30 days';
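
To automate the refresh you can wrap it in a procedure and schedule it with TimescaleDB's job framework (`add_job`). The procedure name here is my own; pick whatever schedule fits your data:

```sql
-- Sketch: refresh the materialized view every hour via a background job
CREATE OR REPLACE PROCEDURE refresh_fifo_pnl_daily(job_id INT, config JSONB)
LANGUAGE plpgsql AS $$
BEGIN
  REFRESH MATERIALIZED VIEW fifo_pnl_daily;
END
$$;

SELECT add_job('refresh_fifo_pnl_daily', '1 hour');
```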

That’s not Timescale but it’s PostgreSQL :nerd_face:

  1. Chunking Time Periods: for large datasets, query the FIFO view with time constraints so the window functions scan less data:
    SELECT * FROM swap_fifo_pnl 
    WHERE time > now() - INTERVAL '7 days'
    AND wallet_address = '0xuser1';
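
An index matching the partitioning and ordering of the window functions can also help these scans considerably; the index name below is just my suggestion:

```sql
-- Composite index aligned with PARTITION BY (wallet, token) ORDER BY (time, id)
CREATE INDEX IF NOT EXISTS swap_events_wallet_token_time_idx
  ON swap_events (wallet_address, token_address, time, id);
```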
    

I hope this helps! Let me know if you have any questions about implementing this approach.
Here is the full example if you want to play with it: sql-snippets/swap_finance.sql at master · jonatas/sql-snippets · GitHub


Thanks a lot. I was really trying to get the accounting to work in Timescale, but it seems like I’ll end up needing to do it in a regular materialized view if I don’t want to add a table specifically for the FIFO calculations.


Yes. That’s a way to avoid reprocessing it for each query. It’s not the most efficient solution, and it’s also limited to the time window you materialize.

I also added the example to our templates: Add swap FIFO accounting functionality and enhance README.md · timescale/templates@509f790 · GitHub

Feel free to send corrections :nerd_face: