Cloud Run に双方向通信アプリケーションをデプロイしてみよう
tl;dr;
- Cloud Run で WebSocket を利用したアプリケーションを動かしてみた
- Google Cloud の話は半分くらいで、半分くらいはアプリケーションの実装の話です
はじめに
こんにちは、パートナーエンジニアの Shoです。
この記事は Google Cloud Advent Calendar 2023 / 101 版の 12/11 の記事です。
皆さん、Cloud Run を使っていますか? Cloud Run を利用したサービスというと、ウェブサービスだったり、APIサーバだったりを思い浮かべる方が多いかもしれません。
実は、 Cloud Run では Websocket をサポートしており、双方向通信アプリケーションを作ることができます。本稿では、Cloud Run 上で双方向通信プリケーションを動かしてみようと思います。
まえがき
本稿の内容は初歩的なものとなり、セッションアフィニティや、インスタンス間でのデータの同期は扱いません。おまけの項でこれらについて扱っていますので、より実践的なアプリケーションの構築に取り組みたいと思った場合はそちらもご一読ください。
早速デプロイしてみる
百聞は一見にしかず、ということで早速試してみましょう。
ソースコードの取得
本日は以下のソースコードを利用します。
Cloud Shellを開いて、以下のコマンドを実行します。
git clone https://github.com/shonuma/advent-2023-cloudrun-websocket-app.git
# 数行表示される
ディレクトリを移動します。
cd advent-2023-cloudrun-websocket-app
以下のファイル群があることを確認します。
Dockerfile
docs/
LICENSE
main.py
models/
README.md
requirements.txt
requirements.updateme.txt
run.sh
static/
templates/
Cloud Shell 上での実行
まずは Cloud Run のデプロイの前に、CLoud Shell 上で動作確認をしてみましょう。
他の実行環境に影響を与えないために、Python の仮想環境を作成します。
python3 -m venv py3
source py3/bin/activate
コマンドラインの行先頭に (py3)
と表示されていれば、仮想環境を有効にできている状態です。
続いて必要なライブラリをインストールします。
pip install -r requirements.txt
これで準備は完了です。早速サーバを起動していきますが、run.sh
がローカルでの実行用ファイルとなっているため、実行権限を付与して実行してから実行します。
chmod +x run.sh; ./run.sh
実行後、以下のように表示されて入力まちの状態になれば、サーバは起動しています。
Cloud Shell 画面右上部のボタン(以下スクリーンショット参考)を押し、 Preview on port 8080 (8080 ポートでプレビュー)
を押すと、Cloud Shell 上に立ち上がったサーバの動作確認が行えます。
うまくいくと、以下のような Entrance ページが表示されます。
適当な名前をフォームに入力し入室すると以下のような画面に遷移します。
先程の Cloud Shell の画面に戻り、もう一度同じボタンを押して別の画面を開いてみましょう。成功したら、以下図のように画面を横に並べてみます。
画面上部のウィンドウにメッセージを入力することで、相互チャットができることを確認しましょう。
動作確認ができたら、 Cloud Shell の画面に戻って Ctrl + C を押します。サーバがシャットダウンされます。
また、サーバのログに「Get messages:」などの出力が出ていることも確認できます。
Cloud Run へデプロイを行う
それでは、Cloud Run へデプロイを行ってみます。
以下のコマンドを Cloud Shell 上で実行します。コマンドは、それぞれ以下を表現しています。
- Cloud Run へ
advent-2023-ws
というサービス名でデプロイします - ソースコードは現在のディレクトリのものです
- リージョンは
asia-northeast1
です - 認証不要でアクセスできるようにします
- 最小インスタンス数、最大インスタンス数はいずれも
1
です
gcloud run deploy advent-2023-ws --source . --region asia-northeast1 --allow-unauthenticated --min-instances=1 --max-instances=1
コマンド実行時、APIの有効化についてや、Google アカウントの認証を行うかどうかを聞かれることがありますが、いずれも許可、及び認証を行ってください。
成功すると、以下の様なメッセージが表示されます。 <URL>
の部分にはデプロイ先のウェブサイトの URL が表示されています。こちらをクリックしてみましょう。
Service [advent-2023-ws] revision [advent-2023-ws-00007-2wm] has been deployed and is serving 100 percent of traffic.
Service URL: https://<URL>
こちらでも先程の Cloud Shell 上の動作確認と同じようにページを見ることができて、チャットができるようになっています。確認してみてください。
かるたゲームを試す
画面を 2 つ並べた状態で、どちらかの画面で「かるたゲームを始める」を押してみます。
するとシステムメッセージにかるたゲームが開始されたと表示されます。
(左:はじめた側、右:はじめられた側)
はじめた側で、何らかの Google Cloud のアイコンを選んでみましょう。
すると、システムメッセージに「画面上部に表示されるヒントに該当する Google Cloud 製品を選択してください!」と表示され、画面上部に 1 単語ごとにサービスの説明が表示されていきます。
回答する側は、単語のヒントが表示されている間にアイコンをクリックすることで回答することができ、正解するとメッセージと、回答にかかったタイムが表示されるようになっています。
同じ URL にアクセスしてもらい、出題側と回答側(複数人)に分かれることで、Google Cloud かるたゲームを遊ぶことができます。最速コンプリート目指して頑張ってください!
(おまけ:A)アプリケーション内部を覗いてみる
※ ここから先はアプリケーション実装の話なので、Cloud Run の話を読みたい場合は おまけ:B
まで Skip を推奨します。
せっかくアプリケーションをデプロイ出来たので、内部の仕組みも見てみます。
Fast APIの WebSockets のページに、WebSocket を使ったアプリケーションの例が掲載されています。まずはこちらを眺めてみるのもよいでしょう。
接続、切断処理
GitHub のソースコードに戻りますが、125-136行目が接続処理になっています。
@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: str):
await manager.connect(websocket)
global users
user_name = base64.b64decode(client_id).decode()
if client_id not in users:
users[client_id] = 1
await manager.send_personal_message(
create_message(
"UI",
get_current_status(),
),
websocket,
)
グローバル変数の users
にログインしたユーザの ID (URLから取得していて、URL部はユーザ見えの ID を base64 encode した値)を格納しています。つまりメモリ内にユーザ一覧を格納していることになり、この変数を確認することで現在の接続人数や、接続者の状況がわかるようになっています。
var input = document.getElementById("messageText").value
console.log(input)
if (input == "") {
document.querySelector("#error-message").textContent = "[Warning]ユーザIDを入力してください。"
} else {
var date = Date.now()
// "ユーザが入れた文字列#タイムスタンプ" を ID として利用
var uid = btoa(input + "#" + date)
location.href = "/game/" + uid
}
ユーザの切断処理で、退出したユーザ ID を削除する処理も行っています。
except WebSocketDisconnect:
manager.disconnect(websocket)
if users.get(client_id) == 10:
game_init()
users.pop(client_id, None)
await manager.broadcast(
create_message(
"SYSTEM",
f"Client #{client_id} left the chat."
)
)
メッセージの送受信処理
今回のアプリケーションはクライアントからのデータの送信はもちろん、サーバから送信されてくるデータを処理しています。いわゆる双方向通信のアプリケーションです。
クライアントから送るデータ、及びサーバから送るデータをどう定義しているのか、について見ていきます。
メッセージの送信(クライアント)
メッセージの送信については、templates/game.html
に処理があります。
チャット窓からの送信は sendMessage
関数が担っていて、これはテキストメッセージをサーバに伝えて、全クライアントに broadcast を依頼する処理となります。
var host = location.host
var base64d_user_id = "{{ base64d_user_id }}"
var protocol = host.startsWith("localhost") ? "ws" : "wss"
var ws = new WebSocket(`${protocol}://${host}/ws/${base64d_user_id}`);
function sendMessage(event) {
var input = document.getElementById("messageText")
ws.send(input.value)
input.value = ''
event.preventDefault()
}
一方で、かるたの開始や選択等、ユーザの各種操作をサーバ側へ伝えるプロトコルメッセージは [StartGame]
という文字列で表現しています。
function startKaruta(event, id) {
if (inGame) {
ws.send("[EndGame]")
toggleStartGameBtn()
inGame = false
} else {
ws.send("[StartGame]")
// disableStartGameBtn()
toggleStartGameBtn()
inGame = true
}
event.preventDefault()
}
メッセージの受信(クライアント)
ws.onMessage
でサーバから送信されたデータを処理しています。以下では、JSON
形式のデータをパースして、evdata["messageType"]
の値によって以降の処理を分岐させています。
以下に一部のコードを掲載しますが、START_GAME
の場合は disableStartGameBtn()
を実行していて、関数名から「開始ボタンを無効化する」処理だと推測できます。
ws.onmessage = function(event) {
var evdata = JSON.parse(event.data)
var messages = document.getElementById('messages')
var message = document.createElement('li')
console.log(evdata);
switch (evdata["messageType"]) {
case "START_GAME":
disableStartGameBtn()
break
case "END_GAME":
enableStartGameBtn()
break
このように、サーバから受け取ったデータによって、クライアントの画面を変更したりする処理を行っていることがわかります。それでは、サーバ側の処理もみてみましょう。
メッセージの送信(サーバ)
サーバからクライアントへはどのようにメッセージを送信しているのでしょうか。
処理を見てみると、クライアントからのリクエストに対し、 broadcast
または send_personal
を利用してメッセージを送信していることがわかります。
await manager.send_personal_message(
create_message(
"SYSTEM",
f"You wrote: {data}",
),
websocket
)
await manager.broadcast(
create_message(
"MESSAGE",
data,
user_name=user_name
)
)
これらの処理は、文字通り broadcast = 全体に送信
と、 personal = 個人に送信
となっています。つまり、クライアントから来たメッセージによってこれらを使い分けることができるわけですね。
manager.broadcast
及び manager.send_personal_message
の処理を確認してみると、websocket
というアクセスユーザ単位のインスタンスでユーザを識別していることがわかり、また broadcast
では active_connections
で有効なコネクション一覧を保持していることがわかります。
async def send_personal_message(self, message: str, websocket: WebSocket):
await websocket.send_text(message)
async def broadcast(self, message: str):
for connection in self.active_connections:
await connection.send_text(message)
さて、クライアントにメッセージを返却する際、ただのテキストメッセージなのか内部メッセージなのかを区別したいので、戻り値を json で定義しています。
これにより、messageType
を見てクライアントがどのような処理をすべきか、がわかりやすくなっています。より厳密に実装する場合、これらを構造体(dataclass
)として定義しておくと良いでしょう。
def create_message(message_type: str, value, user_name="system"):
return json.dumps(
{
"messageType": message_type,
"user": user_name,
"value": value
}
)
メッセージの受信(サーバ)
受信したメッセージは、main.py
で処理しています。
139 行目以降で、クライアントから到達したメッセージによってサーバ側で様々な処理を行っています。
data = await websocket.receive_text()
global answers
global status
global begin_time
logger.info(f"Get messages: {str(data)} from {client_id}, {user_name}")
logger.info(f"users: {str(users)}")
logger.info(f"status: {str(status)}")
データの流れを見てみる
上記を踏まえて、データがどのように流れていくかを見てみましょう。
一台のインスタンスにクライアント A 〜 C が接続されているとします。
クライアント A から Hello
というメッセージをサーバに送信します。するとサーバは broadcast
、つまりクライアント全員に対して Hello
というメッセージを送信します。その結果、クライアント A 〜 C は、Hello
というメッセージをサーバから受信し、画面に反映することができます。本アプリケーションでは、システムメッセージとして自身が送信したメッセージを You wrote: ...
としてメッセージ送信者に echo する処理が入っています。
では、カルタをスタートする処理を見てみます。[startKaruta]
というメッセージをクライアント A がサーバへ送信しています。メッセージを受け取ったサーバは、A をゲームのオーナーに設定し、ゲームが始まったことを broadcast
しています。また、オーナーにだけゲームの開始手段を別途通知しています。
ここでは、オーナー設定をusers
の値に 10
を設定するとしていますが、マジックナンバーを利用するよりも、role.OWNER
といった形で enum
を定義するほうが望ましいですね。
少し処理を進めて、ゲームオーナーがかるたを選択する処理を考えています。これもメッセージのブロードキャストになるので、クライアントが Hello
を送信する処理と似ています。送っているメッセージの messageType
を変更することで、クライアントに通常のメッセージとは異なるよ、というのを判断できるようにしています。
上記の出題に対し、クライアント B が Bigtable
と回答したケースを考えてみると、以下のようになります。クライアント B が Bigtable
を選択したことをサーバに通知します。サーバ側では、クライアント B の回答状況を保持していて(answers
)、一度回答した場合は回答し直せなかったり、正解かどうかを判定する、という処理に利用しています。
以上が、かるたゲームのデータの流れの説明となります。
(おまけ課題)以下の実装を組み込んでみよう
本アプリケーションはひとまず動作するというレベルのもので、改善の余地が多くあります。
実装が理解できたら、以下にチャレンジしてみましょう。
- カルタの内容を変更してみましょう。
- ヒント:
models/constants.py
を変更する
- ヒント:
- オーナーとユーザの値を enum 値に変更してみましょう。
- サーバからクライアントに送信するメッセージを
dataclass
で定義してみましょう。 - インスタンスを複数立てた場合に、データが同期されるようにしてみましょう。
サーバーが複数台ある場合
では仮に、サーバが 1 台ではなく 2 台あった場合のことを考えてみます。具体的に、クライアント A と B がサーバ X、クライアント C と D がサーバ Y に接続されているとします。
この様になっている場合、クライアント A がサーバに対してメッセージを送信し、サーバが broadcast
を行ったとしても、クライアント C、D にはメッセージを届けることができません。
サーバで保持しているコネクションはあくまで A と B のため、C と D の存在を検知できないためです。
ソースコードを見ると、self.active_connections
は該当サーバに接続されているユーザのみを保持していて、異なるサーバの場合はメモリ内にその情報が存在しないためです。
async def broadcast(self, message: str):
for connection in self.active_connections:
await connection.send_text(message)
この問題を解決するために、バックエンドにデータベース(redis
)やメッセージングサービス(pub/sub
)を置いて、特定のサーバの更新を他のサーバに伝えるという手段があります。
これらについては、記事後半のインスタンス間のデータの同期で少し解説しています。
(おまけ:B)公式のチュートリアルと、WebSocket アプリケーション実装時に気をつけたほうが良いこと
本稿の内容ですが、公式でもチュートリアルを公開しています。
チャットアプリや、ホワイトボードアプリを試したい場合は、是非こちらのチュートリアルも参考にしてみてください。
再接続の処理
Cloud Run では、WebSocket リクエストは 長時間実行される HTTP リクエスト
として扱われるため、リクエストタイムアウトの影響を受けます(最大 60 分、デフォルトは 5 分)。
そのため、該当時間の接続を想定する場合、クライアントからの接続が切れた場合の再接続処理をクライアント側で実装しておく必要があります。
料金
Cloud Run 上のアプリケーションで Websocket 接続がオープンな場合、インスタンスはアクティブとみなされるため、CPU が割り当てられ料金が発生します。
セッションアフィニティ(スティッキーセッション)
Cloud Run では複数のコンテナインスタンスを扱うことができます。WebSocket を利用したアプリケーションをデプロイした場合、ユーザは最初にいずれかのインスタンスに接続しにいくわけですが、アクセスが切断されて再接続した場合、そのアクセスが同じインスタンスに到達する保証はありません(セッションアフィニティのない世界)。
- クライアント A/B のアクセスが上下のコンテナに振り分けられる。
そこで、セッションアフィニティを有効にすると、Cloud Run は特定のクライアントからの連続したアクセス(TTL を 30 日に設定した場合、30 日間)を、同じインスタンスに振り分けることができます。これにより、ユーザがサービスから切断されてしまったとしても、コンテナ内に保持している情報を再利用できるというわけです。
セッションアフィニティのある世界
- クライアント A のアクセスは下のコンテナに、クライアント B のアクセスは上のコンテナに振り分けられる。
注意すべき点として、以下が挙げられています。
- セッションアフィニティは「特定のユーザ専用」のインスタンスとするわけではありません。クライアント C が現れた場合、A または B と同じインスタンスに接続します。
- Cloud Run の自動スケーリング(イン、アウト)機能のため、セッションアフィニティの機能はベストエフォート(なるだけがんばる)になります。接続していたインスタンスがスケールインした場合、当然そのインスタンスには接続できなくなるため、他のインスタンスにリクエストが転送されることに注意してください。
インスタンス間でのデータの同期
セッションアフィニティの項目では複数のインスタンスを持つ場合の注意点について説明しました。では、Cloud Run の自動スケーリング機能に対応したい場合はどのようにしたら良いでしょうか。
サーバが複数台ある場合でも説明がありますが、サーバが複数台ある場合、該当のインスタンスに接続しているユーザ間でしか情報をやりとりできず、異なるインスタンスに接続しているユーザとはやり取りができません。
以下は、Cloud Run を利用したチャットアプリのアーキテクチャの例です。Cloud Run
のバックエンドに Memorystore for Redis
が配置されており、各々のコンテナ内で共有されているデータをデータベース(キャッシュ)を介してやり取りしています。これにより、コンテナがスケールイン、アウトをした場合でも情報のやり取りが可能です。
インスタンスが増えてくると Cloud Run インスタンスと Redis 間のトラフィックが増大します。キャパシティプランニングを実施する場合、クオータ のページの接続クライアント数や容量に応じて変化するネットワークスループット も参考にしてください。
まとめ
本稿では、Cloud Run 上に双方向通信アプリケーションをデプロイしてみて、その仕組みについて動作を見ながら確認してみました。ちょっとしたインタラクティブなアプリケーションの開発や PoC であれば Cloud Run を利用して双方向アプリケーションを開発するという選択肢もあるということを覚えておいていただますと、皆様の開発の助けになると思っています。
良いアプリケーション開発ライフを!
Discussion