FastAPIでWebSocketに入門してみた
CISTアドベントカレンダー11日目の記事です!
FastAPIでWebSocketを触ってみたので備忘録的な感じで記事にします。
WebSocketとは?
https://learn.microsoft.com/ja-jp/azure/application-gateway/application-gateway-websocketより
WebSocketとはクライアントとサーバーの間で双方向通信を行うためのプロトコルです。
HTTP通信はリクエストとレスポンスの1セットのやりとりが終わると通信が終了します。対して、WebSocketは最初にコネクションを確立した後、通信を終了するまでサーバーとクライアントのどちらからも任意のタイミングでメッセージを飛ばすことができます。
WebSocketはws://
かwss://
のURLスキームで使用できます。
HTTP通信をいっぱいすればよくね?
わざわざWebSocketなんか使わなくてもHTTP通信をいっぱいすればいい気がします。JSをちょっと書くだけでできそうです。
WebSocketの利点をなんとなく書いていきます。
HTTPだと毎回コネクションを確立する必要がある
HTTPを何回もする形にすると、通信のたびにコネクションを確立する必要があります。どうせ通信先は同じなのに、何回もコネクションを確立するのは無駄ですね。WebSocketなら1回コネクションを確立するだけで済みます。
サーバー側は能動的にクライアント側へメッセージを送ることができない
HTTPは基本的にクライアントがサーバーにリクエストを飛ばす形で利用されています。つまり、クライアントからのリクエストがない限りサーバーは何も返しません。
リアルタイムチャットを想定します。リアルタイムチャットであれば、あるクライアントがサーバーにメッセージを送信した(≒エンドポイントを叩いた)とき、別のクライアントにこれを通知したいはずです。つまり、サーバーから能動的にクライアントへメッセージを送りたいということになります。HTTPだとクライアントがサーバーの状態を監視することができません。WebSocketであれば、通信状態が続いている間はサーバーの状態が変わったときにサーバーから能動的にクライアントへメッセージを送ることができます。
FastAPIとは?
FastAPI は、Pythonの標準である型ヒントに基づいてPython 3.6 以降でAPI を構築するための、モダンで、高速(高パフォーマンス)な、Web フレームワークです。
@app.get("/")
async def hello() -> dict:
return {"msg": "HELLO"}
こんな感じのノリで簡単にWeb APIを実装できます。
FastAPIはWebSocketもサポートしています。
FastAPIでWebSocketを実装する
基本的な実装
とりあえず上記ページのサンプルコードをそのまま動かしてみます
from fastapi import FastAPI, WebSocket
from fastapi.responses import HTMLResponse
app = FastAPI()
html = """
<!DOCTYPE html>
<html>
<head>
<title>Chat</title>
</head>
<body>
<h1>WebSocket Chat</h1>
<form action="" onsubmit="sendMessage(event)">
<input type="text" id="messageText" autocomplete="off"/>
<button>Send</button>
</form>
<ul id='messages'>
</ul>
<script>
var ws = new WebSocket("ws://localhost:8000/ws");
ws.onmessage = function(event) {
var messages = document.getElementById('messages')
var message = document.createElement('li')
var content = document.createTextNode(event.data)
message.appendChild(content)
messages.appendChild(message)
};
function sendMessage(event) {
var input = document.getElementById("messageText")
ws.send(input.value)
input.value = ''
event.preventDefault()
}
</script>
</body>
</html>
"""
@app.get("/")
async def get():
return HTMLResponse(html)
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
while True:
data = await websocket.receive_text()
await websocket.send_text(f"Message text was: {data}")
localhost:{ポート番号}
にアクセスしてみましょう
送信したメッセージがサーバーからそのまま返ってきていることがわかります。
別のエンドポイントを叩かれたときにメッセージを送信する
サーバーから能動的にクライアントへメッセージを送りたい
これの(簡易的な)実現を目指します。
別のエンドポイントにPOSTメソッドでメッセージを送信されたときに、WebSocket通信中のクライアントにメッセージを送信するようにします。
html = """
<!DOCTYPE html>
<html>
<head>
<title>Chat</title>
</head>
<body>
<h1>WebSocket Chat</h1>
<h2>Your ID: <span id="ws-id"></span></h2>
<form action="" onsubmit="sendMessage(event)">
<input type="text" id="messageText" autocomplete="off"/>
<button>Send</button>
</form>
<ul id='messages'>
</ul>
<script>
var client_id = Date.now()
document.querySelector("#ws-id").textContent = client_id;
var ws = new WebSocket(`ws://localhost:8001/ws/${client_id}`);
ws.onmessage = function(event) {
var messages = document.getElementById('messages')
var message = document.createElement('li')
var content = document.createTextNode(event.data)
message.appendChild(content)
messages.appendChild(message)
};
function sendMessage(event) {
var input = document.getElementById("messageText")
ws.send(input.value)
input.value = ''
event.preventDefault()
}
</script>
</body>
</html>
"""
@app.get("/")
async def hello() -> str:
return HTMLResponse(html)
class ConnectionManager:
def __init__(self):
self.active_connections: dict[str, WebSocket] = {}
async def connect(self, websocket: WebSocket, client_id: str) -> None:
await websocket.accept()
self.active_connections[client_id] = websocket
def disconnect(self, client_id: str) -> None:
self.active_connections.pop(client_id)
async def send_personal_message(self, message: str, client_id: str) -> None:
await self.active_connections[client_id].send_text(message)
ws_manager = ConnectionManager()
@app.post("/msg/{client_id}")
async def msg(msg: str, client_id: str) -> None:
await ws_manager.send_personal_message(msg, client_id)
@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: str):
await ws_manager.connect(websocket, client_id)
try:
while True:
msg = await websocket.receive_text()
await ws_manager.send_personal_message(msg, client_id)
except WebSocketDisconnect:
ws_manager.disconnect(client_id)
ConnectionManagerクラス
ConnectionManagerクラスがWebSocketの接続を管理します。client_id
をキーにして、WebSocketオブジェクトを保持しています。
post("/msg/{client_id}")
client_id
に対応したエンドポイントへメッセージを送信することで、client_id
に対応したWebSocket通信中のクライアントへ同じメッセージを送信します。
動かしてみる
client idを確認する
localhost:{ポート番号}
にアクセスすると、JSが勝手に生成したIDを見ることができます(すでにこのIDでWebSocket通信がはじまっています)。
IDを控えておきましょう。
エンドポイントを叩いてメッセージを送信する
localhost:{ポート番号}/docs
にアクセスすると自動生成されたAPIドキュメントにアクセスできます。APIテスターとしても使えるのでここから、/msg/{client_id}
にメッセージを送信します
実行結果
開発者ツールを見てみます。
緑色のデータがクライアントから送信したメッセージなので、サーバー側から一方的にメッセージを送信できていることが確認できました!
さいごに
FastAPIでWebSocketの実装ができました。もっとがんばれば本格的なリアルタイムチャットツールなどを作ることができそうなので、ぜひやってみてください。
参考
Discussion