PNL | Processando listas em paralelo com execnet

| | | | | | | | | | | | | | | | | | | |

No código abaixo, os inteiros são simplesmente duplicados, qualquer cálculo limpo pode ser feito. Este é o módulo a ser executado pelo execnet. Ele recebe 2 tuplas (i, arg), assume que arg é um número e envia de volta (i, arg * 2).

Código:

if __ name__ = = `__channelexec__` :

para (i, arg) em canal:

channel.send ((i, arg * 2 ))

Para usar este módulo para dobrar cada item na lista, importe o módulo plists e chame plists .map() wi th um módulo remote_double e uma lista de inteiros para dobrar.

Código: Usando plist


import plists, remote_double

plists. map (remote_double, intervalo ( 10 ))

Saída:

[0, 2, 4, 6, 8, 10, 12, 14, 16, 18] 

O mapa ( ) é definida em plists.py. É preciso um módulo puro, uma lista de argumentos e uma lista opcional de duas tuplas (spec, count). Por padrão, as especificações são usadas [(' popen ', 2)], o que significa que o usuário abrirá dois gateways e canais locais. Uma vez que esses pipes estão abertos, o usuário pode colocá-los em um loop itertools, que cria um iterador infinito que volta ao início assim que chega ao fim.

Agora cada argumento pode ser enviado como argumentos ao pipe para processamento e, como os canais são cíclicos, cada canal recebe uma distribuição quase igual de argumentos. Foi aí que eu entrei — a ordem em que os resultados são retornados é desconhecida, então i , como o índice de cada argumento na lista, é passado de e para o canal para que o usuário possa combinar os resultados na ordem original . Em seguida, aguarde os resultados com a fila de recebimento MultiChannel e insira-os em uma lista pré-preenchida com o mesmo tamanho dos argumentos originais. Após obter todos os resultados esperados, saia dos gateways e retorne os resultados conforme mostrado no código abaixo —

Código:


import itertools, execnet

def map (mod, args, specs = [( `popen` , 2 )]):

gateways = []

canais = []

para spec, count em specs:

para i em intervalo (contagem):

gw = execnet.makegateway (spec)

gateways.append (gw )

channels.append (gw.remote_exec (mod))


cyc = itertools.cycle (canais)


para i, arg em enumerar (args):

canal = próximo (cyc )

channel.send ((i, arg))

mch = execnet.MultiChannel (canais)

fila = mch.make_receive_queue()

l = len (args)

# cria uma lista de comprimento l,

# onde cada elemento é None

resultados = [ Nenhum ] * l

para i em intervalo (l):

canal, (i, resultado) = queue.get ()

resultados [i] = resultado

para gw em gateways:

gw.exit()

return resultados

Código: Aumente a paralelização alterando a especificação


listas. map (remote_double, intervalo ( 10 ), [( `popen` , 4 )])

Saída:

[0, 2, 4, 6, 8, 10, 12, 14, 16, 18] 

No entanto, não mais paralelização significa necessariamente um processamento mais rápido. Depende dos recursos disponíveis, e quanto mais gateways e canais abertos, maior a sobrecarga. Idealmente, deve haver um gateway e um canal por núcleo de CPU para maximizar a utilização de recursos. Use plists.map() com qualquer módulo limpo desde que receba e envie de volta 2 tuplas, onde i é o primeiro elemento. Esse padrão é mais útil quando há muitos números que precisam ser processados para processá-los o mais rápido possível.