Si vous êtes comme moi, vous avez sûrement réalisé que, par défaut, les scripts python que l’on écrit n’utilisent qu’une portion de la puissance computationnelle à notre disposition.. Vous vous êtes sûrement dit:

J’ai pourtant payé une somme rondelette pour un CPU à 4 coeurs ! Mais qu’est-ce qui se passe ?

Bien que la plupart des CPUs modernes comportent plusieurs coeurs, le code que l’on écrit doit aussi être formatté adéquatement afin d’en tirer pleinement avantage.

Alors explorons ensemble le module multiprocessing de Python afin de découvrir comment faire rouler notre code sur plus d’un coeur à la fois.

C’est en fait assez simple si notre code est déjà bien segmenté en fonctions (comme il devrait l’être…).
L’exemple que je mettrai en place utilisera un ensemble de processus travailleurs qui consumeront une liste de tâches contenue dans une queue.

Commençons par définir une fonction qui calcule un hypothétique potentiel thérapeutique entre 2 individus :

def simulate_treatment(param1, param2):
    donor = Individual(param1, param2)
    patient = Individual(param1, param2)

    heals = 0
    nb_therapeutic_targets = 0
    if donor.can_heal(patient):
        heals = 1
        nb_therapeutic_targets = len(donor.targets(patient))

    return heals, nb_therapeutic_targets

Il nous manque évidemment la définition de la classe « Individual » mais elle n’est pas vraiment nécessaire pour comprendre ce qui se passe ici.
Si nous voulions rouler 1 million d’itérations de cette simulation, nous pourrrions faire la chose suivante:

def main():
    for i in range(1000000):
        heals, nb_therapeutic_minors = simulate_treatment(param1, param2)
        # do some stuff with the results

Et ceci exécuterait 1 million d’iterations de la fonction « simulate_treatment » de manière séquentielle sur un seul coeur. Bouououo…
Modifions donc notre main() un peu afin de répartir notre million d’itérations sur 4 coeurs:

import multiprocessing

def main():
    p = Pool(processes=4)
    tasks = [(param1, param2) for i in range(1000000)]
    simulations = p.map_async(simulate_treatment, tasks)
    p.close()
    p.join()
    for sim in simulations.get():
        heals = sim[0]
        nb_therapeutic_minors = sim[1]
        # do some stuff with the results

Plusieurs choses se produisent alors:

p = Pool(processes=4)

initialise un « pool » de 4 processus travailleurs qui grignoteront la liste de tâches de simulation instantiée par:

tasks = [(param1, param2) for i in range(1000000)]

la liste est ensuite soumise au Pool, les processus rouleront de façon asynchrone et les résultats seront cumulés dans un nouvel iterable portant le nom approprié de « simulations ».

simulations = p.map_async(simulate_treatment, tasks)

Nous fermons ensuite le Pool afin d’éviter la soumission de nouvelles tâches et patientons jusqu’à la complétion des tâches de la queue à l’aide des commandes:

p.close()
p.join()

Plutôt simple, non ?

Mais JP, tu as soumis un tuple de params à une fonction présentant une signature qui requiert 2 paramètres distincts.. Ça va pas ?

Hehe, c’est bien, je vois que vous êtes attentifs ! En fait, ce code est incorrect et plantera si vous tentez de le rouler.
Alors laissez-moi suggérer un dernier addendum.
Si vous êtes comme moi, vous appréciez les belles fonctions bien définies avec des signatures claires. Malheureusement, map_async n’accepte qu’un iterable en entrée.. Ma suggestion est donc la suivante: utilisons une fonction « enveloppe » afin de récupérer le tuple de paramètres soumis à la queue et l’expandre afin de la passer à notre vraie fonction:

def simulate_treatment_wrapper(args):
    return simulate_treatment(*args)

Une fois que tout est en place

Ce bout de code:






def simulate_treatment(param1, param2):
    donor = Individual(param1, param2)
    patient = Individual(param1, param2)

    heals = 0
    nb_therapeutic_targets = 0
    if donor.can_heal(patient):
        heals = 1
        nb_therapeutic_targets = len(donor.targets(patient))

    return heals, nb_therapeutic_targets

def main():
    for i in range(1000000):
        heals, nb_therapeutic_minors = simulate_treatment(param1, param2)
        # do some stuff with the results

Devient:

import multiprocessing

def simulate_treatment_wrapper(args):
    return simulate_treatment(*args)

def simulate_treatment(param1, param2):
    donor = Individual(param1, param2)
    patient = Individual(param1, param2)

    heals = 0
    nb_therapeutic_targets = 0
    if donor.can_heal(patient):
        heals = 1
        nb_therapeutic_targets = len(donor.targets(patient))

    return heals, nb_therapeutic_targets

def main():
    p = Pool(processes=4)
    tasks = [(param1, param2) for i in range(1000000)]
    simulations = p.map_async(simulate_treatment_wrapper, tasks)
    p.close()
    p.join()
    for sim in simulations.get():
        heals = sim[0]
        nb_therapeutic_minors = sim[1]
        # do some stuff with the results

Comme vous pouvez le voir, nous n’avons pas fait de changements trop importants et pourtant, notre code roule maintenant en parallèle sur 4 coeurs physiques !
Bienvenue… 🙂