FastAPI, asyncio и multiprocessing

Недавно товарищ поделился со мной ссылкой на статью про FastAPI и кооперативную мультипоточность. В ней автор, во-первых, ссылается на исследование другого автора про сравнение производительности между синхронными и асинхронными Python фреймворками. А во-вторых, приводит личный пример падения производительности приложения и как итог значительного увеличение задержки ответов от сервера.

Суть его проблемы в том, что в одном из API Endpoint выполняется CPU-intensive задача, а именно генерация списка из Pydantic-моделей, который позже конвертируется в JSON. Сериализация/десериализация огромного количества объектов недешевая процедура, а тем более для pydantic-моделей, где есть дополнительные накладные расходы. Т.е. эта задача попадает под классический CPU-bound пример. Но нужно понимать, что любой CPU-intensive код полностью блокирует eventloop, и переключение между корутинами прекращается до полной его разблокировки. Автор статьи предлагает использовать несколько воркеров при запуске uvicorn и надеяться, что нагрузка не прилетит сразу на все. Другой вариант это запускать тяжелые задачи в отдельном пуле потоков, но я не понимаю каким образом это решает проблему CPU-intensive задач в связи с присутствием общего GIL (Global Interpreter Lock). В конце концов чтобы улучшить производительность приложения, он рекомендует забрать у FastAPI конвертацию Pydantic модели в json и сделать это самостоятельно, используя метод .json() у pydantic-объекта, выплюнув в итоге голый Response:

Было:

@app.get("/clients", response_model=ClientsResponse)
def clients():
    return ClientsResponse(
        items=[
            Client(id=i, address=Address(id=i), bank_accounts=[Account(id=i)])
            for i in range(40_000)
        ]
    )

Стало:

@app.get("/clients")
def clients():
    return Response(
        content=ClientsResponse(
            items=[
                Client(id=i, address=Address(id=i), bank_accounts=[Account(id=i)])
                for i in range(40_000)
            ]
        ).json(),
        media_type="application/json",
    )

Производительность стала лучше, но проблема блокировки eventloop не ушла. Потому что CPU-intensive задача всё ещё выполняется в рамках того же процесса что и eventloop. Как сделать лучше? Вынести “тяжелую” задачу в отдельный процесс. В рамках работы с asyncio можно создавать процесс и ждать выполнения задачи асинхронно. Сделать это можно через Executor-классы из модуля concurrent.futures и метод run_in_executor у eventloop’а. Для CPU-intensive задач подойдёт ProcessPoolExecutor.

Вот как бы это выглядело:

from concurrent.futures import ProcessPoolExecutor

def generate_client_ids(n: int) -> Response:
    return Response(
        content=ClientsResponse(
            items=[
                Client(id=i, address=Address(id=i), bank_accounts=[Account(id=i)])
                for i in range(n)
            ]
        ).json(),
        media_type='application/json',
    )

@app.get('/clients')
async def clients():
    with ProcessPoolExecutor() as executor:
        loop = asyncio.get_running_loop()
        result = await loop.run_in_executor(executor, generate_client_ids, 50_000)
        return result

В этом случае не произойдёт ни одного таймаута или обрыва соединения, eventloop не блокируется и может переключаться между корутинами пока ожидает ответ от сабпроцесса. По-хорошему стоит вынести пул в общий доступ, чтобы не создавать его каждый раз при выполнении функции clients, но это уже другая история.

Внимательный читатель может заметить, что автор статьи использует синхронные функции (без ключевого слова async), такие функции будут запускаться через ThreadPool самим FastAPI, но сути это не меняет. Чтобы улучшить производительность и избежать таймаутов всё равно нужно запустить CPU-intensive задачу через ProcessPoolExecutor:

@app.get('/clients')
def clients(n: int) -> Response:
    with ProcessPoolExecutor() as pool:
        for f in as_completed([pool.submit(generate_client_ids, 40_000)]):
            return f.result()

Итог

  • если вы работаете в асинхронном режиме избегайте блокирующих операций (длительные CPU-intensive задачи)
  • если избежать не получается, то выносите их в отдельные процессы и общайтесь с ними асинхронно: очереди, ProcessPoolExecutor и т.д.
  • если ваше приложение в основном состоит из CPU-intensive задач, то возможно лучше использовать обычный синхронный фреймворк (Django, Flask, Bottle, Falcon), а все “тяжелые” операции проводить через очереди типа Celery, RQ и т.д.