Python | Communication between threads | Set-2


If a producer thread needs to know immediately when a consumer thread has processed a particular piece of data, it should pair the submitted data with an Event object that lets the producer track its progress, as shown in the code below:

Code # 1:

from queue import Queue
from threading import Thread, Event

# Thread that produces data
def producer(out_q):
    while running:
        # Produce some data
        ...
        # Make a (data, event) pair and
        # hand it to the consumer
        evt = Event()
        out_q.put((data, evt))
        ...
        # Wait for the consumer to process the item
        evt.wait()

# Thread that consumes data
def consumer(in_q):
    while True:
        # Get some data
        data, evt = in_q.get()
        # Process the data
        ...
        # Indicate completion
        evt.set()
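The sketch above can be turned into a complete runnable example. The fixed workload, the doubling "processing" step, and the None sentinel used for shutdown are illustrative choices, not part of the original recipe:

```python
from queue import Queue
from threading import Thread, Event

def producer(out_q, items):
    for data in items:
        evt = Event()
        out_q.put((data, evt))
        # Block until the consumer confirms it has processed this item
        evt.wait()

def consumer(in_q, results):
    while True:
        data, evt = in_q.get()
        if data is None:          # sentinel value: shut down
            break
        results.append(data * 2)  # "process" the data
        evt.set()                 # signal completion to the producer

q = Queue()
results = []
c = Thread(target=consumer, args=(q, results))
c.start()
producer(q, [1, 2, 3])
q.put((None, Event()))            # tell the consumer to exit
c.join()
print(results)                    # [2, 4, 6]
```

Because the producer waits on each event, the items are guaranteed to be fully processed in order before the next one is submitted.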

Writing multithreaded programs based on simple queuing is often a good way to maintain sanity. If everything is broken up into simple thread-safe queues, there is no need to clog the program with locks and other low-level synchronization. In addition, communication with queues often results in designs that can later be extended to other types of message-based communication patterns. For example, you can split your program into multiple processes, or even a distributed system, without changing much of your queue architecture. 
One caveat with thread queuing is that queuing an item does not make a copy of the item. So communication actually involves passing an object reference between threads.
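Because put() stores a reference rather than a copy, a mutation made by one side is visible to the other. A small single-threaded illustration of this caveat:

```python
from queue import Queue

q = Queue()
item = {'value': 1}
q.put(item)           # no copy is made; the queue holds a reference
item['value'] = 99    # mutating the original object...
got = q.get()
print(got['value'])   # 99 -- the receiver sees the mutated object
print(got is item)    # True -- it is the very same object
```

In a real multithreaded program, this kind of mutation after put() is a race condition waiting to happen.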

If shared state is a concern, it may make sense to pass only immutable data structures (such as integers, strings, or tuples), or to make deep copies of queued items, as shown in the code below:

Code # 2:

from queue import Queue
from threading import Thread
import copy

# Thread that produces data
def producer(out_q):
    while True:
        # Produce some data
        ...
        out_q.put(copy.deepcopy(data))

# Thread that consumes data
def consumer(in_q):
    while True:
        # Get some data
        data = in_q.get()
        # Process the data
        ...

  • Queue objects provide a few additional features that may be useful in certain contexts.
  • If a queue is created with an optional size, such as Queue(N), it places a limit on the number of items that can be queued before put() blocks the producer.
  • Adding an upper bound to a queue can make sense if there is a mismatch in speed between a producer and a consumer, for example if a producer generates items at a much faster rate than they can be consumed.
  • On the other hand, making a queue block when it is full can also have an unintended cascading effect throughout the program, possibly causing it to deadlock or run poorly.
  • In general, the problem of "flow control" between communicating threads is much harder than it seems.
  • If you ever find yourself trying to fix a problem by fiddling with queue sizes, it could be a sign of a fragile design or some other inherent scaling problem.
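The bounded-queue behavior can be seen with a deliberately tiny maxsize (peeking at the internal q.queue deque at the end is only for demonstration; it is not part of the public API):

```python
import queue

q = queue.Queue(maxsize=2)   # at most 2 items may be queued at once
q.put('a')
q.put('b')
print(q.full())              # True

# A plain q.put('c') would now block until a consumer calls get().
# The non-blocking form raises queue.Full instead:
try:
    q.put('c', block=False)
except queue.Full:
    print('queue is full, item not added')

q.get()                      # consuming one item frees a slot
q.put('c')                   # now succeeds immediately
print(list(q.queue))         # ['b', 'c']
```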

Code # 3: The get() and put() methods support non-blocking operation and timeouts.

import queue

q = queue.Queue()

try:
    data = q.get(block=False)
except queue.Empty:
    ...

try:
    q.put(item, block=False)
except queue.Full:
    ...

try:
    data = q.get(timeout=5.0)
except queue.Empty:
    ...

Both of these options can be used to avoid the problem of a queue operation blocking indefinitely. For example, a non-blocking put() can be used with a fixed-size queue to implement various kinds of handling code for when a queue is full.

Code # 4: Issue a log message and discard the item

import queue
import logging

log = logging.getLogger(__name__)

def producer(q):
    ...
    try:
        q.put(item, block=False)
    except queue.Full:
        log.warning('queued item %r discarded!', item)

A timeout is useful if you are trying to make consumer threads periodically give up on operations such as q.get() so that they can check things like a termination flag.

Code # 5: Using a Timeout

_running = True

def consumer(q):
    while _running:
        try:
            item = q.get(timeout=5.0)
            # Process the item
            ...
        except queue.Empty:
            pass

Finally, there are utility methods q.qsize(), q.full(), and q.empty() that can report the current size and state of the queue. Be aware, however, that all of these are unreliable in a multithreaded environment. For example, a call to q.empty() might report that the queue is empty, but in the time since the call was made, another thread could have added an item to the queue. Frankly, it is best to write your code so that it does not rely on such functions.
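Rather than guarding a get() with q.empty(), which can be stale by the time get() runs, it is more robust to attempt the operation and handle the exception; the snippet below contrasts the two styles:

```python
import queue

q = queue.Queue()

# Fragile: another thread could drain the queue between these two lines
if not q.empty():
    item = q.get()

# Robust: attempt the operation and let the queue's own locking decide
try:
    item = q.get(block=False)
except queue.Empty:
    item = None
print(item)  # None -- the queue was empty
```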




