🌟

chainlit&LangGraphを用いたAIエージェント会話とMQTT通信

に公開

2025/08/11に大阪南森町のFabCafeで開催された「ITエンジニアのための個人開発LT会&展示会」」で紹介した「AIアクアリウム」の技術仕様です。

https://orion.connpass.com/event/354726/

ここではAI側の実装方法を紹介します。

githubのリポジトリはこちら
https://github.com/manusa3190/chain_sample

1.最もシンプルなReActエージェント

LangGraphやchainlitの使い方は割愛します。
まずは最もシンプルなAIエージェントを作成し、chainlitで会話ができるのを確認します。

※ chainlitの公式の通りに書いても動かなかったので、2025年8月現在はこのコードが正しいです

[tool.poetry.dependencies]
python = "^3.13"
chainlit = "^2.6.8"
langgraph = "^0.6.4"
python-dotenv = "^1.1.1"
langchain-openai = "^0.3.29"
paho-mqtt = "^2.1.0"
src/app1.py
import chainlit as cl

from langgraph.prebuilt import create_react_agent

from langchain_core.tools import tool
from langchain_core.messages import HumanMessage, AIMessage
from langchain_core.runnables import RunnableConfig

from langchain_openai import ChatOpenAI

from dotenv import load_dotenv

# .envからAPIキーを読み込む
load_dotenv()

# ツールの定義
@tool
def calculator(expression: str) -> float:
    """数式を計算します"""
    try:
        print(expression)
        return eval(expression)
    except Exception as e:
        return str(e)

# LangGraphのReActエージェントを作成
agent = create_react_agent(
    model=ChatOpenAI(model="gpt-4o-mini"),
    tools=[calculator],
    prompt="あなたは数学の専門家です。"
)

@cl.on_chat_start
async def on_chat_start():
    cl.user_session.set("agent", agent)

@cl.on_message
async def on_message(message: cl.Message):
    try:
        # セッションIDを使用してスレッド管理
        config = RunnableConfig(configurable={"thread_id": cl.context.session.id})
        
        # Chainlitのメッセージを作成
        msg = cl.Message(content="")
        
        # エージェントをストリーミング実行(カスタムデータも含む)
        async for llm_token, metadata in agent.astream(
            {"messages": [HumanMessage(content=message.content)]}, 
            config=config,
            stream_mode="messages" # https://langchain-ai.github.io/langgraph/how-tos/streaming/
        ):
            # トークンとメタデータを正しく処理
            if isinstance(llm_token, AIMessage) and llm_token.content:
                if isinstance(llm_token.content, str):
                    await msg.stream_token(llm_token.content)
        
        # メッセージを送信
        await msg.send()
        
    except Exception as e:
        print(f"Error: {e}")  # デバッグ用
        error_msg = cl.Message(content=f"エラーが発生しました: {str(e)}")
        await error_msg.send()

2.ジェネレーターメッセージを順次表示する

最終的にMQTTにパブリッシュされたメッセージを順次chainlitに表示させるようにします。
この機能を実装するために、メッセージを順次表示させるよう実装しましょう

src/app2.py
import chainlit as cl

from langgraph.prebuilt import create_react_agent
from langgraph.config import get_stream_writer # https://langchain-ai.github.io/langgraph/how-tos/streaming/#tool-updates

from langchain_core.tools import tool
from langchain_core.messages import HumanMessage, AIMessage
from langchain_core.runnables import RunnableConfig

from langchain_openai import ChatOpenAI

from dotenv import load_dotenv

# .envからAPIキーを読み込む
load_dotenv()

# ツールの定義
@tool
def calculator(expression: str) -> float:
    """数式を計算します"""
    try:
        print(expression)
        return eval(expression)
    except Exception as e:
        return str(e)

@tool
def stream_generator_message(expression: str):
    """1秒おきに現在時刻を取得します"""
    import time
    from datetime import datetime

    def generator():
        while True:
            yield datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            time.sleep(1)


    print("stream_generator_messageが呼び出された")
    writer = get_stream_writer()
    for message in generator():
        writer(f"現在時刻は{message}です")

# LangGraphのReActエージェントを作成
agent = create_react_agent(
    model=ChatOpenAI(model="gpt-4o-mini"),
    tools=[calculator, stream_generator_message],
    prompt="あなたは数学の専門家です。"
)

@cl.on_chat_start
async def on_chat_start():
    cl.user_session.set("agent", agent)

@cl.on_message
async def on_message(message: cl.Message):
    try:
        # セッションIDを使用してスレッド管理
        config = RunnableConfig(configurable={"thread_id": cl.context.session.id})
        
        # Chainlitのメッセージを作成
        msg = cl.Message(content="")
        
        # エージェントをストリーミング実行(カスタムデータも含む)
        async for stream_mode, chunk in agent.astream(
            {"messages": [HumanMessage(content=message.content)]}, 
            config=config,
            stream_mode=["messages", "custom"] # https://langchain-ai.github.io/langgraph/how-tos/streaming/
        ):
            if stream_mode == "custom":
                # カスタムデータ(ツールからのストリーミング)を処理
                await msg.stream_token(str(chunk)+"\n")

            elif stream_mode == "messages":
                llm_token, metadata = chunk
                # トークンとメタデータを正しく処理
                if isinstance(llm_token, AIMessage) and llm_token.content:
                    if isinstance(llm_token.content, str):
                        await msg.stream_token(llm_token.content)
        
        # メッセージを送信
        await msg.send()
        
    except Exception as e:
        print(f"Error: {e}")  # デバッグ用
        error_msg = cl.Message(content=f"エラーが発生しました: {str(e)}")
        await error_msg.send()

ジェネレーターメッセージを受け取る部分は以下の通り作成しています。
(将来的には、これをMQTTブローカーからサブスクライブしたメッセージを受け取る機能に変更します)

from langgraph.config import get_stream_writer

@tool
def stream_generator_message(expression: str):
    """1秒おきに現在時刻を取得します"""
    import time
    from datetime import datetime

    def generator():
        while True:
            yield datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            time.sleep(1)


    print("stream_generator_messageが呼び出された")
    writer = get_stream_writer()
    for message in generator():
        writer(f"現在時刻は{message}です")

ポイントはlanggraphのget_stream_writerを用いることです。

これにより、agentがツールを呼び出した際にツール内のメッセージをstream_mode="custom"として呼び出すことができます。

@cl.on_messageのagent.astreamの部分に「stream_mode="custom"」を追加します。

@cl.on_message
async def on_message(message: cl.Message):
    #・・・
        async for stream_mode, chunk in agent.astream(
            {"messages": [HumanMessage(content=message.content)]}, 
            config=config,
            stream_mode=["messages", "custom"] # https://langchain-ai.github.io/langgraph/how-tos/streaming/
        ):
            if stream_mode == "custom":
                # カスタムデータ(ツールからのストリーミング)を処理
                await msg.stream_token(str(chunk)+"\n")

            elif stream_mode == "messages":
                llm_token, metadata = chunk
                # トークンとメタデータを正しく処理
                if isinstance(llm_token, AIMessage) and llm_token.content:
                    if isinstance(llm_token.content, str):
                        await msg.stream_token(llm_token.content)

3.MQTTブローカーからメッセージをサブスクライブする

最後にMQTTに接続し、メッセージをサブスクライブする部分を実装します。

静止画だとわかりにくいですが、こんな感じでサブスクライブしたメッセージが順次表示されます。

src/app3.py
import os
from queue import Queue
import time
import threading

import chainlit as cl

from langgraph.prebuilt import create_react_agent
from langgraph.config import get_stream_writer # https://langchain-ai.github.io/langgraph/how-tos/streaming/#tool-updates

from langchain_core.tools import tool
from langchain_core.messages import HumanMessage, AIMessage
from langchain_core.runnables import RunnableConfig

from langchain_openai import ChatOpenAI

import paho.mqtt.client as mqtt
from dotenv import load_dotenv

# .envからAPIキーを読み込む
load_dotenv()

#################
# MQTT周りの設定  #
#################

# shiftr.io 設定
MQTT_BROKER = str(os.getenv("MQTT_BROKER"))
MQTT_PORT = int(os.getenv("MQTT_PORT") or 1883)
MQTT_USERNAME = os.getenv("MQTT_USERNAME")
MQTT_PASSWORD = os.getenv("MQTT_PASSWORD")
print(MQTT_BROKER, MQTT_PORT, MQTT_USERNAME, MQTT_PASSWORD)
if (MQTT_BROKER or MQTT_PORT or MQTT_USERNAME or MQTT_PASSWORD) is None:
    raise ValueError("MQTT_BROKER, MQTT_PORT, MQTT_USERNAME, MQTT_PASSWORD is not set")


message_queue: Queue[str] = Queue(100)
mqtt_initialized = False

client = mqtt.Client(
    callback_api_version=mqtt.CallbackAPIVersion.VERSION2, # type: ignore
    client_id="chainlit-langgraph-client"
)


def mqtt_on_connect(client: mqtt.Client, userdata, flags, reason_code, properties):
    """MQTT接続時のコールバック関数"""
    print("MQTT接続成功")
    client.subscribe("respond/+/+")

    # ここでは例として、1秒ごとに現在時刻をMQTTにパブリッシュする
    def publish_message():
        count = 0
        while True:
            client.publish("respond/device1/count", f"{count}")
            count += 1
            time.sleep(1)

    thread = threading.Thread(target=publish_message)
    thread.start()

def mqtt_on_message(client, userdata, msg):
    """MQTT メッセージ受信時のコールバック関数"""
    topic = msg.topic
    value = msg.payload.decode('utf-8')
    print(f"MQTT受信: {topic} -> {value}")
    message_queue.put(f"{topic} -> {value}")

# ツールの定義
@tool
def calculator(expression: str) -> float:
    """数式を計算します"""
    try:
        print(expression)
        return eval(expression)
    except Exception as e:
        return str(e)

@tool
def stream_generator_message(expression: str):
    """1秒おきに現在時刻を取得します"""
    import time
    from datetime import datetime

    def generator():
        while True:
            yield datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            time.sleep(1)


    print("stream_generator_messageが呼び出された")
    writer = get_stream_writer()
    for message in generator():
        writer(f"現在時刻は{message}です")

@tool
def get_count():
    """カウント(count)を取得します"""
    # このツールが呼び出されるまでにQueueに蓄積されたデータを削除する
    while not message_queue.empty():
        message_queue.get_nowait()

    # このあと、Queueに蓄積されたデータを順次取得してストリーミングで返す
    writer = get_stream_writer()
    while True:
        message = message_queue.get()
        writer(message)

# LangGraphのReActエージェントを作成
agent = create_react_agent(
    model=ChatOpenAI(model="gpt-4o-mini"),
    tools=[calculator, stream_generator_message, get_count],
    prompt="あなたは数学の専門家です。"
)

@cl.on_chat_start
async def on_chat_start():
    cl.user_session.set("agent", agent)

    """Chainlit セッション開始時の処理"""
    global client, mqtt_initialized
    
    # MQTT接続が未初期化の場合のみ接続処理を実行
    if not mqtt_initialized:
        mqtt_initialized = True
        
        client.on_connect = mqtt_on_connect
        client.on_message = mqtt_on_message
        client.username_pw_set(MQTT_USERNAME, MQTT_PASSWORD)
        
        try:
            client.connect(MQTT_BROKER, MQTT_PORT, 60)
            client.loop_start()
        except Exception as e:
            print(f"MQTT接続エラー: {e}")
            mqtt_initialized = False  # エラー時はフラグをリセット
        
    welcome_msg = cl.Message(content="チャットエージェントが起動しました!\nhttps://electrium.cloud.shiftr.io/")
    await welcome_msg.send()

@cl.on_message
async def on_message(message: cl.Message):
    try:
        # セッションIDを使用してスレッド管理
        config = RunnableConfig(configurable={"thread_id": cl.context.session.id})
        
        # Chainlitのメッセージを作成
        msg = cl.Message(content="")
        
        # エージェントをストリーミング実行(カスタムデータも含む)
        async for stream_mode, chunk in agent.astream(
            {"messages": [HumanMessage(content=message.content)]}, 
            config=config,
            stream_mode=["messages", "custom"] # https://langchain-ai.github.io/langgraph/how-tos/streaming/
        ):
            if stream_mode == "custom":
                # カスタムデータ(ツールからのストリーミング)を処理
                await msg.stream_token(str(chunk)+"\n")

            elif stream_mode == "messages":
                llm_token, metadata = chunk
                # トークンとメタデータを正しく処理
                if isinstance(llm_token, AIMessage) and llm_token.content:
                    if isinstance(llm_token.content, str):
                        await msg.stream_token(llm_token.content)
        
        # メッセージを送信
        await msg.send()
        
    except Exception as e:
        print(f"Error: {e}")  # デバッグ用
        error_msg = cl.Message(content=f"エラーが発生しました: {str(e)}")
        await error_msg.send()

MQTTクライアントライブラリとMQTTブローカーの設定

pythonでMQTT通信を実装するには、paho-mqttというライブラリを使います。

pip install paho-mqtt
または
poetry add paho-mqtt

MQTTブローカーとして、今回はshiftr.ioを使っています。
使い方については、昨年テックシーカーハッカソンで教えてもらった時の記事を参照ください
https://zenn.dev/yuta_enginner/articles/4d5c0ab1c2163d

自力で実装するならmosquitoを用いても良いのですが、shiftr.ioも無料枠ありますし、遊び程度なら無料枠でも十分です
(無料枠は1日6時間しか使えないという制限があるので、私は月7ドル課金しています。)

クライアントライブラリの接続の仕方は以下の通りです。
サンプルコードでは、これをchainlitに組み込んでいます。

# クライアントのインスタンスを作成
client = mqtt.Client(
    callback_api_version=mqtt.CallbackAPIVersion.VERSION2, # type: ignore
    client_id="chainlit-langgraph-client"
)

# 接続開始時や受信時のコールバックを登録、ユーザー名を登録
client.on_connect = mqtt_on_connect
client.on_message = mqtt_on_message
client.username_pw_set(MQTT_USERNAME, MQTT_PASSWORD)

# MQTTに接続
client.connect(MQTT_BROKER, MQTT_PORT, 60)
client.loop_start()

サブスクライブ設定

先ほど示した接続開始時および受信時のコールバック関数に、MQTTサブスクライブに関する設定を入れます。

MQTT通信では、トピックに名前空間を使うことでサブスクライブするメッセージを選択することができます。
展示会用資料で示したように、今回のアプリケーションにおいてはデバイスからの返答を購読したいので"respond/+/+"としています。

# 購読対象
respond/device1/temperature
respond/device2/light

# これは購読しない
record/device1/temperature

一点注意としては、client.subscribe()はclientがMQTTに接続された後で設定することです。
なので、mqtt_on_connectの中に入れています

def mqtt_on_connect(client: mqtt.Client, userdata, flags, reason_code, properties):
    """MQTT接続時のコールバック関数"""
    print("MQTT接続成功")
    client.subscribe("respond/+/+")

サブスクライブしたメッセージをchainlitでストリーミング表示する

さて、ユーザーが「device1の現在温度を取得して」というメッセージをエージェントに投げたとき、エージェントはget_temperatureツールを起動して、サブスクライブしたデータをリアルタイムに表示させたいです。

pahoのクライアントライブラリでは、MQTT接続時にサブスクライブした時のコールバック関数を渡す必要がありますので、@toolにサブスクライブした時の関数を設定することはできません。

そこでQueueを用いて、サブスクライブしたメッセージは片っ端からQueueに詰め込んでいき、ツールが呼び出されたらQueueに詰め込まれたメッセージをストリーム表示するようにします。

message_queue: Queue[str] = Queue(100)

# MQTT メッセージ受信時のコールバック関数。受け取ったメッセージを片っ端からQueueに詰め込んでいく
def mqtt_on_message(client, userdata, msg):
    topic = msg.topic
    value = msg.payload.decode('utf-8')
    print(f"MQTT受信: {topic} -> {value}")
    message_queue.put(f"{topic} -> {value}")


# AIエージェントに呼び出される関数。Queueに詰め込まれたメッセージをストリーミングで表示する
@tool
def get_count():
    """カウント(count)を取得します"""
    # このツールが呼び出されるまでにQueueに蓄積されたデータを削除する
    while not message_queue.empty():
        message_queue.get_nowait()

    # このあと、Queueに蓄積されたデータを順次取得してストリーミングで返す
    writer = get_stream_writer()
    while True:
        message = message_queue.get()
        writer(message)

なお、この仕様ではユーザーが止めない限り延々と購読したメッセージを表示し続けますので、止めるにはchainlitで停止ボタンを押す必要があります

終わりに

今回はchainlitとlanggraphを使って、シンプルなAIエージェントを実装しました。
LangGraphはマルチエージェントの実装や会話を記憶するためのLangMemの追加を簡単にできます。

多少学習コストは高いですが、AIエージェント全盛の現在、学んでおいて損はないと思います。

Discussion