Python Thread-safe Queue
Summary: in this tutorial, you’ll learn how to use a synchronized queue to exchange data safely between multiple threads.
Introduction to the Python thread-safe queue
The built-in queue
module allows you to exchange data safely between multiple threads. The Queue
class in the queue
module implements all required locking semantics.
Creating a new queue
To create a new queue, you use the Queue
constructor as follows:
from queue import Queuequeue = Queue()
Code language: Python (python)
To create a queue with a size limit, you can use the maxsize
parameter. For example, the following creates a queue that can store up to 10 items:
from queue import Queuequeue = Queue(maxsize=10)
Code language: Python (python)
Adding an item to the queue
To add an item to the queue, you use the put()
method like this:
# ...
queue.add(item)
Code language: Python (python)
Once the queue is full, you cannot add an item to the queue. The call to the put()
method will block until the queue has space available.
If you don’t want the put()
method to block if the queue is full, you can set the block
argument to False
:
queue.put(item, block=False)
Code language: Python (python)
In this case, the put()
method will raise the queue.Full
exception if the queue is full:
try:
queue.put(item, block=False)
except queue.Full as e:
# handle exceptoin
Code language: Python (python)
To add an item to a sized limited queue and block with a timeout, you can use the timeout
parameter like this:
try:
queue.put(item, timeout=3)
except queue.Full as e:
# handle exceptoin
Code language: Python (python)
Getting an item from the queue
To get an item from the queue, you can use the get()
method:
item = queue.get()
Code language: Python (python)
The get()
method will block until an item is available for retrieval from the queue.
To get an item from the queue without blocking, you can set the block parameter to False
:
try:
queue.get(block=False)
except queue.Empty:
# handle exception
Code language: Python (python)
To get an item from the queue and block with a time limit, you can use the get()
method with a timeout:
try:
item = queue.get(timeout=10)
except queue.Empty:
# ...
Code language: Python (python)
Getting the size of the queue
The qsize()
method returns the number of items in the queue:
size = queue.size()
Code language: Python (python)
Also, the empty()
method returns True if the queue is empty or False otherwise. On the other hand, the full()
method returns True if the queue is full or False otherwise.
Marking a task as completed
An item that you add to the queue represents a unit of work or a task.
When a thread calls the get()
method to get the item from the queue, it may need to process it before the task is considered completed.
Once completed, the thread may call the task_done()
method of the queue to indicate that it has processed the task completely:
item = queue.get()# process the item
# ...
# mark the item as completed
queue.task_done()
Code language: Python (python)
Waiting for all tasks on the queue to be completed
To wait for all tasks on the queue to be completed, you can call the join()
method on the queue object:
queue.join()
Code language: Python (python)
Python thread-safe queue example
The following example illustrates how to use the thread-safe queue to exchange data between two threads:
import time
from queue import Empty, Queue
from threading import Threaddef producer(queue):
for i in range(1, 6):
print(f'Inserting item {i} into the queue')
time.sleep(1)
queue.put(i)
def consumer(queue):
while True:
try:
item = queue.get()
except Empty:
continue
else:
print(f'Processing item {item}')
time.sleep(2)
queue.task_done()
def main():
queue = Queue()
# create a producer thread and start it
producer_thread = Thread(
target=producer,
args=(queue,)
)
producer_thread.start()
# create a consumer thread and start it
consumer_thread = Thread(
target=consumer,
args=(queue,),
daemon=True
)
consumer_thread.start()
# wait for all tasks to be added to the queue
producer_thread.join()
# wait for all tasks on the queue to be completed
queue.join()
if __name__ == '__main__':
main()
Code language: Python (python)
How it works.
First, define the producer()
function that adds numbers from 1 to 11 to the queue. It delays one second in each iteration:
def producer(queue):
for i in range(1, 6):
print(f'Inserting item {i} into the queue')
time.sleep(1)
queue.put(i)
Code language: Python (python)
Second, define the consumer()
function that gets an item from the queue and processes it. It delays two seconds after processing each item on the queue:
def consumer(queue):
while True:
try:
item = queue.get()
except Empty:
continue
else:
print(f'Processing item {item}')
time.sleep(2)
queue.task_done()
Code language: Python (python)
The queue.task_done()
indicates that the function has processed the item on the queue.
Third, define the main()
function that creates two threads, one thread adds a number to the queue every one second while another thread process an item on the queue every two seconds:
def main():
queue = Queue() # create a producer thread and start it
producer_thread = Thread(
target=producer,
args=(queue,)
)
producer_thread.start()
# create a consumer thread and start it
consumer_thread = Thread(
target=consumer,
args=(queue,),
daemon=True
)
consumer_thread.start()
# wait for all tasks to be added to the queue
producer_thread.join()
# wait for all tasks on the queue to be completed
queue.join()
Code language: Python (python)
Output:
Inserting item 1 into the queue
Inserting item 2 into the queue
Processing item 1
Inserting item 3 into the queue
Processing item 2
Inserting item 4 into the queue
Inserting item 5 into the queue
Processing item 3
Processing item 4
Processing item 5
Code language: Python (python)
The following are steps in the main()
function:
- Create a new queue by calling the
Queue()
constructor - Create a new thread called
producer_thread
and start it immediately - Create a daemon thread called
consumer_thread
and start it immediately. - Wait for all the numbers to be added to the queue using the
join()
method of the thread. - Wait for all the tasks on the queue to be completed by calling the
join()
method of the queue.
In this picture, the producer adds a number to the queue every second, and the consumer process a number from the queue every two seconds. It also displays the numbers on the queue every second.
Summary
- Use the
Queue
class of thequeue
module to safely exchange data between multiple threads.