🎍

AWS WAFV2のログを見るときによく使うクエリ集

2022/01/06に公開

🎍明けましておめでとうございます🎍
今年もよろしくお願いします。

今回はAWS WAFV2の調査のときによく使うクエリを紹介します。
WAFV2を運用に載せたり、調査するときAmazon Athenaで利用するクエリです。

前提

この記事の前提は以下になります。

  • AWS WAFV2を使っていること
  • AWS WAFV2のログをS3に保存していること
  • AWS公式のAWS Glueテーブル設定をしていること

設定

ちょっと脱線するのですが、Glueに入れている公式の設定やクエリはこちらを利用しています。

CREATE EXTERNAL TABLE `waf_logs`(
  `timestamp` bigint,
  `formatversion` int,
  `webaclid` string,
  `terminatingruleid` string,
  `terminatingruletype` string,
  `action` string,
  `terminatingrulematchdetails` array<
                                  struct<
                                    conditiontype:string,
                                    location:string,
                                    matcheddata:array<string>
                                        >
                                     >,
  `httpsourcename` string,
  `httpsourceid` string,
  `rulegrouplist` array<
                     struct<
                        rulegroupid:string,
                        terminatingrule:struct<
                           ruleid:string,
                           action:string,
                           rulematchdetails:string
                                               >,
                        nonterminatingmatchingrules:array<
                                                       struct<
                                                          ruleid:string,
                                                          action:string,
                                                          rulematchdetails:array<
                                                               struct<
                                                                  conditiontype:string,
                                                                  location:string,
                                                                  matcheddata:array<string>
                                                                     >
                                                                  >
                                                               >
                                                            >,
                        excludedrules:array<
                                         struct<
                                            ruleid:string,
                                            exclusiontype:string
                                               >
                                            >
                           >
                       >,
  `ratebasedrulelist` array<
                        struct<
                          ratebasedruleid:string,
                          limitkey:string,
                          maxrateallowed:int
                              >
                           >,
  `nonterminatingmatchingrules` array<
                                  struct<
                                    ruleid:string,
                                    action:string
                                        >
                                     >,
  `requestheadersinserted` string,
  `responsecodesent` string,
  `httprequest` struct<
                      clientip:string,
                      country:string,
                      headers:array<
                                struct<
                                  name:string,
                                  value:string
                                      >
                                   >,
                      uri:string,
                      args:string,
                      httpversion:string,
                      httpmethod:string,
                      requestid:string
                      >,
  `labels` array<
             struct<
               name:string
                   >
                  >
)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
WITH SERDEPROPERTIES (
 'paths'='action,formatVersion,httpRequest,httpSourceId,httpSourceName,labels,nonTerminatingMatchingRules,rateBasedRuleList,requestHeadersInserted,responseCodeSent,ruleGroupList,terminatingRuleId,terminatingRuleMatchDetails,terminatingRuleType,timestamp,webaclId')
STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION 's3://athenawaflogs/WebACL/'

ちょっとアレンジ入っているのですが、Terraformで書いた例は以下のようになります。

resource "aws_s3_bucket" "waf_log_bucket" {
  # bucket名が'aws-waf-logs-'から始まらないといけない
  bucket = "aws-waf-logs-waf-log"
  acl    = "private"

  lifecycle_rule {
    id      = "15months-lifecycle-rule"
    enabled = true

    expiration {
      days = 456
    }
  }
}

resource "aws_s3_bucket_public_access_block" "waf_log" {
  bucket                  = aws_s3_bucket.waf_log_bucket.id
  block_public_acls       = true
  block_public_policy     = true
  ignore_public_acls      = true
  restrict_public_buckets = true
}

resource "aws_glue_catalog_database" "log_catalog_database" {
  name = "waf-log"
}

resource "aws_glue_catalog_table" "waf_log" {
  database_name = aws_glue_catalog_database.log_catalog_database.name
  name          = "waf_log"
  parameters = {
    "EXTERNAL"                     = "TRUE"
    "projection.enabled"           = "true"
    "projection.day.type"          = "date"
    "projection.day.range"         = "2022/01/01/00,NOW"
    "projection.day.format"        = "yyyy/MM/dd/HH"
    "projection.day.interval"      = "1"
    "projection.day.interval.unit" = "HOURS"
    "storage.location.template"    = "s3://aws_s3_bucket.waf_log_bucket.name/AWSLogs/xxxxxxxxxx/WAFLogs/ap-northeast-1/waf-test/$${day}"
  }
  table_type = "EXTERNAL_TABLE"

  partition_keys {
    name = "day"
    type = "string"
  }

  storage_descriptor {
    location      = "s3://aws_s3_bucket.waf_log_bucket.name/AWSLogs/xxxxxxxxxx/WAFLogs/ap-northeast-1/waf-test/"
    compressed    = "true"
    input_format  = "org.apache.hadoop.mapred.TextInputFormat"
    output_format = "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat"

    columns {
      name = "timestamp"
      type = "bigint"
    }
    columns {
      name = "formatversion"
      type = "int"
    }
    columns {
      name = "webaclid"
      type = "string"
    }
    columns {
      name = "terminatingruleid"
      type = "string"
    }
    columns {
      name = "terminatingruletype"
      type = "string"
    }
    columns {
      name = "action"
      type = "string"
    }
    columns {
      name = "terminatingrulematchdetails"
      type = "array<struct<conditionType:string,location:string,matchedData:array<string>>>"
    }
    columns {
      name = "httpsourcename"
      type = "string"
    }
    columns {
      name = "httpsourceid"
      type = "string"
    }
    columns {
      name = "rulegrouplist"
      type = "array<struct<ruleGroupId:string,terminatingRule:struct<ruleId:string,action:string,ruleMatchDetails:string>,nonTerminatingMatchingRules:array<struct<ruleId:string,action:string,ruleMatchDetails:array<struct<conditionType:string,location:string,matchedData:array<string>>>>>,excludedRules:string>>"
    }
    columns {
      name = "ratebasedrulelist"
      type = "array<struct<rateBasedRuleId:string,limitKey:string,maxRateAllowed:int>>"
    }
    columns {
      name = "nonterminatingmatchingrules"
      type = "array<struct<ruleId:string,action:string>>"
    }
    columns {
      name = "requestheadersinserted"
      type = "string"
    }
    columns {
      name = "responsecodesent"
      type = "string"
    }
    columns {
      name = "httprequest"
      type = "struct<clientIp:string,country:string,headers:array<struct<name:string,value:string>>,uri:string,args:string,httpVersion:string,httpMethod:string,requestId:string>"
    }
    columns {
      name = "labels"
      type = "array<struct<name:string>>"
    }

    ser_de_info {
      name = "JsonSerDe"
      parameters = {
        "serialization.format" = "1"
      }
      serialization_library = "org.openx.data.jsonserde.JsonSerDe"
    }
  }
}

partition等は自由に入れて頂いて問題ないです。クエリを適宜変更してください。

よく使うクエリ集

とりあえず一覧をみる

SELECT from_unixtime(timestamp/1000, 'Asia/Tokyo') AS JST, *
FROM waf_log
WHERE day >= '2022/01/01/01' AND day <= '2021/01/01/23';

BLOCKされたものをみる

SELECT from_unixtime(timestamp/1000, 'Asia/Tokyo') AS JST, *
FROM waf_log
WHERE day >= '2022/01/01/01' AND day <= '2021/01/01/23'
AND action = 'BLOCK';

BLOCKされてものを見るときはterminatingrulematchdetailsでBLOCKされた理由をみて推測をはじめます。
httprequestで運良くわかることもあれば、APIのLogを見に行ったりします。

BLOCKされたものなおかつ特定のURIで絞る

SELECT from_unixtime(timestamp/1000, 'Asia/Tokyo') AS JST, *
FROM waf_log
WHERE httpRequest.uri = '/index.php'
AND day >= '2022/01/01/01' AND day <= '2021/01/01/23'
AND action = 'BLOCK'
ORDER BY timestamp;

BLOCKされたものなおかつ特定のIPで絞る

SELECT from_unixtime(timestamp/1000, 'Asia/Tokyo') AS JST, *
FROM waf_log
WHERE httprequest.clientip = '153.242.5.8'
AND day >= '2022/01/01/01' AND day <= '2021/01/01/23'
AND action = 'BLOCK'
ORDER BY timestamp;

BLOCKされたものなおかつ特定のルールで絞る

SELECT from_unixtime(timestamp/1000, 'Asia/Tokyo') AS JST, *, groupList.terminatingRule.ruleId
FROM waf_log, UNNEST(ruleGroupList) t(groupList)
WHERE groupList.terminatingRule.ruleId = 'SQLi_BODY'
AND day >= '2022/01/01/01' AND day <= '2021/01/01/23'
AND action = 'BLOCK'
ORDER BY timestamp;

XSS/SQLiの詳細ログを出力する

SELECT
  from_unixtime(timestamp/1000, 'Asia/Tokyo') AS JST,
  groupList.ruleGroupId,
  groupList.terminatingRule.ruleId,
  ruleMatchDetails.conditionType,
  ruleMatchDetails.location,
  ruleMatchDetails.matchedData,
  httpRequest
FROM
  waf_log,
  UNNEST(terminatingRuleMatchDetails) t(ruleMatchDetails),
  UNNEST(ruleGroupList) t(groupList)
WHERE action = 'BLOCK' and groupList.terminatingRule.action = 'BLOCK'
AND day >= '2022/01/01/01' AND day <= '2021/01/01/23'
ORDER BY timestamp;

運用前のチェックで使うクエリ

COUNTされたものをみる

COUNTはactionではなくnonTerminatingMatchingRulesに出てくるので注意。

SELECT from_unixtime(timestamp/1000, 'Asia/Tokyo') AS JST, *
FROM waf_log, UNNEST(nonTerminatingMatchingRules) t(nonTermRule)
WHERE nonTermRule.action = 'COUNT' AND day >= '2022/01/01/01' AND day <= '2021/01/01/23'
ORDER BY timestamp;

COUNTされた上位100のルールを取得する

SELECT COUNT(groupList.terminatingRule.ruleId) AS count, groupList.terminatingRule.ruleId
FROM waf_log, UNNEST(nonTerminatingMatchingRules) t(nonTermRule), UNNEST(ruleGroupList) t(groupList)
WHERE nonTermRule.action = 'COUNT' AND day >= '2022/01/01/01' AND day <= '2021/01/01/23'
GROUP BY groupList.terminatingRule.ruleId
ORDER BY count DESC
LIMIT 100;

COUNTされた上位100のルールとURIを取得する

SELECT COUNT(groupList.terminatingRule.ruleId) AS count, httpRequest.uri, groupList.terminatingRule.ruleId
FROM waf_log, UNNEST(nonTerminatingMatchingRules) t(nonTermRule), UNNEST(ruleGroupList) t(groupList)
WHERE nonTermRule.action = 'COUNT' AND day >= '2022/01/01/01' AND day <= '2021/01/01/23'
GROUP BY httpRequest.uri, groupList.terminatingRule.ruleId
ORDER BY count DESC
LIMIT 100;

BLOCKされた上位100のルールとURIを取得する

SELECT COUNT(groupList.terminatingRule.ruleId) AS count, httpRequest.uri, groupList.terminatingRule.ruleId
FROM waf_log, UNNEST(ruleGroupList) t(groupList)
WHERE action = 'BLOCK' AND day >= '2022/01/01/01' AND day <= '2021/01/01/23'
GROUP BY httpRequest.uri, groupList.terminatingRule.ruleId
ORDER BY count DESC
LIMIT 100;

運用開始後の確認で使うクエリ

1週間でBLOCKされたURIとルールのTop100

SELECT COUNT(groupList.terminatingRule.ruleId) AS count, httpRequest.uri, groupList.terminatingRule.ruleId
FROM waf_log, UNNEST(ruleGroupList) t(groupList)
WHERE
  "action" = 'BLOCK' AND
  timestamp >= TO_UNIXTIME(cast(CURRENT_DATE as TIMESTAMP) - interval '7' day) * 1000
GROUP BY httpRequest.uri, groupList.terminatingRule.ruleId
ORDER BY count DESC
LIMIT 100;

BLOCKされた上位100のIPアドレスを取得する

SELECT "httprequest"."clientip", "count"(*) "ipcount", "httprequest"."country"
FROM waf_log
WHERE "action" = 'BLOCK' AND day >= '2022/01/01/01' AND day <= '2021/01/01/23'
GROUP BY "httprequest"."clientip", "httprequest"."country"
ORDER BY "ipcount" DESC
LIMIT 100;

たまに使うクエリ集

AWSManagedIPReputationListでBLOCKされたIPアドレスのカウント

SELECT "httprequest"."clientip", "count"(*) "ipcount", "httprequest"."country", groupList.terminatingRule.ruleId
FROM waf_log, UNNEST(ruleGroupList) t(groupList)
WHERE "action" = 'BLOCK' AND day >= '2022/01/01/01' AND day <= '2021/01/01/23' AND groupList.terminatingRule.ruleId = 'AWSManagedIPReputationList'
GROUP BY "httprequest"."clientip", "httprequest"."country", groupList.terminatingRule.ruleId
ORDER BY "ipcount" DESC
LIMIT 100;

指定された用語(今回はhogehoge.jp)を含むリファラーの数を数える

WITH DATASET AS 
  (SELECT header FROM waf_log
    CROSS JOIN UNNEST(httprequest.headers) AS t(header)
    WHERE day >= '2022/01/01/01' AND day <= '2021/01/01/23')
SELECT COUNT(*) referer_count 
FROM DATASET 
WHERE LOWER(header.name)='referer' AND header.value LIKE '%hogehoge.jp%';

過去10日間に除外ルールに一致したすべての一致したIPアドレスをカウントする

WITH dataset AS 
  (SELECT * FROM waf_log
    CROSS JOIN UNNEST(ruleGroupList) AS t(allRuleGroups))
SELECT COUNT(*) AS
  count, 
  "httpRequest"."clientIp", 
  "allRuleGroups"."excludedRules",
  "allRuleGroups"."ruleGroupId"
FROM dataset 
WHERE allRuleGroups.excludedRules IS NOT NULL AND from_unixtime(timestamp/1000, 'Asia/Tokyo') > now() - interval '10' day
GROUP BY "httpRequest"."clientIp", "allRuleGroups"."ruleGroupId", "allRuleGroups"."excludedRules"
ORDER BY count DESC;

指定した規則の種類によってBLOCKされた上位100個のIPアドレスを抽出する

SELECT COUNT(httpRequest.clientIp) as count,
httpRequest.clientIp
FROM waf_log, UNNEST(nonTerminatingMatchingRules) t(nonTermRule)
WHERE nonTermRule.ruleid='AWS-AWSManagedRulesCommonRuleSet' AND day >= '2021/12/01/07' AND day <= '2021/12/01/08' AND action='BLOCK'
GROUP BY httpRequest.clientIp
ORDER BY count DESC
LIMIT 100;

日本から来ていてBLOCKされたログ

SELECT from_unixtime(timestamp/1000, 'Asia/Tokyo') AS JST, *
FROM waf_log
WHERE httpRequest.country = 'JP' AND action = 'BLOCK'
AND day >= '2021/12/01/07' AND day <= '2021/12/01/08'
ORDER BY timestamp;

国毎のログ数の集計

SELECT httpRequest.country, COUNT(httpRequest.country) AS count, day
FROM waf_log
WHERE day >= '2021/12/01/07' AND day <= '2021/12/01/08'
GROUP BY httpRequest.country
ORDER BY count DESC;

リクエストがブロックされた回数をカウントし、特定の属性でグループ化する

SELECT COUNT(*) AS
  count,
  webaclid,
  terminatingruleid,
  httprequest.clientip,
  httprequest.uri
FROM waf_log
WHERE action='BLOCK' AND day >= '2021/12/01/07' AND day <= '2021/12/01/08'
GROUP BY webaclid, terminatingruleid, httprequest.clientip, httprequest.uri
ORDER BY count DESC
LIMIT 100;

他にもあった気がするけれど、思い出したら追記します。

最後に

AWS WAFV2運用でよく使っているクエリを紹介しました。
もしこれ便利だよというクエリあれば教えてください。

この記事がどなたかの役に立てれば嬉しいです。

GitHubで編集を提案

Discussion