Pythonのasyncioでストリーミングを理解する
はじめに
本記事では、Pythonの非同期処理ライブラリである「asyncio」を使ったストリーミング処理の実装について解説します。
非同期処理は、WebアプリケーションやAPIサーバーなどで効率的なリソース活用や高速なレスポンスを実現するために重要な技術です。Pythonではasync
/await
構文やasyncio
ライブラリを使うことで、比較的簡単に非同期処理を記述できます。
しかし、チャットアプリなどでよく見かけるストリーミング処理を実装した際に、「途中のレスポンスをどのようにフロントエンドに流すのか?」という点で悩んだため調べてみました。
本記事では、
- Pythonの非同期処理の基礎
- asyncioによるストリーミングの実装例
- ストリーミングの仕組みやポイント
について、サンプルコードを交えながら解説します。
前提知識として、
- Pythonの基本文法
- 非同期処理の概要(
async
/await
) - ジェネレータの基礎
を軽く知っていると理解しやすいですが、初学者の方にも分かりやすいよう丁寧に説明します。
(非同期処理イメージを掴む記事としてPythonの非同期プログラミングを完全理解がわかりやすかったです。)
Pythonの非同期処理の基礎
async/awaitとasyncioの役割
Pythonでは、非同期処理を記述するためにasync
/await
構文が導入されています。これにより、従来のコールバック地獄やスレッド管理の煩雑さを避けつつ、シンプルに非同期プログラムを書くことができます。
このasync
/await
構文を支えているのが、標準ライブラリのasyncio
です。asyncio
は「イベントループ」と呼ばれる仕組みで、非同期タスクの実行や協調的なマルチタスクを管理します。イベントループは、複数の非同期処理を効率よく切り替えながら実行する役割を担っています。
例えば、複数の非同期タスクを同時に実行する場合、以下のように記述します。
import asyncio
async def task(name, sec):
print(f"{name} 開始")
await asyncio.sleep(sec)
print(f"{name} 終了")
return f"{name} 完了"
async def main():
results = await asyncio.gather(
task("タスクA", 2),
task("タスクB", 1),
task("タスクC", 3)
)
print("全タスクの結果:", results)
asyncio.run(main())
この例では、3つのタスクが同時に開始され、それぞれの指定した時間後に順次終了します。asyncio.gather
を使うことで、複数の非同期処理をまとめて実行し、すべての結果を一度に受け取ることができます。出力から「非同期で複数の処理が進んでいる」ことが直感的に分かります。
ジェネレータとの違い
Pythonには「ジェネレータ」という仕組みもあります。ジェネレータはyield
を使って値を順次返すイテレータですが、非同期処理とは異なります。
- ジェネレータ:値の逐次生成・消費に便利
- 非同期コルーチン(async/await):I/O待ちなどの非同期処理に便利
両者は「途中で一時停止できる」という点で似ていますが、用途や使い方が異なります。
次章では、このasyncio
を使ってストリーミング処理をどのように実装するかを解説します。
asyncioによるストリーミングの実装
ストリーミングとは?
ストリーミングとは、データを一度に全て返すのではなく、分割して順次クライアントに送信する仕組みです。チャットアプリや動画配信、APIの逐次レスポンスなどでよく利用されます。
ストリーミングの実装方法
ストリーミングでは、サーバー側が「データの一部を生成→クライアントに送信→次のデータを生成…」という流れを繰り返します。
これを効率的に実現するためには、asyncioによる非同期処理に加えて、ジェネレータ(yield) を使ってデータを分割し、逐次クライアントに送信する仕組みが必要です。
asyncioは複数のリクエストやタスクを効率よく切り替えながら処理し、ジェネレータのyieldはデータを「チャンク」単位でクライアントに届ける役割を担います。
同期的な実装だと、全ての処理が終わるまで他のリクエストを処理できませんが、asyncioとyieldを組み合わせることでストリーミングを実現できます。
実装例:非同期ストリーミングAPI
以下は、asyncioを使ったシンプルなストリーミングAPIの例です。
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import asyncio
app = FastAPI()
async def fake_stream():
for i in range(5):
yield f"data: {i}\n"
await asyncio.sleep(1)
@app.get("/stream")
async def stream():
return StreamingResponse(fake_stream(), media_type="text/event-stream")
この例では、fake_stream
関数が非同期ジェネレータとして定義され、1秒ごとにデータを生成・送信しています。StreamingResponse
を使うことで、クライアントはサーバーからのデータをリアルタイムで受け取ることができます。
途中レスポンスをフロントに流す仕組み
HTTPの通常のレスポンスは全ての処理が終わってからまとめて返されますが、ストリーミングでは「チャンク」と呼ばれる単位で部分的にレスポンスを返します。これにより、クライアントはサーバーからのデータを逐次受信・表示できます。
例えば、先ほどのfake_stream
関数のyield
部分が「チャンク送信」に該当します。
async def fake_stream():
for i in range(5):
yield f"data: {i}\n" # ← ここでチャンクを送信
await asyncio.sleep(1)
StreamingResponse
は、このyield
で生成されたデータを受け取るたびに、クライアントへ逐次(チャンク単位で)送信します。つまり、forループのたびにクライアント側で新しいデータが受信できる仕組みです。
このように、asyncioと非同期ジェネレータを活用することで、Pythonでも簡単にストリーミングAPIを実装できます。
まとめ
本記事では、Pythonのasyncioを使った非同期処理とストリーミングの実装方法について解説しました。
非同期処理は、効率的なリソース活用やリアルタイムなレスポンスを実現する上で非常に重要な技術です。asyncioと非同期ジェネレータを活用することで、Pythonでも簡単にストリーミングAPIを構築できることが分かりました。
Discussion