📣

Amplify AI Kitの黒魔術から学ぶ、Bedrock 生成AIアプリの実装アイディア

2024/11/28に公開

幽霊の正体見たり枯れ尾花

幽霊だと思って怖がっていたものをよく見ると、風にゆれる枯れすすきであった。 薄気味悪く思うものも、その正体を確かめてみると、実は少しも怖いものではないというたとえ。
(コトバンク様から引用)

はじめに

「愛犬との毎日を楽しく便利にするアプリ オトとりっぷ」でエンジニアしています、足立です!

AWS Amplify は、AWS バックエンドおよびフロントエンドの実装をいい感じに隠蔽してくれるおかげで爆速開発が可能になるサービスおよびライブラリ群です。しかし、その隠蔽という単語から時に「黒魔術」と評されることも少なくありません。

先日発表された Amplify AI Kit ですが、まさに黒魔術と呼ぶに相応しい(一見どうなってるかよく分からない)技術です。しかし実態は OSS であり、中身はトレース可能です。ですので中身をちゃんと理解すれば Bedrock 生成 AI アプリの実装アイディアが身につくはずです。
そこでこの記事では、Amplify AI Kit の黒魔術を深掘りしたいと思います。

Amplify AI Kit とは?

AI Kit とは、先日発表された、新しい Amplify の機能です。

https://aws.amazon.com/jp/blogs/mobile/build-fullstack-ai-apps-in-minutes-with-the-new-amplify-ai-kit/

特に興味深いところは、Amazon Bedrock Tool Useが利用可能な点で、Amazon Bedrock Knowledge Basesを必要に応じて呼び出して使うことが可能になります。

例えば、「オトとりっぷ」の LP ページ利用規約ページの PDF を Knowledge Bases に食わせた Tools を用意した場合、以下のような返事を返すチャット bot が作成できます。

幽霊に出会う

さて、これのどこが黒魔術なのでしょうか?
先程の公式ブログから構成図を引用します。


(Build fullstack AI apps in minutes with the new Amplify AI Kit より)

AWS AppSync を中心にデータのやり取りが行われているようです。
であれば、「AWS AppSync はストリーミングレスポンスできないはずなのに、デモ Gif はそれを成し得ているように見える」という点が変ですね。ここが AI Kit 最大の黒魔術でしょう。

さて、どうやって実装しているのでしょうか?
結果だけ知りたい方は、ページ最後にまとめてありますのでそちらをご覧ください。

黒魔術を暴く

Amplify Gen2 での実装方法

とりあえず、実装方法について眺めてみます。
詳細は割愛しますが、以下のように設定すると AppSync 周りの設定は完了します。

amplify/resource.ts
import { type ClientSchema, a, defineData } from "@aws-amplify/backend";

const schema = a.schema({
  knowledgeBase: a
    .query()
    .arguments({ input: a.string() })
    .handler(
      a.handler.custom({
        dataSource: 'KnowledgeBaseDataSource',
        entry: './resolvers/kbResolver.js',
      })
    )
    .returns(a.string())
    .authorization((allow) => allow.authenticated()),

  chat: a
    .conversation({
      aiModel: a.ai.model('Claude 3.5 Sonnet'),
      systemPrompt: `あなたはオトとりっぷというアプリの解説者です。知っていることだけで回答してください。`,
      tools: [
        a.ai.dataTool({
          name: 'searchDocumentation',
          description: 'オトとりっぷに関するドキュメントを検索します。',
          query: a.ref('knowledgeBase'),
        }),
      ],
    })
    .authorization((allow) => allow.owner()),
});

@aws-amplify/backendというライブラリの中に AI Kit 関連も含まれていそうです。
また、Chat と knowledgeBase を別々に API を作成していますね。

Amplify Backend

Amplify Gen2 と呼ばれる現行の Amplify を構築する場合、Amplify Backend と呼ばれるライブラリを使用してバックエンドが構築されます。秘術はそこに隠されていそうです。中身は AWS CDK で書かれているので、解読可能です。

a.conversationにより作成される API により呼び出される最終的な本体は、ずーーーっと追っていくと、ここに見つけることができます。AWS Lambda の Node.js Runtime ですね。

https://github.com/aws-amplify/amplify-backend/blob/%40aws-amplify/backend%401.8.0/packages/ai-constructs/src/conversation/conversation_handler_construct.ts#L87-L115

肝心の Lambda Handler 関数はというと、こちらの execute 関数です。

https://github.com/aws-amplify/amplify-backend/blob/%40aws-amplify/backend%401.8.0/packages/ai-constructs/src/conversation/runtime/conversation_turn_executor.ts#L30-L60

execute 関数から重要な部分を抜き出します。

amplify-backend/packages/ai-constructs/src/conversation/runtime/conversation_turn_executor.ts
execute = async (): Promise<void> => {
  try {
    if (this.event.streamResponse) {
      const chunks = this.bedrockConverseAdapter.value.askBedrockStreaming();
      for await (const chunk of chunks) {
        await this.responseSender.value.sendResponseChunk(chunk);
      }
    ...

ポイントは 2 つです。

  1. askBedrockStreaming 関数でストリーミングレスポンスを受け取る
  2. sendResponseChunk 関数でチャンク毎にレスポンスを返す

順番に何をやっているのか、確認してみましょう。

askBedrockStreaming 関数

実装はこちらです。

https://github.com/aws-amplify/amplify-backend/blob/%40aws-amplify/backend%401.8.0/packages/ai-constructs/src/conversation/runtime/bedrock_converse_adapter.ts#L165-L342

重要な部分を抜き出します。

amplify-backend/packages/ai-constructs/src/conversation/runtime/bedrock_converse_adapter.ts
async *askBedrockStreaming(): AsyncGenerator<StreamingResponseChunk> {
  const messages: Array<Message> =
    await this.getEventMessagesAsBedrockMessages();

  let stopReason = '';
  do {
    const converseCommandInput = {
      modelId,
      messages: [...messages],
      system: [{ text: systemPrompt }],
      inferenceConfig: inferenceConfiguration,
      toolConfig,
    };

    bedrockResponse = await this.bedrockClient.send(
      new ConverseStreamCommand(converseCommandInput)
    );

    for await (const chunk of bedrockResponse.stream) {

      } else if (chunk.contentBlockDelta) {
          yield {
            contentBlockText: chunk.contentBlockDelta.delta.text,
          };
      } else if (chunk.messageStop) {
        stopReason = chunk.messageStop.stopReason ?? '';
      }

    }

    if (stopReason === 'tool_use') {
      const toolResultContentBlock = await this.executeTool(toolUseBlock);
      messages.push({
        role: 'user',
        content: toolResponseContentBlocks,
      });
    }
  } while (stopReason === 'tool_use');

  yield {
  };
}

ポイントは以下の通りです。

  1. getEventMessagesAsBedrockMessages 関数で過去のメッセージ履歴を取得する
  2. メッセージ履歴やシステムプロンプト、tools などの情報をインプットとして bedrock API を呼び出す
  3. ストリームレスポンスを順々に処理し、conversation_turn_executor 関数側の chunks に格納していく
  4. stopReason がtool_useの場合は、executeTool 関数を呼び出す
  5. executeTool 関数の結果をメッセージ履歴に追加し、再度 bedrock API を呼び出す
  6. stopReason がtool_useでなくなるまで 3 〜 5 を繰り返す

getEventMessagesAsBedrockMessages 関数の実態は、こちらの通り AppSync への GraphQL クエリです。また、executeTool 関数の実態はこちらの通り AppSync への GraphQL クエリであり、それはamplify/resource.tsで設定した dataTool という訳です。

つまり、メッセージ履歴の取得や Bedrock からの tool_use リクエストは AppSync への GraphQL クエリで処理されているということです。なので、最初にknowledgeBaseという AppSync の JS Resolver を用意させられたんですね。

少しずつ、化けの皮が剥がれてきました。

sendResponseChunk 関数

先程まで見てきた通り bedrock API の返り値は chunks に格納されていくんでしたね。
では、その chunks はどのように処理されるのでしょうか?

その実装はこちらです。

https://github.com/aws-amplify/amplify-backend/blob/%40aws-amplify/backend%401.8.0/packages/ai-constructs/src/conversation/runtime/conversation_turn_response_sender.ts#L128-L146

お察しの通り、こちらも AppSync への GraphQL クエリが作成されています。
中身としては、accumulatedTurnContent (今までの返答も含めた全てが含まれます)をそのまま variables に入れて AppSync を呼び出しています。
AppSync の ResolverFn も確認してみます。

MutationCreateAssistantResponseStreamChatDataResolverFn
MutationCreateAssistantResponseStreamChatDataResolverFn
import { util } from '@aws-appsync/utils';
import * as ddb from '@aws-appsync/utils/dynamodb';

/**
 * Sends a request to the attached data source
 * @param {import('@aws-appsync/utils').Context} ctx the context
 * @returns {*} the request
 */
export function request(ctx) {
  const {
    conversationId,
    associatedUserMessageId,
    accumulatedTurnContent,
    errors,
  } = ctx.args.input;

  const { owner } = ctx.args;

  if (errors) {
    runtime.earlyReturn({
      id: `${associatedUserMessageId}#response`,
      conversationId,
      associatedUserMessageId,
      errors,
      owner,
    });
  }
  const { createdAt, updatedAt } = ctx.stash.defaultValues;

  const assistantResponseId = `${associatedUserMessageId}#response`;
  const expression = 'SET #typename = :typename, #conversationId = :conversationId, #associatedUserMessageId = :associatedUserMessageId, #role = :role, #content = :content, #owner = :owner, #createdAt = if_not_exists(#createdAt, :createdAt), #updatedAt = :updatedAt';

  const expressionValues = util.dynamodb.toMapValues({
    ':typename': 'ConversationMessageChat',
    ':conversationId': conversationId,
    ':associatedUserMessageId': associatedUserMessageId,
    ':role': 'assistant',
    ':content': accumulatedTurnContent,
    ':owner': owner,
    ':createdAt': createdAt,
    ':updatedAt': updatedAt,
  });

  // https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/ReservedWords.html
  const expressionNames = {
    '#typename': '__typename',
    '#conversationId': 'conversationId',
    '#associatedUserMessageId': 'associatedUserMessageId',
    '#role': 'role',
    '#content': 'content',
    '#owner': 'owner',
    '#createdAt': 'createdAt',
    '#updatedAt': 'updatedAt',
  };

  return {
    operation: 'UpdateItem',
    key: util.dynamodb.toMapValues({ id: assistantResponseId }),
    update: {
      expression,
      expressionValues,
      expressionNames,
    },
  };
}

/**
 * Returns the resolver result
 * @param {import('@aws-appsync/utils').Context} ctx the context
 * @returns {*} the result
 */
export function response(ctx) {
  if (ctx.error) {
    util.error(ctx.error.message, ctx.error.type);
  }
  const streamId = `${ctx.args.input.associatedUserMessageId}#stream`;
  const { owner } = ctx.args;
  const event = ctx.args.input;

  const streamEvent = {
    ...event,
    __typename: 'ConversationMessageStreamPart',
    id: streamId,
    owner,
  };

  // TODO: The lambda event should provide the toolUse directly.
  if (event.contentBlockToolUse && event.contentBlockToolUse.toolUse) {
    streamEvent.contentBlockToolUse = event.contentBlockToolUse.toolUse;
  }

  return streamEvent;
}

データベースの content を更新しています。

とうとう、化けの皮が剥げましたね。
なんと、ストリーミングレスポンスを Bedrock から受け取ると、そのチャンク毎に content を accumulatedTurnContent 丸ごと AppSync 経由でデータベースを更新しているんですね。

なんでこんなことしてるんでしょうか?

Amplify UI - AI

答えはフロントエンド側にあります。
Amplify AI Kit を実装するために、フロントエンド側はui-react-aiというライブラリを利用します。

実装例は以下の通りです。

app.ts
'use client';
import { useAIConversation } from '@/client';
import { Authenticator } from '@aws-amplify/ui-react';
import { createAIHooks, AIConversation } from '@aws-amplify/ui-react-ai';
import { generateClient } from 'aws-amplify/api';
import { Schema } from '../amplify/data/resource';

const client = generateClient<Schema>({ authMode: 'userPool' });
const { useAIConversation } = createAIHooks(client);

export default function Page() {
  const [
    {
      data: { messages },
      isLoading,
    },
    handleSendMessage,
  ] = useAIConversation('chat');
  // 'chat' is based on the key for the conversation route in your schema.

  return (
    <Authenticator>
      <AIConversation
        messages={messages}
        isLoading={isLoading}
        handleSendMessage={handleSendMessage}
      />
    </Authenticator>
  );
}

useAIConversationの内部を詳しく見ると subscription が設定してあることがわかります。
データベースの変更を監視して、変更があればそれをサブスクライブしていて都度反映させていた、ということです。

つまり

  1. ユーザーがチャットメッセージを送信すると、Lambda から Bedrock が呼び出されます。

  1. tools を使用する必要があれば、AppSync 経由で Bedrock Knowledge Bases が呼び出されます。

  1. Knowledge Bases の結果を踏まえて再度 Bedrock が呼び出され、レスポンス結果をチャンク毎にデータベースに追加更新することでユーザー側に Subscribe イベントとして送信します。

  1. フロントエンドでイベントを受け取って UI に反映させます。

わお!
こんな仕組みで動いていたんですね。これであれば、確かにストリーミングレスポンスっぽくフロントエンドに結果を返すことができますね。

最後に

ここまで読んでいただきありがとうございました。
AWS Amplify は OSS ですので、その中身を垣間見ることができます。全てを黒魔術だと決め付けず中身を覗いてみるのも時には新たな発見があるかもしれません。

もし犬専用の音楽アプリに興味を持っていただけたら、ぜひダウンロードしてみてください!

https://www.oto-trip.com/

Discussion