webentwicklung-frage-antwort-db.com.de

Python Multiprocessing mit verteiltem Cluster

Ich suche nach einem Python-Paket, das Multiprocessing nicht nur über verschiedene Kerne innerhalb eines einzelnen Computers, sondern auch mit einem Cluster, der auf mehrere Computer verteilt ist, ausführen kann. Es gibt viele verschiedene Python-Pakete für verteiltes Rechnen, aber die meisten scheinen eine Änderung des Codes zur Ausführung zu benötigen (zum Beispiel ein Präfix, das angibt, dass sich das Objekt auf einer Remote-Maschine befindet). Insbesondere möchte ich etwas so nah wie möglich an die Multiprozessor-pool.map-Funktion. Wenn das Skript beispielsweise auf einem einzelnen Computer lautet:

from multiprocessing import Pool
pool = Pool(processes = 8)
resultlist = pool.map(function, arglist)

Dann wäre der Pseudocode für einen verteilten Cluster:

from distprocess import Connect, Pool, Cluster

pool1 = Pool(processes = 8)
c = Connect(ipaddress)
pool2 = c.Pool(processes = 4)
cluster = Cluster([pool1, pool2])
resultlist = cluster.map(function, arglist)
23
Michael

Wenn Sie eine sehr einfache Lösung wünschen, gibt es keine.

Es gibt jedoch eine Lösung mit der multiprocessing-Schnittstelle - pathos -, die die Möglichkeit hat, Verbindungen zu Remote-Servern über eine parallele Map herzustellen und Multiprozessoren auszuführen.

Wenn Sie eine SSH-getunnelte Verbindung haben möchten, können Sie das tun… oder wenn Sie mit einer weniger sicheren Methode zufrieden sind, können Sie dies auch tun.

>>> # establish a ssh tunnel
>>> from pathos.core import connect
>>> tunnel = connect('remote.computer.com', port=1234)
>>> tunnel       
Tunnel('-q -N -L55774:remote.computer.com:1234 remote.computer.com')
>>> tunnel._lport
55774
>>> tunnel._rport
1234
>>> 
>>> # define some function to run in parallel
>>> def sleepy_squared(x):
...   from time import sleep
...   sleep(1.0)
...   return x**2
... 
>>> # build a pool of servers and execute the parallel map
>>> from pathos.pp import ParallelPythonPool as Pool
>>> p = Pool(8, servers=('localhost:55774',))
>>> p.servers
('localhost:55774',)
>>> y = p.map(sleepy_squared, x)
>>> y
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

Oder Sie könnten stattdessen für eine direkte Verbindung konfigurieren (keine SSH)

>>> p = Pool(8, servers=('remote.computer.com:5678',))
# use an asynchronous parallel map
>>> res = p.amap(sleepy_squared, x)
>>> res.get()
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

Es ist alles etwas pingelig, damit der Remote-Server funktionieren kann, müssen Sie zuvor einen Server unter remote.computer.com am angegebenen Port ausführen - und Sie müssen sicherstellen, dass sowohl die Einstellungen auf Ihrem localhost als auch auf dem Remote-Host zulässig sind entweder die direkte Verbindung oder die SSH-getunnelte Verbindung. Außerdem müssen Sie auf jedem Host dieselbe Version von pathos und des pathos-Abschnitts von pp ausführen. Für ssh muss außerdem ssh-agent ausgeführt werden, um die Anmeldung ohne Kennwort mit ssh zu ermöglichen.

Aber dann funktioniert es hoffentlich… wenn Ihr Funktionscode mit dill.source.importable zum entfernten Host übertragen werden kann.

Zu Ihrer Information, pathos ist eine Version längst überfällig, und im Grunde gibt es einige Fehler und Schnittstellenänderungen, die behoben werden müssen, bevor eine neue stabile Version abgeschnitten wird.

9
Mike McKerns

Etwas spät zur Party hier, aber da ich auch nach einer ähnlichen Lösung gesucht habe und diese Frage noch nicht als beantwortet markiert ist, dachte ich, ich würde meine Ergebnisse einbringen. 

Am Ende habe ich SCOOP verwendet. Sie bietet eine parallele Kartenimplementierung, die über mehrere Kerne und über mehrere Hosts hinweg funktionieren kann. Es kann auch auf die serielle map-Funktion von Python zurückgreifen, wenn dies während des Aufrufs gewünscht wird.

Auf der SCOOP-Einführungsseite werden die folgenden Funktionen aufgeführt:

SCOOP-Funktionen und -Vorteile gegenüber Futures, Multiprocessing und Ähnlichen Modulen lauten wie folgt:

  • Nutzen Sie die Leistung mehrerer Computer über das Netzwerk.
  • Fähigkeit, mehrere Aufgaben innerhalb einer Aufgabe zu erzeugen;
  • API-kompatibel mit PEP-3148 ;
  • Parallelisieren des Seriencodes mit nur geringfügigen Modifikationen;
  • Effiziente Lastverteilung.

Es hat einige Macken (Funktionen/Klassen müssen pickleable sein), und das Setup für einen reibungslosen Ablauf auf mehreren Hosts kann mühsam sein, wenn nicht alle dasselbe Dateisystemschema verwenden, aber insgesamt bin ich mit den Ergebnissen sehr zufrieden . Für unsere Zwecke bietet Numpy & Cython eine hervorragende Leistung.

Hoffe das hilft. 

8
bazel

Ich würde vorschlagen, einen Blick auf Ray zu werfen, um genau das zu tun.

Ray verwendet dieselbe Syntax zum Parallelisieren von Code in der Multicore-Einstellung für eine Maschine wie in der verteilten Einstellung. Wenn Sie bereit sind, anstelle eines Kartenanrufs eine for-Schleife zu verwenden, würde Ihr Beispiel folgendermaßen aussehen.

import ray
import time

ray.init()

@ray.remote
def function(x):
    time.sleep(0.1)
    return x

arglist = [1, 2, 3, 4]

result_ids = [function.remote(x) for x in arglist]
resultlist = ray.get(result_ids)

Dadurch werden vier Tasks parallel ausgeführt, wobei jedoch viele Kerne vor Ort verwendet werden. Um dasselbe Beispiel in einem Cluster auszuführen, würde sich als einzige Zeile der Aufruf von ray.init() ändern. Die entsprechende Dokumentation finden Sie hier .

Beachten Sie, dass ich an der Entwicklung von Ray arbeite.

8

Haben Sie in Disco gesucht?

Eigenschaften:

  • Karte/Paradigma reduzieren
  • Python-Programmierung
  • Verteilte gemeinsam genutzte Festplatte
  • ssh unterliegt dem Transport
  • web- und Konsolenschnittstellen
  • ein Knoten kann leicht hinzugefügt/blockiert/gelöscht werden
  • master launch slaves Knoten ohne Benutzereingriff
  • slaves Knoten werden bei einem Ausfall automatisch neu gestartet
  • Schöne Dokumentation. Nach dem Install Guide konnte ich in wenigen Minuten einen 2-Computer-Cluster starten (das einzige, was ich tun musste, war das Erstellen eines $ DISCO_HOME/root-Ordners, um eine Verbindung zur WebUI herzustellen, denke ich aufgrund des Protokolls Dateifehlererstellung).

Ein einfaches Beispiel aus der Dokumentation der Diskothek:

from disco.core import Job, result_iterator

def map(line, params):
    for Word in line.split():
        yield Word, 1

def reduce(iter, params):
    from disco.util import kvgroup
    for Word, counts in kvgroup(sorted(iter)):
        yield Word, sum(counts)

if __== '__main__':
    job = Job().run(input=["http://discoproject.org/media/text/chekhov.txt"],
                    map=map,
                    reduce=reduce)
    for Word, count in result_iterator(job.wait(show=True)):
        print(Word, count)
0