Асинхронность в Python
Теория: Асинхронные базы данных
Драйвер asyncpg
asyncpg
Работа с базой данных в асинхронных приложениях требует особого подхода. Если использовать обычный синхронный драйвер, каждый запрос к базе будет блокировать event loop, из-за чего приложение потеряет преимущество асинхронности. Решение для PostgreSQL — это библиотека asyncpg. Она реализована на чистом C и Python, полностью асинхронна и использует возможности asyncio, обеспечивая высокую скорость и низкие задержки при большом числе запросов.
Чтобы начать работу, нужно установить библиотеку:
После этого можно подключаться к базе данных. Подключение создаётся вызовом asyncpg.connect(), и возвращает объект соединения, через который выполняются запросы.
Пример простейшего подключения и выполнения SQL-запроса:
Асинхронное подключение нужно закрывать вызовом await conn.close(). Если этого не сделать, соединение останется висеть до завершения программы. При большом числе открытий это может привести к переполнению лимитов PostgreSQL.
Метод fetch() возвращает список записей в виде объектов Record. Каждый элемент можно воспринимать как словарь: row['column_name']. Если запрос должен вернуть одну строку, используется fetchrow(), а если одно значение — fetchval():
Здесь $1 — параметр, передаваемый в запрос. В отличие от подстановки строк через форматирование, такой способ защищает от SQL-инъекций. asyncpg автоматически выполняет подстановку параметров, не допуская вредоносного кода.
Важно: всегда используйте параметризованные запросы вместо форматирования строк. Это критически важно для безопасности.
Если нужно выполнить запрос без возврата данных, например INSERT, UPDATE или DELETE, применяется метод execute(). Он возвращает статусную строку PostgreSQL:
При работе с большими объёмами данных удобно использовать курсоры, чтобы не загружать все результаты запроса в память сразу. Курсор позволяет получать данные частями, по мере их поступления от сервера, что особенно полезно при выборках с тысячами строк. В asyncpg курсор создаётся методом cursor() и используется внутри транзакции.
Метод fetch(n) позволяет получать данные порциями — приложение обрабатывает результаты сразу после их поступления, не дожидаясь завершения всей выборки. Такой подход снижает нагрузку на память и ускоряет обработку больших наборов данных.
Библиотека выбрасывает исключения, унаследованные от asyncpg.PostgresError. Это позволяет точно обрабатывать различные ошибки базы данных:
asyncpg автоматически преобразует типы PostgreSQL в Python: INTEGER становится int, TEXT — str, TIMESTAMP — datetime, JSONB — dict. Это делает работу с базой естественной и удобной.
Таймауты для запросов
В production-окружении необходимо ограничивать время выполнения запросов, чтобы предотвратить зависание приложения при проблемах с базой данных. В asyncpg таймауты задаются через параметр timeout или с помощью asyncio.timeout():
Таймауты особенно важны для:
- Защиты от медленных запросов
- Предотвращения блокировки event loop
- Быстрого обнаружения проблем с базой данных
Пулы соединений
Открывать соединение для каждого запроса дорого: требуется TCP-сессия, авторизация, настройка параметров. Пул соединений хранит готовые соединения и выдаёт их задачам по мере необходимости.
В asyncpg пул создаётся вызовом asyncpg.create_pool(). Параметры такие же, как у обычного подключения: имя пользователя, пароль, база, хост, порт:
Параметры min_size и max_size определяют количество соединений. Если все соединения заняты, задачи ждут освобождения. Асинхронный контекстный менеджер async with pool.acquire() гарантирует, что соединение будет возвращено обратно в пул, даже если в коде произошла ошибка. Без этого можно случайно «потерять» соединение, и пул постепенно исчерпает лимит. Параметр command_timeout задаёт глобальный таймаут для всех команд, выполняемых через этот пул.
Пул делит соединения между задачами. Например, при пяти соединениях одновременно выполняются пять запросов, остальные ждут очереди.
Критически важно: пулы должны быть закрыты при завершении сервиса. Используйте await pool.close() или контекстный менеджер для гарантированной очистки ресурсов.
Можно выполнять операции напрямую через пул, без явного получения соединения:
Это сокращает код и делает его безопаснее.
Пулы в asyncpg также позволяют выполнять инициализацию соединения. Например, можно указать функцию, которая будет вызываться при создании нового соединения — для настройки параметров сессии, включения расширений или логирования:
Это особенно полезно, если приложение работает в распределённой среде и нужно, чтобы каждое соединение имело одинаковые настройки. Пул создаёт минимальное количество соединений (min_size), остальные открываются по мере необходимости.
Рассмотрим пример, где несколько задач одновременно используют пул:
Здесь пять задач обращаются к базе параллельно. Если max_size=2, одновременно будут выполняться только две из них, а остальные дождутся освобождения соединений. Это простой, но эффективный способ регулировать нагрузку на базу.
Пулы особенно важны при построении веб-сервисов, где запросы поступают непредсказуемо. Даже если запросов немного, каждый из них может вызвать несколько операций с базой. Пул помогает избежать хаоса: все операции проходят через контролируемое количество соединений.
Пул также поддерживает проверку состояния соединений. При создании можно передать параметр max_inactive_connection_lifetime, который определяет, как долго неиспользуемое соединение может оставаться открытым. После этого срока оно будет закрыто и заменено новым. Это защищает от обрыва старых TCP-сессий и помогает поддерживать стабильность соединений:
В веб-приложениях пул создаётся при запуске и передаётся в обработчики запросов, обеспечивая эффективное использование ресурсов базы данных на протяжении всего времени работы приложения.
Транзакции и батчи
Транзакции
Транзакции обеспечивают согласованность данных: операции выполняются либо все, либо ни одной. В asyncpg транзакция начинается вызовом conn.transaction(), и её удобно использовать через контекстный менеджер async with. Это гарантирует, что при выходе из блока транзакция будет либо зафиксирована (commit), если ошибок не было, либо отменена (rollback), если возникло исключение:
В этом примере две вставки выполняются как единое целое. Если первая выполнится, а вторая вызовет ошибку, первая автоматически отменится. Такой подход защищает данные от неполных изменений и делает логику приложения предсказуемой.
Транзакции работают с пулом соединений:
Батчи
Иногда нужно выполнять однотипные операции с большим количеством данных. Выполнять их по одному запросу долго и неэффективно. Здесь на помощь приходят батчи. В asyncpg можно использовать метод executemany(), который принимает SQL и список параметров, выполняя запрос для каждой записи. Это быстрее, чем запускать отдельный запрос для каждой строки, и уменьшает нагрузку на сеть и базу:
Батчи хорошо комбинировать с транзакциями, обернув всю операцию в conn.transaction():
Вложенные транзакции
В сложных сценариях можно использовать вложенные транзакции через savepoint. В asyncpg это делается с помощью conn.transaction() внутри другой транзакции. Если часть операций внутри вложенной транзакции неудачна, можно откатить только её, не влияя на внешнюю транзакцию. Это удобно для обработки ошибок на уровне отдельных батчей или блоков операций:
Вложенные транзакции позволяют создавать гибкую логику обработки ошибок, где критичные операции защищены внешней транзакцией, а менее важные могут быть отменены без влияния на основной процесс.
Транзакции с таймаутами
Транзакции обеспечивают согласованность данных: операции выполняются либо все, либо ни одной. В production необходимо ограничивать время выполнения транзакций:
Важно: при превышении таймаута транзакция автоматически откатывается (rollback), все изменения отменяются.
Уровни изоляции транзакций
Уровень изоляции определяет, какие изменения, сделанные другими транзакциями, видны текущей транзакции. PostgreSQL поддерживает четыре уровня:
- READ UNCOMMITTED — на практике работает как READ COMMITTED в PostgreSQL
- READ COMMITTED (по умолчанию) — видны только зафиксированные изменения
- REPEATABLE READ — все чтения внутри транзакции видят одинаковый снимок данных
- SERIALIZABLE — самый строгий уровень, гарантирует полную сериализуемость
В asyncpg уровень изоляции задаётся при создании транзакции:
Когда использовать каждый уровень:
- READ COMMITTED — для большинства случаев, минимальные блокировки
- REPEATABLE READ — когда нужна консистентность чтений внутри транзакции
- SERIALIZABLE — для критичных операций, требующих полной изоляции
Реакция на отмену задач
При отмене асинхронной задачи (CancelledError) важно корректно завершить транзакцию и освободить ресурсы:
Ключевой момент: контекстный менеджер async with conn.transaction() автоматически выполняет rollback при CancelledError, гарантируя целостность данных.
Лёгкая альтернатива: aiosqlite
Для локальных упражнений и разработки удобнее использовать aiosqlite — асинхронную обёртку над SQLite. Это позволяет работать с базой данных без необходимости установки и настройки PostgreSQL:
Пример использования:
Преимущества aiosqlite:
- Не требует установки сервера БД
- Идеально для обучения и тестирования
- Простая миграция на PostgreSQL при необходимости
- Поддерживает транзакции и параметризованные запросы
Пример с транзакциями:
Контрольный список для работы с асинхронными БД
Всегда используйте параметризованные запросы — никогда не используйте форматирование строк для SQL-запросов
Пулы должны закрываться при завершении сервиса — используйте await pool.close() или контекстные менеджеры
Задавайте таймауты для всех операций — используйте command_timeout для пула и asyncio.timeout() для транзакций
Ограничивайте время выполнения транзакций — длительные транзакции блокируют другие операции
Выбирайте правильный уровень изоляции — используйте минимально необходимый уровень для производительности
Обрабатывайте CancelledError корректно — транзакции должны откатываться при отмене задачи
Настройте размер пула соединений — баланс между производительностью и нагрузкой на БД
Используйте батчи для массовых операций — executemany() значительно быстрее множественных execute()
Для разработки используйте aiosqlite — не требует установки PostgreSQL
Логируйте все ошибки БД — это критично для диагностики проблем в production
Пример production-ready конфигурации:
Эта конфигурация обеспечивает:
- Автоматические таймауты для всех операций
- Корректную обработку отмены задач
- Логирование всех критических событий
- Контролируемое управление пулом соединений
- Гибкую настройку уровней изоляции
Рекомендуемые программы
Завершено
0 / 10

