🕓

AWS API GatewayのWeb Socket機能を使ってAmazon Athenaのクエリ結果を非同期にReactで受け取る

12 min read

はじめに

おはようございます、加藤です。API GatewayとLambdaを使ってサーバーレスなWeb APIを構築していると、バックエンドに時間がかかる処理をさせたい時にAPI Gatewayのタイムアウトである29秒が障壁になることがあります。また、タイムアウトに収まるとしても長時間レスポンスを返さず拘束してしまうことはユーザー体験の観点で好ましくありません。

この課題をAPI GatewayのWeb Socket機能とStep Functionsを使って解決するアーキテクチャを検討してみました。

前提

AWS CDKを使って環境構築を行います。CDKの書き方及びデプロイ方法について本記事では説明しません。

Node.js(TypeScript)を使用してWeb APIを構築します。React(TypeScript)を使用してフロントエンドを構築します。フロントエンドは動作を確認する為の最低限の実装です。これらについて動作の詳細は本記事では説明しません。

今回書いたコードはコチラで公開しています。

構成

構成図

非同期処理のWeb APIを構築する検討ポイントとして、API Gateway→Lambda以降の処理をどうやって非同期にするかということがあります。方法はいくつか存在し、SQSキューやLambda非同期実行、Step Functions実行があります。
今回はStep Functionsを利用して構築して使い心地を試してみます。

やってみた

Web Socket APIの構築

  • API Gateway(Web Socket)
  • Step Functions
  • Lambda Functions
  • S3 Bucket

インフラストラクチャとして上記をAWS CDKで構築します。

今回はCloudTrailに対してDynamoDBを作成した証跡をクエリすることにしました。デプロイして動作を試す場合は環境に合わせて対象のeventnameと期間を変更してください。
パーティションが切られたテーブル定義やクエリは下記のブログを参考にさせて頂きました。

import { CfnOutput, Construct, Stack, StackProps } from '@aws-cdk/core';
import { WebSocketApi, WebSocketStage } from '@aws-cdk/aws-apigatewayv2';
import { LambdaWebSocketIntegration } from '@aws-cdk/aws-apigatewayv2-integrations';
import { NodejsFunction } from '@aws-cdk/aws-lambda-nodejs';
import { PolicyStatement } from '@aws-cdk/aws-iam';
import { AthenaGetQueryResults, AthenaStartQueryExecution, LambdaInvoke } from '@aws-cdk/aws-stepfunctions-tasks';
import { IntegrationPattern, JsonPath, StateMachine } from '@aws-cdk/aws-stepfunctions';
import { Bucket } from '@aws-cdk/aws-s3';
import { Tracing } from '@aws-cdk/aws-lambda';


export class BackendStack extends Stack {
  constructor(scope: Construct, id: string, props?: StackProps) {
    super(scope, id, props);

    /**
     * クエリ結果格納バケットを格納する
     */
    const bucket = new Bucket(this, 'QueryResultBucket');

    /**
     * Step Functionsを作成
     */
    const queryJob = new AthenaStartQueryExecution(this, 'StartAthenaQuery', {
      queryString: 'SELECT * FROM cloudtrail_logs_partition_projection WHERE eventname = \'CreateTable\' AND  region = \'ap-northeast-1\' AND date BETWEEN \'2021/08/01\' AND \'2021/09/01\';',
      queryExecutionContext: {
        databaseName: 'default'
      },
      resultConfiguration: {
        outputLocation: {
          bucketName: bucket.bucketName,
          objectKey: 'results'
        }
      },
      integrationPattern: IntegrationPattern.RUN_JOB,
      resultPath: '$.StartAthenaQuery'
    });

    const getJob = new AthenaGetQueryResults(this, 'GetAthenaQueryResult', {
      queryExecutionId: JsonPath.stringAt('$.StartAthenaQuery.QueryExecution.QueryExecutionId'),
      resultPath: '$.GetAthenaQueryResult'
    });

    /**
     * クエリ結果通知関数を作成する
     */
    const pushResultFn = new NodejsFunction(this, 'PushResultFn', {
      entry: 'src/push-result.ts',
      tracing: Tracing.ACTIVE
    });
    const pushResultJob = new LambdaInvoke(this, 'PushResult', {
      lambdaFunction: pushResultFn
    });

    const stateMachine = new StateMachine(this, 'StateMachine', {
      definition: queryJob.next(getJob).next(pushResultJob),
      tracingEnabled: true
    });

    /**
     * クエリ開始関数の作成
     */
    const queryFn = new NodejsFunction(this, 'QueryFn', {
      entry: 'src/query.ts',
      tracing: Tracing.ACTIVE,
      environment: {
        STATE_MACHINE_ARN: stateMachine.stateMachineArn
      }
    });
    stateMachine.grantStartExecution(queryFn);

    /**
     * API Gatewayの作成
     */
    const webSocketApi = new WebSocketApi(this, 'WebSocketApi');
    webSocketApi.addRoute('query', {
      integration: new LambdaWebSocketIntegration({
        handler: queryFn
      })
    });
    new WebSocketStage(this, 'WebSocketApiDevStage', {
      webSocketApi,
      stageName: 'dev',
      autoDeploy: true
    });

    queryFn.addToRolePolicy(new PolicyStatement({
      resources: [`arn:aws:execute-api:${this.region}:${this.account}:${webSocketApi.apiId}/*`],
      actions: ['execute-api:ManageConnections']
    }));
    pushResultFn.addToRolePolicy(new PolicyStatement({
      resources: [`arn:aws:execute-api:${this.region}:${this.account}:${webSocketApi.apiId}/*`],
      actions: ['execute-api:ManageConnections']
    }));

    new CfnOutput(this, 'Url', {
      value: webSocketApi.apiEndpoint
    });
  }
}

Step Functionsの実行、つまりAthenaへのクエリ要求を受け付けるLambda関数を作成します。

// src/query.ts
import { APIGatewayProxyHandler } from 'aws-lambda';
import { ApiGatewayManagementApiClient, PostToConnectionCommand } from '@aws-sdk/client-apigatewaymanagementapi';
import { StartExecutionCommand, SFNClient } from '@aws-sdk/client-sfn';
import { captureAWSv3Client, enableAutomaticMode } from 'aws-xray-sdk';


export const handler: APIGatewayProxyHandler = async (event, context) => {
  enableAutomaticMode()

  console.log(JSON.stringify({
    ...event,
    logName: 'event'
  }))
  console.log(JSON.stringify({
    ...context,
    logName: context
  }))

  const connectionId = event.requestContext.connectionId
  if (connectionId === undefined) {
    throw new Error("cannot get connection id")
  }

  const sfnClient = captureAWSv3Client(new SFNClient({}))
  const output = await sfnClient.send(new StartExecutionCommand({
    input: JSON.stringify({
      apiId: event.requestContext.apiId,
      connectionId
    }),
    stateMachineArn: process.env.STATE_MACHINE_ARN
  }));

  const endpoint = `https://${event.requestContext.apiId}.execute-api.ap-northeast-1.amazonaws.com/dev`
  const apiGwClient = captureAWSv3Client(
    new ApiGatewayManagementApiClient({endpoint}))

  await apiGwClient.send(new PostToConnectionCommand({
    ConnectionId: connectionId,
    // @ts-ignore
    Data: JSON.stringify({
      message: `Processing started with execution ARN: ${output.executionArn}`
    })
  }))

  return {
    statusCode: 200,
    body: ''
  }
}

クエリ完了後に結果を通知するLambda関数を作成します。

// src/push-result.ts
import { Context } from 'aws-lambda';
import { ApiGatewayManagementApiClient, PostToConnectionCommand } from '@aws-sdk/client-apigatewaymanagementapi';
import { captureAWSv3Client, enableAutomaticMode } from 'aws-xray-sdk';


interface Event {
  apiId: string
  connectionId: string
  GetAthenaQueryResult: {
    NextToken?: string,
    ResultSet: {
      ResultSetMetadata: {
        ColumnInfo: {
          CaseSensitive: boolean,
          CatalogName: string,
          Label: string,
          Name: string,
          Nullable: string,
          Precision: number,
          Scale: number,
          SchemaName: string,
          TableName: string,
          Type: string
        }[]
      },
      Rows: { Data: { VarCharValue?: string }[] }[]
    },
    UpdateCount: number
  },
}


export const handler = async (event: Event, context: Context) => {
  enableAutomaticMode()
  console.log(JSON.stringify({
    ...event,
    logName: 'event'
  }));
  console.log(JSON.stringify({
    ...context,
    logName: context
  }));


  const columns = event.GetAthenaQueryResult.ResultSet.Rows[0].Data.map(({VarCharValue}) => VarCharValue!);
  const data = event.GetAthenaQueryResult.ResultSet.Rows
    .filter((_, idx) => idx !== 0)
    .map(({Data}) => Data
      .map(({VarCharValue}) => VarCharValue ?? null))
    .map((v) => {
      return v.reduce((prev, cur, idx) => {
        return {
          ...prev,
          [columns[idx]]: cur
        }
      }, {})
    })

  const endpoint = `https://${event.apiId}.execute-api.ap-northeast-1.amazonaws.com/dev`;
  const client = captureAWSv3Client(new ApiGatewayManagementApiClient({endpoint}));

  await client.send(new PostToConnectionCommand({
    ConnectionId: event.connectionId,
    // @ts-ignore
    Data: JSON.stringify({
      results: data
    })
  }));


  return {
    statusCode: 200,
    body: ''
  };
};

デプロイを行い、wscatを使用して動作を確認します。

npx wscat -c wss://${API_ID}.execute-api.${AWS_REGION}.amazonaws.com/dev/
Connected (press CTRL+C to quit)
> {"action":"query"}
< {"message":"Processing started with execution ARN: arn:aws:states:${AWS_REGION}:${AWS_ACCOUNT_ID}:execution:StateMachine2E01A3A5-BvpPbmmwpAaI:703bdd22-f97e-471b-935f-d7c5f1be5252"}

私の環境では数分で結果が返ってきました。

```bash
< {"results":[{...},{...}]} # 結果省略

Reactでのフロントエンドの構築

こういったWeb Socket APIを使う場合にどういったフロントエンドが必要かも理解しておくためにReactでフロントエンドを構築します。
環境変数REACT_APP_WEB_SOCKET_API_URLにデプロイされたAPI GatewayのURLをセットして使用します。

Appコンポーネントを読み込んだタイミングでWeb Socketをオープンし、useEffect()内でsocket.onmessageを使いメッセージを待ち受けます。
ボタンを押すとクエリを実行し、結果を<p>タグで描画します。クエリ要求を受け付けた段階でメッセージが返ってくるのでアラートで表示します。

// src/App.tsx
import React from 'react';

function App() {
  const socket = new WebSocket(process.env.REACT_APP_WEB_SOCKET_API_URL!)
  const [result, setResult] = React.useState<Object[]>([{}]);

  React.useEffect(() => {
    socket.onmessage = (event) => {
      const data = JSON.parse(event.data)

      if (data.message != null) {
        alert(data.message)
      }

      if (Array.isArray(data.results)) {
        setResult(data.results)
      }
    }

    return () => {
      socket.close()
    }
  })

  const handleOnClick:   React.MouseEventHandler<HTMLButtonElement> = () => {
    socket.send(JSON.stringify({
      action: 'query'
    }))
  }

  return (
    <>
      <p>SELECT * FROM cloudtrail_logs_partition_projection WHERE eventname = 'CreateTable' AND  region = 'ap-northeast-1' AND date BETWEEN '2021/08/01' AND '2021/09/01';</p>
      <button onClick={handleOnClick}>クエリ開始</button>

      <p>result: {JSON.stringify(result)}</p>
    </>
  );
}

export default App;

npm run devでサーバーをDev環境で起動します。

Frontend1

クエリボタンを押すと、Web APIにリクエストが飛びます。リクエストが受け付けられるとアラートが表示されます。

Frontend2

しばらく待つと結果が描画されます。

Frontend3

トレーシング

今回の構成ではAWS X-Rayを有効にしていたのでトレーシングができます。このように図示されました。

X-Ray

Service Lens Mapも確認できます。

Service Lens Map

あとがき

Step Functionsを使うのが初めてだったので最初は戸惑いましたが、AWS CDKを使っているからか簡単に構築することができました。
この構成はAPI Gatewayのタイムアウトから解放され安心してバックエンドで非同期処理を行えます。しかし、通知用のLambda関数の起動(特にコールドスタートの場合)やStep Functionsの実行といったオーバーヘッドが発生するので、今回のようにクエリが直ぐに完了する場合は不格好だとしてもLambda上でループしながら処理の完了を待ったほうがユーザーへのレスポンスも早く構成要素も少なくて管理しやすかったしれません。
今回実際に作ってみて、非同期処理だからと言ってむやみにStep Functionsを使用するのではなく処理にかかる時間をしっかりと想定し、実際に構築してみてユーザー体験を検証した上で導入すべきだと感じました。
以上でした。

Discussion

ログインするとコメントできます