💬

[実践ADK] ADKとLyriaとChainlitで音楽生成エージェント - ② Chainlitを使ったADK向けのChatUI

に公開

こんにちは、サントリーこと大橋です。
前回に引き続き、ADKとLyriaとChainlitで音楽生成エージェントを作成し、Agent EngineとCloud Runにデプロイして利用してみたいと思います。

前回の記事は以下です。構成や、ADKで作成したAI Agentについては前回の記事を参照してください。

https://zenn.dev/soundtricker/articles/fba90dc901ab46

今回はADKの開発UIではなくChainlitを利用したChatUIを作成しローカルでAI AgentとChatUIを動かしてみようと思います。

Chainlitを使ったChatUI

Chainlitとは

https://docs.chainlit.io/get-started/overview

Chainlitは対話型AIアプリケーションを効率的に開発するためのオープンソースPythonフレームワークです。Pythonコードと組み合わせることで、直感的でリッチなユーザーインターフェースを素早く構築できるだけでなく、AIの推論過程の可視化や、ユーザーからのフィードバック収集といった機能も備えています。これにより、誰もが簡単に高品質な対話型AIを構築・運用できる環境を提供します。

他のフレームワークとの比較、なぜStreamlitを使わない?

Chainlitと同様のフレームワークとして、Streamlitが有名ですね。
StreamlitはChainlitよりも汎用的なユーザーインターフェース構築フレームワークで、データサイエンスや機械学習のプロトタイプ作成、ダッシュボード構築、簡易的なWebアプリ開発に非常に強みを発揮します。豊富なウィジェットと柔軟なレイアウト機能により、様々なデータを視覚化し、インタラクティブな操作を可能にします。汎用性が高く、多様なWebアプリケーションに対応できますが、対話型UIの構築にはある程度のカスタム実装が求められます。

またStreamlitは画面を開いている限り常にバックエンドと通信するため、Cloud Runで動かすと、ユーザーが画面を閉じてくれない限り0スケールができずお金がかかってしまうという噂があります。
(筆者未検証)

https://discuss.streamlit.io/t/streamlit-app-on-google-cloud-run-is-expensive/38085

なお他に同様のフレームワークとしてGradioがあります。
GradioもStreamlitとほぼ同様なユーザーインターフェース構築フレームワークで、機械学習分野のプロトタイプとして、有名です。

Gradioについては、ADKとFirestore、Gradioを利用したコードラボがGoogle Codelabにあるので、試してみたい方はそちらを参照してみてください。

https://codelabs.developers.google.com/personal-expense-assistant-multimodal-adk?hl=ja#1

今回はChatUIのみがほしいので、Chainlitを用いてUIを構築していきます。

ADK向けのChainlitのコード

以下が今回作成したADK向けのChainlitのメッセージ処理のコードです。
LangChainやOpenAI向けにはインテグレーションがあるのですが、ADKには無いため、自分が行いたい処理に合わせてある程度カスタマイズが必要です。

https://github.com/soundTricker/composer-agent/blob/main/apps/chatui/main.py

ChainlitとAI Agent ~ リモート or インサイド

前回の記事にも構成を書きましたが、今回のシステム構成では、Chainlitを動かしているサーバーと、AI Agentが動くサーバーが異なります。

Chainlitの各種Exampleを見ると、基本的にはChainlitと同じコード内でAI Agentを動かすことを想定しているように見えます。
ADKで言えば、adk webadk api-serverのようなサーバーを起動すし、APIとしてAI Agentを呼び出すのではなく、直接runner.run_asyncを用いて、AI Agentを動かすほうがChainlitとしては標準的な実装に見えます。

このあたりが、いくつかコード上の不調和を生んでしまっている部分があるのでご注意ください。

なお本記事ではChainlitの初歩的な内容については説明しません。 他の記事や公式ドキュメントを参照してください。

ADKの呼び出し

今回ADKで実装されたAI Agentは別サーバ上にいることを想定しています。
ローカルの場合はadk api-serverで実行するので、requests経由でAPIを実行し、
Agent Engineの場合はVertex AI SDKの agentengine.get(resource_id)で取得したインスタンス経由でAI Agentを実行します。
ローカルとAgent Engineの実装を切り替えられるようにラッパークラスを作成します。

まずは抽象クラスです。 sessionの管理メソッド群と、stream_queryasync_stream_queryを定義しています。

https://github.com/soundTricker/composer-agent/blob/main/apps/chatui/chatui/services/chat_api.py#L19-L51

そしてローカル(などのAPI経由)の実装クラスです。

https://github.com/soundTricker/composer-agent/blob/main/apps/chatui/chatui/services/chat_api.py#L85-L165

stream_queryasync_stream_queryはSSEで実行されるためSSEClientAsyncSSEClientを利用します。
AsyncSSEClientread_bufsizeがとてつもなく大きいのはストリームで音楽データがまるっと来るため、チャンクサイズが大きくないとエラーが発生してしまうためです。

次にAgent Engineを利用した実装クラスです。

https://github.com/soundTricker/composer-agent/blob/main/apps/chatui/chatui/services/chat_api.py#L54-L82

こちらはAgent Engineのインスタンスのラッパーとなります。
通常のAdkAppでは存在しないload_artifactstream_query_sseasync_stream_query_sseというメソッドを呼び出していますが、これは次回の記事で説明したいと思います。

最後にこれらのクラスを呼び分ける関数です。
環境変数に設定された BACKEND_TYPE を参照して、作成するクラスを振り分けていいます。

https://github.com/soundTricker/composer-agent/blob/main/apps/chatui/chatui/services/chat_api.py#L187-L193

event処理(on_message)

ADKを利用した場合、別サーバー上にいるAgentからAPI経由で返却される値はfrom google.adk.events.Eventのdictです。

なので、これに合わせて処理をする必要があります。

上記で作成したChatAPIというクラス経由でasync_stream_queryを呼び出します。
async_stream_queryはAsyncIteratorを返却するので、async for inを利用して、Eventを取得します。

https://github.com/soundTricker/composer-agent/blob/main/apps/chatui/chatui/services/chat_api.py#L187-L193

event処理1・エラー処理

前半はEventではないレスポンスが返却された場合のエラー処理をしています。

https://github.com/soundTricker/composer-agent/blob/main/apps/chatui/main.py#L174-L186

Agent Engineでクオータエラーになった場合などでこの処理を使うことが比較的多いです。

event処理2・ストリーム処理(partial=True)

https://github.com/soundTricker/composer-agent/blob/main/apps/chatui/main.py#L93-L116

GeminiなどのLLMではストリーム形式のレスポンスをサポートしています。
ストリームモードを用いた場合、返却されるメッセージは一部分ずつ連続的にストリームで返却されます。
例えばLLMから「こんにちは」と返却される場合、「こ」「ん」「に」「ち」「わ」とバラバラなEventがストリームとして連続的に返却されます。
この部分的な返却が行われる場合、メッセージが部分的であることを記すためにevent.partialがtrueになります。

今回のコードでもevent.partialがtrueの場合にはstream用の処理を行っています。

https://github.com/soundTricker/composer-agent/blob/main/apps/chatui/main.py#L187-L190

またChainlit側にもこのようなストリームで返却されたメッセージを扱うための関数 cl.Message().stream_token(msg)があります。
上記のような部分的なメッセージが返却される場合はこの関数を利用します。

https://github.com/soundTricker/composer-agent/blob/main/apps/chatui/main.py#L93-L116

この関数を利用することで、以下のようにAI Agentからのレスポンスを連続的に表示することができUXが向上します。(一気に表示とかなり待った印象になりUXがあまり良くないです。)

以下がストリームモードでない場合です。

ストリームモードの場合

ストリームモードのほうが途中だとしてもレスポンス自体があるため速く感じます。

なおこのevent.partialは、trueできていたメッセージが、最後にevent.partialがfalseになると全文改めて送信されます。たまにpartial=true無しでいきなりpartial=falseで来ることがある気がしていて、このあたりの扱いがちょっとむずい印象です。

event処理3・ツール呼び出し(function_call & function_response)

eventにfunction_callがある場合はこれからTool Callingを実施しようとしているということです。
eventにfunction_responseがある場合はTool Callingの実行が完了しその結果が含まれています。

呼び出す処理によりけりですが、この処理は比較的長い処理になる場合が多いため、UXを考えると読込中の表示を行いたいです。

ローディングが無いと以下のように処理が止まってしまったように見えます。

ローディングがあれば処理中であることが明示できます。

そこでChainlitではこの実行中の表示を行うためにStepという機能が用意されています。
https://docs.chainlit.io/concepts/step

Stepは2種類の呼び出し方法が用意されていて、一つは関数を用意し、デコレータで装飾する方法です。

@cl.step(type="tool")
async def tool():
    # Simulate a running task
    await cl.sleep(2)

    return "Response from the tool!"

@cl.on_message
async def main(message: cl.Message):
    # Call the tool
    tool_res = await tool()

    # Send the final answer.
    await cl.Message(content="This is the final answer").send()

もう一つはwith句を使って、処理を囲う方法です。

@cl.on_message
async def main(msg: cl.Message):

    async with cl.Step(name="gpt4", type="llm") as step:
        step.input = msg.content

        stream = await client.chat.completions.create(
            messages=[{"role": "user", "content": msg.content}],
            stream=True,
            model="gpt-4",
            temperature=0,
        )

        async for part in stream:
            delta = part.choices[0].delta
            if delta.content:
                # Stream the output of the step
                await step.stream_token(delta.content)

どちらもStepの内側で長い処理を行うことを想定しており、
今回のように関数の呼び出しと、関数の呼び出し結果が別れている処理は想定していないようです。
※しっかりした方法があったら教えて下さいm(_ _)m

今回は仕方がないので、Stepクラスの __aenter____aexit__ を直接使って処理を分けています。

https://github.com/soundTricker/composer-agent/blob/main/apps/chatui/main.py#L118-L129

https://github.com/soundTricker/composer-agent/blob/main/apps/chatui/main.py#L131-L137

これでeventの状態をみてローディング表示ができるようになりました。

event処理4・音楽の表示

すべての処理が完了すると最後に作成した音楽の表示が必要です。

以下のように直接ChatUI上に表示します。

AI AgentがローカルやCloud Runにいる場合はchunkサイズを気にしなければ、SSEで取得されるストリーム内に音楽を詰め込んでしまってeventといっしょに送り出します。
この処理は handle_inline_data で行っています。

https://github.com/soundTricker/composer-agent/blob/main/apps/chatui/main.py#L139-L150

ただAgent Engineの場合は制限があるのか、ストリームに音楽データをいれるとエラーになります。
このため、Agentからは<artifact>artifact_id</artifact>という形式のテキストで返却し、それがあれば改めてArtifactを取得するようにしています。

https://github.com/soundTricker/composer-agent/blob/main/apps/chatui/main.py#L71-L91

通常のAgent Engine(AdkApp)の場合はアーティファクトをAI Agentから取得する手段は無いため、少し特殊な方法でアーティファクトを取得しています。
AdkAppにアーティファクトを管理する関数が有ってもいいと思うのでこの辺りはissueが投げてあります。

https://github.com/googleapis/python-aiplatform/issues/5352

以上でevent処理は完了です。

実行

Chainlitの実行はchainlitコマンド経由で行います。
AI Agentもローカルで実行する場合は先にadk api-serveradk webでAI Agentを起動しておきます。
chainlit側はchainlit run main.pyで実行します。

まとめ

以上でChainlit + ADKの説明は終了です。
コード自体は試しながらだったので、もう少し綺麗にできそうですね...

ChainlitはWeb上だけでなく、SlackやDiscord、Teams、Copilotへの連携もコードの修正をすることなく、連携が可能です。
別サーバでAI Agentを実行する場合は少し考慮する点が増えますが、AI Agent向けのChat UIとしては非常に有力な選択肢ではないでしょうか。

次回はAgent Engineについて書きます。

https://zenn.dev/soundtricker/articles/25324dfd702883

Discussion