自然語言處理 |與 execnet 並行處理列表

| | | | | | | | | | | | | | | | | | | |

在下面的代碼中,整數只是簡單地加倍,可以完成任何干淨的計算。這是由 execnet 執行的模塊。它接收 2 個元組 (i, arg),假設 arg 是一個數字,然後發回 (i, arg * 2)。

代碼:

if __ name__ = = `__channelexec__` :

for (i, arg) in 頻道:

channel.send ((i, arg * 2 ))

使用此模塊將列表中的每個項目加倍,導入 plists 模塊並調用 plists .map () wi一個 remote_double 模塊和一個要加倍的整數列表。

代碼:使用 plist


import plists,remote_double

plists。 map (remote_double, range ( 10 ))

輸出:

[0, 2, 4, 6, 8, 10, 12, 14, 16, 18] 

地圖( ) 函數在 plists.py 中定義。它需要一個純模塊、一個參數列表和一個可選的 (spec, count) 的二元組列表。默認使用specs [(' popen ', 2)],表示用戶將打開兩個本地網關和通道。一旦這些管道打開,用戶可以將它們放置在一個 itertools 循環中,這會創建一個無限迭代器,一旦它到達末尾就會返回到開頭。

現在每個參數都可以作為參數發送到管道進行處理,並且由於通道是循環的,每個通道接收到幾乎相等的參數分佈。這就是進來的地方 —結果返回的順序是未知的,所以 i 作為列表中每個參數的索引,傳入和傳出通道,以便用戶可以按原始順序組合結果.然後等待 MultiChannel 接收隊列的結果,並將它們插入到與原始參數長度相同的預填充列表中。得到所有預期結果後,退出網關並返回結果如下面代碼所示 —

代碼:


import itertools, execnet

def map (mod, args, specs = [( `popen` , 2 )]):

網關 <代碼類 ="keyword "> = []

頻道 = []

for 規範,計數 in 規範:

for i in range (count):

gw = execnet.makegateway (spec)

gateways.append (gw )

channels.append (gw.remote_exec (mod))


cyc = itertools.cycle (channels)


for i, arg in 枚舉 (args):

channel = 下一個 (cyc )

channel.send ((i, arg))

mch = execnet.MultiChannel(頻道)

隊列 = mch.make_receive_queue()

l = len (args)

# 創建一個長度為 l 的列表,

# 其中每個元素都是無

結果 = [ None ] * l

for i in range (l):

channel, (i, result) = queue.get ()

結果 [i] = result

for gw in 網關:

gw.exit()

return 結果

代碼:通過更改規范增加並行化


plists。 map (remote_double, range ( 10 ), [( `popen` , 4 )])

輸出:

[0, 2, 4, 6, 8, 10, 12, 14, 16, 18] 

然而,沒有更多的並行化必然意味著更快的處理。這取決於可用的資源,打開的網關和通道越多,開銷就越大。理想情況下,每個 CPU 內核應該有一個網關和通道,以最大限度地利用資源。 plists.map () 與任何干淨的模塊一起使用,只要它接收和發送回 2 個元組,其中 i 是第一個元素。當需要處理許多數字以便盡快處理它們時,此模式最有用。