Асинхронность в Python

Теория: Конкурентные шаблоны

Асинхронные программы становятся сложнее по мере роста числа задач, которые нужно выполнять одновременно. Когда мы запускаем несколько корутин, важно не только правильно их планировать, но и управлять потоком данных между ними, ограничивать нагрузку и избегать избыточного параллелизма. Для этого используются базовые конкурентные шаблоны — fan-out/fan-in, конвейеры корутин и ограничение параллелизма с Semaphore. Все эти приёмы работают внутри цикла событий asyncio и позволяют создавать надёжные и предсказуемые асинхронные приложения.

Fan-out и fan-in с asyncio.gather

Шаблон fan-out описывает ситуацию, когда одна корутина порождает множество подзадач, каждая из которых выполняется независимо. Это позволяет эффективно использовать асинхронность: пока одна задача ожидает завершения операции ввода-вывода (например, сетевого запроса), другие продолжают выполняться. Таким образом, мы получаем параллельное выполнение без использования потоков или процессов — всё происходит в рамках одного event loop.

Для создания нескольких задач одновременно в asyncio чаще всего используется функция asyncio.create_task(), которая планирует выполнение корутины и возвращает объект задачи (Task). Затем, для ожидания завершения всех этих задач, удобно применять функцию asyncio.gather(). Именно она является центральным инструментом шаблонов fan-out и fan-in.

asyncio.gather() принимает произвольное количество awaitable объектов и выполняет их конкурентно в рамках одного цикла событий, ожидая завершения всех. При этом возвращаемое значение представляет собой список результатов в том же порядке, в котором были переданы задачи. Это и есть этап fan-in — объединение результатов обратно в единый поток.

Рассмотрим пример:

import asyncio
import random


async def fetch_data(url: str) -> str:
    await asyncio.sleep(random.uniform(0.5, 1.5))
    print(f"Загружено: {url}")
    return f"Результат {url}"


async def main() -> None:
    urls = [f"https://example.com/{i}" for i in range(5)]
    tasks = [asyncio.create_task(fetch_data(url)) for url in urls]
    results = await asyncio.gather(*tasks)
    print("Все загрузки завершены:", results)


asyncio.run(main())

В этом примере функция main() создаёт пять задач, каждая из которых выполняет загрузку данных. asyncio.gather(*tasks) запускает их одновременно и приостанавливает выполнение до тех пор, пока все задачи не завершатся. Результаты возвращаются в виде списка строк.

Если хотя бы одна задача завершится с ошибкой, asyncio.gather() немедленно выбрасывает это исключение, не дожидаясь завершения остальных задач. При этом другие задачи продолжают выполняться в фоне, но их результаты будут потеряны. Чтобы дождаться всех задач и получить все результаты (включая ошибки), можно указать параметр return_exceptions=True. В этом случае gather() ждёт завершения всех задач, а исключения возвращаются как обычные объекты в результирующем списке. Например:

import asyncio


async def fetch_data(url: str) -> str:
    delay = 1 if "good" in url else 2
    await asyncio.sleep(delay)
    if "bad" in url:
        raise ValueError(f"Ошибка при загрузке {url}")
    return f"OK: {url}"


async def main() -> None:
    urls = ["good-1", "bad-2", "good-3"]
    print("Без return_exceptions:")
    try:
        tasks = [fetch_data(url) for url in urls]
        results = await asyncio.gather(*tasks)
    except ValueError as e:
        print(f"Получено исключение: {e}")
        print("good-3 не успела завершиться\n")

    print("С return_exceptions=True:")
    tasks = [fetch_data(url) for url in urls]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    for i, r in enumerate(results):
        if isinstance(r, Exception):
            print(f"Результат {i}: Ошибка - {r}")
        else:
            print(f"Результат {i}: {r}")


asyncio.run(main())

Теперь программа с return_exceptions=True не прерывается из-за одной ошибки, а дожидается завершения всех задач и возвращает как успешные результаты, так и исключения. Это удобно, когда нужно обработать большое количество запросов и не допустить, чтобы единичный сбой привёл к потере результатов успешных операций.

Таким образом, asyncio.gather() выполняет две ключевые функции: во-первых, он реализует fan-out, позволяя запускать множество задач одновременно; во-вторых, он реализует fan-in, обеспечивая сбор результатов в один список (при условии успешного завершения всех задач или использования return_exceptions=True).

Ещё одно важное свойство gather()сохранение порядка результатов. Даже если задачи завершаются в разное время, итоговый список соответствует исходному порядку вызовов, хотя фактическое выполнение происходит в произвольном порядке, определяемом циклом событий.

Раннее потребление результатов с asyncio.as_completed

Основное ограничение asyncio.gather() заключается в том, что он ждёт завершения всех задач перед возвратом результатов. Это означает, что даже если некоторые задачи завершились быстро, мы не можем начать обработку их результатов до тех пор, пока не завершатся все остальные задачи.

В сценариях, где важно получать результаты по мере их готовности и сразу начинать обработку, используется функция asyncio.as_completed(). Она возвращает итератор, который позволяет перебирать задачи в порядке их завершения, а не в порядке создания.

Рассмотрим пример:

import asyncio
import random
import time


async def fetch_data(url: str) -> str:
    delay = random.uniform(0.5, 2.0)
    print(f"[{time.time():.2f}] Начало загрузки {url}, ожидание {delay:.2f}с")
    await asyncio.sleep(delay)
    print(f"[{time.time():.2f}] Завершена загрузка {url}")
    return f"Данные с {url}"


async def process_result(data: str) -> None:
    print(f"[{time.time():.2f}] Обработка: {data}")
    await asyncio.sleep(0.3)
    print(f"[{time.time():.2f}] Обработано: {data}")


async def main_with_gather() -> None:
    print("=== С asyncio.gather (ждём всех) ===")
    start = time.time()
    urls = [f"site-{i}" for i in range(5)]
    tasks = [asyncio.create_task(fetch_data(url)) for url in urls]

    results = await asyncio.gather(*tasks)

    # Обработка начинается только после завершения ВСЕХ загрузок
    for result in results:
        await process_result(result)

    print(f"Общее время: {time.time() - start:.2f}с\n")


async def main_with_as_completed() -> None:
    print("=== С asyncio.as_completed (обработка по мере готовности) ===")
    start = time.time()
    urls = [f"site-{i}" for i in range(5)]
    tasks = [asyncio.create_task(fetch_data(url)) for url in urls]

    # Обрабатываем результаты сразу по мере готовности
    for coro in asyncio.as_completed(tasks):
        result = await coro
        await process_result(result)

    print(f"Общее время: {time.time() - start:.2f}с\n")


asyncio.run(main_with_gather())
asyncio.run(main_with_as_completed())

В примере с gather() все загрузки выполняются параллельно, но обработка начинается только после завершения самой медленной загрузки. С as_completed() обработка каждого результата начинается сразу после его готовности, что снижает общее время выполнения.

Ключевое различие:

  • gather() — ждёт завершения всех задач, затем возвращает результаты в исходном порядке
  • as_completed() — возвращает корутины по мере завершения, порядок не гарантируется

Такой подход особенно эффективен при работе с задачами, имеющими сильно различающееся время выполнения, например при загрузке данных с серверов разной скорости или при обработке запросов с непредсказуемым временем отклика.

Измерение производительности асинхронных операций

При разработке асинхронных приложений важно понимать, где именно тратится время. Простая телеметрия помогает выявить узкие места и оптимизировать производительность. Ключевые метрики включают:

  • Время в очереди — как долго задача ждала начала выполнения
  • Время обработки — сколько заняло само выполнение задачи
  • Общее время — от постановки в очередь до получения результата

Рассмотрим пример с базовой телеметрией:

import asyncio
import time
from dataclasses import dataclass


@dataclass
class TaskMetrics:
    """Метрики выполнения задачи"""

    task_id: str
    created_at: float
    started_at: float | None = None
    completed_at: float | None = None

    @property
    def queue_time(self) -> float:
        """Время ожидания в очереди"""
        if self.started_at is None:
            return 0.0
        return self.started_at - self.created_at

    @property
    def processing_time(self) -> float:
        """Время обработки"""
        if self.started_at is None or self.completed_at is None:
            return 0.0
        return self.completed_at - self.started_at

    @property
    def total_time(self) -> float:
        """Общее время"""
        if self.completed_at is None:
            return 0.0
        return self.completed_at - self.created_at


async def fetch_with_metrics(url: str, sem: asyncio.Semaphore, metrics: TaskMetrics) -> str:
    """Загрузка данных с измерением метрик"""
    async with sem:
        metrics.started_at = time.time()
        await asyncio.sleep(0.5)
        metrics.completed_at = time.time()
        return f"Данные с {url}"


async def main() -> None:
    sem = asyncio.Semaphore(3)
    urls = [f"site-{i}" for i in range(10)]
    metrics_list = [TaskMetrics(task_id=url, created_at=time.time()) for url in urls]
    tasks = [
        asyncio.create_task(fetch_with_metrics(url, sem, metrics))
        for url, metrics in zip(urls, metrics_list)
    ]
    await asyncio.gather(*tasks)
    print("\n=== Метрики выполнения ===")
    for metrics in metrics_list:
        print(f"{metrics.task_id}:")
        print(f"  Время в очереди: {metrics.queue_time:.3f}с")
        print(f"  Время обработки: {metrics.processing_time:.3f}с")
        print(f"  Общее время: {metrics.total_time:.3f}с")
    avg_queue = sum((m.queue_time for m in metrics_list)) / len(metrics_list)
    avg_processing = sum((m.processing_time for m in metrics_list)) / len(metrics_list)
    print(f"\nСреднее время в очереди: {avg_queue:.3f}с")
    print(f"Среднее время обработки: {avg_processing:.3f}с")


asyncio.run(main())

Такая телеметрия позволяет:

  • Выявить задачи, которые долго ждут в очереди (возможно, нужно увеличить лимит семафора)
  • Обнаружить медленные операции обработки
  • Оценить эффективность распределения нагрузки

В реальных приложениях эти метрики можно отправлять в системы мониторинга (Prometheus, Grafana) для анализа в реальном времени.

Конвейеры корутин

Когда данные проходят через несколько стадий обработки, удобно выстроить цепочку задач — асинхронный конвейер. Он похож на производственную линию: одна корутина производит данные, следующая их обрабатывает, а последняя сохраняет или передаёт дальше.

Представим, что нужно получать данные, фильтровать их и сохранять результат. Это можно сделать так:

import asyncio


async def producer(queue: asyncio.Queue[int | None]) -> None:
    """Производитель данных"""
    for i in range(5):
        await asyncio.sleep(0.3)
        print(f"Создан элемент {i}")
        await queue.put(i)
    await queue.put(None)  # сигнал завершения


async def processor(
    queue_in: asyncio.Queue[int | None], queue_out: asyncio.Queue[int | None]
) -> None:
    """Обработчик данных"""
    while True:
        item = await queue_in.get()
        if item is None:
            await queue_out.put(None)
            break
        await asyncio.sleep(0.5)
        result = item * 2
        print(f"Обработан элемент {item}")
        await queue_out.put(result)


async def consumer(queue: asyncio.Queue[int | None]) -> None:
    """Потребитель результатов"""
    while True:
        item = await queue.get()
        if item is None:
            break
        print(f"Получен результат {item}")


async def main() -> None:
    q1: asyncio.Queue[int | None] = asyncio.Queue()
    q2: asyncio.Queue[int | None] = asyncio.Queue()
    await asyncio.gather(producer(q1), processor(q1, q2), consumer(q2))


asyncio.run(main())

Корутины соединены через очереди (будут рассмотрены в следующем уроке), что позволяет им работать независимо, но согласованно. Производитель создаёт элементы, обработчик их преобразует, а потребитель принимает результат. Такой подход гарантирует непрерывный поток данных и хорошо масштабируется: можно добавить несколько процессоров или потребителей, не меняя структуру кода.

Этот шаблон часто используется в асинхронных сервисах обработки событий, логов, данных из баз или сетевых потоков. Он помогает разграничить обязанности корутин и сделать код устойчивым к временным задержкам — если один этап временно замедлится, другие смогут работать с очередями, не блокируя общий процесс.

Ограничение параллелизма (Semaphore)

Иногда запуск большого количества задач одновременно приводит к перегрузке. Например, при 1000 сетевых запросах сервер может начать возвращать ошибки или программа — тратить слишком много ресурсов. Чтобы этого избежать, используется asyncio.Semaphore(), который ограничивает количество одновременно выполняющихся задач.

Семафор задаёт «максимальное число пропусков» для одновременного выполнения. Когда количество активных задач достигает этого порога, следующие задачи ждут, пока одна из текущих не завершится. Рассмотрим пример:

import asyncio
import random


async def fetch(url: str, sem: asyncio.Semaphore) -> None:
    async with sem:
        print(f"Запуск запроса {url}")
        await asyncio.sleep(random.uniform(0.5, 1.5))
        print(f"Завершён запрос {url}")


async def main() -> None:
    sem = asyncio.Semaphore(3)
    tasks = [asyncio.create_task(fetch(f"url-{i}", sem)) for i in range(10)]
    await asyncio.gather(*tasks)


asyncio.run(main())

Здесь одновременно выполняются только три задачи, остальные ждут своей очереди. Это не только снижает нагрузку за счет ограничения одновременно выполняющихся задач, но и делает систему более предсказуемой — например, можно контролировать скорость обращений к внешнему API или базам данных.

Важное замечание: нужно учитывать, что все задачи создаются сразу и занимают память в ожидании освобождения семафора. При работе с тысячами задач следует учитывать потребление памяти и рассмотреть альтернативные подходы, такие как создание задач по мере освобождения слотов семафора.

Важно понимать, что async with sem: создаёт контекст, внутри которого выполняется ограниченная секция. Как только корутина выходит из этого блока, слот освобождается, и следующая задача может начать выполнение.

Семафоры особенно полезны, когда задачи совершают операции ввода-вывода: сетевые запросы, доступ к файлам или к внешним сервисам. В таких случаях ограничение параллелизма предотвращает перегрузку внешних систем и повышает стабильность приложения.

Механизмы backpressure

Backpressure (противодавление) — это механизм управления потоком данных, который предотвращает перегрузку системы, когда производитель генерирует данные быстрее, чем потребитель может их обработать.

В асинхронных системах backpressure обычно реализуется через:

Ограниченные очереди — очереди с максимальным размером, которые блокируют производителя при переполнении:

import asyncio


async def fast_producer(queue: asyncio.Queue[int | None]) -> None:
    """Быстрый производитель"""
    for i in range(20):
        print(f"Попытка добавить элемент {i}")
        await queue.put(i)  # Заблокируется, если очередь полна
        print(f"Добавлен элемент {i}")
    await queue.put(None)


async def slow_consumer(queue: asyncio.Queue[int | None]) -> None:
    """Медленный потребитель"""
    while True:
        item = await queue.get()
        if item is None:
            break
        print(f"Обработка элемента {item}")
        await asyncio.sleep(0.5)  # медленная обработка


async def main() -> None:
    # Очередь с ограниченным размером
    queue: asyncio.Queue[int | None] = asyncio.Queue(maxsize=5)
    await asyncio.gather(fast_producer(queue), slow_consumer(queue))


asyncio.run(main())

В этом примере производитель вынужден ждать, пока потребитель освободит место в очереди. Это предотвращает неограниченный рост памяти.

Связь шаблонов между собой

Fan-out и fan-in отвечают за распределение и сбор работы, конвейеры корутин — за последовательную передачу данных, а семафоры — за контроль интенсивности параллельных операций. Вместе они образуют основу конкурентного проектирования в asyncio.

Один из распространённых сценариев — объединение всех трёх подходов. Например, можно создать поток данных (fan-out), передавать его через конвейер обработки, а на каждом этапе ограничивать количество активных задач с помощью семафора. Такой подход позволяет эффективно использовать ресурсы и избегать перегрузки.

Пример комбинированного использования может выглядеть так:

import asyncio
import random


async def fetch(url: str, sem: asyncio.Semaphore) -> str:
    async with sem:
        await asyncio.sleep(random.uniform(0.5, 1.0))
        print(f"Загружено {url}")
        return url


async def process(url: str) -> None:
    await asyncio.sleep(0.3)
    print(f"Обработано {url}")


async def main() -> None:
    sem = asyncio.Semaphore(4)
    urls = [f"url-{i}" for i in range(10)]

    async def worker(url: str) -> None:
        data = await fetch(url, sem)
        await process(data)

    await asyncio.gather(*(worker(url) for url in urls))


asyncio.run(main())

В этом примере реализован упрощённый асинхронный конвейер с ограничением параллелизма. Одновременно выполняются не более четырёх загрузок, каждая из которых после завершения передаёт данные в следующую стадию обработки. Такой подход сохраняет баланс между скоростью и стабильностью.

Использование этих шаблонов делает асинхронные программы не только быстрее, но и структурно чище. Они позволяют строить системы, где каждая часть выполняет свою роль, а взаимодействие между ними остаётся предсказуемым и управляемым.

Контрольный список для конкурентных шаблонов

Всегда задавайте лимит параллелизма — используйте Semaphore для ограничения количества одновременно выполняющихся задач

Настройте механизмы backpressure — используйте ограниченные очереди (maxsize) или семафоры для предотвращения перегрузки

Выбирайте правильный инструмент сбора результатовgather() для ожидания всех задач, as_completed() для обработки по мере готовности

Измеряйте производительность — отслеживайте время в очереди и время обработки для выявления узких мест

Обрабатывайте ошибки корректно — используйте return_exceptions=True в gather() для получения всех результатов, включая ошибки

Учитывайте потребление памяти — при создании тысяч задач сразу рассмотрите создание задач по мере освобождения ресурсов

Используйте именование задач — задавайте осмысленные имена через name для упрощения отладки

Пример правильного использования:

import asyncio
import time
from dataclasses import dataclass


@dataclass
class Config:
    """Конфигурация для управления конкурентностью"""

    max_concurrent: int = 5
    queue_size: int = 10
    timeout: float = 30.0


async def fetch_with_limits(url: str, sem: asyncio.Semaphore, config: Config) -> str:
    """Загрузка с ограничениями и таймаутом"""
    async with sem:
        try:
            async with asyncio.timeout(config.timeout):
                await asyncio.sleep(0.5)
                return f"Данные с {url}"
        except asyncio.TimeoutError:
            return f"Таймаут для {url}"


async def main() -> None:
    config = Config(max_concurrent=3, queue_size=5)
    sem = asyncio.Semaphore(config.max_concurrent)

    urls = [f"site-{i}" for i in range(10)]

    # Используем as_completed для ранней обработки
    tasks = [
        asyncio.create_task(fetch_with_limits(url, sem, config), name=f"fetch_{url}")
        for url in urls
    ]

    results = []
    for coro in asyncio.as_completed(tasks):
        try:
            result = await coro
            results.append(result)
            print(f"Обработан результат: {result}")
        except Exception as e:
            print(f"Ошибка: {e}")

    print(f"\nВсего обработано результатов: {len(results)}")


if __name__ == "__main__":
    asyncio.run(main())

Рекомендуемые программы

Завершено

0 / 10