📖

Langfuseのrun_experiment機能でAgent評価コードを80%削減した話

に公開

はじめに

こんにちはあんどおです。
最近Agentが話題ですね。僕も最近Agent開発に没頭しています。稼働の90%以上がAgent開発という日も珍しくないです。
今回は個人的にヘビーユースしているLangfuseの評価機能についてまとめました。

サマリ

この記事のまとめです。

langfuseのrun_experiment機能を導入して
- 評価部分のコード量が約80%(約215行→約40行)削減できた
- 並行実行機能を利用し、評価時間を短縮できた
- 評価プロセスへの評価器の追加・削除が容易になり実験管理ミスも減った

Agent評価の課題

Langfuseではトレース機能の他にもAgent評価機能や実験の管理機能が利用できます。
この実験管理機能と評価機能が提供されているだけでもとても便利なのですが、以下のような課題がありました。

課題1: コードの肥大化

  • Agentの複雑化に合わせて評価も複雑になると、複数の評価器が必要になります。その際に全ての処理を記載する旧来の方法で実装すると、コード量の増え方が大きく、管理が難しくなるという課題がありました

課題2: 長引く評価時間

  • 課題1にもつながりますが評価ロジックが複雑になったり、評価器が増えればそれだけ処理時間も増えます。評価がいつまで経っても終わらず、改善のPDCAを回すスピードにも影響してしまうという課題がありました

上記を踏まえ、Agentの複雑化に合わせて評価機構もアップデートする必要があると感じていました。

run_experiment機能が追加された

何かいい解決策はないかと調べていたところ、2025年9月17日のChange logで報告されているrun_experiment機能を見つけました。最初は正直あまり期待してませんでしたが、想像以上に便利でした。

特徴

まず特徴的な機能は以下です。

① 並行実行(Concurrent execution)
 └─ max_concurrencyで並行実行数を制御可能
② 自動トレーシング(Automatic tracing)
 └─ item.run()が不要に
③ 柔軟な評価(Flexible evaluation)
 └─ 複数evaluatorを配列で渡すだけ
④ エラー分離(Error isolation)
 └─ 1つの失敗が全体を止めない
⑤ データセット統合(Dataset integration)
 └─ UI上での実験比較が自動的に可能

この中でも特に便利だと思ったのが① 並行実行、③ 柔軟な評価、⑤ データセット統合です。

実装コードのBefore, After

機能が便利なことはもちろんですが、その結果として実装量を減らせることも大きなメリットだと感じました。実装する前後でかなりコード量が減り、変更も格段に楽になりました。

どれだけコード量が減ったかのbefore, afterです。
例として評価で利用するevaluatorsは2つとしています。

Beforeのコード

"""
Before実装: Langfuse Datasets + 手動ループによる評価パイプライン
※実際は214行あったが、記事用に割愛して簡略化
"""

import asyncio
import json
from typing import Any
from langfuse import get_client

# 設定
AGENT_NAME = "conversation_evaluator_agent"
DATASET_NAME = "conversation_quality_dataset"

def ensure_dict(data: Any) -> dict[str, Any]:
    """dict/JSON文字列を辞書へ変換"""
    if isinstance(data, dict):
        return data
    if isinstance(data, str):
        return json.loads(data)
    return {}

def extract_evaluation_values(dict_evaluation_values: dict[str, Any]) -> dict[str, bool]:
    """評価項目からboolean値を抽出"""
    eval_items = [
        "quality_check_1",
        "quality_check_2", 
        "quality_check_3",
        "quality_check_4",
        "quality_check_5",
        "quality_check_6",
        "quality_check_7",
        "quality_check_8",
    ]
    
    result = {}
    for item in eval_items:
        item_dict = dict_evaluation_values.get(item)
        if item_dict and isinstance(item_dict, dict):
            value = item_dict.get("value")
            if value is not None:
                result[item] = value
    return result

async def process_with_agent(input_data: str, agent) -> str:
    """エージェントを使って入力データを処理する"""
    from agent_runner import AgentRunner
    
    runner = AgentRunner(agent, AGENT_NAME)
    await runner.initialize_session()
    return await runner.send_message(str(input_data))

def generate_unique_run_name(dataset_name: str, langfuse_client, logger) -> str:
    """既存のデータセットランを取得し、重複しない'eval_X'形式の名前を生成"""
    # データセットの全ランを取得
    dataset_runs = langfuse_client.api.datasets.get_runs(dataset_name)
    
    # 既存のrun_nameを取得し、eval_X形式のものから番号を抽出
    existing_numbers = set()
    for run in dataset_runs.data:
        run_name = run.name
        # eval_X形式かチェック
        if run_name.startswith("eval_") and run_name[5:].isdigit():
            number = int(run_name[5:])
            existing_numbers.add(number)
    
    # 1から順番に使われていない最小の番号を見つける
    next_number = 1
    while next_number in existing_numbers:
        next_number += 1
    
    new_run_name = f"eval_{next_number}"
    logger.info(f"生成されたデータセットラン名: {new_run_name}")
    return new_run_name

def calculate_match_rate(expected_output: dict, actual_output: dict) -> dict:
    """期待値と実際の出力の一致率を計算"""
    expected_values = extract_evaluation_values(expected_output)
    actual_values = extract_evaluation_values(actual_output)
    
    matched_count = sum(
        1 for key in expected_values 
        if expected_values.get(key) == actual_values.get(key)
    )
    total_count = len(expected_values)
    match_rate = matched_count / total_count if total_count > 0 else 0.0
    
    return {
        "match_rate": match_rate,
        "matched_count": matched_count,
        "total_count": total_count
    }

async def main():
    from logger import get_logger, setup_logging
    from evaluators import QualityJudgeEvaluator
    from agents import conversation_agent
    
    setup_logging()
    logger = get_logger(__name__)
    
    # Langfuseクライアント取得
    langfuse = get_client()
    
    # データセットを取得
    dataset = langfuse.get_dataset(DATASET_NAME)
    
    # 一意なデータセットラン名を生成
    run_name = generate_unique_run_name(dataset.name, langfuse, logger)
    
    # LLM-as-a-Judge評価器を初期化
    llm_judge_evaluator = QualityJudgeEvaluator()
    logger.info("評価器の初期化完了")
    
    # データセットの各アイテムを処理
    for item in dataset.items:
        logger.info("評価開始")
        
        # langfuse traceの設定(手動リンク)
        with item.run(
            run_name=run_name,
            run_description="Evaluating conversation quality",
            run_metadata={"agent_name": AGENT_NAME},
        ) as root_span:
            input_data = item.input
            
            # 評価器1: LLM-as-a-Judge評価を手動実行
            try:
                dict_input_data = ensure_dict(input_data)
                conversation_history = dict_input_data.get("conversation_history", [])
                
                logger.info(f"LLM評価開始 (会話ターン数: {len(conversation_history)})")
                
                # 複数の品質観点で評価
                quality_results = llm_judge_evaluator.evaluate_quality_dimensions(
                    conversation_history
                )
                
                # 各品質観点のスコアを手動で記録
                root_span.score_trace(
                    name="llm_judge_dimension_1",
                    value=quality_results["dimension_1"]["score"],
                )
                root_span.score_trace(
                    name="llm_judge_dimension_2", 
                    value=quality_results["dimension_2"]["score"],
                )
                root_span.score_trace(
                    name="llm_judge_dimension_3",
                    value=quality_results["dimension_3"]["score"],
                )
                
            except Exception as e:
                logger.error(f"LLM評価中にエラー発生: {e}")
            
            # 評価器2: MatchRate評価
            # エージェントを実行して出力を取得
            actual_output = await process_with_agent(input_data, conversation_agent)
            
            # traceのinput/outputを手動更新
            root_span.update_trace(input=input_data, output=actual_output)
            
            # 一致率を計算
            result = calculate_match_rate(
                expected_output=item.expected_output,
                actual_output=actual_output,
            )
            
            logger.info(f"一致率: {result['match_rate']:.2%}")
            
            # スコアを手動で記録
            root_span.score_trace(
                name="match_rate",
                value=result["match_rate"],
                comment=f"一致率 {result['match_rate']:.2%}",
            )
            root_span.score_trace(
                name="matched_count",
                value=result["matched_count"],
                comment=f"{result['total_count']}項目中{result['matched_count']}項目が一致",
            )
            
            logger.info(f"Trace ID: {root_span.trace_id}")
    
    # データをlangfuseに送信
    langfuse.flush()
    logger.info("評価完了")

if __name__ == "__main__":
    asyncio.run(main())

Afterのコード

"""
After実装: run_experimentによる評価パイプライン
※214行 → 40行(セットアップ含む)に削減
"""

import asyncio
import os
from langfuse import get_client

# 設定
DATASET_NAME = "conversation_quality_dataset"

def setup_environment():
    """環境設定"""
    os.environ["GOOGLE_CLOUD_PROJECT"] = "my-ai-project"
    os.environ["GOOGLE_CLOUD_LOCATION"] = "us-central1"

async def main():
    from logger import get_logger, setup_logging
    from evaluators import MatchRateEvaluator, QualityJudgeEvaluator
    
    setup_logging()
    logger = get_logger(__name__)
    
    # 環境設定
    setup_environment()
    
    # Langfuseクライアント取得
    langfuse = get_client()
    
    # データセットを取得
    dataset = langfuse.get_dataset(DATASET_NAME)
    
    # 評価器を初期化
    match_rate_evaluator = MatchRateEvaluator()
    llm_judge_evaluator = QualityJudgeEvaluator()
    logger.info("評価器の初期化完了")
    
    # 評価タスク定義
    async def evaluation_task(*, item, **kwargs):
        return await match_rate_evaluator.eval_task(item=item, **kwargs)
    
    # 評価を実行
    logger.info("評価開始")
    dataset.run_experiment(
        name="conversation_quality_eval",
        task=evaluation_task,
        evaluators=[match_rate_evaluator, llm_judge_evaluator],
    )
    
    # データをlangfuseに送信
    langfuse.flush()
    logger.info("評価完了")

if __name__ == "__main__":
    asyncio.run(main())

補足1: 評価器(evaluator)の種類と役割

評価器 評価タイプ 評価内容 スコア数
MatchRateEvaluator ルールベース 品質チェック項目の一致率 2個(match_rate, matched_count)
QualityJudgeEvaluator LLM-as-a-Judge 複数の品質観点での評価 3個(dimension_1/2/3)

補足2: Before/After での評価器(evaluator)の扱い

項目 Before After
評価器の初期化 各評価器を個別に初期化 同じ(変更なし)
評価の実行 手動で各評価器を呼び出し evaluators配列に渡すだけ
スコアの記録 root_span.score_trace()を各スコアごとに呼び出し 自動記録
エラーハンドリング try-catchを手動で配置 自動分離

結果的に80%くらいのコードを削減

約210行 → 約40行に減ったので大体80%くらいコード量が減りました。(Beforeのコードではコード内に含まれていた評価器自体のコードを別ファイルに切り出した部分は大目にみてください)

開発体験の変化

コード量の削減以外にも以下のような効果を感じました。

  • 変化1: 評価器(evaluator)の追加・削除が容易に
    • Classとして定義した評価器をevaluatorsプロパティに設定するだけなので評価器の追加削除がとても簡単にできるようになりました。
    • 同時に評価機構と評価ロジック(evaluator)を完全に分離できるのでロジックを安全に変更しやすくなったことも大きいと思いました。
  • 変化2: 実験管理がミスりづらくなった
    • langfuseのDatasets機能では実験管理(runの管理)の際にrun名が重複していると正しく結果が記録されなかったと記憶しているのですが、run_experimentではrun名が重複していても任意のrun名 - 2025-XX-XXTXX:XX:XX.XXXXXZのように日時を自動生成してrun名が重複しないようにしてくれます。これはうっかりミス防止にとても効果的です。
  • 変化3: 実験データが大きくなってもそんなに実験の待ち時間が増えない
    • 複数の実験データで評価を並行実行できるので実験データの件数が増えてもシリアルに処理していた頃より評価時間への影響が軽減されたと感じます

まとめ

  • run_experiment機能は想像以上に便利な機能です!

今までの手動実行では難易度が高かった機能が簡単に利用できるようになった上、その副次効果としてコード量の削減にも絶大な効果をもたらしてくれます。ぜひ一回試してみてください。

次回は評価器まわりの作成について書きたいなと思っています!

補足

何か間違った理解をしている部分があれば教えていただけると大変嬉しいです。それ以外にももっと便利な使い方があるよなどがあればぜひコメントください。泣いて喜びます

Discussion