「DSPy 3.0」を改めて試す ④Tutorials: Build AI Programs with DSPy
ちょっと順番が前後しちゃってるけども、前回の続き。
引き続き、DSPyをアプリケーションフレームワークとして使う、という観点で見ていく。
DSPy公式チュートリアルの「Build AI Programs with DSPy」にはアプリケーション作成にフォーカスしたチュートリアルが12個ほど用意されている。
- 会話履歴の管理
- DSPy を使った AI エージェントの構築
- DSPyモジュールをカスタマイズしてAIアプリケーションを構築する
- 検索拡張生成(RAG)
- エージェントとしてのRAGシステム構築
- エンティティ抽出
- 分類タスク
- マルチホップ RAG
- プライバシー保護を考慮した委任処理
- 思考プログラム
- 画像生成プロンプトの反復処理
- 音声
全部は多分やれないと思うけど、気になったものをピックアップしてやっていくつもり。
挙動を把握するにあたり、トレーシングを有効にしておいたほうが色々捗るだろうということで。まずは以下でトレーシングなどを試してから、進めることにする。
会話履歴の管理
DSPyではdspy.Moduleで自動的に会話履歴を管理するような機能はないが、会話履歴管理を支援するユーティリティとしてdspy.Historyがある。
事前準備
Colaboratoryで試す
!pip install -U dspy
!pip show dspy
Name: dspy
Version: 3.0.3
Summary: DSPy
Home-page: https://github.com/stanfordnlp/dspy
Author: 
Author-email: Omar Khattab <okhattab@stanford.edu>
License: 
Location: /usr/local/lib/python3.12/dist-packages
Requires: anyio, asyncer, backoff, cachetools, cloudpickle, diskcache, gepa, joblib, json-repair, litellm, magicattr, numpy, openai, optuna, orjson, pydantic, regex, requests, rich, tenacity, tqdm, xxhash
Required-by: 
from google.colab import userdata
import os
os.environ['OPENAI_API_KEY'] = userdata.get('OPENAI_API_KEY')
 dspy.Historyを使った会話履歴管理
dspy.History クラスは、messages: list[dict[str, Any]]で会話履歴を保存するための箱となる。で、
- 入力フィールドとしてdspy.History型の入力フィールドをシグネチャに指定する
- 会話履歴インスタンスを作成して、新しい会話履歴のターンを追加する。追加する各エントリは、関連するすべての入出力フィールドを含める必要がある。
となる。
import dspy
dspy.settings.configure(lm=dspy.LM("openai/gpt-4o-mini"))
class QA(dspy.Signature):
    question: str = dspy.InputField()
    history: dspy.History = dspy.InputField()
    answer: str = dspy.OutputField()
predict = dspy.Predict(QA)
# 会話履歴を初期化
history = dspy.History(messages=[])
print("会話を開始します。終了するときは、'quit' / 'q' を入力してください。")
while True:
    question = input("ユーザ:")
    if question in ["quit", "q", "終了"]:
        break
    # historyパラメータで会話履歴を有効化して推論
    outputs = predict(question=question, history=history)
    print(f"AI: {outputs.answer}\n")
    # このターンの入出力を会話履歴に保存
    history.messages.append({"question": question, **outputs})
なるほど、この仕組みだと確かに保存は自分でやらないといけないね。
実行結果
会話を開始します。終了するときは、'quit' / 'q' を入力してください。
ユーザ:おはようございます。私の名前は太郎です。
AI: おはようございます、太郎さん!今日はどんなことをお話ししましょうか?
ユーザ:私の趣味は競馬なんですよ。
AI: 競馬が趣味なんですね!どのようなレースが好きですか?また、好きな馬や騎手はいますか?
ユーザ:私について教えて。
AI: あなたは競馬が趣味の太郎さんですね!競馬に興味を持っているということは、レースや馬についての知識が豊富かもしれません。どのようなことに特に興味がありますか?
ユーザ:quit
dspy.inspect_history()を見てみる。
[2025-10-03T15:03:33.725531]
System message:
Your input fields are:
1. `question` (str): 
2. `history` (History):
Your output fields are:
1. `answer` (str):
All interactions will be structured in the following way, with the appropriate values filled in.
[[ ## question ## ]]
{question}
[[ ## history ## ]]
{history}
[[ ## answer ## ]]
{answer}
[[ ## completed ## ]]
In adhering to this structure, your objective is: 
        Given the fields `question`, `history`, produce the fields `answer`.
User message:
[[ ## question ## ]]
おはようございます。私の名前は太郎です。
Assistant message:
[[ ## answer ## ]]
おはようございます、太郎さん!今日はどんなことをお話ししましょうか?
[[ ## completed ## ]]
User message:
[[ ## question ## ]]
私の趣味は競馬なんですよ。
Assistant message:
[[ ## answer ## ]]
競馬が趣味なんですね!どのようなレースが好きですか?また、好きな馬や騎手はいますか?
[[ ## completed ## ]]
User message:
[[ ## question ## ]]
私について教えて。
Respond with the corresponding output fields, starting with the field `[[ ## answer ## ]]`, and then ending with the marker for `[[ ## completed ## ]]`.
Response:
[[ ## answer ## ]]
あなたは競馬が趣味の太郎さんですね!競馬に興味を持っているということは、レースや馬についての知識が豊富かもしれません。どのようなことに特に興味がありますか?
[[ ## completed ## ]]
ドキュメントには
言語モデルに実際に送信されるプロンプトは、
dspy.inspect_historyの出力が示すように、複数ターンにわたるメッセージです。各会話ターンは、ユーザーのメッセージに続いてアシスタントのメッセージという形で表現されます。
とある。historyがどこにも展開されていないんだけど、そこはマルチターンメッセージとして表現される、ってことなのかな?
ちょっとこのあとの章にも気になることが書いてあるんだけど、一旦ローカルのMacでMLflowのトレーシングを有効にして、同じことをやってみた。
import dspy
import mlflow
# トレーシングの設定(各環境ごとに適宜設定)
mlflow.set_tracking_uri("http://host.docker.internal:5000")
mlflow.set_experiment("DSPy")
mlflow.dspy.autolog()
dspy.settings.configure(lm=dspy.LM("openai/gpt-4o-mini"))
class QA(dspy.Signature):
    question: str = dspy.InputField()
    history: dspy.History = dspy.InputField()
    answer: str = dspy.OutputField()
predict = dspy.Predict(QA)
history = dspy.History(messages=[])
print("会話を開始します。終了するときは、'quit' / 'q' を入力してください。")
while True:
    question = input("ユーザ:")
    if question in ["quit", "q", "終了"]:
        break
    outputs = predict(question=question, history=history)
    print(f"AI: {outputs.answer}\n")
    history.messages.append({"question": question, **outputs})
実行結果
会話を開始します。終了するときは、'quit' / 'q' を入力してください。
ユーザ: おはよう!私の名前は太郎です。
AI: おはよう、太郎さん!今日はどんなことを話しましょうか?
ユーザ: 私の趣味は競馬です。
AI: 競馬が趣味なんですね!どのレースや馬が特に好きですか?
ユーザ: 私について知ってることを教えて。
AI: あなたの名前は太郎さんで、趣味は競馬だということを知っています。他に何か教えてもらえますか?
ユーザ: q
MLFlowで最後のリクエストのトレース野中のLLMとの入出力の部分はこんな感じ。

どうやらマルチターンで展開されるように見える。シグネチャの定義は何かしらプロンプト内でゴニョゴニョされるものだと思ってるんだけど(ただし開発者はそれを意識しないのがDSPy)、その点を踏まえると若干モヤるが・・・
とりあえずdspy.inspect_history()である程度確認はできそう。
Few-Shotにおける会話履歴
プロンプトの入力フィールドセクションには履歴が表示されないことに気付くかもしれません。システムメッセージでは入力フィールドとしてリストされているにもかかわらずです(例: 「2. history (History):」)。これは意図的な仕様です: 会話履歴を含むFew Shotの例をフォーマットする際、DSPyは履歴を複数のターンに展開しません。代わりに、OpenAI標準フォーマットとの互換性を保つため、各Few Shotの例は単一のターンとして表現されます。
説明を読んでも何がいいたいのか自分にはさっぱりわからなかった。とりあえずサンプルコードを実行してみる。引き続き、MLflowでトレーシングを有効化してある。
import dspy
mlflow.set_tracking_uri("http://host.docker.internal:5000")
mlflow.set_experiment("DSPy")
mlflow.dspy.autolog()
dspy.settings.configure(lm=dspy.LM("openai/gpt-4o-mini"))
class QA(dspy.Signature):
    question: str = dspy.InputField()
    history: dspy.History = dspy.InputField()
    answer: str = dspy.OutputField()
predict = dspy.Predict(QA)
history = dspy.History(messages=[])
predict.demos.append(
    dspy.Example(
        question="フランスの首都は?",
        history=dspy.History(
            messages=[{"question": "ドイツの首都は?", "answer": "ドイツの首都はベルリンです。"}]
        ),
        answer="フランスの首都はパリです。",
    )
)
predict(question="日本の首都は?", history=dspy.History(messages=[]))
dspy.inspect_history()
とりあえず知らないものがでてきた。以下。
predict.demos.append(
    dspy.Example(
        question="フランスの首都は?",
        history=dspy.History(
            messages=[{"question": "ドイツの首都は?", "answer": "ドイツの首都はベルリンです。"}]
        ),
        answer="フランスの首都はパリです。",
    )
)
以下を読むとどうやらこれがFew Shotになるみたい。
そして、モジュールにはdemosという属性があって、これはどうやらdspy.Adapterに渡されて、LLMへのプロンプト内にFew Shotとして埋め込まれる、ということになるみたい。
上記の例がややこしいのはFew Shotの中に会話履歴があるんだよね。ただこれは追加されることがない会話履歴なので、つまり会話履歴を踏まえたFew Shotというふうに自分は感じた。
とりあえず実行してみる。
[2025-10-03T16:03:41.040369]
System message:
Your input fields are:
1. `question` (str): 
2. `history` (History):
Your output fields are:
1. `answer` (str):
All interactions will be structured in the following way, with the appropriate values filled in.
[[ ## question ## ]]
{question}
[[ ## history ## ]]
{history}
[[ ## answer ## ]]
{answer}
[[ ## completed ## ]]
In adhering to this structure, your objective is: 
        Given the fields `question`, `history`, produce the fields `answer`.
User message:
[[ ## question ## ]]
フランスの首都は?
[[ ## history ## ]]
{"messages": [{"question": "ドイツの首都は?", "answer": "ドイツの首都はベルリンです。"}]}
Assistant message:
[[ ## answer ## ]]
フランスの首都はパリです。
[[ ## completed ## ]]
User message:
[[ ## question ## ]]
日本の首都は?
Respond with the corresponding output fields, starting with the field `[[ ## answer ## ]]`, and then ending with the marker for `[[ ## completed ## ]]`.
Response:
[[ ## answer ## ]]
日本の首都は東京です。
[[ ## completed ## ]]
ややこしいのだが、ここがFew Shot。で、これはマルチターンで展開されているように見える。
User message:
[[ ## question ## ]]
フランスの首都は?
[[ ## history ## ]]
{"messages": [{"question": "ドイツの首都は?", "answer": "ドイツの首都はベルリンです。"}]}
Assistant message:
[[ ## answer ## ]]
フランスの首都はパリです。
[[ ## completed ## ]]
上記の中のFew Shot内の会話履歴はマルチターンで展開されずにhistoryフィールドにJSONオブジェクトして埋め込まれている。
User message:
[[ ## question ## ]]
フランスの首都は?
[[ ## history ## ]]
{"messages": [{"question": "ドイツの首都は?", "answer": "ドイツの首都はベルリンです。"}]}
MLflowのトレース。

うーん、改めてこの章の意図がさっぱりわからない。要は
- 会話履歴はdspy.Historyで管理して、これを入力フィールドとしてシグネチャに含めれば、OpenAIなどで一般的なマルチターンのmessages形式に良しなに展開してくれる。(ただし追加は自分で管理)
- Few Shotも同様にマルチターンの会話履歴として展開される。
- ただしFew Shot内に会話履歴を含めると、その会話履歴については展開されない。
ってことが言いたいのかな?
とりあえずこのチュートリアルの章、文章の書き方がハイコンテキスト過ぎて、めちゃめちゃわかりにくい・・・「評価」のところを先にやったら理解できたのかな?と思って読み直してみたけど、やっぱりわからん・・・
エージェントをDSPyで作る
エージェントを作るチュートリアル。引き続きローカルのMac上で、MLflowでトレーシングしつつ進める。
事前準備
MLFlowを起動。
uv init -p 3.12 mlflow-work && cd $_
uv add "mlflow>=2.18.0"
uv run mlflow server --backend-store-uri sqlite:///mydb.sqlite
別ターミナルでJupyterLabを起動。
mkdir dspy-agent-work && cd $_
docker run --rm \
    -p 8888:8888 \
    -u root \
    -e GRANT_SUDO=yes \
    -v .:/home/jovyan/work \
    quay.io/jupyter/minimal-notebook:latest
以降はJupyterLabで。
パッケージインストール
!pip install -qU dspy pydantic mlflow
!pip freeze | egrep -i "dspy|pydantic|mlflow"
dspy==3.0.3
mlflow==3.4.0
mlflow-skinny==3.4.0
mlflow-tracing==3.4.0
openapi-pydantic==0.5.1
pydantic==2.11.10
pydantic-settings==2.11.0
pydantic_core @ file:///home/conda/feedstock_root/build_artifacts/bld/rattler-build_pydantic-core_1746625350/work
OpenAI APIキーをセット
import getpass
import os
os.environ["OPENAI_API_KEY"] = getpass.getpass('OPENAI_API_KEY')
MLflowへのトレーシングを有効化
import mlflow
mlflow.set_tracking_uri("http://host.docker.internal:5000")
mlflow.set_experiment("DSPy")
mlflow.dspy.autolog()
ツール・データ構造の定義
このチュートリアルで作成するエージェントは「航空会社向けカスタマーサービスエージェント」となっており、以下の機能を持つ。
- 旅行を新規に予約
- 既存の旅程の変更(フライトの変更やキャンセルなど)
- 対応できない場合はカスタマーサポートチケットを発行
これを実現するために、以下のツールをエージェントに与える。
- 
fetch_flight_info: 与えられた日付のフライト情報を取得
- 
pick_flight: 条件に基づいた最適なフライトの選択
- 
book_flight: 航空券を予約
- 
fetch_itinerary: 予約済みの旅程情報を取得
- 
cancel_itinerary: 予約済みの旅程をキャンセル
- 
get_user_info: ユーザー情報を取得
- 
file_ticket: サポートチケットを登録して、人間によるサポートを依頼
これらのツールで使用するデータをPydanticモデルで定義する。
from pydantic import BaseModel
class Date(BaseModel):
    # LLMはどうも`datetime.datetime`の指定がうまくないため、
    # 日付を表現するためのカスタムクラスを定義
    year: int
    month: int
    day: int
    hour: int
class UserProfile(BaseModel):
    user_id: str
    name: str
    email: str
class Flight(BaseModel):
    flight_id: str
    date_time: Date
    origin: str
    destination: str
    duration: float
    price: float
class Itinerary(BaseModel):
    confirmation_number: str
    user_profile: UserProfile
    flight: Flight
class Ticket(BaseModel):
    user_request: str
    user_profile: UserProfile
ユーザ情報、フライト情報のダミーデータを作成し、旅行予約、サポートチケットのデータも初期化しておく。
user_database = {
    "山田": UserProfile(user_id="1", name="山田太郎", email="yamada@example.com"),
    "佐藤": UserProfile(user_id="2", name="佐藤一郎", email="sato@example.com"),
    "鈴木": UserProfile(user_id="3", name="鈴木花子", email="suzuki@example.com"),
    "田中": UserProfile(user_id="4", name="田中良子", email="tanaka@example.com"),
}
flight_database = {
    "DA123": Flight(
        flight_id="DA123",  # DSPy航空123
        origin="HND",       # 羽田
        destination="CTS",  # 新千歳
        date_time=Date(year=2025, month=11, day=1, hour=7),
        duration=2,
        price=20000,
    ),
    "DA125": Flight(
        flight_id="DA125",
        origin="HND",
        destination="CTS",
        date_time=Date(year=2025, month=11, day=1, hour=12),
        duration=2,
        price=22000,
    ),
    "DA456": Flight(
        flight_id="DA456",
        origin="HND",
        destination="ITM",  # 伊丹
        date_time=Date(year=2025, month=12, day=1, hour=7),
        duration=1,
        price=12000,
    ),
    "DA460": Flight(
        flight_id="DA460",
        origin="HND",
        destination="ITM",
        date_time=Date(year=2025, month=12, day=1, hour=9),
        duration=1,
        price=14000,
    ),
}
itinery_database = {}
ticket_database = {}
ではツールを定義。ツールはPythonの関数を使えるが、docstringと型ヒントをきちんと定義すること。
import random
import string
def fetch_flight_info(date: Date, origin: str, destination: str):
    """与えられた日付の、出発地・目的地のフライト情報を取得する。出発地・目的地はIATAコードで指定すること"""
    flights = []
    for flight_id, flight in flight_database.items():
        if (
            flight.date_time.year == date.year
            and flight.date_time.month == date.month
            and flight.date_time.day == date.day
            and flight.origin == origin
            and flight.destination == destination
        ):
            flights.append(flight)
    if len(flights) == 0:
        raise ValueError("該当するフライトはありません。")
    return flights
def fetch_itinerary(confirmation_number: str):
    """予約済み旅程情報をデータベースから取得する"""
    return itinery_database.get(confirmation_number)
def pick_flight(flights: list[Flight]):
    """ユーザーのリクエストに最適なフライトを選択する。複数の場合は最短かつ最安のフライトを選定する"""
    sorted_flights = sorted(
        flights,
        key=lambda x: (
            x.get("duration") if isinstance(x, dict) else x.duration,
            x.get("price") if isinstance(x, dict) else x.price,
        ),
    )
    return sorted_flights[0]
def _generate_id(length=8):
    chars = string.ascii_lowercase + string.digits
    return "".join(random.choices(chars, k=length))
def book_flight(flight: Flight, user_profile: UserProfile):
    """ユーザーに代わってフライトを予約する"""
    confirmation_number = _generate_id()
    while confirmation_number in itinery_database:
        confirmation_number = _generate_id()
    itinery_database[confirmation_number] = Itinerary(
        confirmation_number=confirmation_number,
        user_profile=user_profile,
        flight=flight,
    )
    return confirmation_number, itinery_database[confirmation_number]
def cancel_itinerary(confirmation_number: str, user_profile: UserProfile):
    """ユーザーに代わって旅程をキャンセルする"""
    if confirmation_number in itinery_database:
        del itinery_database[confirmation_number]
        return
    raise ValueError("旅程情報が見つかりませんでした。確認番号を再度チェックしてください。")
def get_user_info(name: str):
    """与えられた名前から、データベースを検索し、ユーザープロファイルを取得する"""
    return user_database.get(name)
def file_ticket(user_request: str, user_profile: UserProfile):
    """エージェントが処理できない場合はカスタマーサポートチケットを発行する"""
    ticket_id = _generate_id(length=6)
    ticket_database[ticket_id] = Ticket(
        user_request=user_request,
        user_profile=user_profile,
    )
    return ticket_id
ReActエージェントの作成
ReActエージェントをdspy.ReActで作成する。エージェントのタスクの定義、入出力の仕様、ツールを指定する。
import dspy
class DSPyAirlineCustomerService(dspy.Signature):
    """あなたは航空会社のカスタマーサービス担当者であり、ユーザーのフライト予約と管理を支援します。
    あなたには、ユーザーのリクエストに対応するためのツール一覧が提供され、ユーザーの要望を満たすために、
    適切なツールを選択する必要があります。"""
    user_request: str = dspy.InputField()
    process_result: str = dspy.OutputField(
        desc=(
            "処理結果の概要と、ユーザーが確認すべき情報をまとめたメッセージ。例えば、新規フライト予約時"
            "に発行される「確認番号」などが該当する。"
        )
    )
agent = dspy.ReAct(
    DSPyAirlineCustomerService,
    tools = [
        fetch_flight_info,
        fetch_itinerary,
        pick_flight,
        book_flight,
        cancel_itinerary,
        get_user_info,
        file_ticket,
    ]
)
エージェントを使用する
ではエージェントにメッセージを送ってみる。
dspy.configure(lm=dspy.LM("openai/gpt-4o-mini"))
result = agent(user_request="2025年11月1日に羽田から新千歳までのフライトを予約したいです。名前は山田です。")
print(result)
出力。読みやすさのために改行を入れている。
Prediction(
    trajectory={
        'thought_0': 'ユーザーは2025年11月1日に羽田から新千歳までのフライトを予約したいと考えています。まず、フライト情報を取得する必要がありますので、指定された日付のフライト情報を取得するために、`fetch_flight_info`ツールを使用します。',
        'tool_name_0': 'fetch_flight_info',
        'tool_args_0': {'date': {'year': 2025, 'month': 11, 'day': 1, 'hour': 0}, 'origin': 'HND', 'destination': 'CTS'},
        'observation_0': [Flight(flight_id='DA123', date_time=Date(year=2025, month=11, day=1, hour=7), origin='HND', destination='CTS', duration=2.0, price=20000.0), Flight(flight_id='DA125', date_time=Date(year=2025, month=11, day=1, hour=12), origin='HND', destination='CTS', duration=2.0, price=22000.0)],
        'thought_1': 'ユーザーのリクエストに基づいて、羽田から新千歳までのフライト情報を取得しました。2つのフライトが見つかりました。最初のフライト(DA123)は7時出発で、価格は20,000円です。2番目のフライト(DA125)は12時出発で、価格は22,000円です。最短かつ最安のフライトを選択するために、`pick_flight`ツールを使用します。',
        'tool_name_1': 'pick_flight',
        'tool_args_1': {'flights': [{'flight_id': 'DA123', 'date_time': {'year': 2025, 'month': 11, 'day': 1, 'hour': 7}, 'origin': 'HND', 'destination': 'CTS', 'duration': 2.0, 'price': 20000.0}, {'flight_id': 'DA125', 'date_time': {'year': 2025, 'month': 11, 'day': 1, 'hour': 12}, 'origin': 'HND', 'destination': 'CTS', 'duration': 2.0, 'price': 22000.0}]},
        'observation_1': Flight(flight_id='DA123', date_time=Date(year=2025, month=11, day=1, hour=7), origin='HND', destination='CTS', duration=2.0, price=20000.0),
        'thought_2': 'ユーザーのために最適なフライト(DA123)を選択しました。次に、フライトを予約するために、ユーザーの情報が必要です。ユーザーの名前は山田ですが、メールアドレスやユーザーIDが必要ですので、`get_user_info`ツールを使用して、ユーザーのプロファイルを取得します。',
        'tool_name_2': 'get_user_info',
        'tool_args_2': {'name': '山田'},
        'observation_2': UserProfile(user_id='1', name='山田太郎', email='yamada@example.com'),
        'thought_3': 'ユーザーのプロファイル情報を取得しました。ユーザーIDは1、名前は山田太郎、メールアドレスはyamada@example.comです。これを使用して、フライト(DA123)を予約するために`book_flight`ツールを使用します。',
        'tool_name_3': 'book_flight',
        'tool_args_3': {'flight': {'flight_id': 'DA123', 'date_time': {'year': 2025, 'month': 11, 'day': 1, 'hour': 7}, 'origin': 'HND', 'destination': 'CTS', 'duration': 2.0, 'price': 20000.0}, 'user_profile': {'user_id': '1', 'name': '山田太郎', 'email': 'yamada@example.com'}},
        'observation_3': ('exon1lej', Itinerary(confirmation_number='exon1lej', user_profile=UserProfile(user_id='1', name='山田太郎', email='yamada@example.com'), flight=Flight(flight_id='DA123', date_time=Date(year=2025, month=11, day=1, hour=7), origin='HND', destination='CTS', duration=2.0, price=20000.0))),
        'thought_4': 'フライト(DA123)の予約が完了しました。確認番号は「exon1lej」です。ユーザーに予約完了の情報を提供するために、タスクを完了します。',
        'tool_name_4': 'finish',
        'tool_args_4': {},
        'observation_4': 'Completed.'
    },
    reasoning='ユーザーのリクエストに基づき、2025年11月1日に羽田から新千歳までのフライトを予約しました。最適なフライト(DA123)を選択し、ユーザーの情報を取得して予約を完了しました。',
    process_result='フライトの予約が完了しました。確認番号は「exon1lej」です。予約内容を確認する際にこの番号を使用してください。'
)
予約済み旅程データベースにも登録されている。
print(itinery_database)
{
    'exon1lej': Itinerary(
        confirmation_number='exon1lej',
        user_profile=UserProfile(user_id='1', name='山田太郎', email='yamada@example.com'),
        flight=Flight(flight_id='DA123', date_time=Date(year=2025, month=11, day=1, hour=7), origin='HND', destination='CTS', duration=2.0, price=20000.0)
    )
}
結果はシグネチャでも定義されている通り、process_result に出力され、ReActエージェントの場合は reasoning に推論プロセスが記録されている。またtrajectory には以下が出力される。
- 各ステップでの推論プロセス
- 各ステップで選択されたツール
- 各ステップで選択されたツールへのツール呼び出し引数
- 各ステップで選択されたツールの実行結果
内部ではReActによる推論ループが繰り返されており、LLMがそのステップの処理を終了と判断するか、max_iters に達するまで、繰り返される。この過程は dspy.inspect_history(n=10) や MLflowのとレースで確認できる。
dspy.inspect_history(n=10)
一部抜粋
[2025-10-05T10:51:57.375319]
System message:
Your input fields are:
1. `user_request` (str): 
2. `trajectory` (str):
Your output fields are:
1. `next_thought` (str): 
2. `next_tool_name` (Literal['fetch_flight_info', 'fetch_itinerary', 'pick_flight', 'book_flight', 'cancel_itinerary', 'get_user_info', 'file_ticket', 'finish']): 
3. `next_tool_args` (dict[str, Any]):
All interactions will be structured in the following way, with the appropriate values filled in.
[[ ## user_request ## ]]
{user_request}
[[ ## trajectory ## ]]
{trajectory}
[[ ## next_thought ## ]]
{next_thought}
[[ ## next_tool_name ## ]]
{next_tool_name}        # note: the value you produce must exactly match (no extra characters) one of: fetch_flight_info; fetch_itinerary; pick_flight; book_flight; cancel_itinerary; get_user_info; file_ticket; finish
[[ ## next_tool_args ## ]]
{next_tool_args}        # note: the value you produce must adhere to the JSON schema: {"type": "object", "additionalProperties": true}
[[ ## completed ## ]]
In adhering to this structure, your objective is: 
        あなたは航空会社のカスタマーサービス担当者であり、ユーザーのフライト予約と管理を支援します。
        
        あなたには、ユーザーのリクエストに対応するためのツール一覧が提供され、ユーザーの要望を満たすために、
        適切なツールを選択する必要があります。
        
        You are an Agent. In each episode, you will be given the fields `user_request` as input. And you can see your past trajectory so far.
        Your goal is to use one or more of the supplied tools to collect any necessary information for producing `process_result`.
        
        To do this, you will interleave next_thought, next_tool_name, and next_tool_args in each turn, and also when finishing the task.
        After each tool call, you receive a resulting observation, which gets appended to your trajectory.
        
        When writing next_thought, you may reason about the current situation and plan for future steps.
        When selecting the next_tool_name and its next_tool_args, the tool must be one of:
        
        (1) fetch_flight_info, whose description is <desc>与えられた日付の、出発地・目的地のフライト情報を取得する。出発地・目的地はIATAコードで指定すること</desc>. It takes arguments {'date': {'properties': {'year': {'title': 'Year', 'type': 'integer'}, 'month': {'title': 'Month', 'type': 'integer'}, 'day': {'title': 'Day', 'type': 'integer'}, 'hour': {'title': 'Hour', 'type': 'integer'}}, 'required': ['year', 'month', 'day', 'hour'], 'title': 'Date', 'type': 'object'}, 'origin': {'type': 'string'}, 'destination': {'type': 'string'}}.
        (2) fetch_itinerary, whose description is <desc>予約済み旅程情報をデータベースから取得する</desc>. It takes arguments {'confirmation_number': {'type': 'string'}}.
        (3) pick_flight, whose description is <desc>ユーザーのリクエストに最適なフライトを選択する。複数の場合は最短かつ最安のフライトを選定する</desc>. It takes arguments {'flights': {'items': {'properties': {'flight_id': {'title': 'Flight Id', 'type': 'string'}, 'date_time': {'properties': {'year': {'title': 'Year', 'type': 'integer'}, 'month': {'title': 'Month', 'type': 'integer'}, 'day': {'title': 'Day', 'type': 'integer'}, 'hour': {'title': 'Hour', 'type': 'integer'}}, 'required': ['year', 'month', 'day', 'hour'], 'title': 'Date', 'type': 'object'}, 'origin': {'title': 'Origin', 'type': 'string'}, 'destination': {'title': 'Destination', 'type': 'string'}, 'duration': {'title': 'Duration', 'type': 'number'}, 'price': {'title': 'Price', 'type': 'number'}}, 'required': ['flight_id', 'date_time', 'origin', 'destination', 'duration', 'price'], 'title': 'Flight', 'type': 'object'}, 'type': 'array'}}.
        (4) book_flight, whose description is <desc>ユーザーに代わってフライトを予約する</desc>. It takes arguments {'flight': {'properties': {'flight_id': {'title': 'Flight Id', 'type': 'string'}, 'date_time': {'properties': {'year': {'title': 'Year', 'type': 'integer'}, 'month': {'title': 'Month', 'type': 'integer'}, 'day': {'title': 'Day', 'type': 'integer'}, 'hour': {'title': 'Hour', 'type': 'integer'}}, 'required': ['year', 'month', 'day', 'hour'], 'title': 'Date', 'type': 'object'}, 'origin': {'title': 'Origin', 'type': 'string'}, 'destination': {'title': 'Destination', 'type': 'string'}, 'duration': {'title': 'Duration', 'type': 'number'}, 'price': {'title': 'Price', 'type': 'number'}}, 'required': ['flight_id', 'date_time', 'origin', 'destination', 'duration', 'price'], 'title': 'Flight', 'type': 'object'}, 'user_profile': {'properties': {'user_id': {'title': 'User Id', 'type': 'string'}, 'name': {'title': 'Name', 'type': 'string'}, 'email': {'title': 'Email', 'type': 'string'}}, 'required': ['user_id', 'name', 'email'], 'title': 'UserProfile', 'type': 'object'}}.
        (5) cancel_itinerary, whose description is <desc>ユーザーに代わって旅程をキャンセルする</desc>. It takes arguments {'confirmation_number': {'type': 'string'}, 'user_profile': {'properties': {'user_id': {'title': 'User Id', 'type': 'string'}, 'name': {'title': 'Name', 'type': 'string'}, 'email': {'title': 'Email', 'type': 'string'}}, 'required': ['user_id', 'name', 'email'], 'title': 'UserProfile', 'type': 'object'}}.
        (6) get_user_info, whose description is <desc>与えられた名前から、データベースを検索し、ユーザープロファイルを取得する</desc>. It takes arguments {'name': {'type': 'string'}}.
        (7) file_ticket, whose description is <desc>エージェントが処理できない場合はカスタマーサポートチケットを発行する</desc>. It takes arguments {'user_request': {'type': 'string'}, 'user_profile': {'properties': {'user_id': {'title': 'User Id', 'type': 'string'}, 'name': {'title': 'Name', 'type': 'string'}, 'email': {'title': 'Email', 'type': 'string'}}, 'required': ['user_id', 'name', 'email'], 'title': 'UserProfile', 'type': 'object'}}.
        (8) finish, whose description is <desc>Marks the task as complete. That is, signals that all information for producing the outputs, i.e. `process_result`, are now available to be extracted.</desc>. It takes arguments {}.
        When providing `next_tool_args`, the value inside the field must be in JSON format
User message:
[[ ## user_request ## ]]
2025年11月1日に羽田から新千歳までのフライトを予約したいです。名前は山田です。
[[ ## trajectory ## ]]
Respond with the corresponding output fields, starting with the field `[[ ## next_thought ## ]]`, then `[[ ## next_tool_name ## ]]` (must be formatted as a valid Python Literal['fetch_flight_info', 'fetch_itinerary', 'pick_flight', 'book_flight', 'cancel_itinerary', 'get_user_info', 'file_ticket', 'finish']), then `[[ ## next_tool_args ## ]]` (must be formatted as a valid Python dict[str, Any]), and then ending with the marker for `[[ ## completed ## ]]`.
Response:
[[ ## next_thought ## ]]
ユーザーは2025年11月1日に羽田から新千歳までのフライトを予約したいと考えています。まず、フライト情報を取得する必要がありますので、指定された日付のフライト情報を取得するために、`fetch_flight_info`ツールを使用します。
[[ ## next_tool_name ## ]]
fetch_flight_info
[[ ## next_tool_args ## ]]
{"date":{"year":2025,"month":11,"day":1,"hour":0},"origin":"HND","destination":"CTS"}
[[ ## completed ## ]]
[2025-10-05T10:51:57.418672]
System message:
Your input fields are:
1. `user_request` (str): 
2. `trajectory` (str):
Your output fields are:
1. `next_thought` (str): 
2. `next_tool_name` (Literal['fetch_flight_info', 'fetch_itinerary', 'pick_flight', 'book_flight', 'cancel_itinerary', 'get_user_info', 'file_ticket', 'finish']): 
3. `next_tool_args` (dict[str, Any]):
All interactions will be structured in the following way, with the appropriate values filled in.
[[ ## user_request ## ]]
{user_request}
[[ ## trajectory ## ]]
{trajectory}
[[ ## next_thought ## ]]
{next_thought}
[[ ## next_tool_name ## ]]
{next_tool_name}        # note: the value you produce must exactly match (no extra characters) one of: fetch_flight_info; fetch_itinerary; pick_flight; book_flight; cancel_itinerary; get_user_info; file_ticket; finish
[[ ## next_tool_args ## ]]
{next_tool_args}        # note: the value you produce must adhere to the JSON schema: {"type": "object", "additionalProperties": true}
[[ ## completed ## ]]
In adhering to this structure, your objective is: 
        あなたは航空会社のカスタマーサービス担当者であり、ユーザーのフライト予約と管理を支援します。
        
        あなたには、ユーザーのリクエストに対応するためのツール一覧が提供され、ユーザーの要望を満たすために、
        適切なツールを選択する必要があります。
        
        You are an Agent. In each episode, you will be given the fields `user_request` as input. And you can see your past trajectory so far.
        Your goal is to use one or more of the supplied tools to collect any necessary information for producing `process_result`.
        
        To do this, you will interleave next_thought, next_tool_name, and next_tool_args in each turn, and also when finishing the task.
        After each tool call, you receive a resulting observation, which gets appended to your trajectory.
        
        When writing next_thought, you may reason about the current situation and plan for future steps.
        When selecting the next_tool_name and its next_tool_args, the tool must be one of:
        
        (1) fetch_flight_info, whose description is <desc>与えられた日付の、出発地・目的地のフライト情報を取得する。出発地・目的地はIATAコードで指定すること</desc>. It takes arguments {'date': {'properties': {'year': {'title': 'Year', 'type': 'integer'}, 'month': {'title': 'Month', 'type': 'integer'}, 'day': {'title': 'Day', 'type': 'integer'}, 'hour': {'title': 'Hour', 'type': 'integer'}}, 'required': ['year', 'month', 'day', 'hour'], 'title': 'Date', 'type': 'object'}, 'origin': {'type': 'string'}, 'destination': {'type': 'string'}}.
        (2) fetch_itinerary, whose description is <desc>予約済み旅程情報をデータベースから取得する</desc>. It takes arguments {'confirmation_number': {'type': 'string'}}.
        (3) pick_flight, whose description is <desc>ユーザーのリクエストに最適なフライトを選択する。複数の場合は最短かつ最安のフライトを選定する</desc>. It takes arguments {'flights': {'items': {'properties': {'flight_id': {'title': 'Flight Id', 'type': 'string'}, 'date_time': {'properties': {'year': {'title': 'Year', 'type': 'integer'}, 'month': {'title': 'Month', 'type': 'integer'}, 'day': {'title': 'Day', 'type': 'integer'}, 'hour': {'title': 'Hour', 'type': 'integer'}}, 'required': ['year', 'month', 'day', 'hour'], 'title': 'Date', 'type': 'object'}, 'origin': {'title': 'Origin', 'type': 'string'}, 'destination': {'title': 'Destination', 'type': 'string'}, 'duration': {'title': 'Duration', 'type': 'number'}, 'price': {'title': 'Price', 'type': 'number'}}, 'required': ['flight_id', 'date_time', 'origin', 'destination', 'duration', 'price'], 'title': 'Flight', 'type': 'object'}, 'type': 'array'}}.
        (4) book_flight, whose description is <desc>ユーザーに代わってフライトを予約する</desc>. It takes arguments {'flight': {'properties': {'flight_id': {'title': 'Flight Id', 'type': 'string'}, 'date_time': {'properties': {'year': {'title': 'Year', 'type': 'integer'}, 'month': {'title': 'Month', 'type': 'integer'}, 'day': {'title': 'Day', 'type': 'integer'}, 'hour': {'title': 'Hour', 'type': 'integer'}}, 'required': ['year', 'month', 'day', 'hour'], 'title': 'Date', 'type': 'object'}, 'origin': {'title': 'Origin', 'type': 'string'}, 'destination': {'title': 'Destination', 'type': 'string'}, 'duration': {'title': 'Duration', 'type': 'number'}, 'price': {'title': 'Price', 'type': 'number'}}, 'required': ['flight_id', 'date_time', 'origin', 'destination', 'duration', 'price'], 'title': 'Flight', 'type': 'object'}, 'user_profile': {'properties': {'user_id': {'title': 'User Id', 'type': 'string'}, 'name': {'title': 'Name', 'type': 'string'}, 'email': {'title': 'Email', 'type': 'string'}}, 'required': ['user_id', 'name', 'email'], 'title': 'UserProfile', 'type': 'object'}}.
        (5) cancel_itinerary, whose description is <desc>ユーザーに代わって旅程をキャンセルする</desc>. It takes arguments {'confirmation_number': {'type': 'string'}, 'user_profile': {'properties': {'user_id': {'title': 'User Id', 'type': 'string'}, 'name': {'title': 'Name', 'type': 'string'}, 'email': {'title': 'Email', 'type': 'string'}}, 'required': ['user_id', 'name', 'email'], 'title': 'UserProfile', 'type': 'object'}}.
        (6) get_user_info, whose description is <desc>与えられた名前から、データベースを検索し、ユーザープロファイルを取得する</desc>. It takes arguments {'name': {'type': 'string'}}.
        (7) file_ticket, whose description is <desc>エージェントが処理できない場合はカスタマーサポートチケットを発行する</desc>. It takes arguments {'user_request': {'type': 'string'}, 'user_profile': {'properties': {'user_id': {'title': 'User Id', 'type': 'string'}, 'name': {'title': 'Name', 'type': 'string'}, 'email': {'title': 'Email', 'type': 'string'}}, 'required': ['user_id', 'name', 'email'], 'title': 'UserProfile', 'type': 'object'}}.
        (8) finish, whose description is <desc>Marks the task as complete. That is, signals that all information for producing the outputs, i.e. `process_result`, are now available to be extracted.</desc>. It takes arguments {}.
        When providing `next_tool_args`, the value inside the field must be in JSON format
User message:
[[ ## user_request ## ]]
2025年11月1日に羽田から新千歳までのフライトを予約したいです。名前は山田です。
[[ ## trajectory ## ]]
[[ ## thought_0 ## ]]
ユーザーは2025年11月1日に羽田から新千歳までのフライトを予約したいと考えています。まず、フライト情報を取得する必要がありますので、指定された日付のフライト情報を取得するために、`fetch_flight_info`ツールを使用します。
[[ ## tool_name_0 ## ]]
fetch_flight_info
[[ ## tool_args_0 ## ]]
{"date": {"year": 2025, "month": 11, "day": 1, "hour": 0}, "origin": "HND", "destination": "CTS"}
[[ ## observation_0 ## ]]
[1] «flight_id='DA123' date_time=Date(year=2025, month=11, day=1, hour=7) origin='HND' destination='CTS' duration=2.0 price=20000.0»
[2] «flight_id='DA125' date_time=Date(year=2025, month=11, day=1, hour=12) origin='HND' destination='CTS' duration=2.0 price=22000.0»
Respond with the corresponding output fields, starting with the field `[[ ## next_thought ## ]]`, then `[[ ## next_tool_name ## ]]` (must be formatted as a valid Python Literal['fetch_flight_info', 'fetch_itinerary', 'pick_flight', 'book_flight', 'cancel_itinerary', 'get_user_info', 'file_ticket', 'finish']), then `[[ ## next_tool_args ## ]]` (must be formatted as a valid Python dict[str, Any]), and then ending with the marker for `[[ ## completed ## ]]`.
Response:
[[ ## next_thought ## ]]
ユーザーのリクエストに基づいて、羽田から新千歳までのフライト情報を取得しました。2つのフライトが見つかりました。最初のフライト(DA123)は7時出発で、価格は20,000円です。2番目のフライト(DA125)は12時出発で、価格は22,000円です。最短かつ最安のフライトを選択するために、`pick_flight`ツールを使用します。
[[ ## next_tool_name ## ]]
pick_flight
[[ ## next_tool_args ## ]]
{"flights":[{"flight_id":"DA123","date_time":{"year":2025,"month":11,"day":1,"hour":7},"origin":"HND","destination":"CTS","duration":2.0,"price":20000.0},{"flight_id":"DA125","date_time":{"year":2025,"month":11,"day":1,"hour":12},"origin":"HND","destination":"CTS","duration":2.0,"price":22000.0}]}
[[ ## completed ## ]]
(snip)
MLflowだとこんな感じ。

DSPyモジュールをカスタマイズしてAIアプリケーションを作る
DSPyでは、Chain-of-ThoughtやReActといったプロンプトエンジニアリング手法を内部で抽象化して、入出力だけで使えるような汎用的な仕組みとして dspy.Module を提供している。モジュールは、複数のものを組み合わせた大規模モジュールとして作ることもできるし、単体のモジュールを自分でカスタムして作ることもできる。
引き続きローカルのMac上で、MLflowでトレーシングしつつ進める。事前準備は上と同じなので以下を参照。
事前準備
MLFlowを起動。
uv init -p 3.12 mlflow-work && cd $_
uv add "mlflow>=2.18.0"
uv run mlflow server --backend-store-uri sqlite:///mydb.sqlite
別ターミナルでJupyterLabを起動。
mkdir dspy-custom-module-work && cd $_
docker run --rm \
    -p 8888:8888 \
    -u root \
    -e GRANT_SUDO=yes \
    -v .:/home/jovyan/work \
    quay.io/jupyter/minimal-notebook:latest
以降はJupyterLabで。
パッケージインストール
!pip install -qU dspy pydantic mlflow
!pip freeze | egrep -i "dspy|pydantic|mlflow"
dspy==3.0.3
mlflow==3.4.0
mlflow-skinny==3.4.0
mlflow-tracing==3.4.0
openapi-pydantic==0.5.1
pydantic==2.12.0
pydantic-settings==2.11.0
pydantic_core==2.41.1
OpenAI APIキーをセット
import getpass
import os
os.environ["OPENAI_API_KEY"] = getpass.getpass('OPENAI_API_KEY')
MLflowへのトレーシングを有効化
import mlflow
mlflow.set_tracking_uri("http://host.docker.internal:5000")
mlflow.set_experiment("DSPy")
mlflow.dspy.autolog()
DSPyモジュールのカスタマイズ
カスタムなDSPyモジュールを作ることで、独自のプロンプトロジックや外部サービスの組み込みなどが可能になる。
カスタムなDSPyモジュールを作成するには
- 
dspy.Moduleを継承したサブクラスを作成
- 以下のメソッドを実装
- 
__init__: コンストラクタ
- 
forward: モジュールのコアとなる処理ロジック
 
- 
が必要になる。基本的な例は以下となる。
class MyProgram(dspy.Module):
    
    def __init__(self, ...):
        # 属性やサブモジュールを定義する
        ...
    def forward(self, input_name1, input_name2, ...):
        # ロジックを実装する
        ...
具体的な例として、以下の機能を持つRAGモジュールの例が挙げられている。
- 検索クエリの生成: ユーザーの質問から、適切なコンテキストを取得するためのクエリを生成
- コンテキスト取得: 上のクエリを使用して検索を行い、関連コンテキストを取得
- 回答生成: 元のユーザの質問と検索結果のコンテキストを使って、最終的な回答を生成
サンプルコードではColBERTv2のwikipedia検索が使用されているが、以前と同様に Exa の検索APIを使うことにする。
!pip install exa-py
import getpass
import os
os.environ["EXA_API_KEY"] = getpass.getpass('EXA_API_KEY')
import dspy
from exa_py import Exa
import json
# シグネチャの定義
class QueryGenerator(dspy.Signature):
    """質問に基づいて、関連コンテキストを取得するための検索クエリを生成する"""
    question: str = dspy.InputField()
    query: str = dspy.OutputField()
# モジュール内で使用する関数
def search_exa(query: str) -> str:
    """Exa 検索APIで、クエリに基づいた関連情報を3件検索する"""
    exa = Exa(os.getenv('EXA_API_KEY'))
    result = exa.search_and_contents(
        query,
        type="auto",
        highlights=True,  # Exaの検索結果は、コンテンツだと文字数が多いので、ハイライトを使用
        num_results=5
    )
    return json.dumps([
        {
            "title": r.title,
            "url": r.url,
            "published_date": r.published_date,
            "highlites": r.highlights,
        }
        for r in result.results
    ], indent=2, ensure_ascii=False)
# カスタムモジュールの定義
class RAG(dspy.Module):
    def __init__(self):
        self.query_generator = dspy.Predict(QueryGenerator)
        self.answer_generator = dspy.ChainOfThought("question,context->answer")
    def forward(self, question, **kwargs):
        query = self.query_generator(question=question).query
        context = search_exa(query)
        return self.answer_generator(question=question, context=context).answer
forwardメソッドのロジックは以下となっている。
- 
self.query_generatorにユーザ入力を渡す→dspy.Predictで検索クエリを生成
- 
search_exaで検索クエリから、関連コンテキストを取得
- 
self.answer_generatorに ユーザ入力と関連コンテキストを渡す→dspy.ChainOfThoughtで最終回等を生成
つまりここでRAGパイプラインが実装されているのがわかる。
では実際にクエリを投げてみる。カスタムモジュールからインスタンスを作成して入力を渡す。
dspy.configure(lm=dspy.LM("openai/gpt-4o-mini"))
rag = RAG()
print(rag(question="大谷翔平の2025年の成績は?"))
大谷翔平の2025年の成績は、打率.282、611打席で172安打、25本塁打、55打点、102得点、投手としては14試合登板、47.0イニングで1勝1敗、防御率2.87です。
RAG
RAGのチュートリアル。LinuxやiPhoneアプリについての技術的な質問に答えるというものを作っていく。
引き続きローカルのMac上で、MLflowでトレーシングしつつ進める。事前準備は上と「ほぼ」同じだが、一部パッケージだけが異なる。以下を参照。
事前準備
MLFlowを起動。
uv init -p 3.12 mlflow-work && cd $_
uv add "mlflow>=2.18.0"
uv run mlflow server --backend-store-uri sqlite:///mydb.sqlite
別ターミナルでJupyterLabを起動。
mkdir dspy-custom-module-work && cd $_
docker run --rm \
    -p 8888:8888 \
    -u root \
    -e GRANT_SUDO=yes \
    -v .:/home/jovyan/work \
    quay.io/jupyter/minimal-notebook:latest
以降はJupyterLabで。
パッケージインストール。今回はdatasetsとusjonも追加。
!pip install -qU dspy datasets mlflow ujson
!pip freeze | egrep -i "dspy|datasets|mlflow|ujson"
datasets==4.2.0
dspy==3.0.3
mlflow==3.4.0
mlflow-skinny==3.4.0
mlflow-tracing==3.4.0
ujson==5.11.0
OpenAI APIキーをセット
import getpass
import os
os.environ["OPENAI_API_KEY"] = getpass.getpass('OPENAI_API_KEY')
MLflowへのトレーシングを有効化
import mlflow
mlflow.set_tracking_uri("http://host.docker.internal:5000")
mlflow.set_experiment("DSPy")
mlflow.dspy.autolog()
基本的なDSPyモジュールの使い方
ここまでやってきた基本的な使い方。
import dspy
lm = dspy.LM('openai/gpt-4o-mini')
dspy.configure(lm=lm)
dspy.Predictを使ったシンプルなQA。
qa = dspy.Predict('question: str -> response: str')
response = qa(question="Linuxにおけるハイメモリとローメモリとは何ですか?")
print(response.response)
出力
Linuxにおけるハイメモリとローメモリは、メモリ管理に関連する用語です。
ローメモリ(Low Memory)は、通常、システムが直接アドレス指定できるメモリの範囲を指します。32ビットのアーキテクチャでは、通常は4GBのアドレス空間のうち、上位の一部(通常は3GBまたは2GB)がローメモリとして使用されます。ローメモリは、カーネルやプロセスが直接アクセスできるため、パフォーマンスが良好です。
ハイメモリ(High Memory)は、ローメモリの上に位置するメモリ領域で、特に32ビットシステムにおいては、カーネルが直接アドレス指定できないメモリを指します。ハイメモリは、通常、物理メモリが4GBを超える場合に発生し、カーネルはこの領域にアクセスするために特別な手法(例えば、ページテーブルを使用)を用いる必要があります。
このように、ローメモリとハイメモリは、メモリのアドレス空間の異なる部分を指し、それぞれ異なるアクセス方法とパフォーマンス特性を持っています。
実際のやり取り
dspy.inspect_history(n=1)
[2025-10-09T17:51:44.818428]
System message:
Your input fields are:
1. `question` (str):
Your output fields are:
1. `response` (str):
All interactions will be structured in the following way, with the appropriate values filled in.
[[ ## question ## ]]
{question}
[[ ## response ## ]]
{response}
[[ ## completed ## ]]
In adhering to this structure, your objective is: 
        Given the fields `question`, produce the fields `response`.
User message:
[[ ## question ## ]]
Linuxにおけるハイメモリとローメモリとは何ですか?
Respond with the corresponding output fields, starting with the field `[[ ## response ## ]]`, and then ending with the marker for `[[ ## completed ## ]]`.
Response:
[[ ## response ## ]]
Linuxにおけるハイメモリとローメモリは、メモリ管理に関連する用語です。
- **ローメモリ**(Low Memory)は、通常、システムが直接アドレス指定できるメモリの範囲を指します。32ビットのアーキテクチャでは、通常は4GBのアドレス空間のうち、上位の一部(通常は3GBまたは2GB)がローメモリとして使用されます。ローメモリは、カーネルやプロセスが直接アクセスできるため、パフォーマンスが良好です。
- **ハイメモリ**(High Memory)は、ローメモリの上に位置するメモリ領域で、特に32ビットシステムにおいては、カーネルが直接アドレス指定できないメモリを指します。ハイメモリは、通常、物理メモリが4GBを超える場合に発生し、カーネルはこの領域にアクセスするために特別な手法(例えば、ページテーブルを使用)を用いる必要があります。
このように、ローメモリとハイメモリは、メモリのアドレス空間の異なる部分を指し、それぞれ異なるアクセス方法とパフォーマンス特性を持っています。
[[ ## completed ## ]]
dspy.ChainOfThought を使ったChain-of-Thoughtな推論。
cot = dspy.ChainOfThought('question -> response')
cot(question="中括弧は単独の行に表示されるべきですか?")
Prediction(
    reasoning='中括弧(例えば、{}や[])が単独の行に表示されるべきかどうかは、文書のスタイルやフォーマットによります。一般的には、プログラミングや数式の文脈では、中括弧がコードブロックやリストの開始・終了を示すために単独の行に置かれることが多いです。一方、文章やエッセイの中では、通常は文の一部として扱われ、単独の行にはしないことが一般的です。したがって、使用する文脈に応じて判断する必要があります。',
    response='中括弧が単独の行に表示されるべきかどうかは、文脈によります。プログラミングや特定のフォーマットでは単独の行にすることが一般的ですが、文章の中では通常は文の一部として扱われます。使用するスタイルガイドや目的に応じて決定してください。'
)
評価と反復的な開発
ここまではシンプルな使い方だが、より精度高い回答を得て、継続的に改善していくには、「評価」とそれを踏まえた「開発の繰り返し」が必要になる。
ここでは、実際にRAGを作る前に、DSPyを使った例示データ(学習・評価用データセット)の操作とそれを使った評価の流れについて見ていく。
DSPyにおける例示データの操作。
評価を行うには以下が必要になる。
- 多数の入力データ。QAシステムであれば「質問」
- 
評価指標。入力に対して得られた出力をどういう観点で評価するか?
- 分類問題やQAタスクの場合には正解ラベル(理想的な出力)も必要になる
- 自己教師あり学習に基づく指標。例えば、忠実性やハルシネーションの有無など。
 
このチュートリアルでは RAG-QAアリーナのデータセットが使用されている。このデータセットにはStackExchangeから取得された技術的な質問と正解回答のペア約2000件が含まれているのだが、全て英語。一旦中味を見てみる。
まず、元のデータセットをダウンロード。なんか dspy.utils.download() ってのがあるみたいだけどどうもうまくいかなかったのでwgetで。
!wget https://huggingface.co/dspy/cache/resolve/main/ragqa_arena_tech_examples.jsonl
データセットを読み取り
with open("ragqa_arena_tech_examples.jsonl") as f:
    data = [ujson.loads(line) for line in f]
データをピックアップしてみてみる。
data[0]
{'question': 'why igp is used in mpls?',
 'response': "An IGP exchanges routing prefixes between gateways/routers.  \nWithout a routing protocol, you'd have to configure each route on every router and you'd have no dynamic updates when routes change because of link failures. \nFuthermore, within an MPLS network, an IGP is vital for advertising the internal topology and ensuring connectivity for MP-BGP inside the network.",
 'gold_doc_ids': [2822, 2823]}
これを日本語に翻訳する。自分は vLLM で APIサーバを立てて、 cyberagent/DeepSeek-R1-Distill-Qwen-14B-Japanese を使って翻訳した。
データセットの翻訳(本題ではないので折りたたみ)
vLLMサーバはUbuntu-22.04 + RTX4090に立てた。
uv init -p 3.12 vllm-api-work && cd $_
uv venv --seed
source .venv/bin/activate
uv pip install vllm --torch-backend=auto
uv pip install bitsandbytes
vLLMでモデルをロードしてAPIサーバを起動。並列度はvLLMデフォルトの256だとダメだったので128で。
vllm serve cyberagent/DeepSeek-R1-Distill-Qwen-14B-Japanese \
    --host 0.0.0.0 \
    --port 8000 \
    --quantization bitsandbytes \
    --max_model_len 8192 \
    --kv-cache-dtype fp8 \
    --reasoning-parser deepseek_r1 \
    --max-num-seqs 128
JupyterLab側ではこんな感じ。並列処理は ppqueue を使っている。詳しくはこちら。
!pip install ppqueue
import json
from openai import OpenAI
from pydantic import BaseModel
client = OpenAI(
    base_url="http://<APIサーバのIPアドレス>:8000/v1",
    api_key="dummy",
)
model = client.models.list().data[0].id
class TranslatedQA(BaseModel):
    question: str
    response: str
    
def translate(data_dict: dict)->dict:
    prompt = "与えられたJSONの質問と応答を日本語に翻訳しつつ、正しいJSONで出力してください。\n"
    prompt += f"質問: {data_dict['question']}\n回答: {data_dict['response']}"
    
    completion = client.chat.completions.parse(
        model="cyberagent/DeepSeek-R1-Distill-Qwen-14B-Japanese",
        messages=[
            {
                "role": "user",
                "content": prompt
            }
        ],
        temperature=0.6,
        response_format=TranslatedQA
    )
    parsed = completion.choices[0].message.parsed
    return {**data_dict, "question": parsed.question, "response": parsed.response}
from ppqueue import Queue
from threading import Thread
with Queue(
    max_concurrent=128,
    show_progress=True,
    engine=Thread,
) as queue:
    jobs = queue.map(translate, data[:500])
results = [job.result for job in jobs]
だいたい40分ぐらいで終わった。ただし、一部トークンサイズオーバーやリクエストタイムアウトのエラーが起きる場合がある(どうやら途中で出力が延々と繰り返しになってしまっているのではないかと思う)。エラーとなったものは結果がNoneとなるので、これを取り除いてやる。
data_ja_cleaned = [d for d in data_ja if d is not None]
len(data_ja_cleaned)
2055
元が2064権だったので10件ぐらいは削られたみたい。
一応ファイルに保存しておく。
with open("ragqa_arena_tech_examples_ja.jsonl", "w", encoding="utf-8") as f:
    for d in data_ja_cleaned:
        f.write(json.dumps(d, ensure_ascii=False))
        f.write("\n")
翻訳後のデータをサンプリング。元のデータの question と response を日本語にしている。
data_ja_cleaned[0]
{
    'question': 'IGPがMPLSで使用される理由はなぜですか?',
    'response': 'IGPはゲートウェイ/ルーター間でルーティングプレフィックスを交換します。ルーティングプロトコルがない場合、各ルーターで個々のルートを設定する必要があり、リンクの障害による変更が発生した場合に動的更新が行われません。さらに、MPLSネットワーク内では、IGPは内部トポロジーを広告し、MP-BGPの接続を保証するために重要です。',
    'gold_doc_ids': [2822, 2823]
}
こういった辞書データをDSPyに読み込むには、個々のデータを dspy.Example として読み込みリストで保持する。dspy.Exampleオブジェクト作成時はwith_inputs()で「入力」となるフィールドを指定する必要がある。指定しなかったフィールドはメタデータとなる。
data = [dspy.Example(**d).with_inputs('question') for d in data_ja_cleaned]
先程と同じデータを見てみる。後で使うのでここではexampleに入れている。
example = data[0]
example
Example({
    'question': 'IGPがMPLSで使用される理由はなぜですか?',
    'response': 'IGPはゲートウェイ/ルーター間でルーティングプレフィックスを交換します。ルーティングプロトコルがない場合、各ルーターで個々のルートを設定する必要があり、リンクの障害による変更が発生した場合に動的更新が行われません。さらに、MPLSネットワーク内では、IGPは内部トポロジーを広告し、MP-BGPの接続を保証するために重要です。',
    'gold_doc_ids': [2822, 2823]
}) (input_keys={'question'})
先程のデータがdspy.Exampleオブジェクトになっていて、入力フィールド名も定義されているのがわかる。
このデータセットを3つに分ける。この分け方は一般的なデータセットと同じような感じ。
- 学習用(および検証用)
- DSPyオプティマイザに入力
- 学習用データから最適化アルゴリズムの学習を行って、検証用データで学習度合いを確認する
- 学習用と検証用のデータ量の推奨は各30~300件
- プロンプト最適化の場合、学習用データよりも検証データが多いほうが効果的な場合が多い
- MIPROv2の場合、valsetを指定しなければ、自動で学習用20%、検証用80%に分割
 
- 開発用・テスト用
- 30~1,000件程
- 開発用: 改善の過程で内容を確認するのに使う
- テスト用: 最終的な評価用
 
ここでは
- 学習用: 200件
- 開発用: 300件
- テスト用: 500件
にデータを分割する。
import random
random.Random(0).shuffle(data)
trainset, devset, testset = data[:200], data[200:500], data[500:1000]
len(trainset), len(devset), len(testset)
(200, 300, 500)
DSPyにおける評価方法
DSPyで評価を行う際の指標はいくつかあるようだが、ここでは セマンティックF1 を使う。これは、
- 出力された回答が、正解回答に含まれるすべての重要な事実を網羅しているか
- 出力された回答が、正解回答に含まれていない内容をどれだけ適切に省略しているか
というものらしい。そしてこれもDSPyモジュールで実装されている。これを使ったサンプルコードはこんな感じ。
from dspy.evaluate import SemanticF1
# メトリクスをインスタンス化
metric = SemanticF1(decompositional=True)
# 少し前に用意した`example`を入力として、`cot`モジュールで回答を生成
pred = cot(**example.inputs())
# 生成結果のメトリクススコアを計算
score = metric(example, pred)
print(f"質問: \t {example.question}\n")
print(f"正解回答: \t {example.response}\n")
print(f"生成された回答: \t {pred.response}\n")
print(f"セマンティックF1スコア: {score:.2f}")
質問: 	 IGPがMPLSで使用される理由はなぜですか?
正解回答: 	 IGPはゲートウェイ/ルーター間でルーティングプレフィックスを交換します。ルーティングプロトコルがない場合、各ルーターで個々のルートを設定する必要があり、リンクの障害による変更が発生した場合に動的更新が行われません。さらに、MPLSネットワーク内では、IGPは内部トポロジーを広告し、MP-BGPの接続を保証するために重要です。
生成された回答: 	 IGPがMPLSで使用される理由は、効率的なルーティングとトラフィックエンジニアリングの向上にあります。MPLSはラベルを使用してパケットを転送し、従来のIPルーティングよりも高速で柔軟なデータ転送を実現します。これにより、IGPは迅速にルートを計算し、ネットワークのパフォーマンスを向上させることができます。
セマンティックF1スコア: 0.31
まず入力を dspy.ChainOfThought に渡して回答を生成。生成された回答をデータセットの正解回答をあわせて dspy.SemanticF1 に渡して評価し、スコアを計算するという流れ。
評価時のプロンプトもみてみる。
dspy.inspect_history(n=1)
[2025-10-12T12:07:29.166254]
System message:
Your input fields are:
1. `question` (str): 
2. `ground_truth` (str): 
3. `system_response` (str):
Your output fields are:
1. `reasoning` (str): 
2. `ground_truth_key_ideas` (str): enumeration of key ideas in the ground truth
3. `system_response_key_ideas` (str): enumeration of key ideas in the system response
4. `discussion` (str): discussion of the overlap between ground truth and system response
5. `recall` (float): fraction (out of 1.0) of ground truth covered by the system response
6. `precision` (float): fraction (out of 1.0) of system response covered by the ground truth
All interactions will be structured in the following way, with the appropriate values filled in.
[[ ## question ## ]]
{question}
[[ ## ground_truth ## ]]
{ground_truth}
[[ ## system_response ## ]]
{system_response}
[[ ## reasoning ## ]]
{reasoning}
[[ ## ground_truth_key_ideas ## ]]
{ground_truth_key_ideas}
[[ ## system_response_key_ideas ## ]]
{system_response_key_ideas}
[[ ## discussion ## ]]
{discussion}
[[ ## recall ## ]]
{recall}        # note: the value you produce must be a single float value
[[ ## precision ## ]]
{precision}        # note: the value you produce must be a single float value
[[ ## completed ## ]]
In adhering to this structure, your objective is: 
        Compare a system's response to the ground truth to compute recall and precision of key ideas.
        You will first enumerate key ideas in each response, discuss their overlap, and then report recall and precision.
User message:
[[ ## question ## ]]
IGPがMPLSで使用される理由はなぜですか?
[[ ## ground_truth ## ]]
IGPはゲートウェイ/ルーター間でルーティングプレフィックスを交換します。ルーティングプロトコルがない場合、各ルーターで個々のルートを設定する必要があり、リンクの障害による変更が発生した場合に動的更新が行われません。さらに、MPLSネットワーク内では、IGPは内部トポロジーを広告し、MP-BGPの接続を保証するために重要です。
[[ ## system_response ## ]]
IGPがMPLSで使用される理由は、効率的なルーティングとトラフィックエンジニアリングの向上にあります。MPLSはラベルを使用してパケットを転送し、従来のIPルーティングよりも高速で柔軟なデータ転送を実現します。これにより、IGPは迅速にルートを計算し、ネットワークのパフォーマンスを向上させることができます。
Respond with the corresponding output fields, starting with the field `[[ ## reasoning ## ]]`, then `[[ ## ground_truth_key_ideas ## ]]`, then `[[ ## system_response_key_ideas ## ]]`, then `[[ ## discussion ## ]]`, then `[[ ## recall ## ]]` (must be formatted as a valid Python float), then `[[ ## precision ## ]]` (must be formatted as a valid Python float), and then ending with the marker for `[[ ## completed ## ]]`.
Response:
[[ ## reasoning ## ]]
The ground truth explains that IGP is used for exchanging routing prefixes between gateways/routers and highlights the importance of dynamic updates in case of link failures. It also mentions that IGP is crucial for advertising internal topology and ensuring MP-BGP connections within an MPLS network. The system response focuses on the efficiency of routing and traffic engineering improvements provided by IGP in MPLS, emphasizing the speed and flexibility of data transfer. While both responses address the role of IGP in MPLS, the system response does not mention the dynamic updates or the advertising of internal topology, which are key aspects in the ground truth.
[[ ## ground_truth_key_ideas ## ]]
1. IGP exchanges routing prefixes between gateways/routers.
2. Without routing protocols, individual routes must be set on each router.
3. Dynamic updates occur in case of link failures.
4. IGP is important for advertising internal topology.
5. IGP ensures MP-BGP connections in MPLS networks.
[[ ## system_response_key_ideas ## ]]
1. IGP improves routing efficiency and traffic engineering.
2. MPLS uses labels for packet forwarding.
3. MPLS provides faster and more flexible data transfer.
4. IGP allows for quick route calculations.
5. IGP enhances network performance.
[[ ## discussion ## ]]
There is some overlap between the ground truth and the system response, particularly in the acknowledgment of IGP's role in improving routing efficiency and performance within MPLS. However, the system response lacks critical details about the necessity of dynamic updates and the advertising of internal topology, which are essential for understanding the full context of IGP's function in MPLS networks. Therefore, while both responses touch on the benefits of IGP in MPLS, the ground truth provides a more comprehensive view.
[[ ## recall ## ]]
0.4
[[ ## precision ## ]]
0.25
[[ ## completed ## ]]
これで評価を回してもいいんだけど、dspy.Evaluateを使うことで、並列処理を良しなにやってくれたり、便利なユーティリティ機能が使えたりする。
# dspy.Evaluate を初期化して、再利用できるようにする
evaluate = dspy.Evaluate(
    devset=devset,
    metric=metric,
    num_threads=24,
    display_progress=True,
    display_table=2
)
# CoTプログラムを評価
evaluate(cot)

EvaluationResult(score=35.42, results=<list of 300 results>)
基本的な検索拡張生成(RAG)の仕組み
(続く)

