Synchronization and process pooling in Python


Prerequisite — Multiprocessing in Python | Set 2
This article discusses two important concepts related to multiprocessing in Python:

  • Cross-Process Synchronization
  • Process Pooling

Cross-Process Synchronization

Process synchronization is a mechanism that ensures that two or more concurrent processes do not simultaneously execute a particular program segment, known as a critical section.

Critical section refers to the parts of the program where the shared resource is accessed.

For example, consider three processes trying to access a shared resource (the critical section) at the same time.

Simultaneous access to a shared resource may result in a race condition.

A race condition occurs when two or more processes can access shared data and try to change it at the same time. As a result, the values of shared variables may be unpredictable, varying with the timing of context switches between the processes.

Consider the program below to understand the concept of a race condition:

# Python program to illustrate
# the concept of a race condition
# in multiprocessing

import multiprocessing

# function to withdraw from the account
def withdraw(balance):
    for _ in range(10000):
        balance.value = balance.value - 1

# function to deposit to the account
def deposit(balance):
    for _ in range(10000):
        balance.value = balance.value + 1

def perform_transactions():
    # initial balance (in shared memory)
    balance = multiprocessing.Value('i', 100)

    # creating new processes
    p1 = multiprocessing.Process(target=withdraw, args=(balance,))
    p2 = multiprocessing.Process(target=deposit, args=(balance,))

    # starting processes
    p1.start()
    p2.start()

    # wait until processes are finished
    p1.join()
    p2.join()

    # print final balance
    print("Final balance = {}".format(balance.value))

if __name__ == "__main__":
    for _ in range(10):
        # perform the same transaction process 10 times
        perform_transactions()

If you run the program above, you will get some unexpected values, such as:

 Final balance = 1311
 Final balance = 199
 Final balance = 558
 Final balance = -2265
 Final balance = 1371
 Final balance = 1158
 Final balance = -577
 Final balance = -1300
 Final balance = -341
 Final balance = 157

In the above program, 10,000 withdrawal and 10,000 deposit transactions are performed on an initial balance of 100. The expected final balance is 100, but what we get across the 10 runs of the perform_transactions function are widely varying values.

This happens because the processes access the shared balance data concurrently. This unpredictability of the balance value is nothing but a race condition.

Let's try to understand this better using the sequence diagrams below. These are different sequences that can be obtained in the above example for one withdrawal and deposit action.

  • This is a possible sequence that gives the wrong answer, since both processes read the same value before either one writes back:

     p1                           p2                           balance
     read(balance)                                             100
     current = 100
                                  read(balance)                100
                                  current = 100
     balance = current - 1 = 99
     write(balance)                                            99
                                  balance = current + 1 = 101
                                  write(balance)               101

  • These are two possible sequences that give the desired result in the above scenario:

     p1                           p2                           balance
     read(balance)                                             100
     current = 100
     balance = current - 1 = 99
     write(balance)                                            99
                                  read(balance)                99
                                  current = 99
                                  balance = current + 1 = 100
                                  write(balance)               100

     p1                           p2                           balance
                                  read(balance)                100
                                  current = 100
                                  balance = current + 1 = 101
                                  write(balance)               101
     read(balance)                101
     current = 101
     balance = current - 1 = 100
     write(balance)               100
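The lost-update interleaving from the first table can be simulated step by step within a single process. This is a simplified sketch: the reads and writes of the two processes are written out as plain variables (p1_current and p2_current are illustrative names):

```python
# simulate the first (incorrect) interleaving: both processes
# read balance = 100 before either one writes back
balance = 100

p1_current = balance      # p1: read(balance) -> 100
p2_current = balance      # p2: read(balance) -> 100 (p1 has not written yet)

balance = p1_current - 1  # p1: write(balance) -> 99
balance = p2_current + 1  # p2: write(balance) -> 101, p1's withdrawal is lost

print(balance)  # 101 instead of the correct 100
```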

Using locks

The multiprocessing module provides a Lock class to deal with race conditions. The lock is implemented using a semaphore object provided by the operating system.

A semaphore is a synchronization object that controls access by multiple processes to a common resource in a parallel programming environment. It is simply a value in a designated place in operating system (or kernel) storage that each process can check and then change. Depending on the value found, the process can use the resource, or it will find that the resource is already in use and must wait for some period before trying again. Semaphores can be binary (0 or 1) or can have additional values. Typically, a process using a semaphore checks the value and then, if it is using the resource, changes the value to reflect this, so that subsequent semaphore users know to wait.
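The multiprocessing module also exposes semaphores directly through its Semaphore class; a binary semaphore (initial value 1) behaves just like a lock. The sketch below is illustrative (increment, counter, and sem are made-up names, not part of the example that follows):

```python
import multiprocessing

def increment(counter, sem):
    for _ in range(1000):
        sem.acquire()        # wait until the semaphore value is positive, then decrement it
        counter.value += 1   # critical section: only one process at a time gets here
        sem.release()        # increment the semaphore value, waking any waiting process

if __name__ == "__main__":
    counter = multiprocessing.Value('i', 0)
    sem = multiprocessing.Semaphore(1)  # binary semaphore: starts at 1, acts as a lock

    procs = [multiprocessing.Process(target=increment, args=(counter, sem))
             for _ in range(2)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()

    print(counter.value)  # 2000: no increments are lost
```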

Consider an example below:

# Python program to illustrate
# the concept of locks
# in multiprocessing

import multiprocessing

# function to withdraw from the account
def withdraw(balance, lock):
    for _ in range(10000):
        lock.acquire()
        balance.value = balance.value - 1
        lock.release()

# function to deposit to the account
def deposit(balance, lock):
    for _ in range(10000):
        lock.acquire()
        balance.value = balance.value + 1
        lock.release()

def perform_transactions():
    # initial balance (in shared memory)
    balance = multiprocessing.Value('i', 100)

    # creating a lock object
    lock = multiprocessing.Lock()

    # creating new processes
    p1 = multiprocessing.Process(target=withdraw, args=(balance, lock))
    p2 = multiprocessing.Process(target=deposit, args=(balance, lock))

    # starting processes
    p1.start()
    p2.start()

    # wait until processes are finished
    p1.join()
    p2.join()

    # print final balance
    print("Final balance = {}".format(balance.value))

if __name__ == "__main__":
    for _ in range(10):
        # perform the same transaction process 10 times
        perform_transactions()

Output:

 Final balance = 100
 Final balance = 100
 Final balance = 100
 Final balance = 100
 Final balance = 100
 Final balance = 100
 Final balance = 100
 Final balance = 100
 Final balance = 100
 Final balance = 100

Let's try to understand the above code step by step:

  • First, a Lock object is created using:
     lock = multiprocessing.Lock()
  • Then, the lock is passed as a target function argument:
     p1 = multiprocessing.Process(target=withdraw, args=(balance, lock))
     p2 = multiprocessing.Process(target=deposit, args=(balance, lock))
  • In the critical section of the target function, we apply the lock using the lock.acquire() method. Once a lock is acquired, no other process can access its critical section until the lock is released using the lock.release() method.
     lock.acquire()
     balance.value = balance.value - 1
     lock.release()

    As you can see from the results, the final balance is 100 every time (which is the expected final result).
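As a side note, Lock also supports the context-manager protocol, so each acquire/release pair can be written as a with statement, which releases the lock even if the critical section raises an exception. A minimal sketch of the withdraw function from the program above rewritten this way:

```python
import multiprocessing

def withdraw(balance, lock):
    for _ in range(10000):
        with lock:  # acquire() on entry, release() on exit (even on exceptions)
            balance.value -= 1

if __name__ == "__main__":
    balance = multiprocessing.Value('i', 100)
    lock = multiprocessing.Lock()

    p = multiprocessing.Process(target=withdraw, args=(balance, lock))
    p.start()
    p.join()

    print(balance.value)  # 100 - 10000 = -9900
```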

Process Pooling

Let's look at a simple program to find squares of numbers in a given list.

# Python program to find
# squares of numbers in a given list

def square(n):
    return n * n

if __name__ == "__main__":
    # input list
    mylist = [1, 2, 3, 4, 5]

    # empty list to store the result
    result = []

    for num in mylist:
        result.append(square(num))

    print(result)

Output:

 [1, 4, 9, 16, 25] 

This is a simple program to calculate the squares of the elements of a given list. On a multi-core/multiprocessor system, however, only one of the cores is used to execute the program, and it is possible that the other cores remain idle.

To make use of all the cores, the multiprocessing module provides the Pool class. A Pool object represents a pool of worker processes and has methods that allow tasks to be offloaded to the worker processes in several different ways. The tasks are distributed among the cores/processes automatically by the Pool object; the user does not need to worry about creating processes explicitly.

Consider the program below:

# Python program to understand
# the concept of pool

import multiprocessing
import os

def square(n):
    print("Worker process id for {0}: {1}".format(n, os.getpid()))
    return n * n

if __name__ == "__main__":
    # input list
    mylist = [1, 2, 3, 4, 5]

    # creating a pool object
    p = multiprocessing.Pool()

    # map the list to the target function
    result = p.map(square, mylist)

    print(result)

Output:

 Worker process id for 2: 4152
 Worker process id for 1: 4151
 Worker process id for 4: 4151
 Worker process id for 3: 4153
 Worker process id for 5: 4152
 [1, 4, 9, 16, 25]

Let's try to understand the above code step by step:

  • We create a Pool object using:
      p = multiprocessing.Pool()

    There are a few arguments that give more control over offloading of tasks:

    • processes: specify the number of worker processes.
    • maxtasksperchild: specify the maximum number of tasks assigned to each child process.

    All the processes in a pool can be made to perform some initialization using these arguments:

    • initializer: specify an initialization function for the worker processes.
    • initargs: the arguments to be passed to the initializer.
  • Now, in order to perform some task, we have to map it to some function. In the example above, we map mylist to the square function. As a result, the contents of mylist and the definition of square are distributed among the cores.
      result = p.map(square, mylist)
  • Once all the worker processes finish their tasks, a list with the final result is returned.
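To illustrate those arguments, here is a small sketch (init_worker and cube are made-up names for this example) that fixes the pool size with processes, runs an initializer in each worker, and also shows apply_async as an alternative to map for submitting a single task:

```python
import multiprocessing
import os

def init_worker(tag):
    # runs once in each worker process before it starts receiving tasks
    print("{0}: initializing worker {1}".format(tag, os.getpid()))

def cube(n):
    return n ** 3

if __name__ == "__main__":
    # a pool of exactly 2 workers, each set up by init_worker("pool")
    with multiprocessing.Pool(processes=2,
                              initializer=init_worker,
                              initargs=("pool",)) as p:
        # map blocks until all results are ready, preserving input order
        print(p.map(cube, [1, 2, 3]))   # [1, 8, 27]

        # apply_async submits a single task and returns immediately;
        # .get() waits for that one result
        res = p.apply_async(cube, (4,))
        print(res.get())                # 64
```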

This article is contributed by Nikhil Kumar. If you like Python.Engineering and would like to contribute, you can also write an article via contribute.python.engineering or mail your article to contribute@python.engineering. See your article appearing on the Python.Engineering home page and help other geeks.

Please write comments if you find anything incorrect, or if you would like to share more information about the topic discussed above.