1日で作るサプライチェーン攻撃対策!運用死しないコスト「ほぼゼロ」の通信監視
1. はじめに
こんにちは!はるぷです!サプライチェーン攻撃の対策してますか??
最近、サプライチェーン攻撃の話題が出るたび、「うちのサービスは本当に大丈夫だろうか」と社内がざわつくことはありませんか。依存ライブラリの棚卸しやSBOM整備に取り組んでいても、入り口が多すぎて全体を把握するのは至難の業です。
加えて、社内外から「サプライチェーン攻撃対策大丈夫ですよね?」と聞かれたとき、担当者として胸がキューっとなる辛い状態になりがちです。何かしらビシッと説明できるシステム的な一手が欲しい…。

そこで今回は、 「万が一、侵入を許してしまった後の早期発見」 に特化した対策を紹介します。
サプライチェーン攻撃によって不正コードが混入すると、多くの場合、外部のC&Cサーバ(攻撃者の司令塔)と通信を開始します。つまり、この「意図しない外向きの通信」をいち早くキャッチできれば、被害が拡大する前に食い止めることが可能になります。
しかし、従来のフルパケットキャプチャよる監視やプロセス監視などは、導入・運用ともに非常に高コストであり、多くのケースでうまく運用することは困難かと思います。高機能であればあるほど、ログが詳細であればあるほど全てを駆使して監視せねば、という気持ちになり、結果溢れてほとんど見ないという状態になってしまいます。特に、今すぐ何か対策を打ちたい!という状況に対しては考えることが多すぎてかなり難易度が高いと思います。
そこで、本記事では、「破綻して運用死しない」ということに重点をおいた通信監視システムの構築・運用方法を紹介します。具体的には、 「DNSログ」と「VPCフローログ」を突き合わせる ことで、以下の3点を実現する軽量な監視システムを紹介します。この「DNSログとVPCフローログを突き合わせる」という手法は、外向き通信監視の「正攻法」です。今回はAWSでの構築例を紹介しますが、GCPなどの他環境でも全く同じ考え方で応用が可能です。
- 構築が容易: AWSマネージドサービスのみで完結。
- 低コスト: 分析時のみ料金が発生するAthenaを利用。
- 運用負荷ほぼゼロ: 基本的には手放し、目標は最低でも1日数分程度に収まる作業量の運用。
実際に弊社のシステムで導入済みで、大きな変更がない限りほぼ新しい宛先への通信が発生しない状態で監視運用を実現できています。
2. システムの設計思想
本システムは「100点満点の完璧な検知」ではなく、「80点の検知を継続する」 ことに重点を置いています。
2.1 システム全体構成
監視の要となるログ集約・照合にはAthena(1.3Gくらいのスキャンで約1円)を活用します。ホワイトリスト(除外リスト)の登録はDynamoDBを利用し、更新作業はWebコンソールとCloudShell(aws dynamodb)で行う方法を紹介します。できる限り事前設定が少ない方法で作成しますが、慣れた操作環境に読み替えてもらえればと思います。
- S3: クラウド上ストレージ。VPCやDNSから吐き出されるログを安全に保管しておく場所として活用。
- Athena: S3に保存されたログファイルを、データベースにわざわざ移し替えることなく、そのままテーブルに見立ててSQLで解析できる便利な仕組み。S3に保管したデータの解析に活用。
- DynamoDB: 高速に読み書きできるマネージドのデータベース。許可していい通信をあらかじめ書いておく『ホワイトリスト』として活用。
処理の流れ(要約)
- Route 53 Resolverクエリログから、名前解決したドメインと回答IP(
answers)の関係を取得する。 - VPC Flow Logsで観測された宛先IPと突き合わせ、DNSログ上で許可ドメイン由来と判定できるIPは除外する。
- EventBridgeで一定間隔(例:1時間ごと)にLambdaを実行する。
- DynamoDBのホワイトリストを取得する。
- Athenaで上記の情報を結合して「未確認の通信」を抽出、Slackに通知する。
- 通知結果を見て問題なければDynamoDBコンソールまたはCloudShell、ドメイン/IPをホワイトリストに登録し、以降のLambda実行では通知されないようになる。
3. 環境構築手順
3.1 ログの集約(S3 と Athena)
S3 バケットの作成
分析対象ログを置くS3バケットを作成します(例:my-network-log)。デフォルト設定で問題ありませんが、非公開になっていることは確認しておきましょう。以降の手順では バケット名、アカウント ID、リージョン、PC ID を自環境の値に読み替えてください。
Route 53 Resolver クエリログの設定
- Route 53 コンソール左メニューから [VPC リゾルバー] > [クエリログの記録] を選択
-
[クエリログ設定を作成] をクリック
- 送信先: 「S3 バケット」を選び、ログ保存用バケットを指定
-
VPC を追加: 監視したいVPCを選択
これで「どのドメインを引いたか」がS3に蓄積されます。クエリログ設定の名前は xxx_vpc_querylog など、自分が分かりやすい名前をつけておきましょう。
VPC Flow Logs の設定
- VPC コンソールで監視対象の VPC を選択
- 下部タブ [フローログ] > [フローログの作成] をクリック
- 送信先: Route 53と同じ、または別のS3バケットを指定
-
形式: Athenaで解析しやすいよう AWS デフォルト形式 のまま設定
これで「どのIPと通信したか」がS3に蓄積されます。フローログ名についても xxx_vpc_flowlog など、自分が分かりやすい名前をつけておきましょう。
各ログが保存されていることを確認
s3://[バケット名]/AWSLogs/[アカウントID]/ 配下に各ログが保存されていることを確認します。デフォルトだと、それぞれ、vpcdnsquerylogs 、 vpcflowlogs になっていると思いますが、異なっていたら以降の手順では読み替えてください。

Athena:クエリエディタと結果出力先
- Athenaコンソールを開き、「使用を開始」から 「Athena コンソールでデータをクエリする」 を選び、「クエリエディタを開く」 をクリック
- クエリエディタ上部の [クエリ設定] > [管理] で、クエリ結果の場所 を指定(ログ本体と分けるなら
s3://[バケット名]/athena-results/など。未設定だと SQL が実行できません)
データベースと外部テーブルの作成
[エディタ] タブで、まずデータベースを作成します。ここでは monitoring_db という名前にします。
CREATE DATABASE IF NOT EXISTS monitoring_db;

左の「データベース」で monitoring_db を選択した上で、以下の2つのSQL文を LOCATIONおよび storage.location.template を自環境のS3パスに書き換えて 実行します。
VPC Flow Logs用テーブル(例)
クリックで展開
CREATE EXTERNAL TABLE `vpc_flow_logs`(
`version` int,
`account_id` string,
`interface_id` string,
`srcaddr` string,
`dstaddr` string,
`srcport` int,
`dstport` int,
`protocol` bigint,
`packets` bigint,
`bytes` bigint,
`start_time` bigint,
`end_time` bigint,
`action` string,
`log_status` string)
PARTITIONED BY (`date` string)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ' '
STORED AS INPUTFORMAT
'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
's3://[バケット名]/AWSLogs/[アカウント]/vpcflowlogs/[リージョン]'
TBLPROPERTIES (
'skip.header.line.count'='1',
'projection.enabled' = 'true',
'projection.date.type' = 'date',
'projection.date.range' = '2026/01/01,NOW',
'projection.date.format' = 'yyyy/MM/dd',
'projection.date.interval' = '1',
'projection.date.interval.unit' = 'DAYS',
'storage.location.template' = 's3://[バケット名]/AWSLogs/[アカウントID]/vpcflowlogs/[リージョン]/${date}'
);
Route 53クエリログ用テーブル(例)
クリックで展開
CREATE EXTERNAL TABLE `route53_query_logs`(
`timestamp` string,
`version` string,
`account_id` string,
`region` string,
`vpc_id` string,
`query_timestamp` string,
`query_name` string,
`query_type` string,
`query_class` string,
`rcode` string,
`answers` array<struct<Rdata:string,Type:string,Class:string>>,
`srcaddr` string,
`srcport` int,
`transport` string,
`srcids` struct<instance:string,resolver_endpoint:string>)
PARTITIONED BY (`date` string)
ROW FORMAT SERDE
'org.openx.data.jsonserde.JsonSerDe'
STORED AS INPUTFORMAT
'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
's3://[バケット名]/AWSLogs/[アカウントID]/vpcdnsquerylogs/[VPC ID]/'
TBLPROPERTIES (
'projection.enabled' = 'true',
'projection.date.type' = 'date',
'projection.date.range' = '2026/01/01,NOW',
'projection.date.format' = 'yyyy/MM/dd',
'projection.date.interval' = '1',
'projection.date.interval.unit' = 'DAYS',
'storage.location.template' = 's3://[バケット名]/AWSLogs/[アカウントID]/vpcdnsquerylogs/[VPC ID]/${date}'
);
パーティションの読み込み
日付フォルダ配下のデータを認識させるため、作成後に一度実行します。実行しなくても読めましたが、LLMに聞いたらやった方がいいと言われたのでやります。
MSCK REPAIR TABLE vpc_flow_logs;
MSCK REPAIR TABLE route53_query_logs;
左のテーブル一覧に表示され、SELECTで中身が確認できれば、このステップは完了です。
3.2 ホワイトリストの管理
ホワイトリストは DynamoDB に置く構成で作成します。
テーブル設計(概念)
- パーティションキー相当: ドメインまたはIP(手順では
target_value) - 種別:
DOMAINまたはIP(手順ではソートキーtypeとしても利用可能) - ステータス:
Allowedなど
DynamoDB テーブルの作成
- DynamoDB コンソールで 左メニューの[テーブル]を選択し、 [テーブルの作成] を選択
-
テーブル名:
CommunicationWhitelistを指定 -
パーティションキー:
target_value(文字列)—ドメイン名またはIPを格納 -
ソートキー:
type(文字列)—DOMAIN/IPを区別するために使用(設計により省略可だが、本手順ではソートキーありを推奨) - テーブル設定はデフォルトで [テーブルの作成] を実行
3.3 ログの突き合わせと抽出(Lambda)
定期実行(EventBridge) するLambdaを作成します。※実際に定期実行するのはチューニングが終わってからで良いです。
Lambda の役割
- Athena を実行し、直近の VPC Flow Logs(宛先IP) と Route 53 ログ(回答 IP およびクエリ名) を JOIN し、「ドメイン名(表示用)—宛先IP」の関係で未承認通信を抽出
- DynamoDB のホワイトリストを参照し、登録済みドメイン/IPを除外
- 残った 未確認の通信 を Slack に通知(運用設計としては、同内容を S3 にJSON/CSVで追記する処理を足すことも可能)
- EventBridgeで定期的に実行し継続的な監視を実現(
rate(1 hour)または毎時0分に合わせるならcron(0 * * * ? *)を指定)
Lambda 関数コード例(Python 3.x / boto3・標準ライブラリのみ)
ちょっと長いですが、以下にLambdaのコードを示します。 monitoring_network_log などの名前で作成してください。環境依存のCIDRや除外条件がSQLに含まれています。自社ネットワーク設計に合わせて必ず見直し、REGION / ATHENA_DB / DYNAMODB_TABLE / ATHENA_OUTPUT / SLACK_URL / S3パスを置き換えてください。また、チューニングの結果に合わせて必要な除外条件等を記載してください。
若干SQL文が複雑なので、内容を変更する際にはLLMに相談しながら実施すると良いのではないかと思います。
クリックで展開
import json
import os
import re
import time
import traceback
import urllib.request
from datetime import datetime, timedelta
from string import Template
import boto3
REGION = "[リージョン]"
ATHENA_DB = "monitoring_db"
DYNAMODB_TABLE = "CommunicationWhitelist"
ATHENA_OUTPUT = "s3://[バケット名]/athena-results/"
SLACK_URL = os.environ.get("SLACK_WEBHOOK_URL", "https://hooks.slack.com/services/[WebhookのURL]")
athena = boto3.client("athena", region_name=REGION)
dynamo = boto3.resource("dynamodb", region_name=REGION)
_DATE_RE = re.compile(r"^\d{4}/\d{2}/\d{2}$")
# 例として 10.0.0.0/16 相当の送信元・除外レンジを記載(自VPCの CIDR / NAT / 監視対象サブネットに置換すること)
_SQL_OUTBOUND = Template("""
WITH normalized_dns AS (
SELECT
srcaddr AS dns_srcaddr,
CASE
WHEN query_name LIKE '%.in-addr.arpa' OR query_name LIKE '%.in-addr.arpa.' THEN 'reverse-dns-lookup.arpa'
WHEN regexp_like(query_name, '^ip-[0-9-]+') THEN '*.compute.internal'
ELSE rtrim(query_name, '.')
END as display_name,
COALESCE(ans.Rdata, 'UNRESOLVED_' || CAST(rcode AS VARCHAR)) as resolved_ip
FROM route53_query_logs
LEFT JOIN UNNEST(answers) AS t(ans) ON TRUE
WHERE date IN ($dns)
AND ((ans.Type IS NOT NULL AND ans.Type = 'A') OR cardinality(answers) = 0)
),
allowed_ips AS (
SELECT DISTINCT resolved_ip FROM normalized_dns
WHERE display_name IN ($wl) AND resolved_ip IS NOT NULL AND resolved_ip <> ''
),
pure_source AS (
SELECT srcaddr, dstaddr FROM vpc_flow_logs
WHERE date IN ($vpc) AND action = 'ACCEPT'
AND srcport NOT IN (22, 80, 443) AND dstport NOT IN (123)
-- 特定のサーバを除外する場合ここに書く
-- AND srcaddr <> '10.0.0.1'
),
filtered_traffic AS (
SELECT srcaddr, dstaddr FROM pure_source
WHERE srcaddr LIKE '10.0.%' AND dstaddr NOT LIKE '10.%' AND dstaddr NOT LIKE '172.16.%'
AND dstaddr NOT LIKE '192.168.%' AND dstaddr <> '169.254.169.254'
),
classified_traffic AS (
SELECT srcaddr, dstaddr,
-- 例えば特定のサーバからの、特定のAWSのレンジ・特定のサービスは除外するなどの場合正規表現で記載
(regexp_like(dstaddr, '^(x\\.x\\.x\\.x)')) as is_known_service,
(regexp_like(dstaddr, '^(x\\.x\\.x\\.x)')) as is_known_service_for_filtered_target,
(srcaddr LIKE '10.0.xx.%') as is_filter_target
FROM filtered_traffic
)
SELECT DISTINCT 'CRITICAL_UNKNOWN_OUTBOUND' as reason,
COALESCE(dn.display_name, t.dstaddr, dns_srcaddr) as detail
FROM classified_traffic t
LEFT JOIN allowed_ips a ON t.dstaddr = a.resolved_ip
LEFT JOIN normalized_dns dn ON t.dstaddr = dn.resolved_ip
WHERE a.resolved_ip IS NULL AND t.dstaddr NOT IN ($wl)
AND ((t.is_filter_target AND NOT t.is_known_service_for_filtered_target AND NOT t.is_known_service)
OR (NOT t.is_filter_target AND NOT t.is_known_service))
-- 特定のサーバを除外する場合ここに書く
-- AND dns_srcaddr <> '10.0.0.1'
""")
_SQL_DNS_FAILURE = Template("""
WITH normalized_dns AS (
SELECT
CASE
WHEN query_name LIKE '%.in-addr.arpa' OR query_name LIKE '%.in-addr.arpa.' THEN 'reverse-dns-lookup.arpa'
WHEN regexp_like(query_name, '^ip-[0-9-]+') THEN '*.compute.internal'
ELSE rtrim(query_name, '.')
END as display_name, rcode
FROM route53_query_logs
WHERE date IN ($dns)
AND (rcode = 'NXDOMAIN' OR (rcode = 'NOERROR' AND cardinality(answers) = 0))
-- 特定のサーバを除外する場合ここに書く
-- AND srcaddr <> '10.0.0.1'
)
SELECT DISTINCT 'DNS_RESOLUTION_FAILURE' as reason, display_name as detail
FROM normalized_dns
WHERE display_name NOT IN ($wl)
AND NOT regexp_like(display_name, '^[0-9]{8}-[0-9]{4}-[0-9.]+.*')
""")
def _sql_partition_date_in_list(dates):
for d in dates:
if not _DATE_RE.match(d):
raise ValueError(f"partition date must be yyyy/MM/dd from strftime, got: {d!r}")
return ", ".join("'" + d.replace("'", "''") + "'" for d in dates)
def _execute_athena(query, params=None):
kw = dict(
QueryString=query,
QueryExecutionContext={"Database": ATHENA_DB},
ResultConfiguration={"OutputLocation": ATHENA_OUTPUT},
)
if params:
kw["ExecutionParameters"] = [str(p) for p in params]
qid = athena.start_query_execution(**kw)["QueryExecutionId"]
while True:
st = athena.get_query_execution(QueryExecutionId=qid)["QueryExecution"]["Status"]["State"]
if st in ("SUCCEEDED", "FAILED", "CANCELLED"):
break
time.sleep(2)
if st != "SUCCEEDED":
r = athena.get_query_execution(QueryExecutionId=qid)["QueryExecution"]["Status"].get("StateChangeReason")
raise RuntimeError(f"Athena query failed: {r}")
rows = athena.get_query_results(QueryExecutionId=qid)["ResultSet"]["Rows"][1:]
return [
f"{r['Data'][0].get('VarCharValue', '')}: {r['Data'][1].get('VarCharValue', '')}"
for r in rows
]
def _slack_post(text):
req = urllib.request.Request(
SLACK_URL, data=json.dumps({"text": text}).encode(), headers={"Content-Type": "application/json"}
)
with urllib.request.urlopen(req):
pass
def _append_alerts(dst, rows, skip=None):
for a in rows:
reason, detail = a.split(": ", 1)
dst.append(f"*{reason}*: `{detail.rstrip('.').replace('.', '[.]')}`")
def lambda_handler(event, context):
try:
now = datetime.utcnow()
dns_dates = [(now - timedelta(days=i)).strftime("%Y/%m/%d") for i in range(3)]
vpc_dates = [(now - timedelta(days=i)).strftime("%Y/%m/%d") for i in range(2)]
dns_lit, vpc_lit = _sql_partition_date_in_list(dns_dates), _sql_partition_date_in_list(vpc_dates)
raw_wl = {i["target_value"] for i in dynamo.Table(DYNAMODB_TABLE).scan().get("Items", [])}
wl = ["__WHITELIST_EMPTY__"] if not raw_wl else [v if isinstance(v, str) else str(v) for v in raw_wl]
wl_ph = ", ".join(["CAST(? AS VARCHAR)"] * len(wl))
wl_params = list(wl) * 2
outbound = _SQL_OUTBOUND.substitute(dns=dns_lit, vpc=vpc_lit, wl=wl_ph)
dns_fail = _SQL_DNS_FAILURE.substitute(dns=dns_lit, wl=wl_ph)
new_alerts = []
_append_alerts(new_alerts, _execute_athena(outbound, wl_params))
_append_alerts(new_alerts, _execute_athena(dns_fail, list(wl)))
if new_alerts:
n = len(new_alerts)
head = f"🚨 【警告】未承認の外向き通信を検知しました(合計: {n} 件)\n"
if n > 20:
head += "※件数が多いため、先頭の20件のみ表示しています。\n"
_slack_post(head + "\n".join(new_alerts[:20]))
return {"status": "success", "found_count": len(new_alerts)}
except Exception as e:
try:
_slack_post(f"❌ 【致命的エラー】監視 Lambda が失敗しました。\n\n```\n{traceback.format_exc()}\n```")
except Exception as err:
print(f"Failed to send error notification to Slack: {err}")
raise e
補足:
is_known_service / is_known_service_for_filtered_target は必要に応じて除外する対象を記載してください(後述の初期チューニングの箇所で記載します)。実運用では、許容するクラウド・CDNの宛先IP帯を regexp_like 等で定義しないとノイズが増えます。送信元の 10.0.* なども 自社 VPC の設計に合わせて置換してください。
また、 *.compute.internal で除外している箇所は、内部的なホスト名の命名規則のレコードの除外です。各自の環境に合わせて設定してください。
上記サンプルは Slack 通知までを行っています。運用で JSON/CSV を S3 に保管したい場合は、例えば new_alerts を組み立てた直後に以下のように 同じバケットの別プレフィックスへ put_object します(実行ロールに対象プレフィックスへの s3:PutObject が必要です)。
クリックで展開
# lambda_handler 内の new_alerts 生成後の例(import はファイル先頭に boto3 済み)
if new_alerts:
s3 = boto3.client("s3", region_name=REGION)
findings_key = f"monitoring-findings/{datetime.utcnow().strftime('%Y%m%dT%H%M%SZ')}.json"
s3.put_object(
Bucket="[バケット名]",
Key=findings_key,
Body=json.dumps({"alerts": new_alerts}, ensure_ascii=False).encode("utf-8"),
ContentType="application/json; charset=utf-8",
)
IAM ポリシー(例)
Lambda の実行ロールに、Athena・S3(本手順用に用意したバケット内のログとクエリ結果)・DynamoDB・Glue(メタデータ)・CloudWatch Logs へのアクセスを付与します。ARN のリージョン・アカウント ID・バケット名・Lambda 名は自環境に合わせて置換してください。
クリックで展開
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": ["logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents"],
"Resource": "arn:aws:logs:[リージョン]:[アカウントID]:log-group:/aws/lambda/[Lambda関数名]:*"
},
{
"Sid": "AthenaQueryExecution",
"Effect": "Allow",
"Action": [
"athena:StartQueryExecution",
"athena:GetQueryExecution",
"athena:GetQueryResults"
],
"Resource": "arn:aws:athena:[リージョン]:[アカウントID]:workgroup/primary"
},
{
"Sid": "S3LogAndResultAccess",
"Effect": "Allow",
"Action": [
"s3:GetBucketLocation",
"s3:GetObject",
"s3:ListBucket",
"s3:PutObject"
],
"Resource": [
"arn:aws:s3:::[バケット名]",
"arn:aws:s3:::[バケット名]/*"
]
},
{
"Sid": "DynamoDBReadAccess",
"Effect": "Allow",
"Action": ["dynamodb:Scan", "dynamodb:GetItem", "dynamodb:Query"],
"Resource": "arn:aws:dynamodb:[リージョン]:[アカウントID]:table/CommunicationWhitelist"
},
{
"Sid": "GlueMetadataAccess",
"Effect": "Allow",
"Action": [
"glue:GetTable",
"glue:GetPartitions",
"glue:GetDatabase",
"glue:GetDatabases"
],
"Resource": [
"arn:aws:glue:[リージョン]:[アカウントID]:catalog",
"arn:aws:glue:[リージョン]:[アカウントID]:database/monitoring_db",
"arn:aws:glue:[リージョン]:[アカウントID]:table/monitoring_db/*"
]
}
]
}
Lambda のタイムアウトとトリガー
-
タイムアウト: デフォルト3秒ではAthenaの実行が完了できません。[設定タブ]の[一般設定]から 1分以上に延長
-
トリガー: [設定タブ]の[トリガー]から[トリガーを追加]をクリックし以下を設定
- ソース: EventBridge(CloudWatch Events)
- Rule: Create a new rule
- Rule name:
Every-Hour-Checkなど任意の値 - Rule type: Schedule expression
- Schedule expression:
rate(1 hour)
いざ、Lambda実行!
初回はテストをデフォルトの内容で作成し、テスト実行(テストボタンやinvokeボタンをクリック)を行いどの程度ヒットするかをみます。百件以上ヒットする場合がありますが正常です(環境によっては最初から数十件程度に収まっていることもあります)。ここでエラーが出たり1件も検出していない場合は、設定が間違っているのでエラーの内容やSQLの編集箇所を見直してください。ログ記録開始直後は DNS キャッシュの影響で名前解決とFlowの対応が取りづらいことがあります。

4. 初期チューニング手順:大量アラートを制するコツ
構築直後は、OSのアップデートやAWS内部の通信など、大量のアラートが出る場合があります。ここを乗り越えるのが最大のポイントです。ここで手を抜くと運用が地獄になります。数時間気合いを入れて頑張りましょう。
すべてがSlackに飛ぶと通知が溢れるため、まず 安全なドメイン をDynamoDBに登録し、通知をほぼゼロまで下げます。以下のあたりが除外追加の観点になります。
-
クリーンな初期状態を確認していく:
構築直後の正常系で出ている通信は、基本的に大丈夫なはずなので、とにかく用途が分かる範囲(例:[バケット名].s3.amazonaws.com,*.ubuntu.comなど)から順にホワイトリストに登録していきます。
また、内部通信や逆引きなどはSQLでreverse-dns-lookup.arpaや*.compute.internalに置き換えているのでこれらはホワイトリストに追加して下さい。 -
重複しているケースも多い:
アラートが数百件あっても、特定のドメインを1つホワイトリストに入れるだけで、関連するIP通信が一気に減ることがあります。 -
100点を目指さずに80点を目指す:
クラウドのIP帯は広大で100%ドメインだけでは縛れない場合があります。例えば東京リージョンのIPを主軸にした攻撃の検知は諦める、などで 80 点を狙い、運用が止まらないことを優先します。IPアドレス帯を除外しても名前解決が走る攻撃の場合は検知可能です。
特に、「正確な判断ができない」というもので、不正な通信ではない(=無視しても実害がない)と言えるものは、ホワイトリストに入れましょう。残しておくと、「通知されるが常に無視するアラート」として運用が形骸化し破綻する原因になります。
弊社では、複数の環境を運用しているため、外部へのイレギュラーな通信がない環境をメインの観測ターゲットとして基本的に全ての通信を監視できる状態にし、必ずその環境を通してからリリースできる状態を作っています。また、その他環境については運用上ノイズになるIPレンジを除外した上で可能な限り監視できる範囲を広く取りつつほぼノイズ0の状態を実現しています。
特定のIPレンジの除外方法
SQL内の filtered_traffic の以下の3行で対応できるようにしてあります。特定のIPアドレス帯での検知が多すぎて一件ずつ精査できないような状態の時に利用してください。
(regexp_like(dstaddr, '^(x\\.x\\.x\\.x)')) as is_known_service,
(regexp_like(dstaddr, '^(x\\.x\\.x\\.x)')) as is_known_service_for_filtered_target,
(srcaddr LIKE '10.0.xx.%') as is_filter_target
システム全体として、特定のIPアドレス帯(例として 198.51.100.0/24 を使用)について、除外したい場合は以下のように記載します。regexp_likeで指定している箇所は正規表現なので複数のパターンを「|」で繋いで列挙可能です。
(regexp_like(dstaddr, '^(198\\.51\\.100\\.)')) as is_known_service,
また、例えば、AWSの特定のリージョン・サービスのIPアドレス帯(例として 198.51.100.0/24 を使用)について、特定のサーバ群(例として10.0.1.0/24を使用)を除外したい場合は以下のように記載します。
(regexp_like(dstaddr, '^(198\\.51\\.100\\.)')) as is_known_service_for_filtered_target,
(srcaddr LIKE '10.0.1.%') as is_filter_target
特定の送信元(サーバ)の除外方法
_SQL_OUTBOUND内の pure_source と末尾、 _SQL_DNS_FAILUREの normalized_dns の以下の行で対応できるようにしてあります。特定のIPアドレスをsrcaddrとして指定することで通知から除外されます。該当のIPアドレスの全てが除外されるので、同じ構成のサーバが存在しているなど問題ないことを確認しながら設定してください。
-- 特定のサーバを除外する場合ここに書く
-- AND srcaddr <> '10.0.0.1'
-- 特定のサーバを除外する場合ここに書く
-- AND dns_srcaddr <> '10.0.0.1'
DynamoDBコンソールからのホワイトリスト登録方法
- DynamoDBコンソールへアクセスし、左メニュー [項目の探索] 内の
CommunicationWhitelistを選択し、[項目の作成] をクリック -
target_valueにドメイン(例:www.example.com)、typeにDOMAIN(ソートキー利用時)を指定して作成
- 保存後、Lambdaを再テストし found_count が減ることを確認
CloudShell からのホワイトリスト一括登録方法
一括登録はCloudShellから batch-write-item する方が楽な場合が多いです。行数が多いときはGemini等のLLMにコマンド案を作らせるとよいでしょう。以下にプロンプト例を示します。できる限り機械的に追加可能なものを抽出するため、例えば対象のサービスのポータルサイトが別ドメインになっている場合にNGとなるなど、多少厳しめに判定される可能性はあります。
プロンプト例
以下のドメインについて、C&Cサーバ等の悪用リスクを確認してください。
【重要ルール】
1. 対象のサービス名から、検索等を用いて「誰もが認める正当な公式サイトURL」を特定し、まずそれを提示してください。
2. 調査対象ドメインが、その公式サイトURLとドメイン部分(TLD含む)まで完全に一致するか確認してください。
3. もし「ハイフンの挿入」「1文字違い(typo)」「TLDの違い(.jpと.com等)」があれば、攻撃用の模倣ドメインとみなし、即座に「NG」と判定してください。
4. 推測は禁止します。公式な文書(マニュアルやリリース)でそのドメインの利用が明記されていない限り、信頼しないでください。
5. 攻撃者が攻撃用のコンテンツを置いたり、情報の持ち出しに利用できる場合はNGとして下さい。
6. 誤った判断をした場合、マルウェアの侵入を気付くことができず重大な被害につながるため最大限厳格に判定して下さい。
検証の結果、問題ないドメインについてのみ以下のCloudShellの書式に沿って登録用のコマンドを作成してください。CloudShellへの登録は、サブドメインを省略せずに記載して下さい。
test.example.com
www.google.com
www.examp1e.com
www.aeye-sec.jp
CloudShellコマンドサンプル
aws dynamodb batch-write-item --request-items '{
"CommunicationWhitelist": [
{"PutRequest": {"Item": {"target_value": {"S": "example.com"}, "type": {"S": "DOMAIN"}, "status": {"S": "Allowed"}}}},
{"PutRequest": {"Item": {"target_value": {"S": "192.0.2.1"}, "type": {"S": "IP"}, "status": {"S": "Allowed"}}}}
]
}'
ホワイトリストを登録しながら found_count が減るかを確認し、見つかったものを順次登録して行ってください。この時に、 一定リスクがあるがリスクを許容して除外した場合、ドキュメントに残す などをして後々リスクを把握できる状態にしましょう。
5. 動作確認
この時点で一通りシステムが完成し、アラートが出なくなったはずです。ここで、 SQLやホワイトリストの登録を間違えて全て許可 という状態になってしまっていないかを確認します。もし、アラートが上がらないシステムになってしまっていたらここまでの作業が全て無駄になってしまいます。
意図的にアラートを鳴らす(正検知テスト)
ホワイトリストに絶対に入れないような、適当な名前でテストします。以下のコマンドを監視対象のサーバで実行し、数分後にLambdaを手動で実行します。
DNS失敗パターン:
nslookup alert-test.example.com
この結果、DNS_RESOLUTION_FAILUREがドメイン名とともにSlackに通知されることを確認します。
外部通信パターン:
ping 192.0.2.1
この結果、CRITICAL_UNKNOWN_OUTBOUNDがIPアドレスとともにSlackに通知されることを確認します。

ホワイトリストの有効性を確認(非検知テスト)
先ほど正検知テストで利用した、alert-test.example.com、192.0.2.1をDynamoDBに登録します。そして、Lambdaを手動で実行し、 found_count が0になることを確認します。
6. 運用手順:日々の作業は短時間で
お疲れ様でした!これで通信の監視システムが完成です。基本的に運用負荷をかけない前提で構築しているので、ほとんどの仕事はこれでおしまいです。
- Slack 通知の確認: 通知が無ければ「直近スケジュールでは未承認通信なし」と判断。不安な場合は、「特定の時間のみ0件でも通知する」という仕組みを入れるといいかもしれません。いつの間にか止まっていた、ということを防げます。
- 新規通信の判定: 通知に載ったドメインまたはIPが、新規SaaS/ツール/インフラかを確認。
- ホワイトリスト登録: 正当と判断したらDynamoDBに追加。以降、除外されたドメイン/IP は検知リストに上がらず Slack にも出なくなります。
6.1 未承認外向き通信の調査・運用マニュアル
Slackに未承認の通信のアラートが届いたときの、調査からホワイトリスト登録までの標準手順です。
一次調査:送信元と宛先の特定
Athenaで、VPCからの通信とDNSの対応を切り分けます(IP・日付は通知内容に合わせて置換)。
ドメイン名を検索するときは、末尾にドットがつくので、 www.example.com. のように指定するか、likeを利用して %example.com% のように指定してみてください。
SELECT DISTINCT
srcaddr AS internal_ip,
dstaddr AS external_ip,
dstport,
interface_id
FROM vpc_flow_logs
WHERE dstaddr = '通知されたIP'
AND action = 'ACCEPT';
SELECT
query_timestamp,
query_name,
transform(answers, x -> x.Rdata) AS resolved_ips
FROM route53_query_logs
WHERE date IN ('20xx/xx/xx', '20xx/xx/xx')
AND query_name LIKE '%example.com%'
ORDER BY query_timestamp DESC
LIMIT 100;
SELECT
query_timestamp,
query_name,
answers
FROM route53_query_logs
CROSS JOIN UNNEST(answers) AS t(ans)
WHERE date IN ('20xx/xx/xx', '20xx/xx/xx')
AND ans.Rdata = '調査したいIPアドレス'
ORDER BY query_timestamp DESC
LIMIT 100;
二次調査:宛先IPの正体特定(重要)
ドメイン名が紐付いている場合は問題ないですが、IPしかわからないケースでの宛先の正体の確認方法です。
優先順位の目安:
-
SSL/TLS証明書(決定打になりやすい)
curl -vI https://[調査対象IP] --connect-timeout 5
または
openssl s_client -connect [調査対象IP]:443 -servername [調査対象IP] </dev/null 2>/dev/null | openssl x509 -text | grep "Subject: CN"
コマンドでなくとも、ブラウザでアクセスしエラー画面でドメイン名を確認するという手段も取れます。 -
逆引き(rDNS)と WHOIS
nslookup [調査対象IP]、whois [調査対象IP]。クラウド事業者名だけでは不十分なことが多いので証明書での確認を優先します。
判定とアクション
| 判定結果 | アクション |
|---|---|
| 業務に必要なサービス |
CommunicationWhitelist に登録 |
| OS/AWS標準の通信(NTPやOSリポジトリ等)で安全と判断 | 同様に登録可 |
| 正体不明・不要 | 登録せず静観。頻発する場合は送信元のプロセス調査 |
ホワイトリスト登録ルール(再掲)
-
target_value: IP(例:192.0.2.1)またはドメイン(例:example.com) -
type:DOMAIN/IP、または運用メモとしてサービス名を別属性に持たせるなど、チームで統一した文言を記載
CloudShellでの一括登録例(項目が多いときは Gemini等のLLM に生成させると楽です)
aws dynamodb batch-write-item --request-items '{
"CommunicationWhitelist": [
{"PutRequest": {"Item": {"target_value": {"S": "example.com"}, "type": {"S": "DOMAIN"}, "status": {"S": "Allowed"}}}},
{"PutRequest": {"Item": {"target_value": {"S": "192.0.2.1"}, "type": {"S": "IP"}, "status": {"S": "Allowed"}}}}
]
}'
7. まとめ
本記事では、AWSの標準機能を組み合わせ、コストを最小限に抑えつつも実用的な「外向き通信監視システム」を構築する方法を解説しました。
最後に、このシステムを成功させるための3つのポイントを振り返ります。
-
「予防」の限界を「検知」で補う
サプライチェーン攻撃を100%防ぐことは困難です。だからこそ、「侵入された後の動き(C&C通信)」をいち早くキャッチする仕組みを持つことが、実戦的な防御の要となります。 -
「80点主義」で運用を殺さない
全ての通信を細かく把握しようとすれば、運用は必ず破綻します。最初は「明らかに怪しいもの」から手をつけ、時間をかけて育てていく。この「ゆるさ」こそが、継続的な監視を人的コストを抑えつつ継続するための秘訣です。 -
クラウドネイティブな機能を使い倒す
S3、Athena、DynamoDBなど、これらを組み合わせることで、かつては数百万規模の投資が必要だった監視基盤が、今や「1日の工数」と「月数百円程度のコスト」で手に入ります。
最後に
セキュリティ対策は、一度導入して終わりではありません。今回構築したシステムが、大事なインフラを無理なく、しかし着実に守り続けるための頼れる土台となれば幸いです。
8. Q&A
Q. この仕組みで本当にサプライチェーン攻撃を検知できるのですか?
A. 独自のC&Cサーバ(攻撃サーバ)へアクセスするタイプの攻撃に対して有効です。例えば、Trivyへの攻撃は scan[.]aquasecurtiy[.]org 、 LiteLLMへの攻撃は models[.]litellm[.]cloud 、axiosへの攻撃は sfrclak[.]com などのドメインが通知されるはずです。その時に、「それっぽいからOK」という判断にならないように普段から通知をクリーンな状態にし、通知が来た際にLLMなどを駆使しつつ(LLMのハルシネーションに気をつけつつ)精査できる状態にしておきましょう。
特に、今まで出ていなかったのに何故?と思える状態を維持しましょう。
Q. Route 53 Resolverを通らず8.8.8.8などで直接名前解決されたらすり抜けませんか?
A. いいえ。Resolverログに載らない通信は、 「名前解決のない外向きIP」 として残り、不審系としてアラートが上がります。
Q. どうしても数件/日くらいはアラートが上がってしまいます。
A. ノイズと見逃しのバランスをチューニングし続ける前提で捉えるのが現実的です。基本的に1日の運用にかかる時間が10分を超え始めたらホワイトリストの範囲を見直しましょう。
Q. 宛先ドメインを限定できないほど、日々新しいドメインへアクセスします。
A. 監視対象の見直しが必要です。任意のドメインが必須のサーバは監視対象から外す、監視対象をアクセスシナリオを限定した環境に変更する(テスト環境など)、インストール済みモジュールの棚卸しなど、別レイヤーでリスクを下げる組み合わせを検討しましょう。
Q. 特定クラウド/CDNへの通信が正常運用で必須だが、除外するとリスクが残ります。
A. どうしても外せない通信は 諦める判断 もあり得ます。そのうえでリージョンやサービスタイプを最小化できるなら試み、ノイズを減らさないと見逃しが増えることのトレードオフを意識し、この仕組みではクリーンな状態を維持することを優先しましょう。同じ構成でアプリケーションを動作させない環境(インストール作業のみの環境)でも監視が有効な範囲はあります。1か0かではなく、現実的にできるかつ効果のあるラインを模索し、アラート上がりすぎて運用が破綻することだけは避けましょう。
Discussion