If you’re like me, you’ve probably noticed that, by default, the Python scripts we write only use a portion of the processing power at our disposal. As such, you’ve probably said to yourself:

Hey, I paid good money for a quad-core CPU! What’s happening?

While it’s true that most CPUs are multi-core nowadays, the code we write must also be tailored to make use of more than one core at a time.

So let’s dive into Python’s multiprocessing module and figure out how we can have our scripts run on multiple CPUs.

This is actually pretty simple to put in place if your code is already split into functions (which it should…).
What I’ll be putting in place is a queue of tasks and a set of workers which will consume tasks from the queue.

We’ll start off with the following function which computes a hypothetical treatment potential between 2 individuals:

def simulate_treatment(param1, param2):
    donor = Individual(param1, param2)
    patient = Individual(param1, param2)

    heals = 0
    nb_therapeutic_targets = 0
    if donor.can_heal(patient):
        heals = 1
        nb_therapeutic_targets = len(donor.targets(patient))

    return heals, nb_therapeutic_targets

Obviously, we are missing the definition for the “Individual” class but we don’t really need it to understand what is going on.
Now if we wanted to run this simulation 1 million times, we could do the following:

def main():
    for i in range(1000000):
        heals, nb_therapeutic_minors = simulate_treatment(param1, param2)
        # do some stuff with the results

And that would execute 1 million iterations of the function sequentially on a single core. Boooo…
Let’s modify our main() just a little bit to spread our million iterations over 4 CPUs:

from multiprocessing import Pool

def main():
    p = Pool(processes=4)
    tasks = [(param1, param2) for i in range(1000000)]
    simulations = p.map_async(simulate_treatment, tasks)
    p.close()
    p.join()
    for sim in simulations.get():
        heals = sim[0]
        nb_therapeutic_minors = sim[1]
        # do some stuff with the results

There are a few things happening here:

p = Pool(processes=4)

initializes a pool of 4 consumer processes which will nibble at the list of simulation tasks instantiated by:

tasks = [(param1, param2) for i in range(1000000)]

The list is then submitted to the Pool and processed asynchronously. map_async returns right away with a result handle (an AsyncResult object), appropriately called “simulations”, whose get() method later hands back the list of results.

simulations = p.map_async(simulate_treatment, tasks)

We then close the pool to avoid any extra submissions and wait for all processes to complete using:

p.close()
p.join()
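
To see those steps in isolation, here is a minimal sketch that swaps the simulation for a trivial one-argument function. The names slow_square and run_in_pool are made up for this illustration and are not part of the original script; the Pool calls are the same ones described above.

```python
from multiprocessing import Pool


def slow_square(x):
    # Hypothetical stand-in for an expensive computation.
    return x * x


def run_in_pool(values, processes=4):
    p = Pool(processes=processes)
    # map_async returns immediately with an AsyncResult handle.
    async_result = p.map_async(slow_square, values)
    p.close()  # no further tasks may be submitted
    p.join()   # block until every worker process has exited
    # get() returns a list of results in the same order as the inputs.
    return async_result.get()


if __name__ == "__main__":
    print(run_in_pool(range(10)))
```

Since close() and join() have already returned by the time get() is called, get() should not have to wait for anything here.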

Pretty simple, huh?

But JP, you actually submitted a tuple of params to a function with a signature asking for 2 parameters. What gives?

Hehe, good of you to have caught that! In fact, the code above will break if you run it.
So let me suggest one last addendum here.
If you’re like me, you like nicely defined functions with clear signatures. Unfortunately, map_async passes each item of the iterable to the function as a single argument, so our tuple arrives as one parameter instead of two. My suggestion here is to use a wrapper around our simulate_treatment function and use it to expand the contents of our submitted tuple as input for the real function:

def simulate_treatment_wrapper(args):
    return simulate_treatment(*args)
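
As a side note, on Python 3.3 or later Pool also offers starmap (and starmap_async), which unpacks each tuple for you. The sketch below uses a hypothetical two-argument function, add_scaled, in place of simulate_treatment, and suggests that both approaches yield the same results. It is an illustration under those assumptions, not a replacement for the wrapper used in this post.

```python
from multiprocessing import Pool


def add_scaled(a, b):
    # Hypothetical two-argument function standing in for simulate_treatment.
    return a + 2 * b


def add_scaled_wrapper(args):
    # Same trick as the wrapper above: unpack the tuple into positional arguments.
    return add_scaled(*args)


if __name__ == "__main__":
    tasks = [(i, i + 1) for i in range(5)]
    with Pool(processes=2) as pool:
        via_wrapper = pool.map(add_scaled_wrapper, tasks)
        via_starmap = pool.starmap(add_scaled, tasks)
    # Both routes call add_scaled(a, b) once per tuple, in input order.
    assert via_wrapper == via_starmap
    print(via_starmap)
```

The wrapper has the advantage of also working on older Python versions, which is a reasonable argument for keeping it.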

Let’s put it all together

This snippet:

def simulate_treatment(param1, param2):
    donor = Individual(param1, param2)
    patient = Individual(param1, param2)

    heals = 0
    nb_therapeutic_targets = 0
    if donor.can_heal(patient):
        heals = 1
        nb_therapeutic_targets = len(donor.targets(patient))

    return heals, nb_therapeutic_targets

def main():
    for i in range(1000000):
        heals, nb_therapeutic_minors = simulate_treatment(param1, param2)
        # do some stuff with the results

Becomes:

from multiprocessing import Pool

def simulate_treatment_wrapper(args):
    return simulate_treatment(*args)

def simulate_treatment(param1, param2):
    donor = Individual(param1, param2)
    patient = Individual(param1, param2)

    heals = 0
    nb_therapeutic_targets = 0
    if donor.can_heal(patient):
        heals = 1
        nb_therapeutic_targets = len(donor.targets(patient))

    return heals, nb_therapeutic_targets

def main():
    p = Pool(processes=4)
    tasks = [(param1, param2) for i in range(1000000)]
    simulations = p.map_async(simulate_treatment_wrapper, tasks)
    p.close()
    p.join()
    for sim in simulations.get():
        heals = sim[0]
        nb_therapeutic_minors = sim[1]
        # do some stuff with the results

As you can see, we haven’t changed our code a whole lot, and now it runs on 4 CPUs!
You’re welcome. 🙂
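
One practical note if you want to try this at home: the snippets above leave out the Individual class, the values of param1 and param2, and the call to main(). Below is one possible runnable version. The Individual class is a hypothetical stand-in (the real one is not shown in this post), the parameter values and the number of simulations are arbitrary, and the `if __name__ == "__main__":` guard is added because multiprocessing needs it on platforms that start workers by spawning a fresh interpreter, such as Windows.

```python
import random
from multiprocessing import Pool


class Individual:
    """Hypothetical stand-in: the post does not show the real class."""

    def __init__(self, param1, param2):
        # Assumption: an individual holds up to param1 distinct marker ids
        # drawn at random from range(param2).
        self.markers = {random.randrange(param2) for _ in range(param1)}

    def targets(self, other):
        # Assumption: targets are markers present in other but not in self.
        return other.markers - self.markers

    def can_heal(self, other):
        return bool(self.targets(other))


def simulate_treatment(param1, param2):
    donor = Individual(param1, param2)
    patient = Individual(param1, param2)

    heals = 0
    nb_therapeutic_targets = 0
    if donor.can_heal(patient):
        heals = 1
        nb_therapeutic_targets = len(donor.targets(patient))

    return heals, nb_therapeutic_targets


def simulate_treatment_wrapper(args):
    return simulate_treatment(*args)


def main(nb_simulations=10000, param1=5, param2=20):
    tasks = [(param1, param2) for _ in range(nb_simulations)]
    p = Pool(processes=4)
    simulations = p.map_async(simulate_treatment_wrapper, tasks)
    p.close()
    p.join()
    total_heals = sum(heals for heals, _ in simulations.get())
    print(f"{total_heals} of {nb_simulations} simulated treatments heal")


if __name__ == "__main__":
    main()
```

If you leave out the processes argument, Pool picks a worker count based on the CPUs available on the machine, which saves hard-coding the 4.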