k6でLLMのSSEエンドポイントに負荷試験を実施し、Datadogで可視化する
この記事はMLOps Advent Calendar 2024への投稿記事です。
自前で建てたLLMを載せたサーバの負荷試験をk6で行い、結果をDatadogで確認できるようにします。
本文中のコード: https://github.com/hosimesi/k6_datadog
記事の趣旨
ChatGPTをはじめとする生成AIを使ったサービスが増えてきています。一方で、LLM(Large Language Model)からのレスポンスは一般的に遅く、ユーザー体験に影響します。これを緩和する方法の一つとして、LLMの推論結果を一括で表示するのではなく、Server-Sent Events(SSE)を使ってストリーミングで結果を返す方法があります。しかし、このシステムの性能を定量的に測るためには負荷試験が必要で、負荷試験ツールによってはSSEに対応していないものもあります。
そこで本記事では、SSEで結果が返ってくるLLMシステムの負荷試験を、k6とDatadogを使って行います。
本記事のゴール
SSEでレスポンスを返すエンドポイントに対して、LLMを載せたサーバをk6で負荷試験し、Datadog上で結果を確認する、簡易的な負荷試験環境とレポーティングシステムを作ります。
LLMOpsの第一段階として、通常のサーバのメトリクスに加えて、LLM特有の以下のメトリクスを計測することを目指します。
-
TTFT(Time To First Token)
- リクエストを投げてから1トークン目が返ってくるまでの時間
-
TPoT(Time Per Output Token)
- 1トークン生成するのにかかる時間
-
待ち時間
- リクエストを投げてから完全に生成が終わるまでの時間(TTFT + TPoT × トークン数)
-
スループット
- 秒間のトークン生成数
事前知識
Datadogとは
Datadogとは、モダンなモニタリングとセキュリティを提供するSaaS型のモニタリングツールです。単純な監視だけでなく、トレーシングやプロファイラーも提供しています。最近では、LLM Observabilityの機能も拡充しており、この機能に関してはまた別の機会に紹介したいと思います。
k6とは
k6はGrafana Labsが提供しているオープンソースのGo製の負荷試験ツールです。JavaScriptを使ってテストシナリオを作成することで、柔軟かつ軽量な負荷試験が実行できます。実験結果を確認するための可視化ツールとして、xk6-dashboardといった自前の可視化ツールがあり、他のDatadogやGrafanaといった可視化ツールとのインテグレーションも充実しています
ディレクトリ構成
本システムのディレクトリ構成は以下の通りです。
.
├── Makefile
├── README.md
├── compose.yaml
├── docker
│ ├── Dockerfile
│ └── Dockerfile.loadtest
├── infra
│ ├── ar.tf
│ ├── gke.tf
│ ├── main.tf
│ ├── sa.tf
│ └── variable.tf
├── k8s
│ ├── base
│ │ ├── deployment.yaml
│ │ ├── kustomization.yaml
│ │ ├── service.yaml
│ │ └── serviceaccount.yaml
│ └── overlays
│ ├── datadog-agent.yaml
│ ├── deployment.yaml
│ ├── job.yaml
│ ├── kustomization.yaml
│ └── serviceaccount.yaml
├── loadtest
│ └── main.js
├── pyproject.toml
├── src
│ └── main.py
└── uv.lock
GKEクラスタの作成
まず、すべてのインフラの基盤になるGKEクラスタを設定していきます。今回は簡単のため、Control Planeへのアクセスを許可します。実際にプロダクトを運用する時は、限定公開クラスタを使うといいと思います。
インスタンスとして、ある程度のサイズのモデルを載せるために、CPUのみのノードプールであるe2-standard-4を用意します。
resource "google_container_cluster" "chat_app" {
project = var.project
name = "chat-app-cluster"
location = "asia-northeast1-a"
remove_default_node_pool = true
initial_node_count = 1
}
resource "google_container_node_pool" "chat_app_cpu_node_pool" {
name = "chat-app-cpu-node-pool"
location = "asia-northeast1-a"
cluster = google_container_cluster.chat_app.name
node_count = 1
node_config {
preemptible = true
machine_type = "e2-standard-4"
service_account = google_service_account.chat_app_node_sa.email
oauth_scopes = [
"https://www.googleapis.com/auth/cloud-platform"
]
}
management {
auto_repair = "true"
auto_upgrade = "true"
}
}
他にもService Accountのリソース等もありますが、上記の準備ができればリソースを作成していきます。適用後にGKEクラスタが作成されていれば成功です。
$terraform plan
$terraform apply
作成後、GKEのコンテキストを指定して、ローカルからkubectlコマンドが使えるようにしておきます。
Google Cloudのその他のリソースの作成
次に、GKE外のリソースを作成していきます。作成するリソースは以下になります。
resource "google_artifact_registry_repository" "chat_app" {
location = var.region
repository_id = "chat_app"
description = "chat_app用のArtifact Registry"
format = "DOCKER"
}
resource "google_service_account" "chat_app_node_sa" {
account_id = "chat-app-node-sa"
display_name = "chat-app-node-sa"
}
resource "google_service_account" "chat_app_sa" {
account_id = "chat-app-sa"
display_name = "chat-app-sa"
}
resource "google_service_account_iam_binding" "app" {
service_account_id = google_service_account.chat_app_sa.name
role = "roles/iam.workloadIdentityUser"
members = [
"serviceAccount:${var.project}.svc.id.goog[default/chat-app-serviceaccount]"
]
}
resource "google_project_iam_member" "artifact_registry_iam" {
project = var.project
role = "roles/artifactregistry.reader"
member = "serviceAccount:${google_service_account.chat_app_node_sa.email}"
}
resource "google_project_iam_member" "service_account_token_creator_iam" {
project = var.project
role = "roles/iam.serviceAccountTokenCreator"
member = "serviceAccount:${google_service_account.chat_app_sa.email}"
}
今回は簡略化のために作成していませんが、本運用ではSecret Managerが必要になります。Hugging FaceのトークンやDatadogのAPIキー等を管理するために使用し、External Secret経由でGKE側から使用できるようにします。Artifact Registryはイメージの管理に使用します。Service Accountはアプリケーションの権限管理のために使用します。以前はGoogleサービスアカウント(GSA)とKubernetesサービスアカウント(KSA)を紐づけて連携していましたが、現在は不要になりました。
こちらも適用していきます。
$terraform plan
$terraform apply
アプリケーションの準備
LLMが推論して返す簡易的なチャットサーバを作成します。フレームワークにFastAPIを使用し、モデルはgemma-2-2b-jpn-itを使用しました。
SSEでのストリーミングレスポンスにしたいため、FastAPIのStreamingResponseを使用し、起動時にモデルをメモリに展開しておきます。ストリーミングレスポンスなので、LLMの推論はpipeline
で実行するのではなく、TextIteratorStreamerを使用します。
import logging
import os
from contextlib import asynccontextmanager
import torch
from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from transformers import AutoModelForCausalLM, AutoTokenizer, TextIteratorStreamer
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class QueryRequest(BaseModel):
query: str
def get_model_and_tokenizer():
try:
device = "mps" if torch.backends.mps.is_available() else "cpu"
logger.info(f"デバイス: {device}. モデルのロードを開始します...")
model = AutoModelForCausalLM.from_pretrained(
"google/gemma-2-2b-jpn-it",
torch_dtype=torch.bfloat16,
token=os.environ.get("HUGGINGFACE_HUB_TOKEN"),
)
tokenizer = AutoTokenizer.from_pretrained(
"google/gemma-2-2b-jpn-it",
token=os.environ.get("HUGGINGFACE_HUB_TOKEN"),
)
model.to(device)
logger.info("モデルのロードに成功しました")
return model, tokenizer, device
except Exception as e:
raise RuntimeError(f"モデルのロードに失敗しました: {e}")
@asynccontextmanager
async def lifespan(app: FastAPI):
# モデルのロード
app.state.model, app.state.tokenizer, app.state.device = get_model_and_tokenizer()
yield
app = FastAPI(lifespan=lifespan)
@app.post("/generate/")
async def generate(query: QueryRequest):
logger.info(f"リクエスト: {query}")
try:
inputs = app.state.tokenizer(query.query, return_tensors="pt").to(
app.state.device
)
streamer = TextIteratorStreamer(app.state.tokenizer, skip_prompt=True)
generation_kwargs = dict(
inputs=inputs.input_ids,
max_new_tokens=256,
streamer=streamer,
pad_token_id=app.state.tokenizer.eos_token_id,
)
app.state.model.generate(**generation_kwargs)
async def event_generator():
try:
for new_text in streamer:
logger.info(f"Response: {new_text}")
yield f"data: {new_text}\n\n"
except Exception as e:
raise HTTPException(
status_code=500,
detail=f"ストリームの生成中にエラーが発生しました: {e}",
)
return StreamingResponse(event_generator(), media_type="text/event-stream")
except Exception as e:
raise HTTPException(status_code=500, detail=f"応答の生成に失敗しました: {e}")
@app.get("/")
async def root():
return {"message": "Hello World"}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
そして、ビルドしてイメージをArtifact Registryにプッシュしておきます。
次に、アプリケーションをデプロイするためのインフラの準備をします。Kustomizeを使って記述し、適用します。今回は内部から負荷をかけるため、Ingressは用意しません。
apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization
resources:
- deployment.yaml
- serviceaccount.yaml
- service.yaml
apiVersion: v1
kind: Service
metadata:
name: chat-app-cpu-service
spec:
type: ClusterIP
selector:
app: chat-app-cpu
ports:
- port: 8000
targetPort: chat-app
protocol: "TCP"
apiVersion: apps/v1
kind: Deployment
metadata:
name: chat-app-cpu
spec:
strategy:
type: RollingUpdate
rollingUpdate:
maxSurge: 1
maxUnavailable: 0
selector:
matchLabels:
app: chat-app-cpu
template:
metadata:
labels:
app: chat-app-cpu
spec:
serviceAccountName: chat-app-serviceaccount
terminationGracePeriodSeconds: 60
nodeSelector:
cloud.google.com/gke-nodepool: chat-app-cpu-node-pool
containers:
- name: chat-app
env:
- name: HUGGINGFACE_HUB_TOKEN
valueFrom:
secretKeyRef:
name: hugging-face-secret
key: token
ports:
- containerPort: 8000
name: chat-app
startupProbe:
timeoutSeconds: 5
successThreshold: 1
failureThreshold: 5
initialDelaySeconds: 240
periodSeconds: 10
httpGet:
port: 8000
scheme: HTTP
path: /
livenessProbe:
timeoutSeconds: 5
successThreshold: 1
failureThreshold: 5
initialDelaySeconds: 10
periodSeconds: 10
httpGet:
port: 8000
scheme: HTTP
path: /
readinessProbe:
timeoutSeconds: 2
successThreshold: 2
failureThreshold: 2
initialDelaySeconds: 5
periodSeconds: 5
httpGet:
port: 8000
scheme: HTTP
path: /
lifecycle:
preStop:
exec:
command:
- /bin/sh
- -c
- sleep 50
apiVersion: v1
kind: ServiceAccount
metadata:
name: chat-app-serviceaccount
namespace: default
apiVersion: apps/v1
kind: Deployment
metadata:
name: chat-app-cpu
spec:
template:
spec:
containers:
- name: chat-app
image: asia-northeast1-docker.pkg.dev/<project-id>/chat-app/chat-app:latest
command:
[
".venv/bin/python",
"-m",
"uvicorn",
"src.main:app",
"--host",
"0.0.0.0",
"--port",
"8000",
]
resources:
requests:
cpu: 1000m
memory: 8000Mi
limits:
cpu: 1400m
memory: 10000Mi
apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization
labels:
- includeSelectors: true
resources:
- ../../base
patches:
- path: deployment.yaml
- path: serviceaccount.yaml
apiVersion: v1
kind: ServiceAccount
metadata:
name: chat-app-serviceaccount
annotations:
iam.gke.io/gcp-service-account: "chat-app@<project-id>.iam.gserviceaccount.com"
ここで、Hugging Faceのトークンをシークレットとして登録しておきます。本番環境で使用する時は、External Secret経由で与えてください。
$ kubectl create secret generic hugging-face-secret --from-literal token=<your-token>
Datadog Agentの準備
基本的にはこちらの記事と同じ手順で進めていきます。
Datadog Agentをdatadog-operatorを使ってインストールします。
基本的に、Datadogの設定ガイドに従って進めていきます。
$ helm repo add datadog https://helm.datadoghq.com
$ helm install datadog-operator datadog/datadog-operator
$ kubectl create secret generic datadog-secret --from-literal api-key=<your-api-key>
apiVersion: datadoghq.com/v2alpha1
kind: DatadogAgent
metadata:
name: datadog
spec:
global:
clusterName: chat-app-cluster
site: ap1.datadoghq.com
credentials:
apiSecret:
secretName: datadog-secret
keyName: api-key
features:
dogstatsd:
hostPortConfig:
enabled: true
ここで重要なのは、statsdを使用できるように、dogstatsdの設定を有効にする必要があることです。その後、適用していきます。
$ kubectl apply -f ./k8s/overlays/datadog-agent.yaml
Datadog Agentが動いていることを確認できたらOKです。
そして、Integrations画面で、k6をインストールしておきます。
k6負荷試験サーバの準備
k6を実行するためのサーバを準備します。同一のクラスタのネームスペースに配置し、Service経由で負荷をかけていきます。そして、負荷試験結果をDatadog Agent経由でDatadogに送信し、可視化します。
負荷試験クライアントはKubernetesのJobで実行します。
環境設定
k6はデフォルトではSSE(Server-Sent Events)に対応していないため、拡張機能を使用する必要があります。今回は xk6-sse という拡張機能を導入します。拡張機能を追加するため、xk6をベースイメージとして使用しつつ、xk6-sseのバイナリをビルドします。
FROM grafana/xk6:0.13.0 AS builder
WORKDIR /xk6
RUN xk6 build v0.51.0 \
--with github.com/phymbert/xk6-sse@v0.1.2 \
--with github.com/grafana/xk6-dashboard@v0.7.5 \
--output /xk6/myk6
FROM alpine:3.19
WORKDIR /app
COPY /xk6/myk6 /usr/bin/k6
COPY ./loadtest /app
USER 1001
CMD ["/bin/sh"]
コマンドに関しては、Kubernetes Jobで上書きします。
シナリオ作成
次にシナリオを作成します。xk6-sseの例を参考に作成していきます。
下記の項目はカスタムメトリクスになるため、意味に合わせてカスタムメトリクスを定義していきます。
- TTFT(Time To First Token)
- TPoT(Time Per Output Token)
- 待ち時間
- スループット
import { check, sleep } from 'k6';
import { Counter, Trend } from 'k6/metrics';
import sse from 'k6/x/sse';
let TTFT = new Trend('TTFT'); //ms
let TPoT = new Trend('TPoT'); //ms
let WaitingTime = new Trend('WaitingTime'); //ms
let Throughput = new Trend('Throughput'); //tokens/sec
let TotalTokens = new Counter('TotalTokens');
export let options = {
stages: [
{ duration: '1m', target: 1 },
{ duration: '1m', target: 2 },
{ duration: '1m', target: 0 },
],
};
export default function () {
let requestStart = Date.now();
const url = 'http://chat-app-cpu-service:8000/generate/';
// const url = 'http://k6-datadog:8000/generate/';
const params = {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Accept': 'text/event-stream',
},
body: JSON.stringify({
query: "日本で最も高い山は何ですか?",
}),
};
let tokenCount = 0;
const response = sse.open(url, params, function (client) {
let ttftCaptured = false;
let tokenTimes = [];
let firstTokenTime = 0;
client.on('event', function (event) {
console.log(`event id=${event.id}, name=${event.name}, data=${event.data}`);
if (parseInt(event.id) === 2) {
client.close()
}
let chunkTime = Date.now();
let data = event.data.trim();
if (data === '') {
return;
}
if (data === '[DONE]') {
client.close();
return;
}
if (!ttftCaptured) {
firstTokenTime = chunkTime;
let ttft = firstTokenTime - requestStart;
TTFT.add(ttft);
ttftCaptured = true;
} else {
let lastTokenTime = tokenTimes[tokenTimes.length - 1] || firstTokenTime;
let tpot = chunkTime - lastTokenTime;
TPoT.add(tpot);
}
tokenTimes.push(chunkTime);
tokenCount += 1;
TotalTokens.add(1);
});
client.on('error', function (e) {
console.error('An unexpected error occurred: ', e.error());
client.close();
});
});
let responseEnd = Date.now();
let waitingTime = responseEnd - requestStart;
WaitingTime.add(waitingTime);
if (waitingTime > 0 && tokenCount > 0) {
let throughput = (tokenCount * 1000) / waitingTime;
Throughput.add(throughput);
}
check(response, {
'Status is 200': (r) => r && r.status === 200,
'Received tokens': () => tokenCount > 0,
});
sleep(1);
}
インフラ設定
まず、インフラリソースを構築していきます。今回は負荷試験を行うサーバーをJobで定義し、負荷試験の実行後にPodが削除されるようにします。
apiVersion: batch/v1
kind: Job
metadata:
name: k6-load-testing
spec:
ttlSecondsAfterFinished: 300
template:
spec:
containers:
- name: k6-loadtest
image: asia-northeast1-docker.pkg.dev/<your-project-id>/chat-app/loadtest:latest
command:
[
"k6",
"run",
"--out",
"statsd",
"/app/main.js",
]
env:
- name: K6_STATSD_ENABLE_TAGS
value: "true"
- name: K6_STATSD_ADDR
value: "datadog-agent.default.svc.cluster.local:8125"
restartPolicy: Never
k6での負荷試験結果をDatadogに送るには、Datadog Agentが動作していれば簡単です。上記の通り、オプションを付加してリクエストを送信していきます。
負荷試験
実際に負荷をかけていきます。
$ kubectl apply -f ./k8s/overlays/job.yaml
実行後、Datadogのダッシュボードに行くと自動でk6用のデフォルトダッシュボードが作成されています。ダッシュボードを確認するとk6のデフォルトメトリクスが表示されています。
今回追加したカスタムメトリクスのグラフも追加していきます。
このようにカスタムメトリクスも表示できるようになりました。TTFTに平均1分ほどかかっているようで、現在のサーバー構成では秒間1リクエストが来ると対応しきれないことがわかります。
※今回はトークンのカウントは概算になっています。
まとめ
今回、k6とDatadogを用いて、SSE(Server-Sent Events)エンドポイントに対する簡易的な負荷試験環境を構築することができました。これにより、LLMOps特有のTTFT(Time To First Token)やTPoT(Time Per Output Token)といったカスタムメトリクスを定義することで、サービスの応答性能やスループットに関するメトリクスを取得・可視化でき、ボトルネックの特定やリソースの最適化に役立てることが期待できます。
この負荷試験のトリガーをCIに組み込むことで、コードの変更やデプロイ時に自動的に性能評価を行えるようになります。また、Slackなどのチャットツールと連携し、コマンド一つで負荷試験を実行できるようにすれば、ChatOpsができるようになります。
参考
Discussion