Graceful exit with Python multiprocessing
I often use the ProcessPoolExecutor/ThreadPoolExecutor from the concurrent.futures standard library module to parallelize workloads, but have trouble exiting gracefully: the default behavior is to finish all pending futures (either when iterating with as_completed or when exiting the with block).
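For reference, here's a minimal sketch of that default behavior (the work function and the counts are made up): exiting the with block calls shutdown(wait=True) under the hood, so the script only moves on after every submitted future has finished.

from concurrent.futures import ProcessPoolExecutor, as_completed
from time import sleep

def work(i):
    sleep(1)
    return i

with ProcessPoolExecutor(max_workers=2) as pool:
    futures = [pool.submit(work, i) for i in range(10)]
    for future in as_completed(futures):
        print(future.result())
# Only reached once all 10 tasks have run, even if we wanted to stop earlier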
I want to be able to:
- Cancel pending jobs
- Not do the work of submitting all tasks at once (for, say, 1M files this might involve API calls I'd rather not do up front)
- Cancel running jobs if necessary
- Implement more sophisticated retry handling if desired
To do that, I'll build a simple demonstration that exhibits these behaviors.
It must be said that a new feature has landed in Python 3.9 that allows cancelling all pending futures on exit (Executor.shutdown(cancel_futures=True)). However, that's only the first item on my list; the rest still need to be addressed, so we'll happily continue making our own Executor.
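For completeness, here's a minimal sketch of that feature (the work function and the counts are made up); note that it only cancels futures that haven't started yet, while already-running tasks still run to completion.

from concurrent.futures import ProcessPoolExecutor
from time import sleep

def work(i):
    sleep(1)
    return i

pool = ProcessPoolExecutor(max_workers=2)
futures = [pool.submit(work, i) for i in range(10)]
# Python 3.9+: futures that have not started yet are cancelled
pool.shutdown(wait=True, cancel_futures=True)
print(sum(f.cancelled() for f in futures), "futures were cancelled")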
Processes or threads?
Most of the work I do involves not only I/O but a good amount of compute so I'll use processes to avoid the GIL. The overhead of starting processes is negligible as these scripts usually run for minutes, not milliseconds.
Stop signals and k8s: 3 ways to handle SIGTERM
When running in a shell, one will tend to stop the main process and all its worker processes/threads with CTRL-C, which sends a SIGINT to the main process. This is caught by Python, after which a KeyboardInterrupt is raised.
If you want your application to run in Kubernetes (or other systems that communicate with your process using signals), you'll need to catch SIGTERM, which is k8s' way of saying "please shut down gracefully". If the process is still running after the "grace period" (30 seconds by default), it is killed the hard way with SIGKILL (which cannot be caught). An easy way to handle this is to catch SIGTERM and raise a KeyboardInterrupt, so that it has the same effect as a SIGINT.
import signal

def handler(signal_received, frame):
    raise KeyboardInterrupt("SIGTERM received")

signal.signal(signal.SIGTERM, handler)
Another option that has the same result is to set STOPSIGNAL in the Dockerfile, which is respected by k8s.
FROM python:3.8
STOPSIGNAL SIGINT
COPY main.py /usr/src/app/main.py
WORKDIR /usr/src/app
CMD ["python", "-u", "main.py"]
A third way is to catch SIGTERM/SIGINT and set some flag (an Event) instead of raising KeyboardInterrupt. This method might provide for a more graceful shutdown in some cases.
import signal

# Start with panic = False; the handler flips it to True when a stop signal arrives
panic = False

def handler(signal_received, frame):
    global panic
    panic = True

signal.signal(signal.SIGTERM, handler)
# You might want to handle ctrl-c the same way (this avoids KeyboardInterrupt being raised)
signal.signal(signal.SIGINT, handler)

for task in tasks:
    if panic:
        break
    do_task(task)
Implementation of a robust worker process pool
Let's have a look at how we can combine all these features. Please note that this is made for Linux; I'm not sure whether these specific signals would work on Windows.
import os, signal, logging, sys
from time import sleep, time
from random import random, randint
from multiprocessing import JoinableQueue, Event, Process
from queue import Empty
from typing import Optional

logger = logging.getLogger(__name__)
# Make sure to log process/thread name
logging.basicConfig(level=logging.INFO, format="%(processName)s %(relativeCreated)d %(message)s")

print("Python version:", sys.version.split(" ")[0])

def do_task(sleep_sec: float) -> int:
    "Dummy task"
    sleep(sleep_sec)
    logger.info("Slept for %.3fs", sleep_sec)
    return randint(0, 10)

def worker(q: JoinableQueue, stop_event: Event):
    logger.info("Starting worker...")
    while 1:
        if stop_event.is_set():
            logger.info("Worker exiting because of stop_event")
            break
        # We set a timeout so we loop past "stop_event" even if the queue is empty
        try:
            args = q.get(timeout=.01)
        except Empty:
            # Run next iteration of loop
            continue
        # Exit if end of queue
        if args is None:
            logger.info("Worker exiting because of None on queue")
            q.task_done()
            break
        # Do the task
        try:
            do_task(*args)
        # Will also catch KeyboardInterrupt
        except:
            logger.exception("Failed to process args %s", args)
            # Can implement some kind of retry handling here
        finally:
            q.task_done()

def main(
    n_workers: int = 2,
    n_tasks: int = 10,
    max_queue_size: int = 1,
    grace_period: float = 2,
    kill_period: float = 30,
    interrupt: Optional[int] = None,
) -> None:
    """
    Run a process pool of workers.

    Args:
        n_workers: Start this many processes
        n_tasks: Launch this many tasks
        max_queue_size: If queue exceeds this size, block when putting items on the queue
        grace_period: Send SIGINT to processes if they don't exit within this time after SIGINT/SIGTERM
        kill_period: Send SIGKILL to processes if they don't exit after this many seconds
        interrupt: If given, send signal SIGTERM to itself after queueing this many tasks
    """
    # The JoinableQueue has an internal counter that increments when an item is put on the queue and
    # decrements when q.task_done() is called. This allows us to wait until it's empty using .join()
    q = JoinableQueue(maxsize=max_queue_size)
    # This is a process-safe version of the "panic" variable shown above
    stop_event = Event()

    def handler(signalname):
        """
        Python 3.9 has `signal.strsignal(signalnum)` so this closure would not be needed.
        Also, 3.8 includes `signal.valid_signals()` that can be used to create a mapping for the same purpose.
        """
        def f(signal_received, frame):
            raise KeyboardInterrupt(f"{signalname} received")
        return f

    # This will be inherited by the child process if it is forked (not spawned)
    signal.signal(signal.SIGINT, handler("SIGINT"))
    signal.signal(signal.SIGTERM, handler("SIGTERM"))

    procs = []
    for i in range(n_workers):
        # Make it a daemon process so it is definitely terminated when this process exits,
        # might be overkill but is a nice feature. See
        # https://docs.python.org/3.8/library/multiprocessing.html#multiprocessing.Process.daemon
        p = Process(name=f"Worker-{i:02d}", daemon=True, target=worker, args=(q, stop_event))
        procs.append(p)
        p.start()

    try:
        # Put tasks on queue
        for i_task in range(n_tasks):
            # For demonstration purposes
            if interrupt and i_task == interrupt:
                os.kill(os.getpid(), signal.SIGTERM)
            s = random()
            logger.info("Put %.3f on queue", s)
            q.put([s])
        # Put exit tasks on queue
        for i in range(n_workers):
            q.put(None)
        # Wait until all tasks are processed
        q.join()
    except KeyboardInterrupt:
        logger.warning("Caught KeyboardInterrupt! Setting stop event...")
    finally:
        stop_event.set()
        t = time()
        # Send SIGINT if a process doesn't exit quickly enough, and kill it as last resort.
        # Check kill_period first: it is normally the larger of the two, so checking
        # grace_period first would shadow it and SIGKILL would never be sent.
        # .is_alive() also implicitly joins the process (good practice in linux)
        while alive_procs := [p for p in procs if p.is_alive()]:
            if time() > t + kill_period:
                for p in alive_procs:
                    logger.warning("Sending SIGKILL to %s", p)
                    # Queues and other inter-process communication primitives can break when
                    # process is killed, but we don't care here
                    p.kill()
            elif time() > t + grace_period:
                for p in alive_procs:
                    os.kill(p.pid, signal.SIGINT)
                    logger.warning("Sending SIGINT to %s", p)
            sleep(.01)

        sleep(.1)
        for p in procs:
            logger.info("Process status: %s", p)
main()
Okay! That works all right. We have "backpressure" when putting things on the queue, we shut down the workers by putting None on the queue, and we correctly wait for the workers to finish.
We also stop the workers in between tasks if possible:
main(interrupt=4)
And if it takes too long for the worker to exit, we send SIGINT, which results in the worker getting a KeyboardInterrupt in the middle of processing the task, after which it iterates through the loop again and encounters the stop_event. If it's not OK to crash the task like this, we could rely solely on the stop_event (a sketch of that variation follows the demo below). This is just one of many variations possible.
main(interrupt=3, grace_period=.01)
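Here's a hedged sketch of that stop_event-only variation; the chunked_task/patient_worker names and the chunking itself are made up for illustration. The worker ignores SIGINT, and the task checks stop_event between small chunks of work, so nothing gets interrupted mid-task.

def chunked_task(chunks, stop_event):
    """Hypothetical task split into small chunks; checks stop_event in between."""
    for chunk in chunks:
        if stop_event.is_set():
            logger.info("Stopping task early because of stop_event")
            return None
        sleep(chunk)  # stand-in for a small unit of real work
    return randint(0, 10)

def patient_worker(q, stop_event):
    """Worker variant that ignores SIGINT and relies solely on stop_event."""
    signal.signal(signal.SIGINT, signal.SIG_IGN)  # never crash a task mid-flight
    while not stop_event.is_set():
        try:
            args = q.get(timeout=.01)
        except Empty:
            continue
        if args is None:
            q.task_done()
            break
        try:
            chunked_task(args, stop_event)
        finally:
            q.task_done()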
One thing to note in the logs of the runs above is that the signal handlers of the parent process were inherited by the child processes, because they were forked ("cloned") and not spawned. In our case this is fine; in other situations you might want to attach new handlers.
If the process does not exit after setting the stop_event and sending SIGINT, we send SIGKILL as seen below.
main(interrupt=3, kill_period=.01)
Conclusion
Implementing parallelism is tricky because you're working not just with Python but also with OS primitives like threads and processes. We've explored some of the functionality available in Python's standard library to build a robust worker pool that is infinitely customizable. One easy tweak would be to send results back from the workers to the main process (using another queue, as sketched below). Another useful endeavor might be to make a QueueingProcessPoolExecutor that has the same API as the other Executors.
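As a rough sketch of that first tweak (worker_with_results and result_q are hypothetical names), the worker loop above could put each result on a second queue:

def worker_with_results(q, result_q, stop_event):
    """Same loop as worker() above, but sends each task's result back to the main process."""
    while not stop_event.is_set():
        try:
            args = q.get(timeout=.01)
        except Empty:
            continue
        if args is None:
            q.task_done()
            break
        try:
            result_q.put(do_task(*args))
        except Exception:
            logger.exception("Failed to process args %s", args)
        finally:
            q.task_done()

# In main(): create result_q = multiprocessing.Queue(), pass it to each Process via
# args=(q, result_q, stop_event), and drain it (result_q.get_nowait() in a loop,
# catching queue.Empty) after q.join() or while waiting.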
Useful articles for me to read were "Things I Wish They Told Me About Multiprocessing in Python", multiprocessing programming guidelines and multiprocessing examples.