CloudWatch × Lambda で攻撃IPを自動でブラックリストに入れる
はじめに
WAFを導入しているチームは多いと思いますが、
基本的に ブラックリスト方式 & default action = "allow" の WAF を採用していると思います。
しかし、この構成では どんなにルールを磨いても完全防御は不可能。
巧妙なリクエストは、わずかな隙を突いて本番環境に入り込んできます。
SQLインジェクションなどを狙った攻撃は日常的にきますが、
マネージドルールやカスタムルールで全てを防げる訳ではありません。
攻撃してきたIPアドレスに対しては、ブラックリストに追加でブロックするのは最も効果的です。
しかし、そのアクセスが 業務時間外や休日 に来ることがよくあります。
それによって対応が遅れて被害が出てしまう可能性がありますし、
その度に休日稼働をしなければいけません。
休日にアラートが鳴らないか怯えながら(?)過ごしていたのですが、
今回の自動でIPをブラックリストに追加する方法によって、その悩みが解消されました。
構築
今回はLambdaで自動追加フローを構築していきます。
アーキテクチャは以下の通り

CloudWatchがログの中から一致した情報をLambdaに送信
それを受け取ったLambdaがWAFのブラックリストに追加する流れです。
1. IAMロール作成
Lambda関数を作成の前に IAMロールを作成していく
ロールには以下の内容のポリシーを作成
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "VisualEditor0",
"Effect": "Allow",
"Action": [
"wafv2:GetIPSet",
"wafv2:ListIPSets",
"wafv2:UpdateIPSet"
],
"Resource": "<WAFのBlackList ARN>"
}
]
}
2. Lambda 作成
以下のコードを作成
import json
import boto3
import gzip
import base64
import os
waf_client = boto3.client("wafv2", region_name="ap-northeast-1")
IP_SET_NAME = os.environ["IP_SET_NAME"]
IP_SET_ID = os.environ["IP_SET_ID"]
SCOPE = os.environ["SCOPE"]
def lambda_handler(event, context):
print(f"Received event: {json.dumps(event)}")
try:
# CloudWatch Logs からのデータをデコード
compressed_data = base64.b64decode(event["awslogs"]["data"])
log_data = gzip.decompress(compressed_data)
log_json = json.loads(log_data)
print("Decoded CloudWatch Logs data:", json.dumps(log_json, indent=2))
# ログの中からクライアントIPを抽出
for log_event in log_json["logEvents"]:
message = json.loads(log_event["message"])
ip_address = message.get("httpRequest", {}).get("clientIp")
if ip_address:
update_ip_set(ip_address)
except KeyError as e:
print(f"KeyError: {e}")
except Exception as e:
print(f"Unexpected error: {str(e)}")
def update_ip_set(ip_address):
"""WAF の IP Set に IP を追加"""
try:
response = waf_client.get_ip_set(Name=IP_SET_NAME, Scope=SCOPE, Id=IP_SET_ID)
lock_token = response["LockToken"]
addresses = response["IPSet"]["Addresses"]
if f"{ip_address}/32" not in addresses:
addresses.append(f"{ip_address}/32")
waf_client.update_ip_set(
Name=IP_SET_NAME,
Scope=SCOPE,
Id=IP_SET_ID,
Addresses=addresses,
LockToken=lock_token
)
print(f"Added {ip_address} to {IP_SET_NAME}")
else:
print(f"{ip_address} is already in {IP_SET_NAME}")
except Exception as e:
print(f"Error updating IP Set: {str(e)}")
この内容で Lambda 関数用のファイルを作成
touch lambda_function.py
cat <<EOF > lambda_function.py
コード内容
EOF
今回 pythonでAWSの操作を行うので、boto3関連のライブラリもインストール
mkdir package
pip install --target package boto3
cp lambda_function.py package/
cd package
zip -r ../lambda_function.zip .
ここまで作成した ファイルとIAMロールをもとに Lambda デプロイ
aws lambda create-function \
--function-name RateLimitAddBlacklistHandler \
--runtime python3.13 \
--role <作成したRole> \
--handler lambda_function.lambda_handler \
--zip-file fileb://lambda_function.zip
Lambda環境変数の設定もする
IP_SET_NAME=<BlackList名>
IP_SET_ID=<BlackList ID>
SCOPE=REGIONAL # WAFをALBに設定している場合
3. CloudWatch サブスクリプションフィルター 作成
サブスクリプションフィルターは Kinesis や Lambda などをターゲットにし、
特定のログにマッチするイベントを送信できる機能
まずは CloudWatch -> Lambda へのアクセス許可のため リソースベースポリシー作成
aws lambda add-permission \
--function-name RateLimitAddBlacklistHandler \
--statement-id AllowCloudWatchLogsInvoke \
--action "lambda:InvokeFunction" \
--principal "logs.amazonaws.com" \
--source-arn "arn:aws:logs:<region>:<account>:log-group:<ロググループ名>:*" \
--source-account "<account>"
ポリシーを付与したあと、サブスクリプションフィルター作成
aws logs put-subscription-filter \
--log-group-name "<ロググループ名>" \
--filter-name "<任意のフィルタ名>" \
--filter-pattern '{ $.terminatingRuleId = "<WAFのルール名>" }' \
--destination-arn "<Lambda ARN>"
これにより、<WAFのルール名> で検知されたIPがLambdaに送信され
Lambdaにより IP Set にIPが追加されるようになる
動作テスト
IPが追加されるかテストをする
今回はテスト用のサーバを立てて サイトに WAFのルールに検知させるようなアクセスを送る
検証用サーバのEC2インスタンスを作成
海外からのサーバのアクセス攻撃と仮装し、オレゴンリージョン(us-east-2) にEC2作成
- OS : Amazon Linux2
- VPC : デフォルト使用
- サブネット : IGWがアタッチされているRouteTable の PublicSubnet
- ロール : EC2のインスタンスプロファイル
- セキュリティグループ : デフォルト使用
- キーペア : SSM使うので作成しない
作成したインスタンス
IP : 34.222.139.18
このIPをブラックリストに追加するテストを行う
テストツールインストール
Apache Bench(アパッチ ベンチ) でテストを行う
もろもろツールのインストール
sudo yum install -y httpd-tools
sudo yum install -y curl
以下のコマンドを打つ
ab -n 250 -c 20 https://example.jp/
10のクライアントで合計で250回のリクエストを送信するという意味
テスト実施
実際にテストを行う
実行前:ブラックリストに <span style="color: rgb(0, 0, 0)">34.222.139.18 </span>はなし

SSMでインスタンスにログインし例のコマンドを実行

250のリクエスト送信に成功
(これをもう1回ほど繰り返せば十分)

無事に追加されていた
所感
- DDosにある一定の効果は期待できそう
- IP Set の上限は基本 10,000 ブロックが多ければ良いものではないので サブスクリプションフィルターの設定の仕方をよく考える
- 正常なリクエストもブロックする可能性も考慮しなければいけない
- あらゆる攻撃パターンを想定し、しかるべきIPアドレスをブロックさせ、
誤検知・誤制御を限りなく減らす運用を目指す - これで休日は安心して寝れそう
Discussion