[実践ADK] ADKとLyriaとChainlitで音楽生成エージェント - ② Chainlitを使ったADK向けのChatUI
こんにちは、サントリーこと大橋です。
前回に引き続き、ADKとLyriaとChainlitで音楽生成エージェントを作成し、Agent EngineとCloud Runにデプロイして利用してみたいと思います。
前回の記事は以下です。構成や、ADKで作成したAI Agentについては前回の記事を参照してください。
今回はADKの開発UIではなくChainlitを利用したChatUIを作成しローカルでAI AgentとChatUIを動かしてみようと思います。
Chainlitを使ったChatUI
Chainlitとは
Chainlitは対話型AIアプリケーションを効率的に開発するためのオープンソースPythonフレームワークです。Pythonコードと組み合わせることで、直感的でリッチなユーザーインターフェースを素早く構築できるだけでなく、AIの推論過程の可視化や、ユーザーからのフィードバック収集といった機能も備えています。これにより、誰もが簡単に高品質な対話型AIを構築・運用できる環境を提供します。
他のフレームワークとの比較、なぜStreamlitを使わない?
Chainlitと同様のフレームワークとして、Streamlitが有名ですね。
StreamlitはChainlitよりも汎用的なユーザーインターフェース構築フレームワークで、データサイエンスや機械学習のプロトタイプ作成、ダッシュボード構築、簡易的なWebアプリ開発に非常に強みを発揮します。豊富なウィジェットと柔軟なレイアウト機能により、様々なデータを視覚化し、インタラクティブな操作を可能にします。汎用性が高く、多様なWebアプリケーションに対応できますが、対話型UIの構築にはある程度のカスタム実装が求められます。
またStreamlitは画面を開いている限り常にバックエンドと通信するため、Cloud Runで動かすと、ユーザーが画面を閉じてくれない限り0スケールができずお金がかかってしまうという噂があります。
(筆者未検証)
なお他に同様のフレームワークとしてGradioがあります。
GradioもStreamlitとほぼ同様なユーザーインターフェース構築フレームワークで、機械学習分野のプロトタイプとして、有名です。
Gradioについては、ADKとFirestore、Gradioを利用したコードラボがGoogle Codelabにあるので、試してみたい方はそちらを参照してみてください。
今回はChatUIのみがほしいので、Chainlitを用いてUIを構築していきます。
ADK向けのChainlitのコード
以下が今回作成したADK向けのChainlitのメッセージ処理のコードです。
LangChainやOpenAI向けにはインテグレーションがあるのですが、ADKには無いため、自分が行いたい処理に合わせてある程度カスタマイズが必要です。
ChainlitとAI Agent ~ リモート or インサイド
前回の記事にも構成を書きましたが、今回のシステム構成では、Chainlitを動かしているサーバーと、AI Agentが動くサーバーが異なります。
Chainlitの各種Exampleを見ると、基本的にはChainlitと同じコード内でAI Agentを動かすことを想定しているように見えます。
ADKで言えば、adk web
やadk api-server
のようなサーバーを起動すし、APIとしてAI Agentを呼び出すのではなく、直接runner.run_async
を用いて、AI Agentを動かすほうがChainlitとしては標準的な実装に見えます。
このあたりが、いくつかコード上の不調和を生んでしまっている部分があるのでご注意ください。
なお本記事ではChainlitの初歩的な内容については説明しません。 他の記事や公式ドキュメントを参照してください。
ADKの呼び出し
今回ADKで実装されたAI Agentは別サーバ上にいることを想定しています。
ローカルの場合はadk api-server
で実行するので、requests
経由でAPIを実行し、
Agent Engineの場合はVertex AI SDKの agentengine.get(resource_id)
で取得したインスタンス経由でAI Agentを実行します。
ローカルとAgent Engineの実装を切り替えられるようにラッパークラスを作成します。
まずは抽象クラスです。 sessionの管理メソッド群と、stream_query
と async_stream_query
を定義しています。
そしてローカル(などのAPI経由)の実装クラスです。
stream_query
やasync_stream_query
はSSEで実行されるためSSEClient
やAsyncSSEClient
を利用します。
AsyncSSEClient
のread_bufsize
がとてつもなく大きいのはストリームで音楽データがまるっと来るため、チャンクサイズが大きくないとエラーが発生してしまうためです。
次にAgent Engineを利用した実装クラスです。
こちらはAgent Engineのインスタンスのラッパーとなります。
通常のAdkAppでは存在しないload_artifact
、stream_query_sse
、async_stream_query_sse
というメソッドを呼び出していますが、これは次回の記事で説明したいと思います。
最後にこれらのクラスを呼び分ける関数です。
環境変数に設定された BACKEND_TYPE
を参照して、作成するクラスを振り分けていいます。
on_message
)
event処理(ADKを利用した場合、別サーバー上にいるAgentからAPI経由で返却される値はfrom google.adk.events.Event
のdictです。
なので、これに合わせて処理をする必要があります。
上記で作成したChatAPI
というクラス経由でasync_stream_query
を呼び出します。
async_stream_query
はAsyncIteratorを返却するので、async for in
を利用して、Eventを取得します。
event処理1・エラー処理
前半はEventではないレスポンスが返却された場合のエラー処理をしています。
Agent Engineでクオータエラーになった場合などでこの処理を使うことが比較的多いです。
event処理2・ストリーム処理(partial=True)
GeminiなどのLLMではストリーム形式のレスポンスをサポートしています。
ストリームモードを用いた場合、返却されるメッセージは一部分ずつ連続的にストリームで返却されます。
例えばLLMから「こんにちは」と返却される場合、「こ」「ん」「に」「ち」「わ」とバラバラなEventがストリームとして連続的に返却されます。
この部分的な返却が行われる場合、メッセージが部分的であることを記すためにevent.partial
がtrueになります。
今回のコードでもevent.partial
がtrueの場合にはstream用の処理を行っています。
またChainlit側にもこのようなストリームで返却されたメッセージを扱うための関数 cl.Message().stream_token(msg)
があります。
上記のような部分的なメッセージが返却される場合はこの関数を利用します。
この関数を利用することで、以下のようにAI Agentからのレスポンスを連続的に表示することができUXが向上します。(一気に表示とかなり待った印象になりUXがあまり良くないです。)
以下がストリームモードでない場合です。
ストリームモードの場合
ストリームモードのほうが途中だとしてもレスポンス自体があるため速く感じます。
なおこのevent.partial
は、trueできていたメッセージが、最後にevent.partialがfalseになると全文改めて送信されます。たまにpartial=true
無しでいきなりpartial=false
で来ることがある気がしていて、このあたりの扱いがちょっとむずい印象です。
event処理3・ツール呼び出し(function_call & function_response)
eventにfunction_call
がある場合はこれからTool Callingを実施しようとしているということです。
eventにfunction_response
がある場合はTool Callingの実行が完了しその結果が含まれています。
呼び出す処理によりけりですが、この処理は比較的長い処理になる場合が多いため、UXを考えると読込中の表示を行いたいです。
ローディングが無いと以下のように処理が止まってしまったように見えます。
ローディングがあれば処理中であることが明示できます。
そこでChainlitではこの実行中の表示を行うためにStep
という機能が用意されています。
Stepは2種類の呼び出し方法が用意されていて、一つは関数を用意し、デコレータで装飾する方法です。
@cl.step(type="tool")
async def tool():
# Simulate a running task
await cl.sleep(2)
return "Response from the tool!"
@cl.on_message
async def main(message: cl.Message):
# Call the tool
tool_res = await tool()
# Send the final answer.
await cl.Message(content="This is the final answer").send()
もう一つはwith
句を使って、処理を囲う方法です。
@cl.on_message
async def main(msg: cl.Message):
async with cl.Step(name="gpt4", type="llm") as step:
step.input = msg.content
stream = await client.chat.completions.create(
messages=[{"role": "user", "content": msg.content}],
stream=True,
model="gpt-4",
temperature=0,
)
async for part in stream:
delta = part.choices[0].delta
if delta.content:
# Stream the output of the step
await step.stream_token(delta.content)
どちらもStepの内側で長い処理を行うことを想定しており、
今回のように関数の呼び出しと、関数の呼び出し結果が別れている処理は想定していないようです。
※しっかりした方法があったら教えて下さいm(_ _)m
今回は仕方がないので、Stepクラスの __aenter__
と __aexit__
を直接使って処理を分けています。
これでeventの状態をみてローディング表示ができるようになりました。
event処理4・音楽の表示
すべての処理が完了すると最後に作成した音楽の表示が必要です。
以下のように直接ChatUI上に表示します。
AI AgentがローカルやCloud Runにいる場合はchunkサイズを気にしなければ、SSEで取得されるストリーム内に音楽を詰め込んでしまってeventといっしょに送り出します。
この処理は handle_inline_data
で行っています。
ただAgent Engineの場合は制限があるのか、ストリームに音楽データをいれるとエラーになります。
このため、Agentからは<artifact>artifact_id</artifact>
という形式のテキストで返却し、それがあれば改めてArtifactを取得するようにしています。
通常のAgent Engine(AdkApp)の場合はアーティファクトをAI Agentから取得する手段は無いため、少し特殊な方法でアーティファクトを取得しています。
AdkAppにアーティファクトを管理する関数が有ってもいいと思うのでこの辺りはissueが投げてあります。
以上でevent処理は完了です。
実行
Chainlitの実行はchainlit
コマンド経由で行います。
AI Agentもローカルで実行する場合は先にadk api-server
かadk web
でAI Agentを起動しておきます。
chainlit側はchainlit run main.py
で実行します。
まとめ
以上でChainlit + ADKの説明は終了です。
コード自体は試しながらだったので、もう少し綺麗にできそうですね...
ChainlitはWeb上だけでなく、SlackやDiscord、Teams、Copilotへの連携もコードの修正をすることなく、連携が可能です。
別サーバでAI Agentを実行する場合は少し考慮する点が増えますが、AI Agent向けのChat UIとしては非常に有力な選択肢ではないでしょうか。
次回はAgent Engineについて書きます。
Discussion