Асинхронность в Python

Теория: Введение в асинхронность

Асинхронность vs параллелизм

Когда мы пишем обычную программу на Python, она выполняется построчно. Операторы идут один за другим: сначала читается файл, потом обрабатываются данные, потом выводится результат. Такой подход называют синхронным: каждая операция ждёт, пока предыдущая завершится.

Теперь представьте, что у нас есть задача скачать данные из интернета с пяти разных сайтов. В синхронной программе это будет выглядеть так: мы делаем запрос к первому сайту, ждём ответа, только после этого идём ко второму, и так далее. В итоге процессор простаивает большую часть времени, потому что сеть работает медленно, а компьютер просто ждёт.

Асинхронность решает эту проблему через механизм кооперативной многозадачности. Она позволяет программе переключаться между задачами, когда одна из них временно ожидает завершения операции ввода-вывода. Пока идёт загрузка данных с первого сайта, мы можем начать скачивание со второго. Важно понимать: асинхронность не создаёт дополнительные потоки или процессы. Вместо этого корутины добровольно уступают управление event loop, пока операционная система обслуживает операции ввода-вывода.

А вот параллелизм — это другое. Параллельность означает одновременное выполнение нескольких операций на разных вычислительных ресурсах (ядрах процессора, отдельных процессах или машинах). Например, если у нас есть четыре ядра, мы можем параллельно запустить четыре вычисления, каждое на своём ядре.

Асинхронность — это скорее про эффективное использование времени ожидания через кооперативное переключение задач. Параллелизм — это буквально одновременное выполнение задач на разных вычислительных ресурсах. Иногда они могут комбинироваться, но путать их нельзя.

Простейший пример. Представим обычную функцию, которая имитирует долгую работу:

import time


def task(name: str) -> None:
    print(f"Начинаю {name}")
    time.sleep(2)
    print(f"Заканчиваю {name}")


task("A")
task("B")

Здесь всё выполняется синхронно: сначала выполняется задача A, потом задача B. Общее время — примерно 4 секунды. Асинхронная версия позволила бы переключаться: пока первая задача ждёт, запускается вторая.

Когда асинхронность эффективна

Чтобы понять, где нужна асинхронность, полезно различать два типа задач:

  1. CPU-bound — задачи, которые загружают процессор вычислениями. Например, сортировка огромного массива или вычисление факториала для большого числа. Здесь асинхронность почти бесполезна, потому что процессор и так всё время занят.

  2. IO-bound — задачи, которые много времени тратят на ожидание операций ввода-вывода. Это работа с сетью, с диском, с базой данных, с внешними устройствами. Компьютер послал запрос, и пока операционная система обрабатывает операцию ввода-вывода, программа может заниматься другими задачами. Вот здесь асинхронность раскрывается в полной мере.

Простой пример. Пусть у нас есть две функции: одна долго считает, другая долго ждёт.

Примечание: В примерах кода далее я буду указывать реальные замеры времени выполнения, но на вашей системе они могут отличаться в зависимости от производительности компьютера.

import time


def cpu_bound() -> None:
    start = time.time()
    x = 0
    for i in range(10**7):
        x += i
    print("CPU задача заняла", time.time() - start, "секунд")


def io_bound() -> None:
    start = time.time()
    time.sleep(3)
    print("IO задача заняла", time.time() - start, "секунд")


cpu_bound()
io_bound()

Первая функция cpu_bound() использует процессор — она загружает его до конца. Вторая io_bound() просто ждёт, но результат занимает те же ~3 секунды. Если операций с ожиданием много, асинхронность позволяет не терять это время. Пока одна задача ожидает завершения операции ввода-вывода, можно заняться другой.

Сравнение потоков, процессов и корутин

Чтобы организовать одновременное выполнение, можно использовать разные подходы. Традиционно это потоки и процессы. Потоки — это несколько последовательностей команд внутри одной программы. Процессы — это полностью отдельные программы, которые операционная система запускает параллельно.

Корутины — это другой подход. Они не создают дополнительные потоки или процессы. Это функции, которые можно приостанавливать и возобновлять. В отличие от потоков, переключение между корутинами полностью управляется event loop внутри программы через кооперативную многозадачность, а не операционной системой через вытесняющую многозадачность. За счёт этого переключение намного быстрее и дешевле.

Чтобы лучше понять, можно вообразить, что корутина — это актёр на сцене. Он произносит свою реплику и уходит за кулисы, уступая место другому. Позже возвращается и продолжает с того же места. Потоки же — это как несколько актёров, которые одновременно находятся на сцене, но режиссёр (планировщик ОС) постоянно решает, кому сейчас давать слово. Процессы — это вообще разные спектакли в разных залах.

Event Loop и кооперативная многозадачность

В основе асинхронного программирования лежит event loop (цикл обработки событий). Это центральный механизм, который:

  1. Поддерживает очередь готовых к выполнению корутин
  2. Выполняет корутины до тех пор, пока они не встретят операцию ввода-вывода
  3. Передаёт операции ввода-вывода операционной системе
  4. Ожидает уведомлений от ОС о завершении операций
  5. Возобновляет соответствующие корутины через callbacks

Ключевой термин здесь — кооперативная многозадачность. Корутины сами решают, когда уступить управление (через await), в отличие от потоков, где переключение происходит принудительно.

┌──────────────────────────────────────┐
│         Event Loop                   │
├──────────────────────────────────────┤
│  Очередь готовых задач:              │
│  [корутина A, корутина C]            │
│                                      │
│  Ожидание I/O:                       │
│  корутина B → чтение файла           │
│  корутина D → сетевой запрос         │
│                                      │
│  При завершении I/O:                 │
│  ОС → callback → возврат в очередь   │
└──────────────────────────────────────┘

Ограничения GIL и работа Python с задачами

В Python есть особенность, которая сильно влияет на то, как работает параллельное выполнение. Это так называемый Global Interpreter Lock, или GIL. Его можно представить как общий замОк, который удерживает интерпретатор. GIL блокирует одновременное выполнение байт-кода Python в разных потоках, даже на многоядерных системах. Однако GIL освобождается во время IO-операций, что позволяет другим потокам работать в это время.

Причина появления GIL проста: он сильно упрощает управление памятью и делает работу интерпретатора более безопасной. Но у этого есть последствия. Если задача нагружает процессор сложными вычислениями, то использование нескольких потоков в Python не даёт выигрыша по скорости, потому что всё равно выполняется только один поток Python-кода за раз.

Разберём это на упрощённом примере. Допустим, мы пишем функцию cpu_bound(), которая просто много считает:

import time


def cpu_bound(n: int) -> int:
    total = 0
    for i in range(10**7):
        total += i * n
    return total


start = time.time()
cpu_bound(1)
cpu_bound(2)
print("Время для двух вызовов подряд:", time.time() - start)

Здесь мы вызываем функцию дважды, и оба раза процессор занят вычислениями. Если бы Python мог по-настоящему распараллелить эти вызовы на разные ядра в потоках, выполнение было бы быстрее. Но из-за GIL даже в многопоточной версии всё равно работал бы только один поток Python-кода за раз.

Теперь сравним с другой ситуацией — когда задачи не нагружают процессор, а просто ожидают:

import time


def io_bound(n: int) -> None:
    print(f"Начало {n}")
    time.sleep(2)
    print(f"Конец {n}")


start = time.time()
io_bound(1)
io_bound(2)
print("Время выполнения двух вызовов подряд:", time.time() - start)

Обе задачи заняли примерно 4 секунды, потому что они выполнялись строго друг за другом. Но обратите внимание: всё время они ожидали, а не считали. В таких случаях как раз и нужна асинхронность.

Асинхронная версия с asyncio

Давайте перепишем предыдущий пример, используя asyncio:

Важно: В асинхронном коде нельзя использовать time.sleep(), так как это блокирует event loop. Вместо этого используйте await asyncio.sleep().

import asyncio
import time


async def async_io_bound(n: int) -> None:
    print(f"Начало {n}")
    await asyncio.sleep(2)  # НЕ time.sleep()!
    print(f"Конец {n}")


async def main() -> None:
    start = time.time()
    await asyncio.gather(async_io_bound(1), async_io_bound(2))
    print("Время выполнения с asyncio:", time.time() - start)


asyncio.run(main())

Теперь общее время составит примерно 2 секунды вместо 4, потому что обе корутины выполняются конкурентно: пока одна ожидает в asyncio.sleep(), event loop переключается на другую.

Ключевое отличие:

  • time.sleep() блокирует весь поток выполнения
  • await asyncio.sleep() уступает управление event loop, который может выполнять другие корутины

Отсюда вывод: Асинхронность не запускает дополнительные потоки или процессы. Вместо этого корутины кооперативно уступают управление event loop (через await), пока операционная система обслуживает операции ввода-вывода. GIL ограничивает параллельное выполнение CPU-bound задач в потоках, но не препятствует эффективной асинхронной обработке IO-bound задач.

Именно поэтому в Python асинхронность так активно применяется для сетевых запросов, работы с базами данных и файловыми операциями, но не ускоряет задачи, которые полностью зависят от процессора.

Контрольный список при работе с asyncio

При написании асинхронного кода следуйте этим правилам:

  • Используйте await asyncio.sleep() вместо time.sleep()time.sleep() блокирует event loop

  • Явно указывайте типы исключений в except — никогда не используйте голый except: без указания типа

  • Не подавляйте asyncio.CancelledError — это исключение используется для корректной отмены задач

  • Используйте async with для асинхронных контекстных менеджеров — это гарантирует правильное освобождение ресурсов

    Плохо:

    try:
        time.sleep(1)  # Блокирует event loop
    except:  # Слишком широко
        pass

    Хорошо:

    try:
        await asyncio.sleep(1)  # Не блокирует event loop
    except ValueError as e:  # Конкретный тип
        logger.error(f"Ошибка: {e}")

Рекомендуемые инструменты для Python-разработки

uv — современный менеджер пакетов на Rust, в 10-100 раз быстрее pip для установки зависимостей и создания виртуальных окружений.

Ruff — универсальный линтер и форматтер, заменяющий flake8, black, isort и другие инструменты одной быстрой утилитой на Rust.

Docker — платформа контейнеризации для упаковки приложения со всеми зависимостями, обеспечивающая одинаковую работу на любых системах.

Рекомендуемые программы

Завершено

0 / 10