Skip to content

Python - Multiprocess

Class/Method

Pool

La classe Pool permet de créer un pool de processus. Elle prend en argument le nombre de processus en parallèle. Si aucune valeur n'est entrée elle prendra la valeur de la fonction multiprocessing.cpu_count().

import time
import multiprocessing as mp

def f(x):
    time.sleep(1)
    print x*x

if __name__ == '__main__':
    p = mp.Pool(5)
    p.map(f, range(1, 10))

Process

La classe Process permet de créer un processus qui sera démarré avec la fonction start(). La classe prend en argument la fonction cible ainsi que les arguments envoyés à la fonction. La fonction join() sert à attendre la fin du processus.

import multiprocessing as mp

def f(name):
    print 'hello {0}'.format(name)

if __name__ == '__main__':
    p = mp.Process(target=f, args=('LoKo',))
    p.start()
    p.join()

Queue

La classe Queue permet d'envoyer et de récupérer des informations depuis le processus. Pour ce faire l'on utilise les fonctions get() et put().

import multiprocessing as mp
import time

def f(q, name):
    q.put('Hello {0}'.format(name))

if __name__ == '__main__':

    p_lst = []
    q_lst = []

    for n in ['LoKo', 'SangoKu', 'Trunk']:
        q = mp.Queue()
        p = mp.Process(target=f, args=(q, n,))

        p.start()
        p_lst.append(p)
        q_lst.append(q.get())

    for p in p_lst:
        p.join()

    print q_lst

Output

['hello LoKo', 'hello SangoKu', 'hello Trunk']

Exemples

MP avec Lock

Code

import random
import time
import os
import signal
import multiprocessing as mp

def data_treatment(data, out_file, lock):

    print '   Data <{0}> Start'.format(data)

    rnd_int = random.randint(1, 3)
    time.sleep(rnd_int)

    with lock:
        with open(out_file, 'a', 0) as f:
            f.write('{0}++ ({1})\n'.format(data, rnd_int))

    print '   Data <{0}> End'.format(data)

def process_treatment(data_by_thread_lst, out_file, lock):

    signal.signal(signal.SIGINT, signal.SIG_IGN)
    prc_name = mp.current_process().name

    print ' {0} Start - Data : {1}'.format(prc_name, data_by_thread_lst)

    for data in data_by_thread_lst:
        data_treatment(data, out_file, lock)

    print ' {0} End'.format(prc_name)

if __name__ == '__main__':

    data_lst = ['A', 'B', 'C', 'D', 'E', 'F', 'G']
    thread_cnt = 2
    out_file = 'mp_new_data.out'

    data_by_thread_lst = [data_lst[i:i + thread_cnt] for i in xrange(0, len(data_lst), thread_cnt)]

    # Suppression du fihier de sortie #

    if os.path.exists(out_file):
        os.remove(out_file)

    # Lancement des taches en Multithread #

    print '-> MP Start <-'

    proc_lst = []

    for data_lst in data_by_thread_lst:

        lock = mp.Lock()
        proc = mp.Process(
            target = process_treatment,
            args = (
                data_lst,
                out_file,
                lock,
            )
        )

        proc_lst.append(proc)
        proc.start()

    try:
        for proc in proc_lst:
            proc.join()

    except KeyboardInterrupt:
        print "<!> SIGINT Signal Received"

        for proc in proc_lst:
            proc.terminate()

    print '-> MP End <-'

Output

Voici la sortie du script:

-> MP Start <-
 Process-1 Start - Data : ['A', 'B']
   Data <A> Start
 Process-2 Start - Data : ['C', 'D']
   Data <C> Start
 Process-3 Start - Data : ['E', 'F']
   Data <E> Start
 Process-4 Start - Data : ['G']
   Data <G> Start
   Data <E> End
   Data <F> Start
   Data <G> End
 Process-4 End
   Data <A> End
   Data <B> Start
   Data <B> End
 Process-1 End
   Data <C> End
   Data <D> Start
   Data <F> End
 Process-3 End
   Data <D> End
 Process-2 End
-> MP End <-

Le fichier généré par le script (mp_new_data.out):

E++ (1)
G++ (1)
A++ (2)
B++ (1)
C++ (3)
F++ (2)
D++ (2)
Back to top