Playing with Postgres NOTIFY/LISTEN using Python asyncio and psycopg2

Everything is easier when a (web) application can run in just one thread. No need to communicate with other threads, no need to keep data in sync, you can do whatever and get away with it. Unfortunately, today's systems need to be distributed to be fault-tolerant, and (auto)scale to multiple threads/processes to handle higher workloads. Depending on the architecture this can bring additional requirements, like some form of inter-process communication.

An example of this is a classic REST API. In one thread it's fine, and it scales to many processes easily because a REST API is naturally stateless: it does not matter which instance receives a request, it will be handled the same way. When we introduce streaming pub/sub updates like Server-Sent Events (SSE) or WebSockets, however, this abstraction suddenly breaks. The connection persists to only one instance, and there is no way to communicate between instances, for example to send a chat message from user A to user B if they are connected to different instances of the same app.

One way to solve this is by placing a GRIP proxy in front of the application to terminate the SSE/WebSocket connections. Relatively simple, and our abstraction works again. Another way is to keep terminating the streaming connections in the web application instances, but use a backend pub/sub system like Redis to communicate between instances.

In place of Redis we can use Postgres, which offers the same kind of pub/sub functionality through NOTIFY/LISTEN. This saves us from having to set up yet another service with credentials, secure it properly, and so on.

In this article I'll explore how it works and how to use it from Python asyncio. In a future post I hope to implement this for Django REST Framework to enable easy streaming create/update/delete notifications.

You can run this notebook in Colab! Simply download here and upload to Colab.

This is a great time for it, as Django has had native asyncio support since its 3.0 release. (It must be said that ASGI support has been available in Django Channels for quite some time, but now it's coming to core Django, at some point allowing use of only the ASGI stack instead of ASGI/WSGI side-by-side.)

Let's start with a basic demonstration of NOTIFY/LISTEN using only the psql command-line tool and some Python subprocess trickery to keep the tool running in the background.

In [17]:
# Install Postgres
!apt-get install postgresql -yqq
!psql --version
psql (PostgreSQL) 10.10 (Ubuntu 10.10-0ubuntu0.18.04.1)
In [18]:
# It needs to run in the background
!service postgresql start
 * Starting PostgreSQL 10 database server
   ...done.
In [0]:
import os, subprocess

# Start a subprocess that runs psql. Pipe stdin/stdout/stderr so we can talk to the process,
# set bufsize to 0 so we don't need to .flush(), and use text mode (universal_newlines) so we don't need to .encode()
p = subprocess.Popen(
    ["sudo", "su", "-", "postgres", "-c", "psql"],
    stdin=subprocess.PIPE,
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE,
    bufsize=0,
    universal_newlines=True,
)
In [20]:
# Start listening
p.stdin.write("LISTEN testchannel;" + os.linesep)
# Notify
!sudo su - postgres -c "psql -c 'NOTIFY testchannel;'"
NOTIFY
In [21]:
# psql only checks for pending notifications when it executes a command, so we send a bare ';' to trigger a check
# .communicate() will close stdin and collect output (the process exits)
for line in p.communicate(";"):
    print(line)
LISTEN
Asynchronous notification "testchannel" received from server process with PID 6744.


LISTEN/NOTIFY directly from Python with psycopg2

Cool! We just used LISTEN/NOTIFY to communicate between two different processes. It would be nice if we could use this directly from Python in a non-blocking way, using the de facto standard Postgres Python library psycopg2. See the asynchronous notifications section of its docs to learn more.

In [22]:
# We need to create a new user, and either give it access to the default database or just make a new database, as we do here
!sudo su - postgres -c "psql -c \"CREATE USER nlt WITH PASSWORD 'abc'\" && psql -c \"CREATE DATABASE notify_listen_test OWNER nlt\""
CREATE ROLE
CREATE DATABASE
In [0]:
import select
import psycopg2
import psycopg2.extensions

conn = psycopg2.connect(user="nlt", password="abc", database="notify_listen_test", host="localhost")
# Notifications are only received when not inside a transaction; autocommit makes this easier
conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
In [0]:
# Start listening
curs = conn.cursor()
curs.execute("LISTEN testchannel;")
In [29]:
# Notify using psql, note that we need to specify the correct database
!sudo su - postgres -c "psql notify_listen_test -c 'NOTIFY testchannel;'"
NOTIFY
In [30]:
# select() blocks until any file descriptor in the list has new data, or until the 1-second timeout expires
# There is fresh data on our connection, so we'd better poll and read!
select.select([conn],[],[],1)
Out[30]:
([<connection object at 0x7f4b4f21ad58; dsn: 'user=nlt password=xxx dbname=notify_listen_test host=localhost', closed: 0>],
 [],
 [])
In [31]:
# poll() is needed to actually read the incoming data and fill conn.notifies
conn.poll()
# Pop the notification from the list
conn.notifies.pop(0)
Out[31]:
Notify(6814, 'testchannel', '')

Throwing asyncio in the loop

In a real situation we will want to run this in an asyncio event loop, getting updates in a non-blocking way. In combination with Django's asyncio support, this will allow us to keep an HTTP or WebSocket connection open and send new data to the relevant consumers when we get notified.

In [0]:
import asyncio

q = asyncio.Queue()

def listen_callback():
    # We have executed 'LISTEN testchannel;' in a cursor on this connection before
    conn.poll()
    # .put() is a coroutine while .put_nowait() is a non-blocking synchronous version.
    # Drain the whole list: a single poll() can pick up more than one notification
    while conn.notifies:
        q.put_nowait(conn.notifies.pop(0))

async def print_queue():
    while True:
        data = await q.get()
        # None is the quit signal
        if data is None:
            return
        else:
            # In a real application we'd do something interesting here (like send to all subscribers),
            # or listen to this queue while keeping a connection open
            print("Notification received: ", data)

def stop_queue():
    print("Stopping queue...")
    q.put_nowait(None)
In [0]:
# Get the event loop
loop = asyncio.get_event_loop()
# add_reader() watches a file descriptor and invokes the callback when it becomes readable,
# the asyncio equivalent of select.select()
loop.add_reader(conn, listen_callback)
In [34]:
# Notify using psql
!sudo su - postgres -c "psql notify_listen_test -c 'NOTIFY testchannel;'"
# Give it some payload
!sudo su - postgres -c $'psql notify_listen_test -c "NOTIFY testchannel, \'payload\';"'
NOTIFY
NOTIFY
In [35]:
# Schedule stop_queue() to run on the event loop in 1 second, putting the stop signal on the queue
loop.call_later(1, stop_queue)
# Run print_queue() until it encounters the stop signal None on its queue
loop.run_until_complete(print_queue())
Notification received:  Notify(6833, 'testchannel', '')
Notification received:  Notify(6849, 'testchannel', 'payload')
Stopping queue...
In [36]:
# Clean up: unregister the reader, close the connection, then remove user and database
loop.remove_reader(conn)
conn.close()
!sudo su - postgres -c "psql -c 'DROP DATABASE notify_listen_test' && psql -c 'DROP USER nlt'"
DROP DATABASE
DROP ROLE

Pretty cool! We have managed to run an asyncio event loop that handles all incoming notifications in a non-blocking way.
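
To make this reusable, the add_reader/queue pattern above could be folded into an async generator. A minimal sketch, with a function name of my own choosing, assuming an autocommit psycopg2 connection on which LISTEN has already been executed:

async def pg_notifications(conn):
    # Watch the connection's file descriptor and yield each Notify as it arrives
    q = asyncio.Queue()
    loop = asyncio.get_event_loop()

    def callback():
        conn.poll()
        while conn.notifies:
            q.put_nowait(conn.notifies.pop(0))

    loop.add_reader(conn, callback)
    try:
        while True:
            yield await q.get()
    finally:
        loop.remove_reader(conn)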

In theory we could use exactly this approach to build on Django's ASGI stack and keep an incoming HTTP connection open (SSE) while listening for notifications and sending them to clients.
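
For illustration, here is a minimal raw ASGI sketch (not Django view code, and the app name is made up) that holds an HTTP response open as a Server-Sent Events stream and forwards notification payloads using the pg_notifications() generator above. In a real multi-client app, each subscriber would need its own queue instead of sharing one connection's notification list:

async def sse_app(scope, receive, send):
    # A bare ASGI HTTP application serving an SSE stream
    assert scope["type"] == "http"
    await send({
        "type": "http.response.start",
        "status": 200,
        "headers": [
            (b"content-type", b"text/event-stream"),
            (b"cache-control", b"no-cache"),
        ],
    })
    async for notify in pg_notifications(conn):
        # SSE frames are lines of the form "data: ...\n\n"
        await send({
            "type": "http.response.body",
            "body": "data: {}\n\n".format(notify.payload).encode(),
            "more_body": True,
        })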

Using Postgres has the advantage of less complexity, but another pub/sub backend like Redis could easily be used instead. We could also add triggers to the database tables to emit NOTIFY on changes (sketched below), but it might be easier to hook into the CRUD operations or signals of the Django Model or DRF Serializer.
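
As a sketch of the trigger approach: the items table, its id column, and the function/trigger names below are hypothetical, and on Postgres 10 the attachment syntax is EXECUTE PROCEDURE (Postgres 11 renamed it to EXECUTE FUNCTION). Executed with a psycopg2 cursor like the one used earlier:

curs.execute("""
    CREATE OR REPLACE FUNCTION notify_row_change() RETURNS trigger AS $$
    BEGIN
        -- pg_notify() is the function form of NOTIFY, usable inside PL/pgSQL
        PERFORM pg_notify('testchannel', TG_OP || ' ' || NEW.id::text);
        RETURN NEW;
    END;
    $$ LANGUAGE plpgsql;

    CREATE TRIGGER items_notify_change
    AFTER INSERT OR UPDATE ON items
    FOR EACH ROW EXECUTE PROCEDURE notify_row_change();
""")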