【Cloud Data Fusion】プライベートインスタンスでS3からデータ収集する方法

7 min read読了の目安(約7000字

はじめに

以前、Cloud Data Fusion(以降、CDF)のパイプラインをプライベートインスタンスで構築し
Google Cloud Storage(以降、GCS)からログデータを収集しました。

https://zenn.dev/hssh2_bin/articles/478e6a41e103c7

しかし、データソースが必ずしもGCSにあるとは限りません。
欲しいデータがAWS上のS3に格納されているようなケースもあります。

今回は一旦プライベートインスタンスとして構築してしまったCDFで
AWSのS3からログデータを収集する方法を解説します。

実行環境

Product version
CDAP 6.4.0 (Developer)
Amazon S3 Batch Source 1.14.0
GCP Region asia-northeast1
AWS Region ap-northeast-1

【構成図】

全体構成


パイプライン構成

【補足】
・ CDFは事前にデプロイ済みとなります。
・ Amazon S3のバケットにすでにデータが格納されている状態としています。
・ 上記のバケットにdata_fusion_testフォルダを作成しています。
・ 利用するファイルはweblog_20210516000000_20210516235959.csv.zipです。

実施手順

  1. Cloud NATの構築
  2. IAMユーザの作成
  3. Amazon S3 Batch Sourceの設定

1. Cloud NATの構築

S3に対して、データアクセスするコンポーネントはDataprocになります。
しかしプライベート環境の場合、DataprocにはプライベートIPしか割り当てられません。
そのため、事前にCloud NAT[1]を構築します。

GCPコンソールにログインします。
ネットワーク サービス > Cloud NATを開き、NATゲートウェイを作成をクリックします。

以下のパラメータでNATゲートウェイを作成します。
(NATゲートウェイは、Cloud Routerの機能として動作)

設定項目 設定値 備考
ゲートウェイの名前 test_natgw 好きな名前を入力します。
VPCネットワーク default デプロイするネットワークを選択します。
リージョン asia-northeast1 デプロイするリージョンを選択します。
Cloud Router 新しいルーターを作成 デプロイ先のCloud Routerを指定します。
ルータの名前 test_rt 好きな名前を入力します。
ソース(内部) すべてのサブネットのプライマリとセカンダリの範囲 NAT ゲートウェイにマップするサブネットを選択します。
NAT IPアドレス 自動(推奨) インターネットに出ていくNAT外部IPを固定もしくは自動を指定します。
Stackdriver Logging 変換とエラー トラブルシュート時の切り分けのために有効化しました。

【補足】
・ 上記以外の設定項目はデフォルト値のままとしています。

2. IAMユーザの作成

Amazon S3 Batch Sourceでは、AWSのアクセスキーを利用してS3にアクセスします。
そのためデータが格納されているS3領域に対して読み取り可能なIAMユーザを作成します。

AWSマネージメントコンソールにログインします。
サービスからIAMをクリックし、以下の内容でユーザを追加します

設定項目 設定値 備考
ユーザ名 cdf_user 好きな名前を入力します。
アクセスの種類 プログラムによるアクセス アクセスキーの利用のみです。
アクセス許可の設定 ユーザ-をグループに追加 どれを選択してもOKです。
ユーザーをグループに追加 - どのグループにも所属させない。

アクセスキーとシークレットキーが発行されるので、メモしておきます。

この情報が漏れるとセキュリティ事故になりますので、扱いは慎重に!

作成後、該当ユーザのアクセス権限タブを開き、インラインポリシーの追加をクリックします。
以下の内容でIAMポリシーを追加します。

ポリシー内容
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "s3:Get*",
                "s3:List*"
            ],
            "Resource": [
                "arn:aws:s3:::cdf-test",
                "arn:aws:s3:::cdf-test/*"
            ]
        }
    ]
}

3. Amazon S3 Batch Sourceの設定

ここでようやくCDFのパイプラインの作成を行います。
本投稿では、Sourceで選択するAmazon S3のプロパティのみ説明します。

設定項目 設定値 備考
Reference Name input_s3_weblog 任意の処理名をつけます。
Path s3a://cdf-test/data_fusion_test S3バケットのPrefixまでを指定します。
Format blob ZIPファイルを読み込むためblobを指定します。
Authentication Method Access Credentials AWSアクセスキーを利用します。
Access ID xxxxxxxxxxxxx IAMユーザのアクセスキーを入力します。
Access Key xxxxxxxxxxxxx IAMユーザのシークレットキーを入力します。
Regex Path Filter weblog_.*.csv.zip 対象ファイル名を正規表現で指定します。

【補足】
・ 上記以外の設定項目はデフォルト値のままとしています。
・ 後段のパイプライン処理の設定手順は過去記事[2]を参照ください。

地味にハマったポイント

Amazon S3 Batch Sourceの設定において
PathRegex Path Filterの書き方で少しつまずきました。

Pathの設定ではS3バケット名の指定のみではダメでした。
その下のPrefix(フォルダ)まで指定する必要がありました。

S3のURIの指定方法でs3aS3n[3]を知りませんでした。
HadoopでS3のURIを指定する場合に利用する方法ということを今回初めて知りました。

また何が原因でDataprocインスタンスがAmazon S3にアクセス出来ていないのか
クラウドをまたぐこともあり、切り分け方法を予め考えておくことをお勧めします。

本投稿には記載していないものもありますが、今回は以下の準備をして望みました。
❶ Cloud NATのアドレス変換処理をCloud Loggingに出力する。
❷ S3アクセスログをCloudTrailのデータイベントで設定しCloudWatchLogsに出力する。

ちなみにCloud Loggingに出力されるログは以下のようになっています。
宛先(dest_ip)にAWSが保有するグローバルIPが表示されていれば、OKそうですね。

アドレス変換ログ
{
  "insertId": "1sxh6jvg6ikbcuw",
  "jsonPayload": {
    "allocation_status": "OK",
    "vpc": {
      "project_id": "xxxxxxxxxxxxxxx",
      "subnetwork_name": "default",
      "vpc_name": "default"
    },
    "gateway_identifiers": {
      "gateway_name": "test_natgw",
      "router_name": "test_rt",
      "region": "asia-northeast1"
    },
    "destination": {
      "geo_location": {
        "region": "Tokyo",
        "country": "jpn",
        "asn": 16509,
        "continent": "Asia"
      }
    },
    "connection": {
      "src_port": 60190,
      "nat_port": 1027,
      "src_ip": "10.146.15.193",
      "dest_ip": "52.219.4.177",
      "dest_port": 443,
      "protocol": 6,
      "nat_ip": "104.198.91.140"
    },
    "endpoint": {
      "zone": "asia-northeast1-a",
      "vm_name": "cdap-xxxxxxx-6870bced-c39e-11eb-9a20-66f85d5e5b8b-m",
      "project_id": "xxxxxxxxxxxxxxx",
      "region": "asia-northeast1"
    }
  },
  "resource": {
    "type": "nat_gateway",
    "labels": {
      "gateway_name": "test_natgw",
      "project_id": "xxxxxxxxxxxxxxx",
      "router_id": "3848806225329968252",
      "region": "asia-northeast1"
    }
  },
  "timestamp": "2021-06-02T12:34:10.613299818Z",
  "labels": {
    "nat.googleapis.com/nat_ip": "104.198.91.140",
    "nat.googleapis.com/instance_zone": "asia-northeast1-a",
    "nat.googleapis.com/instance_name": "cdap-vxxxxxxx-6870bced-c39e-11eb-9a20-66f85d5e5b8b-m",
    "nat.googleapis.com/subnetwork_name": "default",
    "nat.googleapis.com/network_name": "default",
    "nat.googleapis.com/router_name": "test_rt"
  },
  "logName": "projects/xxxxxxxxxxxxxxx/logs/compute.googleapis.com%2Fnat_flows",
  "receiveTimestamp": "2021-06-02T12:34:17.913137941Z"
}

vm_nameがDataprocのインスタンス名になります。

以下は、CloudWatchLogsInsightでS3アクセスログを検索するクエリです。

CloudWatchLogsInsightのサンプルクエリ
fields @timestamp, @message
| filter eventSource = "s3.amazonaws.com" 
and userIdentity.userName = "cdf_user" 
and eventCategory = "Data"
and requestParameters.bucketName = "cdf-test"
| sort @timestamp desc
| limit 100

eventCategoryDataに指定するとデータイベントのみに絞ることが出来ます。

【参考】
CloudWatch Logs Insights で監査ログの分析をしてみる

まとめ

さて、いかがでしたでしょうか?

AWS環境に構築したシステムが生成したログデータを
GCPのBigQueryで分析したいことって意外とあると思います。

加工をしないでそのままBigQueryにロードできるデータであれば
BigQuery Data Transfer Service for Amazon S3[4]を利用する案もあると思います。

私の経験上、地味にちょっとした加工が必要なケースがありますので
CDFを利用する方法をご紹介させて頂きました^^

脚注
  1. Cloud NATとは ↩︎

  2. 【Cloud Data Fusion】本格的なパイプライン作ってみた! ↩︎

  3. S3NとS3Aとは ↩︎

  4. BigQuery Data Transfer Service for Amazon S3 ↩︎