💡

k6でLLMのSSEエンドポイントに負荷試験を実施し、Datadogで可視化する

2024/12/18に公開

この記事はMLOps Advent Calendar 2024への投稿記事です。

自前で建てたLLMを載せたサーバの負荷試験をk6で行い、結果をDatadogで確認できるようにします。

本文中のコード: https://github.com/hosimesi/k6_datadog

記事の趣旨

ChatGPTをはじめとする生成AIを使ったサービスが増えてきています。一方で、LLM(Large Language Model)からのレスポンスは一般的に遅く、ユーザー体験に影響します。これを緩和する方法の一つとして、LLMの推論結果を一括で表示するのではなく、Server-Sent Events(SSE)を使ってストリーミングで結果を返す方法があります。しかし、このシステムの性能を定量的に測るためには負荷試験が必要で、負荷試験ツールによってはSSEに対応していないものもあります。
そこで本記事では、SSEで結果が返ってくるLLMシステムの負荷試験を、k6とDatadogを使って行います。

本記事のゴール

SSEでレスポンスを返すエンドポイントに対して、LLMを載せたサーバをk6で負荷試験し、Datadog上で結果を確認する、簡易的な負荷試験環境とレポーティングシステムを作ります。
LLMOpsの第一段階として、通常のサーバのメトリクスに加えて、LLM特有の以下のメトリクスを計測することを目指します。

  • TTFT(Time To First Token)
    • リクエストを投げてから1トークン目が返ってくるまでの時間
  • TPoT(Time Per Output Token)
    • 1トークン生成するのにかかる時間
  • 待ち時間
    • リクエストを投げてから完全に生成が終わるまでの時間(TTFT + TPoT × トークン数)
  • スループット
    • 秒間のトークン生成数

事前知識

Datadogとは

Datadogとは、モダンなモニタリングとセキュリティを提供するSaaS型のモニタリングツールです。単純な監視だけでなく、トレーシングやプロファイラーも提供しています。最近では、LLM Observabilityの機能も拡充しており、この機能に関してはまた別の機会に紹介したいと思います。

k6とは

k6はGrafana Labsが提供しているオープンソースのGo製の負荷試験ツールです。JavaScriptを使ってテストシナリオを作成することで、柔軟かつ軽量な負荷試験が実行できます。実験結果を確認するための可視化ツールとして、xk6-dashboardといった自前の可視化ツールがあり、他のDatadogやGrafanaといった可視化ツールとのインテグレーションも充実しています

ディレクトリ構成

本システムのディレクトリ構成は以下の通りです。

.
├── Makefile
├── README.md
├── compose.yaml
├── docker
│   ├── Dockerfile
│   └── Dockerfile.loadtest
├── infra
│   ├── ar.tf
│   ├── gke.tf
│   ├── main.tf
│   ├── sa.tf
│   └── variable.tf
├── k8s
│   ├── base
│   │   ├── deployment.yaml
│   │   ├── kustomization.yaml
│   │   ├── service.yaml
│   │   └── serviceaccount.yaml
│   └── overlays
│       ├── datadog-agent.yaml
│       ├── deployment.yaml
│       ├── job.yaml
│       ├── kustomization.yaml
│       └── serviceaccount.yaml
├── loadtest
│   └── main.js
├── pyproject.toml
├── src
│   └── main.py
└── uv.lock

GKEクラスタの作成

まず、すべてのインフラの基盤になるGKEクラスタを設定していきます。今回は簡単のため、Control Planeへのアクセスを許可します。実際にプロダクトを運用する時は、限定公開クラスタを使うといいと思います。
インスタンスとして、ある程度のサイズのモデルを載せるために、CPUのみのノードプールであるe2-standard-4を用意します。

gke.tf
resource "google_container_cluster" "chat_app" {
  project                  = var.project
  name                     = "chat-app-cluster"
  location                 = "asia-northeast1-a"
  remove_default_node_pool = true
  initial_node_count       = 1
}

resource "google_container_node_pool" "chat_app_cpu_node_pool" {
  name       = "chat-app-cpu-node-pool"
  location   = "asia-northeast1-a"
  cluster    = google_container_cluster.chat_app.name
  node_count = 1

  node_config {
    preemptible  = true
    machine_type = "e2-standard-4"

    service_account = google_service_account.chat_app_node_sa.email
    oauth_scopes = [
      "https://www.googleapis.com/auth/cloud-platform"
    ]
  }

  management {
    auto_repair  = "true"
    auto_upgrade = "true"
  }
}

他にもService Accountのリソース等もありますが、上記の準備ができればリソースを作成していきます。適用後にGKEクラスタが作成されていれば成功です。

$terraform plan
$terraform apply

作成後、GKEのコンテキストを指定して、ローカルからkubectlコマンドが使えるようにしておきます。

Google Cloudのその他のリソースの作成

次に、GKE外のリソースを作成していきます。作成するリソースは以下になります。

ar.tf
resource "google_artifact_registry_repository" "chat_app" {
  location      = var.region
  repository_id = "chat_app"
  description   = "chat_app用のArtifact Registry"
  format        = "DOCKER"
}
sa.tf
resource "google_service_account" "chat_app_node_sa" {
  account_id   = "chat-app-node-sa"
  display_name = "chat-app-node-sa"
}

resource "google_service_account" "chat_app_sa" {
  account_id   = "chat-app-sa"
  display_name = "chat-app-sa"
}

resource "google_service_account_iam_binding" "app" {
  service_account_id = google_service_account.chat_app_sa.name
  role               = "roles/iam.workloadIdentityUser"
  members = [
    "serviceAccount:${var.project}.svc.id.goog[default/chat-app-serviceaccount]"
  ]
}

resource "google_project_iam_member" "artifact_registry_iam" {
  project = var.project
  role    = "roles/artifactregistry.reader"
  member  = "serviceAccount:${google_service_account.chat_app_node_sa.email}"
}

resource "google_project_iam_member" "service_account_token_creator_iam" {
  project = var.project
  role    = "roles/iam.serviceAccountTokenCreator"
  member  = "serviceAccount:${google_service_account.chat_app_sa.email}"
}

今回は簡略化のために作成していませんが、本運用ではSecret Managerが必要になります。Hugging FaceのトークンやDatadogのAPIキー等を管理するために使用し、External Secret経由でGKE側から使用できるようにします。Artifact Registryはイメージの管理に使用します。Service Accountはアプリケーションの権限管理のために使用します。以前はGoogleサービスアカウント(GSA)とKubernetesサービスアカウント(KSA)を紐づけて連携していましたが、現在は不要になりました。
こちらも適用していきます。

$terraform plan
$terraform apply

アプリケーションの準備

LLMが推論して返す簡易的なチャットサーバを作成します。フレームワークにFastAPIを使用し、モデルはgemma-2-2b-jpn-itを使用しました。
SSEでのストリーミングレスポンスにしたいため、FastAPIのStreamingResponseを使用し、起動時にモデルをメモリに展開しておきます。ストリーミングレスポンスなので、LLMの推論はpipelineで実行するのではなく、TextIteratorStreamerを使用します。

main.py
import logging
import os
from contextlib import asynccontextmanager

import torch
from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from transformers import AutoModelForCausalLM, AutoTokenizer, TextIteratorStreamer

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class QueryRequest(BaseModel):
    query: str


def get_model_and_tokenizer():
    try:
        device = "mps" if torch.backends.mps.is_available() else "cpu"
        logger.info(f"デバイス: {device}. モデルのロードを開始します...")
        model = AutoModelForCausalLM.from_pretrained(
            "google/gemma-2-2b-jpn-it",
            torch_dtype=torch.bfloat16,
            token=os.environ.get("HUGGINGFACE_HUB_TOKEN"),
        )
        tokenizer = AutoTokenizer.from_pretrained(
            "google/gemma-2-2b-jpn-it",
            token=os.environ.get("HUGGINGFACE_HUB_TOKEN"),
        )
        model.to(device)
        logger.info("モデルのロードに成功しました")
        return model, tokenizer, device
    except Exception as e:
        raise RuntimeError(f"モデルのロードに失敗しました: {e}")


@asynccontextmanager
async def lifespan(app: FastAPI):
    # モデルのロード
    app.state.model, app.state.tokenizer, app.state.device = get_model_and_tokenizer()
    yield


app = FastAPI(lifespan=lifespan)

@app.post("/generate/")
async def generate(query: QueryRequest):
    logger.info(f"リクエスト: {query}")
    try:
        inputs = app.state.tokenizer(query.query, return_tensors="pt").to(
            app.state.device
        )
        streamer = TextIteratorStreamer(app.state.tokenizer, skip_prompt=True)
        generation_kwargs = dict(
            inputs=inputs.input_ids,
            max_new_tokens=256,
            streamer=streamer,
            pad_token_id=app.state.tokenizer.eos_token_id,
        )

        app.state.model.generate(**generation_kwargs)

        async def event_generator():
            try:
                for new_text in streamer:
                    logger.info(f"Response: {new_text}")
                    yield f"data: {new_text}\n\n"
            except Exception as e:
                raise HTTPException(
                    status_code=500,
                    detail=f"ストリームの生成中にエラーが発生しました: {e}",
                )

        return StreamingResponse(event_generator(), media_type="text/event-stream")
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"応答の生成に失敗しました: {e}")


@app.get("/")
async def root():
    return {"message": "Hello World"}


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)

そして、ビルドしてイメージをArtifact Registryにプッシュしておきます。
次に、アプリケーションをデプロイするためのインフラの準備をします。Kustomizeを使って記述し、適用します。今回は内部から負荷をかけるため、Ingressは用意しません。

base/kustomization.yaml
apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization

resources:
  - deployment.yaml
  - serviceaccount.yaml
  - service.yaml
base/service.yaml
apiVersion: v1
kind: Service
metadata:
  name: chat-app-cpu-service
spec:
  type: ClusterIP
  selector:
    app: chat-app-cpu
  ports:
    - port: 8000
      targetPort: chat-app
      protocol: "TCP"
base/deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: chat-app-cpu
spec:
  strategy:
    type: RollingUpdate
    rollingUpdate:
      maxSurge: 1
      maxUnavailable: 0
  selector:
    matchLabels:
      app: chat-app-cpu
  template:
    metadata:
      labels:
        app: chat-app-cpu
    spec:
      serviceAccountName: chat-app-serviceaccount
      terminationGracePeriodSeconds: 60
      nodeSelector:
        cloud.google.com/gke-nodepool: chat-app-cpu-node-pool
      containers:
        - name: chat-app
          env:
            - name: HUGGINGFACE_HUB_TOKEN
              valueFrom:
                secretKeyRef:
                  name: hugging-face-secret
                  key: token
          ports:
            - containerPort: 8000
              name: chat-app
          startupProbe:
            timeoutSeconds: 5
            successThreshold: 1
            failureThreshold: 5
            initialDelaySeconds: 240
            periodSeconds: 10
            httpGet:
              port: 8000
              scheme: HTTP
              path: /
          livenessProbe:
            timeoutSeconds: 5
            successThreshold: 1
            failureThreshold: 5
            initialDelaySeconds: 10
            periodSeconds: 10
            httpGet:
              port: 8000
              scheme: HTTP
              path: /
          readinessProbe:
            timeoutSeconds: 2
            successThreshold: 2
            failureThreshold: 2
            initialDelaySeconds: 5
            periodSeconds: 5
            httpGet:
              port: 8000
              scheme: HTTP
              path: /
          lifecycle:
            preStop:
              exec:
                command:
                  - /bin/sh
                  - -c
                  - sleep 50
base/serviceaccount.yaml
apiVersion: v1
kind: ServiceAccount
metadata:
  name: chat-app-serviceaccount
  namespace: default
overlays/deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: chat-app-cpu
spec:
  template:
    spec:
      containers:
        - name: chat-app
          image: asia-northeast1-docker.pkg.dev/<project-id>/chat-app/chat-app:latest
          command:
            [
              ".venv/bin/python",
              "-m",
              "uvicorn",
              "src.main:app",
              "--host",
              "0.0.0.0",
              "--port",
              "8000",
            ]
          resources:
            requests:
              cpu: 1000m
              memory: 8000Mi
            limits:
              cpu: 1400m
              memory: 10000Mi
overlays/kustomization.yaml
apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization

labels:
  - includeSelectors: true
resources:
  - ../../base

patches:
  - path: deployment.yaml
  - path: serviceaccount.yaml
overlays/serviceaccount.yaml
apiVersion: v1
kind: ServiceAccount
metadata:
  name: chat-app-serviceaccount
  annotations:
    iam.gke.io/gcp-service-account: "chat-app@<project-id>.iam.gserviceaccount.com"

ここで、Hugging Faceのトークンをシークレットとして登録しておきます。本番環境で使用する時は、External Secret経由で与えてください。

$ kubectl create secret generic hugging-face-secret --from-literal token=<your-token>

Datadog Agentの準備

基本的にはこちらの記事と同じ手順で進めていきます。
Datadog Agentをdatadog-operatorを使ってインストールします。
基本的に、Datadogの設定ガイドに従って進めていきます。

$ helm repo add datadog https://helm.datadoghq.com
$ helm install datadog-operator datadog/datadog-operator
$ kubectl create secret generic datadog-secret --from-literal api-key=<your-api-key>
datadog-agent.yaml
apiVersion: datadoghq.com/v2alpha1
kind: DatadogAgent
metadata:
  name: datadog
spec:
  global:
    clusterName: chat-app-cluster
    site: ap1.datadoghq.com
    credentials:
      apiSecret:
        secretName: datadog-secret
        keyName: api-key
  features:
    dogstatsd:
      hostPortConfig:
        enabled: true

ここで重要なのは、statsdを使用できるように、dogstatsdの設定を有効にする必要があることです。その後、適用していきます。

$ kubectl apply -f ./k8s/overlays/datadog-agent.yaml

Datadog Agentが動いていることを確認できたらOKです。

そして、Integrations画面で、k6をインストールしておきます。

k6負荷試験サーバの準備

k6を実行するためのサーバを準備します。同一のクラスタのネームスペースに配置し、Service経由で負荷をかけていきます。そして、負荷試験結果をDatadog Agent経由でDatadogに送信し、可視化します。

負荷試験クライアントはKubernetesのJobで実行します。

環境設定

k6はデフォルトではSSE(Server-Sent Events)に対応していないため、拡張機能を使用する必要があります。今回は xk6-sse という拡張機能を導入します。拡張機能を追加するため、xk6をベースイメージとして使用しつつ、xk6-sseのバイナリをビルドします。

Dockerfile.loadtest
FROM grafana/xk6:0.13.0 AS builder

WORKDIR /xk6

RUN xk6 build v0.51.0 \
    --with github.com/phymbert/xk6-sse@v0.1.2 \
    --with github.com/grafana/xk6-dashboard@v0.7.5 \
    --output /xk6/myk6

FROM alpine:3.19
WORKDIR /app
COPY --from=builder /xk6/myk6 /usr/bin/k6
COPY ./loadtest /app

USER 1001
CMD ["/bin/sh"]

コマンドに関しては、Kubernetes Jobで上書きします。

シナリオ作成

次にシナリオを作成します。xk6-sseのを参考に作成していきます。
下記の項目はカスタムメトリクスになるため、意味に合わせてカスタムメトリクスを定義していきます。

  • TTFT(Time To First Token)
  • TPoT(Time Per Output Token)
  • 待ち時間
  • スループット
main.js
import { check, sleep } from 'k6';
import { Counter, Trend } from 'k6/metrics';
import sse from 'k6/x/sse';

let TTFT = new Trend('TTFT');  //ms
let TPoT = new Trend('TPoT');  //ms
let WaitingTime = new Trend('WaitingTime');  //ms
let Throughput = new Trend('Throughput');  //tokens/sec
let TotalTokens = new Counter('TotalTokens');

export let options = {
    stages: [
        { duration: '1m', target: 1 },
        { duration: '1m', target: 2 },
        { duration: '1m', target: 0 },
    ],
};

export default function () {
    let requestStart = Date.now();

    const url = 'http://chat-app-cpu-service:8000/generate/';
    // const url = 'http://k6-datadog:8000/generate/';

    const params = {
        method: 'POST',
        headers: {
            'Content-Type': 'application/json',
            'Accept': 'text/event-stream',
        },
        body: JSON.stringify({
            query: "日本で最も高い山は何ですか?",
        }),
    };

    let tokenCount = 0;
    const response = sse.open(url, params, function (client) {
        let ttftCaptured = false;
        let tokenTimes = [];
        let firstTokenTime = 0;

        client.on('event', function (event) {
            console.log(`event id=${event.id}, name=${event.name}, data=${event.data}`);
            if (parseInt(event.id) === 2) {
                client.close()
            }
            let chunkTime = Date.now();
            let data = event.data.trim();

            if (data === '') {
                return;
            }
            if (data === '[DONE]') {
                client.close();
                return;
            }

            if (!ttftCaptured) {
                firstTokenTime = chunkTime;
                let ttft = firstTokenTime - requestStart;
                TTFT.add(ttft);
                ttftCaptured = true;
            } else {
                let lastTokenTime = tokenTimes[tokenTimes.length - 1] || firstTokenTime;
                let tpot = chunkTime - lastTokenTime;
                TPoT.add(tpot);
            }
            tokenTimes.push(chunkTime);
            tokenCount += 1;
            TotalTokens.add(1);
        });

        client.on('error', function (e) {
            console.error('An unexpected error occurred: ', e.error());
            client.close();
        });
    });

    let responseEnd = Date.now();
    let waitingTime = responseEnd - requestStart;
    WaitingTime.add(waitingTime);

    if (waitingTime > 0 && tokenCount > 0) {
        let throughput = (tokenCount * 1000) / waitingTime;
        Throughput.add(throughput);
    }

    check(response, {
        'Status is 200': (r) => r && r.status === 200,
        'Received tokens': () => tokenCount > 0,
    });

    sleep(1);
}

インフラ設定

まず、インフラリソースを構築していきます。今回は負荷試験を行うサーバーをJobで定義し、負荷試験の実行後にPodが削除されるようにします。

job.yaml
apiVersion: batch/v1
kind: Job
metadata:
  name: k6-load-testing
spec:
  ttlSecondsAfterFinished: 300
  template:
    spec:
      containers:
        - name: k6-loadtest
          image: asia-northeast1-docker.pkg.dev/<your-project-id>/chat-app/loadtest:latest
          command:
            [
              "k6",
              "run",
              "--out",
              "statsd",
              "/app/main.js",
            ]
          env:
            - name: K6_STATSD_ENABLE_TAGS
              value: "true"
            - name: K6_STATSD_ADDR
              value: "datadog-agent.default.svc.cluster.local:8125"
      restartPolicy: Never

k6での負荷試験結果をDatadogに送るには、Datadog Agentが動作していれば簡単です。上記の通り、オプションを付加してリクエストを送信していきます。

負荷試験

実際に負荷をかけていきます。

$ kubectl apply -f ./k8s/overlays/job.yaml

実行後、Datadogのダッシュボードに行くと自動でk6用のデフォルトダッシュボードが作成されています。ダッシュボードを確認するとk6のデフォルトメトリクスが表示されています。

今回追加したカスタムメトリクスのグラフも追加していきます。

このようにカスタムメトリクスも表示できるようになりました。TTFTに平均1分ほどかかっているようで、現在のサーバー構成では秒間1リクエストが来ると対応しきれないことがわかります。
※今回はトークンのカウントは概算になっています。

まとめ

今回、k6とDatadogを用いて、SSE(Server-Sent Events)エンドポイントに対する簡易的な負荷試験環境を構築することができました。これにより、LLMOps特有のTTFT(Time To First Token)やTPoT(Time Per Output Token)といったカスタムメトリクスを定義することで、サービスの応答性能やスループットに関するメトリクスを取得・可視化でき、ボトルネックの特定やリソースの最適化に役立てることが期待できます。
この負荷試験のトリガーをCIに組み込むことで、コードの変更やデプロイ時に自動的に性能評価を行えるようになります。また、Slackなどのチャットツールと連携し、コマンド一つで負荷試験を実行できるようにすれば、ChatOpsができるようになります。

参考

https://www.databricks.com/jp/blog/llm-inference-performance-engineering-best-practices

Discussion