Benchmarking Postgres timeseries queries
In my domain, the energy world, we work a lot with timeseries. Generally, these arrive in batches where one day (in local time) always arrives at once, and there can be multiple versions of the same data. A typical portfolio has many thousands of assets. Often we want to aggregate these timeseries over a subset of assets, for some time range. This is not a trivial thing to do in a row-oriented database like Postgres. Below we will explore some ways to store this data, and benchmark different queries that produce the same result.
See this post for more tips on how to work with psql and psycopg2 from a notebook.
import os, random, json, pytz
from datetime import datetime, timedelta, time
from time import time as t
from functools import lru_cache
from itertools import zip_longest
from IPython.display import Javascript, HTML, display
import plotly.express as px
import tqdm
# Install python dependencies
!conda install -y --quiet psycopg2
# Install postgres (if not already installed)
!sudo apt-get install postgresql -yqq
# It needs to run in the background
!sudo service postgresql start
!psql --version
# Optionally drop the user and/or database if starting anew
# !sudo su - postgres -c "psql -c \"DROP DATABASE timeseries_benchmark\""
# !sudo su - postgres -c "psql -c \"DROP USER tse\""
# Create user, db, and give access
!sudo su - postgres -c "psql -c \"CREATE USER tse WITH PASSWORD 'abc'\" && psql -c \"CREATE DATABASE timeseries_benchmark OWNER tse\""
# Needed to be able to COPY FROM a file
!sudo su - postgres -c "psql -c \"ALTER USER tse WITH SUPERUSER\""
import psycopg2
conn = psycopg2.connect(user="tse", password="abc", database="timeseries_benchmark", host="localhost")
Create tables
First we create the three tables to be benchmarked. They differ as follows:
- actuals_array_double: One array per asset/day/version, with DOUBLE PRECISION (8 bytes) datatype
- actuals_array: Same but with REAL (4 bytes)
- actuals_cols: Also with REAL but split into 100 physical columns instead of ARRAYs
# Drop tables if running anew
# !sudo su - postgres -c "psql -d timeseries_benchmark -c \"DROP TABLE IF EXISTS actuals_array_double; DROP TABLE IF EXISTS actuals_array; DROP TABLE IF EXISTS actuals_cols;\""
# FLOAT means DOUBLE PRECISION (8B)
with conn as conn:
with conn.cursor() as curs:
curs.execute("""
CREATE TABLE actuals_array_double (
id serial PRIMARY KEY,
asset_id INTEGER NOT NULL,
day DATE NOT NULL,
created TIMESTAMP NOT NULL,
    "values" FLOAT[] NOT NULL,
CONSTRAINT actuals_array_double_asset_day UNIQUE (asset_id, day, created)
);
""")
# REAL is single precision floating point
with conn as conn:
with conn.cursor() as curs:
curs.execute("""
CREATE TABLE actuals_array (
id serial PRIMARY KEY,
asset_id INTEGER NOT NULL,
day DATE NOT NULL,
created TIMESTAMP NOT NULL,
    "values" REAL[] NOT NULL,
CONSTRAINT actuals_array_asset_day UNIQUE (asset_id, day, created)
);
""")
with conn as conn:
with conn.cursor() as curs:
pte_str = ",\n ".join(f"pte{i:02d} REAL" for i in range(100))
curs.execute(f"""
CREATE TABLE actuals_cols (
id serial PRIMARY KEY,
asset_id INTEGER NOT NULL,
day DATE NOT NULL,
created TIMESTAMP NOT NULL,
{pte_str},
CONSTRAINT actuals_cols_asset_day UNIQUE (asset_id, day, created)
);
""")
Creating and importing test data
I'm creating a reasonable amount of data, about 16M rows, as that is still okay to load and query on my laptop. To reach that amount of data I use 5000 assets, 3 years, and 3 versions of daily actuals.
# Use a cache to speed up the function call, the randomness is not critical
@lru_cache(maxsize=10000)
def randvals(d: datetime, tz = pytz.timezone("Europe/Amsterdam"), values_arr=True):
d1 = tz.localize(datetime.combine(d, time()))
d2 = tz.localize(datetime.combine(d + timedelta(days=1), time()))
periods = int((d2 - d1).total_seconds() / 3600 * 4)
values = [random.random() for i in range(periods)]
if values_arr:
return '"{' + ",".join(f"{v:.3f}" for v in values) + '}"'
else:
# Always make 100 columns, but fill with unquoted empty string
# See parameters -> NULL here: https://www.postgresql.org/docs/9.2/sql-copy.html
return ",".join(f"{v:.3f}" if v else "" for i, v in zip_longest(range(100), values))
def generate_fake_data(filename, n_assets=5E3, n_years=3, n_versions=3, stop_dt=datetime.now(), values_arr=True):
with open(filename, "w") as f:
i = 0
for days_ago in tqdm.tqdm(range(n_years * 365)):
for asset_id in range(int(n_assets)):
day = (stop_dt - timedelta(days=days_ago))
for i_v in range(n_versions):
i += 1
created = day - i_v * timedelta(days=3)
values = randvals(day, values_arr=values_arr)
line = f"{i:d}, {asset_id:d}, '{day:%Y-%m-%d}', '{created.isoformat()}', {values}"
f.write(line + "\n")
generate_fake_data("actuals_array.csv")
generate_fake_data("actuals_cols.csv", values_arr=False)
!head -1 actuals_array.csv
!head -1 actuals_cols.csv
!wc -l actuals_array.csv
def copy_from(conn, filename, table):
path = os.path.abspath(filename)
print(f"Importing '{filename}' to '{table}'")
with conn as conn:
with conn.cursor() as curs:
curs.execute(f"DELETE FROM {table};")
curs.execute(f"COPY {table} FROM '{path}' (FORMAT CSV);")
curs.execute(f"SELECT COUNT(*) FROM {table};")
n_rows = curs.fetchone()[0]
curs.execute(f"SELECT pg_size_pretty(pg_relation_size('{table}'));")
print(f"Table '{table}' rows and size:", n_rows, ",", curs.fetchone()[0])
%time copy_from(conn, "actuals_array.csv", "actuals_array_double")
%time copy_from(conn, "actuals_array.csv", "actuals_array")
%time copy_from(conn, "actuals_cols.csv", "actuals_cols")
Tuning Postgres
I tuned Postgres a bit to increase performance, as my machine is much larger than the default settings assume. After checking out TimescaleDB, I used their tune script, which seemed to set even better values. Find the config file with SHOW config_file, edit it, and then restart with systemctl restart postgresql.
timescaledb-tune gave me these settings based on my system (only the values that were changed from their defaults are listed); a small sketch of applying a few of them from Python follows the list:
effective_cache_size = 23976MB
maintenance_work_mem = 2047MB
work_mem = 10229kB
max_worker_processes = 19
max_parallel_workers_per_gather = 4
max_parallel_workers = 8
wal_buffers = 16MB
min_wal_size = 512MB
default_statistics_target = 500
random_page_cost = 1.1
checkpoint_completion_target = 0.9
max_locks_per_transaction = 256
autovacuum_max_workers = 10
autovacuum_naptime = 10
effective_io_concurrency = 200
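As an alternative to hand-editing the config file, here is a minimal sketch of setting a few of these values via ALTER SYSTEM from Python. It assumes a superuser connection (the tse user was granted SUPERUSER above), and most of these settings still need a server restart to take effect.
# Minimal sketch: apply a few of the tuned settings via ALTER SYSTEM.
# ALTER SYSTEM cannot run inside a transaction block, hence autocommit;
# the values are written to postgresql.auto.conf and most need a restart.
tuned_settings = {
    "effective_cache_size": "23976MB",
    "work_mem": "10229kB",
    "random_page_cost": "1.1",
}
conn.rollback()  # make sure no transaction is open before toggling autocommit
conn.autocommit = True
with conn.cursor() as curs:
    for name, value in tuned_settings.items():
        # psycopg2 interpolates the value client-side into a quoted literal
        curs.execute(f"ALTER SYSTEM SET {name} = %s", (value,))
conn.autocommit = False
# Then restart, e.g.: !sudo service postgresql restart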
Use pev2 to visualize Postgres query plans
I came across a very nice visualization tool called PEV2. It's a Vue.js module, so naturally I wanted to go the extra mile and make it possible to render it in a jupyter lab notebook. Some hours later I finally got it to work; the steps were:
- Clone pev2 and follow the pev2 getting started guide
- Install the vue-cli-service script using npm i @vue/cli-service
- Compile only the Plan top-level module as UMD (universal, so it registers as window.pev2 when loaded without a specific loader): ./node_modules/.bin/vue-cli-service build --target lib src/components/Plan.vue
- Insert all required CSS/JavaScript into the head/body, respectively
- Create a function that instantiates the pev2 Vue component using a new Vue instance
- (When rendering the notebook for this blog, put the JavaScript in EXTRA_HEAD_DATA in conf.py; I could not get it to reliably load when output in a cell like below)
This works surprisingly well! One could definitely create a nice jupyter lab extension this way.
I should also mention that I tried pgMustard, which gave me some pretty good tips on index use. Unfortunately it's not embeddable.
%%javascript
function addScript(url) {
var script = document.createElement("script");
script.type = "text/javascript";
script.src = url;
//document.head.appendChild(script);
// For use on website, as anything added to <head> is not available on jupyter reload
element.append(script);
}
function addCSS(url) {
var link = document.createElement("link");
link.type = "text/css";
link.rel = "stylesheet";
link.href = url;
//document.head.appendChild(link);
element.append(link);
}
addCSS("https://unpkg.com/bootstrap@4.5.0/dist/css/bootstrap.min.css");
addCSS("https://unpkg.com/@fortawesome/fontawesome-free@5.13.0/css/all.css");
addScript("https://cdn.jsdelivr.net/npm/vue@2.6.12/dist/vue.min.js");
// Only works if you're running jupyter lab with this notebook's folder as the root and the compiled file in this folder, named pev2.umd.js
addScript("/files/pev2.umd.js");
def pev2(query, plan):
"""
Visualize a Postgres query plan with pev2. Try to pass a text query plan, not JSON,
as text has less issues with escaping double quotes.
"""
s = """
var el = document.createElement('div');
element.append(el);
var v = new Vue({
el: el,
components: {
'pev2': window.pev2
},
data: function() {
return {
plan: `""" + plan + """`,
query: `""" + query + """`,
}
},
template: "<pev2 :plan-source='plan' :plan-query='query'></pev2>"
});
"""
return Javascript(s)
Query 1a: UNNESTing the array without index
We start with a query that uses UNNEST() on the array of values to get a separate row for each value, and then does some GROUP BY tricks to get the final outcome. The result is an aggregated timeseries with a summed value for every timestep.
This query is mainly here to show how expensive a sort step can be; soon we will add some indexes.
def query_array_unnest(table="actuals_array", explain=True):
with conn.cursor() as curs:
query = f"""
{'EXPLAIN (ANALYZE, COSTS, VERBOSE, BUFFERS)' if explain else ''}
-- Put aggregated load result in array per day. max/min for unique assets
-- does not matter, we just need to pick one as the same value
-- has been distributed over the rows
SELECT
day,
MAX(count_asset) as count_asset,
array_agg(s ORDER BY timeblock) as total_value
FROM (
SELECT
day,
timeblock,
SUM(elem) as s,
COUNT(*) as count_asset
FROM (
-- Select only latest rows
SELECT DISTINCT ON (asset_id, day)
day,
            "values"
        FROM {table}
        -- Sort descending by created so DISTINCT ON picks the latest version,
        -- preferring rows where created is not NULL
        ORDER BY asset_id, day, created DESC NULLS LAST
    ) AS t, LATERAL UNNEST(t."values") WITH ORDINALITY x(elem, timeblock)
GROUP BY day, timeblock
) as a
GROUP BY day;
"""
curs.execute(query)
if explain:
plan = "\n".join([l[0] for l in curs.fetchall()])
else:
plan = curs.fetchall()
return query, plan
pev2(*query_array_unnest())
Creating indexes
The queries all sort by asset_id, day, created DESC NULLS LAST. To speed this up, we create an index on each table so that Postgres can do an INDEX SCAN instead of a SORT, which is much faster.
We also ANALYZE the tables to give the query planner up-to-date statistics.
with conn.cursor() as curs:
def create_idx(table):
curs.execute(f"CREATE INDEX IF NOT EXISTS {table}_asset_id_day_created "
f"ON {table} (asset_id, day, created DESC NULLS LAST); "
f"ANALYZE {table};")
create_idx("actuals_array_double")
create_idx("actuals_array")
create_idx("actuals_cols")
Query 1: UNNESTing with index
Now that we have relevant indexes, we can eliminate the very expensive SORT step.
pev2(*query_array_unnest())
Query 2: Array subscripting
Our first idea did work, but the query time is very high, even after creating a relevant index.
My hunch is that UNNEST() creates so many rows (about two orders of magnitude more) that the aggregation step becomes much slower. Let's try subscripting the array to go directly to many columns, instead of many rows.
def query_array_subscript(table='actuals_array'):
with conn.cursor() as curs:
# Maximum amount of columns
n_cols = 100
        array_index_str = ", ".join(f'"values"[{i+1}] as t{i:02d}' for i in range(n_cols))
array_sum_str = ", ".join(f"SUM(t{i:02d})" for i in range(n_cols))
query = f"""
EXPLAIN (ANALYZE, COSTS, VERBOSE, BUFFERS)
SELECT
day,
COUNT(*) as count_asset,
{array_sum_str}
FROM (
-- Select only latest rows
SELECT DISTINCT ON (asset_id, day)
day,
{array_index_str}
FROM {table}
ORDER BY asset_id, day, created DESC NULLS LAST
) AS t
GROUP BY day;
"""
curs.execute(query)
plan = "\n".join([l[0] for l in curs.fetchall()])
return query, plan
pev2(*query_array_subscript())
Query 3: Pre-made columns
That went a lot better! A 3x speedup is not bad. Now we have to try the obvious next step: pre-create these columns and query them directly.
with conn.cursor() as curs:
table = 'actuals_cols'
n_cols = 100
col_str = ", ".join(f'pte{i:02d}' for i in range(n_cols))
array_sum_str = ", ".join(f"SUM(pte{i:02d})" for i in range(n_cols))
query = f"""
EXPLAIN (ANALYZE, COSTS, VERBOSE, BUFFERS)
SELECT
day,
COUNT(*) as count_asset,
{array_sum_str}
FROM (
-- Select only latest rows
SELECT DISTINCT ON (asset_id, day)
day,
{col_str}
FROM {table}
ORDER BY asset_id, day, created DESC NULLS LAST
) AS t
GROUP BY day;
"""
curs.execute(query)
plan = "\n".join([l[0] for l in curs.fetchall()])
pev2(query, plan)
Woah! That went even better: a 4x speedup (12x in total since our first try). That is pretty amazing. This format is more cumbersome to work with than the nice arrays, but with some code generation in Python it's not that bad. We do have to accommodate the maximum number of columns, which is 100 for 15-minute periods in local time (usually 96 periods, but once a year 92 and once 100 due to the daylight saving time transitions).
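To make the 92/96/100 period counts concrete, here is a quick check along the same lines as randvals above, counting 15-minute periods per local day around the DST transitions (the 2020 dates are just example dates):
# Count 15-minute periods in a local day; DST transition days have 92 or 100.
def periods_in_day(d, tz=pytz.timezone("Europe/Amsterdam")):
    start = tz.localize(datetime.combine(d, time()))
    end = tz.localize(datetime.combine(d + timedelta(days=1), time()))
    return int((end - start).total_seconds() / 900)

print(periods_in_day(datetime(2020, 6, 1)))    # 96: a normal day
print(periods_in_day(datetime(2020, 3, 29)))   # 92: clocks jump forward
print(periods_in_day(datetime(2020, 10, 25)))  # 100: clocks fall back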
Bonus query: DOUBLE PRECISION
I'm curious what the difference is between querying the double and single precision numeric types, so I'll re-run the array subscripting query from before, this time against the DOUBLE PRECISION table.
pev2(*query_array_subscript('actuals_array_double'))
Interesting: there is very little difference, even though we read much more data (14 vs 8 GB). Apparently reading the actual data is not the problem; the bottleneck seems to be working through the index.
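A quick way to sanity-check that hunch is to compare index and table sizes; a small sketch using the index names created above:
# Compare table vs index sizes for the single and double precision variants.
with conn.cursor() as curs:
    for rel in ("actuals_array", "actuals_array_asset_id_day_created",
                "actuals_array_double", "actuals_array_double_asset_id_day_created"):
        curs.execute("SELECT pg_size_pretty(pg_relation_size(%s))", (rel,))
        print(rel, curs.fetchone()[0])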
Visualizing the query result
We've been doing the same query over and over, so now let's see what the data actually looks like. (Just the last 2 days, to avoid a huge plot.)
query, result = query_array_unnest(explain=False)
dt = []
val = []
for date, n_assets, values in result[:2]:
for i, v in enumerate(values):
dt.append(datetime.combine(date, time()) + timedelta(minutes=i*15))
val.append(v)
fig = px.line(x=dt, y=val, title="Last two days of query result")
# Need to include it like this because plotly does not include its javascript by default any more
display(HTML(fig.to_html(include_plotlyjs='cdn')))
Scores
So to conclude, here are the scores (lowest seen over a few runs); a quick bar chart of the same numbers follows the table:
| Query | Approx. duration |
|---|---|
| 1a) UNNEST w/o index | 4.5 min |
| 1b) UNNEST with index | 3.5 min |
| 2a) Subscript | 1 min |
| 2b) Double-precision | 1 min |
| 3) Pre-made columns | 20 s |
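For a visual comparison, a small sketch plotting the approximate durations from the table above with plotly:
# Bar chart of the approximate benchmark durations, in seconds.
durations = {
    "1a) UNNEST w/o index": 4.5 * 60,
    "1b) UNNEST with index": 3.5 * 60,
    "2a) Subscript": 60,
    "2b) Double precision": 60,
    "3) Pre-made columns": 20,
}
px.bar(x=list(durations), y=list(durations.values()),
       labels={"x": "Query", "y": "Approx. duration (s)"},
       title="Postgres timeseries query benchmark")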
Lessons learned:
- Subqueries are OK, but CTEs are materialized (at least before Postgres 12) and can therefore be a barrier to optimizations like predicate push-down (I edited the first query after learning this; see the sketch after this list)
- UNNEST() used on large arrays creates lots of rows, which can slow down subsequent aggregation steps
- Creating columns by subscripting an array directly is faster for aggregations over large arrays (with a reasonably constant array size)
- Some postprocessing is needed with all queries except query 1, which makes no assumptions about the number of columns
- Columns win over arrays for aggregations
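For reference, a sketch of what the CTE form of the "latest rows" selection could look like (I did not benchmark this); before Postgres 12 such a CTE is always materialized, while on 12+ you can explicitly ask for it to be inlined:
# Sketch only: CTE variant of the "latest rows" selection. NOT MATERIALIZED
# (Postgres 12+) asks the planner to inline it like a subquery; on older
# versions the keyword does not exist and the CTE is always materialized.
query_cte = """
    WITH latest AS NOT MATERIALIZED (
        SELECT DISTINCT ON (asset_id, day) day, "values"
        FROM actuals_array
        ORDER BY asset_id, day, created DESC NULLS LAST
    )
    SELECT day, COUNT(*) AS count_asset
    FROM latest
    GROUP BY day;
"""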
Conclusion
It was quite cool to dive this deep into Postgres data structures and indexes, and to benchmark various methods with representative data. Further optimization steps could be to materialize the result (although the GROUP BY is then no longer customizable; a rough sketch of this follows below) or to cache it somewhere else. TimescaleDB also looks interesting, although it is heavily optimized for real-time timeseries, meaning it assumes timeseries are append-only, while this use case can have random INSERTs for data in the past. Other NoSQL databases might also be interesting to try, although they would require much more thought about row keys, sorting, and the types of queries that need to be performed.
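As a rough sketch of the materialization idea (with the caveat above that the asset subset and grouping are then fixed), reusing the UNNEST-based aggregation from query 1; the view name actuals_daily_totals is just an example:
# Rough sketch: materialize the aggregated result and refresh after imports.
with conn as conn:
    with conn.cursor() as curs:
        curs.execute("""
            CREATE MATERIALIZED VIEW IF NOT EXISTS actuals_daily_totals AS
            SELECT day, timeblock, SUM(elem) AS total_value, COUNT(*) AS count_asset
            FROM (
                SELECT DISTINCT ON (asset_id, day) day, "values"
                FROM actuals_array
                ORDER BY asset_id, day, created DESC NULLS LAST
            ) AS t, LATERAL UNNEST(t."values") WITH ORDINALITY x(elem, timeblock)
            GROUP BY day, timeblock;
        """)
        # After importing new data:
        # curs.execute("REFRESH MATERIALIZED VIEW actuals_daily_totals;")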