🗒️

VPC フローログをAthenaでselectできるようにしパーティション作成は自動化する

2024/03/29に公開

はじめに

AWSではVPCフローログを有効化することで、簡単にS3へ通信ログを保存することができます。

https://docs.aws.amazon.com/ja_jp/vpc/latest/userguide/flow-logs.html

出力先としてはS3, CloudWatch Logs, Kinesis Firehoseが利用できます。

コスパとしてはS3が最もよいと考えられ、S3を選択するケースが多いと思います。

簡単にS3に保存はできるのですが、いざログを参照したい、分析したい、といった場合に迅速に行いには、何らかの参照手段を事前に準備しておく必要があります。

本記事ではAthena+パーティション射影を利用した準備方法について記載しています。

パーティション射影を設定することで、自動的に日付別のパーティションを作成させることができ、メンテナンスなしでオンデマンドにログの参照だけできるようになります。

前提

VPCフローログ設定

  • VPCフローログをS3に出力する設定はすでに済んでいるものとします。
  • ログのフォーマットはデフォルトの形式を利用しているものとします。
  • 日単位の出力(デフォルト)設定をしているものとします。

https://docs.aws.amazon.com/ja_jp/vpc/latest/userguide/flow-logs-s3.html

Athena初期設定

Athenaを初めて利用する場合は、以下の「設定を編集」ボタンから、結果保存先のS3バケットを指定しておく必要があります。
この設定は完了しているものとします。

Athenaでフローログを参照できるようにする

基本的には以下のドキュメントに沿った内容ですがかいつまんでいきます。

https://docs.aws.amazon.com/ja_jp/athena/latest/ug/vpc-flow-logs.html

テーブルの作成

前提のとおりの設定であれば、VPCフローログは以下のパスに保存されているはずです。

<bucket>/AWSLogs/<account_id>/vpcflowlogs/<region>/<year>/<month>/<day>/

Athenaでクエリできるようにするには、テーブルを作成する必要があります。
テーブル名は何でもよいですが、vpc_flow_logsとします。

CREATE EXTERNAL TABLE IF NOT EXISTS `vpc_flow_logs` (
  version int,
  account_id string,
  interface_id string,
  srcaddr string,
  dstaddr string,
  srcport int,
  dstport int,
  protocol bigint,
  packets bigint,
  bytes bigint,
  start bigint,
  `end` bigint,
  action string,
  log_status string,
  vpc_id string,
  subnet_id string,
  instance_id string,
  tcp_flags int,
  type string,
  pkt_srcaddr string,
  pkt_dstaddr string,
  region string,
  az_id string,
  sublocation_type string,
  sublocation_id string,
  pkt_src_aws_service string,
  pkt_dst_aws_service string,
  flow_direction string,
  traffic_path int
)
PARTITIONED BY (`date` date)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ' '
LOCATION 's3://<bucket_name>/AWSLogs/<account_id>/vpcflowlogs/<region>/'
TBLPROPERTIES ("skip.header.line.count"="1");

これでvpc_flow_logというテーブルが作成されます。
ただ、これだけでは、日付を指定した検索ができません。
パーティションが作成されていないためです。

パーティションを作成する

以下のクエリを実行して、年月日のパーティションを作成します。

ALTER TABLE vpc_flow_logs
ADD PARTITION (`date`='<YYYY>-<MM>-<dd>')
LOCATION 's3://<bucket_name>/AWSLogs/<account_id>/vpcflowlogs/<region>/<YYYY>/<MM>/<dd>';

みてわかるとおり、これは日付ごとに実行する必要があります。

具体例
ALTER TABLE vpc_flow_logs
ADD PARTITION (`date`='2024-03-01')
LOCATION 's3://knziiy-example-bucket/AWSLogs/123412341234/vpcflowlogs/ap-northeast-1/2024/03/01';

上記のADD PARTITIONを実行することで、以下のように日付を指定したクエリが可能になります。

SELECT * 
FROM vpc_flow_logs 
WHERE date = DATE('2024-03-01') 
LIMIT 100;

パーティション作成を自動化する

パーティションを毎日手動で作成するわけにはいかないため、AWS Glueを利用したり、EventBridge+Lambdaで自動的にパーティションを作成させることが考えられます。

しかしAthenaが提供しているパーティション射影(Partition Projection)を利用すると、パーティションの作成を簡単に自動化することができます。

一度テーブルを削除してします。

DROP table vpc_flow_logs;

パーティション射影が有効になるように、テーブルを作成します。

CREATE EXTERNAL TABLE IF NOT EXISTS vpc_flow_logs (
  version int,
  account_id string,
  interface_id string,
  srcaddr string,
  dstaddr string,
  srcport int,
  dstport int,
  protocol bigint,
  packets bigint,
  bytes bigint,
  start bigint,
  `end` bigint,
  action string,
  log_status string,
  vpc_id string,
  subnet_id string,
  instance_id string,
  tcp_flags int,
  type string,
  pkt_srcaddr string,
  pkt_dstaddr string,
  az_id string,
  sublocation_type string,
  sublocation_id string,
  pkt_src_aws_service string,
  pkt_dst_aws_service string,
  flow_direction string,
  traffic_path int
)
PARTITIONED BY (accid string, region string, day string)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ' '
LOCATION 's3://<bucket_name>/AWSLogs/<account_id>/vpcflowlogs/<region>/'
TBLPROPERTIES
(
"skip.header.line.count"="1",
"projection.enabled" = "true",
"projection.region.type" = "enum",
"projection.region.values" = "<REGION_1>,<{REGION_2>",
"projection.day.type" = "date",
"projection.day.range" = "<YYYY>/<MM>/<dd>,NOW",
"projection.day.format" = "yyyy/MM/dd",
"storage.location.template" = "s3://<bucket_name>/AWSLogs/<account_id>/vpcflowlogs/${region}/${day}"
)

これで、s3://<bucket_name>/AWSLogs/<account_id>/vpcflowlogs/${region}/${day} にフローログが生成されると自動的にパーティションが作成されるようになり、あとはクエリするだけでよくなります。

クエリ例

指定した期間のすべてのフィールド

SELECT *
FROM vpc_flow_logs
WHERE day >= '2024/03/01' AND day < '2024/03/05'
ORDER BY day ASC

指定された日付から現在までのログかつ形式指定

SELECT interface_id,
       srcaddr,
       srcport,
       dstaddr,
       dstport,
       action,
       protocol,
       to_iso8601(from_unixtime(start)) AS start_time,
       to_iso8601(from_unixtime("end")) AS end_time
FROM vpc_flow_logs
WHERE DAY >= '2024/03/01'
  AND DAY <  date_format(current_date, '%Y/%m/%d')
LIMIT 1000;

指定された期間に上位 10 個の HTTPS を受信したIPを表示

SELECT SUM(packets) AS packetcount, 
       dstaddr
FROM vpc_flow_logs
WHERE dstport = 443
  AND day >= '2024/03/01'
  AND day < '2024/03/14'
GROUP BY dstaddr
ORDER BY packetcount DESC
LIMIT 10

先頭列にJSTで日時を取得

SELECT 
    from_unixtime(start_epoch + 3600 * 9) as start_jst,
    *
FROM vpc_flow_logs
WHERE day >= '2024/03/28' 
  and dstport = 443
ORDER BY start_epoch ASC

Discussion