Benchmarking Postgres timeseries queries

In my domain, the energy world, we work a lot with timeseries. Generally, these arrive in batches where one day (in local time) always arrives at once, and there can be multiple versions of the same data. A typical portfolio has many thousands of assets. Often one wants to aggregate these timeseries over a subset of assets, for some time range. This is not a trivial thing to do in a row-oriented database like Postgres. Below we will explore some methods to store this data, and benchmark different queries that produce the same result.

See this post for more tips on how to work with psql and psycopg2 from a notebook.

In [11]:
import os, random, json, pytz
from datetime import datetime, timedelta, time
from time import time as t
from functools import lru_cache
from itertools import zip_longest

from IPython.display import Javascript, HTML
import plotly.express as px
import tqdm
In [2]:
# Install python dependencies
!conda install -y --quiet psycopg2
# Install postgres (if not already installed)
!sudo apt-get install postgresql -yqq
# It needs to run in the background
!sudo service postgresql start
!psql --version
Collecting package metadata (current_repodata.json): ...working... done
Solving environment: ...working... done

# All requested packages already installed.

psql (PostgreSQL) 12.4 (Ubuntu 12.4-1.pgdg20.04+1)
In [10]:
# Optionally drop the user and/or database if starting anew
# !sudo su - postgres -c "psql -c \"DROP DATABASE timeseries_benchmark\""
# !sudo su - postgres -c "psql -c \"DROP USER tse\""
# Create user, db, and give access
!sudo su - postgres -c "psql -c \"CREATE USER tse WITH PASSWORD 'abc'\" && psql -c \"CREATE DATABASE timeseries_benchmark OWNER tse\""
# Needed to be able to COPY FROM a file
!sudo su - postgres -c "psql -c \"ALTER USER tse WITH SUPERUSER\""
CREATE ROLE
CREATE DATABASE
ALTER ROLE
In [28]:
import psycopg2
conn = psycopg2.connect(user="tse", password="abc", database="timeseries_benchmark", host="localhost")

Create tables

First we create the three tables to be benchmarked. They differ as follows:

  • actuals_array_double: One array per asset/day/version, with DOUBLE PRECISION (8 bytes) datatype
  • actuals_array: Same but with REAL (4 bytes)
  • actuals_cols: Also with REAL but split into 100 physical columns instead of ARRAYs
In [14]:
# Drop tables if running anew
# !sudo su - postgres -c "psql -d timeseries_benchmark -c \"DROP TABLE IF EXISTS actuals_array_double; DROP TABLE IF EXISTS actuals_array; DROP TABLE IF EXISTS actuals_cols;\""

# In Postgres, FLOAT without a precision means DOUBLE PRECISION (8 bytes)
with conn as conn:
    with conn.cursor() as curs:
        curs.execute("""
        CREATE TABLE actuals_array_double (
          id serial PRIMARY KEY,
          asset_id INTEGER NOT NULL,
          day DATE NOT NULL,
          created TIMESTAMP NOT NULL,
          values FLOAT[] NOT NULL,
          CONSTRAINT actuals_array_double_asset_day UNIQUE (asset_id, day, created)
        );
        """)

# REAL is single precision floating point
with conn as conn:
    with conn.cursor() as curs:
        curs.execute("""
        CREATE TABLE actuals_array (
          id serial PRIMARY KEY,
          asset_id INTEGER NOT NULL,
          day DATE NOT NULL,
          created TIMESTAMP NOT NULL,
          values REAL[] NOT NULL,
          CONSTRAINT actuals_array_asset_day UNIQUE (asset_id, day, created)
        );
        """)

with conn as conn:
    with conn.cursor() as curs:
        pte_str = ",\n      ".join(f"pte{i:02d} REAL" for i in range(100))
        curs.execute(f"""
        CREATE TABLE actuals_cols (
          id serial PRIMARY KEY,
          asset_id INTEGER NOT NULL,
          day DATE NOT NULL,
          created TIMESTAMP NOT NULL,
          {pte_str},
          CONSTRAINT actuals_cols_asset_day UNIQUE (asset_id, day, created)
        );
        """)

Creating and importing test data

I'm creating a reasonable amount of data, about 16M rows, as that is still okay to load and query on my laptop. To reach that amount of data I use 5000 assets, 3 years, and 3 versions of daily actuals.
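As a quick sanity check of the expected row count (using the same numbers as the generator below):

In [ ]:
# assets * days * versions
n_rows = 5000 * (3 * 365) * 3
print(f"{n_rows:,}")  # 16,425,000 rows, matching the `wc -l` output further below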

In [15]:
# Use a cache to speed up the function call, the randomness is not critical
@lru_cache(maxsize=10000)
def randvals(d: datetime, tz = pytz.timezone("Europe/Amsterdam"), values_arr=True):
    d1 = tz.localize(datetime.combine(d, time()))
    d2 = tz.localize(datetime.combine(d + timedelta(days=1), time()))
    periods = int((d2 - d1).total_seconds() / 3600 * 4)
    values = [random.random() for i in range(periods)]
    if values_arr:
        return '"{' + ",".join(f"{v:.3f}" for v in values) + '}"'
    else:
        # Always make 100 columns, but fill with unquoted empty string
        # See parameters -> NULL here: https://www.postgresql.org/docs/9.2/sql-copy.html
        return ",".join(f"{v:.3f}" if v else "" for i, v in zip_longest(range(100), values))

def generate_fake_data(filename, n_assets=5E3, n_years=3, n_versions=3, stop_dt=datetime.now(), values_arr=True):
    with open(filename, "w") as f:
        i = 0
        for days_ago in tqdm.tqdm(range(n_years * 365)):
            for asset_id in range(int(n_assets)):
                day = (stop_dt - timedelta(days=days_ago))
                for i_v in range(n_versions):
                    i += 1
                    created = day - i_v * timedelta(days=3)
                    values = randvals(day, values_arr=values_arr)
                    line = f"{i:d}, {asset_id:d}, '{day:%Y-%m-%d}', '{created.isoformat()}', {values}"
                    f.write(line + "\n")
In [16]:
generate_fake_data("actuals_array.csv")
100%|██████████| 1095/1095 [01:50<00:00,  9.88it/s]
In [17]:
generate_fake_data("actuals_cols.csv", values_arr=False)
100%|██████████| 1095/1095 [01:47<00:00, 10.19it/s]
In [18]:
!head -1 actuals_array.csv
!head -1 actuals_cols.csv
!wc -l actuals_array.csv
1, 0, '2020-09-04', '2020-09-04T20:16:44.631097', "{0.172,0.477,0.225,0.707,0.876,0.952,0.913,0.252,0.598,0.292,0.326,0.854,0.821,0.203,0.706,0.531,0.560,0.150,0.843,0.727,0.669,0.039,0.144,0.843,0.019,0.708,0.179,0.308,0.967,0.307,0.031,0.120,0.095,0.824,0.623,0.377,0.916,0.699,0.586,0.953,0.704,0.372,0.156,0.620,0.408,0.229,0.400,0.496,0.502,0.983,0.394,0.683,0.974,0.253,0.398,0.241,0.317,0.899,0.322,0.312,0.659,0.567,0.404,0.210,0.374,0.307,0.126,0.483,0.074,0.536,0.022,0.230,0.881,0.246,0.742,0.247,0.807,0.694,0.570,0.447,0.329,0.570,0.221,0.954,0.693,0.355,0.506,0.111,0.826,0.973,0.822,0.707,0.918,0.734,0.171,0.597}"
1, 0, '2020-09-04', '2020-09-04T20:16:44.631097', 0.401,0.674,0.385,0.774,0.085,0.304,0.894,0.695,0.591,0.050,0.157,0.392,0.334,0.009,0.095,0.452,0.083,0.289,0.336,0.174,0.140,0.918,0.005,0.389,0.815,0.001,0.323,0.000,0.759,0.697,0.318,0.948,0.985,0.327,0.414,0.605,0.260,0.701,0.156,0.568,0.503,0.455,0.042,0.582,0.310,0.860,0.013,0.190,0.313,0.168,0.506,0.163,0.792,0.828,0.161,0.160,0.356,0.920,0.774,0.493,0.224,0.194,0.326,0.107,0.645,0.954,0.382,0.151,0.593,0.914,0.739,0.332,0.469,0.053,0.472,0.831,0.062,0.538,0.767,0.467,0.909,0.363,0.865,0.744,0.167,0.171,0.442,0.305,0.563,0.144,0.228,0.157,0.473,0.210,0.846,0.958,,,,
16425000 actuals_array.csv
In [19]:
def copy_from(conn, filename, table):
    path = os.path.abspath(filename)
    print(f"Importing '{filename}' to '{table}'")
    with conn as conn:
        with conn.cursor() as curs:
            curs.execute(f"DELETE FROM {table};")
            curs.execute(f"COPY {table} FROM '{path}' (FORMAT CSV);")
            curs.execute(f"SELECT COUNT(*) FROM {table};")
            n_rows = curs.fetchone()[0]
            curs.execute(f"SELECT pg_size_pretty(pg_relation_size('{table}'));")
            print(f"Table '{table}' rows and size:", n_rows, ",", curs.fetchone()[0])
In [20]:
%time copy_from(conn, "actuals_array.csv", "actuals_array_double")
%time copy_from(conn, "actuals_array.csv", "actuals_array")
%time copy_from(conn, "actuals_cols.csv", "actuals_cols")
Importing 'actuals_array.csv' to 'actuals_array_double'
Table 'actuals_array_double' rows and size: 16425000 , 14 GB
CPU times: user 17.1 ms, sys: 851 µs, total: 18 ms
Wall time: 7min 43s
Importing 'actuals_array.csv' to 'actuals_array'
Table 'actuals_array' rows and size: 16425000 , 7547 MB
CPU times: user 13 ms, sys: 1 ms, total: 14 ms
Wall time: 6min 53s
Importing 'actuals_cols.csv' to 'actuals_cols'
Table 'actuals_cols' rows and size: 16425000 , 7129 MB
CPU times: user 11.4 ms, sys: 0 ns, total: 11.4 ms
Wall time: 5min 40s

Tuning Postgres

I tuned postgres a bit to increase performance, as my machine is much larger than the default settings assume. After checking out TimescaleDB, I used their tune script which seemed to set even better values.

Find the config file with SHOW config_file;, edit it, and then restart with systemctl restart postgresql.
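For reference, roughly the commands I used for that (a sketch; the config file path and service name depend on your installation):

In [ ]:
# Show which config file is active
!sudo su - postgres -c "psql -c \"SHOW config_file\""
# After editing it, restart Postgres (alternatively: sudo service postgresql restart)
!sudo systemctl restart postgresql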

timescaledb-tune gave me these settings based on my system (only listing the values that differ from the defaults):

effective_cache_size = 23976MB
maintenance_work_mem = 2047MB
work_mem = 10229kB

max_worker_processes = 19
max_parallel_workers_per_gather = 4
max_parallel_workers = 8

wal_buffers = 16MB
min_wal_size = 512MB

default_statistics_target = 500
random_page_cost = 1.1
checkpoint_completion_target = 0.9
max_locks_per_transaction = 256
autovacuum_max_workers = 10
autovacuum_naptime = 10
effective_io_concurrency = 200

Use pev2 to visualize Postgres query plans

I came across a very nice visualization tool called PEV2. It's a Vue.js module, so naturally I wanted to go the extra mile and make it possible to render it in a jupyter lab notebook. Some hours later I finally got it to work; the steps were:

  • Cloned pev2 and followed the pev2 getting started guide
  • Installed the vue-cli-service script using npm i @vue/cli-service
  • Compiled only the Plan top-level module as UMD (universal, so it registers as window.pev2 when loaded without a specific loader): ./node_modules/.bin/vue-cli-service build --target lib src/components/Plan.vue
  • Inserted all required CSS/JavaScript into the head/body, respectively
  • Created a function that instantiates the pev2 Vue component using a new Vue instance
  • (When rendering the notebook for the blog, put the JavaScript in EXTRA_HEAD_DATA in conf.py; I could not get it to load reliably when output in a cell like below)

This works surprisingly well! One could definitely create a nice jupyter lab extension this way.

I should also mention that I tried pgMustard, which gave me some pretty good tips on index use. Unfortunately it's not embeddable.

In [ ]:
%%javascript
    function addScript(url) {
        var script = document.createElement("script");
        script.type = "text/javascript";
        script.src = url;
        //document.head.appendChild(script);
        // For use on website, as anything added to <head> is not available on jupyter reload
        element.append(script);
    }
    function addCSS(url) {
        var link = document.createElement("link");

        link.type = "text/css";
        link.rel = "stylesheet";
        link.href = url;

        //document.head.appendChild(link);
        element.append(link);
    }
    addCSS("https://unpkg.com/bootstrap@4.5.0/dist/css/bootstrap.min.css");
    addCSS("https://unpkg.com/@fortawesome/fontawesome-free@5.13.0/css/all.css");
    addScript("https://cdn.jsdelivr.net/npm/vue@2.6.12/dist/vue.min.js");
    // Only works if you're running jupyter lab with this notebook as root repo and the compiled file in this folder with name pev2.umd.js
    addScript("/files/pev2.umd.js");
In [35]:
def pev2(query, plan):
    """
    Visualize a Postgres query plan with pev2. Try to pass a text query plan, not JSON,
    as text has fewer issues with escaping double quotes.
    """
    s = """
    var el = document.createElement('div');
    element.append(el);

    var v = new Vue({
        el: el,
        components: {
            'pev2': window.pev2
        },
        data: function() {
            return {
                plan: `""" + plan + """`,
                query: `""" + query + """`,
            }
        },
        template: "<pev2 :plan-source='plan' :plan-query='query'></pev2>"
    });
    """
    return Javascript(s)

Query 1a: UNNESTing the array without index

We start with a query that uses UNNEST() on the array of values to get a separate row for each value, and then does some GROUP BY tricks to get the final outcome. The result is an aggregated timeseries with a summed value for every timestep.

This is mainly to show how expensive a sort step can be; we will add indexes shortly.

In [36]:
def query_array_unnest(table="actuals_array", explain=True):
    with conn.cursor() as curs:
        query = f"""
        {'EXPLAIN (ANALYZE, COSTS, VERBOSE, BUFFERS)' if explain else ''}

        -- Collect the aggregated result into one array per day. Whether we use MAX or
        -- MIN for count_asset does not matter: it is the same for every timeblock
        -- within a day, we just need to pick one value.
        SELECT
        day,
        MAX(count_asset) as count_asset,
        array_agg(s ORDER BY timeblock) as total_value
        FROM (
        SELECT
            day,
            timeblock,
            SUM(elem) as s,
            COUNT(*) as count_asset

        FROM (
            -- Select only latest rows
            SELECT DISTINCT ON (asset_id, day)
                day,
                values
            FROM {table}
            -- Sort descending by created so DISTINCT ON picks the latest version
            -- per asset/day, with NULLs sorted last
            ORDER BY asset_id, day, created DESC NULLS LAST
        ) AS t, LATERAL UNNEST(t.values) WITH ORDINALITY x(elem, timeblock)

        GROUP BY day, timeblock
        ) as a
        GROUP BY day;
        """
        curs.execute(query)
        if explain:
            plan = "\n".join([l[0] for l in curs.fetchall()])
        else:
            plan = curs.fetchall()
    
    return query, plan

pev2(*query_array_unnest())
Out[36]:

Creating indexes

The queries all sort by asset_id, day, created DESC NULLS LAST. To speed this up, we create several indexes so that Postgres can do an INDEX SCAN instead of a SORT, which is much faster.

We also ANALYZE the tables to give the query planner some up-to-date statistics.

In [40]:
# Use `with conn:` so the index creation and ANALYZE are committed
with conn:
    with conn.cursor() as curs:
        def create_idx(table):
            curs.execute(f"CREATE INDEX IF NOT EXISTS {table}_asset_id_day_created "
                         f"ON {table} (asset_id, day, created DESC NULLS LAST); "
                         f"ANALYZE {table};")
        create_idx("actuals_array_double")
        create_idx("actuals_array")
        create_idx("actuals_cols")

Query 1b: UNNESTing with index

Now that we have relevant indexes, we can eliminate the very expensive SORT step.

In [41]:
pev2(*query_array_unnest())
Out[41]:

Query 2: Array subscripting

Our first idea did work, but the query time is very high, even with the relevant index in place.

My hunch is that UNNEST() creates so many rows (about two orders of magnitude more, see the quick calculation below) that the aggregation step becomes much slower. Let's try subscripting the array to go directly to many columns, instead of many rows.
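To put a rough number on that hunch (a back-of-the-envelope calculation with the dataset sizes from above):

In [ ]:
# One row per asset/day remains after DISTINCT ON, and UNNEST expands each into ~96 rows
distinct_rows = 5000 * 3 * 365       # 5,475,000 rows
unnested_rows = distinct_rows * 96   # ~525 million rows feeding the GROUP BY
print(f"{distinct_rows:,} -> {unnested_rows:,}")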

In [42]:
def query_array_subscript(table='actuals_array'):
    with conn.cursor() as curs:
        # Maximum amount of columns
        n_cols = 100
        array_index_str = ", ".join(f'values[{i+1}] as t{i:02d}' for i in range(n_cols))
        array_sum_str = ", ".join(f"SUM(t{i:02d})" for i in range(n_cols))
        query = f"""
        EXPLAIN (ANALYZE, COSTS, VERBOSE, BUFFERS)

        SELECT
        day,
        COUNT(*) as count_asset,
        {array_sum_str}

        FROM (
        -- Select only latest rows
        SELECT DISTINCT ON (asset_id, day)
            day,
            {array_index_str}
        FROM {table}
        ORDER BY asset_id, day, created DESC NULLS LAST
        ) AS t

        GROUP BY day;
        """
        curs.execute(query)
        plan = "\n".join([l[0] for l in curs.fetchall()])
    
    return query, plan

pev2(*query_array_subscript())
Out[42]:

Query 3: Pre-made columns

That went a lot better! A roughly 3x speedup is not bad. Now we have to try the obvious next step: pre-create these columns and query them directly.

In [43]:
with conn.cursor() as curs:
    table = 'actuals_cols'
    n_cols = 100
    col_str = ", ".join(f'pte{i:02d}' for i in range(n_cols))
    array_sum_str = ", ".join(f"SUM(pte{i:02d})" for i in range(n_cols))
    query = f"""
    EXPLAIN (ANALYZE, COSTS, VERBOSE, BUFFERS)

    SELECT
    day,
    COUNT(*) as count_asset,
    {array_sum_str}
    FROM (
    -- Select only latest rows
    SELECT DISTINCT ON (asset_id, day)
        day,
        {col_str}
    FROM {table}
    ORDER BY asset_id, day, created DESC NULLS LAST
    ) AS t
    GROUP BY day;
    """
    curs.execute(query)
    plan = "\n".join([l[0] for l in curs.fetchall()])

pev2(query, plan)
Out[43]:

Woah! That went even better: a 4x speedup (12x in total since our first try). That is pretty amazing. This format is more cumbersome to work with than the nice arrays, though. However, with some code generation in Python it's not that bad. We do have to accommodate the maximum number of columns, which is 100 for 15-minute periods in local time (usually 96 periods, but once a year 92 and once 100 due to the summertime transitions).
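To illustrate those 92/96/100 periods, here is a small check that uses the same pytz logic as randvals() above (the 2020 DST transition dates for Europe/Amsterdam):

In [ ]:
def periods_per_day(d, tz=pytz.timezone("Europe/Amsterdam")):
    start = tz.localize(datetime.combine(d, time()))
    end = tz.localize(datetime.combine(d + timedelta(days=1), time()))
    return int((end - start).total_seconds() / (15 * 60))

# Clocks jump forward on 2020-03-29 (92 periods) and back on 2020-10-25 (100 periods)
for d in (datetime(2020, 3, 28), datetime(2020, 3, 29), datetime(2020, 10, 25)):
    print(d.date(), periods_per_day(d))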

Bonus query: DOUBLE PRECISION

I'm interested to see how the DOUBLE PRECISION type compares to the single-precision REAL, so I'll re-execute the array-subscripting query from before on the double-precision table and see if it makes a big difference.

In [44]:
pev2(*query_array_subscript('actuals_array_double'))
Out[44]:

Interesting: very little difference, even though we read much more data (14 GB vs 8 GB). Apparently reading the actual data is not the problem; the bottleneck seems to be working through the index.
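Out of curiosity, we can also check how big the indexes themselves are (a small sketch that reuses pg_size_pretty/pg_relation_size on the index names created earlier):

In [ ]:
with conn.cursor() as curs:
    for table in ("actuals_array_double", "actuals_array", "actuals_cols"):
        curs.execute(f"SELECT pg_size_pretty(pg_relation_size('{table}_asset_id_day_created'))")
        print(table, "index size:", curs.fetchone()[0])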

Visualizing the query result

We've been doing the same query over and over, so now let's see what the data actually looks like. (Just the last 2 days, to avoid a huge plot.)

In [38]:
query, result = query_array_unnest(explain=False)

dt = []
val = []

for date, n_assets, values in result[:2]:
    for i, v in enumerate(values):
        dt.append(datetime.combine(date, time()) + timedelta(minutes=i*15))
        val.append(v)

fig = px.line(x=dt, y=val, title="Last two days of query result")
# Include the HTML like this because plotly does not include its javascript by default any more
display(HTML(fig.to_html(include_plotlyjs='cdn')))

Scores

So to conclude, here are the scores (lowest seen over a few runs):

Query                    Approx. duration
1a) UNNEST w/o index     4.5 min
1b) UNNEST with index    3.5 min
2a) Subscript            1 min
2b) Double-precision     1 min
3) Pre-made columns      20 s

Lessons learned:

  • Subqueries are fine, but CTEs are materialized (always before PostgreSQL 12, and still whenever MATERIALIZED is specified) and can thus be a barrier to optimizations like predicate push-down (I edited the first query after learning this; see the sketch after this list)
  • UNNEST() used on large arrays creates lots of rows which might slow down subsequent aggregation steps
  • Creating columns by subscripting an array directly is faster for aggregations with large arrays (and reasonably constant array size)
  • Some postprocessing is needed with all queries except query 1 (which makes no assumptions about the number of columns)
  • Columns win over arrays for aggregations
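For reference, a minimal sketch of that CTE pitfall (PostgreSQL 12 syntax; forcing MATERIALIZED prevents the outer asset_id filter from being pushed down into the scan, whereas a plain subquery, as used in the queries above, allows it):

In [ ]:
with conn.cursor() as curs:
    curs.execute("""
    EXPLAIN
    WITH latest AS MATERIALIZED (
        SELECT DISTINCT ON (asset_id, day) asset_id, day, created
        FROM actuals_array
        ORDER BY asset_id, day, created DESC NULLS LAST
    )
    SELECT * FROM latest WHERE asset_id = 42;
    """)
    print("\n".join(row[0] for row in curs.fetchall()))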

Conclusion

It was quite cool to dive this deep into Postgres data structures and indexes, and to benchmark various methods with representative data. Further optimization steps could be to materialize the result (although the GROUP BY then cannot be customized any more) or to cache it somewhere else. TimescaleDB also looks interesting, although it is heavily optimized for real-time timeseries, meaning it assumes timeseries are append-only, while this use case can have random INSERTs for data in the past. Other NoSQL databases might also be interesting to try, although they would require much more thought about row keys, sorting, and the types of queries that need to be performed.