Python threading thread queue

Python Thread-safe Queue

Summary: in this tutorial, you’ll learn how to use a Python thread-safe queue to exchange data safely between multiple threads.

Introduction to the Python thread-safe queue

The built-in queue module allows you to exchange data safely between multiple threads. The Queue class in the queue module implements all required locking semantics.

Creating a new queue

To create a new queue, you import the Queue class from the queue module:

from queue import QueueCode language: Python (python)

and use the Queue constructor as follows:

queue = Queue()Code language: Python (python)

To create a queue with a size limit, you can use the maxsize parameter. For example, the following creates a queue that can store up to 10 items:

queue = Queue(maxsize=10)Code language: Python (python)

Adding an item to the queue

To add an item to the queue, you use the put() method like this:

queue.add(item)Code language: Python (python)

Once the queue is full, you won’t be able to add an item to it. Also, the call to the put() method will block until the queue has space available.

If you don’t want the put() method to block if the queue is full, you can set the block argument to False :

queue.put(item, block=False)Code language: Python (python)

In this case, the put() method will raise the queue.Full exception if the queue is full:

try: queue.put(item, block=False) except queue.Full as e: # handle exceptoinCode language: Python (python)

To add an item to a sized limited queue and block with a timeout, you can use the timeout parameter like this:

try: queue.put(item, timeout=3) except queue.Full as e: # handle exceptoinCode language: Python (python)

Getting an item from the queue

To get an item from the queue, you can use the get() method:

item = queue.get()Code language: Python (python)

The get() method will block until an item is available for retrieval from the queue.

To get an item from the queue without blocking, you can set the block parameter to False :

try: queue.get(block=False) except queue.Empty: # handle exceptionCode language: Python (python)

To get an item from the queue and block it with a time limit, you can use the get() method with a timeout:

try: item = queue.get(timeout=10) except queue.Empty: # . Code language: Python (python)

Getting the size of the queue

The qsize() method returns the number of items in the queue:

size = queue.size()Code language: Python (python)

Also, the empty() method returns True if the queue is empty or False otherwise. On the other hand, the full() method returns True if the queue is full or False otherwise.

Читайте также:  Java map get key with value

Marking a task as completed

An item that you add to the queue represents a unit of work or a task.

When a thread calls the get() method to get the item from the queue, it may need to process it before the task is considered completed.

Once completed, the thread may call the task_done() method of the queue to indicate that it has processed the task completely:

item = queue.get() # process the item # . # mark the item as completed queue.task_done()Code language: Python (python)

Waiting for all tasks on the queue to be completed

To wait for all tasks on the queue to be completed, you can call the join() method on the queue object:

queue.join()Code language: Python (python)

Python thread-safe queue example

The following example illustrates how to use the thread-safe queue to exchange data between two threads:

import time from queue import Empty, Queue from threading import Thread def producer(queue): for i in range(1, 6): print(f'Inserting item into the queue') time.sleep(1) queue.put(i) def consumer(queue): while True: try: item = queue.get() except Empty: continue else: print(f'Processing item ') time.sleep(2) queue.task_done() def main(): queue = Queue() # create a producer thread and start it producer_thread = Thread( target=producer, args=(queue,) ) producer_thread.start() # create a consumer thread and start it consumer_thread = Thread( target=consumer, args=(queue,), daemon=True ) consumer_thread.start() # wait for all tasks to be added to the queue producer_thread.join() # wait for all tasks on the queue to be completed queue.join() if __name__ == '__main__': main()Code language: Python (python)

First, define the producer() function that adds numbers from 1 to 11 to the queue. It delays one second in each iteration:

def producer(queue): for i in range(1, 6): print(f'Inserting item into the queue') time.sleep(1) queue.put(i)Code language: Python (python)

Second, define the consumer() function that gets an item from the queue and processes it. It delays two seconds after processing each item on the queue:

def consumer(queue): while True: try: item = queue.get() except Empty: continue else: print(f'Processing item ') time.sleep(2) queue.task_done()Code language: Python (python)

The queue. task_done() indicates that the function has processed the item on the queue.

Third, define the main() function that creates two threads, one thread adds a number to the queue every second while another thread processes an item on the queue every two seconds:

def main(): queue = Queue() # create a producer thread and start it producer_thread = Thread( target=producer, args=(queue,) ) producer_thread.start() # create a consumer thread and start it consumer_thread = Thread( target=consumer, args=(queue,), daemon=True ) consumer_thread.start() # wait for all tasks to be added to the queue producer_thread.join() # wait for all tasks on the queue to be completed queue.join()Code language: Python (python)
Inserting item 1 into the queue Inserting item 2 into the queue Processing item 1 Inserting item 3 into the queue Processing item 2 Inserting item 4 into the queue Inserting item 5 into the queue Processing item 3 Processing item 4 Processing item 5Code language: Python (python)

The following are steps in the main() function:

  1. Create a new queue by calling the Queue() constructor
  2. Create a new thread called producer_thread and start it immediately
  3. Create a daemon thread called consumer_thread and start it immediately.
  4. Wait for all the numbers to be added to the queue using the join() method of the thread.
  5. Wait for all the tasks on the queue to be completed by calling the join() method of the queue.

The producer adds a number to the queue every second, and the consumer process a number from the queue every two seconds. It also displays the numbers on the queue every second.

Summary

Источник

Потокобезопасная очередь в Python

Встроенный модуль queue позволяет безопасно обмениваться данными между несколькими потоками. Класс Queue из модуля queue реализует всю необходимую семантику блокировки.

Создаем новую очередь

Чтобы создать новую очередь, нужно использовать конструктор Queue следующим образом:

from queue import Queue queue = Queue()

Чтобы создать очередь с ограничением по размеру, можно использовать параметр maxsize . Например, ниже создается очередь, которая может хранить до 10 элементов:

from queue import Queue queue = Queue(maxsize=10)

Добавляем элемент в очередь

Чтобы добавить элемент в очередь, нужно использовать метод put() следующим образом:

Как только очередь заполнится, вы не сможете добавить в нее элемент. Вызов метода put() будет блокироваться до тех пор, пока в очереди не освободится место.

Если вы не хотите, чтобы метод put() блокировался, если очередь переполнена, вы можете передать аргумент block=False :

В примере ниже метод put() вызовет исключение queue.Full , если очередь переполнена:

try: queue.put(item, block=False) except queue.Full as e: # обработка исключения

Чтобы добавить элемент в ограниченную по размеру очередь и заблокировать его по таймауту, вы можете использовать параметр таймаута следующим образом:

try: queue.put(item, timeout=3) except queue.Full as e: # обработка исключения

Получаем элемент из очереди

Чтобы получить элемент из очереди, вы можете использовать метод get() :

Метод get() будет блокироваться до тех пор, пока элемент не будет доступен для получения из очереди.

Чтобы получить элемент из очереди без блокировки, можно передать аргумент block=False :

try: queue.get(block=False) except queue.Empty: # обработка исключения

Чтобы получить элемент из очереди и блокировать его с ограничением по времени, можно использовать метод get() с таймаутом:

try: item = queue.get(timeout=10) except queue.Empty: # . 

Получаем размер очереди

Метод qsize() возвращает количество элементов в очереди:

Кроме того, метод empty() возвращает True , если очередь пуста, или False в противном случае. С другой стороны, метод full() возвращает True , если очередь заполнена, или False в противном случае.

Помечаем задачу как выполненную

Элемент, который вы добавляете в очередь, представляет собой единицу работы или задачу.

Когда поток вызывает метод get() для получения элемента из очереди, ему может потребоваться обработать его, прежде чем задача будет считаться выполненной.

После завершения поток может вызвать метод task_done() , чтобы указать, что он полностью обработал задачу:

item = queue.get() # обработка элемента # . # помечаем элемент как выполненный queue.task_done()

Ждем завершение всех задач в очереди

Чтобы дождаться завершения всех задач в очереди, можно вызвать метод join() на объекте очереди:

Пример потокобезопасной очереди

Следующий пример демонстрирует использование потокобезопасной очереди для обмена данными между двумя потоками:

import time from queue import Empty, Queue from threading import Thread def producer(queue): for i in range(1, 6): print(f'Вставляем элемент в очередь') time.sleep(1) queue.put(i) def consumer(queue): while True: try: item = queue.get() except Empty: continue else: print(f'Обрабатываем элемент ') time.sleep(2) queue.task_done() def main(): queue = Queue() # создаем поток-производитель и запускаем его producer_thread = Thread( target=producer, args=(queue,) ) producer_thread.start() # создаем поток-потребитель и запускаем его consumer_thread = Thread( target=consumer, args=(queue,), daemon=True ) consumer_thread.start() # дожидаемся, пока все задачи добавятся в очередь producer_thread.join() # дожидаемся, пока все задачи в очереди будут завершены queue.join() if __name__ == '__main__': main()

Как это работает

1. Сначала мы создаем функцию producer() , которая добавляет в очередь числа от 1 до 11. На каждой итерации она задерживается на одну секунду:

def producer(queue): for i in range(1, 6): print(f'Вставляем элемент в очередь') time.sleep(1) queue.put(i)

2. Создаем функцию consumer() , которая получает элемент из очереди и обрабатывает его. Она задерживается на две секунды после обработки каждого элемента в очереди:

def consumer(queue): while True: try: item = queue.get() except Empty: continue else: print(f'Обрабатываем элемент ') time.sleep(2) queue.task_done()

queue.task_done() указывает на то, что функция обработала элемент очереди.

3. Создаем функцию main() , которая создает два потока. Один поток добавляет номер в очередь каждую секунду, а другой обрабатывает элемент в очереди каждые две секунды:

def main(): queue = Queue() # создаем поток-производитель и запускаем его producer_thread = Thread( target=producer, args=(queue,) ) producer_thread.start() # создаем поток-потребитель и запускаем его consumer_thread = Thread( target=consumer, args=(queue,), daemon=True ) consumer_thread.start() # дожидаемся, пока все задачи добавятся в очередь producer_thread.join() # дожидаемся, пока все задачи в очереди будут завершены queue.join()
  • Создаем новую очередь, вызвав конструктор Queue() .
  • Создаем новый поток с именем producer_thread и немедленно запускаем его.
  • Создаем поток-демон consumer_thread и немедленно запускаем его.
  • Ждем добавления всех номеров в очередь с помощью метода join() .
  • Ждем завершения всех задач в очереди с помощью метода join() .

Поток-производитель добавляет число в очередь каждую секунду, а поток-потребитель обрабатывает число из очереди каждые две секунды. Он также отображает числа в очереди каждую секунду.

Вставляем 1 элемент в очередь
Вставляем 2 элемент в очередь
Обрабатываем элемент 1
Вставляем 3 элемент в очередь
Обрабатываем элемент 2
Вставляем 4 элемент в очередь
Вставляем 5 элемент в очередь
Обрабатываем элемент 3
Обрабатываем элемент 4
Обрабатываем элемент 5

Источник

Оцените статью