Preamble
There are many debates and arguments around which programming language to use when writing bioinformatic software. I think they tend to be informed by each person's background and experience. So while one person will advocate till blue in the face for R and its various packages, another won't be able to understand this position at all, and advocate for C. Between all the strong opinions lies the simple truth that "it depends". Do you want to build a tool or analyse data? Do you want to write something new or use something out of the box? Do you want other people to use it, or is it just a one off? This isn't an extensive list of questions to ask before deciding what you will use, and most of the time the answer is as simple as "use what you know", the list highlights quickly that there isn't a "right" answer for every context. Why do I bring this up when I'm about to nerd out about multiprocessing in python? Well, because of course threading is better. Using C, or Rust, or anything other than python probably does threading/multiprocessing better. But sometimes you just gotta make something work in the language you are using, and that's why I'm writing this. My hope is to educate and potentially solve some headaches for others in the future, by providing some methods of managing a suboptimal solution. Anyway, let's get on with it.
Introduction
So you've written something in python. It's probably 2x longer and more complicated than you would like, and there are some parts you want to speed up with multiprocessing. Either you want to know a quick way to get it done, or you have already tried, and google lead you after asking it "how to kill parent of child process that exits with error python" or something like that. Well, hopefully you will leave here with some new tricks that can help.
Multiprocessing
Multiprocessing in python means to create another process of your program, and run part of it on another processor of your CPU. The advantage is you can do this on multiple processors for a particular function, effectively reducing the time taken to process something that otherwise would take a long time.
Things to consider when multiprocessing:
When you spawn the process, it effectively copies the main parent process and doesn't have much communication with it other than what you set up (we will get to this)
The part of code you are trying to speed up has to be able to be run non-sequentially. (more on this in a moment)
Multiprocessing is asynchronous. Data is not processed in order, and thus can be returned out of order.
Multiprocessing isn't magic. You don't just point it at something as a function and suddenly your software runs twice as fast. It takes a little setting up, and you should really plan out your "thread model" before doing so.
Thread model
What is a thread model? well if you are writing some code that does multithreading or multiprocessing, knowing what to expect, especially accounting for asynchronous computing.
A thread model can be as simple as:
Data read by main process
Data split into batches by main process
Batches are sent to a pool of child processes
Each batch is processed by a child process
Results are returned to the main process
Results are received by the main process
Results are analysed by main process
Analysis is written to a file by the main process.
Or it can be a little more complicated, were you have dedicated processes doing particular things
Main process takes arguments and sets up queues
input queue created
output queue created
Reader child process spawned
Reader opens and reads a file
data split into batches
batches pushed to input queue
Worker children processes spawned
Worker pulls a batch from input queue
batch is processed into results
results are pushed to output queue
Writer child process spawned
Writer gets results from output queue
Results are analysed
Analysis is written to output file
Main process wraps up when everything is finished
You can keep things simple, or go waaaay deeper than the more complicated example above. The point is, knowing what process is doing what and where it sits in your data pipeline is very helpful when you are writing the code and testing things.
When things go wrong
The main reason I am writing this is because setting up some multiprocessing workflows isn't super hard, but catching and handling errors in multiprocessing is hard. When things go wrong in any software, an error occurs, should be captured then reported back to the user so they know something went wrong and what it was, and then the software should exit with a non-zero exit code. When a child process encounters an error in python, that error is not automatically handled by the main process. The user must handle this themselves. In almost every tutorial or bit of example code I found online when trying to troubleshoot errors in child processes there was no sign of error handling. Let's change that.
Multiprocessing library
Python comes with a built-in library called multiprocessing. You can read more about its many features in the python docs. Here we will only talk about Pool, Process, and Joinable_queues.
Multiprocessing with Pool
Remember when I said "Multiprocessing isn't magic"? Well when you do very simple things using a Pool, it kinda is like magic.
from multiprocessing import Pool
def square_n(n):
return n*n
if __name__ == '__main__':
procs = 4
data = [1, 2, 3]
with Pool(procs) as p:
print(p.map(square_n, data))
Prints:
[1, 4, 9]
There are variants of the Pool.map() command, and the rabbit hole goes deep on this `Pool` method, but it is pretty simple and looks like magic, right? I mean, this giant red banner in the python docs doesn't mean we have to worry right?...RIGHT???

Basically what this means is if something goes wrong with a process in the pool, and you don't bend over backwards to catch it, your software can hang, and you need to manually kill it.
In the example above, p.map() will block until each process has completed, before printing the result. This is pretty normal behaviour in any programming language, that 1 line of code finishes before the next line. In this case, it's ordered by nesting, where the inner most nest of p.map(square_n, data) finishes before the outer part of the line, print() is called on the result. If one of the processes hangs, and never returns and there isn't some kind of timeout to kill the process, then print() is never called and the user won't have any information about what happened.
Multiprocessing with Process
Process() is the way I prefer to do multiprocessing, especially if I want to define particular roles to a process.
For example, having 1 process for reading data, many for processing, and 1 for writing (see thread model above)
You can send arguments to your process and even name them to help with error handling. What I like most about the Process() method is how simple it is. However, again, if a process has an error, it has to be handled, or bad things can happen.
Before getting into some code, to demonstrate the problem with error handling Process(), consider the following
# start first proc
p1 = Process(target=function1, args=(x, y))
p1.start()
# start second proc
p2 = Process(target=function2, args=(a, b))
p2.start()
p1.join() # <---- THIS BLOCKS
p2.join()
If function2() running on p2 has an error, it won't be shown until p1 finishes.
If p1 was a reader and p2 was a writer, this means you wouldn't know something was wrong until p1 was finished. In genomics, if you are reading a file in the hundreds of Gb range, this could be a while. Ideally a user wants to know an error occurred as soon as possible after it happened. So while the above code is the common way of running 2 processes at the same time doing different things, it doesn't handle errors well.
Sending information between processes
Sending and receiving data between different processes can be done a number of ways, and some ways are better than others depending on the kind of information or data you want to communicate.
Personally, I'm a big fan of joinable_queue().
It creates a first-in-first-out (FIFO) queue, that can be pushed to or pull from multiple processes, and the task_done() call leads to a nice concise coding style that a batch is completed.
Putting processes and queues together
Using the more complicated thread model example above, here is how I put Process() and joinable_queue() together.
import multiprocessing as mp
def read_worker(args, iq):
with open(args.input, 'r') as r:
batch = []
counter = 0
for line in r:
batch.append(line)
counter += 1
if counter >= args.batch_size:
iq.put(batch)
batch = []
# account for anything left over
if len(batch) > 0:
iq.put(batch)
def data_worker(args, iq, oq):
while True:
data = iq.get()
if data is None:
iq.task_done()
break
# do stuff to data
results = do_stuff(data)
oq.put(results)
iq.task_done()
def write_worker(args, oq):
with open(args.output, 'w') as w:
while True:
results = oq.get()
if results is None:
oq.task_done()
break
for result in results
w.write(result)
w.write("\n")
oq.task_done()
def main():
# args = <-- some arguments from argparse
mp.set_start_method('spawn')
input_queue = mp.JoinableQueue()
output_queue = mp.JoinableQueue()
# spawn 1 reader process
reader = mp.Process(target=read_worker, args=(args, input_queue), name='reader')
reader.start()
# spawn 1 writer process
writer = mp.Process(target=write_worker, args=(args, output_queue), name='writer')
writer.start()
# spawn a N workers to process data
# list to hold the worker processes
processes = []
for i in range(args.procs):
worker = mp.Process(target=data_worker, args=(args, input_queue, output_queue, i), daemon=True, name='worker{}'.format(i))
worker.start()
# add worker to process list
processes.append(worker)
reader.join()
# Add None to the end of the input_queue to let workers break
for p in processes:
input_queue.put(None)
# join each worker
for p in processes:
p.join()
# Add None to the end of the output_queue to let writer break
output_queue.put(None)
writer.join()
print("Jobs done")
if __name__ == '__main__':
main()
As you can see, things get complicated, I think it's still straight forward enough to figure out what's going on
We start in the main function for the parent process, create the queues, start the writer, reader, and data workers, and then we wait for each to finish with the `join()` command.
The problem
Now, let's say this is running on some big dataset, and the writer process can't open the `args.output` filepath to write the output. Some kind of permission denied error.
Well, we won't be seeing that error until the reader process has finished, and all the worker processes. And even then, the main parent process will exit with an errorcode of 0, meaning it didn't encounter any errors. This is obviously bad!
The solution
What we need is a way to detect when a process encounters an error, regardless of the order in which it will be joined, and then handle that error, and kill the other child processes before exiting the main parent process with an error code.
So in steps:
Error occurs in writer child
Detect error has occurred
Print some info to the user about the error
Find all the child processes and kill them
exit the main process with an error code
We can do this a few different ways, but the way I have been doing it recently is with a while loop with a sleep throttle.
import sys
import time
# process.start() calls here
# Anakin, the process supervisor
# If any of the children exit with an error, find all the children, and kill them
while True:
if reader.exitcode is not None:
if reader.exitcode != 0:
print("ERROR: Reader process encountered an error. exitcode: ", reader.exitcode)
for child in mp.active_children():
child.terminate()
sys.exit(1)
if writer.exitcode is not None:
if writer.exitcode != 0:
print("ERROR: Writer process encountered an error. exitcode: ", writer.exitcode)
for child in mp.active_children():
child.terminate()
sys.exit(1)
for p in processes:
if p.exitcode is not None:
if p.exitcode != 0:
print("ERROR: Worker client encountered an error. exitcode: ", p.exitcode)
for child in mp.active_children():
child.terminate()
sys.exit(1)
if reader.exitcode == 0:
p_sum = 0
for p in processes:
if p.exitcode != 0:
p_sum += 1
if p_sum == 0:
output_queue.put(None)
time.sleep(3)
if writer.exitcode == 0:
print("Proc supervisor: all processes completed without error")
break
# throttle the while loop
time.sleep(5)
# process join() calls here
Any time a child process exits, either normally or via an error, the process.exitcode value will be updated.
If the process is still running with no exitcode, it is `None`
This allows us to do 2 things:
Check if the process is still running (exitcode == None)
If process has ended
test if it was a success (exitcode == 0)
test if it was a failure (exitcode != 0)
When an error is detected, we need to find all the child processes and kill them.
for child in mp.active_children():
child.terminate()
sys.exit(1)
This finds them all, and calls terminate()
Once they are dead, a call to sys.exit(1) exits the main process with an exitcode of 1. You could instead pass the actual exitcode from the error, or even have a dictionary of known error codes with pre-set error messages, depending on how helpful you want to be.
Finally, there needs to be a way to exit the while loop, and in this case, send a signal to the output_queue to end the writer process. If all other processes have ended with a 0 exitcode, then the sum of all their exitcodes will be 0, otherwise there was an error.
Wrap up
And that's about it. That I think is one of the simplest ways to handle multiprocessing in python. It can monitor the child processes, catch the errors, kill the other children when it happens, and can be tweaked pretty easily.
Yes, there are other ways to do this. Yes, they mostly work fine too. However this is the method I have been using lately and it's really simple to set up and get things done.
I hope you found this helpful and learned something new.
If instead, you are not so happy, and want to write your own blog post about how it should really be done, please send me the link, I'd love to read it.
Cheers,
James