NLP | Traitement de liste parallèle avec execnet

| | | | | | | | | | | | | | | | | | | |

Dans le code ci-dessous, les nombres entiers sont simplement doublés, tout calcul propre peut être fait. C`est le module à exécuter par execnet. Il reçoit 2 tuples (i, arg), suppose que arg est un nombre et renvoie (i, arg * 2).

Code :

si __ name__ = = `__channelexec__`  :

pour (i, arg) dans canal :

channel.send ((i, arg * 2 ))

À utiliser ce module pour doubler chaque élément de la liste, importez le module plists et appelez plists .map () wi th un module remote_double et une liste d`entiers à doubler.

Code : Utilisation de plist


import plists, remote_double

plists. map (remote_double, range ( 10 ))

Sortie :

[0, 2, 4, 6, 8, 10, 12, 14, 16, 18] 

La carte ( ) La fonction est définie dans plists.py. Il prend un module pur, une liste d`arguments et une liste optionnelle à deux tuples de (spec, count). Par défaut, les spécifications sont utilisées [(' popen ', 2)], ce qui signifie que l`utilisateur ouvrira deux passerelles et canaux locaux. Une fois ces tubes ouverts, l`utilisateur peut les placer dans une boucle itertools, ce qui crée un itérateur infini qui remonte au début dès qu`il atteint la fin.

Désormais, chaque argument peut être envoyé en tant qu`arguments au tube de traitement , et puisque les canaux sont cycliques, chaque canal reçoit une distribution presque égale d`arguments. C`est là que je est intervenu — l`ordre dans lequel les résultats sont renvoyés est inconnu, donc i , en tant qu`index de chaque argument de la liste, est transmis vers et depuis le canal afin que l`utilisateur puisse combiner les résultats dans l`ordre d`origine . Attendez ensuite les résultats avec la file d`attente de réception MultiChannel et insérez-les dans une liste préremplie de la même longueur que les arguments d`origine. Après avoir obtenu tous les résultats attendus, quittez les passerelles et renvoyez les résultats comme indiqué dans le code ci-dessous —

Code :


import itertools, execnet

def map (mod, args, specs = [( `popen` , 2 )]) :

passerelles = []

canaux = []

for spec, count in specs :

pour i dans range (count):

gw = execnet.makegateway (spec)

gateways.append (gw )

channels.append (gw.remote_exec (mod))


cyc = itertools.cycle (canaux)


for i, arg in énumérer (args) :

channel = suivant (cyc )

channel.send ((i, arg))

mch = execnet.MultiChannel (canaux)

file d`attente = mch.make_receive_queue ()

l = len (args)

# crée une liste de longueur l,

# où chaque élément est None

résultats = [ Aucun ] * l

for i dans plage (l) :

channel, (i, result) = queue.get ()

résultats [i] = résultat

pour gw dans passerelles :

gw.exit ()

retour résultats

Code : augmentez la parallélisation en modifiant la spécification


plistes. map (remote_double, range ( 10 ), [( `popen` , 4 )])

Sortie :

[0, 2, 4, 6, 8, 10, 12, 14, 16, 18] 

Cependant, plus de parallélisation signifie nécessairement un traitement plus rapide. Cela dépend des ressources disponibles, et plus il y a de passerelles et de canaux ouverts, plus la surcharge est importante. Idéalement, il devrait y avoir une passerelle et un canal par cœur de processeur pour optimiser l`utilisation des ressources. Utilisez plists.map () avec n`importe quel module propre tant qu`il reçoit et renvoie 2 tuples, où i est le premier élément. Ce modèle est particulièrement utile lorsqu`il y a de nombreux nombres à traiter afin de les traiter le plus rapidement possible.