NLP | Parallele Listenverarbeitung mit execnet

| | | | | | | | | | | | | | | | | | | |

Im folgenden Code werden die Ganzzahlen einfach verdoppelt, es kann jede saubere Berechnung durchgeführt werden. Dies ist das Modul, das von execnet ausgeführt werden soll. Es empfängt 2 Tupel (i, arg), nimmt an, dass arg eine Zahl ist, und sendet zurück (i, arg * 2).

Code:

if __ name__ = = `__channelexec__` :

für (i, arg) im Kanal:

channel.send ((i, arg * 2 ))

Zu verwenden dieses Modul, um jedes Element in der Liste zu verdoppeln, importieren Sie das Modul plists und rufen Sie plists .map () wi auf th ein remote_double-Modul und eine Liste von zu verdoppelnden Ganzzahlen.

Code: Using plist


import plists, remote_double

plists. map (remote_double, range ( 10 ))

Ausgabe:

[0, 2, 4, 6, 8, 10, 12, 14, 16, 18] 

Die Karte ( ) Funktion ist in plists.py definiert. Es benötigt ein reines Modul, eine Argumentliste und eine optionale Zwei-Tupel-Liste von (spec, count). Standardmäßig werden die Spezifikationen [(' popen ', 2)] verwendet, was bedeutet, dass der Benutzer zwei lokale Gateways und Kanäle öffnet. Sobald diese Pipes geöffnet sind, kann der Benutzer sie in eine Itertools-Schleife einfügen, wodurch ein unendlicher Iterator erstellt wird, der zum Anfang zurückkehrt, sobald er das Ende erreicht.

Jetzt kann jedes Argument als Argument gesendet werden an die Pipe zur Verarbeitung , und da die Kanäle zyklisch sind, erhält jeder Kanal eine fast gleiche Verteilung von Argumenten. Hier kam ich ins Spiel — Die Reihenfolge, in der die Ergebnisse zurückgegeben werden, ist unbekannt, daher wird i als Index jedes Arguments in der Liste an den und aus dem Kanal übergeben, sodass der Benutzer die Ergebnisse in der ursprünglichen Reihenfolge kombinieren kann . Warten Sie dann mit der MultiChannel-Empfangswarteschlange auf die Ergebnisse und fügen Sie sie in eine vorbelegte Liste ein, die dieselbe Länge wie die ursprünglichen Argumente hat. Nachdem Sie alle erwarteten Ergebnisse erhalten haben, verlassen Sie die Gateways und geben Sie die Ergebnisse wie im folgenden Code gezeigt zurück —

Code:


import itertools, execnet

def map (mod, args, specs = [( `popen` , 2 )]):

Gateways = []

Kanäle = []

für spec, count in specs:

für i in Bereich (count):

gw = execnet.makegateway (spec)

gateways.append (gw )

channels.append (gw.remote_exec (mod))


cyc = itertools.cycle (Kanäle)


für i, arg in aufzählen (args):

channel = weiter (cyc )

channel.send ((i, arg))

mch = execnet.MultiChannel (Kanäle)

Warteschlange = mch.make_receive_queue ()

l = len (args)

# erstellt eine Liste der Länge l,

# wobei jedes Element None ist

Ergebnisse = [ None ] * l

für i im Bereich (l):

channel, (i, result) = queue.get ()

Ergebnisse [i] = Ergebnis

für gw in Gateways:

gw.exit ()

return Ergebnisse

Code: Erhöhen Sie die Parallelisierung durch Änderung der Spezifikation


plisten. map (remote_double, range ( 10 ), [( `popen` , 4 )])

Ausgabe:

[0, 2, 4, 6, 8, 10, 12, 14, 16, 18] 

Aber keine Parallelisierung mehr bedeutet zwangsläufig eine schnellere Verarbeitung. Dies hängt von den verfügbaren Ressourcen ab, und je mehr Gateways und Kanäle geöffnet sind, desto höher ist der Overhead. Idealerweise sollte es ein Gateway und einen Kanal pro CPU-Kern geben, um die Ressourcennutzung zu maximieren. Verwenden Sie plists.map () mit jedem sauberen Modul, solange es 2 Tupel empfängt und zurücksendet, wobei i das erste Element ist. Dieses Muster ist am nützlichsten, wenn viele Zahlen verarbeitet werden müssen, um sie so schnell wie möglich zu verarbeiten.