chainlit&LangGraphを用いたAIエージェント会話とMQTT通信
2025/08/11に大阪南森町のFabCafeで開催された「ITエンジニアのための個人開発LT会&展示会」」で紹介した「AIアクアリウム」の技術仕様です。
ここではAI側の実装方法を紹介します。
githubのリポジトリはこちら
1.最もシンプルなReActエージェント
LangGraphやchainlitの使い方は割愛します。
まずは最もシンプルなAIエージェントを作成し、chainlitで会話ができるのを確認します。
※ chainlitの公式の通りに書いても動かなかったので、2025年8月現在はこのコードが正しいです
[tool.poetry.dependencies]
python = "^3.13"
chainlit = "^2.6.8"
langgraph = "^0.6.4"
python-dotenv = "^1.1.1"
langchain-openai = "^0.3.29"
paho-mqtt = "^2.1.0"
src/app1.py
import chainlit as cl
from langgraph.prebuilt import create_react_agent
from langchain_core.tools import tool
from langchain_core.messages import HumanMessage, AIMessage
from langchain_core.runnables import RunnableConfig
from langchain_openai import ChatOpenAI
from dotenv import load_dotenv
# .envからAPIキーを読み込む
load_dotenv()
# ツールの定義
@tool
def calculator(expression: str) -> float:
"""数式を計算します"""
try:
print(expression)
return eval(expression)
except Exception as e:
return str(e)
# LangGraphのReActエージェントを作成
agent = create_react_agent(
model=ChatOpenAI(model="gpt-4o-mini"),
tools=[calculator],
prompt="あなたは数学の専門家です。"
)
@cl.on_chat_start
async def on_chat_start():
cl.user_session.set("agent", agent)
@cl.on_message
async def on_message(message: cl.Message):
try:
# セッションIDを使用してスレッド管理
config = RunnableConfig(configurable={"thread_id": cl.context.session.id})
# Chainlitのメッセージを作成
msg = cl.Message(content="")
# エージェントをストリーミング実行(カスタムデータも含む)
async for llm_token, metadata in agent.astream(
{"messages": [HumanMessage(content=message.content)]},
config=config,
stream_mode="messages" # https://langchain-ai.github.io/langgraph/how-tos/streaming/
):
# トークンとメタデータを正しく処理
if isinstance(llm_token, AIMessage) and llm_token.content:
if isinstance(llm_token.content, str):
await msg.stream_token(llm_token.content)
# メッセージを送信
await msg.send()
except Exception as e:
print(f"Error: {e}") # デバッグ用
error_msg = cl.Message(content=f"エラーが発生しました: {str(e)}")
await error_msg.send()
2.ジェネレーターメッセージを順次表示する
最終的にMQTTにパブリッシュされたメッセージを順次chainlitに表示させるようにします。
この機能を実装するために、メッセージを順次表示させるよう実装しましょう
src/app2.py
import chainlit as cl
from langgraph.prebuilt import create_react_agent
from langgraph.config import get_stream_writer # https://langchain-ai.github.io/langgraph/how-tos/streaming/#tool-updates
from langchain_core.tools import tool
from langchain_core.messages import HumanMessage, AIMessage
from langchain_core.runnables import RunnableConfig
from langchain_openai import ChatOpenAI
from dotenv import load_dotenv
# .envからAPIキーを読み込む
load_dotenv()
# ツールの定義
@tool
def calculator(expression: str) -> float:
"""数式を計算します"""
try:
print(expression)
return eval(expression)
except Exception as e:
return str(e)
@tool
def stream_generator_message(expression: str):
"""1秒おきに現在時刻を取得します"""
import time
from datetime import datetime
def generator():
while True:
yield datetime.now().strftime("%Y-%m-%d %H:%M:%S")
time.sleep(1)
print("stream_generator_messageが呼び出された")
writer = get_stream_writer()
for message in generator():
writer(f"現在時刻は{message}です")
# LangGraphのReActエージェントを作成
agent = create_react_agent(
model=ChatOpenAI(model="gpt-4o-mini"),
tools=[calculator, stream_generator_message],
prompt="あなたは数学の専門家です。"
)
@cl.on_chat_start
async def on_chat_start():
cl.user_session.set("agent", agent)
@cl.on_message
async def on_message(message: cl.Message):
try:
# セッションIDを使用してスレッド管理
config = RunnableConfig(configurable={"thread_id": cl.context.session.id})
# Chainlitのメッセージを作成
msg = cl.Message(content="")
# エージェントをストリーミング実行(カスタムデータも含む)
async for stream_mode, chunk in agent.astream(
{"messages": [HumanMessage(content=message.content)]},
config=config,
stream_mode=["messages", "custom"] # https://langchain-ai.github.io/langgraph/how-tos/streaming/
):
if stream_mode == "custom":
# カスタムデータ(ツールからのストリーミング)を処理
await msg.stream_token(str(chunk)+"\n")
elif stream_mode == "messages":
llm_token, metadata = chunk
# トークンとメタデータを正しく処理
if isinstance(llm_token, AIMessage) and llm_token.content:
if isinstance(llm_token.content, str):
await msg.stream_token(llm_token.content)
# メッセージを送信
await msg.send()
except Exception as e:
print(f"Error: {e}") # デバッグ用
error_msg = cl.Message(content=f"エラーが発生しました: {str(e)}")
await error_msg.send()
ジェネレーターメッセージを受け取る部分は以下の通り作成しています。
(将来的には、これをMQTTブローカーからサブスクライブしたメッセージを受け取る機能に変更します)
from langgraph.config import get_stream_writer
@tool
def stream_generator_message(expression: str):
"""1秒おきに現在時刻を取得します"""
import time
from datetime import datetime
def generator():
while True:
yield datetime.now().strftime("%Y-%m-%d %H:%M:%S")
time.sleep(1)
print("stream_generator_messageが呼び出された")
writer = get_stream_writer()
for message in generator():
writer(f"現在時刻は{message}です")
ポイントはlanggraphのget_stream_writerを用いることです。
これにより、agentがツールを呼び出した際にツール内のメッセージをstream_mode="custom"として呼び出すことができます。
@cl.on_messageのagent.astreamの部分に「stream_mode="custom"」を追加します。
@cl.on_message
async def on_message(message: cl.Message):
#・・・
async for stream_mode, chunk in agent.astream(
{"messages": [HumanMessage(content=message.content)]},
config=config,
stream_mode=["messages", "custom"] # https://langchain-ai.github.io/langgraph/how-tos/streaming/
):
if stream_mode == "custom":
# カスタムデータ(ツールからのストリーミング)を処理
await msg.stream_token(str(chunk)+"\n")
elif stream_mode == "messages":
llm_token, metadata = chunk
# トークンとメタデータを正しく処理
if isinstance(llm_token, AIMessage) and llm_token.content:
if isinstance(llm_token.content, str):
await msg.stream_token(llm_token.content)
3.MQTTブローカーからメッセージをサブスクライブする
最後にMQTTに接続し、メッセージをサブスクライブする部分を実装します。
静止画だとわかりにくいですが、こんな感じでサブスクライブしたメッセージが順次表示されます。
src/app3.py
import os
from queue import Queue
import time
import threading
import chainlit as cl
from langgraph.prebuilt import create_react_agent
from langgraph.config import get_stream_writer # https://langchain-ai.github.io/langgraph/how-tos/streaming/#tool-updates
from langchain_core.tools import tool
from langchain_core.messages import HumanMessage, AIMessage
from langchain_core.runnables import RunnableConfig
from langchain_openai import ChatOpenAI
import paho.mqtt.client as mqtt
from dotenv import load_dotenv
# .envからAPIキーを読み込む
load_dotenv()
#################
# MQTT周りの設定 #
#################
# shiftr.io 設定
MQTT_BROKER = str(os.getenv("MQTT_BROKER"))
MQTT_PORT = int(os.getenv("MQTT_PORT") or 1883)
MQTT_USERNAME = os.getenv("MQTT_USERNAME")
MQTT_PASSWORD = os.getenv("MQTT_PASSWORD")
print(MQTT_BROKER, MQTT_PORT, MQTT_USERNAME, MQTT_PASSWORD)
if (MQTT_BROKER or MQTT_PORT or MQTT_USERNAME or MQTT_PASSWORD) is None:
raise ValueError("MQTT_BROKER, MQTT_PORT, MQTT_USERNAME, MQTT_PASSWORD is not set")
message_queue: Queue[str] = Queue(100)
mqtt_initialized = False
client = mqtt.Client(
callback_api_version=mqtt.CallbackAPIVersion.VERSION2, # type: ignore
client_id="chainlit-langgraph-client"
)
def mqtt_on_connect(client: mqtt.Client, userdata, flags, reason_code, properties):
"""MQTT接続時のコールバック関数"""
print("MQTT接続成功")
client.subscribe("respond/+/+")
# ここでは例として、1秒ごとに現在時刻をMQTTにパブリッシュする
def publish_message():
count = 0
while True:
client.publish("respond/device1/count", f"{count}")
count += 1
time.sleep(1)
thread = threading.Thread(target=publish_message)
thread.start()
def mqtt_on_message(client, userdata, msg):
"""MQTT メッセージ受信時のコールバック関数"""
topic = msg.topic
value = msg.payload.decode('utf-8')
print(f"MQTT受信: {topic} -> {value}")
message_queue.put(f"{topic} -> {value}")
# ツールの定義
@tool
def calculator(expression: str) -> float:
"""数式を計算します"""
try:
print(expression)
return eval(expression)
except Exception as e:
return str(e)
@tool
def stream_generator_message(expression: str):
"""1秒おきに現在時刻を取得します"""
import time
from datetime import datetime
def generator():
while True:
yield datetime.now().strftime("%Y-%m-%d %H:%M:%S")
time.sleep(1)
print("stream_generator_messageが呼び出された")
writer = get_stream_writer()
for message in generator():
writer(f"現在時刻は{message}です")
@tool
def get_count():
"""カウント(count)を取得します"""
# このツールが呼び出されるまでにQueueに蓄積されたデータを削除する
while not message_queue.empty():
message_queue.get_nowait()
# このあと、Queueに蓄積されたデータを順次取得してストリーミングで返す
writer = get_stream_writer()
while True:
message = message_queue.get()
writer(message)
# LangGraphのReActエージェントを作成
agent = create_react_agent(
model=ChatOpenAI(model="gpt-4o-mini"),
tools=[calculator, stream_generator_message, get_count],
prompt="あなたは数学の専門家です。"
)
@cl.on_chat_start
async def on_chat_start():
cl.user_session.set("agent", agent)
"""Chainlit セッション開始時の処理"""
global client, mqtt_initialized
# MQTT接続が未初期化の場合のみ接続処理を実行
if not mqtt_initialized:
mqtt_initialized = True
client.on_connect = mqtt_on_connect
client.on_message = mqtt_on_message
client.username_pw_set(MQTT_USERNAME, MQTT_PASSWORD)
try:
client.connect(MQTT_BROKER, MQTT_PORT, 60)
client.loop_start()
except Exception as e:
print(f"MQTT接続エラー: {e}")
mqtt_initialized = False # エラー時はフラグをリセット
welcome_msg = cl.Message(content="チャットエージェントが起動しました!\nhttps://electrium.cloud.shiftr.io/")
await welcome_msg.send()
@cl.on_message
async def on_message(message: cl.Message):
try:
# セッションIDを使用してスレッド管理
config = RunnableConfig(configurable={"thread_id": cl.context.session.id})
# Chainlitのメッセージを作成
msg = cl.Message(content="")
# エージェントをストリーミング実行(カスタムデータも含む)
async for stream_mode, chunk in agent.astream(
{"messages": [HumanMessage(content=message.content)]},
config=config,
stream_mode=["messages", "custom"] # https://langchain-ai.github.io/langgraph/how-tos/streaming/
):
if stream_mode == "custom":
# カスタムデータ(ツールからのストリーミング)を処理
await msg.stream_token(str(chunk)+"\n")
elif stream_mode == "messages":
llm_token, metadata = chunk
# トークンとメタデータを正しく処理
if isinstance(llm_token, AIMessage) and llm_token.content:
if isinstance(llm_token.content, str):
await msg.stream_token(llm_token.content)
# メッセージを送信
await msg.send()
except Exception as e:
print(f"Error: {e}") # デバッグ用
error_msg = cl.Message(content=f"エラーが発生しました: {str(e)}")
await error_msg.send()
MQTTクライアントライブラリとMQTTブローカーの設定
pythonでMQTT通信を実装するには、paho-mqttというライブラリを使います。
pip install paho-mqtt
または
poetry add paho-mqtt
MQTTブローカーとして、今回はshiftr.ioを使っています。
使い方については、昨年テックシーカーハッカソンで教えてもらった時の記事を参照ください
自力で実装するならmosquitoを用いても良いのですが、shiftr.ioも無料枠ありますし、遊び程度なら無料枠でも十分です
(無料枠は1日6時間しか使えないという制限があるので、私は月7ドル課金しています。)
クライアントライブラリの接続の仕方は以下の通りです。
サンプルコードでは、これをchainlitに組み込んでいます。
# クライアントのインスタンスを作成
client = mqtt.Client(
callback_api_version=mqtt.CallbackAPIVersion.VERSION2, # type: ignore
client_id="chainlit-langgraph-client"
)
# 接続開始時や受信時のコールバックを登録、ユーザー名を登録
client.on_connect = mqtt_on_connect
client.on_message = mqtt_on_message
client.username_pw_set(MQTT_USERNAME, MQTT_PASSWORD)
# MQTTに接続
client.connect(MQTT_BROKER, MQTT_PORT, 60)
client.loop_start()
サブスクライブ設定
先ほど示した接続開始時および受信時のコールバック関数に、MQTTサブスクライブに関する設定を入れます。
MQTT通信では、トピックに名前空間を使うことでサブスクライブするメッセージを選択することができます。
展示会用資料で示したように、今回のアプリケーションにおいてはデバイスからの返答を購読したいので"respond/+/+"としています。
# 購読対象
respond/device1/temperature
respond/device2/light
# これは購読しない
record/device1/temperature
一点注意としては、client.subscribe()はclientがMQTTに接続された後で設定することです。
なので、mqtt_on_connectの中に入れています
def mqtt_on_connect(client: mqtt.Client, userdata, flags, reason_code, properties):
"""MQTT接続時のコールバック関数"""
print("MQTT接続成功")
client.subscribe("respond/+/+")
サブスクライブしたメッセージをchainlitでストリーミング表示する
さて、ユーザーが「device1の現在温度を取得して」というメッセージをエージェントに投げたとき、エージェントはget_temperature
ツールを起動して、サブスクライブしたデータをリアルタイムに表示させたいです。
pahoのクライアントライブラリでは、MQTT接続時にサブスクライブした時のコールバック関数を渡す必要がありますので、@toolにサブスクライブした時の関数を設定することはできません。
そこでQueueを用いて、サブスクライブしたメッセージは片っ端からQueueに詰め込んでいき、ツールが呼び出されたらQueueに詰め込まれたメッセージをストリーム表示するようにします。
message_queue: Queue[str] = Queue(100)
# MQTT メッセージ受信時のコールバック関数。受け取ったメッセージを片っ端からQueueに詰め込んでいく
def mqtt_on_message(client, userdata, msg):
topic = msg.topic
value = msg.payload.decode('utf-8')
print(f"MQTT受信: {topic} -> {value}")
message_queue.put(f"{topic} -> {value}")
# AIエージェントに呼び出される関数。Queueに詰め込まれたメッセージをストリーミングで表示する
@tool
def get_count():
"""カウント(count)を取得します"""
# このツールが呼び出されるまでにQueueに蓄積されたデータを削除する
while not message_queue.empty():
message_queue.get_nowait()
# このあと、Queueに蓄積されたデータを順次取得してストリーミングで返す
writer = get_stream_writer()
while True:
message = message_queue.get()
writer(message)
なお、この仕様ではユーザーが止めない限り延々と購読したメッセージを表示し続けますので、止めるにはchainlitで停止ボタンを押す必要があります
終わりに
今回はchainlitとlanggraphを使って、シンプルなAIエージェントを実装しました。
LangGraphはマルチエージェントの実装や会話を記憶するためのLangMemの追加を簡単にできます。
多少学習コストは高いですが、AIエージェント全盛の現在、学んでおいて損はないと思います。
Discussion