Асинхронность в Python
Теория: Конкурентные шаблоны
Асинхронные программы становятся сложнее по мере роста числа задач, которые нужно выполнять одновременно. Когда мы запускаем несколько корутин, важно не только правильно их планировать, но и управлять потоком данных между ними, ограничивать нагрузку и избегать избыточного параллелизма. Для этого используются базовые конкурентные шаблоны — fan-out/fan-in, конвейеры корутин и ограничение параллелизма с Semaphore. Все эти приёмы работают внутри цикла событий asyncio и позволяют создавать надёжные и предсказуемые асинхронные приложения.
Fan-out и fan-in с asyncio.gather
Шаблон fan-out описывает ситуацию, когда одна корутина порождает множество подзадач, каждая из которых выполняется независимо. Это позволяет эффективно использовать асинхронность: пока одна задача ожидает завершения операции ввода-вывода (например, сетевого запроса), другие продолжают выполняться. Таким образом, мы получаем параллельное выполнение без использования потоков или процессов — всё происходит в рамках одного event loop.
Для создания нескольких задач одновременно в asyncio чаще всего используется функция asyncio.create_task(), которая планирует выполнение корутины и возвращает объект задачи (Task). Затем, для ожидания завершения всех этих задач, удобно применять функцию asyncio.gather(). Именно она является центральным инструментом шаблонов fan-out и fan-in.
asyncio.gather() принимает произвольное количество awaitable объектов и выполняет их конкурентно в рамках одного цикла событий, ожидая завершения всех. При этом возвращаемое значение представляет собой список результатов в том же порядке, в котором были переданы задачи. Это и есть этап fan-in — объединение результатов обратно в единый поток.
Рассмотрим пример:
В этом примере функция main() создаёт пять задач, каждая из которых выполняет загрузку данных. asyncio.gather(*tasks) запускает их одновременно и приостанавливает выполнение до тех пор, пока все задачи не завершатся. Результаты возвращаются в виде списка строк.
Если хотя бы одна задача завершится с ошибкой, asyncio.gather() немедленно выбрасывает это исключение, не дожидаясь завершения остальных задач. При этом другие задачи продолжают выполняться в фоне, но их результаты будут потеряны. Чтобы дождаться всех задач и получить все результаты (включая ошибки), можно указать параметр return_exceptions=True. В этом случае gather() ждёт завершения всех задач, а исключения возвращаются как обычные объекты в результирующем списке. Например:
Теперь программа с return_exceptions=True не прерывается из-за одной ошибки, а дожидается завершения всех задач и возвращает как успешные результаты, так и исключения. Это удобно, когда нужно обработать большое количество запросов и не допустить, чтобы единичный сбой привёл к потере результатов успешных операций.
Таким образом, asyncio.gather() выполняет две ключевые функции: во-первых, он реализует fan-out, позволяя запускать множество задач одновременно; во-вторых, он реализует fan-in, обеспечивая сбор результатов в один список (при условии успешного завершения всех задач или использования return_exceptions=True).
Ещё одно важное свойство gather() — сохранение порядка результатов. Даже если задачи завершаются в разное время, итоговый список соответствует исходному порядку вызовов, хотя фактическое выполнение происходит в произвольном порядке, определяемом циклом событий.
Раннее потребление результатов с asyncio.as_completed
Основное ограничение asyncio.gather() заключается в том, что он ждёт завершения всех задач перед возвратом результатов. Это означает, что даже если некоторые задачи завершились быстро, мы не можем начать обработку их результатов до тех пор, пока не завершатся все остальные задачи.
В сценариях, где важно получать результаты по мере их готовности и сразу начинать обработку, используется функция asyncio.as_completed(). Она возвращает итератор, который позволяет перебирать задачи в порядке их завершения, а не в порядке создания.
Рассмотрим пример:
В примере с gather() все загрузки выполняются параллельно, но обработка начинается только после завершения самой медленной загрузки. С as_completed() обработка каждого результата начинается сразу после его готовности, что снижает общее время выполнения.
Ключевое различие:
gather()— ждёт завершения всех задач, затем возвращает результаты в исходном порядкеas_completed()— возвращает корутины по мере завершения, порядок не гарантируется
Такой подход особенно эффективен при работе с задачами, имеющими сильно различающееся время выполнения, например при загрузке данных с серверов разной скорости или при обработке запросов с непредсказуемым временем отклика.
Измерение производительности асинхронных операций
При разработке асинхронных приложений важно понимать, где именно тратится время. Простая телеметрия помогает выявить узкие места и оптимизировать производительность. Ключевые метрики включают:
- Время в очереди — как долго задача ждала начала выполнения
- Время обработки — сколько заняло само выполнение задачи
- Общее время — от постановки в очередь до получения результата
Рассмотрим пример с базовой телеметрией:
Такая телеметрия позволяет:
- Выявить задачи, которые долго ждут в очереди (возможно, нужно увеличить лимит семафора)
- Обнаружить медленные операции обработки
- Оценить эффективность распределения нагрузки
В реальных приложениях эти метрики можно отправлять в системы мониторинга (Prometheus, Grafana) для анализа в реальном времени.
Конвейеры корутин
Когда данные проходят через несколько стадий обработки, удобно выстроить цепочку задач — асинхронный конвейер. Он похож на производственную линию: одна корутина производит данные, следующая их обрабатывает, а последняя сохраняет или передаёт дальше.
Представим, что нужно получать данные, фильтровать их и сохранять результат. Это можно сделать так:
Корутины соединены через очереди (будут рассмотрены в следующем уроке), что позволяет им работать независимо, но согласованно. Производитель создаёт элементы, обработчик их преобразует, а потребитель принимает результат. Такой подход гарантирует непрерывный поток данных и хорошо масштабируется: можно добавить несколько процессоров или потребителей, не меняя структуру кода.
Этот шаблон часто используется в асинхронных сервисах обработки событий, логов, данных из баз или сетевых потоков. Он помогает разграничить обязанности корутин и сделать код устойчивым к временным задержкам — если один этап временно замедлится, другие смогут работать с очередями, не блокируя общий процесс.
Ограничение параллелизма (Semaphore)
Иногда запуск большого количества задач одновременно приводит к перегрузке. Например, при 1000 сетевых запросах сервер может начать возвращать ошибки или программа — тратить слишком много ресурсов. Чтобы этого избежать, используется asyncio.Semaphore(), который ограничивает количество одновременно выполняющихся задач.
Семафор задаёт «максимальное число пропусков» для одновременного выполнения. Когда количество активных задач достигает этого порога, следующие задачи ждут, пока одна из текущих не завершится. Рассмотрим пример:
Здесь одновременно выполняются только три задачи, остальные ждут своей очереди. Это не только снижает нагрузку за счет ограничения одновременно выполняющихся задач, но и делает систему более предсказуемой — например, можно контролировать скорость обращений к внешнему API или базам данных.
Важное замечание: нужно учитывать, что все задачи создаются сразу и занимают память в ожидании освобождения семафора. При работе с тысячами задач следует учитывать потребление памяти и рассмотреть альтернативные подходы, такие как создание задач по мере освобождения слотов семафора.
Важно понимать, что async with sem: создаёт контекст, внутри которого выполняется ограниченная секция. Как только корутина выходит из этого блока, слот освобождается, и следующая задача может начать выполнение.
Семафоры особенно полезны, когда задачи совершают операции ввода-вывода: сетевые запросы, доступ к файлам или к внешним сервисам. В таких случаях ограничение параллелизма предотвращает перегрузку внешних систем и повышает стабильность приложения.
Механизмы backpressure
Backpressure (противодавление) — это механизм управления потоком данных, который предотвращает перегрузку системы, когда производитель генерирует данные быстрее, чем потребитель может их обработать.
В асинхронных системах backpressure обычно реализуется через:
Ограниченные очереди — очереди с максимальным размером, которые блокируют производителя при переполнении:
В этом примере производитель вынужден ждать, пока потребитель освободит место в очереди. Это предотвращает неограниченный рост памяти.
Связь шаблонов между собой
Fan-out и fan-in отвечают за распределение и сбор работы, конвейеры корутин — за последовательную передачу данных, а семафоры — за контроль интенсивности параллельных операций. Вместе они образуют основу конкурентного проектирования в asyncio.
Один из распространённых сценариев — объединение всех трёх подходов. Например, можно создать поток данных (fan-out), передавать его через конвейер обработки, а на каждом этапе ограничивать количество активных задач с помощью семафора. Такой подход позволяет эффективно использовать ресурсы и избегать перегрузки.
Пример комбинированного использования может выглядеть так:
В этом примере реализован упрощённый асинхронный конвейер с ограничением параллелизма. Одновременно выполняются не более четырёх загрузок, каждая из которых после завершения передаёт данные в следующую стадию обработки. Такой подход сохраняет баланс между скоростью и стабильностью.
Использование этих шаблонов делает асинхронные программы не только быстрее, но и структурно чище. Они позволяют строить системы, где каждая часть выполняет свою роль, а взаимодействие между ними остаётся предсказуемым и управляемым.
Контрольный список для конкурентных шаблонов
Всегда задавайте лимит параллелизма — используйте Semaphore для ограничения количества одновременно выполняющихся задач
Настройте механизмы backpressure — используйте ограниченные очереди (maxsize) или семафоры для предотвращения перегрузки
Выбирайте правильный инструмент сбора результатов — gather() для ожидания всех задач, as_completed() для обработки по мере готовности
Измеряйте производительность — отслеживайте время в очереди и время обработки для выявления узких мест
Обрабатывайте ошибки корректно — используйте return_exceptions=True в gather() для получения всех результатов, включая ошибки
Учитывайте потребление памяти — при создании тысяч задач сразу рассмотрите создание задач по мере освобождения ресурсов
Используйте именование задач — задавайте осмысленные имена через name для упрощения отладки
Пример правильного использования:
Рекомендуемые программы
Завершено
0 / 10

