SlackアプリとAmazon Bedrockによるストリーミングレスポンスの実装
1. はじめに
システムゼウスの杉山です。
前回の記事では、SlackアプリとAmazon Bedrock Agentを連携させる基本的な実装を行いました。今回は、ユーザー体験を向上させるために、Bedrockからのレスポンスをストリーミングで受け取り、リアルタイムにSlackに表示する実装を行います。
前回の記事
前提条件
- 前回の記事で実装したSlackアプリとLambda関数が動作していること
- AWS Lambda関数URLが設定済みであること
2. 実装の概要
本実装では、スラッシュコマンドでの質問に対して非同期で回答を生成します。また、エージェントは使用しませんが、前回の記事でエージェントが実行していたLambda使用します。
処理の流れは以下のようになります。
- ユーザーがスラッシュコマンドで質問を送信
- 処理受付Lambdaが質問を受け付け、SQSにメッセージを送信
- SQSへのメッセージ到着をトリガーとして、処理実行Lambdaが起動し以下の処理を実行
・knowledgebaseの検索
・Bedrockによる回答生成
・Slackへの回答送信(chat.update APIによるストリーミング形式)
この構成により、長時間の処理をバックグラウンドで実行しつつ、ユーザーには生成過程が見えるようになります。
Amzon SQSとは↓
3. リソースの作成
まずは今回使用するリソースの作成と設定を行っていきます。
3.1 SQSの作成
- Amazon SQSコンソールで「キューの作成」をクリック
- 標準キューを選択
- キュー名を設定(例:
slackAppSQS
) - 可視性タイムアウトを1分にして作成
3.2 処理受付用Lambda作成
- 関数の作成
- デフォルトのまま新規作成
- 関数名は任意(例:
slack-command-receiver
) - ランタイムは「Node.js 22.x」を選択
- 関数URLの有効化
- 認証タイプは「NONE」を選択
- IAMロールの設定
- SQSの送信権限が必要
Lambdaを作成した後に設定からデフォルトで作成されたロールの編集画面に移動し、既にアタッチされているポリシーに以下の内容を追加します。
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "sqs:SendMessage",
"Resource": "作成したSQSのARN"
}
]
}
以下の画像のようになれば成功です。
3.3 IAMロールの再設定
ストリーミングレスポンスを実装するために、前回作成した処理実行Lambdaの権限を増やしてあげる必要があります。
Lambdaを作成した際に自動で作成されたロールをIAMで選択し、
許可を追加からポリシーをアタッチを選択
一覧の中から「AWSLambda_FullAccess」と「AWSLambdaSQSQueueExecutionRole」を選択して右下の許可を追加を選択してください。
以下の権限があることを確認できたらOK
3.4 処理実行用Lambdaのトリガー設定
前回エージェントの呼び出しを行っていたLambdaを今回処理実行Lambdaとして使用していきます。
- 処理実行Lambdaの設定タブからトリガーの追加で「SQS」を選択
- 先ほど作成したSQSのARNを指定
- 詳細設定:
- バッチサイズ: 1(メッセージを1件ずつ処理)
- バッチウィンドウ: 0秒(即時処理)
5. Lambda関数の設定
5.1 処理受付Lambda環境変数の追加
認証処理とSQSへのキューイングの為に以下の環境変数を追加する必要があります。
- SLACK_SIGNING_SECRET: 作成したSlackAppのBasic InformationにあるSigningSecret
- SQS_QUEUE_URL: 作成したSQSのURL
以下の表示になっていれば大丈夫です。
5.2 処理実行lambda環境変数の追加
新たにLAMBDA_FUNCTION_NAMEという環境変数を追加していきます。
以下の記事でエージェントが実行する検索用Lambdaのarnをコピーして貼り付けましょう。
参考記事↓
他の環境変数は削除しても構いません。
貼り付けたら保存して以下の画像のように環境変数があればOK
5.3 前回使用した関数URLの削除
不要になった関数URLを削除していきます。
- 処理実行Lambdaの設定画面から関数URLを選択
- 右上の削除から削除を行ってください。
5.4 処理受付Lambdaの実装
以下のコードを処理受付Lambdaのコードエディタに貼り付けてデプロイ
処理受付Lambdaソースコード
import crypto from 'crypto';
import { SendMessageCommand, SQSClient } from '@aws-sdk/client-sqs';
const sqsClient = new SQSClient({ region: 'ap-northeast-1' });
// Slackリクエストの署名を検証する関数
function verifyRequestSignature(event) {
const signature = event.headers['x-slack-signature'];
const timestamp = Number(event.headers['x-slack-request-timestamp']);
const now = Math.floor(Date.now() / 1000);
if(!timestamp || Math.abs(now - timestamp) > 300) {
throw new Error('Request is too old');
}
// Base64デコード
const decodedBody = Buffer.from(event.body, 'base64').toString('utf-8');
const signatureBaseString = `v0:${timestamp}:${decodedBody}`;
const slackSigningSecret = process.env.SLACK_SIGNING_SECRET;
if(!slackSigningSecret) {
throw new Error('SLACK_SIGNING_SECRET is not defined');
}
const mySignature = `v0=${
crypto.createHmac('sha256', slackSigningSecret)
.update(signatureBaseString)
.digest('hex')}`;
if(mySignature !== signature) {
throw new Error('Invalid signature');
}
}
export const handler = async event => {
try {
// リクエストの署名を検証
verifyRequestSignature(event);
// Base64デコードとパース
const decodedBody = Buffer.from(event.body, 'base64').toString('utf-8');
const params = new URLSearchParams(decodedBody);
console.log('Params:', JSON.stringify(Object.fromEntries(params)));
// パラメータを取得
const command = params.get('command');
const text = params.get('text');
const response_url = params.get('response_url');
const channel_id = params.get('channel_id');
// コマンドの確認
if(command !== '/chat') {
return {
statusCode: 200,
body : JSON.stringify({
response_type: 'ephemeral',
text : '無効なコマンドです'
})
};
}
// テキストが空の場合のチェック
if(!text || text.trim() === '') {
return {
statusCode: 200,
body : JSON.stringify({
response_type: 'ephemeral',
text : '質問を入力してください'
})
};
}
// SQSにメッセージを送信
const sqsCommand = new SendMessageCommand({
QueueUrl : process.env.SQS_QUEUE_URL,
MessageBody: JSON.stringify({
text: text.trim(),
response_url,
channel_id
})
});
await sqsClient.send(sqsCommand);
// Slackに即時レスポンスを返す
return {
statusCode: 200,
body : JSON.stringify({
response_type: 'in_channel',
text : ''
})
};
} catch (error) {
console.error('Error:', error);
return {
statusCode: 200,
body : JSON.stringify({
response_type: 'ephemeral',
text : 'エラーが発生しました。時間をおいて再度お試しください。'
})
};
}
};
5.5 処理実行Lambdaの実装
処理実行Lambdaのコードエディタから以下のコードを張り付けてデプロイ
処理実行Lambdaソースコード
import { BedrockRuntimeClient, InvokeModelWithResponseStreamCommand } from '@aws-sdk/client-bedrock-runtime';
import { InvokeCommand, LambdaClient } from '@aws-sdk/client-lambda';
const lambdaClient = new LambdaClient({ region: 'ap-northeast-1' });
async function invokeLambda(functionName, payload) {
try {
const command = new InvokeCommand({
FunctionName : functionName,
InvocationType: 'RequestResponse', // 同期呼び出し
Payload : JSON.stringify(payload)
});
const response = await lambdaClient.send(command);
// レスポンスのペイロードをパース
if(response.Payload) {
const responseData = new TextDecoder().decode(response.Payload);
const parseResponse = JSON.parse(responseData);
const responseBody = parseResponse.response.functionResponse.responseBody.TEXT.body;
return responseBody;
}
throw new Error('Lambda response error');
} catch (error) {
console.error('Lambda invocation error:', error);
throw error;
}
}
async function postInitialMessage(channel_id, text) {
const response = await fetch('https://slack.com/api/chat.postMessage', {
method : 'POST',
headers: {
'Content-Type': 'application/json',
Authorization : `Bearer ${process.env.SLACK_BOT_TOKEN}`
},
body: JSON.stringify({
channel: channel_id,
text : text
})
});
if(!response.ok) {
throw new Error(`HTTP error! status: ${response.status}`);
}
const data = await response.json();
if(!data.ok) {
throw new Error(`Slack API error: ${data.error}`);
}
return data.ts;
}
// slackメッセージを更新する関数
async function updateSlackMessage(ts, channel_id, text) {
const updateResponse = await fetch('https://slack.com/api/chat.update', {
method : 'POST',
headers: {
'Content-Type': 'application/json',
Authorization : `Bearer ${process.env.SLACK_BOT_TOKEN}`
},
body: JSON.stringify({
channel: channel_id,
ts : ts,
text : text
})
});
if(!updateResponse.ok) {
throw new Error(`HTTP error! status: ${updateResponse.status}`);
}
}
async function *processStreamResponse(response) {
let responseText = '';
for await (const chunk of response.body) {
const decoder = new TextDecoder();
const jsonString = decoder.decode(chunk.chunk?.bytes);
await new Promise(resolve => setTimeout(resolve, 100));
const parsedChunk = JSON.parse(jsonString);
if(parsedChunk.type === 'content_block_delta' &&
parsedChunk.delta?.type === 'text_delta' &&
parsedChunk.delta?.text) {
responseText += parsedChunk.delta.text;
yield responseText;
}
}
}
async function sendUserMessageToAiStream(message, searchResult, modelId) {
const client = new BedrockRuntimeClient({ region: 'ap-northeast-1' });
const prompt_config = {
anthropic_version: 'bedrock-2023-05-31',
max_tokens : 4096,
system : `以下の検索結果を参考にして、ユーザーの質問に答えてください。検索結果に含まれない情報は使用しないでください。検索結果に関連する情報がない場合は、その旨を伝えてください。
検索結果:
${searchResult}`,
messages: [
{
role : 'user',
content: [
{ type: 'text', text: message }
]
}
]
};
const sendBody = JSON.stringify(prompt_config);
const response = await client.send(
new InvokeModelWithResponseStreamCommand({
modelId : modelId,
contentType: 'application/json',
accept : 'application/json',
body : sendBody
})
);
return new ReadableStream({
async start(controller) {
try {
for await (const text of processStreamResponse(response)) {
controller.enqueue(text);
}
controller.close();
} catch (error) {
controller.error(error);
}
}
});
}
export const handler = async event => {
try {
console.log('Event:', JSON.stringify(event));
// SQSメッセージの処理
const sqsMessage = JSON.parse(event.Records[0].body);
const { text, channel_id } = sqsMessage;
try {
// lambda呼び出し
const functionName = process.env.LAMBDA_FUNCTION_NAME;
if(!functionName) {
throw new Error('Arn取得失敗');
}
const agentPayload = {
messageVersion: '1.0',
actionGroup : 'action_group_retrieve',
function : functionName,
parameters : [
{
value: text
}
]
};
const [searchResult, responseTs] = await Promise.all([
invokeLambda(functionName, agentPayload),
postInitialMessage(channel_id, '質問を受け付けました。回答を生成中です。')
]);
const stream = await sendUserMessageToAiStream(text, searchResult, 'anthropic.claude-3-5-sonnet-20240620-v1:0');
const reader = stream.getReader();
while(true) {
const { done, value } = await reader.read();
if(done) {
break;
}
await updateSlackMessage(responseTs, channel_id, value);
}
} catch (error) {
// エラー発生時はエラーメッセージを送信
console.error('Processing error:', error);
await postInitialMessage(channel_id, 'エラーが発生しました。時間をおいて再度お試しください。');
}
// 処理の成功失敗に関わらずtrueを返す(メッセージを削除)
return true;
} catch (error) {
console.error('Error:', error);
// 予期せぬエラーの場合でもメッセージを削除
return true;
}
};
セキュリティに関する注意事項
- この署名検証の実装はSlackの公式ドキュメントに基づいています
- Signing Secretは必ず環境変数として管理し、ソースコードには直接記述しないでください
- 本番環境では、AWS Systems ManagerのParameter StoreやSecrets Managerでの管理を推奨します
6.スラッシュコマンドの設定
ここからSlackAppの変更を行っていきます。
- 左メニューから「Slash Commands」を選択
- 「Create New Command」をクリック
- 以下の項目を設定:
- Command:
/chat
- Request URL: 処理受付Lambda関数のURL
- Short Description: 「Knowledge baseに質問します」
- Usage Hint: 「[質問内容]」
- 「Save」をクリック
以下の画像のようになればOK
変更に伴う修正
前回使っていたもので不要になったものを削除します。
- Event SubscriptionsのEnable Eventsはオフに
(スラッシュコマンドからの呼び出しに限定するため)
- OAuth & PermissionsのScopesから不要なものを削除します。
7.動作確認
DMでの動作確認
- Slackの左サイドバーでボットを見つけ、クリックしてDMを開きます。
- テストメッセージを送信します
最初に「回答を生成中..」と表示された後にストリーミング形式で回答が表示され、
以下の画像のように正常に返答が得られれば成功です。
トラブルシューティング
もし正常に動作しない場合、以下の点を確認してください。
- 処理受付Lambdaと処理実行Lambdaのクラウドウォッチログの確認
- 環境変数が正しく設定されているか確認
- SlackAppの設定で、スラッシュコマンドが正しく設定されているか確認
免責事項
作者または著作権者は、契約行為、不法行為、またはそれ以外であろうと、ソフトウェアに起因または関連し、あるいはソフトウェアの使用またはその他の扱いによって生じる一切の請求、損害、その他の義務について何らの責任も負わないものとします。
Discussion