🎍
AWS WAFV2のログを見るときによく使うクエリ集
🎍明けましておめでとうございます🎍
今年もよろしくお願いします。
今回はAWS WAFV2の調査のときによく使うクエリを紹介します。
WAFV2を運用に載せたり、調査するときAmazon Athenaで利用するクエリです。
前提
この記事の前提は以下になります。
- AWS WAFV2を使っていること
- AWS WAFV2のログをS3に保存していること
- AWS公式のAWS Glueテーブル設定をしていること
設定
ちょっと脱線するのですが、Glueに入れている公式の設定やクエリはこちらを利用しています。
CREATE EXTERNAL TABLE `waf_logs`(
`timestamp` bigint,
`formatversion` int,
`webaclid` string,
`terminatingruleid` string,
`terminatingruletype` string,
`action` string,
`terminatingrulematchdetails` array<
struct<
conditiontype:string,
location:string,
matcheddata:array<string>
>
>,
`httpsourcename` string,
`httpsourceid` string,
`rulegrouplist` array<
struct<
rulegroupid:string,
terminatingrule:struct<
ruleid:string,
action:string,
rulematchdetails:string
>,
nonterminatingmatchingrules:array<
struct<
ruleid:string,
action:string,
rulematchdetails:array<
struct<
conditiontype:string,
location:string,
matcheddata:array<string>
>
>
>
>,
excludedrules:array<
struct<
ruleid:string,
exclusiontype:string
>
>
>
>,
`ratebasedrulelist` array<
struct<
ratebasedruleid:string,
limitkey:string,
maxrateallowed:int
>
>,
`nonterminatingmatchingrules` array<
struct<
ruleid:string,
action:string
>
>,
`requestheadersinserted` string,
`responsecodesent` string,
`httprequest` struct<
clientip:string,
country:string,
headers:array<
struct<
name:string,
value:string
>
>,
uri:string,
args:string,
httpversion:string,
httpmethod:string,
requestid:string
>,
`labels` array<
struct<
name:string
>
>
)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
WITH SERDEPROPERTIES (
'paths'='action,formatVersion,httpRequest,httpSourceId,httpSourceName,labels,nonTerminatingMatchingRules,rateBasedRuleList,requestHeadersInserted,responseCodeSent,ruleGroupList,terminatingRuleId,terminatingRuleMatchDetails,terminatingRuleType,timestamp,webaclId')
STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION 's3://athenawaflogs/WebACL/'
ちょっとアレンジ入っているのですが、Terraformで書いた例は以下のようになります。
resource "aws_s3_bucket" "waf_log_bucket" {
# bucket名が'aws-waf-logs-'から始まらないといけない
bucket = "aws-waf-logs-waf-log"
acl = "private"
lifecycle_rule {
id = "15months-lifecycle-rule"
enabled = true
expiration {
days = 456
}
}
}
resource "aws_s3_bucket_public_access_block" "waf_log" {
bucket = aws_s3_bucket.waf_log_bucket.id
block_public_acls = true
block_public_policy = true
ignore_public_acls = true
restrict_public_buckets = true
}
resource "aws_glue_catalog_database" "log_catalog_database" {
name = "waf-log"
}
resource "aws_glue_catalog_table" "waf_log" {
database_name = aws_glue_catalog_database.log_catalog_database.name
name = "waf_log"
parameters = {
"EXTERNAL" = "TRUE"
"projection.enabled" = "true"
"projection.day.type" = "date"
"projection.day.range" = "2022/01/01/00,NOW"
"projection.day.format" = "yyyy/MM/dd/HH"
"projection.day.interval" = "1"
"projection.day.interval.unit" = "HOURS"
"storage.location.template" = "s3://aws_s3_bucket.waf_log_bucket.name/AWSLogs/xxxxxxxxxx/WAFLogs/ap-northeast-1/waf-test/$${day}"
}
table_type = "EXTERNAL_TABLE"
partition_keys {
name = "day"
type = "string"
}
storage_descriptor {
location = "s3://aws_s3_bucket.waf_log_bucket.name/AWSLogs/xxxxxxxxxx/WAFLogs/ap-northeast-1/waf-test/"
compressed = "true"
input_format = "org.apache.hadoop.mapred.TextInputFormat"
output_format = "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat"
columns {
name = "timestamp"
type = "bigint"
}
columns {
name = "formatversion"
type = "int"
}
columns {
name = "webaclid"
type = "string"
}
columns {
name = "terminatingruleid"
type = "string"
}
columns {
name = "terminatingruletype"
type = "string"
}
columns {
name = "action"
type = "string"
}
columns {
name = "terminatingrulematchdetails"
type = "array<struct<conditionType:string,location:string,matchedData:array<string>>>"
}
columns {
name = "httpsourcename"
type = "string"
}
columns {
name = "httpsourceid"
type = "string"
}
columns {
name = "rulegrouplist"
type = "array<struct<ruleGroupId:string,terminatingRule:struct<ruleId:string,action:string,ruleMatchDetails:string>,nonTerminatingMatchingRules:array<struct<ruleId:string,action:string,ruleMatchDetails:array<struct<conditionType:string,location:string,matchedData:array<string>>>>>,excludedRules:string>>"
}
columns {
name = "ratebasedrulelist"
type = "array<struct<rateBasedRuleId:string,limitKey:string,maxRateAllowed:int>>"
}
columns {
name = "nonterminatingmatchingrules"
type = "array<struct<ruleId:string,action:string>>"
}
columns {
name = "requestheadersinserted"
type = "string"
}
columns {
name = "responsecodesent"
type = "string"
}
columns {
name = "httprequest"
type = "struct<clientIp:string,country:string,headers:array<struct<name:string,value:string>>,uri:string,args:string,httpVersion:string,httpMethod:string,requestId:string>"
}
columns {
name = "labels"
type = "array<struct<name:string>>"
}
ser_de_info {
name = "JsonSerDe"
parameters = {
"serialization.format" = "1"
}
serialization_library = "org.openx.data.jsonserde.JsonSerDe"
}
}
}
partition等は自由に入れて頂いて問題ないです。クエリを適宜変更してください。
よく使うクエリ集
とりあえず一覧をみる
SELECT from_unixtime(timestamp/1000, 'Asia/Tokyo') AS JST, *
FROM waf_log
WHERE day >= '2022/01/01/01' AND day <= '2021/01/01/23';
BLOCKされたものをみる
SELECT from_unixtime(timestamp/1000, 'Asia/Tokyo') AS JST, *
FROM waf_log
WHERE day >= '2022/01/01/01' AND day <= '2021/01/01/23'
AND action = 'BLOCK';
BLOCKされてものを見るときはterminatingrulematchdetails
でBLOCKされた理由をみて推測をはじめます。
httprequest
で運良くわかることもあれば、APIのLogを見に行ったりします。
BLOCKされたものなおかつ特定のURIで絞る
SELECT from_unixtime(timestamp/1000, 'Asia/Tokyo') AS JST, *
FROM waf_log
WHERE httpRequest.uri = '/index.php'
AND day >= '2022/01/01/01' AND day <= '2021/01/01/23'
AND action = 'BLOCK'
ORDER BY timestamp;
BLOCKされたものなおかつ特定のIPで絞る
SELECT from_unixtime(timestamp/1000, 'Asia/Tokyo') AS JST, *
FROM waf_log
WHERE httprequest.clientip = '153.242.5.8'
AND day >= '2022/01/01/01' AND day <= '2021/01/01/23'
AND action = 'BLOCK'
ORDER BY timestamp;
BLOCKされたものなおかつ特定のルールで絞る
SELECT from_unixtime(timestamp/1000, 'Asia/Tokyo') AS JST, *, groupList.terminatingRule.ruleId
FROM waf_log, UNNEST(ruleGroupList) t(groupList)
WHERE groupList.terminatingRule.ruleId = 'SQLi_BODY'
AND day >= '2022/01/01/01' AND day <= '2021/01/01/23'
AND action = 'BLOCK'
ORDER BY timestamp;
XSS/SQLiの詳細ログを出力する
SELECT
from_unixtime(timestamp/1000, 'Asia/Tokyo') AS JST,
groupList.ruleGroupId,
groupList.terminatingRule.ruleId,
ruleMatchDetails.conditionType,
ruleMatchDetails.location,
ruleMatchDetails.matchedData,
httpRequest
FROM
waf_log,
UNNEST(terminatingRuleMatchDetails) t(ruleMatchDetails),
UNNEST(ruleGroupList) t(groupList)
WHERE action = 'BLOCK' and groupList.terminatingRule.action = 'BLOCK'
AND day >= '2022/01/01/01' AND day <= '2021/01/01/23'
ORDER BY timestamp;
運用前のチェックで使うクエリ
COUNTされたものをみる
COUNTはaction
ではなくnonTerminatingMatchingRules
に出てくるので注意。
SELECT from_unixtime(timestamp/1000, 'Asia/Tokyo') AS JST, *
FROM waf_log, UNNEST(nonTerminatingMatchingRules) t(nonTermRule)
WHERE nonTermRule.action = 'COUNT' AND day >= '2022/01/01/01' AND day <= '2021/01/01/23'
ORDER BY timestamp;
COUNTされた上位100のルールを取得する
SELECT COUNT(groupList.terminatingRule.ruleId) AS count, groupList.terminatingRule.ruleId
FROM waf_log, UNNEST(nonTerminatingMatchingRules) t(nonTermRule), UNNEST(ruleGroupList) t(groupList)
WHERE nonTermRule.action = 'COUNT' AND day >= '2022/01/01/01' AND day <= '2021/01/01/23'
GROUP BY groupList.terminatingRule.ruleId
ORDER BY count DESC
LIMIT 100;
COUNTされた上位100のルールとURIを取得する
SELECT COUNT(groupList.terminatingRule.ruleId) AS count, httpRequest.uri, groupList.terminatingRule.ruleId
FROM waf_log, UNNEST(nonTerminatingMatchingRules) t(nonTermRule), UNNEST(ruleGroupList) t(groupList)
WHERE nonTermRule.action = 'COUNT' AND day >= '2022/01/01/01' AND day <= '2021/01/01/23'
GROUP BY httpRequest.uri, groupList.terminatingRule.ruleId
ORDER BY count DESC
LIMIT 100;
BLOCKされた上位100のルールとURIを取得する
SELECT COUNT(groupList.terminatingRule.ruleId) AS count, httpRequest.uri, groupList.terminatingRule.ruleId
FROM waf_log, UNNEST(ruleGroupList) t(groupList)
WHERE action = 'BLOCK' AND day >= '2022/01/01/01' AND day <= '2021/01/01/23'
GROUP BY httpRequest.uri, groupList.terminatingRule.ruleId
ORDER BY count DESC
LIMIT 100;
運用開始後の確認で使うクエリ
1週間でBLOCKされたURIとルールのTop100
SELECT COUNT(groupList.terminatingRule.ruleId) AS count, httpRequest.uri, groupList.terminatingRule.ruleId
FROM waf_log, UNNEST(ruleGroupList) t(groupList)
WHERE
"action" = 'BLOCK' AND
timestamp >= TO_UNIXTIME(cast(CURRENT_DATE as TIMESTAMP) - interval '7' day) * 1000
GROUP BY httpRequest.uri, groupList.terminatingRule.ruleId
ORDER BY count DESC
LIMIT 100;
BLOCKされた上位100のIPアドレスを取得する
SELECT "httprequest"."clientip", "count"(*) "ipcount", "httprequest"."country"
FROM waf_log
WHERE "action" = 'BLOCK' AND day >= '2022/01/01/01' AND day <= '2021/01/01/23'
GROUP BY "httprequest"."clientip", "httprequest"."country"
ORDER BY "ipcount" DESC
LIMIT 100;
たまに使うクエリ集
AWSManagedIPReputationListでBLOCKされたIPアドレスのカウント
SELECT "httprequest"."clientip", "count"(*) "ipcount", "httprequest"."country", groupList.terminatingRule.ruleId
FROM waf_log, UNNEST(ruleGroupList) t(groupList)
WHERE "action" = 'BLOCK' AND day >= '2022/01/01/01' AND day <= '2021/01/01/23' AND groupList.terminatingRule.ruleId = 'AWSManagedIPReputationList'
GROUP BY "httprequest"."clientip", "httprequest"."country", groupList.terminatingRule.ruleId
ORDER BY "ipcount" DESC
LIMIT 100;
指定された用語(今回はhogehoge.jp)を含むリファラーの数を数える
WITH DATASET AS
(SELECT header FROM waf_log
CROSS JOIN UNNEST(httprequest.headers) AS t(header)
WHERE day >= '2022/01/01/01' AND day <= '2021/01/01/23')
SELECT COUNT(*) referer_count
FROM DATASET
WHERE LOWER(header.name)='referer' AND header.value LIKE '%hogehoge.jp%';
過去10日間に除外ルールに一致したすべての一致したIPアドレスをカウントする
WITH dataset AS
(SELECT * FROM waf_log
CROSS JOIN UNNEST(ruleGroupList) AS t(allRuleGroups))
SELECT COUNT(*) AS
count,
"httpRequest"."clientIp",
"allRuleGroups"."excludedRules",
"allRuleGroups"."ruleGroupId"
FROM dataset
WHERE allRuleGroups.excludedRules IS NOT NULL AND from_unixtime(timestamp/1000, 'Asia/Tokyo') > now() - interval '10' day
GROUP BY "httpRequest"."clientIp", "allRuleGroups"."ruleGroupId", "allRuleGroups"."excludedRules"
ORDER BY count DESC;
指定した規則の種類によってBLOCKされた上位100個のIPアドレスを抽出する
SELECT COUNT(httpRequest.clientIp) as count,
httpRequest.clientIp
FROM waf_log, UNNEST(nonTerminatingMatchingRules) t(nonTermRule)
WHERE nonTermRule.ruleid='AWS-AWSManagedRulesCommonRuleSet' AND day >= '2021/12/01/07' AND day <= '2021/12/01/08' AND action='BLOCK'
GROUP BY httpRequest.clientIp
ORDER BY count DESC
LIMIT 100;
日本から来ていてBLOCKされたログ
SELECT from_unixtime(timestamp/1000, 'Asia/Tokyo') AS JST, *
FROM waf_log
WHERE httpRequest.country = 'JP' AND action = 'BLOCK'
AND day >= '2021/12/01/07' AND day <= '2021/12/01/08'
ORDER BY timestamp;
国毎のログ数の集計
SELECT httpRequest.country, COUNT(httpRequest.country) AS count, day
FROM waf_log
WHERE day >= '2021/12/01/07' AND day <= '2021/12/01/08'
GROUP BY httpRequest.country
ORDER BY count DESC;
リクエストがブロックされた回数をカウントし、特定の属性でグループ化する
SELECT COUNT(*) AS
count,
webaclid,
terminatingruleid,
httprequest.clientip,
httprequest.uri
FROM waf_log
WHERE action='BLOCK' AND day >= '2021/12/01/07' AND day <= '2021/12/01/08'
GROUP BY webaclid, terminatingruleid, httprequest.clientip, httprequest.uri
ORDER BY count DESC
LIMIT 100;
他にもあった気がするけれど、思い出したら追記します。
最後に
AWS WAFV2運用でよく使っているクエリを紹介しました。
もしこれ便利だよというクエリあれば教えてください。
この記事がどなたかの役に立てれば嬉しいです。
Discussion