{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "I often use the Process/ThreadPoolExecutor from the [concurrent.futures](https://docs.python.org/3/library/concurrent.futures.html) standard library module to parallelize workloads,\n", "but have trouble exiting gracefully as the default behavior is to finish all pending futures (either using `as_completed` or during exit of the `with` block).\n", "\n", "I want to be able to:\n", "\n", "- Cancel pending jobs\n", "- Not do the work to submit all tasks at once (this might involve some API calls or whatever for 1M files that I'd rather not do all at once)\n", "- Cancel running jobs if necessary\n", "- Implement more sophisticated retry handling if desired\n", "\n", "To do that, I'll build a simple demonstration that exhibits these behaviors.\n", "\n", "\n", "\n", "It must be said that a new feature has landed in Python 3.9 that allows to [cancel all pending futures on exit](https://docs.python.org/3.9/whatsnew/3.9.html#concurrent-futures). However, that's only the first item on my list, the rest still need to be addressed so we'll happily continue making our own Executor.\n", "\n", "## Processes or threads?\n", "Most of the work I do involves not only I/O but a good amount of compute so I'll use processes to avoid the GIL. The overhead of starting processes is negligible as these scripts usually run for minutes, not milliseconds.\n", "\n", "## Stop signals and k8s: 3 ways to handle SIGTERM\n", "When running in a shell, one will tend to stop the main process and all its worker processes/threads by doing CTRL-C which sends a SIGINT to the main process. This is subsequently caught by Python after which a `KeyboardInterrupt` is raised.\n", "\n", "If you want your application to run in Kubernetes (or other systems that communicate with your process using signals), you'll need to catch SIGTERM, which is k8s' way to say \"please shut down gracefully\". 
If the process is still running after the \"grace period\" (30 seconds by default), it is killed the hard way with SIGKILL (which cannot be caught). An easy way to handle this is to just catch SIGTERM and raise a `KeyboardInterrupt`, so that it has the same effect as a SIGINT.\n", "\n", "```python\n", "import signal\n", "\n", "def handler(signal_received, frame):\n", " raise KeyboardInterrupt(\"SIGTERM received\")\n", "\n", "signal.signal(signal.SIGTERM, handler)\n", "```\n", "\n", "Another option that has the same result is to set STOPSIGNAL in the Dockerfile, which is respected by k8s.\n", "\n", "```dockerfile\n", "FROM python:3.8\n", "\n", "STOPSIGNAL SIGINT\n", "\n", "COPY main.py /usr/src/app/main.py\n", "\n", "WORKDIR /usr/src/app\n", "\n", "CMD [\"python\", \"-u\", \"main.py\"]\n", "```\n", "\n", "A third way is to catch SIGTERM/SIGINT and set some flag (Event) instead of raising `KeyboardInterrupt`. This method might provide for a more graceful shutdown in some cases.\n", "\n", "```python\n", "import signal\n", "\n", "# The flag starts out False and is flipped by the signal handler\n", "panic = False\n", "\n", "def handler(signal_received, frame):\n", " global panic\n", " panic = True\n", "\n", "signal.signal(signal.SIGTERM, handler)\n", "# You might want to handle Ctrl-C the same way (this avoids KeyboardInterrupt being raised)\n", "signal.signal(signal.SIGINT, handler)\n", "\n", "for task in tasks:\n", " if panic:\n", " break\n", " do_task()\n", "```\n", "\n", "## Implementation of a robust worker process pool\n", "\n", "Let's have a look at how we can combine all these features. Please note that this is made for Linux; I'm not sure if these specific signals would work on Windows." 
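, "\n", "To make the flag-based approach concrete before building the full pool, here is a minimal runnable sketch; `do_work`, the loop bound, and the self-sent SIGTERM are illustrative placeholders, not part of the implementation below.\n", "\n", "```python\n", "import os, signal\n", "from time import sleep\n", "from multiprocessing import Event\n", "\n", "stop_event = Event()  # process-safe version of the panic flag above\n", "\n", "def handler(signal_received, frame):\n", "    # Set the flag instead of raising KeyboardInterrupt\n", "    stop_event.set()\n", "\n", "signal.signal(signal.SIGTERM, handler)\n", "signal.signal(signal.SIGINT, handler)\n", "\n", "def do_work():\n", "    sleep(0.01)  # stand-in for one unit of work\n", "\n", "for i_task in range(10):\n", "    if stop_event.is_set():\n", "        break\n", "    do_work()\n", "    if i_task == 2:\n", "        # Simulate k8s sending SIGTERM after a few tasks\n", "        os.kill(os.getpid(), signal.SIGTERM)\n", "```\n"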
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import os, signal, logging, sys\n", "from time import sleep, time\n", "from random import random, randint\n", "from multiprocessing import JoinableQueue, Event, Process\n", "from queue import Empty\n", "from typing import Optional\n", "\n", "logger = logging.getLogger(__name__)\n", "\n", "# Make sure to log process/thread name\n", "logging.basicConfig(level=logging.INFO, format=\"%(processName)s %(relativeCreated)d %(message)s\")\n", "\n", "print(\"Python version:\", sys.version.split(\" \")[0])" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def do_task(sleep_sec: int) -> int:\n", " \"Dummy task\"\n", " sleep(sleep_sec)\n", " logger.info(\"Slept for %.3fs\", sleep_sec)\n", " return randint(0, 10)\n", "\n", "\n", "def worker(q: JoinableQueue, stop_event: Event):\n", " logger.info(\"Starting worker...\")\n", " while 1:\n", " if stop_event.is_set():\n", " logger.info(\"Worker exiting because of stop_event\")\n", " break\n", " # We set a timeout so we loop past \"stop_event\" even if the queue is empty\n", " try:\n", " args = q.get(timeout=.01)\n", " except Empty:\n", " # Run next iteration of loop\n", " continue\n", " \n", " # Exit if end of queue\n", " if args is None:\n", " logger.info(\"Worker exiting because of None on queue\")\n", " q.task_done()\n", " break\n", " \n", " # Do the task\n", " try:\n", " do_task(*args)\n", " # Will also catch KeyboardInterrupt\n", " except:\n", " logger.exception(\"Failed to process args %s\", args)\n", " # Can implement some kind of retry handling here\n", " finally:\n", " q.task_done()\n", "\n", "def main(\n", " n_workers: int = 2,\n", " n_tasks: int = 10,\n", " max_queue_size: int = 1,\n", " grace_period: int = 2,\n", " kill_period: int = 30,\n", " interrupt: Optional[int] = None\n", " ) -> None:\n", " \"\"\"\n", " Run a process pool of workers.\n", " \n", " Args:\n", " n_workers: 
Start this many processes\n", " n_tasks: Launch this many tasks\n", " max_queue_size: If queue exceeds this size, block when putting items on the queue\n", " grace_period: Send SIGINT to processes if they don't exit within this time after SIGINT/SIGTERM\n", " kill_period: Send SIGKILL to processes if they don't exit after this many seconds\n", " interrupt: If given, send SIGTERM to this process after queueing this many tasks\n", " \"\"\"\n", " # The JoinableQueue has an internal counter that increments when an item is put on the queue and\n", " # decrements when q.task_done() is called. This allows us to wait until it's empty using .join()\n", " q = JoinableQueue(maxsize=max_queue_size)\n", " # This is a process-safe version of the \"panic\" variable shown above\n", " stop_event = Event()\n", " \n", " def handler(signalname):\n", " \"\"\"\n", " Python 3.8 adds `signal.strsignal(signalnum)`, so this closure would not strictly be needed.\n", " 3.8 also includes `signal.valid_signals()` that can be used to create a mapping for the same purpose.\n", " \"\"\"\n", " def f(signal_received, frame):\n", " raise KeyboardInterrupt(f\"{signalname} received\")\n", " return f\n", "\n", " # This will be inherited by the child process if it is forked (not spawned)\n", " signal.signal(signal.SIGINT, handler(\"SIGINT\"))\n", " signal.signal(signal.SIGTERM, handler(\"SIGTERM\"))\n", " \n", " procs = []\n", "\n", " for i in range(n_workers):\n", " # Make it a daemon process so it is definitely terminated when this process exits;\n", " # might be overkill but is a nice feature. 
See\n", " # https://docs.python.org/3.8/library/multiprocessing.html#multiprocessing.Process.daemon\n", " p = Process(name=f\"Worker-{i:02d}\", daemon=True, target=worker, args=(q, stop_event))\n", " procs.append(p)\n", " p.start()\n", " try:\n", " # Put tasks on queue\n", " for i_task in range(n_tasks):\n", " # For demonstration purposes\n", " if interrupt and i_task == interrupt:\n", " os.kill(os.getpid(), signal.SIGTERM)\n", " s = random()\n", " logger.info(\"Put %.3f on queue\", s)\n", " q.put([s])\n", " # Put exit tasks on queue\n", " for i in range(n_workers):\n", " q.put(None)\n", " # Wait until all tasks are processed\n", " q.join()\n", " except KeyboardInterrupt:\n", " logger.warning(\"Caught KeyboardInterrupt! Setting stop event...\")\n", " finally:\n", " stop_event.set()\n", " t = time()\n", " # Send SIGINT if process doesn't exit quickly enough, and kill it as last resort\n", " # .is_alive() also implicitly joins the process (good practice on Linux)\n", " while alive_procs := [p for p in procs if p.is_alive()]:\n", " # Check the kill deadline first; otherwise the grace branch would shadow it forever\n", " if time() > t + kill_period:\n", " for p in alive_procs:\n", " logger.warning(\"Sending SIGKILL to %s\", p)\n", " # Queues and other inter-process communication primitives can break when\n", " # process is killed, but we don't care here\n", " p.kill()\n", " elif time() > t + grace_period:\n", " for p in alive_procs:\n", " logger.warning(\"Sending SIGINT to %s\", p)\n", " os.kill(p.pid, signal.SIGINT)\n", " sleep(.01)\n", "\n", " sleep(.1)\n", " for p in procs:\n", " logger.info(\"Process status: %s\", p)\n" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "Worker-00 425 Starting worker...\n", "MainProcess 426 Put 0.006 on queue\n", "Worker-01 428 Starting worker...\n", "MainProcess 430 Put 0.717 on queue\n", "MainProcess 431 Put 0.953 on queue\n", "MainProcess 432 Put 0.306 on queue\n", "Worker-00 437 Slept for 0.006s\n", "MainProcess 439 Put 0.101 on 
queue\n", "Worker-01 1149 Slept for 0.717s\n", "MainProcess 1153 Put 0.654 on queue\n", "Worker-00 1393 Slept for 0.953s\n", "MainProcess 1397 Put 0.456 on queue\n", "Worker-01 1459 Slept for 0.306s\n", "MainProcess 1463 Put 0.566 on queue\n", "Worker-00 1497 Slept for 0.101s\n", "MainProcess 1500 Put 0.061 on queue\n", "Worker-00 1957 Slept for 0.456s\n", "MainProcess 1961 Put 0.721 on queue\n", "Worker-01 2117 Slept for 0.654s\n", "Worker-01 2182 Slept for 0.061s\n", "Worker-00 2527 Slept for 0.566s\n", "Worker-00 2531 Worker exiting because of None on queue\n", "Worker-01 2906 Slept for 0.721s\n", "Worker-01 2910 Worker exiting because of None on queue\n", "MainProcess 3024 Process status: \n", "MainProcess 3025 Process status: \n" ] } ], "source": [ "main()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Okay! That works all right. We have \"backpressure\" when putting things on the queue, we shut down the workers by putting `None` on the queue, and we correctly wait for the workers to finish.\n", "\n", "We also stop the workers in between tasks if possible:" ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "MainProcess 3046 Put 0.706 on queue\n", "MainProcess 3051 Put 0.265 on queue\n", "Worker-01 3049 Starting worker...\n", "MainProcess 3053 Put 0.632 on queue\n", "Worker-00 3045 Starting worker...\n", "MainProcess 3073 Put 0.558 on queue\n", "Worker-00 3338 Slept for 0.265s\n", "MainProcess 3342 Caught KeyboardInterrupt! 
Setting stop event...\n", "Worker-01 3760 Slept for 0.706s\n", "Worker-01 3763 Worker exiting because of stop_event\n", "Worker-00 3975 Slept for 0.632s\n", "Worker-00 3978 Worker exiting because of stop_event\n", "MainProcess 4088 Process status: \n", "MainProcess 4090 Process status: \n" ] } ], "source": [ "main(interrupt=4)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "And if it takes too long for the worker to exit, we send SIGINT, which results in the worker getting a `KeyboardInterrupt` in the middle of processing the task, after which it iterates through the loop again and encounters the set `stop_event`. If it's not OK to crash the task like this, we could rely solely on the `stop_event`. (This is just one of many possible variations.)" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "Worker-00 4124 Starting worker...\n", "MainProcess 4127 Put 0.564 on queue\n", "MainProcess 4131 Put 0.253 on queue\n", "Worker-01 4129 Starting worker...\n", "MainProcess 4133 Put 0.672 on queue\n", "MainProcess 4134 Caught KeyboardInterrupt! 
Setting stop event...\n", "MainProcess 4145 Sending SIGINT to \n", "MainProcess 4145 Sending SIGINT to \n", "Worker-00 4145 Failed to process args [0.5635664781555114]\n", "Traceback (most recent call last):\n", " File \"\", line 29, in worker\n", " do_task(*args)\n", " File \"\", line 3, in do_task\n", " sleep(sleep_sec)\n", " File \"\", line 68, in f\n", " raise KeyboardInterrupt(f\"{signalname} received\")\n", "KeyboardInterrupt: SIGINT received\n", "Worker-01 4145 Failed to process args [0.2526945976750854]\n", "Traceback (most recent call last):\n", " File \"\", line 29, in worker\n", " do_task(*args)\n", " File \"\", line 3, in do_task\n", " sleep(sleep_sec)\n", " File \"\", line 68, in f\n", " raise KeyboardInterrupt(f\"{signalname} received\")\n", "KeyboardInterrupt: SIGINT received\n", "Worker-00 4147 Worker exiting because of stop_event\n", "Worker-01 4147 Worker exiting because of stop_event\n", "MainProcess 4256 Process status: \n", "MainProcess 4257 Process status: \n" ] } ], "source": [ "main(interrupt=3, grace_period=.01)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "One thing to note in the logs above is that the signal handlers of the parent process were inherited by the child processes because they were forked (\"cloned\") and not spawned. In our case this is fine; in other situations you might want to attach new handlers.\n", "\n", "If the process does not exit after setting the `stop_event` *and* sending SIGINT, we send SIGKILL as seen below." ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "MainProcess 4292 Put 0.080 on queue\n", "MainProcess 4296 Put 0.854 on queue\n", "Worker-01 4295 Starting worker...\n", "MainProcess 4298 Put 0.520 on queue\n", "Worker-00 4289 Starting worker...\n", "MainProcess 4315 Caught KeyboardInterrupt! 
Setting stop event...\n", "MainProcess 4326 Sending SIGKILL to \n", "MainProcess 4327 Sending SIGKILL to \n", "MainProcess 4439 Process status: \n", "MainProcess 4440 Process status: \n" ] } ], "source": [ "main(interrupt=3, kill_period=.01)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Conclusion\n", "Implementing parallelism is tricky because you're working not just with Python but also with OS primitives like threads and processes. We've explored some of the functionality available in Python's standard library to build a robust worker pool that is infinitely customizable. One easy tweak would be to send back results from worker to main process (using a queue). Another useful endeavor might be to make a `QueueingProcessPoolExecutor` that has the same API as other Executors.\n", "\n", "Useful articles for me to read were [\"Things I Wish They Told Me About Multiprocessing in Python\"](https://www.cloudcity.io/blog/2019/02/27/things-i-wish-they-told-me-about-multiprocessing-in-python/), [multiprocessing programming guidelines](https://docs.python.org/3.8/library/multiprocessing.html#programming-guidelines) and [multiprocessing examples](https://docs.python.org/3.8/library/multiprocessing.html#examples)." ] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.8.5" }, "nikola": { "date": "2020-10-17 12:00:00 UTC", "slug": "python-multiprocessing", "title": "Graceful exit with Python multiprocessing" } }, "nbformat": 4, "nbformat_minor": 4 }