Vertex AI Agent Engine のカスタムテンプレートでセッション機能付きチャットボットを作る
Vertex AI Agent Engine は AI エージェントを構築・デプロイするための Google Cloud のマネージドサービスです。[1]
以下のフレームワークに対してはテンプレートが用意されており、簡単にデプロイすることができます。
- Agent Development Kit
- LangChain
- LangGraph
- AG2
- LlamaIndex
また上記に挙げられていないフレームワークについても、カスタムテンプレートを作成することでデプロイすることができます。
今回はカスタムテンプレートを用いて、セッション機能付きの AI チャットボットを実装してみます。
なお本記事のソースコードは以下のリポジトリで公開しています。
Agent Engine の基礎知識
カスタムテンプレートについて
カスタムテンプレートでは以下のメソッドを持ったクラスを実装します。[2]
__init__()
set_up()
-
query()
,stream_query()
__init__()
と set_up()
はどちらも初期化ロジックなので違いが分かりづらいですが、Agent Engine のドキュメントによるとコンストラクタの結果は「picklable」である必要があります。
「オブジェクトが picklable である」とは
Python のドキュメントによると「オブジェクトが picklable である」とは以下のいずれかです。[3]
- 組み込み定数
- 整数、浮動小数点数、複素数
- 文字列、バイト、バイト配列
- 要素が picklable なタプル、リスト、集合、辞書
- 組み込み関数とユーザー定義関数
- モジュールのトップ・レベルからアクセス可能なクラス
-
__getstate__()
の結果が picklable なクラスのインスタンス
ざっくりした使い分けとしては
-
__init__()
:設定値をインスタンス変数に保存するだけ -
set_up()
:クライアントの初期化などを行う
となります。
query()
、stream_query()
にはカスタムエージェントのロジックを実装します。
が、この 2 つのメソッドしか使えないのはとても不便なので実際には register_operations()
でカスタムメソッドを登録することになると思います。
ちなみに全てのメソッドは任意で、必要なものだけ定義すれば良いです。
セッション機能
Agent Engine には組み込みのセッション機能があり、Agent Development Kit(ADK)もしくは API から利用可能です。[4]
今回は ADK から利用する前提で説明します。
ADK には VertexAiSessionService
というクラスがあり、以下のメソッドが定義されています。
create_session
get_session
list_session
delete_session
list_events
append_event
セッションが 1 つの会話、イベントがその中のメッセージのイメージです。
ChatGPT などのチャットアプリでは過去に送ったメッセージの編集ができますが、VertexAiSessionService
ではイベント操作は一覧取得と追加のみなので、そういう要件が入ってくると使えなそうです。
注意点としてセッション機能は現在、us-central1
リージョンでのみ利用可能です。[5]
今回はこのクラスの範囲内でセッション機能を実装することにします。
実装
カスタムテンプレート
まずは __init__()
を定義します。
今回作成するエージェントでは Google Cloud のプロジェクト ID、リージョン、生成 AI のモデル名を渡すことにします。
class CustomAgent:
def __init__(self, project_id: str, location: str, model: str):
self.project_id = project_id
self.location = location
self.model = model
次に set_up()
を定義します。
__init__()
で保存したプロジェクト ID とリージョンから Google Gen AI SDK クライアントと VertexAiSessionService
を作成します。
またセッション管理で Agent Engine インスタンスの ID が必要となるので、環境変数 GOOGLE_CLOUD_AGENT_ENGINE_ID
から取得しておきます。
この環境変数はデプロイされた際に自動で与えられるものです。[6]
def set_up(self):
self.client = genai.Client(
vertexai=True,
project=self.project_id,
location=self.location,
)
self.session_service = VertexAiSessionService(
self.project_id,
self.location,
)
self.app_name = os.environ["GOOGLE_CLOUD_AGENT_ENGINE_ID"]
続いてセッションを操作するカスタムメソッドを追加していきます。
ほとんどセッションサービスのラッパーです。
get_session
はエージェントから提供する必要がないのでそれだけ省いています。
def create_session(self, user_id: str):
session = self.session_service.create_session(
app_name=self.app_name,
user_id=user_id,
)
return session.id
def list_sessions(self, user_id: str):
response = self.session_service.list_sessions(
app_name=self.app_name,
user_id=user_id,
)
return [session.id for session in response.sessions]
def delete_session(self, user_id: str, session_id: str):
self.session_service.delete_session(
app_name=self.app_name,
user_id=user_id,
session_id=session_id,
)
作成したメソッドをカスタムメソッドとして登録します。
def register_operations(self):
return {
"": [
"create_session",
"list_sessions",
"delete_session",
],
}
続いてメインの生成 AI と会話する部分の処理を実装します。
get_session
で会話履歴を取得し、送られてきたメッセージを結合してモデルに渡すようにします。
結果が生成されたらメッセージと結果を会話履歴に追加し、生成結果を返却します。
def send_message(self, user_id: str, session_id: str, message: str):
session = self.session_service.get_session(
app_name=self.app_name,
user_id=user_id,
session_id=session_id,
)
history = [event.content for event in session.events if event.content is not None]
user_content = UserContent(message)
response = self.client.models.generate_content(
model=self.model,
contents=history + [user_content],
)
user_event = Event(
invocation_id=response.response_id,
author="user",
content=user_content,
)
self.session_service.append_event(
session=session,
event=user_event,
)
agent_event = Event(
invocation_id=response.response_id,
author="agent",
content=ModelContent(response.text),
)
self.session_service.append_event(
session=session,
event=agent_event,
)
return response.text
チャット Bot アプリでは既存の会話履歴の表示なども必要なので、あるセッションのメッセージ一覧取得も作成しておきます。
ADK の Event
インスタンスをそのまま返すのはなんか嫌なので一応メッセージ用の TypedDict を作っておきます。
class Message(TypedDict):
id: str
author: str
content: str
timestamp: float
セッションサービスから取得したイベント一覧をメッセージに詰め替えて返却します。
def list_messages(self, user_id: str, session_id: str):
response = self.session_service.list_events(
app_name=self.app_name,
user_id=user_id,
session_id=session_id,
)
return [
Message(
id=event.id,
author=event.author,
content=event.content.parts[0].text,
timestamp=event.timestamp,
)
for event in response.events
]
最後に register_operations
を更新しておきます。
実装は省略しますがストリーミング生成用の send_message_stream
も定義しており、そちらも登録しておきます。
def register_operations(self):
return {
"": [
"create_session",
"list_sessions",
"delete_session",
"send_message",
"list_messages",
],
"stream": ["send_message_stream"],
}
custom_agent.py の全体像
デプロイスクリプト
Agent Engine インスタンスの作成や更新はコンソールからはできず、スクリプトで行う必要があるので、以下のように作成します。
agent_engine
としてカスタムエージェントのインスタンスを渡すのとは別に以下が必要です。[7]
- エージェントが利用しているライブラリを
requirements
で指定する - カスタムエージェントを別ファイルで定義した場合、
extra_packages
で指定する
vertexai.init(
project=PROJECT_ID,
location=LOCATION,
staging_bucket=f"gs://{BUCKET_NAME}",
)
agent_engines.create(
agent_engine=CustomAgent(
project_id=PROJECT_ID,
location=LOCATION,
model="gemini-2.5-flash-preview-05-20",
),
requirements=[
"google-cloud-aiplatform[adk,agent-engines]>=1.94.0",
"google-genai>=1.16.1",
],
extra_packages=["custom_agent.py"],
)
スクリプトを実行すると Agent Engine インスタンスが作成され、リソース ID が表示されます。
(一部マスクしています。)
Identified the following requirements: {'cloudpickle': '3.1.1', 'pydantic': '2.11.5'}
The following requirements are missing: {'cloudpickle', 'pydantic'}
The following requirements are appended: {'pydantic==2.11.5', 'cloudpickle==3.1.1'}
The final list of requirements: ['google-cloud-aiplatform[adk,agent-engines]>=1.94.0', 'google-genai>=1.16.1', 'pydantic==2.11.5', 'cloudpickle==3.1.1']
Using bucket **********
Wrote to gs://**********/agent_engine/agent_engine.pkl
Writing to gs://**********/agent_engine/requirements.txt
Creating in-memory tarfile of extra_packages
Writing to gs://**********/agent_engine/dependencies.tar.gz
Creating AgentEngine
Create AgentEngine backing LRO: projects/**********/locations/us-central1/reasoningEngines/**********/operations/**********
View progress and logs at https://console.cloud.google.com/logs/query?project=**********
AgentEngine created. Resource name: projects/**********/locations/us-central1/reasoningEngines/**********
To use this AgentEngine in another session:
agent_engine = vertexai.agent_engines.get('projects/**********/locations/us-central1/reasoningEngines/**********')
動作確認
以下のシナリオで動作確認するスクリプトを書きます。
- セッション作成
- セッション一覧取得で作成されたことを確認
- 1 回目のメッセージで自己紹介
- 2 回目のメッセージで名前を確認(会話履歴が使われていることの確認)
- メッセージ一覧を確認
- セッション削除
- セッション一覧取得で削除されたことを確認
agent_engine = agent_engines.get(RESOURCE_ID)
user_id = "test"
session_id = agent_engine.create_session(user_id=user_id)
print(agent_engine.list_sessions(user_id=user_id))
print(agent_engine.send_message(user_id=user_id, session_id=session_id, message="私の名前は山田太郎です。"))
for chunk in agent_engine.send_message_stream(user_id=user_id, session_id=session_id, message="私の名前はなんでしょう。"):
print(chunk)
print(agent_engine.list_messages(user_id=user_id, session_id=session_id))
agent_engine.delete_session(user_id=user_id, session_id=session_id)
print(agent_engine.list_sessions(user_id=user_id))
実行すると以下のようなログが出ます。
['6310788274864521216']
承知いたしました。山田太郎さんですね。
あなたは**山田太郎**さんです。
[{'author': 'user', 'timestamp': 1748239265.17235, 'id': '2798912951375888384', 'content': '私の名前は山田太郎です。'}, {'author': 'agent', 'timestamp': 1748239265.2611, 'id': '7419606169058017280', 'content': '承知いたしました。山田太郎さんですね。'}, {'author': 'user', 'timestamp': 1748239269.107376, 'id': '2249473796836687872', 'content': '私の名前はなんでしょう。'}, {'author': 'agent', 'timestamp': 1748239269.173521, 'id': '1654998646023782400', 'content': 'あなたは**山田太郎**さんです。'}]
[]
良さそうです。
Discussion