Асинхронность в Python

Теория: Очереди и синхронизация

Асинхронные очереди (asyncio.Queue)

Асинхронные очереди в asyncio — это один из ключевых инструментов для организации взаимодействия между задачами без явной блокировки. Они позволяют передавать данные от одной корутины к другой, синхронизируя выполнение задач в рамках одного цикла событий без использования сложных блокировок. Класс asyncio.Queue реализует принцип «производитель–потребитель», где одна или несколько задач помещают элементы в очередь, а другие их извлекают. Всё это выполняется в одном потоке и в одном цикле событий. При этом использование await делает операции неблокирующими, позволяя другим задачам выполняться параллельно в рамках того же цикла.

Создание очереди выполняется через конструктор asyncio.Queue(). Она может быть неограниченной по размеру или иметь параметр maxsize, который задаёт максимальное количество элементов. Если очередь заполнена, корутина, пытающаяся добавить новый элемент, приостанавливается до тех пор, пока кто-то не извлечёт данные. Это обеспечивает естественное регулирование нагрузки и предотвращает переполнение памяти.

Пример базового взаимодействия:

import asyncio
from asyncio import Queue, Task


async def producer(queue: Queue[int]) -> None:
    for i in range(5):
        await asyncio.sleep(1)
        await queue.put(i)
        print(f"Производитель добавил: {i}")


async def consumer(queue: Queue[int]) -> None:
    try:
        while True:
            item: int = await queue.get()
            print(f"Потребитель получил: {item}")
            queue.task_done()
    except asyncio.CancelledError:
        print("Потребитель корректно завершён")
        raise


async def main() -> None:
    queue: Queue[int] = asyncio.Queue()
    prod: Task[None] = asyncio.create_task(producer(queue))
    cons: Task[None] = asyncio.create_task(consumer(queue))
    await prod
    await queue.join()
    cons.cancel()
    try:
        await cons
    except asyncio.CancelledError:
        pass


asyncio.run(main())

Здесь задача producer добавляет данные в очередь, а задача consumer извлекает их. Когда вызывается await queue.put(i), элемент помещается в очередь, а корутина приостанавливается до освобождения места, если очередь заполнена. Метод queue.get() приостанавливает выполнение корутины и возвращает управление циклу событий до тех пор, пока в очереди не появится элемент.

Важно: обратите внимание на обработку CancelledError в воркере. Это критически важно для корректного завершения задач. Также важно правильно ожидать отменённую задачу через await cons — это позволяет исключению корректно распространиться.

Критическая важность task_done() и join()

Каждый вызов queue.get() обязательно должен сопровождаться вызовом queue.task_done(). Количество вызовов task_done() должно точно соответствовать количеству добавленных элементов. Если их меньше, queue.join() будет бесконечно ожидать завершения — это одна из самых распространённых ошибок при работе с очередями.

Рассмотрим пример зависания из-за забытого task_done():

import asyncio
from asyncio import Queue, Task


async def faulty_consumer(queue: Queue[int]) -> None:
    """Потребитель с ошибкой: забыли task_done()"""
    try:
        while True:
            item: int = await queue.get()
            print(f"Обработан: {item}")
            # ОШИБКА: забыли вызвать queue.task_done()
            await asyncio.sleep(0.1)
    except asyncio.CancelledError:
        print("Потребитель отменён")
        raise


async def main_with_bug() -> None:
    queue: Queue[int] = asyncio.Queue()

    # Добавляем элементы
    for i in range(3):
        await queue.put(i)

    cons: Task[None] = asyncio.create_task(faulty_consumer(queue))

    print("Ожидание join()...")
    try:
        # Будет висеть вечно, так как task_done() не вызывается!
        await asyncio.wait_for(queue.join(), timeout=2.0)
    except asyncio.TimeoutError:
        print("ЗАВИСАНИЕ: join() не завершился из-за отсутствия task_done()")

    cons.cancel()
    try:
        await cons
    except asyncio.CancelledError:
        pass


asyncio.run(main_with_bug())
# => Ожидание join()...
# => Обработан: 0
# => Обработан: 1
# => Обработан: 2
# => ЗАВИСАНИЕ: join() не завершился из-за отсутствия task_done()
# => Потребитель отменён

Метод join() используется, когда нужно дождаться завершения всех задач, связанных с очередью. Он блокирует выполнение до тех пор, пока количество вызовов task_done() не станет равным количеству добавленных элементов. Это удобно, когда требуется дождаться полной обработки всех данных, прежде чем завершить программу.

Правило: на каждый get() должен быть ровно один task_done(), даже если обработка элемента завершилась с ошибкой. Рассмотрим правильную обработку ошибок:

import asyncio
from asyncio import Queue, Task


async def safe_consumer(queue: Queue[int]) -> None:
    """Потребитель с корректной обработкой ошибок"""
    try:
        while True:
            item: int = await queue.get()
            try:
                if item == 2:
                    raise ValueError("Ошибка обработки элемента 2")
                print(f"Обработан: {item}")
            except ValueError as e:
                print(f"Ошибка: {e}")
            finally:
                # ВАЖНО: task_done() вызывается всегда, даже при ошибке
                queue.task_done()
    except asyncio.CancelledError:
        print("Потребитель корректно завершён")
        raise


async def main() -> None:
    queue: Queue[int] = asyncio.Queue()

    for i in range(5):
        await queue.put(i)

    cons: Task[None] = asyncio.create_task(safe_consumer(queue))

    await queue.join()  # Теперь корректно завершится
    print("Все элементы обработаны")

    cons.cancel()
    try:
        await cons
    except asyncio.CancelledError:
        pass


asyncio.run(main())

Ненадёжность qsize()

Метод qsize() возвращает приблизительный размер очереди, но не должен использоваться для принятия критических решений. В многозадачной среде размер очереди может измениться между вызовом qsize() и последующей операцией. Этот метод полезен только для мониторинга и отладки, но не для логики программы.

import asyncio
from asyncio import Queue


async def unreliable_check(queue: Queue[int]) -> None:
    """Пример ненадёжной проверки размера"""
    if queue.qsize() > 0:  # Ненадёжно!
        # Между проверкой и get() другая задача может забрать элемент
        await asyncio.sleep(0.01)  # имитация задержки
        try:
            item = queue.get_nowait()  # Может выбросить QueueEmpty!
            print(f"Получен: {item}")
            queue.task_done()
        except asyncio.QueueEmpty:
            print("Очередь оказалась пустой!")


async def reliable_approach(queue: Queue[int]) -> None:
    """Надёжный подход без проверки размера"""
    try:
        item: int = await queue.get()
        print(f"Получен: {item}")
        queue.task_done()
    except asyncio.CancelledError:
        raise


async def main() -> None:
    queue: Queue[int] = asyncio.Queue()
    await queue.put(1)

    # Запускаем две задачи, которые могут конкурировать
    await asyncio.gather(
        unreliable_check(queue),
        unreliable_check(queue),
    )


asyncio.run(main())

Правило: используйте await queue.get() вместо проверки qsize() с последующим get_nowait(). Асинхронный get() корректно обработает пустую очередь, приостановив выполнение до появления элемента.

Асинхронная очередь поддерживает не только базовые методы put и get, но и их неблокирующие аналоги — put_nowait и get_nowait. Эти методы выбрасывают исключения QueueFull или QueueEmpty, если операция невозможна. Они полезны, когда важно избежать приостановки корутины и продолжить выполнение других операций без ожидания. Однако в большинстве случаев предпочтительно использовать обычные асинхронные методы, чтобы не терять преимущества кооперативного выполнения.

Асинхронные очереди позволяют естественным образом строить пайплайны — цепочки обработки данных. Например, одна корутина может загружать данные, другая — обрабатывать, а третья — сохранять результат. Между ними передаются данные через asyncio.Queue. Такой подход делает код модульным, управляемым и легко расширяемым. В отличие от традиционных потоков, здесь не нужно заботиться о синхронизации доступа: очередь обеспечивает безопасную передачу данных между задачами.

Несколько потребителей с корректным завершением

Для более наглядного понимания можно рассмотреть пример с несколькими потребителями и корректной обработкой завершения:

import asyncio
from asyncio import Queue, Task


async def worker(name: str, queue: Queue[int | None]) -> None:
    """Воркер с корректной обработкой завершения"""
    try:
        while True:
            item: int | None = await queue.get()
            try:
                if item is None:
                    print(f"{name} получил сигнал завершения")
                    break
                print(f"{name} обрабатывает элемент {item}")
                await asyncio.sleep(2)
            finally:
                queue.task_done()
    except asyncio.CancelledError:
        print(f"{name} был отменён")
        raise


async def main() -> None:
    queue: Queue[int | None] = asyncio.Queue()

    # Добавляем рабочие элементы
    for i in range(10):
        await queue.put(i)

    # Создаём воркеров
    workers: list[Task[None]] = [
        asyncio.create_task(worker(f"Рабочий-{i}", queue)) for i in range(3)
    ]

    # Ждём обработки всех элементов
    await queue.join()

    # Отправляем сигнал завершения каждому воркеру
    for _ in workers:
        await queue.put(None)

    # Ждём корректного завершения всех воркеров
    await asyncio.gather(*workers, return_exceptions=True)
    print("Все воркеры завершены")


asyncio.run(main())

В этом примере три задачи работают параллельно, распределяя между собой элементы очереди. Каждый элемент извлекается ровно одним потребителем, что демонстрирует естественное разделение нагрузки. Важные моменты:

  1. Используется try/finally для гарантии вызова task_done() даже при ошибках
  2. Специальное значение None используется как сигнал завершения
  3. Количество None равно количеству воркеров
  4. Воркеры завершаются естественным образом через break, а не через отмену

Асинхронные очереди особенно полезны в системах, где данные поступают с разной скоростью. Например, в сетевых приложениях производитель может получать данные быстрее, чем потребители успевают их обрабатывать. В этом случае maxsize помогает ограничить буфер и избежать переполнения памяти. Если очередь достигла максимального размера, корутина, выполняющая put, временно «засыпает», пока не освободится место. Это создаёт естественный механизм регулирования нагрузки.

Когда очередь пуста, вызов get() приостанавливает выполнение до появления новых элементов. Таким образом, очередь играет роль точки синхронизации между независимыми задачами, не требуя явных блокировок или уведомлений.

Работа с таймаутами

Асинхронная очередь также может быть использована в связке с таймаутами. Например, можно установить ограничение на время ожидания элемента:

import asyncio
from asyncio import Queue


async def consumer_with_timeout(queue: Queue[int]) -> None:
    try:
        item: int = await asyncio.wait_for(queue.get(), timeout=3)
        print(f"Получен элемент: {item}")
        queue.task_done()
    except asyncio.TimeoutError:
        print("Истекло время ожидания элемента")
    except asyncio.CancelledError:
        print("Потребитель отменён")
        raise


async def main() -> None:
    queue: Queue[int] = asyncio.Queue()
    await consumer_with_timeout(queue)


asyncio.run(main())
# => Истекло время ожидания элемента

В этом случае, если элемент не появится в течение 3 секунд, выбрасывается исключение TimeoutError, и выполнение продолжается без блокировки. Это полезно в системах, где ожидание не должно быть бесконечным.

Таким образом, asyncio.Queue — это универсальный инструмент для обмена данными между асинхронными задачами. Он обеспечивает безопасную передачу информации, автоматическую синхронизацию и естественное управление скоростью выполнения. В отличие от многопоточных блокирующих программ, где требуются блокировки и мьютексы, здесь всё основано на ожидании событий и кооперативном переключении внутри одного потока. Очередь становится связующим элементом между частями программы, упрощая построение асинхронных конвейеров и систем распределённой обработки.

Примитивы синхронизации: Lock, Event, Condition

В асинхронных программах, где несколько задач выполняются одновременно в рамках одного цикла событий, иногда требуется согласовать доступ к общим ресурсам. Например, две корутины могут пытаться записать данные в один и тот же файл или изменять общую переменную. Если не контролировать такие ситуации, можно столкнуться с непредсказуемыми ошибками — одни задачи будут перезаписывать результаты других, а состояние программы станет неконсистентным. Для решения этих проблем в модуле asyncio предусмотрены примитивы синхронизации, которые позволяют координировать выполнение задач, не блокируя поток исполнения. Основные из них — Lock, Event и Condition.

Блокировка (Lock)

Асинхронная блокировка — это самый простой и часто используемый примитив. Объект asyncio.Lock позволяет ограничить доступ к ресурсу, чтобы только одна задача могла работать с ним в данный момент. Если одна задача захватила блокировку, остальные будут ожидать её освобождения, приостанавливаясь с помощью await, а не блокируя поток.

Создать блокировку можно просто вызвав asyncio.Lock(). Для захвата блокировки используется await lock.acquire(), а для освобождения — lock.release(). Однако на практике чаще применяют контекстный менеджер async with lock, который автоматически захватывает блокировку при входе и освобождает при выходе. Это делает код безопасным и защищает от ситуаций, когда блокировка может остаться «висеть» из-за ошибки или исключения.

Пример иллюстрирует работу нескольких задач, обращающихся к общему ресурсу:

import asyncio
from asyncio import Lock


async def worker(name: str, lock: Lock) -> None:
    try:
        print(f"{name} ожидает доступ")
        async with lock:
            print(f"{name} получил доступ")
            await asyncio.sleep(2)
            print(f"{name} освобождает ресурс")
    except asyncio.CancelledError:
        print(f"{name} был отменён")
        raise


async def main() -> None:
    lock: Lock = asyncio.Lock()
    await asyncio.gather(worker("A", lock), worker("B", lock))


asyncio.run(main())

В этом примере обе задачи пытаются войти в критическую секцию, защищённую lock. Первая получает доступ сразу, а вторая приостанавливается, пока блокировка не будет освобождена. При этом цикл событий продолжает работать — программа не зависает, просто вторая корутина ждёт своей очереди.

asyncio.Lock хранит ожидающие корутины в очереди (deque) и при release() пробуждает первого в очереди — то есть поведение по факту реализовано как FIFO. Документация описывает поведение, но реализация и планировщик могут давать тонкие отличия. Если требуется соблюдение справедливого порядка — asyncio.Lock обычно обеспечивает это: первая ожидающая задача пробуждается первой. Если вам нужна абсолютная гарантия порядка в любых условиях (без даже малейшего шанса на перескакивание), стоит использовать явную очередь.

Когда Lock уместен и альтернативы

asyncio.Lock полезен для координации доступа к общим ресурсам в рамках одного цикла событий, но важно понимать его ограничения.

Когда Lock уместен:

  • Защита общих структур данных в памяти (словари, списки, счётчики)
  • Координация последовательного доступа к внешним ресурсам (например, последовательные запросы к API)
  • Гарантия атомарности последовательности операций

Когда Lock НЕ является лучшим решением:

Для файлового ввода-вывода Lock часто избыточен или недостаточен:

import asyncio
from asyncio import Lock
from pathlib import Path


# Плохо: Lock не защищает от проблем с файлами
async def write_with_lock(lock: Lock, data: str) -> None:
    async with lock:
        # Lock не защищает от проблем с буферизацией или ошибок ФС
        with open("data.txt", "a") as f:
            f.write(data)


# Лучше: отдельный writer-воркер с очередью
async def writer_worker(queue: asyncio.Queue[str | None]) -> None:
    """Единственный воркер для записи в файл"""
    try:
        with open("data.txt", "a") as f:
            while True:
                data: str | None = await queue.get()
                try:
                    if data is None:
                        break
                    f.write(data)
                    f.flush()  # Явная буферизация
                finally:
                    queue.task_done()
    except asyncio.CancelledError:
        print("Writer завершён")
        raise


async def main_with_writer() -> None:
    """Пример с выделенным воркером для записи"""
    queue: asyncio.Queue[str | None] = asyncio.Queue()
    writer = asyncio.create_task(writer_worker(queue))

    # Множество задач могут отправлять данные
    async def producer(n: int) -> None:
        for i in range(3):
            await queue.put(f"Задача {n}, строка {i}\n")

    await asyncio.gather(*(producer(i) for i in range(5)))

    # Завершаем writer
    await queue.put(None)
    await queue.join()
    await writer

# Ещё лучше: использовать специализированные библиотеки
# Для логирования: logging с ThreadPoolExecutor или aiofiles
# Для базы данных: пул соединений с ограничением

Альтернативы Lock для файлового I/O:

  1. Выделенный writer-воркер — один воркер обрабатывает все записи через очередь
  2. Буферизация — накапливать данные в памяти и записывать пакетами
  3. Библиотека aiofiles — асинхронное API для работы с файлами
  4. ThreadPoolExecutor — для синхронных операций с файлами

Событие (Event)

Объект asyncio.Event используется для уведомления задач о том, что произошло определённое событие. Это своего рода флаг, который изначально находится в состоянии «ложь». Когда одна задача вызывает метод set(), флаг переключается в состояние «истина», и все задачи, ожидавшие его через await event.wait(), немедленно продолжают выполнение.

События удобны, когда одна часть программы должна сигнализировать другой о готовности данных или завершении операции. Например, можно реализовать ситуацию, когда потребитель ждёт, пока производитель не завершит инициализацию ресурса:

import asyncio
from asyncio import Event


async def waiter(event: Event) -> None:
    try:
        print("Ожидание события...")
        await event.wait()
        print("Событие произошло!")
    except asyncio.CancelledError:
        print("Waiter отменён")
        raise


async def setter(event: Event) -> None:
    try:
        await asyncio.sleep(2)
        print("Устанавливаем событие")
        event.set()
    except asyncio.CancelledError:
        print("Setter отменён")
        raise


async def main() -> None:
    event: Event = asyncio.Event()
    await asyncio.gather(waiter(event), setter(event))


asyncio.run(main())

Сначала запускается задача, которая ждёт сигнала, затем — задача, которая его подаст через 2 секунды. Когда вызывается event.set(), ожидание завершается, и корутина waiter продолжает выполнение. После вызова event.set() все последующие вызовы event.wait() будут немедленно завершаться, пока событие не будет сброшено через event.clear(). Это значит, что если другие задачи будут вызывать wait() после этого, они пройдут без ожидания.

Если требуется повторно использовать событие для нескольких циклов ожидания и срабатывания, нужно вызывать clear() после каждой обработки. Таким образом, Event подходит для сценариев, где одна или несколько задач должны быть уведомлены о наступлении определённого состояния, но не участвуют в общей очереди или блокировке ресурса.

Условие (Condition)

Примитив asyncio.Condition объединяет свойства блокировки и события. Он используется, когда несколько задач должны синхронизироваться не просто по факту наступления события, а с учётом конкретных условий и общих данных. Обычно условие применяется, когда одна задача должна дождаться определённого состояния ресурса, изменяемого другими задачами.

Каждое условие содержит внутренний Lock, который защищает доступ к данным. Когда задача хочет изменить состояние, она захватывает условие через async with condition. Метод condition.wait() временно отпускает связанную блокировку и приостанавливает корутину до тех пор, пока не будет вызван condition.notify(). После уведомления он перезахватывает блокировку перед возвратом управления. Этот вызов пробуждает одну или несколько ожидающих задач, которые могут проверить состояние данных и при необходимости снова заснуть.

Рассмотрим пример, где одна задача производит данные, а другая ждёт, пока они появятся:

import asyncio
from asyncio import Condition


async def producer(condition: Condition, shared: list[str]) -> None:
    try:
        await asyncio.sleep(2)
        async with condition:
            shared.append("данные")
            print("Производитель добавил данные")
            condition.notify()
    except asyncio.CancelledError:
        print("Производитель отменён")
        raise


async def consumer(condition: Condition, shared: list[str]) -> None:
    try:
        async with condition:
            print("Потребитель ждёт данные")
            await condition.wait()
            print(f"Потребитель получил: {shared.pop()}")
    except asyncio.CancelledError:
        print("Потребитель отменён")
        raise


async def main() -> None:
    condition: Condition = asyncio.Condition()
    shared: list[str] = []
    await asyncio.gather(consumer(condition, shared), producer(condition, shared))


asyncio.run(main())

Пока производитель не добавит элемент в список и не вызовет notify(), задача-потребитель будет находиться в ожидании. После уведомления условие разблокируется, и потребитель сможет обработать данные. Этот механизм особенно полезен для реализации схем типа «производитель–потребитель», где несколько задач совместно используют общий буфер или последовательность операций.

Если уведомить нужно не одну задачу, а всех ожидающих, используется метод notify_all(). Это удобно, когда несколько потребителей ждут одинакового события, например, поступления данных. При этом важно помнить, что пробуждение задач не гарантирует немедленного выполнения — цикл событий сам решает порядок переключения между ними.

Взаимодействие примитивов и очередей

Асинхронные примитивы часто применяются вместе с очередями. Например, можно использовать Lock для защиты доступа к очереди при изменении состояния, если требуется выполнение некоторых дополнительных действий, не предусмотренных самой очередью. Или использовать Event, чтобы уведомлять задачи о том, что в очередь добавлены новые данные.

В отличие от блокирующих примитивов в многопоточном программировании, asyncio-примитивы не создают реальной блокировки потока. Они лишь переводят задачу в состояние ожидания и возвращают управление циклу событий, не блокируя поток исполнения. Это делает их лёгкими и безопасными для использования даже при большом количестве параллельных задач.

Все эти примитивы — Lock, Event, Condition — работают по кооперативному принципу. Они не заставляют поток простаивать, а лишь управляют порядком выполнения задач внутри event loop. Если одна задача ждёт сигнала или освобождения блокировки, другие в это время продолжают выполняться. Такой подход обеспечивает высокую эффективность и предсказуемость поведения программы.

Практическое значение

В реальных приложениях примитивы синхронизации часто используются для управления сложными сценариями взаимодействия. Например, Lock может защищать запись в лог-файл, чтобы строки из разных задач не перемешивались (хотя лучше использовать выделенный writer-воркер). Event — сигнализировать о готовности внешнего ресурса, например подключения к базе данных. А Condition — координировать выполнение нескольких задач, зависящих от общего состояния, например буфера данных.

Комбинируя эти инструменты, можно строить устойчивые и управляемые асинхронные системы, где задачи работают независимо, но при этом согласованно. Каждый из примитивов выполняет свою роль: Lock — защищает критические секции, Event — уведомляет о событиях, Condition — обеспечивает гибкое взаимодействие между задачами.

Управление backpressure

В асинхронных системах часто возникает ситуация, когда одни задачи производят данные быстрее, чем другие успевают их обрабатывать. Например, корутина получает сообщения из сети, а другая их записывает в базу. Если скорость обработки ниже скорости поступления, необработанные данные накапливаются, что может привести к перегрузке памяти. Такое несоответствие скоростей называется backpressure — обратным давлением в потоке данных, возникающее при дисбалансе скоростей. Управление им включает как замедление производителей, так и оптимизацию потребителей для поддержания стабильности системы.

В asyncio управление этим процессом обычно выполняется с помощью ограниченных очередей. Асинхронная очередь asyncio.Queue позволяет задать максимальный размер при создании:

queue: Queue[int] = asyncio.Queue(maxsize=100)

Если очередь заполнена, операция await queue.put(item) приостанавливает производителя до тех пор, пока потребитель не освободит место. Таким образом система сама регулирует скорость — источник данных ждёт, когда обработчик завершит работу.

Приостановка не блокирует поток выполнения: цикл событий продолжает выполнять другие задачи. Благодаря этому программа остаётся отзывчивой, а скорость поступления данных подстраивается под возможности потребителя.

Пример демонстрирует, как ограниченная очередь защищает систему от переполнения:

import asyncio
from asyncio import Queue, Task


async def producer(queue: Queue[int]) -> None:
    try:
        for i in range(10):
            await queue.put(i)
            print(f"Производитель добавил {i}")
            await asyncio.sleep(0.1)
    except asyncio.CancelledError:
        print("Производитель отменён")
        raise


async def consumer(queue: Queue[int]) -> None:
    try:
        while True:
            item: int = await queue.get()
            print(f"Потребитель обработал {item}")
            await asyncio.sleep(0.5)
            queue.task_done()
    except asyncio.CancelledError:
        print("Потребитель отменён")
        raise


async def main() -> None:
    queue: Queue[int] = asyncio.Queue(maxsize=3)
    prod: Task[None] = asyncio.create_task(producer(queue))
    cons: Task[None] = asyncio.create_task(consumer(queue))
    await prod
    await queue.join()
    cons.cancel()
    try:
        await cons
    except asyncio.CancelledError:
        pass


asyncio.run(main())

Здесь производитель добавляет элементы быстрее, чем потребитель их обрабатывает. Когда очередь достигает лимита в три элемента, производитель автоматически приостанавливается. Это пример естественной балансировки нагрузки — система замедляется, но остаётся стабильной.

Если в системе несколько производителей и потребителей, механизм работает аналогично. Все производители ждут, если очередь заполнена, и возобновляют работу, когда потребители освободят место. Это предотвращает накопление данных и удерживает нагрузку в допустимых пределах.

Адаптивное управление скоростью

Иногда важно не просто приостановить источник, а регулировать его скорость. Например, производитель может замедляться, если очередь почти заполнена, и ускоряться, когда она почти пуста. Это создаёт адаптивную систему, которая сама поддерживает баланс.

Пример динамического управления скоростью производителя:

import asyncio
from asyncio import Queue, Task


async def adaptive_producer(queue: Queue[int]) -> None:
    try:
        delay: float = 0.1
        for i in range(20):
            await queue.put(i)
            print(f"Добавлен {i}, размер очереди: {queue.qsize()}")
            delay = 0.3 if queue.qsize() > 5 else 0.1
            await asyncio.sleep(delay)
    except asyncio.CancelledError:
        print("Производитель отменён")
        raise


async def consumer(queue: Queue[int]) -> None:
    try:
        while True:
            item: int = await queue.get()
            print(f"Обработан {item}")
            await asyncio.sleep(0.4)
            queue.task_done()
    except asyncio.CancelledError:
        print("Потребитель отменён")
        raise


async def main() -> None:
    queue: Queue[int] = asyncio.Queue(maxsize=10)
    prod: Task[None] = asyncio.create_task(adaptive_producer(queue))
    cons: Task[None] = asyncio.create_task(consumer(queue))
    await prod
    await queue.join()
    cons.cancel()
    try:
        await cons
    except asyncio.CancelledError:
        pass


asyncio.run(main())

Здесь производитель анализирует размер очереди и подстраивает задержку. Такой механизм используется, когда скорость поступления данных непостоянна и важно избежать пиков нагрузки.

Использование Semaphore

Backpressure можно контролировать и через ограничение числа одновременно выполняемых задач с помощью asyncio.Semaphore. Если установить семафор на фиксированное значение, новые операции не запустятся, пока старые не завершатся. Это особенно полезно при взаимодействии с внешними системами — базами данных или веб-API, которые не могут обрабатывать большое количество запросов одновременно:

import asyncio
from asyncio import Semaphore


async def fetch_data(url: str, semaphore: Semaphore) -> None:
    try:
        async with semaphore:
            print(f"Получение данных из {url}")
            await asyncio.sleep(1)
            print(f"Завершено для {url}")
    except asyncio.CancelledError:
        print(f"Запрос к {url} отменён")
        raise


async def main() -> None:
    semaphore: Semaphore = asyncio.Semaphore(3)
    urls: list[str] = [f"http://example.com/{i}" for i in range(10)]

    await asyncio.gather(*[fetch_data(url, semaphore) for url in urls])


asyncio.run(main())

Отбрасывание данных

Иногда обработка требует отбрасывания части данных. Например, в потоковых системах мониторинга важнее анализировать свежие события, чем хранить все старые. В этом случае при переполнении можно удалять старые элементы перед добавлением новых — такой подход часто применяется в потоковых системах реального времени, где важнее обрабатывать актуальные данные. Этот механизм не встроен в asyncio.Queue, но легко реализуется с помощью collections.deque и контроля размера вручную:

import asyncio
from collections import deque
from typing import Deque


class DropOldQueue:
    def __init__(self, maxsize: int) -> None:
        self.maxsize: int = maxsize
        self.queue: Deque[int] = deque(maxlen=maxsize)
        self._event: asyncio.Event = asyncio.Event()

    async def put(self, item: int) -> None:
        self.queue.append(item)
        self._event.set()

    async def get(self) -> int:
        while not self.queue:
            self._event.clear()
            await self._event.wait()
        return self.queue.popleft()

    def qsize(self) -> int:
        return len(self.queue)


async def producer(queue: DropOldQueue) -> None:
    try:
        for i in range(20):
            await queue.put(i)
            print(f"Добавлен {i}, размер: {queue.qsize()}")
            await asyncio.sleep(0.1)
    except asyncio.CancelledError:
        print("Производитель отменён")
        raise


async def consumer(queue: DropOldQueue) -> None:
    try:
        for _ in range(10):
            item: int = await queue.get()
            print(f"Обработан {item}")
            await asyncio.sleep(0.5)
    except asyncio.CancelledError:
        print("Потребитель отменён")
        raise


async def main() -> None:
    queue: DropOldQueue = DropOldQueue(maxsize=5)
    await asyncio.gather(producer(queue), consumer(queue))


asyncio.run(main())

Анализ и оптимизация

Управление backpressure помогает не только стабилизировать выполнение, но и понять слабые места системы. Если очередь часто заполняется, это сигнал, что обработка данных слишком медленная. Анализ этих состояний помогает оптимизировать архитектуру — например, добавить больше потребителей или перераспределить нагрузку.

Пример мониторинга состояния очереди:

import asyncio
from asyncio import Queue, Task


async def monitor(queue: Queue[int]) -> None:
    try:
        while True:
            size: int = queue.qsize()
            if size > 8:
                print(f"Внимание: очередь почти заполнена ({size}/10)")
            await asyncio.sleep(1)
    except asyncio.CancelledError:
        print("Монитор отменён")
        raise


async def producer(queue: Queue[int]) -> None:
    try:
        for i in range(50):
            await queue.put(i)
            await asyncio.sleep(0.1)
    except asyncio.CancelledError:
        print("Производитель отменён")
        raise


async def consumer(queue: Queue[int]) -> None:
    try:
        while True:
            item: int = await queue.get()
            await asyncio.sleep(0.3)
            queue.task_done()
    except asyncio.CancelledError:
        print("Потребитель отменён")
        raise


async def main() -> None:
    queue: Queue[int] = asyncio.Queue(maxsize=10)

    tasks: list[Task] = [
        asyncio.create_task(producer(queue)),
        asyncio.create_task(consumer(queue)),
        asyncio.create_task(monitor(queue)),
    ]

    await tasks[0]
    await queue.join()

    for task in tasks[1:]:
        task.cancel()

    await asyncio.gather(*tasks[1:], return_exceptions=True)


asyncio.run(main())

Таким образом, контроль "обратного давления" в asyncio делает асинхронные программы устойчивыми. Вместо бесконтрольного накопления данных задачи взаимодействуют согласованно: производители ждут, когда потребители готовы, и вся система остаётся в равновесии. Ограниченные очереди, семафоры и адаптивные задержки позволяют добиться плавного и безопасного потока данных даже при изменении нагрузки.

Контрольный список для работы с очередями и синхронизацией

Обеспечьте баланс вызовов put() и task_done() — количество вызовов task_done() должно точно соответствовать количеству элементов в очереди

Всегда вызывайте task_done() в блоке finally — это гарантирует вызов даже при ошибках обработки

Используйте сигналы завершения для воркеров — отправляйте специальные значения (например, None) для корректного завершения цикла while True

Обрабатывайте CancelledError во всех воркерах — это критически важно для корректного завершения задач

Не полагайтесь на qsize() в логике программы — используйте его только для мониторинга и отладки

Используйте await queue.get() вместо проверки размера — асинхронный get() корректно обработает пустую очередь

Для файлового I/O предпочитайте выделенный writer-воркер вместо Lock — это надёжнее и эффективнее

Настройте backpressure через ограниченные очереди — используйте maxsize для предотвращения переполнения памяти

Ожидайте отменённые задачи корректно — используйте await task после cancel() для обработки исключения

Используйте gather(..., return_exceptions=True) для массовой отмены — это позволяет дождаться завершения всех задач

Пример правильного использования:

import asyncio
from asyncio import Queue, Task


async def worker(name: str, queue: Queue[str | None]) -> None:
    """Воркер с корректной обработкой всех аспектов"""
    try:
        while True:
            item: str | None = await queue.get()
            try:
                if item is None:
                    print(f"{name} получил сигнал завершения")
                    break

                # Обработка с возможной ошибкой
                if "error" in item:
                    raise ValueError(f"Ошибка обработки: {item}")

                print(f"{name} обработал: {item}")
                await asyncio.sleep(0.5)
            except ValueError as e:
                print(f"{name}: {e}")
            finally:
                # КРИТИЧЕСКИ ВАЖНО: всегда вызываем task_done()
                queue.task_done()
    except asyncio.CancelledError:
        print(f"{name} корректно завершён через отмену")
        raise


async def producer(queue: Queue[str | None], items: list[str]) -> None:
    """Производитель с обработкой отмены"""
    try:
        for item in items:
            await queue.put(item)
            print(f"Добавлен: {item}")
            await asyncio.sleep(0.1)
    except asyncio.CancelledError:
        print("Производитель отменён")
        raise


async def main() -> None:
    # Ограниченная очередь для backpressure
    queue: Queue[str | None] = asyncio.Queue(maxsize=5)

    items = [f"item-{i}" for i in range(10)]
    items.append("error-item")  # элемент с ошибкой
    items.extend([f"item-{i}" for i in range(10, 15)])

    # Создаём воркеров
    num_workers = 3
    workers: list[Task[None]] = [
        asyncio.create_task(worker(f"Worker-{i}", queue)) for i in range(num_workers)
    ]

    # Запускаем производителя
    prod = asyncio.create_task(producer(queue, items))

    # Ждём завершения производства
    await prod

    # Ждём обработки всех элементов
    await queue.join()
    print("Все элементы обработаны")

    # Отправляем сигналы завершения
    for _ in workers:
        await queue.put(None)

    # Ждём корректного завершения воркеров
    await asyncio.gather(*workers, return_exceptions=True)
    print("Все воркеры завершены")


if __name__ == "__main__":
    asyncio.run(main())

Рекомендуемые программы

Завершено

0 / 10