Pythonで使える!非同期対応のRequestsモジュールとしてのHTTPX
はじめに
PythonでHTTPリクエストを大量に非同期で投げる方法を探していました。request
モジュールはどうも対応していない様子なので、aiohttp
を使うしかないのか…?と諦めていたところ、どうやらHTTPX
が良さそうなので、試してみました。
What's HTTPX?
HTTPXはDjango REST frameworkや、Starlette、Uvicornと同様に管理しているEncode社が管理しているプロダクトのようです。
HTTPX is a fully featured HTTP client library for Python 3. It includes an integrated command line client, has support for both HTTP/1.1 and HTTP/2, and provides both sync and async APIs.
と、公式にもあるようにHTTP/1.1, HTTP/2が扱える、同期・非同期対応のクライアントです。コマンドラインでも扱えるようですが、今回は説明は割愛します。
Setting
コードの説明をする前に検証用の環境を構築します。
対向API
今回は非同期で大量にリクエストする検証なので、対向APIをAWS Lambda+Honoで建てます。作り方は特に公式手順からは変えていません。
コードはこちら。POSTされたidを1秒待ってJSONでオウム返しするだけ。
import { Hono } from 'hono'
import { handle } from 'hono/aws-lambda'
import { logger } from 'hono/logger'
const app = new Hono()
app.use(logger())
const sleep = (time) => new Promise((r) => setTimeout(r, time));
app.post('/', async (c) => {
const { id } = await c.req.parseBody()
await sleep(1000)
return c.json({"id": id})
})
export const handler = handle(app)
環境構築
Pythonの動作環境は特にこだわりがないのでRyeで作ってhttpxをインストールします。
# Setup Rye
curl -sSf https://rye-up.com/get | bash
echo 'source "$HOME/.rye/env"' >> ~/.bashrc
source "$HOME/.rye/env"
# init
rye init sample && cd sample
rye add httpx
rye sync
1. 同期リクエスト
まずは小手調べ。コードはこのようにrequestライクに書けます。
import httpx
def send_post_request_sync(url: str, session: httpx.Client, id: int) -> dict:
body = {"id": id}
response = session.post(url, data=body)
print(response.json())
return response.json()
def hit_api_sync(url: str, times: int):
with httpx.Client() as client:
responses = [send_post_request_sync(url, client, i) for i in range(times)]
return responses
def main_sync():
url = "https://4agmurqrwvsghygnwn6roqgsqa0pnqrf.lambda-url.ap-northeast-1.on.aws/"
responses = hit_api_sync(url, 100)
print("Number of responses received:", len(responses))
main_sync()
100リクエストを順に投げて、約100秒。想定通りです。
time rye run python src/py/__init__.py
{'id': '0'}
...
{'id': '98'}
{'id': '99'}
Number of responses received: 100
real 1m43.773s
user 0m0.295s
sys 0m0.030s
2. 非同期リクエスト
続いて待望の非同期リクエストを試します。httpxのドキュメントはこの通り。
組み込みモジュールのasyncio
と組み合わせて、先ほどのコードにasync/awaitを加えます。
import asyncio
import httpx
async def send_post_request_async(url: str, session: httpx.AsyncClient, id: int) -> dict:
body = {"id": id}
response = await session.post(url, data=body)
response.raise_for_status()
print(response.json())
return response.json()
async def hit_api_async(url: str, times: int):
async with httpx.AsyncClient() as client:
tasks = [send_post_request_async(url, client, i) for i in range(times)]
responses = await asyncio.gather(*tasks, return_exceptions=True)
return responses
async def main_async():
url = "https://4agmurqrwvsghygnwn6roqgsqa0pnqrf.lambda-url.ap-northeast-1.on.aws/"
responses = await hit_api_async(url, 100)
print("Number of responses received:", len(responses))
asyncio.run(main_async())
1秒待ちのエンドポイントを100発叩いても1.6秒ほどで終わっていることがわかる。非同期リクエストの成功だ。
time rye run python src/py/__init__.py
{'id': '86'}
{'id': '68'}
...
{'id': '63'}
{'id': '79'}
{'id': '42'}
Number of responses received: 100
real 0m1.694s
user 0m0.486s
sys 0m0.042s
3. 大量の非同期リクエスト
ちなみに、先ほどのコードで調子に乗って10000リクエストを投げるとハングします。悲しい。マシンリソースをうまく使いながらやっていきたいところです。
httpxのクライアント側でLimits
を使って、制限をかけられます。
私の使い方の問題な気もしますが、注意として今回のケースではリソース制御がhttpxだけだと上手く行かないのでセマフォ(async.semaphore
)も使って制御しています。
import asyncio
import httpx
limits = httpx.Limits(max_connections=100, max_keepalive_connections=50)
async def send_post_request_async(url: str, session: httpx.AsyncClient, semaphore: asyncio.Semaphore, id: int) -> dict:
async with semaphore:
body = {"id": id}
response = await session.post(url, data=body)
response.raise_for_status()
print(response.json())
return response.json()
async def hit_api_async(url: str, times: int):
semaphore = asyncio.Semaphore(100)
async with httpx.AsyncClient(limits=limits) as client:
tasks = [
send_post_request_async(url, client, semaphore, i) for i in range(times)
]
responses = await asyncio.gather(*tasks, return_exceptions=True)
return responses
async def main_async():
url = "https://4agmurqrwvsghygnwn6roqgsqa0pnqrf.lambda-url.ap-northeast-1.on.aws/"
responses = await hit_api_async(url, 10000)
print("Number of responses received:", len(responses))
asyncio.run(main_async())
これでハングせずに、10000リクエストを2分以下で完了できました。
{'id': '9998'}
{'id': '9996'}
{'id': '9999'}
Number of responses received: 10000
real 1m44.326s
user 0m37.279s
sys 0m3.833s
まとめ
HTTPXを使うと、PythonでもJavaScriptで書くような非同期リクエストが簡単にできることが分かりました。
このブログの内容のアクションとしては、httpx.AsyncClient
でできるオプションを理解するために、このドキュメントを読むと良さそうです。
今回の記事で足りない部分は、以下のクイックスタートも参考にしていただければ。
Appendix. Retryは別モジュールで
2024.4現在、httpx.AsyncClient
にはRetry機能はありません。私はChatGPTの誤訳で存在すると思い込んでハマったので、ここで供養します。
Pythonのリトライ用モジュールは?
Pythonのリトライといえばretry
やretrying
を使いたくなりますが、基本機能こそ充足しているとはいえ、8年以上コミットが止まっているライブラリを選ぶのは流石に抵抗感があります。この記事を参考にretrying
の後継相当とされるtenacity
を採用しました。
tenacity + httpx
ほぼ上記の記事の使い方で、倍々に最大5回リトライするExponential Backoffを実装できました。
httpx.AsyncClient
でも利用できることを確認しました。
import asyncio
import httpx
from tenacity import retry, stop_after_attempt, wait_exponential
limits = httpx.Limits(max_connections=100, max_keepalive_connections=50)
@retry(
stop=stop_after_attempt(5),
wait=wait_exponential(multiplier=1, exp_base=2)
)
async def send_post_request_async(
url: str, session: httpx.AsyncClient, semaphore: asyncio.Semaphore, id: int
) -> dict:
async with semaphore:
body = {"id": id}
response = await session.post(url, data=body)
response.raise_for_status()
print(response.json())
return response.json()
async def hit_api_async(url: str, times: int):
semaphore = asyncio.Semaphore(100)
async with httpx.AsyncClient(limits=limits) as client:
tasks = [
send_post_request_async(url, client, semaphore, i) for i in range(times)
]
responses = await asyncio.gather(*tasks, return_exceptions=True)
return responses
async def main_async():
url = "https://4agmurqrwvsghygnwn6roqgsqa0pnqrf.lambda-url.ap-northeast-1.on.aws/"
responses = await hit_api_async(url, 1000)
errors = [resp for resp in responses if isinstance(resp, Exception)]
print(f"Number of responses received: {len(responses)}")
print(f"Number of errors: {len(errors)}")
if errors:
print(f"Example error: {errors[0]}")
print("Number of responses received:", len(responses))
asyncio.run(main_async())
先ほどのHonoのコードを変えて、5割の確率でエラーを返すように変えて試せます。
import { Hono } from 'hono'
import { handle } from 'hono/aws-lambda'
import { logger } from 'hono/logger'
const app = new Hono()
app.use(logger())
const sleep = (time) => new Promise((r) => setTimeout(r, time));
// リクエストが失敗する確率を設定
const FAILURE_RATE = 0.5;
app.post('/', async (c) => {
const { id } = await c.req.parseBody()
await sleep(1000)
if (Math.random() < FAILURE_RATE) {
return c.json({ error: 'Internal Server Error' }, 500)
}
return c.json({"id": id})
})
export const handler = handle(app)
Discussion