Python - Multiprocess
Class/Method
Pool
La classe Pool permet de créer un pool de processus. Elle prend en argument le nombre de processus en parallèle. Si aucune valeur n'est entrée elle prendra la valeur de la fonction multiprocessing.cpu_count().
import time
import multiprocessing as mp
def f(x):
time.sleep(1)
print x*x
if __name__ == '__main__':
p = mp.Pool(5)
p.map(f, range(1, 10))
Process
La classe Process permet de créer un processus qui sera démarré avec la fonction start(). La classe prend en argument la fonction cible ainsi que les arguments envoyés à la fonction. La fonction join() sert à attendre la fin du processus.
import multiprocessing as mp
def f(name):
print 'hello {0}'.format(name)
if __name__ == '__main__':
p = mp.Process(target=f, args=('LoKo',))
p.start()
p.join()
Queue
La classe Queue permet d'envoyer et de récupérer des informations depuis le processus. Pour ce faire l'on utilise les fonctions get() et put().
import multiprocessing as mp
import time
def f(q, name):
q.put('Hello {0}'.format(name))
if __name__ == '__main__':
p_lst = []
q_lst = []
for n in ['LoKo', 'SangoKu', 'Trunk']:
q = mp.Queue()
p = mp.Process(target=f, args=(q, n,))
p.start()
p_lst.append(p)
q_lst.append(q.get())
for p in p_lst:
p.join()
print q_lst
Output
Exemples
MP avec Lock
Code
import random
import time
import os
import signal
import multiprocessing as mp
def data_treatment(data, out_file, lock):
print ' Data <{0}> Start'.format(data)
rnd_int = random.randint(1, 3)
time.sleep(rnd_int)
with lock:
with open(out_file, 'a', 0) as f:
f.write('{0}++ ({1})\n'.format(data, rnd_int))
print ' Data <{0}> End'.format(data)
def process_treatment(data_by_thread_lst, out_file, lock):
signal.signal(signal.SIGINT, signal.SIG_IGN)
prc_name = mp.current_process().name
print ' {0} Start - Data : {1}'.format(prc_name, data_by_thread_lst)
for data in data_by_thread_lst:
data_treatment(data, out_file, lock)
print ' {0} End'.format(prc_name)
if __name__ == '__main__':
data_lst = ['A', 'B', 'C', 'D', 'E', 'F', 'G']
thread_cnt = 2
out_file = 'mp_new_data.out'
data_by_thread_lst = [data_lst[i:i + thread_cnt] for i in xrange(0, len(data_lst), thread_cnt)]
# Suppression du fihier de sortie #
if os.path.exists(out_file):
os.remove(out_file)
# Lancement des taches en Multithread #
print '-> MP Start <-'
proc_lst = []
for data_lst in data_by_thread_lst:
lock = mp.Lock()
proc = mp.Process(
target = process_treatment,
args = (
data_lst,
out_file,
lock,
)
)
proc_lst.append(proc)
proc.start()
try:
for proc in proc_lst:
proc.join()
except KeyboardInterrupt:
print "<!> SIGINT Signal Received"
for proc in proc_lst:
proc.terminate()
print '-> MP End <-'
Output
Voici la sortie du script:
-> MP Start <-
Process-1 Start - Data : ['A', 'B']
Data <A> Start
Process-2 Start - Data : ['C', 'D']
Data <C> Start
Process-3 Start - Data : ['E', 'F']
Data <E> Start
Process-4 Start - Data : ['G']
Data <G> Start
Data <E> End
Data <F> Start
Data <G> End
Process-4 End
Data <A> End
Data <B> Start
Data <B> End
Process-1 End
Data <C> End
Data <D> Start
Data <F> End
Process-3 End
Data <D> End
Process-2 End
-> MP End <-
Le fichier généré par le script (mp_new_data.out):