Асинхронность в Python

Теория: Работа с сетью

Асинхронные HTTP-клиенты (aiohttp, httpx)

aiohttp

Работа с сетью — одно из главных применений асинхронности. Когда приложение делает запросы к веб-серверам, оно большую часть времени ждёт ответа. В таких сценариях синхронные решения могут работать медленно, так как блокируют выполнение программы.

Асинхронный HTTP-клиент позволяет отправлять десятки и сотни запросов одновременно, не блокируя выполнение программы. В Python самым распространённым инструментом для этого является библиотека aiohttp.

Она построена на asyncio и поддерживает все принципы асинхронного программирования. Библиотека позволяет выполнять HTTP-запросы без блокировки, управлять соединениями, заголовками, телом запроса, обрабатывать ответы и даже работать с потоковыми данными. Главная идея — каждый сетевой вызов является корутиной, и выполнение других задач не приостанавливается во время ожидания отклика сервера.

Для отправки запроса в aiohttp используется клиентская сессия — объект, который управляет подключениями и хранит общие параметры. Сессия переиспользует TCP-сокеты к одному хосту, поэтому важно создавать одну сессию на несколько запросов. Она создаётся при помощи async with, что гарантирует корректное закрытие соединений после завершения работы. Такой подход позволяет избежать утечек ресурсов и делает код безопасным.

Простейший пример:

import asyncio

import aiohttp


async def main() -> None:
    async with aiohttp.ClientSession() as session:
        async with session.get("https://httpbin.org/get") as response:
            data: str = await response.text()
            print(data[:100])  # Выводим первые 100 символов


asyncio.run(main())

Когда выполняется session.get(), создаётся запрос, который не блокирует цикл событий (event loop). Корутина await response.text() возвращает управление до тех пор, пока сервер не пришлёт ответ. Как только данные получены, они преобразуются в строку.

Помимо .text(), у объекта ответа есть методы .json() и .read(). Первый возвращает данные как Python-объект, если сервер ответил JSON-структурой, второй — необработанные байты, что удобно для скачивания файлов или изображений.

Пример с обработкой JSON-ответа:

import asyncio

import aiohttp


async def get_json() -> None:
    async with aiohttp.ClientSession() as session:
        async with session.get("https://jsonplaceholder.typicode.com/todos/1") as resp:
            data: dict = await resp.json()
            print(data)
            # => {'userId': 1, 'id': 1, 'title': 'delectus aut autem', 'completed': False}


asyncio.run(get_json())

Работа с aiohttp не ограничивается только GET-запросами. Для отправки данных используется метод post(), который принимает аргумент data или json. Например, чтобы передать JSON-тело на сервер:

import asyncio

import aiohttp


async def send_data() -> None:
    async with aiohttp.ClientSession() as session:
        payload: dict[str, str | int] = {"name": "Alice", "age": 25}
        try:
            async with session.post("https://httpbin.org/post", json=payload) as resp:
                # Проверяем статус ответа
                if resp.status == 200:
                    data: dict = await resp.json()
                    print(data["json"])
                    # => {'name': 'Alice', 'age': 25}
                else:
                    print(f"Ошибка: статус {resp.status}")
        except aiohttp.ClientError as e:
            print(f"Ошибка соединения: {e}")


asyncio.run(send_data())

Использование параметра json позволяет aiohttp автоматически преобразовать объект Python в JSON и установить правильный заголовок Content-Type: application/json. При необходимости можно добавить собственные заголовки, например токен авторизации или ключ API, передав их через параметр headers. Для добавления query-параметров используется аргумент params, который автоматически кодирует словарь в строку запроса.

Асинхронность aiohttp позволяет обрабатывать большие объёмы данных потоково. Иногда ответ слишком большой, чтобы загружать его целиком в память, например при скачивании видеофайлов или логов. В этом случае можно читать поток частями с помощью response.content.iter_chunked():

import asyncio

import aiohttp


async def download(url: str, filename: str) -> None:
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            with open(filename, "wb") as f:
                async for chunk in resp.content.iter_chunked(8):
                    f.write(chunk)
            print(f"Файл {filename} загружен")


asyncio.run(download("https://httpbin.org/bytes/80", "file.bin"))

Здесь файл записывается по мере поступления данных, без ожидания загрузки всего ответа. Это не только экономит память, но и позволяет обрабатывать поток в реальном времени.

httpx

Асинхронный HTTP-клиент httpx — это современная библиотека для работы с сетью, которая сочетает простоту requests и преимущества асинхронного программирования. В отличие от aiohttp, где интерфейс более низкоуровневый и близок к самому asyncio, httpx предлагает лаконичный и привычный API, похожий на классический requests, но с поддержкой async/await. Это делает его удобным инструментом для построения асинхронных приложений, микросервисов и API-клиентов.

Главное преимущество httpxуниверсальность: библиотека может работать как в синхронном, так и в асинхронном режиме. Это значит, что код, написанный для requests, может быть легко адаптирован под асинхронное выполнение без полной переработки. Если используется asyncio, то просто нужно создавать AsyncClient вместо обычного клиента, а вызовы методов выполнять через await. Такой подход снижает порог вхождения и позволяет быстро переходить от синхронных HTTP-запросов к конкурентной работе с сетью.

Работа с httpx начинается с создания клиента. Как и в aiohttp, используется контекст async with:

import asyncio

import httpx


async def main() -> None:
    async with httpx.AsyncClient() as client:
        response: httpx.Response = await client.get("https://httpbin.org/get")
        print(response.text[:100])  # Выводим первые 100 символов


asyncio.run(main())

Результат запроса представлен объектом Response с теми же свойствами, что и в requests.

import asyncio

import httpx


async def get_json() -> None:
    async with httpx.AsyncClient() as client:
        response: httpx.Response = await client.get(
            "https://jsonplaceholder.typicode.com/todos/1"
        )
        data: dict = response.json()  # в httpx .json() синхронный, await не нужен
        print(data["title"])
        # => delectus aut autem


asyncio.run(get_json())

Параллельное выполнение запросов реализуется так же, как в aiohttp, через asyncio.gather().

Одно из преимуществ httpx заключается в том, что он изначально построен поверх стандарта HTTP/1.1 и поддерживает HTTP/2 без дополнительных расширений. Это значит, что при необходимости клиент может использовать мультиплексирование соединений — несколько запросов через одно TCP-соединение, что особенно эффективно при обращении к одному серверу.

Кроме стандартных GET-запросов, httpx поддерживает все остальные методы HTTP: POST, PUT, DELETE, PATCH, OPTIONS. Для передачи данных используется параметр data, а для отправки JSON-структур — json:

import asyncio

import httpx


async def post_data() -> None:
    async with httpx.AsyncClient(timeout=30.0) as client:
        payload: dict[str, str | int] = {"name": "Alice", "age": 30}
        try:
            response: httpx.Response = await client.post(
                "https://httpbin.org/post", json=payload
            )
            response.raise_for_status()
            print(response.json()["json"])
            # => {'name': 'Alice', 'age': 30}
        except httpx.HTTPError as e:
            print(f"Ошибка HTTP: {e}")


asyncio.run(post_data())

Заголовки можно передать через параметр headers как при инициализации клиента, так и при каждом запросе:

import asyncio

import httpx


async def authorized_request() -> None:
    headers: dict[str, str] = {"Authorization": "Bearer mytoken123"}
    async with httpx.AsyncClient(headers=headers) as client:
        r: httpx.Response = await client.get("https://httpbin.org/headers")
        print(r.json()["headers"]["Authorization"])
        # => Bearer mytoken123


asyncio.run(authorized_request())

httpx также поддерживает работу с параметрами запроса. Для добавления query-параметров используется аргумент params, который автоматически кодирует словарь в строку запроса:

import asyncio

import httpx


async def get_params() -> None:
    async with httpx.AsyncClient() as client:
        r: httpx.Response = await client.get(
            "https://httpbin.org/get", params={"q": "python", "page": 2}
        )
        print(r.json()["args"])
        # => {'q': 'python', 'page': '2'}


asyncio.run(get_params())

При работе с большими ответами, например при загрузке файлов, важно не загружать всё содержимое в память сразу. В httpx можно читать поток данных по частям через aiter_bytes(). Это даёт возможность сохранять файл постепенно, не блокируя другие операции:

import asyncio

import httpx


async def download_file() -> None:
    async with httpx.AsyncClient() as client:
        async with client.stream("GET", "https://httpbin.org/bytes/10240") as response:
            with open("file.bin", "wb") as f:
                async for chunk in response.aiter_bytes():
                    f.write(chunk)
            print("Файл загружен")


asyncio.run(download_file())

Режим stream() позволяет считывать данные последовательно, не загружая весь ответ в память.

Обработка ошибок в httpx проста и безопасна. Все сетевые исключения наследуются от базового httpx.RequestError, что упрощает их перехват. Если сервер вернул код ошибки, например 404 или 500, библиотека не выбрасывает исключение по умолчанию, но можно вызвать response.raise_for_status(), чтобы явно проверить корректность ответа:

import asyncio

import httpx


async def safe_request() -> None:
    async with httpx.AsyncClient() as client:
        try:
            r: httpx.Response = await client.get("https://httpbin.org/status/404")
            r.raise_for_status()
        except httpx.HTTPStatusError as e:
            print(f"Ошибка {e.response.status_code}: {e.request.url}")
            # => Ошибка 404: https://httpbin.org/status/404


asyncio.run(safe_request())

Клиент в httpx поддерживает пул соединений, переиспользуя TCP-соединения к одному серверу. Подробнее об этом в следующем разделе.

Таймауты и пулы соединений

Таймауты

В httpx таймауты задаются через объект httpx.Timeout. Он позволяет указать отдельные значения для разных стадий сетевого обмена. Например, connect — время ожидания установки TCP-соединения, read — время ожидания чтения данных, write — отправки тела запроса, а pool — время ожидания свободного соединения из пула:

import asyncio

import httpx


async def main() -> None:
    timeout: httpx.Timeout = httpx.Timeout(connect=10.0, read=30.0, write=10.0, pool=10.0)
    async with httpx.AsyncClient(timeout=timeout) as client:
        response: httpx.Response = await client.get("https://httpbin.org/delay/1")
        print(response.status_code)
        # => 200


asyncio.run(main())

Если указать только одно значение timeout=5.0, оно будет применяться ко всем этапам сразу. Таймаут срабатывает не мгновенно, а при следующей операции ввода-вывода. Это значит, что если сервер уже прислал часть данных, клиент не оборвёт соединение до тех пор, пока не произойдёт попытка чтения или записи. Важно понимать, что внутри клиента нет жёсткого обрыва сокета — это логическая отмена операции, выполняемая через event loop.

Пулы соединений

Каждый HTTP-запрос требует открытия TCP-соединения. Установка соединения — затратная операция, особенно при HTTPS, где добавляется TLS-рукопожатие. Чтобы не открывать новые соединения при каждом запросе, клиент хранит их в пуле и переиспользует. В httpx пул создаётся автоматически внутри AsyncClient. Он управляет количеством одновременных соединений и временем их жизни. Если запросов поступает больше, чем свободных соединений, клиент ждёт освобождения слота в пуле.

Параметр Limits позволяет контролировать поведение пула. Например, max_connections задаёт общее число соединений, а max_keepalive_connections ограничивает количество неактивных соединений, которые можно переиспользовать:

import asyncio

import httpx


async def main() -> None:
    limits: httpx.Limits = httpx.Limits(max_connections=100, max_keepalive_connections=20)
    async with httpx.AsyncClient(limits=limits) as client:
        tasks = [client.get("https://httpbin.org/get") for _ in range(20)]
        results: list[httpx.Response] = await asyncio.gather(*tasks)
        print([r.status_code for r in results])
        # => [200, 200, 200, ...]


asyncio.run(main())

Если пул заполнен, ожидание свободного соединения ограничивается таймаутом pool — если клиент не получил соединение вовремя, выбрасывается PoolTimeout. В примере выше мы увеличили размер пула до 100 соединений, чтобы 20 параллельных запросов могли выполниться без ожидания.

В aiohttp концепция похожа, но реализация отличается. Клиент использует объект TCPConnector, который управляет соединениями и кешем keep-alive. Таймауты задаются через aiohttp.ClientTimeout:

import asyncio

import aiohttp


async def main() -> None:
    timeout: aiohttp.ClientTimeout = aiohttp.ClientTimeout(total=30, connect=10, sock_read=20)
    connector: aiohttp.TCPConnector = aiohttp.TCPConnector(limit=100, ttl_dns_cache=300)
    async with aiohttp.ClientSession(connector=connector, timeout=timeout) as session:
        async with session.get("https://httpbin.org/get") as resp:
            print(resp.status)
            # => 200


asyncio.run(main())

Параметр limit задаёт общее количество открытых соединений, ttl_dns_cache управляет временем хранения DNS-записей. Если сервер использует балансировку по DNS, этот параметр помогает не делать повторные запросы к DNS при каждом подключении. Таймаут total в aiohttp определяет максимальное время всей операции, включая все стадии. Это удобно для простых сценариев, где не нужно разделять время ожидания по этапам.

При большом количестве параллельных запросов управление пулом становится особенно важным. Если пул слишком маленький, клиенты начнут ждать и общая производительность снизится. Если слишком большой — возникнет нагрузка на сеть и на сервер, а память приложения будет занята неиспользуемыми соединениями. В асинхронных программах оптимальный размер пула подбирается экспериментально, исходя из числа одновременно выполняемых задач и скорости ответов сервера.

Также важно понимать, что соединения в пуле могут устаревать. Сервер может их закрыть по таймауту keep-alive, а клиент не всегда узнает об этом сразу. Поэтому библиотеки сами выполняют проверку соединения перед повторным использованием. Если соединение закрыто, оно автоматически пересоздаётся. Этот процесс невидим для пользователя, но может влиять на задержку при первом запросе после простоя.

Некоторые приложения, например API-сервисы, требуют строгого контроля таймаутов, чтобы не держать открытые соединения дольше определённого времени. В httpx можно задавать таймауты и пулы соединений отдельно для каждого клиента, что удобно при работе с несколькими источниками данных:

import asyncio

import httpx


async def main() -> None:
    async with httpx.AsyncClient(
        timeout=httpx.Timeout(10.0), limits=httpx.Limits(max_connections=20)
    ) as fast_api:
        async with httpx.AsyncClient(
            timeout=httpx.Timeout(30.0), limits=httpx.Limits(max_connections=5)
        ) as slow_api:
            result_fast: httpx.Response = await fast_api.get("https://httpbin.org/get")
            result_slow: httpx.Response = await slow_api.get("https://httpbin.org/delay/2")
            print(result_fast.status_code, result_slow.status_code)
            # => 200 200


asyncio.run(main())

Такая настройка позволяет гибко распределять ресурсы: быстрые API получают короткие таймауты и больше соединений, а медленные — меньше. Это защищает event loop от блокировки на долгих запросах и сохраняет отзывчивость приложения.

Повторные попытки с exponential backoff и jitter

При работе с сетью неизбежны временные сбои: сервер может быть перегружен, сеть — нестабильна, или возникнет временная недоступность. Вместо немедленного отказа лучше повторить запрос через некоторое время. Однако простые retry-механизмы могут усугубить проблему, создавая эффект «грозы повторных запросов» (retry storm), когда множество клиентов одновременно повторяют запросы, ещё больше перегружая сервер.

Exponential backoff (экспоненциальная задержка) решает эту проблему, увеличивая паузу между попытками: первая попытка через 1 секунду, вторая — через 2, третья — через 4, и так далее. Это даёт серверу время восстановиться.

Jitter (случайный разброс) добавляет случайную составляющую к задержке, чтобы избежать синхронизации запросов от разных клиентов. Без jitter все клиенты, начавшие retry одновременно, снова обратятся к серверу в один момент.

Пример реализации:

import asyncio
import logging
import random

import httpx

logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(message)s")
logger = logging.getLogger(__name__)


async def fetch_with_retry(
    url: str, max_retries: int = 5, base_delay: float = 1.0, max_delay: float = 60.0
) -> httpx.Response:
    """Выполняет запрос с exponential backoff и jitter

    Args:
        url: URL для запроса
        max_retries: Максимальное число попыток
        base_delay: Базовая задержка в секундах
        max_delay: Максимальная задержка в секундах

    Returns:
        HTTP ответ

    Raises:
        httpx.HTTPError: После исчерпания всех попыток
    """
    attempt = 0

    async with httpx.AsyncClient(timeout=30.0) as client:
        while attempt < max_retries:
            try:
                logger.info(f"Попытка {attempt + 1}/{max_retries}: GET {url}")
                response = await client.get(url)
                response.raise_for_status()
                logger.info(f"Успешный ответ: {response.status_code}")
                return response

            except (httpx.RequestError, httpx.HTTPStatusError) as e:
                attempt += 1

                if attempt >= max_retries:
                    logger.error(f"Исчерпаны все попытки для {url}: {e}")
                    raise

                # Exponential backoff: 2^attempt * base_delay
                delay = min(base_delay * (2 ** (attempt - 1)), max_delay)

                # Добавляем jitter: ±25% от delay
                jitter = delay * (0.75 + random.random() * 0.5)

                logger.warning(
                    f"Ошибка {type(e).__name__}: {e}. "
                    f"Повтор через {jitter:.2f}с (попытка {attempt}/{max_retries})"
                )

                await asyncio.sleep(jitter)

    raise httpx.RequestError("Не удалось выполнить запрос")


async def main() -> None:
    try:
        # Симулируем нестабильный endpoint
        response = await fetch_with_retry("https://httpbin.org/status/500")
        print(f"Результат: {response.status_code}")
    except httpx.HTTPError as e:
        print(f"Финальная ошибка: {e}")


asyncio.run(main())

Ключевые моменты:

  • Задержка растёт экспоненциально: 1с, 2с, 4с, 8с, 16с...
  • Jitter добавляет случайность ±25%, чтобы избежать синхронизации
  • Максимальная задержка ограничена max_delay, чтобы не ждать слишком долго
  • Все попытки логируются для последующего анализа

WebSocket: поддержание соединения и переподключение

WebSocket устанавливает постоянное соединение для двустороннего обмена сообщениями в реальном времени.

Библиотека aiohttp предоставляет удобный асинхронный интерфейс для работы с WebSocket. После установления соединения с помощью метода session.ws_connect() создаётся объект ClientWebSocketResponse, через который можно как отправлять, так и получать сообщения. Работа с ним полностью асинхронная, поэтому для чтения используется await ws.receive(), а для отправки — await ws.send_str() или аналогичные методы для передачи бинарных данных.

Рассмотрим простой пример клиента, который подключается к серверу, отправляет сообщения и обрабатывает ответы:

import asyncio

import aiohttp


async def websocket_client() -> None:
    async with aiohttp.ClientSession() as session:
        try:
            async with session.ws_connect("wss://ws.postman-echo.com/raw") as ws:
                await ws.send_str("Привет, сервер!")
                msg: aiohttp.WSMessage = await ws.receive()
                print("Ответ:", msg.data)
                # => Ответ: Привет, сервер!
                await ws.close()
        except aiohttp.ClientError as e:
            print(f"Ошибка подключения: {e}")


asyncio.run(websocket_client())

Метод ws.receive() возвращает объект сообщения, в котором может быть текст, бинарные данные или уведомление о закрытии. Такой код корректен для базовой работы, но в реальных приложениях нужно уметь восстанавливаться после обрыва соединения и повторно подключаться.

Корректное управление сессией при ошибках

Критическая проблема: при ошибках подключения сессия может остаться незакрытой, что приводит к утечкам ресурсов. Необходимо всегда гарантировать закрытие сессии, даже при исключениях.

Плохой пример — сессия может не закрыться:

import asyncio

import aiohttp


async def bad_retry(url: str) -> None:
    """ПЛОХО: сессия может остаться незакрытой"""
    session = aiohttp.ClientSession()  # Создаём вне try
    try:
        ws = await session.ws_connect(url)
        # Работа с ws...
    except aiohttp.ClientError as e:
        print(f"Ошибка: {e}")
        # Сессия не закрыта!
        return

Правильный подход — всегда используйте async with или явное закрытие в finally:

import asyncio
import logging
import random

import aiohttp

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


async def connect_with_retry(
    url: str, max_retries: int = 5, base_delay: float = 2.0
) -> tuple[aiohttp.ClientWebSocketResponse, aiohttp.ClientSession]:
    """Устанавливает WebSocket-соединение с retry и корректным управлением ресурсами

    Args:
        url: URL WebSocket сервера
        max_retries: Максимальное число попыток
        base_delay: Базовая задержка между попытками

    Returns:
        Кортеж (WebSocket соединение, ClientSession)

    Raises:
        ConnectionError: После исчерпания всех попыток
    """
    attempt = 0

    while attempt < max_retries:
        session: aiohttp.ClientSession | None = None
        try:
            session = aiohttp.ClientSession()
            logger.info(f"Попытка подключения {attempt + 1}/{max_retries}")

            ws = await session.ws_connect(url)
            logger.info("Подключение установлено")
            return ws, session

        except aiohttp.ClientError as e:
            attempt += 1
            logger.warning(f"Ошибка подключения: {e}")

            # КРИТИЧЕСКИ ВАЖНО: закрываем сессию при ошибке
            if session:
                await session.close()

            if attempt >= max_retries:
                raise ConnectionError(f"Не удалось подключиться после {max_retries} попыток")

            # Exponential backoff с jitter
            delay = base_delay * (2 ** (attempt - 1))
            jitter = delay * (0.75 + random.random() * 0.5)
            logger.info(f"Повтор через {jitter:.2f}с")
            await asyncio.sleep(jitter)

    raise ConnectionError("Не удалось подключиться")


async def handle_messages(ws: aiohttp.ClientWebSocketResponse, duration: int) -> bool:
    """Обрабатывает сообщения WebSocket

    Args:
        ws: WebSocket соединение
        duration: Максимальное время работы в секундах

    Returns:
        True если завершение успешное, False если требуется переподключение
    """
    try:
        await ws.send_str("Начинаем обмен сообщениями")

        start_time = asyncio.get_event_loop().time()
        message_count = 0

        async for msg in ws:
            if msg.type == aiohttp.WSMsgType.TEXT:
                message_count += 1
                logger.info(f"Получено сообщение #{message_count}: {msg.data}")

                if message_count < 3:
                    await ws.send_str(f"Сообщение {message_count + 1}")
                else:
                    logger.info("Обмен сообщениями завершён")
                    await ws.close()
                    return True

            elif msg.type == aiohttp.WSMsgType.ERROR:
                logger.error(f"Ошибка WebSocket: {ws.exception()}")
                return False

            elif msg.type == aiohttp.WSMsgType.CLOSED:
                logger.info("Соединение закрыто сервером")
                return False

            # Проверяем таймаут
            if asyncio.get_event_loop().time() - start_time > duration:
                logger.info(f"Завершение работы по таймауту ({duration} секунд)")
                await ws.close()
                return True

        return False

    except asyncio.CancelledError:
        logger.info("Обработка сообщений отменена")
        raise
    except Exception as e:
        logger.error(f"Ошибка при обработке сообщений: {e}")
        return False


async def listen_with_reconnect(url: str, max_attempts: int = 3, duration: int = 5) -> None:
    """Слушает WebSocket с автоматическим переподключением

    Args:
        url: URL WebSocket сервера
        max_attempts: Максимальное число попыток переподключения
        duration: Время работы в секундах перед завершением
    """
    attempts = 0
    completed = False

    while attempts < max_attempts and not completed:
        session: aiohttp.ClientSession | None = None
        ws: aiohttp.ClientWebSocketResponse | None = None

        try:
            ws, session = await connect_with_retry(url)
            attempts = 0  # Сбрасываем счётчик при успешном подключении

            completed = await handle_messages(ws, duration)

        except ConnectionError as e:
            logger.error(f"Не удалось подключиться: {e}")
            attempts += 1

        except Exception as e:
            attempts += 1
            logger.error(
                f"Сбой соединения: {e}. Попытка переподключения {attempts}/{max_attempts}"
            )

        finally:
            # КРИТИЧЕСКИ ВАЖНО: всегда закрываем ресурсы
            if ws and not ws.closed:
                await ws.close()
            if session and not session.closed:
                await session.close()
                logger.info("Сессия закрыта")

            if attempts < max_attempts and attempts > 0 and not completed:
                logger.info("Ожидание перед переподключением...")
                await asyncio.sleep(3)

    if attempts >= max_attempts:
        logger.error("Достигнут лимит попыток переподключения")
    elif completed:
        logger.info("Работа с WebSocket завершена успешно")


asyncio.run(listen_with_reconnect("wss://ws.postman-echo.com/raw", max_attempts=2, duration=5))

Ключевые изменения для корректного управления ресурсами:

  1. Создание сессии внутри try — позволяет перехватить ошибки
  2. Явное закрытие в except — гарантирует освобождение ресурсов при ошибке подключения
  3. Блок finally — закрывает ws и session независимо от исхода
  4. Проверка состояния — закрываем только если не закрыто (not ws.closed)
  5. Логирование — отслеживаем все попытки подключения и закрытия

Поддержание соединения активным (ping-pong)

WebSocket полезен не только для получения данных, но и для двусторонней коммуникации, например, при обмене уведомлениями, синхронизации состояния или потоковой передаче данных. Поэтому важно уметь поддерживать соединение активным. Для этого применяют ping-pong механизм, позволяющий серверу и клиенту убедиться, что друг друга всё ещё можно достичь. В aiohttp это делается вызовом метода ping(). Иногда сервер автоматически закрывает неактивные соединения, поэтому клиенту стоит периодически отправлять пинг:

import asyncio
import logging

import aiohttp

logger = logging.getLogger(__name__)


async def keep_alive(ws: aiohttp.ClientWebSocketResponse, interval: int = 20) -> None:
    """Поддерживает соединение активным через периодические ping

    Args:
        ws: WebSocket соединение
        interval: Интервал между ping в секундах
    """
    try:
        while not ws.closed:
            try:
                await ws.ping()
                logger.debug("Отправлен ping")
                await asyncio.sleep(interval)
            except Exception as e:
                logger.error(f"Ошибка при отправке ping: {e}")
                break
    except asyncio.CancelledError:
        logger.info("Keep-alive задача отменена")
        raise


async def websocket_with_keepalive(url: str, duration: int = 30) -> None:
    """Пример WebSocket клиента с keep-alive механизмом"""
    session: aiohttp.ClientSession | None = None
    ws: aiohttp.ClientWebSocketResponse | None = None
    keepalive_task: asyncio.Task | None = None

    try:
        session = aiohttp.ClientSession()
        ws = await session.ws_connect(url)

        # Запускаем задачу keep-alive параллельно
        keepalive_task = asyncio.create_task(keep_alive(ws, interval=15))

        # Основная логика работы с сообщениями
        start_time = asyncio.get_event_loop().time()

        async for msg in ws:
            if msg.type == aiohttp.WSMsgType.TEXT:
                logger.info(f"Получено: {msg.data}")

            if asyncio.get_event_loop().time() - start_time > duration:
                logger.info("Время работы истекло")
                break

    except Exception as e:
        logger.error(f"Ошибка: {e}")

    finally:
        # Отменяем keep-alive задачу
        if keepalive_task and not keepalive_task.done():
            keepalive_task.cancel()
            try:
                await keepalive_task
            except asyncio.CancelledError:
                pass

        # Закрываем ресурсы
        if ws and not ws.closed:
            await ws.close()
        if session and not session.closed:
            await session.close()

Эта корутина выполняется параллельно с основной задачей, чтобы поддерживать активность соединения. Если ping() не получает ответа, соединение обычно закрывается, и цикл переподключения снова активируется.

Ограничение количества WebSocket-соединений

Если приложение активно использует множество WebSocket-соединений, стоит ограничить их количество с помощью семафора (asyncio.Semaphore). Это помогает избежать перегрузки клиента и предотвращает превышение лимитов сервера:

import asyncio

import aiohttp


async def limited_websockets() -> None:
    """Демонстрирует ограничение количества одновременных WebSocket-соединений"""
    semaphore: asyncio.Semaphore = asyncio.Semaphore(5)

    async def connect_with_limit(url: str, client_id: int) -> None:
        session: aiohttp.ClientSession | None = None
        ws: aiohttp.ClientWebSocketResponse | None = None

        async with semaphore:
            try:
                session = aiohttp.ClientSession()
                ws = await session.ws_connect(url)
                await ws.send_str(f"Hello from client {client_id}")
                msg: aiohttp.WSMessage = await ws.receive()
                print(f"Client {client_id} получил: {msg.data}")
            except aiohttp.ClientError as e:
                print(f"Client {client_id} ошибка: {e}")
            finally:
                if ws and not ws.closed:
                    await ws.close()
                if session and not session.closed:
                    await session.close()

    # Пытаемся подключиться к 10 клиентам, но одновременно только 5
    urls: list[str] = ["wss://ws.postman-echo.com/raw"] * 10
    tasks = [connect_with_limit(url, i) for i, url in enumerate(urls)]
    await asyncio.gather(*tasks, return_exceptions=True)


asyncio.run(limited_websockets())

Такой подход гарантирует, что одновременно будет активно не более 5 соединений, что предотвращает перегрузку и помогает контролировать использование ресурсов.

Контрольный список для работы с сетью

Используйте единый клиент/пул на процесс — создавайте один AsyncClient или ClientSession для всего приложения, переиспользуя пул соединений

Задавайте отдельные таймауты для разных стадий — используйте раздельные connect, read, write и pool таймауты для точного контроля

Всегда используйте async with для сессий — это гарантирует корректное закрытие соединений даже при ошибках

Закрывайте сессии явно в except или finally — при ошибках подключения сессия может остаться незакрытой

Реализуйте exponential backoff с jitter для retry — избегайте retry storm, добавляя случайную задержку

Логируйте все попытки подключения — это критически важно для диагностики проблем в продакшене

Настройте размер пула соединений — выбирайте баланс между производительностью и нагрузкой на сервер

Для WebSocket всегда проверяйте ws.closed перед закрытием — избегайте попыток закрыть уже закрытое соединение

Используйте семафоры для ограничения параллелизма — контролируйте количество одновременных соединений

Реализуйте keep-alive для длительных WebSocket-соединений — предотвращайте автоматическое закрытие неактивных соединений

Пример правильной конфигурации HTTP-клиента:

import asyncio
import logging

import httpx

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class APIClient:
    """Правильно настроенный HTTP-клиент для production"""

    def __init__(self, base_url: str, max_connections: int = 100) -> None:
        self.base_url = base_url
        self.timeout = httpx.Timeout(connect=10.0, read=30.0, write=10.0, pool=5.0)
        self.limits = httpx.Limits(
            max_connections=max_connections, max_keepalive_connections=20
        )
        self.client = httpx.AsyncClient(
            base_url=base_url, timeout=self.timeout, limits=self.limits, follow_redirects=True
        )

    async def __aenter__(self) -> "APIClient":
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        await self.close()

    async def close(self) -> None:
        """Закрывает клиент и освобождает ресурсы"""
        await self.client.aclose()

    async def get_with_retry(self, path: str, max_retries: int = 3) -> httpx.Response:
        """Выполняет GET-запрос с retry логикой"""
        import random

        attempt = 0
        while attempt < max_retries:
            try:
                logger.info(f"GET {path} (попытка {attempt + 1}/{max_retries})")
                response = await self.client.get(path)
                response.raise_for_status()
                return response
            except (httpx.RequestError, httpx.HTTPStatusError) as e:
                attempt += 1
                if attempt >= max_retries:
                    logger.error(f"Все попытки исчерпаны для {path}")
                    raise
                delay = 1.0 * 2 ** (attempt - 1)
                jitter = delay * (0.75 + random.random() * 0.5)
                logger.warning(f"Ошибка {type(e).__name__}: {e}. Повтор через {jitter:.2f}с")
                await asyncio.sleep(jitter)
        raise httpx.RequestError("Не удалось выполнить запрос")


async def main() -> None:
    """Пример использования правильно настроенного клиента"""
    async with APIClient("https://httpbin.org") as client:
        tasks = [
            client.get_with_retry("/get"),
            client.get_with_retry("/delay/1"),
            client.get_with_retry("/delay/2"),
        ]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        for i, result in enumerate(results):
            if isinstance(result, Exception):
                logger.error(f"Запрос {i} завершился с ошибкой: {result}")
            else:
                logger.info(f"Запрос {i} успешен: {result.status_code}")


if __name__ == "__main__":
    asyncio.run(main())

Ключевые особенности правильной конфигурации:

  1. Единый клиент на всё приложение — переиспользуется пул соединений
  2. Раздельные таймауты — точный контроль над каждой стадией
  3. Настроенный пул — баланс между производительностью и ресурсами
  4. Контекстный менеджер — гарантирует закрытие при любом исходе
  5. Встроенный retry — exponential backoff с jitter и логированием
  6. Обработка ошибок — использование return_exceptions=True для массовых запросов

Такая конфигурация обеспечивает надёжную работу в production-окружении с тысячами запросов в секунду.

Рекомендуемые программы

Завершено

0 / 10