【Cloud Data Fusion】プライベートインスタンスでS3からデータ収集する方法
はじめに
以前、Cloud Data Fusion(以降、CDF)のパイプラインをプライベートインスタンスで構築し
Google Cloud Storage(以降、GCS)からログデータを収集しました。
しかし、データソースが必ずしもGCSにあるとは限りません。
欲しいデータがAWS上のS3に格納されているようなケースもあります。
今回は一旦プライベートインスタンスとして構築してしまったCDFで
AWSのS3からログデータを収集する方法を解説します。
実行環境
Product | version |
---|---|
CDAP | 6.4.0 (Developer) |
Amazon S3 Batch Source | 1.14.0 |
GCP Region | asia-northeast1 |
AWS Region | ap-northeast-1 |
【構成図】
全体構成
パイプライン構成
【補足】
・ CDFは事前にデプロイ済みとなります。
・ Amazon S3のバケットにすでにデータが格納されている状態としています。
・ 上記のバケットにdata_fusion_testフォルダを作成しています。
・ 利用するファイルはweblog_20210516000000_20210516235959.csv.zipです。
実施手順
- Cloud NATの構築
- IAMユーザの作成
- Amazon S3 Batch Sourceの設定
1. Cloud NATの構築
S3に対して、データアクセスするコンポーネントはDataprocになります。
しかしプライベート環境の場合、DataprocにはプライベートIPしか割り当てられません。
そのため、事前にCloud NAT[1]を構築します。
GCPコンソールにログインします。
ネットワーク サービス
> Cloud NAT
を開き、NATゲートウェイを作成をクリックします。
以下のパラメータでNATゲートウェイを作成します。
(NATゲートウェイは、Cloud Routerの機能として動作)
設定項目 | 設定値 | 備考 |
---|---|---|
ゲートウェイの名前 | test_natgw | 好きな名前を入力します。 |
VPCネットワーク | default | デプロイするネットワークを選択します。 |
リージョン | asia-northeast1 | デプロイするリージョンを選択します。 |
Cloud Router | 新しいルーターを作成 | デプロイ先のCloud Routerを指定します。 |
ルータの名前 | test_rt | 好きな名前を入力します。 |
ソース(内部) | すべてのサブネットのプライマリとセカンダリの範囲 | NAT ゲートウェイにマップするサブネットを選択します。 |
NAT IPアドレス | 自動(推奨) | インターネットに出ていくNAT外部IPを固定もしくは自動を指定します。 |
Stackdriver Logging | 変換とエラー | トラブルシュート時の切り分けのために有効化しました。 |
【補足】
・ 上記以外の設定項目はデフォルト値のままとしています。
2. IAMユーザの作成
Amazon S3 Batch Sourceでは、AWSのアクセスキーを利用してS3にアクセスします。
そのためデータが格納されているS3領域に対して読み取り可能なIAMユーザを作成します。
AWSマネージメントコンソールにログインします。
サービス
からIAM
をクリックし、以下の内容でユーザを追加します
設定項目 | 設定値 | 備考 |
---|---|---|
ユーザ名 | cdf_user | 好きな名前を入力します。 |
アクセスの種類 | プログラムによるアクセス | アクセスキーの利用のみです。 |
アクセス許可の設定 | ユーザ-をグループに追加 | どれを選択してもOKです。 |
ユーザーをグループに追加 | - | どのグループにも所属させない。 |
アクセスキーとシークレットキーが発行されるので、メモしておきます。
作成後、該当ユーザのアクセス権限
タブを開き、インラインポリシーの追加をクリックします。
以下の内容でIAMポリシーを追加します。
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"s3:Get*",
"s3:List*"
],
"Resource": [
"arn:aws:s3:::cdf-test",
"arn:aws:s3:::cdf-test/*"
]
}
]
}
3. Amazon S3 Batch Sourceの設定
ここでようやくCDFのパイプラインの作成を行います。
本投稿では、Source
で選択するAmazon S3のプロパティのみ説明します。
設定項目 | 設定値 | 備考 |
---|---|---|
Reference Name | input_s3_weblog | 任意の処理名をつけます。 |
Path | s3a://cdf-test/data_fusion_test | S3バケットのPrefixまでを指定します。 |
Format | blob | ZIPファイルを読み込むためblobを指定します。 |
Authentication Method | Access Credentials | AWSアクセスキーを利用します。 |
Access ID | xxxxxxxxxxxxx | IAMユーザのアクセスキーを入力します。 |
Access Key | xxxxxxxxxxxxx | IAMユーザのシークレットキーを入力します。 |
Regex Path Filter | weblog_.*.csv.zip | 対象ファイル名を正規表現で指定します。 |
【補足】
・ 上記以外の設定項目はデフォルト値のままとしています。
・ 後段のパイプライン処理の設定手順は過去記事[2]を参照ください。
地味にハマったポイント
Amazon S3 Batch Sourceの設定において
Path
とRegex Path Filter
の書き方で少しつまずきました。
Path
の設定ではS3バケット名の指定のみではダメでした。
その下のPrefix(フォルダ)まで指定する必要がありました。
S3のURIの指定方法でs3aとS3n[3]を知りませんでした。
HadoopでS3のURIを指定する場合に利用する方法ということを今回初めて知りました。
また何が原因でDataprocインスタンスがAmazon S3にアクセス出来ていないのか
クラウドをまたぐこともあり、切り分け方法を予め考えておくことをお勧めします。
本投稿には記載していないものもありますが、今回は以下の準備をして望みました。
❶ Cloud NATのアドレス変換処理をCloud Loggingに出力する。
❷ S3アクセスログをCloudTrailのデータイベントで設定しCloudWatchLogsに出力する。
ちなみにCloud Loggingに出力されるログは以下のようになっています。
宛先(dest_ip
)にAWSが保有するグローバルIPが表示されていれば、OKそうですね。
{
"insertId": "1sxh6jvg6ikbcuw",
"jsonPayload": {
"allocation_status": "OK",
"vpc": {
"project_id": "xxxxxxxxxxxxxxx",
"subnetwork_name": "default",
"vpc_name": "default"
},
"gateway_identifiers": {
"gateway_name": "test_natgw",
"router_name": "test_rt",
"region": "asia-northeast1"
},
"destination": {
"geo_location": {
"region": "Tokyo",
"country": "jpn",
"asn": 16509,
"continent": "Asia"
}
},
"connection": {
"src_port": 60190,
"nat_port": 1027,
"src_ip": "10.146.15.193",
"dest_ip": "52.219.4.177",
"dest_port": 443,
"protocol": 6,
"nat_ip": "104.198.91.140"
},
"endpoint": {
"zone": "asia-northeast1-a",
"vm_name": "cdap-xxxxxxx-6870bced-c39e-11eb-9a20-66f85d5e5b8b-m",
"project_id": "xxxxxxxxxxxxxxx",
"region": "asia-northeast1"
}
},
"resource": {
"type": "nat_gateway",
"labels": {
"gateway_name": "test_natgw",
"project_id": "xxxxxxxxxxxxxxx",
"router_id": "3848806225329968252",
"region": "asia-northeast1"
}
},
"timestamp": "2021-06-02T12:34:10.613299818Z",
"labels": {
"nat.googleapis.com/nat_ip": "104.198.91.140",
"nat.googleapis.com/instance_zone": "asia-northeast1-a",
"nat.googleapis.com/instance_name": "cdap-vxxxxxxx-6870bced-c39e-11eb-9a20-66f85d5e5b8b-m",
"nat.googleapis.com/subnetwork_name": "default",
"nat.googleapis.com/network_name": "default",
"nat.googleapis.com/router_name": "test_rt"
},
"logName": "projects/xxxxxxxxxxxxxxx/logs/compute.googleapis.com%2Fnat_flows",
"receiveTimestamp": "2021-06-02T12:34:17.913137941Z"
}
以下は、CloudWatchLogsInsightでS3アクセスログを検索するクエリです。
fields @timestamp, @message
| filter eventSource = "s3.amazonaws.com"
and userIdentity.userName = "cdf_user"
and eventCategory = "Data"
and requestParameters.bucketName = "cdf-test"
| sort @timestamp desc
| limit 100
【参考】
・ CloudWatch Logs Insights で監査ログの分析をしてみる
まとめ
さて、いかがでしたでしょうか?
AWS環境に構築したシステムが生成したログデータを
GCPのBigQueryで分析したいことって意外とあると思います。
加工をしないでそのままBigQueryにロードできるデータであれば
BigQuery Data Transfer Service for Amazon S3[4]を利用する案もあると思います。
私の経験上、地味にちょっとした加工が必要なケースがありますので
CDFを利用する方法をご紹介させて頂きました^^
Cloud Data Fusion 関連投稿
- 本格的なパイプライン作ってみた!
- GCSバケット消えない問題
- インスタンス作成でどハマりした話
- パイプライン実行エラー検知方法
- ZIPファイルの取り込み時の苦戦とその解決方法
- パイプライン失敗時のSlack通知方法
Discussion