본문으로 건너뛰기

Queues and Backpressure

async 시스템이 터지는 흔한 이유는 "너무 느려서"보다 "입력을 통제하지 못해서"다. producer는 계속 밀어 넣고 consumer는 못 따라가는데 큐 크기 제한도, 동시성 제한도, timeout도 없으면 메모리와 downstream이 먼저 무너진다.

빠른 요약: backpressure의 기본 도구는 `Queue(maxsize=...)`, `Semaphore`, timeout, 그리고 overload policy다. 무한 큐와 무한 fan-out은 초반엔 편하지만 운영 단계에서 가장 먼저 문제를 만든다.

구조 그림

입력량을 통제하려면 큐 크기와 동시 실행 수를 같이 제한해야 한다. 큐는 burst를 흡수하고, semaphore는 downstream 동시 접근을 제어한다.

bounded queue와 graceful shutdown 예제

py
import asyncio


async def producer(queue: asyncio.Queue[int]) -> None:
    for item in range(6):
        print("produce", item, "qsize before:", queue.qsize())
        await queue.put(item)
        print("enqueued", item, "qsize after:", queue.qsize())


async def worker(
    name: str,
    queue: asyncio.Queue[int],
    limiter: asyncio.Semaphore,
) -> None:
    try:
        while True:
            item = await queue.get()
            try:
                async with limiter:
                    await asyncio.sleep(0.15)
                    print(name, "handled", item)
            finally:
                queue.task_done()
    except asyncio.QueueShutDown:
        print(name, "shutdown")


async def main() -> None:
    queue: asyncio.Queue[int] = asyncio.Queue(maxsize=2)
    limiter = asyncio.Semaphore(2)

    async with asyncio.TaskGroup() as task_group:
        task_group.create_task(producer(queue))
        for index in range(3):
            task_group.create_task(worker(f"worker-{index}", queue, limiter))

        await queue.join()
        queue.shutdown()


asyncio.run(main())

`maxsize=2`라서 producer는 queue가 꽉 차면 기다리게 된다. 이것이 가장 기본적인 backpressure다. Python 3.13+의 `Queue.shutdown()`은 worker를 더 깔끔하게 멈추는 데 유용하다.

backpressure를 설계할 때 정해야 할 것

  • 입력을 기다리게 할 것인가 (await queue.put)
  • 일정 시간 후 실패시킬 것인가 (asyncio.timeout)
  • 오래된 작업을 버릴 것인가
  • 여러 downstream 호출 수를 어떻게 제한할 것인가 (Semaphore)

overload에 대한 기본 전략

bounded queue

버스트를 흡수하되, 무한히 메모리를 잡아먹지 않게 한다.

semaphore

큐가 있어도 downstream API/DB에 동시에 몇 개까지 보낼지 제한해야 한다.

timeout

enqueue나 downstream 작업이 너무 오래 걸릴 때 언제 포기할지 기준을 둔다.

drop policy

실시간 시스템은 "무조건 다 처리"보다 "낡은 작업을 버리고 최신 상태를 우선"하는 편이 맞는 경우도 많다.

실전 연결

  • webhook processing
  • rate-limited 외부 API fan-out
  • background worker queue
  • burst traffic ingestion

공식 자료

VitePress로 빌드한 Python 3.14 핸드북