Асинхронность в Python

Теория: Блокирующие операции и адаптеры

Асинхронный код в Python эффективно использует время ожидания ввода-вывода, но не решает всех проблем производительности. В реальных приложениях часто встречается код, который нельзя переписать под async/await: старые библиотеки, блокирующие функции или задачи с серьёзными вычислениями. Для интеграции таких функций в асинхронные программы используются пулы потоков (ThreadPoolExecutor) и процессов (ProcessPoolExecutor). Они позволяют запускать синхронный код параллельно, не блокируя главный цикл событий.

Использование ThreadPoolExecutor и ProcessPoolExecutor

ThreadPoolExecutor для I/O-операций

Когда мы вызываем asyncio.to_thread(func, *args), Python по умолчанию использует глобальный ThreadPoolExecutor. Однако, если приложение требует контроля над количеством потоков или поведения пула, удобнее создать свой экземпляр. Например, для задач чтения с диска или запросов к внешним API можно создать пул из 5 потоков и использовать его во всех корутинах:

import asyncio
import time
from concurrent.futures import ThreadPoolExecutor


def read_file(name: str) -> str:
    time.sleep(1)
    return f"Файл {name} прочитан"


async def main() -> None:
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=5) as pool:
        tasks = [loop.run_in_executor(pool, read_file, f"file_{i}.txt") for i in range(10)]
        results = await asyncio.gather(*tasks)
        print(results)


asyncio.run(main())

В этом примере чтение десяти файлов выполняется параллельно, хотя каждая функция read_file блокирующая. Без пула потоков программа выполнялась бы последовательно и занимала бы около 10 секунд, а с пулом из 5 потоков все задачи завершаются примерно за 2 секунды. Это не «настоящая» асинхронность, но с точки зрения event loop — результат тот же: он не простаивает во время ожидания.

Пул можно создать заранее и переиспользовать:

import asyncio
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=3)


def process_item(n: int) -> int:
    return n * n


async def handle_items() -> None:
    loop = asyncio.get_running_loop()
    results = await asyncio.gather(
        *(loop.run_in_executor(executor, process_item, i) for i in range(10))
    )
    print(results)
    # => [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]


asyncio.run(handle_items())

ProcessPoolExecutor для CPU-операций

ProcessPoolExecutor используется аналогично, но каждая задача выполняется в отдельном процессе, а не в потоке. Это важно, потому что процессы не делят память и не блокируются глобальной блокировкой интерпретатора (GIL). Однако обмен данными между процессами требует сериализации через pickle, поэтому передача больших структур данных замедляет работу. Тем не менее, для независимых и изолированных задач ProcessPoolExecutor остаётся эффективным:

import asyncio
import math
import sys
from concurrent.futures import ProcessPoolExecutor


def calc_factorial(n: int) -> int:
    return math.factorial(n)


async def main() -> None:
    sys.set_int_max_str_digits(1000000)
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor(max_workers=4) as pool:
        numbers = [100_000, 120_000, 80_000, 110_000]
        tasks = [loop.run_in_executor(pool, calc_factorial, n) for n in numbers]
        results = await asyncio.gather(*tasks)
        print([len(str(r)) for r in results])
        # => [456574, 557390, 357507, 506784]


if __name__ == "__main__":
    asyncio.run(main())

Внутренне оба пула управляются очередями заданий. Когда мы вызываем run_in_executor, задача помещается в очередь пула, а один из рабочих потоков или процессов её извлекает и выполняет. После завершения результат возвращается через будущее (Future), которое корутина ожидает с помощью await. Цикл событий остаётся свободным, пока процессы выполняют свои функции, и управление возвращается, когда они завершаются. При этом программа не блокируется и может обслуживать другие корутины.

С появлением Python 3.14 и особенно 3.15 многопоточность стала эффективнее благодаря переработке GIL. Ранее глобальная блокировка интерпретатора позволяла работать только одному потоку Python одновременно, что лишало ThreadPoolExecutor смысла для вычислений. Теперь, в режиме «free-threading», потоки действительно могут выполняться параллельно на разных ядрах процессора. Улучшена синхронизация между потоками, снижены накладные расходы при переключении контекста, что делает ThreadPoolExecutor быстрее и предсказуемее. Асинхронные программы, использующие пулы потоков, теперь выигрывают не только при работе с вводом-выводом, но и при умеренной вычислительной нагрузке.

Можно совмещать оба типа пулов — сетевые запросы в ThreadPoolExecutor, обработку данных в ProcessPoolExecutor:

import asyncio
import hashlib
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

import requests


def fetch(url: str) -> str:
    return requests.get(url).text


def compute_hash(data: str) -> str:
    return hashlib.sha256(data.encode()).hexdigest()


async def main() -> None:
    loop = asyncio.get_running_loop()
    with (
        ThreadPoolExecutor(max_workers=5) as tpool,
        ProcessPoolExecutor(max_workers=2) as ppool,
    ):
        html_list = await asyncio.gather(
            *(
                loop.run_in_executor(tpool, fetch, url)
                for url in ["https://example.com", "https://python.org"]
            )
        )

        hashes = await asyncio.gather(
            *(loop.run_in_executor(ppool, compute_hash, html) for html in html_list)
        )

        print(hashes)


if __name__ == "__main__":
    asyncio.run(main())

При вызове cancel() корутина отменится, но задача в потоке/процессе продолжит выполнение. Исключения из пула передаются в корутину:

import asyncio
from concurrent.futures import ThreadPoolExecutor


def error_task() -> None:
    raise ValueError("Ошибка в задаче потока")


async def main() -> None:
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor() as pool:
        try:
            await loop.run_in_executor(pool, error_task)
        except Exception as e:
            print(f"Перехвачено исключение: {e}")
            # => Перехвачено исключение: Ошибка в задаче потока


asyncio.run(main())

CPU-bound задачи в процессах

Для CPU-интенсивных операций ProcessPoolExecutor обеспечивает истинный параллелизм. Оптимальный размер пула обычно равен числу ядер процессора (os.cpu_count()):

import asyncio
import os
from concurrent.futures import ProcessPoolExecutor


def intensive_calc(n: int) -> int:
    count = 0
    for i in range(n * 1_000_000):
        count += i % 3 - i % 2
    return count


async def main() -> None:
    loop = asyncio.get_running_loop()
    optimal_workers = os.cpu_count() or 4

    with ProcessPoolExecutor(max_workers=optimal_workers) as pool:
        tasks = [loop.run_in_executor(pool, intensive_calc, 5) for _ in range(4)]
        results = await asyncio.gather(*tasks)
        print(results)
        # => [2499999, 2499999, 2499999, 2499999]


if __name__ == "__main__":
    asyncio.run(main())

Каждый процесс имеет собственный интерпретатор Python и не делит память с другими. Данные передаются через сериализацию (pickle), поэтому избегайте передачи тяжёлых структур. Для больших массивов чисел используйте общую память через multiprocessing.Array или библиотеки вроде numpy.

Интеграция sync API в async-программы

Если синхронная функция не поддерживает asyncio, её можно изолировать в пуле потоков или процессов.

Простейший способ — использовать asyncio.to_thread() (доступен с Python 3.9):

import asyncio
import time


def blocking_task(x: int) -> int:
    time.sleep(2)
    return x * 2


async def main() -> None:
    print("Запуск синхронной функции в фоне")
    result = await asyncio.to_thread(blocking_task, 5)
    print(f"Результат: {result}")
    # => Результат: 10


asyncio.run(main())

Для частого использования синхронных API создайте свой ThreadPoolExecutor:

import asyncio
from concurrent.futures import ThreadPoolExecutor

import requests


def get_page(url: str) -> int:
    response = requests.get(url)
    return response.status_code


async def main() -> None:
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=5) as pool:
        urls = ["https://example.com", "https://python.org", "https://github.com"]
        tasks = [loop.run_in_executor(pool, get_page, url) for url in urls]
        results = await asyncio.gather(*tasks)
        print(results)
        # => [200, 200, 200]


asyncio.run(main())

Для CPU-интенсивных синхронных функций используйте ProcessPoolExecutor:

import asyncio
from concurrent.futures import ProcessPoolExecutor


def calc_pi(n: int) -> float:
    s = 0.0
    for i in range(n):
        s += ((-1.0) ** i) / (2 * i + 1)
    return 4 * s


async def main() -> None:
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, calc_pi, 50_000_000)
        print(result)
        # => 3.1415926335902506


if __name__ == "__main__":
    asyncio.run(main())

Функция calc_pi() загружает процессор на максимум, но цикл событий не блокируется и может обслуживать другие корутины.

Рекомендуемые программы

Завершено

0 / 10