在下面的代碼中,整數只是簡單地加倍,可以完成任何干淨的計算。這是由 execnet 執行的模塊。它接收 2 個元組 (i, arg),假設 arg 是一個數字,然後發回 (i, arg * 2)。
代碼:
|
使用此模塊將列表中的每個項目加倍,導入 plists 模塊並調用 plists .map () wi一個 remote_double 模塊和一個要加倍的整數列表。
代碼:使用 plist
import
plists,remote_double
plists。
map
(remote_double,
range
(
10
))
輸出:
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
地圖( ) 函數在 plists.py 中定義。它需要一個純模塊、一個參數列表和一個可選的 (spec, count) 的二元組列表。默認使用specs [(' popen ', 2)],表示用戶將打開兩個本地網關和通道。一旦這些管道打開,用戶可以將它們放置在一個 itertools 循環中,這會創建一個無限迭代器,一旦它到達末尾就會返回到開頭。
現在每個參數都可以作為參數發送到管道進行處理,並且由於通道是循環的,每個通道接收到幾乎相等的參數分佈。這就是我進來的地方 —結果返回的順序是未知的,所以 i 作為列表中每個參數的索引,傳入和傳出通道,以便用戶可以按原始順序組合結果.然後等待 MultiChannel 接收隊列的結果,並將它們插入到與原始參數長度相同的預填充列表中。得到所有預期結果後,退出網關並返回結果如下面代碼所示 —
代碼:
import
itertools, execnet
def
map
(mod, args, specs
=
[(
`popen`
,
2
)]):
網關
<代碼類 ="keyword "> = []
頻道
=
[]
for
規範,計數
in
規範:
for
i
in
range
(count):
gw
=
execnet.makegateway (spec)
gateways.append (gw )
channels.append (gw.remote_exec (mod))
cyc
=
itertools.cycle (channels)
for
i, arg
in
枚舉
(args):
channel
=
下一個
(cyc )
channel.send ((i, arg))
mch
=
execnet.MultiChannel(頻道)
隊列
=
mch.make_receive_queue()
l
=
len
(args)
# 創建一個長度為 l 的列表,
# 其中每個元素都是無
結果
=
None
]
*
l
for
i
in
range
(l):
channel, (i, result)
=
queue.get ()
結果 [i]
=
result
for
gw
in
網關:
gw.exit()
return
結果
代碼:通過更改規范增加並行化
plists。
map
(remote_double,
range
(
10
), [(
`popen`
,
4
)])
輸出:
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
然而,沒有更多的並行化必然意味著更快的處理。這取決於可用的資源,打開的網關和通道越多,開銷就越大。理想情況下,每個 CPU 內核應該有一個網關和通道,以最大限度地利用資源。 plists.map () 與任何干淨的模塊一起使用,只要它接收和發送回 2 個元組,其中 i 是第一個元素。當需要處理許多數字以便盡快處理它們時,此模式最有用。