Асинхронность в Python

Теория: Таймауты и отмена задач

В асинхронных приложениях важно не позволять задачам выполняться бесконечно. Если операция зависает из-за сетевых проблем или длительных вычислений, программа не должна ждать бесконечно. Для этого используются таймауты и механизмы отмены задач. Они позволяют завершать операции по истечении заданного времени и гарантировать, что ресурсы будут освобождены.

Контекстный менеджер asyncio.timeout

Требование версии: Функциональность asyncio.timeout() доступна начиная с Python 3.11. Для более ранних версий используйте asyncio.wait_for(), описанный ниже.

Основным инструментом для задания ограничения времени является контекстный менеджер asyncio.timeout(). Он принимает обязательный параметр — количество секунд, отведённых на выполнение блока кода. Если операция внутри контекстного менеджера не завершается за отведенное время, то в точке await будет выброшено исключение asyncio.TimeoutError.

import asyncio


async def slow_task() -> str:
    await asyncio.sleep(5)
    return "готово"


async def main() -> None:
    try:
        async with asyncio.timeout(2):
            result = await slow_task()
            print(result)
    except asyncio.TimeoutError:
        print("Задача прервана по таймауту")


asyncio.run(main())
# => Задача прервана по таймауту

В этом примере задача slow_task() рассчитана на 5 секунд, но с помощью asyncio.timeout() мы даём ей только 2 секунды. Как только время выходит, выбрасывается исключение и программа переходит в блок except.

Контекстный менеджер удобен тем, что позволяет локально задать время для конкретного участка кода, не влияя на остальные задачи в приложении.

У asyncio.timeout() нет дополнительных параметров — передаётся только лимит времени в секундах. Это делает его простым и предсказуемым инструментом.

Когда таймаут истекает, asyncio.timeout() не выбрасывает TimeoutError напрямую. Вместо этого контекстный менеджер отменяет задачу, вызывая её метод cancel(). Это приводит к тому, что в задачу отправляется исключение CancelledError, которое поднимается при следующем операторе await. Задача получает возможность обработать отмену, освободить ресурсы и корректно завершиться.

После того как CancelledError всплывает из задачи, контекстный менеджер asyncio.timeout() перехватывает его и преобразует в TimeoutError. Это видно в исходном коде asyncio: if issubclass(exc_type, exceptions.CancelledError): raise TimeoutError from exc_val. Такое преобразование позволяет внешнему коду точно понять причину прерывания — это был именно таймаут, а не какая-то другая причина отмены. При этом задача внутри всё ещё видит CancelledError и может выполнить необходимую очистку.

import asyncio


async def long_operation() -> str:
    try:
        print("Операция начата")
        await asyncio.sleep(5)
        print("Операция завершена успешно")
        return "результат"
    except asyncio.CancelledError:
        print("Задача получила CancelledError при отмене")
        # Здесь можно освободить ресурсы
        raise  # Пробрасываем в asyncio.timeout()


async def main() -> None:
    try:
        async with asyncio.timeout(2):
            result = await long_operation()
            print(f"Получен результат: {result}")
    except asyncio.TimeoutError:
        print("Контекстный менеджер преобразовал CancelledError в TimeoutError")


asyncio.run(main())
# => Операция начата
# => Задача получила CancelledError при отмене
# => Контекстный менеджер преобразовал CancelledError в TimeoutError

Такое разделение ответственности делает код более надёжным: задача может корректно завершить работу через обработку CancelledError, а вызывающий код получает явный сигнал о таймауте через TimeoutError. Подробнее о механизме отмены и правильной обработке CancelledError рассказано в материале «Задачи и структурная конкурентность».

Альтернатива для Python 3.10 и ниже: asyncio.wait_for

Если вы используете Python версии ниже 3.11, используйте функцию asyncio.wait_for(). Она принимает корутину и время ожидания в секундах, возвращая результат или выбрасывая asyncio.TimeoutError:

import asyncio


async def slow_task() -> str:
    await asyncio.sleep(5)
    return "готово"


async def main() -> None:
    try:
        result = await asyncio.wait_for(slow_task(), timeout=2)
        print(result)
    except asyncio.TimeoutError:
        print("Задача прервана по таймауту")


asyncio.run(main())
# => Задача прервана по таймауту

Основное отличие от asyncio.timeout()wait_for() работает с одной корутиной, а не с блоком кода. Для ограничения нескольких последовательных операций потребуется оборачивать их в отдельную корутину:

import asyncio


async def step1() -> None:
    await asyncio.sleep(1)
    print("Шаг 1 завершён")


async def step2() -> None:
    await asyncio.sleep(1)
    print("Шаг 2 завершён")


async def all_steps() -> None:
    await step1()
    await step2()


async def main() -> None:
    try:
        await asyncio.wait_for(all_steps(), timeout=3)
    except asyncio.TimeoutError:
        print("Не все шаги успели выполниться")


asyncio.run(main())

В Python 3.11+ рекомендуется использовать asyncio.timeout(), так как он более удобен для работы с несколькими операциями и лучше интегрируется с контекстными менеджерами.

Несколько уровней таймаутов

В реальных приложениях часто нужно ограничивать время выполнения не только для всей операции, но и для отдельных её частей. Например, общий запрос пользователя не должен длиться больше 10 секунд, а отдельный шаг, связанный с обращением к внешнему сервису, должен завершиться за 3 секунды. Для этого можно вкладывать контекстные менеджеры asyncio.timeout() друг в друга.

import asyncio


async def fetch_data() -> None:
    await asyncio.sleep(3)


async def process_data() -> None:
    await asyncio.sleep(2)


async def main() -> None:
    try:
        async with asyncio.timeout(5):  # общий таймаут
            async with asyncio.timeout(2):  # локальный таймаут для fetch_data
                await fetch_data()
            await process_data()
            print("Все операции выполнены вовремя")
    except asyncio.TimeoutError:
        print("Превышен лимит времени выполнения")


asyncio.run(main())
# => Превышен лимит времени выполнения

В этом коде общий лимит на всю задачу — 5 секунд, а для шага fetch_data() выделено только 2. Если он не успеет завершиться за это время, программа не будет ждать остальные шаги. Если первый шаг успеет завершиться, оставшееся время глобального таймаута можно будет использовать для последующих шагов.

При вложенных таймаутах сработает тот, чей лимит истечет первым. Если внутренний таймаут истекает, это также прерывает выполнение внешнего блока.

Важно помнить, что таймауты срабатывают только когда управление возвращается в цикл событий. Длительные синхронные вычисления без await могут блокировать выполнение и предотвращать срабатывание таймаута.

Такая вложенность позволяет гибко управлять разными частями асинхронных процессов и избегать зависаний на отдельных шагах.

Для Python 3.10 и ниже вложенные таймауты можно реализовать через комбинацию wait_for() и промежуточных корутин, но это менее удобно.

Гарантированное освобождение ресурсов

Когда задача прерывается по таймауту, важно правильно освободить ресурсы: закрыть файлы, разорвать соединения, освободить память. Если этого не сделать, могут возникнуть утечки памяти или блокировка системных ресурсов.

Для этого в асинхронном коде чаще всего используют блок finally. Этот блок выполняется независимо от того, завершилась задача успешно или была прервана. Его удобно использовать для ручного закрытия ресурсов.

import asyncio


async def worker() -> None:
    resource = "открытый ресурс"
    try:
        await asyncio.sleep(5)
        print("Задача завершена")
    finally:
        print(f"Закрытие: {resource}")


async def main() -> None:
    try:
        async with asyncio.timeout(2):
            await worker()
    except asyncio.TimeoutError:
        print("Таймаут истёк")


asyncio.run(main())
# => Закрытие: открытый ресурс
# => Таймаут истёк

В этом примере задача worker() не успевает завершиться за 2 секунды и прерывается по таймауту, но блок finally всё равно выполняется и освобождает ресурс. Такой способ хорошо работает для простых случаев, но в реальных приложениях часто приходится работать с более сложными объектами — сетевыми соединениями, файлами или пулами подключений. В таких ситуациях гораздо удобнее использовать контекстные менеджеры.

Контекстные менеджеры async with позволяют автоматически управлять ресурсами и упрощают код. При их использовании не нужно вручную следить за закрытием ресурсов: они будут корректно освобождены, даже если задача была отменена или завершилась с ошибкой. Это особенно важно при работе с внешними системами, где некорректное освобождение ресурсов может привести к утечкам памяти или блокировкам соединений.

Рассмотрим упрощённый пример с имитацией сетевого соединения:

import asyncio
from collections.abc import AsyncGenerator
from contextlib import asynccontextmanager


@asynccontextmanager
async def network_connection(url: str) -> AsyncGenerator[str, None]:
    """Имитация работы с сетевым соединением"""
    print(f"Открытие соединения с {url}")
    connection = f"connection_to_{url}"
    try:
        yield connection
    finally:
        print(f"Закрытие соединения с {url}")
        await asyncio.sleep(0.1)  # имитация очистки


async def fetch_data(url: str) -> str:
    async with network_connection(url) as conn:
        print(f"Получение данных через {conn}")
        await asyncio.sleep(5)  # долгая операция
        return "данные"


async def main() -> None:
    try:
        async with asyncio.timeout(2):
            data = await fetch_data("example.com")
            print(f"Получено: {data}")
    except asyncio.TimeoutError:
        print("Загрузка прервана по таймауту")


asyncio.run(main())
# => Открытие соединения с example.com
# => Получение данных через connection_to_example.com
# => Закрытие соединения с example.com
# => Загрузка прервана по таймауту

В этом примере контекстный менеджер network_connection() имитирует работу с сетевым соединением. Когда срабатывает таймаут, блок finally внутри контекстного менеджера гарантированно выполняется, закрывая соединение.

Для реальной работы с HTTP-запросами в дальнейших уроках мы будем использовать библиотеку aiohttp, которая предоставляет готовые асинхронные контекстные менеджеры для управления сессиями и соединениями. Принцип работы будет аналогичным — контекстный менеджер автоматически освободит ресурсы при любом исходе операции, включая таймаут.

Контрольный список при работе с таймаутами

Используйте asyncio.timeout() для Python 3.11+ — это современный и удобный способ ограничения времени выполнения

Для Python 3.10 и ниже используйте asyncio.wait_for() — он работает аналогично, но менее удобен для нескольких операций

Никогда не подавляйте CancelledError без веской причины — это исключение критически важно для корректной отмены задач

Всегда пробрасывайте CancelledError после обработки — используйте raise в блоке except asyncio.CancelledError

Освобождайте ресурсы в блоке finally — это гарантирует очистку при любом исходе операции

Используйте асинхронные контекстные менеджеры для сложных ресурсов — они автоматически управляют жизненным циклом соединений, файлов и других объектов

Помните, что таймауты не прерывают синхронный код — длительные вычисления без await будут выполняться до конца

Пример правильного использования:

import asyncio
from collections.abc import AsyncGenerator
from contextlib import asynccontextmanager


@asynccontextmanager
async def database_connection() -> AsyncGenerator[str, None]:
    """Контекстный менеджер для работы с базой данных"""
    print("Открытие соединения с БД")
    conn = "db_connection"
    try:
        yield conn
    finally:
        print("Закрытие соединения с БД")
        await asyncio.sleep(0.1)


async def fetch_user_data(user_id: int) -> dict[str, str]:
    """Получение данных пользователя с таймаутом"""
    try:
        async with asyncio.timeout(5):
            async with database_connection() as conn:
                print(f"Запрос к БД через {conn} для пользователя {user_id}")
                await asyncio.sleep(2)  # имитация запроса
                return {"id": str(user_id), "name": "User"}
    except asyncio.CancelledError:
        print("Операция отменена, ресурсы освобождены")
        raise  # Важно: пробрасываем исключение дальше


async def main() -> None:
    try:
        user = await fetch_user_data(123)
        print(f"Получены данные: {user}")
    except asyncio.TimeoutError:
        print("Превышено время ожидания ответа от БД")


if __name__ == "__main__":
    asyncio.run(main())

Рекомендуемые программы

Завершено

0 / 10