🐍

ブラウザ拡張機能で収集したアクティビティログのためのデータレイク

2025/01/01に公開

新年のご挨拶

明けましておめでとうございます!
転職してからアウトプットが減っていたので、気合を入れて新年最初の記事を書いてみました。

今回は、転職後の3か月間のアウトプットとして構築した、ブラウザ拡張機能で収集したユーザーのアクティビティデータを活用するAWSデータレイクについてご紹介します。

※この記事ではAWSアーキテクチャに焦点を当てており、ブラウザ拡張機能やコードの詳細には触れていませんのでご了承ください 🙏

やりたいこと

作りたてのサービスの中で、ユーザーのブラウザでのアクティビティログを分析する基盤を作成することになりました。
実現したいことはこちら
1. ブラウザ拡張機能でユーザーのアクティビティデータを収集
2. データを必要な情報に加工し、データ分析基盤に格納
3. 基盤データベース(Aurora)のユーザーデータと統合して分析
4. 分析結果を永続化し、可視化できる形で提供

データ収集

ユーザーアクティビティデータ

ブラウザ拡張機能が生成するアクティビティログを、以下のフローで収集・保存します:

  1. ログ生成
    ブラウザ拡張機能がユーザーの操作ログを生成します。
  2. API Gatewayへの送信
  • 拡張機能からAPI Gatewayにデータを送信します。
  • RequestValidators を設定し、必要なデータが含まれているかを検証します。
  1. FirehoseとLambdaでデータ加工
    Firehoseでデータを受け取り、Lambdaを利用してフォーマット変換やデータ検証を行います。
  • API Gatewayではデータの存在チェックは可能ですが、内容の詳細検証は難しいため、Lambdaで実施しています(参考:Firehoseデータ変換)。
  1. S3に保存
    加工済みデータをS3に保存し、後続の分析処理に備えます。
CDKコード例
...
export class FirehoseLogIntegration extends Construct {
  public readonly domainName: string;
  public readonly serviceName: string;
  public readonly apiName: string;
  public readonly deliveryStreamName: string;
  public readonly bucket: s3.Bucket;

  constructor(
    scope: Construct,
    id: string,
    props: FirehoseLogIntegrationStackProps,
  ) {
    super(scope, id);
    this.domainName = props.domainName;

    this.serviceName = props.serviceName;
    this.apiName = props.apiName;
    this.deliveryStreamName = props.deliveryStreamName;

    this.bucket = this.#createBucket(props);

    const { customDomain } = props;

    this.#createDataStreamToS3({
      ...props,
      bucket: this.bucket,
    });
    const gateway = this.#createApiGateway(props);
  }

  #createBucket(props: FirehoseLogIntegrationStackProps) {
    const bucket = new s3.Bucket(this, 'FirehoseLogIntegrationBucket', {
      bucketName: `${this.serviceName}-${props.stage}`,
      encryption: s3.BucketEncryption.S3_MANAGED,
    });
    return bucket;
  }

  #createDataStreamToS3(props: FirehoseStackProps) {
    const dataStream = new FirehoseConstruct(
      this,
      'FirehoseLogIntegrationDataStream',
      {
        ...props,
      },
    );
  }

  #createApiGateway(props: FirehoseLogIntegrationStackProps) {
    const apiGatewayLogGroup = new logs.LogGroup(this, 'ApiGatewayLogGroup', {
      logGroupName: `/aws/apigateway/${this.serviceName}`,
      retention: props.logRetentionDefault,
    });

    const apiGatewayResourcePolicy = new iam.PolicyDocument({
      statements: [
        new iam.PolicyStatement({
          effect: iam.Effect.ALLOW,
          actions: ['execute-api:Invoke'],
          principals: [new iam.AnyPrincipal()],
          resources: ['execute-api:/*/*/*'],
        }),
      ],
    });

    const apiGatewayRole = new iam.Role(this, 'ApiGatewayRole', {
      assumedBy: new iam.ServicePrincipal('apigateway.amazonaws.com'),
      managedPolicies: [
cdk.aws_iam.ManagedPolicy.fromAwsManagedPolicyName(
          'AmazonKinesisFirehoseFullAccess',
        ),
      ],
    });

    const apiGatewayRestApi = new ApiGatewayRestApi(
      this,
      'FirehoseLogIntegrationRestApi',
      {
        restApiName: `GatewayFor${this.apiName}`,
        cloudWatchRole: true,
        deployOptions: {
          stageName: 'blue',
          accessLogDestination: new apigateway.LogGroupLogDestination(
            apiGatewayLogGroup,
          ),
          accessLogFormat: apigateway.AccessLogFormat.jsonWithStandardFields({
            caller: false,
            httpMethod: true,
            ip: true,
            protocol: true,
            requestTime: true,
            resourcePath: true,
            responseLength: true,
            status: true,
            user: true,
          }),
          tracingEnabled: true,
          metricsEnabled: true,
          dataTraceEnabled: true,
          loggingLevel: apigateway.MethodLoggingLevel.INFO,
        },
        policy: apiGatewayResourcePolicy,
        cors: true,
      },
    );

    const apiGatewayRequestValidator = new ApiGatewayRequestValidator(
      this,
      'FirehoseLogIntegrationValidator',
      {
        restApiId: apiGatewayRestApi.apiGateway.restApiId,
        rootResourceId: apiGatewayRestApi.apiGateway.restApiRootResourceId,
      },
      {
        contentType: 'application/json',
        description: 'リクエストボディのバリデーション用モデル',
        modelName: this.serviceName),
        schema: {
          type: apigateway.JsonSchemaType.OBJECT,
          required: [
            //データに必須のプロパティ
          ],
        },
      },
      {
        requestValidatorName: this.serviceName,
        validateRequestBody: true,
        validateRequestParameters: false,
      },
    );

    const resource = new ApiGatewayResource(
      this,
      'FirehoseLogIntegrationApiResource',
      {
        resource: apiGatewayRestApi.apiGateway.root,
        pathParts: ['api'],
        cors: true,
      },
    );
...

基盤DB(Aurora)

定期的にAuroraのスナップショットをS3にエクスポートします。

  1. Auroraの定期スナップショット設定(ここでは1日おき)
  2. LambdaでスナップショットのS3エクスポートタスクを作るコードを書く
    エクスポートタスクでは、Auroraにあるスキーマの一部テーブルだけエクスポートするように設定します
  3. EventBridgeでLambdaを定期実行

基盤データの中で使用するデータだけS3に上げることで、データレイクの一部として統合可能な状態になりました。

データカタログ作成

それぞれS3に保存されたデータを分析可能にするためにデータカタログを作成します。
ここではデータ分析基盤にAthenaを採用することになったので、
Glueクローラーを使って、S3に保存したデータからデータカタログを作成することにしました。

  1. Glueクローラーの定期実行
    S3に保存されたデータをGlueクローラーでスキャンし、データカタログを作成します。
  2. Athenaでクエリ可能な状態に
    AthenaのデータベースでGlueのデータカタログを設定します。
CDKコード例
...
// // データカタログ
    this.glueDatabase = new aws_glue.CfnDatabase(
      this,
      'GlueDatabase',
      {
        catalogId: $catalogId,
        databaseInput: {
          name: serviceName,
        },
      },
    );
    this.glueDatabase.applyRemovalPolicy(removalPolicy);

    this.crawlerRole = new iam.Role(this, 'CrawlerRole', {
      assumedBy: new iam.ServicePrincipal('glue.amazonaws.com'),
    });
    this.crawlerRole.addManagedPolicy(
      iam.ManagedPolicy.fromAwsManagedPolicyName('service-role/AWSGlueServiceRole'),
    );
    s3Buckets.map(s3 => {
      s3.bucket.grantRead(this.crawlerRole);
    });
    const s3Targets = s3Buckets.map(s3 => ({
      path: `${s3.bucket.s3UrlForObject()}${s3.path}`,
      exclusions: s3.exclusions,
    }));

    const s3Crawler = new aws_glue.CfnCrawler(this, 'S3Crawler', {
      role: this.crawlerRole.roleName,
      databaseName: serviceName,
      name: `${serviceName}-crawler`,
      targets: {
        s3Targets,
      },
      schedule: schedule,
    });
...

データ分析

↑で用意したデータカタログを使ってデータ分析を行います。

  1. LambdaからAthenaへクエリ実行
    LambdaからAthenaにクエリを送信し、それぞれのS3から作ったテーブルから分析結果を取得します。
  2. EventBridgeで定期実行
    作成したLambdaをEventBridgeで定期実行します。

永続化、可視化

分析結果をAuroraに保存し、バックエンドの可視化用APIからフロントエンドで表示させるようにしています。

  1. Auroraに保存
    ↑のLambdaの中で、分析結果をサービスのバックエンドとして使っているFargateのAPIを通じて、Auroraのテーブルに保存します。
  2. フロントエンドでの可視化
    保存されたデータを可視化用APIで取得し、サービスのフロントエンドで可視化します。

全体のアーキテクチャ
全体

まとめ

アーキテクチャを考える上で以下の方針を重視しました。

  • クラウド料金のコストを抑える
    始めたてのサービスなので、リソースにコストはあまりかけられない状態だったので、↓の方針で構成しました
    • なるべく既存のバックエンドやデータベースなどのリソースを活用
    • 他のクラウドやSaaSを使わないでAWSだけで完結
  • 最低限の拡張性の担保
    活用するデータはすべてS3に集約させることで、他のデータ分析基盤にも移行しやすい状態にしました

課題

現時点での課題もいくつか挙がっています。

  1. クエリ負荷:分析結果保存テーブルが可視化に最適化されておらず、他テーブルとのジョインが必要で負荷が高い。
  2. 各ジョブの実行順序が担保されていない
  3. データライフサイクル管理:S3のデータ保存期間やアーカイブ設定が未整備。
  4. データカタログの管理:1日おきに更新されるデータカタログの整理が必要。

実際作ってみて、今までよりコストや拡張性に関して深く考えないといけないことが多かったような気がします。
サービス立ち上げたばっかりだけどデータレイク基盤欲しいと言ったときに参考になれば嬉しいです!

2025年、とりあえず1本目書けた〜〜〜

Discussion