Gibt es in der Python-Multiprocessing-Bibliothek eine Variante von pool.map, die mehrere Argumente unterstützt?
text = "test"
def harvester(text, case):
X = case[0]
text+ str(X)
if __== '__main__':
pool = multiprocessing.Pool(processes=6)
case = RAW_DATASET
pool.map(harvester(text,case),case, 1)
pool.close()
pool.join()
Die Antwort darauf ist versions- und situationsabhängig. Die allgemeinste Antwort für die neuesten Versionen von Python (seit 3.3) wurde zuerst von J.F. Sebastian beschrieben.1 Sie verwendet die Methode Pool.starmap
, die eine Folge von Argumenttupeln akzeptiert. Dann entpackt er automatisch die Argumente aus jedem Tuple und übergibt sie an die angegebene Funktion:
import multiprocessing
from itertools import product
def merge_names(a, b):
return '{} & {}'.format(a, b)
if __== '__main__':
names = ['Brown', 'Wilson', 'Bartlett', 'Rivera', 'Molloy', 'Opie']
with multiprocessing.Pool(processes=3) as pool:
results = pool.starmap(merge_names, product(names, repeat=2))
print(results)
# Output: ['Brown & Brown', 'Brown & Wilson', 'Brown & Bartlett', ...
Für frühere Versionen von Python müssen Sie eine Hilfsfunktion schreiben, um die Argumente explizit zu entpacken. Wenn Sie with
verwenden möchten, müssen Sie auch einen Wrapper schreiben, um aus Pool
einen Kontextmanager zu machen. (Danke an muon für diesen Hinweis.)
import multiprocessing
from itertools import product
from contextlib import contextmanager
def merge_names(a, b):
return '{} & {}'.format(a, b)
def merge_names_unpack(args):
return merge_names(*args)
@contextmanager
def poolcontext(*args, **kwargs):
pool = multiprocessing.Pool(*args, **kwargs)
yield pool
pool.terminate()
if __== '__main__':
names = ['Brown', 'Wilson', 'Bartlett', 'Rivera', 'Molloy', 'Opie']
with poolcontext(processes=3) as pool:
results = pool.map(merge_names_unpack, product(names, repeat=2))
print(results)
# Output: ['Brown & Brown', 'Brown & Wilson', 'Brown & Bartlett', ...
In einem einfacheren Fall können Sie mit einem festen zweiten Argument auch partial
verwenden, jedoch nur in Python 2.7+.
import multiprocessing
from functools import partial
from contextlib import contextmanager
@contextmanager
def poolcontext(*args, **kwargs):
pool = multiprocessing.Pool(*args, **kwargs)
yield pool
pool.terminate()
def merge_names(a, b):
return '{} & {}'.format(a, b)
if __== '__main__':
names = ['Brown', 'Wilson', 'Bartlett', 'Rivera', 'Molloy', 'Opie']
with poolcontext(processes=3) as pool:
results = pool.map(partial(merge_names, b='Sons'), names)
print(results)
# Output: ['Brown & Sons', 'Wilson & Sons', 'Bartlett & Sons', ...
1. Vieles davon war von seiner Antwort inspiriert, die wahrscheinlich stattdessen hätte akzeptiert werden müssen. Da dieser jedoch an der Spitze steckt, schien es für zukünftige Leser am besten, ihn zu verbessern.
gibt es eine Variante von pool.map, die mehrere Argumente unterstützt?
Python 3.3 enthält pool.starmap()
-Methode :
#!/usr/bin/env python3
from functools import partial
from itertools import repeat
from multiprocessing import Pool, freeze_support
def func(a, b):
return a + b
def main():
a_args = [1,2,3]
second_arg = 1
with Pool() as pool:
L = pool.starmap(func, [(1, 1), (2, 1), (3, 1)])
M = pool.starmap(func, Zip(a_args, repeat(second_arg)))
N = pool.map(partial(func, b=second_arg), a_args)
assert L == M == N
if __name__=="__main__":
freeze_support()
main()
Für ältere Versionen:
#!/usr/bin/env python2
import itertools
from multiprocessing import Pool, freeze_support
def func(a, b):
print a, b
def func_star(a_b):
"""Convert `f([1,2])` to `f(1,2)` call."""
return func(*a_b)
def main():
pool = Pool()
a_args = [1,2,3]
second_arg = 1
pool.map(func_star, itertools.izip(a_args, itertools.repeat(second_arg)))
if __name__=="__main__":
freeze_support()
main()
1 1
2 1
3 1
Beachten Sie, wie hier itertools.izip()
und itertools.repeat()
verwendet werden.
Aufgrund von dem von @unutbu genannten Fehler können Sie functools.partial()
oder ähnliche Funktionen in Python 2.6 nicht verwenden. Daher sollte die einfache Wrapperfunktion func_star()
explizit definiert werden. Siehe auch die Problemumgehungvorgeschlagen von uptimebox
.
Ich denke, das Folgende wird besser sein
def multi_run_wrapper(args):
return add(*args)
def add(x,y):
return x+y
if __== "__main__":
from multiprocessing import Pool
pool = Pool(4)
results = pool.map(multi_run_wrapper,[(1,2),(2,3),(3,4)])
print results
ausgabe
[3, 5, 7]
Python 3.3+ verwenden mit pool.starmap():
from multiprocessing.dummy import Pool as ThreadPool
def write(i, x):
print(i, "---", x)
a = ["1","2","3"]
b = ["4","5","6"]
pool = ThreadPool(2)
pool.starmap(write, Zip(a,b))
pool.close()
pool.join()
Ergebnis:
1 --- 4
2 --- 5
3 --- 6
Wenn Sie möchten, können Sie auch weitere Argumente mit Zip () packen: Zip(a,b,c,d,e)
Wenn Sie einen konstanten Wert als Argument übergeben möchten, müssen Sie beispielsweise import itertools
und dann Zip(itertools.repeat(constant), a)
verwenden.
Nachdem ich die itertools in JF Sebastian answer kennen gelernt hatte, entschied ich mich, einen Schritt weiter zu gehen und ein parmap
-Paket zu schreiben, das sich um die Parallelisierung kümmert und map
- und starmap
-Funktionen für Python-2.7 und Python-3.2 (und später auch) anbietet kann eine beliebige Anzahl von Positionsargumenten annehmen.
Installation
pip install parmap
Parallelisieren:
import parmap
# If you want to do:
y = [myfunction(x, argument1, argument2) for x in mylist]
# In parallel:
y = parmap.map(myfunction, mylist, argument1, argument2)
# If you want to do:
z = [myfunction(x, y, argument1, argument2) for (x,y) in mylist]
# In parallel:
z = parmap.starmap(myfunction, mylist, argument1, argument2)
# If you want to do:
listx = [1, 2, 3, 4, 5, 6]
listy = [2, 3, 4, 5, 6, 7]
param = 3.14
param2 = 42
listz = []
for (x, y) in Zip(listx, listy):
listz.append(myfunction(x, y, param1, param2))
# In parallel:
listz = parmap.starmap(myfunction, Zip(listx, listy), param1, param2)
Ich habe Parmap in PyPI und in ein github-Repository hochgeladen.
Als Beispiel kann die Frage wie folgt beantwortet werden:
import parmap
def harvester(case, text):
X = case[0]
text+ str(X)
if __== "__main__":
case = RAW_DATASET # assuming this is an iterable
parmap.map(harvester, case, "test", chunksize=1)
Es gibt eine Verzweigung von multiprocessing
mit dem Namen pathos (note: Benutze die Version von github), die starmap
nicht benötigt. Mit pathos
können Sie in der Regel auch Multiprozessoren im Interpreter ausführen, anstatt im __main__
-Block hängen zu bleiben. Pathos ist nach einer geringfügigen Aktualisierung für ein Release fällig - hauptsächlich Konvertierung in Python 3.x.
Python 2.7.5 (default, Sep 30 2013, 20:15:49)
[GCC 4.2.1 (Apple Inc. build 5566)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
>>> def func(a,b):
... print a,b
...
>>>
>>> from pathos.multiprocessing import ProcessingPool
>>> pool = ProcessingPool(nodes=4)
>>> pool.map(func, [1,2,3], [1,1,1])
1 1
2 1
3 1
[None, None, None]
>>>
>>> # also can pickle stuff like lambdas
>>> result = pool.map(lambda x: x**2, range(10))
>>> result
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
>>>
>>> # also does asynchronous map
>>> result = pool.amap(pow, [1,2,3], [4,5,6])
>>> result.get()
[1, 32, 729]
>>>
>>> # or can return a map iterator
>>> result = pool.imap(pow, [1,2,3], [4,5,6])
>>> result
<processing.pool.IMapIterator object at 0x110c2ffd0>
>>> list(result)
[1, 32, 729]
Sie können die folgenden zwei Funktionen verwenden, um zu vermeiden, dass für jede neue Funktion ein Wrapper geschrieben wird:
import itertools
from multiprocessing import Pool
def universal_worker(input_pair):
function, args = input_pair
return function(*args)
def pool_args(function, *args):
return Zip(itertools.repeat(function), Zip(*args))
Verwenden Sie die Funktion function
mit der Liste der Argumente arg_0
, arg_1
und arg_2
wie folgt:
pool = Pool(n_core)
list_model = pool.map(universal_worker, pool_args(function, arg_0, arg_1, arg_2)
pool.close()
pool.join()
Eine bessere Lösung für Python2:
from multiprocessing import Pool
def func((i, (a, b))):
print i, a, b
return a + b
pool = Pool(3)
pool.map(func, [(0,(1,2)), (1,(2,3)), (2,(3, 4))])
2 3 4
1 2 3
0 1 2
aus[]:
[3, 5, 7]
Eine weitere einfache Alternative besteht darin, Ihre Funktionsparameter in einen Tupel zu packen und dann die Parameter, die in Tupeln übergeben werden sollen, zu verpacken. Dies ist möglicherweise nicht ideal, wenn Sie mit großen Datenmengen arbeiten. Ich glaube, es würde Kopien für jeden Tupel machen.
from multiprocessing import Pool
def f((a,b,c,d)):
print a,b,c,d
return a + b + c +d
if __== '__main__':
p = Pool(10)
data = [(i+0,i+1,i+2,i+3) for i in xrange(10)]
print(p.map(f, data))
p.close()
p.join()
Gibt die Ausgabe in einer zufälligen Reihenfolge aus:
0 1 2 3
1 2 3 4
2 3 4 5
3 4 5 6
4 5 6 7
5 6 7 8
7 8 9 10
6 7 8 9
8 9 10 11
9 10 11 12
[6, 10, 14, 18, 22, 26, 30, 34, 38, 42]
Besser ist es, decorator zu verwenden, anstatt die wrapper-Funktion von Hand zu schreiben. Insbesondere wenn Sie viele Funktionen für die Zuordnung haben, spart der Decorator Zeit, da er für jede Funktion keinen Wrapper benötigt. Normalerweise ist eine dekorierte Funktion nicht wählbar, wir können jedoch functools
verwenden, um sie zu umgehen. Weitere Diskrepanzen finden Sie hier .
Hier das Beispiel
def unpack_args(func):
from functools import wraps
@wraps(func)
def wrapper(args):
if isinstance(args, dict):
return func(**args)
else:
return func(*args)
return wrapper
@unpack_args
def func(x, y):
return x + y
Dann können Sie es mit gezippten Argumenten zuordnen
np, xlist, ylist = 2, range(10), range(10)
pool = Pool(np)
res = pool.map(func, Zip(xlist, ylist))
pool.close()
pool.join()
Natürlich können Sie Pool.starmap
in Python 3 (> = 3.3) immer verwenden, wie in anderen Antworten erwähnt.
def f1(args):
a, b, c = args[0] , args[1] , args[2]
return a+b+c
if __== "__main__":
import multiprocessing
pool = multiprocessing.Pool(4)
result1 = pool.map(f1, [ [1,2,3] ])
print(result1)
Eine andere Möglichkeit besteht darin, eine Liste von Listen an eine Ein-Argument-Routine zu übergeben:
import os
from multiprocessing import Pool
def task(args):
print "PID =", os.getpid(), ", arg1 =", args[0], ", arg2 =", args[1]
pool = Pool()
pool.map(task, [
[1,2],
[3,4],
[5,6],
[7,8]
])
Man kann dann eine Liste mit Argumenten nach seiner bevorzugten Methode erstellen.
Mit python 3.4.4 können Sie mit multiprocessing.get_context () ein Kontextobjekt abrufen, um mehrere Startmethoden zu verwenden:
import multiprocessing as mp
def foo(q, h, w):
q.put(h + ' ' + w)
print(h + ' ' + w)
if __== '__main__':
ctx = mp.get_context('spawn')
q = ctx.Queue()
p = ctx.Process(target=foo, args=(q,'hello', 'world'))
p.start()
print(q.get())
p.join()
Oder einfach nur ersetzen
pool.map(harvester(text,case),case, 1)
durch:
pool.apply_async(harvester(text,case),case, 1)
Hier gibt es viele Antworten, aber keine scheint Python 2/3-kompatiblen Code bereitzustellen, der für jede Version geeignet ist. Wenn Sie möchten, dass Ihr Code nur funktioniert, funktioniert dies für beide Python-Versionen:
# For python 2/3 compatibility, define pool context manager
# to support the 'with' statement in Python 2
if sys.version_info[0] == 2:
from contextlib import contextmanager
@contextmanager
def multiprocessing_context(*args, **kwargs):
pool = multiprocessing.Pool(*args, **kwargs)
yield pool
pool.terminate()
else:
multiprocessing_context = multiprocessing.Pool
Danach können Sie die Mehrfachverarbeitung auf die normale Python 3-Methode anwenden, ganz wie Sie möchten. Zum Beispiel:
def _function_to_run_for_each(x):
return x.lower()
with multiprocessing_context(processes=3) as pool:
results = pool.map(_function_to_run_for_each, ['Bob', 'Sue', 'Tim']) print(results)
funktioniert in Python 2 oder Python 3.
text = "test"
def unpack(args):
return args[0](*args[1:])
def harvester(text, case):
X = case[0]
text+ str(X)
if __== '__main__':
pool = multiprocessing.Pool(processes=6)
case = RAW_DATASET
# args is a list of tuples
# with the function to execute as the first item in each Tuple
args = [(harvester, text, c) for c in case]
# doing it this way, we can pass any function
# and we don't need to define a wrapper for each different function
# if we need to use more than one
pool.map(unpack, args)
pool.close()
pool.join()
In der offiziellen Dokumentation heißt es, dass nur ein iterierbares Argument unterstützt wird. In solchen Fällen benutze ich gerne apply_async. In deinem Fall würde ich tun:
from multiprocessing import Process, Pool, Manager
text = "test"
def harvester(text, case, q = None):
X = case[0]
res = text+ str(X)
if q:
q.put(res)
return res
def block_until(q, results_queue, until_counter=0):
i = 0
while i < until_counter:
results_queue.put(q.get())
i+=1
if __== '__main__':
pool = multiprocessing.Pool(processes=6)
case = RAW_DATASET
m = Manager()
q = m.Queue()
results_queue = m.Queue() # when it completes results will reside in this queue
blocking_process = Process(block_until, (q, results_queue, len(case)))
blocking_process.start()
for c in case:
try:
res = pool.apply_async(harvester, (text, case, q = None))
res.get(timeout=0.1)
except:
pass
blocking_process.join()
Dies ist ein Beispiel der Routine, die ich verwende, um mehrere Argumente an eine Ein-Argument-Funktion zu übergeben, die in einer pool.imap fork verwendet wird:
from multiprocessing import Pool
# Wrapper of the function to map:
class makefun:
def __init__(self, var2):
self.var2 = var2
def fun(self, i):
var2 = self.var2
return var1[i] + var2
# Couple of variables for the example:
var1 = [1, 2, 3, 5, 6, 7, 8]
var2 = [9, 10, 11, 12]
# Open the pool:
pool = Pool(processes=2)
# Wrapper loop
for j in range(len(var2)):
# Obtain the function to map
pool_fun = makefun(var2[j]).fun
# Fork loop
for i, value in enumerate(pool.imap(pool_fun, range(len(var1))), 0):
print(var1[i], '+' ,var2[j], '=', value)
# Close the pool
pool.close()