NLP | Parallel list processing with execnet

In the code below, the integers are simply doubled, any clean computation can be done. This is the module to be executed by execnet. It receives 2 tuples (i, arg), assumes arg is a number, and sends back (i, arg * 2).

Code :

if __ name__ = = `__channelexec__` :

  for (i, arg) in channel:

channel.send ((i, arg * 2 ))

To use this module to double each item in the list, import the plists module and call plists .map () with a remote_double module and a list of integers to double.

Code: Using plist

import plists, remote_double

plists. map (remote_double, range ( 10 ))

Output:

 [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]  

The map () function is defined in plists.py. It takes a pure module, an argument list, and an optional two-tuple list of (spec, count). By default, the specs are used [(& # 39; popen & # 39 ;, 2)], which means that the user will open two local gateways and channels. Once these pipes are open, the user can place them in an itertools loop, which creates an infinite iterator that goes back to the beginning as soon as it reaches the end.

Now each argument can be sent as arguments to the pipe for processing , and since the channels are cyclical, each channel receives an almost equal distribution of arguments. That`s where I came in — the order in which the results are returned is unknown, so i , as the index of each argument in the list, is passed to and from the channel so that the user can combine the results in the original order. Then wait for the results with the MultiChannel receive queue and insert them into a prepopulated list of the same length as the original arguments. After getting all expected results, exit the gateways and return the results as shown in the code below —

Code :

import itertools, execnet

def map (mod, args, specs = [( `popen` , 2 )]):

  gateways = []

  channels = []

  

  for spec, count in specs:

for i in range (count):

gw = execnet.makegateway (spec)

gateways.append (gw)

channels.append (gw.remote_exec (mod))

 

cyc = itertools.cycle (channels)

 

  for i, arg in enumerate (args):

  channel = next (cyc )

channel.send ((i, arg))

mch = execnet.MultiChannel (channels)

queue = mch.make_receive_queue ()

  l = len (args)

  # creates a list of length l,

# where each element is None

results = [ None ] *

 

for i in range (l):

  channel, (i, result) = queue.get ()

results [i] = result

  

  for gw in gateways:

gw.exit ()

  return results

Code: Increase parallelization by changing the spec

plists. map (remote_double, range ( 10 ), [( `popen` , 4 < / code> )])

Output:

 [0, 2, 4, 6, 8, 10, 12, 14, 16, 18] 

However, no more parallelization necessarily means faster processing. It depends on the resources available, and the more gateways and channels open, the more overhead. Ideally, there should be one gateway and channel per CPU core to maximize resource utilization. Use plists.map () with any clean module as long as it receives and sends back 2 tuples, where i is the first element. This pattern is most useful when there are many numbers that need to be processed in order to process them as quickly as possible.