
Multiprocessing in python

Writer: James Ferguson

Preamble

There are many debates and arguments around which programming language to use when writing bioinformatic software. I think they tend to be informed by each person's background and experience. So while one person will advocate until they're blue in the face for R and its various packages, another won't be able to understand that position at all and will advocate for C instead. Between all the strong opinions lies the simple truth that "it depends". Do you want to build a tool or analyse data? Do you want to write something new or use something out of the box? Do you want other people to use it, or is it just a one-off? This isn't an exhaustive list of questions to ask before deciding what you will use, and most of the time the answer is as simple as "use what you know", but it quickly highlights that there isn't a "right" answer for every context. Why do I bring this up when I'm about to nerd out about multiprocessing in python? Well, because of course threading is better. C, or Rust, or just about anything other than python probably does threading/multiprocessing better. But sometimes you just gotta make something work in the language you are using, and that's why I'm writing this. My hope is to educate and potentially solve some headaches for others in the future, by providing some methods of managing a suboptimal solution. Anyway, let's get on with it.


Introduction

So you've written something in python. It's probably 2x longer and more complicated than you would like, and there are some parts you want to speed up with multiprocessing. Either you want to know a quick way to get it done, or you have already tried, and google led you here after you asked it "how to kill parent of child process that exits with error python" or something like that. Well, hopefully you will leave here with some new tricks that can help.


Multiprocessing

Multiprocessing in python means creating another process of your program and running part of it on another core of your CPU. The advantage is that you can do this across multiple cores for a particular function, effectively reducing the time taken to process something that would otherwise take a long time.


Things to consider when multiprocessing:

  1. When you spawn the process, it effectively copies the main parent process and doesn't have much communication with it other than what you set up (we will get to this)

  2. The part of code you are trying to speed up has to be able to be run non-sequentially. (more on this in a moment)

  3. Multiprocessing is asynchronous. Data is not processed in order, and thus can be returned out of order.
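
To see point 3 in action, here is a minimal sketch (not from this post's pipeline) using the Pool class covered further down, where each task takes a random amount of time and results come back in whatever order the children finish:

from multiprocessing import Pool
import random
import time

def slow_square(n):
    time.sleep(random.random())  # simulate uneven amounts of work
    return n * n

if __name__ == '__main__':
    with Pool(4) as p:
        # imap_unordered yields each result as soon as its child finishes,
        # so the order can differ from the input order, e.g. 9, 1, 16, 4
        for result in p.imap_unordered(slow_square, [1, 2, 3, 4]):
            print(result)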


Multiprocessing isn't magic. You don't just point it at something as a function and suddenly your software runs twice as fast. It takes a little setting up, and you should really plan out your "thread model" before doing so.


Thread model

What is a thread model? Well, if you are writing some code that does multithreading or multiprocessing, it's a plan of which process does what, so you know what to expect, especially when accounting for asynchronous computing.


A thread model can be as simple as:


  • Data read by main process

  • Data split into batches by main process

  • Batches are sent to a pool of child processes

    • Each batch is processed by a child process

    • Results are returned to the main process

  • Results are received by the main process

  • Results are analysed by main process

  • Analysis is written to a file by the main process.
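
As a rough sketch of that simple model (the input.txt and output.txt paths and the process_batch() function here are just placeholders, not from a real pipeline), it might look something like this with a Pool:

from multiprocessing import Pool

def process_batch(batch):
    # placeholder work: strip and upper-case each line in the batch
    return [line.strip().upper() for line in batch]

def main():
    batch_size = 1000
    # data read and split into batches by the main process
    with open("input.txt") as f:
        lines = f.readlines()
    batches = [lines[i:i + batch_size] for i in range(0, len(lines), batch_size)]

    # batches sent to a pool of child processes, results returned to the main process
    with Pool(4) as pool:
        results = pool.map(process_batch, batches)

    # results analysed and written to a file by the main process
    with open("output.txt", "w") as out:
        for batch_result in results:
            for line in batch_result:
                out.write(line + "\n")

if __name__ == '__main__':
    main()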


Or it can be a little more complicated, where you have dedicated processes doing particular things:


  • Main process takes arguments and sets up queues

    • input queue created

    • output queue created

  • Reader child process spawned

    • Reader opens and reads a file

    • data split into batches

    • batches pushed to input queue

  • Worker child processes spawned

    • Worker pulls a batch from input queue

    • batch is processed into results

    • results are pushed to output queue

  • Writer child process spawned

    • Writer gets results from output queue

    • Results are analysed

    • Analysis is written to output file

  • Main process wraps up when everything is finished


You can keep things simple, or go waaaay deeper than the more complicated example above. The point is, knowing what process is doing what and where it sits in your data pipeline is very helpful when you are writing the code and testing things.


When things go wrong

The main reason I am writing this is because setting up some multiprocessing workflows isn't super hard, but catching and handling errors in multiprocessing is hard. When things go wrong in any software, the error should be captured and reported back to the user so they know something went wrong and what it was, and then the software should exit with a non-zero exit code. When a child process encounters an error in python, that error is not automatically handled by the main process; the user must handle this themselves. In almost every tutorial or bit of example code I found online while trying to troubleshoot errors in child processes, there was no sign of error handling. Let's change that.


Multiprocessing library

Python comes with a built-in library called multiprocessing. You can read more about its many features in the python docs. Here we will only talk about Pool, Process, and JoinableQueue.


Multiprocessing with Pool

Remember when I said "Multiprocessing isn't magic"? Well when you do very simple things using a Pool, it kinda is like magic.


from multiprocessing import Pool

def square_n(n):
    return n*n

if __name__ == '__main__':
    procs = 4
    data = [1, 2, 3]
    with Pool(procs) as p:
        print(p.map(square_n, data))

Prints:

[1, 4, 9]

There are variants of the Pool.map() command, and the rabbit hole goes deep on this `Pool` method, but it is pretty simple and looks like magic, right? I mean, this giant red banner in the python docs doesn't mean we have to worry right?...RIGHT???


Warning message from the python docs about the multiprocessing.Pool class

Basically what this means is if something goes wrong with a process in the pool, and you don't bend over backwards to catch it, your software can hang, and you need to manually kill it.


In the example above, p.map() will block until each process has completed, before printing the result. This is pretty normal behaviour in any programming language: one line of code finishes before the next line starts. In this case, it's ordered by nesting, where the innermost part of the line, p.map(square_n, data), finishes before the outer part, print(), is called on the result. If one of the processes hangs and never returns, and there isn't some kind of timeout to kill it, then print() is never called and the user won't have any information about what happened.
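
One way to guard against that hang, sketched here as a suggestion rather than taken from the docs, is to submit the work with apply_async() and put a timeout on collecting each result:

from multiprocessing import Pool, TimeoutError

def square_n(n):
    return n * n

if __name__ == '__main__':
    data = [1, 2, 3]
    with Pool(4) as p:
        # apply_async returns immediately; get(timeout=...) raises TimeoutError
        # instead of blocking forever if a worker hangs
        async_results = [p.apply_async(square_n, (n,)) for n in data]
        try:
            print([r.get(timeout=10) for r in async_results])
        except TimeoutError:
            print("ERROR: a worker did not return within 10 seconds")

The nice thing about get() is that it also re-raises any exception the worker hit, so at least some errors surface in the parent this way.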


Multiprocessing with Process

Process() is the way I prefer to do multiprocessing, especially if I want to assign particular roles to particular processes.


For example, having 1 process for reading data, many for processing, and 1 for writing (see thread model above)


You can send arguments to your process and even name them to help with error handling. What I like most about the Process() method is how simple it is. However, again, if a process has an error, it has to be handled, or bad things can happen.
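
For example, here is a tiny sketch (the greet() function is just made up for illustration) of passing arguments and a name to a Process:

from multiprocessing import Process

def greet(name, n):
    for _ in range(n):
        print("hello", name)

if __name__ == '__main__':
    p = Process(target=greet, args=("world", 3), name="greeter")
    p.start()
    p.join()
    # the name and exitcode come in handy later when reporting which process failed
    print(p.name, "finished with exitcode", p.exitcode)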


To demonstrate the problem with error handling when using Process(), consider the following:

# start first proc
p1 = Process(target=function1, args=(x, y))
p1.start()

# start second proc
p2 = Process(target=function2, args=(a, b))
p2.start()

p1.join() # <---- THIS BLOCKS
p2.join()

If function2() running on p2 has an error, the main process won't notice until p1 finishes, because p1.join() blocks first.


If p1 was a reader and p2 was a writer, this means you wouldn't know something was wrong until p1 was finished. In genomics, if you are reading a file in the hundreds of Gb range, this could be a while. Ideally a user wants to know an error occurred as soon as possible after it happened. So while the above code is the common way of running 2 processes at the same time doing different things, it doesn't handle errors well.


Sending information between processes

Sending and receiving data between different processes can be done a number of ways, and some ways are better than others depending on the kind of information or data you want to communicate.


Personally, I'm a big fan of JoinableQueue().


It creates a first-in-first-out (FIFO) queue that can be pushed to or pulled from by multiple processes, and the task_done() call gives a nice, concise way to signal that a batch is completed.
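
Here is a toy sketch (not part of the pipeline below) of those semantics: the producer can call join() on the queue to block until every item it put() has had a matching task_done():

import multiprocessing as mp

def consumer(q):
    while True:
        item = q.get()
        if item is None:
            q.task_done()
            break
        print("processed", item * 10)
        q.task_done()

if __name__ == '__main__':
    q = mp.JoinableQueue()
    p = mp.Process(target=consumer, args=(q,))
    p.start()
    for i in range(5):
        q.put(i)
    q.put(None)   # sentinel so the consumer can break out of its loop
    q.join()      # blocks until every put() item has been task_done()
    p.join()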


Putting processes and queues together

Using the more complicated thread model example above, here is how I put Process() and JoinableQueue() together.


import multiprocessing as mp

def read_worker(args, iq):
    with open(args.input, 'r') as r:
        batch = []
        counter = 0
        for line in r:
            batch.append(line)
            counter += 1
            if counter >= args.batch_size:
                iq.put(batch)
                batch = []
                counter = 0
    # account for anything left over
    if len(batch) > 0:
        iq.put(batch)
    

def data_worker(args, iq, oq):
    while True:
        data = iq.get()
        if data is None:
            iq.task_done()
            break
        # do stuff to data
        results = do_stuff(data)
        oq.put(results)
        iq.task_done()

def write_worker(args, oq):
    with open(args.output, 'w') as w:
        while True:
            results = oq.get()
            if results is None:
                oq.task_done()
                break
            for result in results:
                w.write(result)
                w.write("\n")
            oq.task_done()


def main():
    # args = <-- some arguments from argparse

    mp.set_start_method('spawn')

    input_queue = mp.JoinableQueue()
    output_queue = mp.JoinableQueue()

    # spawn 1 reader process
    reader = mp.Process(target=read_worker, args=(args, input_queue), name='reader')
    reader.start()

    # spawn 1 writer process
    writer = mp.Process(target=write_worker, args=(args, output_queue), name='writer')
    writer.start()

    # spawn N workers to process data
    # list to hold the worker processes
    processes = []
    for i in range(args.procs):
        worker = mp.Process(target=data_worker, args=(args, input_queue, output_queue), daemon=True, name='worker{}'.format(i))
        worker.start()
        # add worker to process list
        processes.append(worker)

    reader.join()

    # Add one None per worker to the end of the input_queue to let workers break
    for p in processes:
        input_queue.put(None)

    # join each worker
    for p in processes:
        p.join()

    # Add None to the end of the output_queue to let the writer break
    output_queue.put(None)
    writer.join()

    print("Jobs done")

if __name__ == '__main__':
    main()

As you can see, things get more complicated, but I think it's still straightforward enough to figure out what's going on.


We start in the main function for the parent process, create the queues, start the writer, reader, and data workers, and then we wait for each to finish with the `join()` command.


The problem

Now, let's say this is running on some big dataset, and the writer process can't open the `args.output` filepath to write the output. Some kind of permission denied error.


Well, we won't see that error until the reader process and all the worker processes have finished. And even then, the main parent process will exit with an exit code of 0, meaning it didn't encounter any errors. This is obviously bad!


The solution

What we need is a way to detect when a process encounters an error, regardless of the order in which it will be joined, and then handle that error, and kill the other child processes before exiting the main parent process with an error code.


So in steps:


  1. Error occurs in writer child

  2. Detect error has occurred

  3. Print some info to the user about the error

  4. Find all the child processes and kill them

  5. Exit the main process with an error code


We can do this a few different ways, but the way I have been doing it recently is with a while loop with a sleep throttle.


import sys
import time

# process.start() calls here

# Anakin, the process supervisor
# If any of the children exit with an error, find all the children, and kill them
worker_sentinels_sent = False
while True:
    if reader.exitcode is not None:
        if reader.exitcode != 0:
            print("ERROR: Reader process encountered an error. exitcode: ", reader.exitcode)
            for child in mp.active_children():
                child.terminate()
            sys.exit(1)
    if writer.exitcode is not None:
        if writer.exitcode != 0:
            print("ERROR: Writer process encountered an error. exitcode: ", writer.exitcode)
            for child in mp.active_children():
                child.terminate()
            sys.exit(1)
    for p in processes:
        if p.exitcode is not None:
            if p.exitcode != 0:
                print("ERROR: Worker process encountered an error. exitcode: ", p.exitcode)
                for child in mp.active_children():
                    child.terminate()
                sys.exit(1)
    if reader.exitcode == 0:
        # reader finished cleanly; send one None per worker (once) so they can break
        if not worker_sentinels_sent:
            for p in processes:
                input_queue.put(None)
            worker_sentinels_sent = True
        # count workers that haven't finished cleanly yet (exitcode None or non-zero)
        p_sum = 0
        for p in processes:
            if p.exitcode != 0:
                p_sum += 1
        if p_sum == 0:
            # all workers done; send None so the writer can break, then check on it
            output_queue.put(None)
            time.sleep(3)
            if writer.exitcode == 0:
                print("Proc supervisor: all processes completed without error")
                break
    # throttle the while loop
    time.sleep(5)

# process join() calls here

Any time a child process exits, either normally or via an error, the process.exitcode value will be updated.


If the process is still running with no exitcode, it is `None`


This allows us to do 2 things:


  1. Check if the process is still running (exitcode == None)

  2. If process has ended

    1. test if it was a success (exitcode == 0)

    2. test if it was a failure (exitcode != 0)
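
In code, those checks look something like this (some_function is just a stand-in):

import multiprocessing as mp
import time

def some_function():
    time.sleep(1)

if __name__ == '__main__':
    p = mp.Process(target=some_function)
    p.start()
    print(p.exitcode)   # None: still running
    p.join()
    print(p.exitcode)   # 0 on success; non-zero on failure (negative means killed by a signal)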


When an error is detected, we need to find all the child processes and kill them.

for child in mp.active_children():
    child.terminate()
sys.exit(1)

This finds them all, and calls terminate()

Once they are dead, a call to sys.exit(1) exits the main process with an exitcode of 1. You could instead pass the actual exitcode from the error, or even have a dictionary of known error codes with pre-set error messages, depending on how helpful you want to be.
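
As a rough sketch of that dictionary idea (the codes, messages, and the kill_and_exit() helper are made up for illustration, not part of the code above):

import sys
import multiprocessing as mp

# hypothetical mapping of known exit codes to friendlier messages
ERROR_MESSAGES = {
    1: "general error in a child process",
    2: "reader could not open the input file",
    3: "writer could not open the output file",
}

def kill_and_exit(exitcode):
    # terminate any remaining children, report what we know, then pass the code on
    for child in mp.active_children():
        child.terminate()
    print("ERROR:", ERROR_MESSAGES.get(exitcode, "unknown error"), "exitcode:", exitcode)
    sys.exit(exitcode)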


Finally, there needs to be a way to exit the while loop. Once the reader has finished cleanly, each worker gets a None sentinel so it can finish too. If the reader and all the workers have ended with an exitcode of 0, a None is sent down the output_queue to end the writer process, and once the writer also exits with 0 we break out of the loop.



Wrap up


And that's about it. That, I think, is one of the simplest ways to handle multiprocessing in python. It can monitor the child processes, catch the errors, kill the other children when one happens, and can be tweaked pretty easily.


Yes, there are other ways to do this. Yes, they mostly work fine too. However this is the method I have been using lately and it's really simple to set up and get things done.


I hope you found this helpful and learned something new.


If instead, you are not so happy, and want to write your own blog post about how it should really be done, please send me the link, I'd love to read it.


Cheers,


James


