Асинхронность в Python

Теория: Асинхронные базы данных

Драйвер asyncpg

asyncpg

Работа с базой данных в асинхронных приложениях требует особого подхода. Если использовать обычный синхронный драйвер, каждый запрос к базе будет блокировать event loop, из-за чего приложение потеряет преимущество асинхронности. Решение для PostgreSQL — это библиотека asyncpg. Она реализована на чистом C и Python, полностью асинхронна и использует возможности asyncio, обеспечивая высокую скорость и низкие задержки при большом числе запросов.

Чтобы начать работу, нужно установить библиотеку:

pip install asyncpg

После этого можно подключаться к базе данных. Подключение создаётся вызовом asyncpg.connect(), и возвращает объект соединения, через который выполняются запросы.

Пример простейшего подключения и выполнения SQL-запроса:

import asyncio

import asyncpg


async def main() -> None:
    conn: asyncpg.Connection = await asyncpg.connect(
        user="postgres",
        password="admin",
        database="postgres",
        host="127.0.0.1",
        port=5432,
    )
    result: list[asyncpg.Record] = await conn.fetch("SELECT 1 AS number")
    print(result)
    # => [<Record number=1>]
    await conn.close()


asyncio.run(main())

Асинхронное подключение нужно закрывать вызовом await conn.close(). Если этого не сделать, соединение останется висеть до завершения программы. При большом числе открытий это может привести к переполнению лимитов PostgreSQL.

Метод fetch() возвращает список записей в виде объектов Record. Каждый элемент можно воспринимать как словарь: row['column_name']. Если запрос должен вернуть одну строку, используется fetchrow(), а если одно значение — fetchval():

import asyncio

import asyncpg


async def query_examples() -> None:
    conn: asyncpg.Connection = await asyncpg.connect(
        user="postgres", password="admin", database="postgres", host="127.0.0.1"
    )

    # Получаем одну строку
    row: asyncpg.Record | None = await conn.fetchrow("SELECT 1 AS id, $1 AS name", "Alice")
    if row:
        print(f"ID: {row['id']}, Name: {row['name']}")
        # => ID: 1, Name: Alice

    # Получаем одно значение
    value: int = await conn.fetchval("SELECT 42")
    print(f"Значение: {value}")
    # => Значение: 42

    await conn.close()


asyncio.run(query_examples())

Здесь $1параметр, передаваемый в запрос. В отличие от подстановки строк через форматирование, такой способ защищает от SQL-инъекций. asyncpg автоматически выполняет подстановку параметров, не допуская вредоносного кода.

Важно: всегда используйте параметризованные запросы вместо форматирования строк. Это критически важно для безопасности.

Если нужно выполнить запрос без возврата данных, например INSERT, UPDATE или DELETE, применяется метод execute(). Он возвращает статусную строку PostgreSQL:

import asyncio

import asyncpg


async def insert_example() -> None:
    conn: asyncpg.Connection = await asyncpg.connect(
        user="postgres", password="admin", database="postgres", host="127.0.0.1"
    )

    # Создаём таблицу для примера
    await conn.execute("""
        CREATE TABLE IF NOT EXISTS users (
            id SERIAL PRIMARY KEY,
            name TEXT NOT NULL,
            age INTEGER
        )
    """)

    # Вставляем данные
    status: str = await conn.execute(
        "INSERT INTO users (name, age) VALUES ($1, $2)", "Анна", 25
    )
    print(status)
    # => INSERT 0 1

    await conn.close()


asyncio.run(insert_example())

При работе с большими объёмами данных удобно использовать курсоры, чтобы не загружать все результаты запроса в память сразу. Курсор позволяет получать данные частями, по мере их поступления от сервера, что особенно полезно при выборках с тысячами строк. В asyncpg курсор создаётся методом cursor() и используется внутри транзакции.

import asyncio

import asyncpg


async def read_users(conn: asyncpg.Connection) -> None:
    async with conn.transaction():
        # создаем курсор с запросом
        cursor = await conn.cursor("SELECT id, name FROM users")

        # извлекаем данные партиями
        while True:
            # fetch(n) — забирает n строк за раз
            records = await cursor.fetch(2)
            if not records:
                break
            for record in records:
                print(f"Пользователь: {record['name']}")


async def cursor_example() -> None:
    conn = await asyncpg.connect(
        user="postgres", password="admin", database="postgres", host="127.0.0.1"
    )

    await conn.execute("""
        CREATE TABLE IF NOT EXISTS users (
            id SERIAL PRIMARY KEY,
            name TEXT NOT NULL
        )
    """)
    await conn.execute("INSERT INTO users (name) VALUES ('Bob'), ('Carol'), ('Dave')")

    await read_users(conn)
    await conn.close()


asyncio.run(cursor_example())

Метод fetch(n) позволяет получать данные порциями — приложение обрабатывает результаты сразу после их поступления, не дожидаясь завершения всей выборки. Такой подход снижает нагрузку на память и ускоряет обработку больших наборов данных.

Библиотека выбрасывает исключения, унаследованные от asyncpg.PostgresError. Это позволяет точно обрабатывать различные ошибки базы данных:

import asyncio

import asyncpg


async def error_handling() -> None:
    conn: asyncpg.Connection = await asyncpg.connect(
        user="postgres", password="admin", database="postgres", host="127.0.0.1"
    )

    await conn.execute("""
        CREATE TABLE IF NOT EXISTS users (
            id INTEGER PRIMARY KEY,
            name TEXT NOT NULL
        )
    """)

    try:
        await conn.execute("INSERT INTO users (id, name) VALUES (1, 'Анна')")
        await conn.execute("INSERT INTO users (id, name) VALUES (1, 'Борис')")
    except asyncpg.UniqueViolationError:
        print("Пользователь с таким ID уже существует")

    await conn.close()


asyncio.run(error_handling())

asyncpg автоматически преобразует типы PostgreSQL в Python: INTEGER становится int, TEXT — str, TIMESTAMP — datetime, JSONB — dict. Это делает работу с базой естественной и удобной.

Таймауты для запросов

В production-окружении необходимо ограничивать время выполнения запросов, чтобы предотвратить зависание приложения при проблемах с базой данных. В asyncpg таймауты задаются через параметр timeout или с помощью asyncio.timeout():

import asyncio

import asyncpg


async def query_with_timeout() -> None:
    conn: asyncpg.Connection = await asyncpg.connect(
        user="postgres", password="admin", database="postgres", host="127.0.0.1"
    )

    try:
        # Способ 1: встроенный таймаут asyncpg
        result = await conn.fetch("SELECT pg_sleep(2)", timeout=1.0)
    except asyncio.TimeoutError:
        print("Запрос превысил таймаут 1 секунду")

    try:
        # Способ 2: использование asyncio.timeout()
        async with asyncio.timeout(1.0):
            result = await conn.fetch("SELECT pg_sleep(2)")
    except asyncio.TimeoutError:
        print("Запрос превысил таймаут (asyncio.timeout)")

    await conn.close()


asyncio.run(query_with_timeout())

Таймауты особенно важны для:

  • Защиты от медленных запросов
  • Предотвращения блокировки event loop
  • Быстрого обнаружения проблем с базой данных

Пулы соединений

Открывать соединение для каждого запроса дорого: требуется TCP-сессия, авторизация, настройка параметров. Пул соединений хранит готовые соединения и выдаёт их задачам по мере необходимости.

В asyncpg пул создаётся вызовом asyncpg.create_pool(). Параметры такие же, как у обычного подключения: имя пользователя, пароль, база, хост, порт:

import asyncio

import asyncpg


async def main() -> None:
    pool: asyncpg.Pool = await asyncpg.create_pool(
        user="postgres",
        password="admin",
        database="postgres",
        host="127.0.0.1",
        port=5432,
        min_size=1,
        max_size=5,
        command_timeout=60.0,  # Таймаут для всех команд
    )

    async with pool.acquire() as conn:
        result: list[asyncpg.Record] = await conn.fetch("SELECT now()")
        print(result[0]["now"])

    await pool.close()


asyncio.run(main())

Параметры min_size и max_size определяют количество соединений. Если все соединения заняты, задачи ждут освобождения. Асинхронный контекстный менеджер async with pool.acquire() гарантирует, что соединение будет возвращено обратно в пул, даже если в коде произошла ошибка. Без этого можно случайно «потерять» соединение, и пул постепенно исчерпает лимит. Параметр command_timeout задаёт глобальный таймаут для всех команд, выполняемых через этот пул.

Пул делит соединения между задачами. Например, при пяти соединениях одновременно выполняются пять запросов, остальные ждут очереди.

Критически важно: пулы должны быть закрыты при завершении сервиса. Используйте await pool.close() или контекстный менеджер для гарантированной очистки ресурсов.

Можно выполнять операции напрямую через пул, без явного получения соединения:

import asyncio

import asyncpg


async def direct_pool_query() -> None:
    pool: asyncpg.Pool = await asyncpg.create_pool(
        user="postgres", password="admin", database="postgres", host="127.0.0.1"
    )

    # Прямой запрос через пул
    result: list[asyncpg.Record] = await pool.fetch("SELECT $1 AS message", "Hello from pool")
    print(result[0]["message"])
    # => Hello from pool

    await pool.close()


asyncio.run(direct_pool_query())

Это сокращает код и делает его безопаснее.

Пулы в asyncpg также позволяют выполнять инициализацию соединения. Например, можно указать функцию, которая будет вызываться при создании нового соединения — для настройки параметров сессии, включения расширений или логирования:

import asyncio

import asyncpg


async def init_connection(conn: asyncpg.Connection) -> None:
    """Инициализация каждого соединения в пуле"""
    await conn.execute('SET TIME ZONE "UTC"')
    print("Соединение настроено с UTC timezone")


async def pool_with_init() -> None:
    pool: asyncpg.Pool = await asyncpg.create_pool(
        user="postgres",
        password="admin",
        database="postgres",
        host="127.0.0.1",
        init=init_connection,
    )

    async with pool.acquire() as conn:
        timezone: str = await conn.fetchval("SHOW timezone")
        print(f"Текущая timezone: {timezone}")
        # => Текущая timezone: UTC

    await pool.close()


asyncio.run(pool_with_init())

Это особенно полезно, если приложение работает в распределённой среде и нужно, чтобы каждое соединение имело одинаковые настройки. Пул создаёт минимальное количество соединений (min_size), остальные открываются по мере необходимости.

Рассмотрим пример, где несколько задач одновременно используют пул:

import asyncio

import asyncpg


async def get_user(pool: asyncpg.Pool, user_id: int) -> asyncpg.Record | None:
    """Получает пользователя по ID"""
    async with pool.acquire() as conn:
        return await conn.fetchrow("SELECT id, name FROM users WHERE id=$1", user_id)


async def parallel_queries() -> None:
    pool: asyncpg.Pool = await asyncpg.create_pool(
        user="postgres",
        password="admin",
        database="postgres",
        host="127.0.0.1",
        min_size=1,
        max_size=2,
    )

    # Создаём тестовые данные
    await pool.execute("""
        CREATE TABLE IF NOT EXISTS users (
            id SERIAL PRIMARY KEY,
            name TEXT NOT NULL
        )
    """)
    await pool.execute("TRUNCATE users RESTART IDENTITY")
    await pool.execute(
        "INSERT INTO users (name) VALUES ('User1'), ('User2'), ('User3'), ('User4'), ('User5')"
    )

    # Пять задач обращаются к базе параллельно
    tasks = [get_user(pool, i) for i in range(1, 6)]
    results: list[asyncpg.Record | None] = await asyncio.gather(*tasks)

    for user in results:
        if user:
            print(f"ID: {user['id']}, Name: {user['name']}")

    await pool.close()


asyncio.run(parallel_queries())

Здесь пять задач обращаются к базе параллельно. Если max_size=2, одновременно будут выполняться только две из них, а остальные дождутся освобождения соединений. Это простой, но эффективный способ регулировать нагрузку на базу.

Пулы особенно важны при построении веб-сервисов, где запросы поступают непредсказуемо. Даже если запросов немного, каждый из них может вызвать несколько операций с базой. Пул помогает избежать хаоса: все операции проходят через контролируемое количество соединений.

Пул также поддерживает проверку состояния соединений. При создании можно передать параметр max_inactive_connection_lifetime, который определяет, как долго неиспользуемое соединение может оставаться открытым. После этого срока оно будет закрыто и заменено новым. Это защищает от обрыва старых TCP-сессий и помогает поддерживать стабильность соединений:

import asyncio

import asyncpg


async def pool_with_lifetime() -> None:
    pool: asyncpg.Pool = await asyncpg.create_pool(
        user="postgres",
        password="admin",
        database="postgres",
        host="127.0.0.1",
        min_size=2,
        max_size=10,
        max_inactive_connection_lifetime=300.0,  # 5 минут
    )

    async with pool.acquire() as conn:
        result: str = await conn.fetchval("SELECT version()")
        print(f"PostgreSQL version: {result[:50]}...")

    await pool.close()


asyncio.run(pool_with_lifetime())

В веб-приложениях пул создаётся при запуске и передаётся в обработчики запросов, обеспечивая эффективное использование ресурсов базы данных на протяжении всего времени работы приложения.

Транзакции и батчи

Транзакции

Транзакции обеспечивают согласованность данных: операции выполняются либо все, либо ни одной. В asyncpg транзакция начинается вызовом conn.transaction(), и её удобно использовать через контекстный менеджер async with. Это гарантирует, что при выходе из блока транзакция будет либо зафиксирована (commit), если ошибок не было, либо отменена (rollback), если возникло исключение:

import asyncio

import asyncpg


async def transaction_example() -> None:
    conn: asyncpg.Connection = await asyncpg.connect(
        user="postgres", password="admin", database="postgres", host="127.0.0.1"
    )

    await conn.execute("""
        CREATE TABLE IF NOT EXISTS users (
            id SERIAL PRIMARY KEY,
            name TEXT NOT NULL
        )
    """)

    try:
        async with conn.transaction():
            await conn.execute("INSERT INTO users(name) VALUES($1)", "Alice")
            await conn.execute("INSERT INTO users(name) VALUES($1)", "Bob")
            print("Транзакция успешно завершена")
    except asyncpg.PostgresError as e:
        print(f"Ошибка базы данных: {e}")
    except Exception as e:
        print(f"Неожиданная ошибка транзакции: {e}")
    finally:
        await conn.close()


asyncio.run(transaction_example())

В этом примере две вставки выполняются как единое целое. Если первая выполнится, а вторая вызовет ошибку, первая автоматически отменится. Такой подход защищает данные от неполных изменений и делает логику приложения предсказуемой.

Транзакции работают с пулом соединений:

import asyncio

import asyncpg


async def insert_users(pool: asyncpg.Pool) -> None:
    """Вставляет пользователей в рамках одной транзакции"""
    async with pool.acquire() as conn:
        async with conn.transaction():
            await conn.execute("INSERT INTO users(name) VALUES($1)", "Charlie")
            await conn.execute("INSERT INTO users(name) VALUES($1)", "Diana")
            print("Пользователи добавлены")


async def pool_transaction() -> None:
    pool: asyncpg.Pool = await asyncpg.create_pool(
        user="postgres", password="admin", database="postgres", host="127.0.0.1"
    )

    await pool.execute("""
        CREATE TABLE IF NOT EXISTS users (
            id SERIAL PRIMARY KEY,
            name TEXT NOT NULL
        )
    """)

    await insert_users(pool)
    await pool.close()


asyncio.run(pool_transaction())

Батчи

Иногда нужно выполнять однотипные операции с большим количеством данных. Выполнять их по одному запросу долго и неэффективно. Здесь на помощь приходят батчи. В asyncpg можно использовать метод executemany(), который принимает SQL и список параметров, выполняя запрос для каждой записи. Это быстрее, чем запускать отдельный запрос для каждой строки, и уменьшает нагрузку на сеть и базу:

import asyncio

import asyncpg


async def batch_insert(pool: asyncpg.Pool) -> None:
    """Пакетная вставка с таймаутом"""
    async with pool.acquire() as conn:
        users: list[tuple[str]] = [(f"User{i}",) for i in range(1000)]

        try:
            async with asyncio.timeout(10.0):
                async with conn.transaction():
                    await conn.executemany("INSERT INTO users(name) VALUES($1)", users)
                    print(f"Вставлено {len(users)} пользователей")
        except asyncio.TimeoutError:
            print("Батч-операция превысила таймаут")


async def batch_example() -> None:
    pool: asyncpg.Pool = await asyncpg.create_pool(
        user="postgres", password="admin", database="postgres", host="127.0.0.1"
    )

    await pool.execute("""
        CREATE TABLE IF NOT EXISTS users (
            id SERIAL PRIMARY KEY,
            name TEXT NOT NULL
        )
    """)
    await pool.execute("TRUNCATE users RESTART IDENTITY")

    await batch_insert(pool)

    count: int = await pool.fetchval("SELECT COUNT(*) FROM users")
    print(f"Всего пользователей: {count}")

    await pool.close()


asyncio.run(batch_example())

Батчи хорошо комбинировать с транзакциями, обернув всю операцию в conn.transaction():

import asyncio

import asyncpg


async def batch_with_transaction() -> None:
    pool: asyncpg.Pool = await asyncpg.create_pool(
        user="postgres", password="admin", database="postgres", host="127.0.0.1"
    )

    await pool.execute("""
        CREATE TABLE IF NOT EXISTS products (
            id SERIAL PRIMARY KEY,
            name TEXT NOT NULL,
            price DECIMAL(10, 2)
        )
    """)

    async with pool.acquire() as conn:
        async with conn.transaction():
            products: list[tuple[str, float]] = [
                ("Laptop", 999.99),
                ("Mouse", 29.99),
                ("Keyboard", 79.99),
                ("Monitor", 299.99),
            ]
            await conn.executemany(
                "INSERT INTO products(name, price) VALUES($1, $2)", products
            )
            print(f"Добавлено {len(products)} товаров в рамках транзакции")

    await pool.close()


asyncio.run(batch_with_transaction())

Вложенные транзакции

В сложных сценариях можно использовать вложенные транзакции через savepoint. В asyncpg это делается с помощью conn.transaction() внутри другой транзакции. Если часть операций внутри вложенной транзакции неудачна, можно откатить только её, не влияя на внешнюю транзакцию. Это удобно для обработки ошибок на уровне отдельных батчей или блоков операций:

import asyncio

import asyncpg


async def nested_example(conn: asyncpg.Connection) -> None:
    """Демонстрирует вложенные транзакции с savepoint"""
    async with conn.transaction():
        await conn.execute("INSERT INTO users(name) VALUES($1)", "Henry")
        print("Вставлен Henry (внешняя транзакция)")

        try:
            async with conn.transaction():
                await conn.execute("INSERT INTO users(name) VALUES($1)", "Ivy")
                print("Вставлен Ivy (внутренняя транзакция)")
                # Симулируем ошибку
                raise Exception("Имитация ошибки")
        except Exception as e:
            print(f"Ошибка во внутренней транзакции: {e}")
            print("Ivy откатится, но Henry останется")

        await conn.execute("INSERT INTO users(name) VALUES($1)", "Jack")
        print("Вставлен Jack (внешняя транзакция)")


async def nested_transaction_example() -> None:
    conn: asyncpg.Connection = await asyncpg.connect(
        user="postgres", password="admin", database="postgres", host="127.0.0.1"
    )

    await conn.execute("""
        CREATE TABLE IF NOT EXISTS users (
            id SERIAL PRIMARY KEY,
            name TEXT NOT NULL
        )
    """)
    await conn.execute("TRUNCATE users RESTART IDENTITY")

    await nested_example(conn)

    # Проверяем результат
    users: list[asyncpg.Record] = await conn.fetch("SELECT name FROM users ORDER BY id")
    print("Пользователи в базе:", [u["name"] for u in users])
    # => Пользователи в базе: ['Henry', 'Jack']

    await conn.close()


asyncio.run(nested_transaction_example())

Вложенные транзакции позволяют создавать гибкую логику обработки ошибок, где критичные операции защищены внешней транзакцией, а менее важные могут быть отменены без влияния на основной процесс.

Транзакции с таймаутами

Транзакции обеспечивают согласованность данных: операции выполняются либо все, либо ни одной. В production необходимо ограничивать время выполнения транзакций:

import asyncio

import asyncpg


async def transaction_with_timeout() -> None:
    conn: asyncpg.Connection = await asyncpg.connect(
        user="postgres", password="admin", database="postgres", host="127.0.0.1"
    )

    await conn.execute("""
        CREATE TABLE IF NOT EXISTS users (
            id SERIAL PRIMARY KEY,
            name TEXT NOT NULL
        )
    """)

    try:
        # Ограничиваем время выполнения транзакции
        async with asyncio.timeout(5.0):
            async with conn.transaction():
                await conn.execute("INSERT INTO users(name) VALUES($1)", "Alice")
                await asyncio.sleep(2)  # Имитация долгой операции
                await conn.execute("INSERT INTO users(name) VALUES($1)", "Bob")
                print("Транзакция успешно завершена")
    except asyncio.TimeoutError:
        print("Транзакция превысила таймаут и была отменена")
    except asyncpg.PostgresError as e:
        print(f"Ошибка базы данных: {e}")
    finally:
        await conn.close()


asyncio.run(transaction_with_timeout())

Важно: при превышении таймаута транзакция автоматически откатывается (rollback), все изменения отменяются.

Уровни изоляции транзакций

Уровень изоляции определяет, какие изменения, сделанные другими транзакциями, видны текущей транзакции. PostgreSQL поддерживает четыре уровня:

  1. READ UNCOMMITTED — на практике работает как READ COMMITTED в PostgreSQL
  2. READ COMMITTED (по умолчанию) — видны только зафиксированные изменения
  3. REPEATABLE READ — все чтения внутри транзакции видят одинаковый снимок данных
  4. SERIALIZABLE — самый строгий уровень, гарантирует полную сериализуемость

В asyncpg уровень изоляции задаётся при создании транзакции:

import asyncio

import asyncpg


async def isolation_levels_example() -> None:
    pool: asyncpg.Pool = await asyncpg.create_pool(
        user="postgres", password="admin", database="postgres", host="127.0.0.1"
    )

    await pool.execute("""
        CREATE TABLE IF NOT EXISTS accounts (
            id SERIAL PRIMARY KEY,
            balance INTEGER NOT NULL
        )
    """)
    await pool.execute("TRUNCATE accounts RESTART IDENTITY")
    await pool.execute("INSERT INTO accounts (balance) VALUES (1000)")

    async with pool.acquire() as conn:
        # REPEATABLE READ - все чтения видят один снимок
        async with conn.transaction(isolation="repeatable_read"):
            balance1: int = await conn.fetchval("SELECT balance FROM accounts WHERE id=1")
            print(f"Первое чтение: {balance1}")

            # Имитируем изменение из другой транзакции
            async with pool.acquire() as conn2:
                await conn2.execute("UPDATE accounts SET balance=2000 WHERE id=1")
                print("Другая транзакция изменила баланс на 2000")

            # В REPEATABLE READ увидим старое значение
            balance2: int = await conn.fetchval("SELECT balance FROM accounts WHERE id=1")
            print(f"Второе чтение: {balance2}")
            # => Второе чтение: 1000 (старый снимок)

    # После транзакции увидим новое значение
    final_balance: int = await pool.fetchval("SELECT balance FROM accounts WHERE id=1")
    print(f"После транзакции: {final_balance}")
    # => После транзакции: 2000

    await pool.close()


asyncio.run(isolation_levels_example())

Когда использовать каждый уровень:

  • READ COMMITTED — для большинства случаев, минимальные блокировки
  • REPEATABLE READ — когда нужна консистентность чтений внутри транзакции
  • SERIALIZABLE — для критичных операций, требующих полной изоляции

Реакция на отмену задач

При отмене асинхронной задачи (CancelledError) важно корректно завершить транзакцию и освободить ресурсы:

import asyncio

import asyncpg


async def long_transaction(conn: asyncpg.Connection) -> None:
    """Длительная транзакция с корректной обработкой отмены"""
    try:
        async with conn.transaction():
            await conn.execute("INSERT INTO users(name) VALUES($1)", "User1")
            print("Вставлен User1")

            # Имитация долгой операции
            await asyncio.sleep(10)

            await conn.execute("INSERT INTO users(name) VALUES($1)", "User2")
            print("Вставлен User2")

    except asyncio.CancelledError:
        print("Транзакция отменена - rollback выполнен автоматически")
        raise  # Важно пробросить исключение дальше


async def cancellation_example() -> None:
    conn: asyncpg.Connection = await asyncpg.connect(
        user="postgres", password="admin", database="postgres", host="127.0.0.1"
    )

    await conn.execute("""
        CREATE TABLE IF NOT EXISTS users (
            id SERIAL PRIMARY KEY,
            name TEXT NOT NULL
        )
    """)
    await conn.execute("TRUNCATE users RESTART IDENTITY")

    task = asyncio.create_task(long_transaction(conn))

    # Отменяем задачу через 2 секунды
    await asyncio.sleep(2)
    task.cancel()

    try:
        await task
    except asyncio.CancelledError:
        print("Задача была отменена")

    # Проверяем, что User1 не был зафиксирован
    users = await conn.fetch("SELECT name FROM users")
    print(f"Пользователей в базе: {len(users)}")
    # => Пользователей в базе: 0

    await conn.close()


asyncio.run(cancellation_example())

Ключевой момент: контекстный менеджер async with conn.transaction() автоматически выполняет rollback при CancelledError, гарантируя целостность данных.

Лёгкая альтернатива: aiosqlite

Для локальных упражнений и разработки удобнее использовать aiosqlite — асинхронную обёртку над SQLite. Это позволяет работать с базой данных без необходимости установки и настройки PostgreSQL:

pip install aiosqlite

Пример использования:

import asyncio

import aiosqlite


async def aiosqlite_example() -> None:
    # Создаём/открываем БД в файле
    async with aiosqlite.connect("example.db") as db:
        # Создаём таблицу
        await db.execute("""
            CREATE TABLE IF NOT EXISTS users (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT NOT NULL,
                age INTEGER
            )
        """)

        # Вставляем данные (параметризованный запрос)
        await db.execute("INSERT INTO users (name, age) VALUES (?, ?)", ("Alice", 25))
        await db.execute("INSERT INTO users (name, age) VALUES (?, ?)", ("Bob", 30))

        # Фиксируем изменения
        await db.commit()

        # Читаем данные
        async with db.execute("SELECT * FROM users") as cursor:
            rows = await cursor.fetchall()
            for row in rows:
                print(f"ID: {row[0]}, Name: {row[1]}, Age: {row[2]}")
                # => ID: 1, Name: Alice, Age: 25
                # => ID: 2, Name: Bob, Age: 30


asyncio.run(aiosqlite_example())

Преимущества aiosqlite:

  • Не требует установки сервера БД
  • Идеально для обучения и тестирования
  • Простая миграция на PostgreSQL при необходимости
  • Поддерживает транзакции и параметризованные запросы

Пример с транзакциями:

import asyncio

import aiosqlite


async def transaction_example() -> None:
    async with aiosqlite.connect("example.db") as db:
        await db.execute("""
            CREATE TABLE IF NOT EXISTS accounts (
                id INTEGER PRIMARY KEY,
                balance INTEGER
            )
        """)
        await db.execute("DELETE FROM accounts")
        await db.execute("INSERT INTO accounts VALUES (1, 1000)")
        await db.commit()

        try:
            # Начинаем транзакцию (по умолчанию в aiosqlite)
            await db.execute("UPDATE accounts SET balance = balance - 100 WHERE id = 1")
            await db.execute("UPDATE accounts SET balance = balance + 100 WHERE id = 2")

            # Симулируем ошибку
            raise ValueError("Ошибка перевода!")

            await db.commit()
        except ValueError as e:
            print(f"Ошибка: {e}")
            await db.rollback()
            print("Транзакция отменена")

        # Проверяем баланс
        async with db.execute("SELECT balance FROM accounts WHERE id = 1") as cursor:
            balance = await cursor.fetchone()
            print(f"Баланс счёта 1: {balance[0]}")
            # => Баланс счёта 1: 1000 (откат выполнен)


asyncio.run(transaction_example())

Контрольный список для работы с асинхронными БД

Всегда используйте параметризованные запросы — никогда не используйте форматирование строк для SQL-запросов

Пулы должны закрываться при завершении сервиса — используйте await pool.close() или контекстные менеджеры

Задавайте таймауты для всех операций — используйте command_timeout для пула и asyncio.timeout() для транзакций

Ограничивайте время выполнения транзакций — длительные транзакции блокируют другие операции

Выбирайте правильный уровень изоляции — используйте минимально необходимый уровень для производительности

Обрабатывайте CancelledError корректно — транзакции должны откатываться при отмене задачи

Настройте размер пула соединений — баланс между производительностью и нагрузкой на БД

Используйте батчи для массовых операцийexecutemany() значительно быстрее множественных execute()

Для разработки используйте aiosqlite — не требует установки PostgreSQL

Логируйте все ошибки БД — это критично для диагностики проблем в production

Пример production-ready конфигурации:

import asyncio
import logging
from contextlib import asynccontextmanager
from typing import AsyncGenerator

import asyncpg

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class DatabasePool:
    """Production-ready пул соединений с БД"""

    def __init__(self, dsn: str, min_size: int = 10, max_size: int = 20) -> None:
        self.dsn = dsn
        self.min_size = min_size
        self.max_size = max_size
        self.pool: asyncpg.Pool | None = None

    async def connect(self) -> None:
        """Создаёт пул соединений"""
        logger.info("Создание пула соединений с БД")
        self.pool = await asyncpg.create_pool(
            self.dsn,
            min_size=self.min_size,
            max_size=self.max_size,
            command_timeout=60.0,
            max_inactive_connection_lifetime=300.0,
        )
        logger.info(f"Пул создан: min={self.min_size}, max={self.max_size}")

    async def close(self) -> None:
        """Закрывает пул соединений"""
        if self.pool:
            logger.info("Закрытие пула соединений")
            await self.pool.close()
            logger.info("Пул закрыт")

    @asynccontextmanager
    async def transaction(
        self, timeout: float = 30.0, isolation: str = "read_committed"
    ) -> AsyncGenerator[asyncpg.Connection, None]:
        """Контекстный менеджер для транзакций с таймаутом

        Args:
            timeout: Максимальное время выполнения транзакции
            isolation: Уровень изоляции транзакции
        """
        if not self.pool:
            raise RuntimeError("Пул не инициализирован")
        async with self.pool.acquire() as conn:
            try:
                async with asyncio.timeout(timeout):
                    async with conn.transaction(isolation=isolation):
                        yield conn
            except asyncio.TimeoutError:
                logger.error(f"Транзакция превысила таймаут {timeout}с и была отменена")
                raise
            except asyncio.CancelledError:
                logger.warning("Транзакция отменена")
                raise
            except asyncpg.PostgresError as e:
                logger.error(f"Ошибка БД в транзакции: {e}")
                raise

    async def execute(self, query: str, *args, timeout: float = 30.0) -> str:
        """Выполняет запрос с таймаутом"""
        if not self.pool:
            raise RuntimeError("Пул не инициализирован")
        try:
            async with asyncio.timeout(timeout):
                return await self.pool.execute(query, *args)
        except asyncio.TimeoutError:
            logger.error(f"Запрос превысил таймаут {timeout}с")
            raise

    async def fetch(self, query: str, *args, timeout: float = 30.0) -> list[asyncpg.Record]:
        """Выполняет SELECT с таймаутом"""
        if not self.pool:
            raise RuntimeError("Пул не инициализирован")
        try:
            async with asyncio.timeout(timeout):
                return await self.pool.fetch(query, *args)
        except asyncio.TimeoutError:
            logger.error(f"Запрос превысил таймаут {timeout}с")
            raise


async def main() -> None:
    """Пример использования production-ready пула"""
    db = DatabasePool(
        dsn="postgresql://postgres:admin@localhost/postgres", min_size=5, max_size=20
    )
    try:
        await db.connect()
        await db.execute("""
            CREATE TABLE IF NOT EXISTS users (
                id SERIAL PRIMARY KEY,
                name TEXT NOT NULL
            )
        """)
        async with db.transaction(timeout=10.0, isolation="repeatable_read") as conn:
            await conn.execute("INSERT INTO users(name) VALUES($1)", "Alice")
            await conn.execute("INSERT INTO users(name) VALUES($1)", "Bob")
            logger.info("Транзакция успешно выполнена")
        users = await db.fetch("SELECT * FROM users")
        logger.info(f"Найдено пользователей: {len(users)}")
    except Exception as e:
        logger.error(f"Ошибка: {e}")
    finally:
        await db.close()


if __name__ == "__main__":
    asyncio.run(main())

Эта конфигурация обеспечивает:

  • Автоматические таймауты для всех операций
  • Корректную обработку отмены задач
  • Логирование всех критических событий
  • Контролируемое управление пулом соединений
  • Гибкую настройку уровней изоляции

Рекомендуемые программы

Завершено

0 / 10