NLP | Elaborazione di elenchi in parallelo con execnet

| | | | | | | | | | | | | | | | | | | |

Nel codice seguente, gli interi vengono semplicemente raddoppiati, è possibile eseguire qualsiasi calcolo pulito. Questo è il modulo che deve essere eseguito da execnet. Riceve 2 tuple (i, arg), presume che arg sia un numero e rimanda (i, arg * 2).

Codice:

if __ nome__ = = `__channelexec__` :

per (i, arg) in canale:

channel.send ((i, arg * 2 ))

Da usare questo modulo per raddoppiare ogni elemento nell`elenco, importare il modulo plists e chiamare plists .map() wi un modulo remote_double e un elenco di interi da raddoppiare.

Codice: utilizzo di plist


import plist, remote_double

plist. mappa (remote_double, range ( 10 ))

Risultato:

[0, 2, 4, 6, 8, 10, 12, 14, 16, 18] 

La mappa ( ) è definita in plists.py. Richiede un modulo puro, un elenco di argomenti e un elenco opzionale di due tuple di (spec, count). Per impostazione predefinita, vengono utilizzate le specifiche [(' popen ', 2)], il che significa che l`utente aprirà due gateway e canali locali. Una volta aperte queste pipe, l`utente può inserirle in un ciclo itertools, che crea un iteratore infinito che torna all`inizio non appena raggiunge la fine.

Ora ogni argomento può essere inviato come argomento alla pipe per l`elaborazione e, poiché i canali sono ciclici, ogni canale riceve una distribuzione quasi uguale di argomenti. Ecco dove sono arrivato io — l`ordine in cui vengono restituiti i risultati è sconosciuto, quindi i , come indice di ogni argomento nell`elenco, viene passato da e verso il canale in modo che l`utente possa combinare i risultati nell`ordine originale . Quindi attendi i risultati con la coda di ricezione multicanale e inseriscili in un elenco precompilato della stessa lunghezza degli argomenti originali. Dopo aver ottenuto tutti i risultati attesi, esci dai gateway e restituisci i risultati come mostrato nel codice seguente —

Codice:


import itertools, execnet

def mappa (mod, args, specifiche = [( `popen` , 2 )]):

gateway = []

canali = []

for spec, count in specifiche:

for i in range (conteggio):

gw = execnet.makegateway (spec)

gateways.append (gw )

channels.append (gw.remote_exec (mod))


cyc = itertools.cycle (canali)


for i, arg in enumera (args):

canale = next (cyc )

channel.send ((i, arg))

mch = execnet.MultiChannel (canali)

queue = mch.make_receive_queue () >

l = len (args)

# crea un elenco di lunghezza l,

# dove ogni elemento è Nessuno

risultati = [ Nessuno ] * l

for i in intervallo (l):

canale, (i, risultato) = queue.get()

results [i] = risultato

for gw in gateway:

gw.exit()

return risultati

Codice: aumenta la parallelizzazione modificando le specifiche


liste. mappa (remote_double, range ( 10 ), [( `popen` , 4 )])

Risultato:

[0, 2, 4, 6, 8, 10, 12, 14, 16, 18] 

Tuttavia, non più parallelizzazione significa necessariamente un`elaborazione più rapida. Dipende dalle risorse disponibili e più gateway e canali si aprono, maggiore è il sovraccarico. Idealmente, dovrebbe esserci un gateway e un canale per core CPU per massimizzare l`utilizzo delle risorse. Usa plists.map() con qualsiasi modulo pulito purché riceva e rispedisca 2 tuple, dove i è il primo elemento. Questo schema è particolarmente utile quando ci sono molti numeri che devono essere elaborati per elaborarli il più rapidamente possibile.