🦋

Pythonで使える!非同期対応のRequestsモジュールとしてのHTTPX

2024/04/15に公開

はじめに

PythonでHTTPリクエストを大量に非同期で投げる方法を探していました。requestモジュールはどうも対応していない様子なので、aiohttpを使うしかないのか…?と諦めていたところ、どうやらHTTPXが良さそうなので、試してみました。

What's HTTPX?

HTTPXはDjango REST frameworkや、Starlette、Uvicornと同様に管理しているEncode社が管理しているプロダクトのようです。

https://www.encode.io/projects/

HTTPX is a fully featured HTTP client library for Python 3. It includes an integrated command line client, has support for both HTTP/1.1 and HTTP/2, and provides both sync and async APIs.

https://www.python-httpx.org/

と、公式にもあるようにHTTP/1.1, HTTP/2が扱える、同期・非同期対応のクライアントです。コマンドラインでも扱えるようですが、今回は説明は割愛します。

Setting

コードの説明をする前に検証用の環境を構築します。

対向API

今回は非同期で大量にリクエストする検証なので、対向APIをAWS Lambda+Honoで建てます。作り方は特に公式手順からは変えていません。
https://hono.dev/getting-started/aws-lambda

コードはこちら。POSTされたidを1秒待ってJSONでオウム返しするだけ。

import { Hono } from 'hono'
import { handle } from 'hono/aws-lambda'
import { logger } from 'hono/logger'

const app = new Hono()

app.use(logger())
const sleep = (time) => new Promise((r) => setTimeout(r, time));

app.post('/', async (c) => {
    const { id } = await c.req.parseBody()
    await sleep(1000)
    return c.json({"id": id})
})

export const handler = handle(app)

環境構築

Pythonの動作環境は特にこだわりがないのでRyeで作ってhttpxをインストールします。

# Setup Rye
curl -sSf https://rye-up.com/get | bash
echo 'source "$HOME/.rye/env"' >> ~/.bashrc
source "$HOME/.rye/env"

# init
rye init sample && cd sample
rye add httpx
rye sync

1. 同期リクエスト

まずは小手調べ。コードはこのようにrequestライクに書けます。

import httpx

def send_post_request_sync(url: str, session: httpx.Client, id: int) -> dict:
    body = {"id": id}
    response = session.post(url, data=body)
    print(response.json())
    return response.json()


def hit_api_sync(url: str, times: int):
    with httpx.Client() as client:
        responses = [send_post_request_sync(url, client, i) for i in range(times)]
        return responses


def main_sync():
    url = "https://4agmurqrwvsghygnwn6roqgsqa0pnqrf.lambda-url.ap-northeast-1.on.aws/"
    responses = hit_api_sync(url, 100)
    print("Number of responses received:", len(responses))

main_sync()

100リクエストを順に投げて、約100秒。想定通りです。

time rye run python src/py/__init__.py 
{'id': '0'}
...
{'id': '98'}
{'id': '99'}
Number of responses received: 100

real    1m43.773s
user    0m0.295s
sys     0m0.030s

2. 非同期リクエスト

続いて待望の非同期リクエストを試します。httpxのドキュメントはこの通り。

https://www.python-httpx.org/async/

組み込みモジュールのasyncioと組み合わせて、先ほどのコードにasync/awaitを加えます。

import asyncio
import httpx

async def send_post_request_async(url: str, session: httpx.AsyncClient, id: int) -> dict:
    body = {"id": id}
    response = await session.post(url, data=body)
    response.raise_for_status() 
    print(response.json())
    return response.json()

async def hit_api_async(url: str, times: int):
    async with httpx.AsyncClient() as client:
        tasks = [send_post_request_async(url, client, i) for i in range(times)]
        responses = await asyncio.gather(*tasks, return_exceptions=True)
        return responses

async def main_async():
    url = "https://4agmurqrwvsghygnwn6roqgsqa0pnqrf.lambda-url.ap-northeast-1.on.aws/"
    responses = await hit_api_async(url, 100)
    print("Number of responses received:", len(responses))

asyncio.run(main_async())

1秒待ちのエンドポイントを100発叩いても1.6秒ほどで終わっていることがわかる。非同期リクエストの成功だ。

time rye run python src/py/__init__.py 
{'id': '86'}
{'id': '68'}
...
{'id': '63'}
{'id': '79'}
{'id': '42'}
Number of responses received: 100

real    0m1.694s
user    0m0.486s
sys     0m0.042s

3. 大量の非同期リクエスト

ちなみに、先ほどのコードで調子に乗って10000リクエストを投げるとハングします。悲しい。マシンリソースをうまく使いながらやっていきたいところです。

httpxのクライアント側でLimitsを使って、制限をかけられます。

https://www.python-httpx.org/advanced/resource-limits/

私の使い方の問題な気もしますが、注意として今回のケースではリソース制御がhttpxだけだと上手く行かないのでセマフォ(async.semaphore)も使って制御しています。

import asyncio
import httpx

limits = httpx.Limits(max_connections=100, max_keepalive_connections=50)

async def send_post_request_async(url: str, session: httpx.AsyncClient, semaphore: asyncio.Semaphore, id: int) -> dict:
    async with semaphore:
        body = {"id": id}
        response = await session.post(url, data=body)
        response.raise_for_status() 
        print(response.json())
        return response.json()


async def hit_api_async(url: str, times: int):
    semaphore = asyncio.Semaphore(100)
    async with httpx.AsyncClient(limits=limits) as client:

        tasks = [
            send_post_request_async(url, client, semaphore, i) for i in range(times)
        ]
        responses = await asyncio.gather(*tasks, return_exceptions=True)
        return responses
        
async def main_async():
    url = "https://4agmurqrwvsghygnwn6roqgsqa0pnqrf.lambda-url.ap-northeast-1.on.aws/"
    responses = await hit_api_async(url, 10000)
    print("Number of responses received:", len(responses))

asyncio.run(main_async())

これでハングせずに、10000リクエストを2分以下で完了できました。

{'id': '9998'}
{'id': '9996'}
{'id': '9999'}
Number of responses received: 10000

real    1m44.326s
user    0m37.279s
sys     0m3.833s

まとめ

HTTPXを使うと、PythonでもJavaScriptで書くような非同期リクエストが簡単にできることが分かりました。

このブログの内容のアクションとしては、httpx.AsyncClientでできるオプションを理解するために、このドキュメントを読むと良さそうです。

https://www.python-httpx.org/api/#asyncclient

今回の記事で足りない部分は、以下のクイックスタートも参考にしていただければ。

https://www.python-httpx.org/quickstart/

Appendix. Retryは別モジュールで

2024.4現在、httpx.AsyncClientにはRetry機能はありません。私はChatGPTの誤訳で存在すると思い込んでハマったので、ここで供養します。

Pythonのリトライ用モジュールは?

Pythonのリトライといえばretryretryingを使いたくなりますが、基本機能こそ充足しているとはいえ、8年以上コミットが止まっているライブラリを選ぶのは流石に抵抗感があります。この記事を参考にretryingの後継相当とされるtenacityを採用しました。

https://zenn.dev/taroman_zenn/articles/dd0b33a3a37d1e

tenacity + httpx

ほぼ上記の記事の使い方で、倍々に最大5回リトライするExponential Backoffを実装できました。
httpx.AsyncClientでも利用できることを確認しました。

import asyncio
import httpx
from tenacity import retry, stop_after_attempt, wait_exponential

limits = httpx.Limits(max_connections=100, max_keepalive_connections=50)

@retry(
    stop=stop_after_attempt(5), 
    wait=wait_exponential(multiplier=1, exp_base=2) 
)
async def send_post_request_async(
    url: str, session: httpx.AsyncClient, semaphore: asyncio.Semaphore, id: int
) -> dict:
    async with semaphore:
        body = {"id": id}
        response = await session.post(url, data=body)
        response.raise_for_status() 
        print(response.json())
        return response.json()


async def hit_api_async(url: str, times: int):
    semaphore = asyncio.Semaphore(100)
    async with httpx.AsyncClient(limits=limits) as client:

        tasks = [
            send_post_request_async(url, client, semaphore, i) for i in range(times)
        ]
        responses = await asyncio.gather(*tasks, return_exceptions=True)
        return responses


async def main_async():
    url = "https://4agmurqrwvsghygnwn6roqgsqa0pnqrf.lambda-url.ap-northeast-1.on.aws/"
    responses = await hit_api_async(url, 1000)
    errors = [resp for resp in responses if isinstance(resp, Exception)]
    print(f"Number of responses received: {len(responses)}")
    print(f"Number of errors: {len(errors)}")
    if errors:
        print(f"Example error: {errors[0]}")
    print("Number of responses received:", len(responses))


asyncio.run(main_async())

先ほどのHonoのコードを変えて、5割の確率でエラーを返すように変えて試せます。

import { Hono } from 'hono'
import { handle } from 'hono/aws-lambda'
import { logger } from 'hono/logger'

const app = new Hono()

app.use(logger())
const sleep = (time) => new Promise((r) => setTimeout(r, time));

// リクエストが失敗する確率を設定
const FAILURE_RATE = 0.5;

app.post('/', async (c) => {
    const { id } = await c.req.parseBody()
    await sleep(1000)

    if (Math.random() < FAILURE_RATE) {
        return c.json({ error: 'Internal Server Error' }, 500)
    }

    return c.json({"id": id})
})

export const handler = handle(app)

Discussion