Graceful exit with Python multiprocessing

I often use ProcessPoolExecutor/ThreadPoolExecutor from the concurrent.futures standard library module to parallelize workloads, but I have trouble exiting gracefully because the default behavior is to finish all pending futures (either via as_completed or when exiting the with block).

I want to be able to:

  • Cancel pending jobs
  • Avoid having to submit all tasks up front (preparing each task might involve some API calls or whatever, and for 1M files I'd rather not do that all at once)
  • Cancel running jobs if necessary
  • Implement more sophisticated retry handling if desired

To do that, I'll build a simple demonstration that exhibits these behaviors.

It must be said that a new feature landed in Python 3.9 that allows cancelling all pending futures on exit. However, that covers only the first item on my list; the rest still need to be addressed, so we'll happily continue building our own executor.
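
For reference, that feature is the cancel_futures argument to Executor.shutdown(); a minimal sketch of how it is used (with a ThreadPoolExecutor and a dummy sleep task):

from concurrent.futures import ThreadPoolExecutor
from time import sleep

ex = ThreadPoolExecutor(max_workers=2)
futures = [ex.submit(sleep, 1) for _ in range(100)]
# New in Python 3.9: cancel everything that hasn't started yet instead of draining the queue
ex.shutdown(wait=True, cancel_futures=True)
print(sum(f.cancelled() for f in futures), "futures were cancelled")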

Processes or threads?

Most of the work I do involves not only I/O but a good amount of compute so I'll use processes to avoid the GIL. The overhead of starting processes is negligible as these scripts usually run for minutes, not milliseconds.

Stop signals and k8s: 3 ways to handle SIGTERM

When running in a shell, you'll typically stop the main process and all its worker processes/threads by pressing CTRL-C, which sends a SIGINT to the main process. Python catches this and raises a KeyboardInterrupt.

If you want your application to run in Kubernetes (or other systems that communicate with your process using signals), you'll need to catch SIGTERM, which is k8s' way of saying "please shut down gracefully". If the process is still running after the "grace period" (30 seconds by default), it is killed the hard way with SIGKILL (which cannot be caught). An easy way to handle this is to catch SIGTERM and raise a KeyboardInterrupt, so that it has the same effect as a SIGINT.

import signal

def handler(signal_received, frame):
    raise KeyboardInterrupt("SIGTERM received")

signal.signal(signal.SIGTERM, handler)

Another option that has the same result is to set STOPSIGNAL in the Dockerfile, which is respected by k8s.

FROM python:3.8

STOPSIGNAL SIGINT

COPY main.py /usr/src/app/main.py

WORKDIR /usr/src/app

CMD ["python", "-u", "main.py"]

A third way is to catch SIGTERM/SIGINT and set a flag (for example an Event) instead of raising KeyboardInterrupt. In some cases this allows a more graceful shutdown.

import signal

panic = False

def handler(signal_received, frame):
    global panic
    panic = True

signal.signal(signal.SIGTERM, handler)
# You might want to handle ctrl-c the same way (this avoids KeyboardInterrupt being raised)
signal.signal(signal.SIGINT, handler)

for task in tasks:
    if panic:
        break
    do_task()
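
The snippet above uses a plain global flag for brevity; a sketch of the same idea with the Event mentioned above (tasks and do_task are stand-ins for your own work, as before):

import signal
from threading import Event

stop_event = Event()

def handler(signal_received, frame):
    # Instead of raising, just flip the flag; the main loop checks it between tasks
    stop_event.set()

signal.signal(signal.SIGTERM, handler)
signal.signal(signal.SIGINT, handler)

for task in tasks:
    if stop_event.is_set():
        break
    do_task()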

Implementation of a robust worker process pool

Let's have a look at how we can combine all these features. Please note that this is written for Linux; I'm not sure whether these specific signals work on Windows.

In [ ]:
import os, signal, logging, sys
from time import sleep, time
from random import random, randint
from multiprocessing import JoinableQueue, Event, Process
from queue import Empty
from typing import Optional

logger = logging.getLogger(__name__)

# Make sure to log process/thread name
logging.basicConfig(level=logging.INFO, format="%(processName)s %(relativeCreated)d %(message)s")

print("Python version:", sys.version.split(" ")[0])
In [ ]:
def do_task(sleep_sec: float) -> int:
    "Dummy task"
    sleep(sleep_sec)
    logger.info("Slept for %.3fs", sleep_sec)
    return randint(0, 10)


def worker(q: JoinableQueue, stop_event: Event):
    logger.info("Starting worker...")
    while True:
        if stop_event.is_set():
            logger.info("Worker exiting because of stop_event")
            break
        # We set a timeout so we loop past "stop_event" even if the queue is empty
        try:
            args = q.get(timeout=.01)
        except Empty:
            # Run next iteration of loop
            continue
        
        # Exit if end of queue
        if args is None:
            logger.info("Worker exiting because of None on queue")
            q.task_done()
            break
        
        # Do the task
        try:
            do_task(*args)
        # Will also catch KeyboardInterrupt
        except:
            logger.exception("Failed to process args %s", args)
            # Can implement some kind of retry handling here
        finally:
            q.task_done()

def main(
        n_workers: int = 2,
        n_tasks: int = 10,
        max_queue_size: int = 1,
        grace_period: int = 2,
        kill_period: int = 30,
        interrupt: Optional[int] = None
    ) -> None:
    """
    Run a process pool of workers.
    
    Args:
        n_workers: Start this many processes
        n_tasks: Launch this many tasks
        max_queue_size: If the queue reaches this size, block when putting items on it
        grace_period: Send SIGINT to processes if they don't exit within this time after SIGINT/SIGTERM
        kill_period: Send SIGKILL to processes if they don't exit after this many seconds
        interrupt: If given, send signal SIGTERM to itself after queueing this many tasks
    """
    # The JoinableQueue has an internal counter that increments when an item is put on the queue and
    # decrements when q.task_done() is called. This allows us to wait until it's empty using .join()
    q = JoinableQueue(maxsize=max_queue_size)
    # This is a process-safe version of the "panic" variable shown above
    stop_event = Event()
    
    def handler(signalname):
        """
        Python 3.8 has `signal.strsignal(signalnum)`, so this closure would not be needed there.
        Alternatively, `signal.valid_signals()` (also new in 3.8) can be used to build a mapping for the same purpose.
        """
        def f(signal_received, frame):
            raise KeyboardInterrupt(f"{signalname} received")
        return f

    # These handlers will be inherited by the child processes if they are forked (not spawned)
    signal.signal(signal.SIGINT, handler("SIGINT"))
    signal.signal(signal.SIGTERM, handler("SIGTERM"))
    
    procs = []

    for i in range(n_workers):
        # Make it a daemon process so it is definitely terminated when this process exits,
        # might be overkill but is a nice feature. See
        # https://docs.python.org/3.8/library/multiprocessing.html#multiprocessing.Process.daemon
        p = Process(name=f"Worker-{i:02d}", daemon=True, target=worker, args=(q, stop_event))
        procs.append(p)
        p.start()
    try:
        # Put tasks on queue
        for i_task in range(n_tasks):
            # For demonstration purposes
            if interrupt and i_task == interrupt:
                os.kill(os.getpid(), signal.SIGTERM)
            s = random()
            logger.info("Put %.3f on queue", s)
            q.put([s])
        # Put exit tasks on queue
        for i in range(n_workers):
            q.put(None)
        # Wait until all tasks are processed
        q.join()
    except KeyboardInterrupt:
        logger.warning("Caught KeyboardInterrupt! Setting stop event...")
    finally:
        stop_event.set()
        t = time()
        # Send SIGINT if processes don't exit quickly enough, and SIGKILL as a last resort
        # .is_alive() also implicitly joins the process (good practice on Linux)
        while alive_procs := [p for p in procs if p.is_alive()]:
            # Check the kill deadline first; otherwise the grace branch would always win
            # and SIGKILL would never be sent when kill_period > grace_period
            if time() > t + kill_period:
                for p in alive_procs:
                    logger.warning("Sending SIGKILL to %s", p)
                    # Queues and other inter-process communication primitives can break when
                    # a process is killed, but we don't care here
                    p.kill()
            elif time() > t + grace_period:
                for p in alive_procs:
                    logger.warning("Sending SIGINT to %s", p)
                    os.kill(p.pid, signal.SIGINT)
            sleep(.01)

        sleep(.1)
        for p in procs:
            logger.info("Process status: %s", p)
In [3]:
main()
Worker-00 425 Starting worker...
MainProcess 426 Put 0.006 on queue
Worker-01 428 Starting worker...
MainProcess 430 Put 0.717 on queue
MainProcess 431 Put 0.953 on queue
MainProcess 432 Put 0.306 on queue
Worker-00 437 Slept for 0.006s
MainProcess 439 Put 0.101 on queue
Worker-01 1149 Slept for 0.717s
MainProcess 1153 Put 0.654 on queue
Worker-00 1393 Slept for 0.953s
MainProcess 1397 Put 0.456 on queue
Worker-01 1459 Slept for 0.306s
MainProcess 1463 Put 0.566 on queue
Worker-00 1497 Slept for 0.101s
MainProcess 1500 Put 0.061 on queue
Worker-00 1957 Slept for 0.456s
MainProcess 1961 Put 0.721 on queue
Worker-01 2117 Slept for 0.654s
Worker-01 2182 Slept for 0.061s
Worker-00 2527 Slept for 0.566s
Worker-00 2531 Worker exiting because of None on queue
Worker-01 2906 Slept for 0.721s
Worker-01 2910 Worker exiting because of None on queue
MainProcess 3024 Process status: <Process name='Worker-00' pid=22560 parent=22549 stopped exitcode=0 daemon>
MainProcess 3025 Process status: <Process name='Worker-01' pid=22561 parent=22549 stopped exitcode=0 daemon>

Okay! That works all right. We have "backpressure" when putting things on the queue, we shut down the workers by putting None on the queue, and we correctly wait for the workers to finish.

We also stop the workers in between tasks if possible:

In [4]:
main(interrupt=4)
MainProcess 3046 Put 0.706 on queue
MainProcess 3051 Put 0.265 on queue
Worker-01 3049 Starting worker...
MainProcess 3053 Put 0.632 on queue
Worker-00 3045 Starting worker...
MainProcess 3073 Put 0.558 on queue
Worker-00 3338 Slept for 0.265s
MainProcess 3342 Caught KeyboardInterrupt! Setting stop event...
Worker-01 3760 Slept for 0.706s
Worker-01 3763 Worker exiting because of stop_event
Worker-00 3975 Slept for 0.632s
Worker-00 3978 Worker exiting because of stop_event
MainProcess 4088 Process status: <Process name='Worker-00' pid=22592 parent=22549 stopped exitcode=0 daemon>
MainProcess 4090 Process status: <Process name='Worker-01' pid=22593 parent=22549 stopped exitcode=0 daemon>

And if it takes too long for the worker to exit, we send SIGINT, which results in the worker getting a KeyboardInterrupt in the middle of processing the task, after which it iterates through the loop again and encounters the stop_event. If it's not acceptable to crash the task like this, we could rely solely on the stop_event (this is just one of many possible variations; a sketch of it follows the last demo below).

In [5]:
main(interrupt=3, grace_period=.01)
Worker-00 4124 Starting worker...
MainProcess 4127 Put 0.564 on queue
MainProcess 4131 Put 0.253 on queue
Worker-01 4129 Starting worker...
MainProcess 4133 Put 0.672 on queue
MainProcess 4134 Caught KeyboardInterrupt! Setting stop event...
MainProcess 4145 Sending SIGINT to <Process name='Worker-00' pid=22610 parent=22549 started daemon>
MainProcess 4145 Sending SIGINT to <Process name='Worker-01' pid=22611 parent=22549 started daemon>
Worker-00 4145 Failed to process args [0.5635664781555114]
Traceback (most recent call last):
  File "<ipython-input-2-b6e88e5d97d3>", line 29, in worker
    do_task(*args)
  File "<ipython-input-2-b6e88e5d97d3>", line 3, in do_task
    sleep(sleep_sec)
  File "<ipython-input-2-b6e88e5d97d3>", line 68, in f
    raise KeyboardInterrupt(f"{signalname} received")
KeyboardInterrupt: SIGINT received
Worker-01 4145 Failed to process args [0.2526945976750854]
Traceback (most recent call last):
  File "<ipython-input-2-b6e88e5d97d3>", line 29, in worker
    do_task(*args)
  File "<ipython-input-2-b6e88e5d97d3>", line 3, in do_task
    sleep(sleep_sec)
  File "<ipython-input-2-b6e88e5d97d3>", line 68, in f
    raise KeyboardInterrupt(f"{signalname} received")
KeyboardInterrupt: SIGINT received
Worker-00 4147 Worker exiting because of stop_event
Worker-01 4147 Worker exiting because of stop_event
MainProcess 4256 Process status: <Process name='Worker-00' pid=22610 parent=22549 stopped exitcode=0 daemon>
MainProcess 4257 Process status: <Process name='Worker-01' pid=22611 parent=22549 stopped exitcode=0 daemon>

One thing to note in the logs above is that the signal handlers of the parent process were inherited by the child processes because they were forked ("cloned") rather than spawned. In our case this is fine; in other situations you might want to attach new handlers in the children.
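
If inheriting the parent's handlers is not what you want, a minimal sketch of what replacing them at the top of worker could look like (ignoring the signals in the child is just one possible choice, not what the pool above does):

import signal

def worker(q, stop_event):
    # Replace the inherited handlers right at the start of the child process:
    # ignore SIGINT/SIGTERM so only the main process reacts to them, and let the
    # worker shut down via stop_event or the None sentinel only
    signal.signal(signal.SIGINT, signal.SIG_IGN)
    signal.signal(signal.SIGTERM, signal.SIG_IGN)
    ...  # rest of the worker loop as above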

If the process does not exit after setting the stop_event and sending SIGINT, we send SIGKILL as seen below.

In [6]:
main(interrupt=3, kill_period=.01)
MainProcess 4292 Put 0.080 on queue
MainProcess 4296 Put 0.854 on queue
Worker-01 4295 Starting worker...
MainProcess 4298 Put 0.520 on queue
Worker-00 4289 Starting worker...
MainProcess 4315 Caught KeyboardInterrupt! Setting stop event...
MainProcess 4326 Sending SIGKILL to <Process name='Worker-00' pid=22625 parent=22549 started daemon>
MainProcess 4327 Sending SIGKILL to <Process name='Worker-01' pid=22626 parent=22549 started daemon>
MainProcess 4439 Process status: <Process name='Worker-00' pid=22625 parent=22549 stopped exitcode=-SIGKILL daemon>
MainProcess 4440 Process status: <Process name='Worker-01' pid=22626 parent=22549 stopped exitcode=-SIGKILL daemon>
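
As mentioned earlier, if crashing a task with SIGINT is not acceptable, the task itself can poll stop_event. A minimal sketch of that variation (the name do_task_polling and passing stop_event into the task are my own additions, not part of the pool above):

def do_task_polling(sleep_sec: float, stop_event: Event) -> Optional[int]:
    "Variant of do_task that polls stop_event instead of relying on being interrupted"
    deadline = time() + sleep_sec
    while time() < deadline:
        if stop_event.is_set():
            logger.info("Task stopped early")
            return None
        sleep(0.01)
    logger.info("Slept for %.3fs", sleep_sec)
    return randint(0, 10)

# The worker loop would then call do_task_polling(*args, stop_event) instead of do_task(*args)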

Conclusion

Implementing parallelism is tricky because you're working not just with Python but also with OS primitives like threads, processes, and signals. We've explored some of the functionality available in Python's standard library to build a robust worker pool that can be customized endlessly. One easy tweak would be to send results back from the workers to the main process (using a second queue). Another useful endeavor might be to make a QueueingProcessPoolExecutor that has the same API as the other Executors.
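
For the first tweak, a minimal standalone sketch (not wired into the pool above) of sending a result back to the main process over a second queue:

from multiprocessing import Process, Queue

def compute(results: Queue) -> None:
    # Hypothetical worker that reports its result back over the queue
    results.put(2 + 2)

if __name__ == "__main__":
    results = Queue()
    p = Process(target=compute, args=(results,))
    p.start()
    print("Worker result:", results.get())  # blocks until the worker has put something
    p.join()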

Articles I found useful were "Things I Wish They Told Me About Multiprocessing in Python", the multiprocessing programming guidelines, and the multiprocessing examples.