top of page

Blog

Preamble

There are many debates and arguments around which programming language to use when writing bioinformatic software. I think they tend to be informed by each person's background and experience. So while one person will advocate till blue in the face for R and its various packages, another won't be able to understand this position at all, and advocate for C. Between all the strong opinions lies the simple truth that "it depends". Do you want to build a tool or analyse data? Do you want to write something new or use something out of the box? Do you want other people to use it, or is it just a one off? This isn't an extensive list of questions to ask before deciding what you will use, and most of the time the answer is as simple as "use what you know", the list highlights quickly that there isn't a "right" answer for every context. Why do I bring this up when I'm about to nerd out about multiprocessing in python? Well, because of course threading is better. Using C, or Rust, or anything other than python probably does threading/multiprocessing better. But sometimes you just gotta make something work in the language you are using, and that's why I'm writing this. My hope is to educate and potentially solve some headaches for others in the future, by providing some methods of managing a suboptimal solution. Anyway, let's get on with it.


Introduction

So you've written something in python. It's probably 2x longer and more complicated than you would like, and there are some parts you want to speed up with multiprocessing. Either you want to know a quick way to get it done, or you have already tried, and google lead you after asking it "how to kill parent of child process that exits with error python" or something like that. Well, hopefully you will leave here with some new tricks that can help.


Multiprocessing

Multiprocessing in python means to create another process of your program, and run part of it on another processor of your CPU. The advantage is you can do this on multiple processors for a particular function, effectively reducing the time taken to process something that otherwise would take a long time.


Things to consider when multiprocessing:

  1. When you spawn the process, it effectively copies the main parent process and doesn't have much communication with it other than what you set up (we will get to this)

  2. The part of code you are trying to speed up has to be able to be run non-sequentially. (more on this in a moment)

  3. Multiprocessing is asynchronous. Data is not processed in order, and thus can be returned out of order.


Multiprocessing isn't magic. You don't just point it at something as a function and suddenly your software runs twice as fast. It takes a little setting up, and you should really plan out your "thread model" before doing so.


Thread model

What is a thread model? well if you are writing some code that does multithreading or multiprocessing, knowing what to expect, especially accounting for asynchronous computing.


A thread model can be as simple as:


  • Data read by main process

  • Data split into batches by main process

  • Batches are sent to a pool of child processes

    • Each batch is processed by a child process

    • Results are returned to the main process

  • Results are received by the main process

  • Results are analysed by main process

  • Analysis is written to a file by the main process.


Or it can be a little more complicated, were you have dedicated processes doing particular things


  • Main process takes arguments and sets up queues

    • input queue created

    • output queue created

  • Reader child process spawned

    • Reader opens and reads a file

    • data split into batches

    • batches pushed to input queue

  • Worker children processes spawned

  • Worker pulls a batch from input queue

    • batch is processed into results

    • results are pushed to output queue

  • Writer child process spawned

    • Writer gets results from output queue

    • Results are analysed

    • Analysis is written to output file

  • Main process wraps up when everything is finished


You can keep things simple, or go waaaay deeper than the more complicated example above. The point is, knowing what process is doing what and where it sits in your data pipeline is very helpful when you are writing the code and testing things.


When things go wrong

The main reason I am writing this is because setting up some multiprocessing workflows isn't super hard, but catching and handling errors in multiprocessing is hard. When things go wrong in any software, an error occurs, should be captured then reported back to the user so they know something went wrong and what it was, and then the software should exit with a non-zero exit code. When a child process encounters an error in python, that error is not automatically handled by the main process. The user must handle this themselves. In almost every tutorial or bit of example code I found online when trying to troubleshoot errors in child processes there was no sign of error handling. Let's change that.


Multiprocessing library

Python comes with a built-in library called multiprocessing. You can read more about its many features in the python docs. Here we will only talk about Pool, Process, and Joinable_queues.


Multiprocessing with Pool

Remember when I said "Multiprocessing isn't magic"? Well when you do very simple things using a Pool, it kinda is like magic.


from multiprocessing import Pool

def square_n(n):
    return n*n

if __name__ == '__main__':
    procs = 4
    data = [1, 2, 3]
    with Pool(procs) as p:
        print(p.map(square_n, data))

Prints:

[1, 4, 9]

There are variants of the Pool.map() command, and the rabbit hole goes deep on this `Pool` method, but it is pretty simple and looks like magic, right? I mean, this giant red banner in the python docs doesn't mean we have to worry right?...RIGHT???


Warning message from python docs about the multiprocessing.pool() function
Warning message from python docs about the multiprocessing.pool() function

Basically what this means is if something goes wrong with a process in the pool, and you don't bend over backwards to catch it, your software can hang, and you need to manually kill it.


In the example above, p.map() will block until each process has completed, before printing the result. This is pretty normal behaviour in any programming language, that 1 line of code finishes before the next line. In this case, it's ordered by nesting, where the inner most nest of p.map(square_n, data) finishes before the outer part of the line, print() is called on the result. If one of the processes hangs, and never returns and there isn't some kind of timeout to kill the process, then print() is never called and the user won't have any information about what happened.


Multiprocessing with Process

Process() is the way I prefer to do multiprocessing, especially if I want to define particular roles to a process.


For example, having 1 process for reading data, many for processing, and 1 for writing (see thread model above)


You can send arguments to your process and even name them to help with error handling. What I like most about the Process() method is how simple it is. However, again, if a process has an error, it has to be handled, or bad things can happen.


Before getting into some code, to demonstrate the problem with error handling Process(), consider the following

# start first proc
p1 = Process(target=function1, args=(x, y))
p1.start()

# start second proc
p2 = Process(target=function2, args=(a, b))
p2.start()

p1.join() # <---- THIS BLOCKS
p2.join()

If function2() running on p2 has an error, it won't be shown until p1 finishes.


If p1 was a reader and p2 was a writer, this means you wouldn't know something was wrong until p1 was finished. In genomics, if you are reading a file in the hundreds of Gb range, this could be a while. Ideally a user wants to know an error occurred as soon as possible after it happened. So while the above code is the common way of running 2 processes at the same time doing different things, it doesn't handle errors well.


Sending information between processes

Sending and receiving data between different processes can be done a number of ways, and some ways are better than others depending on the kind of information or data you want to communicate.


Personally, I'm a big fan of joinable_queue().


It creates a first-in-first-out (FIFO) queue, that can be pushed to or pull from multiple processes, and the task_done() call leads to a nice concise coding style that a batch is completed.


Putting processes and queues together

Using the more complicated thread model example above, here is how I put Process() and joinable_queue() together.


import multiprocessing as mp

def read_worker(args, iq):
    with open(args.input, 'r') as r:
        batch = []
        counter = 0
        for line in r:
            batch.append(line)
            counter += 1
            if counter >= args.batch_size:
                iq.put(batch)
                batch = []
    # account for anything left over
    if len(batch) > 0:
        iq.put(batch)
    

def data_worker(args, iq, oq):
    while True:
        data = iq.get()
        if data is None:
            iq.task_done()
            break
        # do stuff to data
        results = do_stuff(data)
        oq.put(results)
        iq.task_done()

def write_worker(args, oq):
    with open(args.output, 'w') as w:
        while True:
            results = oq.get()
            if results is None:
                oq.task_done()
                break
            for result in results
                w.write(result)
                w.write("\n")
            oq.task_done()


def main():
	# args = <-- some arguments from argparse

	mp.set_start_method('spawn')

	input_queue = mp.JoinableQueue()
	output_queue = mp.JoinableQueue()

	# spawn 1 reader process
	reader = mp.Process(target=read_worker, args=(args, input_queue), name='reader')
	reader.start()

	# spawn 1 writer process
	writer = mp.Process(target=write_worker, args=(args, output_queue), name='writer')
	writer.start()

	# spawn a N workers to process data
	# list to hold the worker processes
	processes = []
	for i in range(args.procs):
	    worker = mp.Process(target=data_worker, args=(args, input_queue, output_queue, i), daemon=True, name='worker{}'.format(i))
    	worker.start()
	    # add worker to process list
    	processes.append(worker)

	reader.join()

	# Add None to the end of the input_queue to let workers break
	for p in processes:
    	input_queue.put(None)

	# join each worker
	for p in processes:
    	p.join()

	# Add None to the end of the output_queue to let writer break
	output_queue.put(None)
	writer.join()

	print("Jobs done")

if __name__ == '__main__':
    main()

As you can see, things get complicated, I think it's still straight forward enough to figure out what's going on


We start in the main function for the parent process, create the queues, start the writer, reader, and data workers, and then we wait for each to finish with the `join()` command.


The problem

Now, let's say this is running on some big dataset, and the writer process can't open the `args.output` filepath to write the output. Some kind of permission denied error.


Well, we won't be seeing that error until the reader process has finished, and all the worker processes. And even then, the main parent process will exit with an errorcode of 0, meaning it didn't encounter any errors. This is obviously bad!


The solution

What we need is a way to detect when a process encounters an error, regardless of the order in which it will be joined, and then handle that error, and kill the other child processes before exiting the main parent process with an error code.


So in steps:


  1. Error occurs in writer child

  2. Detect error has occurred

  3. Print some info to the user about the error

  4. Find all the child processes and kill them

  5. exit the main process with an error code


We can do this a few different ways, but the way I have been doing it recently is with a while loop with a sleep throttle.


import sys
import time

# process.start() calls here

# Anakin, the process supervisor
# If any of the children exit with an error, find all the children, and kill them
while True:
    if reader.exitcode is not None:
        if reader.exitcode != 0:
            print("ERROR: Reader process encountered an error. exitcode: ", reader.exitcode)
            for child in mp.active_children():
                child.terminate()
            sys.exit(1)
    if writer.exitcode is not None:
        if writer.exitcode != 0:
            print("ERROR: Writer process encountered an error. exitcode: ", writer.exitcode)
            for child in mp.active_children():
                child.terminate()
            sys.exit(1)
    for p in processes:
        if p.exitcode is not None:
            if p.exitcode != 0:
                print("ERROR: Worker client encountered an error. exitcode: ", p.exitcode)
                for child in mp.active_children():
                    child.terminate()
                sys.exit(1)
    if reader.exitcode == 0:
        p_sum = 0
        for p in processes:
            if p.exitcode != 0:
                p_sum += 1
        if p_sum == 0:
            output_queue.put(None)
            time.sleep(3)
            if writer.exitcode == 0:
                print("Proc supervisor: all processes completed without error")
                break
    # throttle the while loop
    time.sleep(5)

# process join() calls here

Any time a child process exits, either normally or via an error, the process.exitcode value will be updated.


If the process is still running with no exitcode, it is `None`


This allows us to do 2 things:


  1. Check if the process is still running (exitcode == None)

  2. If process has ended

    1. test if it was a success (exitcode == 0)

    2. test if it was a failure (exitcode != 0)


When an error is detected, we need to find all the child processes and kill them.

for child in mp.active_children():
    child.terminate()
sys.exit(1)

This finds them all, and calls terminate()

Once they are dead, a call to sys.exit(1) exits the main process with an exitcode of 1. You could instead pass the actual exitcode from the error, or even have a dictionary of known error codes with pre-set error messages, depending on how helpful you want to be.


Finally, there needs to be a way to exit the while loop, and in this case, send a signal to the output_queue to end the writer process. If all other processes have ended with a 0 exitcode, then the sum of all their exitcodes will be 0, otherwise there was an error.



Wrap up


And that's about it. That I think is one of the simplest ways to handle multiprocessing in python. It can monitor the child processes, catch the errors, kill the other children when it happens, and can be tweaked pretty easily.


Yes, there are other ways to do this. Yes, they mostly work fine too. However this is the method I have been using lately and it's really simple to set up and get things done.


I hope you found this helpful and learned something new.


If instead, you are not so happy, and want to write your own blog post about how it should really be done, please send me the link, I'd love to read it.


Cheers,


James



References


43 views0 comments

When basecalling nanopore reads with guppy or some other basecaller, it will split the reads into pass/fail files depending on some given mean qscore value. Mean qscore here is the average of the probabilities associated with the quality scores in the 4th line of your fastq read. It is calculated with the following equation:


Put into code:

def calculate_qscore(qstring):
    '''
    calculate a qscore from a qstring
    '''
    qs = (np.array(qstring, 'c').view(np.uint8) - 33)
    mean_err = np.exp(qs * (-np.log(10) / 10.)).mean()
    score = -10 * np.log10(max(mean_err, 1e-4))
    return score

So you have basecalled some nanopore reads, but now you want to be more or less stringent on your pass/fail filtering. How do you re-split your reads? One way is to use the sequencing_summary.txt file to extract the mean_qscore field, and search the fastq for the @readID. The sequencing_summary.txt file has changed many, many, many, many times over the years, and has the possibility of being removed or altered dramatically soon. So what other way is there?

Introducing, split_qscore.py, a script to split your fastq and sam files by a qscore cutoff.

usage: split_qscore.py [-h] [-p PREFIX] [-q QSCORE] [-f {fastq,sam}] input output

split a fastq or sam file by qscore value into pass/fail files

positional arguments:
  input                 fastq, sam file or - for STDIN/pipe
  output                output path to write pass/fail files

options:
  -h, --help            show this help message and exit
  -p PREFIX, --prefix PREFIX
                        filename prefix to give pass/fail output files, eg -p exampl gives example.pass.fastq & example.fail.fastq (default: reads)
  -q QSCORE, --qscore QSCORE
                        qscore to split on:pass=>qscore(default: 9)
  -f {fastq,sam}, --format {fastq,sam}
                        if using - for stdin, give format as fastq or sam (default: None)

You can now input your fastq to be split by qscore.


python3 split_qscore.py -q 15 test.fastq ./ > stats.txt
>cat stats.txt

~~~ STATS ~~~:

pass_reads: 2670
fail_reads: 1330
total_reads: 4000
pass fraction: 66.75%

You can also split sam/bam files. You can provide a sam file like the fastq file above, and everything will just work. If starting from a bam file, you can stream the data using the following piping pattern where - (dash) is for stdin:


samtools view -h test.bam | python3 split_qscore.py -q 15 -f sam - ./ > stats.txt

Don't forget the -h argument in samtools to also send the header. You can then convert the resulting sam files back to bam. This is useful for unaligned sam/bam output when calling methylation with remora in guppy/dorado.

When calculating the mean qscore of a read, split_qscore.py will first try to see if a tag is present containing the pre-computed score. If your data was basecalled using the slow5 wrapper for guppy, buttery-eel, your reads will have a mean_qscore=# in each read header. Any sam outputs from guppy/buttery-eel, dorado will have the unofficial sam tag qs:i:#. If these tags cannot be found, then the mean qscore will be manually calculated. This, of course, can be a little slower.

I hope this small script is helpful to those doing long-read sequencing and coming across this issue of read splitting.

733 views0 comments

There is a lot to be said about bioinformatic tools, and a lot of angles to take. I think on the discussion of Command Line Interfaces (CLI) vs Graphical User Interfaces (GUI), the following applies:


Almost all bioinformatic tools need a CLI, whereas only some bioinformatic tools need a GUI.


Now, a lot is riding on my use of “need” in the above description. What I mean, is the tool would not be usable in its most basic state without it. So a tool that requires a lot of human intervention and interaction, a GUI might very well be needed to be functional. While a CLI can add a different way of doing it, the GUI is the tool. I think of tools like IGV for viewing read alignments.



The GUI is the tool. Sure a CLI makes certain methods of getting screenshots of various regions nice to have, but it’s not the main reason for the tool. It’s to visualise the alignments and various annotations. Meanwhile, a tool like minimap2 needs a CLI but doesn’t need a GUI. Sure, having one might make some applications of the tool more intuitive or nicer to use for some users, but the core function of the tool is around its CLI and ability to be used in pipelines.


This brings me to why someone might want a GUI or CLI. There are many people in the bioinformatics space. Tool builders, tool users, scripters, pipeliners, and a variety of wet lab/dry lab use and exploration. Many people sit in multiple areas too. In some cases, a point-and-click GUI is going to be best for some users, while for others, a CLI is a must, and a GUI-only version of a tool is going to make it impossible to automate/implement in a pipeline.


Bioinformatics is a broad field with many people with varying expertise, writing and using tools in a variety of applications. So there is no one answer for everyone. That being said, I personally think all tools should have their basic functions accessible via a CLI, and if the tool has some visual aspect, wrapping that in some kind of GUI can make a lot of sense and make the user experience much nicer.


One thing you won’t catch me doing, however, is slamming tool builders for not building a GUI on their tool.


So to say it again, my position on CLI vs GUI in bioinformatics : Almost all bioinformatic tools need a CLI, whereas only some bioinformatic tools need a GUI.


108 views0 comments
2

Contact
Information

Genomic Technologies Group
Genomics and Inherited Disease Program

Garvan Institute of Medical Research

  • github
  • LinkedIn
  • Twitter
  • mastodon

Thanks for submitting!

©2022 by James M. Ferguson. Created with Wix.com

bottom of page