Асинхронность в Python

Теория: Тестирование асинхронного кода

Тестирование асинхронного кода требует инструментов, которые управляют циклом событий и корректно запускают корутины. Обычные тестовые фреймворки не умеют работать с async/await — попытка вызвать асинхронную функцию напрямую приведёт к ошибке. В Python используются два основных подхода: pytest-asyncio и встроенный IsolatedAsyncioTestCase.

pytest-asyncio и IsolatedAsyncioTestCase

pytest-asyncio

Плагин pytest-asyncio интегрируется с pytest, позволяя писать тесты в формате async def. Он автоматически создаёт event loop перед запуском каждого теста и очищает его после завершения.

Тесты пишутся как обычные корутины с маркировкой @pytest.mark.asyncio:

import asyncio

import pytest


async def add(a: int, b: int) -> int:
    await asyncio.sleep(0.1)
    return a + b


@pytest.mark.asyncio
async def test_add() -> None:
    result = await add(2, 3)
    assert result == 5

Плагин поддерживает асинхронные фикстуры для подготовки окружения:

import pytest


@pytest.fixture
async def data() -> dict[str, int]:
    await asyncio.sleep(0.1)
    return {"x": 10}


@pytest.mark.asyncio
async def test_data(data: dict[str, int]) -> None:
    assert data["x"] == 10

Фикстуры с yield поддерживают асинхронный teardown — код после yield выполняется как финализатор:

@pytest.fixture
async def connection():
    conn = await setup_connection()
    yield conn
    await conn.close()

IsolatedAsyncioTestCase

Встроенный класс IsolatedAsyncioTestCase из модуля unittest предоставляет аналогичную функциональность без сторонних зависимостей:

import asyncio
import unittest


async def multiply(a: int, b: int) -> int:
    await asyncio.sleep(0.1)
    return a * b


class TestMath(unittest.IsolatedAsyncioTestCase):
    async def test_multiply(self) -> None:
        result = await multiply(3, 4)
        self.assertEqual(result, 12)

Класс поддерживает методы asyncSetUp() и asyncTearDown() для подготовки и очистки ресурсов:

class TestService(unittest.IsolatedAsyncioTestCase):
    async def asyncSetUp(self) -> None:
        self.buffer: list[int] = []
        await asyncio.sleep(0.05)

    async def asyncTearDown(self) -> None:
        self.buffer.clear()

    async def test_buffer(self) -> None:
        self.buffer.append(1)
        await asyncio.sleep(0.05)
        self.assertEqual(self.buffer, [1])

Главное преимущество IsolatedAsyncioTestCase — каждому тесту создаётся новый цикл событий, который гарантированно закрывается после выполнения. Это предотвращает ситуации, когда фоновые корутины продолжают работу после завершения теста. Такая изоляция особенно важна при работе с объектами, привязанными к event loop: сессиями aiohttp, WebSocket-соединениями или пулами соединений к базе данных.

Моки и патчи для async-функций

При тестировании асинхронного кода часто требуется подменить внешние зависимости — API, базы данных или сетевые сервисы. Обычный Mock не работает с await, поэтому используется AsyncMock:

from unittest.mock import AsyncMock, patch

import pytest


async def fetch_user(user_id: int) -> dict[str, str]:
    raise NotImplementedError


async def get_user_name(user_id: int) -> str:
    data = await fetch_user(user_id)
    return data["name"]


@pytest.mark.asyncio
async def test_get_user_name() -> None:
    mock_data = {"name": "Alice"}
    with patch(__name__ + ".fetch_user", new=AsyncMock(return_value=mock_data)):
        name = await get_user_name(1)
        assert name == "Alice"

Здесь patch() перехватывает вызов fetch_user() и подменяет его асинхронным мок-объектом. Тест выполняется мгновенно, без реальных сетевых запросов. Можно проверять, как именно вызывался мок:

@pytest.mark.asyncio
async def test_fetch_user_called() -> None:
    mock = AsyncMock(return_value={"name": "Bob"})
    with patch(__name__ + ".fetch_user", new=mock):
        await get_user_name(10)

    mock.assert_awaited()
    mock.assert_awaited_once_with(10)

Метод assert_awaited() проверяет, что мок был вызван через await, а assert_awaited_once_with() — что он вызван один раз с указанными аргументами.

При тестировании кода с асинхронными контекстными менеджерами нужно подменять методы __aenter__() и __aexit__():

from unittest.mock import AsyncMock, MagicMock, patch

import pytest


class FakeResponse:
    def __init__(self, data: dict) -> None:
        self.data = data

    async def json(self) -> dict:
        return self.data


async def fetch() -> dict:
    import aiohttp

    async with aiohttp.ClientSession() as session:
        async with session.get("https://example.com") as resp:
            return await resp.json()


@pytest.mark.asyncio
async def test_fetch() -> None:
    fake_resp = FakeResponse({"ok": True})
    mock_session = MagicMock()
    mock_session.get = AsyncMock()
    mock_session.get.return_value.__aenter__.return_value = fake_resp
    with patch("aiohttp.ClientSession", return_value=mock_session):
        result = await fetch()
        assert result["ok"] is True

Такой подход позволяет тестировать HTTP-клиенты без реальных сетевых запросов. С IsolatedAsyncioTestCase моки работают аналогично:

import unittest
from unittest.mock import AsyncMock, patch


async def external_op() -> int:
    raise NotImplementedError


async def calc() -> int:
    return await external_op()


class TestCalc(unittest.IsolatedAsyncioTestCase):
    async def test_calc(self) -> None:
        with patch(__name__ + ".external_op", new=AsyncMock(return_value=10)):
            result = await calc()
            self.assertEqual(result, 10)

Для тестирования обработки ошибок используется side_effect:

@pytest.mark.asyncio
async def test_timeout_error() -> None:
    mock = AsyncMock(side_effect=TimeoutError())

    async def get_data() -> str:
        return await fetch_data()

    with patch(__name__ + ".fetch_data", new=mock):
        with pytest.raises(TimeoutError):
            await get_data()

При мокировании классов, которые создаёт тестируемая функция, нужно подменять сам класс:

@pytest.mark.asyncio
async def test_client_factory() -> None:
    async def run_query() -> list[str]:
        from module import Client

        client = Client()
        return await client.query()

    with patch("module.Client") as mock_cls:
        instance = mock_cls.return_value
        instance.query = AsyncMock(return_value=["item"])
        result = await run_query()
        assert result == ["item"]

Если тестируемая логика зависит от времени выполнения, можно добавить задержку через side_effect:

async def delayed() -> int:
    await asyncio.sleep(0.01)
    return 42


@pytest.mark.asyncio
async def test_with_delay() -> None:
    mock = AsyncMock(side_effect=delayed)
    # тест с учётом времени выполнения

Контроль и поиск «висячих» задач

Висячие задачи — это корутины, которые никогда не завершаются из-за забытого await, неправильной синхронизации или блокирующих вызовов. Они заполняют event loop, потребляют память и приводят к зависанию тестов или всего приложения.

Для обнаружения висячих задач используется asyncio.all_tasks(), который возвращает список всех активных задач:

import asyncio

import pytest


async def background() -> None:
    await asyncio.sleep(10)


async def run() -> None:
    asyncio.create_task(background())


@pytest.mark.asyncio
async def test_no_leaks() -> None:
    before = asyncio.all_tasks()
    await run()
    after = asyncio.all_tasks()

    leaks = after - before
    assert not leaks, f"Обнаружены висячие задачи: {leaks}"

Важно учитывать, что некоторые задачи создаются самим pytest-asyncio, поэтому сравнивать нужно только разницу до и после вызова тестируемого кода.

Если корутина должна завершиться за короткое время, используйте asyncio.wait_for():

@pytest.mark.asyncio
async def test_with_timeout() -> None:
    async def some_async_op() -> str:
        await asyncio.sleep(1)
        return "done"

    with pytest.raises(asyncio.TimeoutError):
        await asyncio.wait_for(some_async_op(), timeout=0.1)

Таймаут помогает быстро обнаружить зависание вместо бесконечного ожидания. Если подозревается блокирующая операция, можно проверить, продолжает ли работать цикл событий:

@pytest.mark.asyncio
async def test_loop_responsive() -> None:
    async def some_async_op() -> None:
        await asyncio.sleep(0.5)

    task = asyncio.create_task(some_async_op())
    await asyncio.sleep(0)  # передаём управление
    assert not task.done()

Если await asyncio.sleep(0) вызывает зависание, значит event loop заблокирован.

Задачи, созданные через create_task(), нужно либо awaited, либо отменять вручную:

@pytest.mark.asyncio
async def test_background_task() -> None:
    async def worker() -> None:
        await asyncio.sleep(10)

    task = asyncio.create_task(worker())

    # выполняем проверки
    await asyncio.sleep(0.1)

    # явно отменяем задачу
    task.cancel()
    with pytest.raises(asyncio.CancelledError):
        await task

Без явной отмены задача продолжит выполнение после завершения теста, что приведёт к ошибкам в следующих тестах.

При использовании asyncio.Event, Lock или других примитивов можно проверить корректность их работы:

@pytest.mark.asyncio
async def test_event_waiting() -> None:
    event = asyncio.Event()

    async def waiter() -> None:
        await event.wait()

    task = asyncio.create_task(waiter())
    await asyncio.sleep(0)

    assert not task.done(), "Задача не должна завершиться до установки события"

    event.set()
    await task

В конце сложных тестов можно принудительно отменить все оставшиеся задачи:

@pytest.mark.asyncio
async def test_cleanup() -> None:
    # тестовая логика

    # очистка всех задач
    for t in asyncio.all_tasks():
        if t is not asyncio.current_task():
            t.cancel()

Такой подход защищает тестовый набор от «утечек» задач между тестами.

Рекомендуемые программы

Завершено

0 / 10