インフラエンジニアが生成AIを活用してログ解析してみた
はじめに
@mujyun_furu こと古林 信吾(ふるばやし しんご)です。
今回は、2024/5/29 の「OpsJAWS Meetup29 ロギング、ログ管理」に登壇した際お話した、「インフラエンジニアが生成AIを活用してログ解析してみた」の詳細内容です。
発端
ある日のこと
PM「お客様から踏み台サーバ(EC2)の監査対応依頼がありました。
リモートからログインしてきた各ユーザに対して、
ログインしてきた時間がおかしくないか
ログインしてきた場所がおかしくないか
を確認します。
踏み台サーバの audit ログを S3 に格納しているので、解析方法をまとめてください。」
🦔「S3 に格納されているログの解析なら Amazon Athena で出来ると思うので検証します。」
既存のログ格納方法
まず、既存のログ格納方法を確認し、検証環境を作成します。
EC2 から S3 に直接ではなく、CloudWatch Logs を中継して、Amazon Data Firehose 経由で S3 に格納していました。
既存環境の構成を検証環境に作成しました。
s3://<auditlog bucket>/<prefix>/yyyy/MM/dd/HH/<logfile>
の形式でログ出力されました。
Glue Crawler による Data Catalog 化
Athena でテーブル作成する際の参考のために Glue Crawler で Data Catalog 化してみました
Glue Crawler で作成されたスキーマ
[
{
"Name": "messagetype",
"Type": "string"
},
{
"Name": "owner",
"Type": "string"
},
{
"Name": "loggroup",
"Type": "string"
},
{
"Name": "logstream",
"Type": "string"
},
{
"Name": "subscriptionfilters",
"Type": "array<string>"
},
{
"Name": "logevents",
"Type": "array<structid:string,timestamp:bigint,message:string>"
},
{
"Name": "partition_0",
"Type": "string",
"PartitionKey": "Partition (0)"
},
{
"Name": "partition_1",
"Type": "string",
"PartitionKey": "Partition (1)"
},
{
"Name": "partition_2",
"Type": "string",
"PartitionKey": "Partition (2)"
},
{
"Name": "partition_3",
"Type": "string",
"PartitionKey": "Partition (3)"
}
]
Glue Crawler で Data Catalog 化して作成されたテーブルのデータを Athena でプレビュー実施します。
JSON の形式が正しくないとエラーになりました。
エラー原因
エラー原因を調べてみました。
Cloudwatch Logs から Firehose サブスクリプションフィルター使った場合、データ無変換だと JSON がネスト(入れ子)になるそうです。
そして、JSON がネストしていると Athena で解析するのに適していないそうです。
AWS Solutions Architect ブログ 「AWS Black Belt Online Seminar「Amazon Athena」の資料およびQA公開」
Q8. Amazon Kinesis Firehose を利用してCloudWatch LogsをS3に転送してそれをAthena で分析したいのですが、Kinesis Firehoseを通すと{json}{json}のように1行に複数のJSONオブジェクトが保存されるよです。このデータを効率的にAthenaで分析するにはどういった方法がありますか??
A8. Amazon Kinesis FirehoseにはData TransformationをAWS Lambdaで行う機能がございますので,こちらを使って所望の形式に変換すると良いです.
http://docs.aws.amazon.com/firehose/latest/dev/data-transformation.html
Amazon CloudWatch Logsからのサブスクリプションであれば、Lambda functionに渡ってくる各レコードを、Base64デコード=>gunzip=>logEventsを取り出す=>中のmessageに改行文字を追加して結合=>Base64エンコードして返り値として渡せば、FirehoseがAmazon S3にmessageを改行付きで配置してくれるので、スキーマを定義してAthenaからクエリ可能です.
後でどう活用するか考えずに、ただログ保存しただけだったことが判明しました。
対処
これまでのログ保存方法はともかくとして、今後のために Firehose の設定で Lamba によるデータ変換する方法を確認します。
しかしながら、非推奨になっていました。
Amazon Data Firehose Data Transformation
・Process CloudWatch logs sent to Firehose
This blueprint is deprecated. For information on processing CloudWatch Logs sent to Firehose, see Writing to Firehose Using CloudWatch Logs.
※このブループリント(設計図)は非推奨です。
何故、非推奨になっているか調べたところ、2024/2/27 のアップデートで Firehose 自体にメッセージ抽出機能が追加されていました。
Amazon Data Firehose に解凍された CloudWatch Logs のメッセージ抽出機能を追加
Firehose の設定で簡単に選ぶことができ、解凍後の再圧縮も出来ました。
これで、正しい形式で S3 にログ保存出来るようになりました。
改めて Glue Crawler で Data Catalog 化してみたところ、パーティションしかない状態になりました。
これは、自力で正規表現使って Athena でテーブル作成するしかないかな、、、
プログラミングもコーディングもわからないし、SQL 苦手だし、正規表現わからないけど、生成AI があればやれるはず!!!
無料枠で使用可能な3つの生成AI を活用してログ解析してみます。
- Copilot in Bing (Microsoft)
- Gemini (Google)
- Claude 3 Sonnet (Anthropic)
生成AI の活用(Athena)
Athena でのログ解析については、以下の記事で詳細記述しています。
※補足
- TBLPROPERTIES('skip.header.line.count'='1')
先頭行をスキップする設定(CSVなどで先頭行に列名入っているファイルなどで有用な設定)は、今回不要なので削除 - STORED AS INPUTFORMAT および OUTPUTFORMAT は入力フォーマットと出力フォーマットを指定するものですが、今回不要なので削除
生成AI の活用(Glue)
データの格納方法を修正することで、Athena でログ解析できるようになりましたが、これまで蓄積された既存のデータも活用したいです。
既存のデータを Glue で ETL 処理する方法を生成AI を活用して探ります。
まず、Glue Crawler した Data Catalog があるので、これを入力にした Glue Job を作成するため、生成AI にひたすら尋ねます。
生成AI への質問内容
AWSを使ったデータアナリストとして教えてください。
CloudWatch Logs からサブスクリプションフィルターで Amazon Data Firehose を使い、Amazon S3 にデータを格納しています。
データはFirehose では、Lambda や解凍処理をしていないため、CloudWatch Logs の不要な情報が含まれています。
また、JSON がネストしているため、そのままでは Amazon Athena で分析できません。
S3 に格納されたデータを AWS Glue の Glue Crawlers でカタログ化しました。
この後、Glue Job で、Athena で解析しやすい形に変換して、別の S3 に格納したいです。
Glue 4.0 Spark 3.3を使います。
Python Script ではなく、Spark Script を使います。
入力するGlue カタログは以下です。
input_df = glueContext.create_dynamic_frame.from_catalog(
database="audit_database",
table_name="audit_logs"
)
出力先の S3 は以下です。
s3://backetname2/glue/
入力1レコードに対して、テキスト形式の1ファイルで出力したいです。
Glue Job のスクリプトを作成してください。
また、Glue カタログのパーティションに合わせてフォルダを作成し、
s3://backetname2/glue/
そのフォルダに gzip で圧縮して格納したい。
返ってきた内容を Glue Job で試しては、エラー内容を生成AI に追加質問しを繰り返し数日間試行錯誤するものの、求める形の出力ができませんでした。
〇できたこと
- CloudWatch Logs の不要部分を削除し、message 部分だけ抽出
- テキスト形式の gzip 圧縮ファイルとして出力
×できなかったこと
- 元データのパーティション(フォルダ構成)を維持
yyyy/MM/dd/HH/ の数字のみで構成されたフォルダ構成をパーティションキーとして処理する際に型の問題などで上手くいきません。
最終的に Claude 3 Sonnet に以下金言をいただきました。
「申し訳ありません。
パーティションキーの型を正しく処理するのが難しいようです。
代替案として、パーティション情報を完全に削除し、単一のテキストファイルに出力する方法を提案します。」
パーティションを考慮しない形であれば、以下の Glue Job で ETL 可能でした。
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.functions import posexplode, col
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
# Glue Crawler で作成した Data Catalog を入力に指定
input_datasource = glueContext.create_dynamic_frame.from_catalog(
database="audit_database",
table_name="audit_logs"
)
# データソースを DataFrame に変換
input_df = input_datasource.toDF()
# message 部分を展開
messages_df = input_df.selectExpr("posexplode(logevents)")
messages_df = messages_df.selectExpr("col.message as message")
output_path = "s3://backetname2/glue/"
# パーティション無しでテキスト形式の圧縮ファイルとして出力
messages_df.repartition(1).write.mode("overwrite").option("compression", "gzip").text(output_path)
job.commit()
ただし、出力結果はパーティションを考慮しない塊になっているので、何年何月何日のデータのみパーティション指定といったことができません。
パーティションにこだわらなければ ETL できるとはいえ、これまでに蓄積された本番データは膨大になっています。
膨大な本番データをすべて Glue Crawler するのも、その後の ETL にも金額が気になります。
そこで、Glue Crawler による Data Catalog を使わず、S3 から直接 ETL して、パーティションを維持できないか生成AI を活用して探ります。
しかしながら、生成AI とひたすらやり取りしてもエラーから抜け出せません。
Data Catalog を使ったときと同様に、yyyy/MM/dd/HH/ の数字のみで構成されたフォルダ構成がネックになりました。
切り分けのため、入力を s3://backetname/prefix から、s3://backetname/prefix/yyyy/MM/dd/HH としてみたところ、Glue Job に成功しました。
成功しましたが、出力が何もありませんでした。
成功するのに出力がないということは、どこかがおかしいので、生成AI にデバッグ方法を質問します。
デバッグして、どこまでデータがあって、どこからデータが無くなるのか調べたところ、以下がわかります。
デバッグしたところ、入力パス配下のファイルは、gzip で圧縮されたデータではあるが、拡張子の .gz が付いていないと判明しました。
file_pattern = "*" に変更したところ、圧縮されていることが正しく判断されず、value の中身が文字化けします。
生成AI に確認すると以下の返答でした。
「ありがとうございます。
この情報は非常に助かります。
入力ファイルがgzip圧縮されているが、拡張子が.gz
ではない場合は、Sparkが適切に処理できないようです。」
テストとして、入力パスに存在するファイルのファイル名を変更して .gz 拡張子を付けたところ、Glue Job で正しく出力がありました。
本番データは .gz 拡張子が付いていない大量のデータであるため、ファイル名を変更せずに処理できないか生成AI に尋ねます。
そうすると、色々な関数を案内してくれるのですが、ことごとくエラーになります。
デバッグとエラー内容に対する修正を延々繰り返すも解決しないため、心折れました。
Glue での挑戦はいったん終わりにします!
※補足
検証を実施していたタイミングではまだでしたが、Amazon Q data integration in AWS Glue が 2024/4/30 にGAされました。
Amazon Q data integration in AWS Glue であれば、Amazon Q がきっと解決してくれるはずですが、未検証です。
生成AI の活用(S3 Select)
既存のデータを Glue で ETL するのには心折れたたけど、S3 Select なら中身を見れることが確認出来ていました。
そのため、S3 Select を AWS CLI でスクリプト化してぶん回せばいいのではと思いつき、生成AI に SQL を作ってもらいます。
生成AI への質問内容
AWSを使ったデータアナリストとして教えてください。
CloudWatch Logs からサブスクリプションフィルターで Amazon Data Firehose を使い、Amazon S3 にデータを格納しています。
データはFirehose では、Lambda や解凍処理をしていないため、CloudWatch Logs の不要な情報が含まれています。
S3 select を使い、データを抽出したいです。
GZIPで圧縮され、JSON形式で、JSONコンテンツタイプはドキュメントを選び、以下のようにクエリを実行することで必要な行は取得できます。
select * from S3Object[].logEvents[].message s where s like '%USER_START%' and s like '%sshd%' and s like '%session_open%';
出力結果は以下のようになります。
"type=USER_START msg=audit(1713174820.352:3261): pid=3237 uid=0 auid=1000 ses=1 msg='op=PAM:session_open grantors=pam_selinux,pam_loginuid,pam_selinux,pam_namespace,pam_keyinit,pam_keyinit,pam_limits,pam_systemd,pam_unix,pam_lastlog acct=""ec2-user"" exe=""/usr/sbin/sshd"" hostname=XX.XX.XX.XX addr=XX.XX.XX.XX terminal=ssh res=success'"
この出力結果から、以下のデータのみ、CSV形式で , 区切りで取得したいです。
msg=audit( の後ろは UNIXタイムになっているので、JSTで時間取得
acct= の後ろからログインユーザ名取得
addr= の後ろから接続元IPアドレス取得
どのように S3 Select のクエリを修正すればいいか教えてください。
生成AI からはそれっぽい SQL が提案されますが、S3 Select では、ことごとくエラーになります。
最終的に、生成AI から以下の回答があります。
「なるほど、S3 Select のクエリでは予期しない問題が発生しているようですね。
S3 Select には制限があり、文字列操作関数が限られているため、JSON データから特定の値を抽出するのが難しい場合があります。
そこで、別のアプローチを試してみましょう。
まず、S3 Select ではなく AWS Lambda または AWS Glue でデータを加工し、必要な情報のみを含む CSV ファイルを生成することをお勧めします。
Lambda関数またはGlueジョブでは、Python、Java、Node.js などの一般的なプログラミング言語を使用できるため、JSON データから任意の値を抽出し、CSV 形式で出力することが可能です。」
生成AI から、他の方法考えろと言われました。
Glue は心折れたところだけど、1つの方法に固執するのではなく、複数の方法組み合わせればいいんだと気づきます。
S3 Select で必要な行は取得できるし、AWS CLI で処理を回すので、データの整形は awk コマンドでやればいいだけだったんです。
#!/bin/bash
bucket=$1
prefix=$2
outpath=$3
echo stratdate $(date "+%Y/%m/%d/%H:%M:%S")
if [ ! -d "$outpath" ]; then
echo "Creating output directory: $outpath"
mkdir -p "$outpath"
fi
aws s3api list-objects-v2 --bucket $bucket --prefix $prefix \
--query 'Contents[].[Key]' --output text 2>/dev/null |\
xargs -n1 bash -c "aws s3api select-object-content \
--bucket=\"$bucket\" --key=\"\$0\" \
--input-serialization '{\"JSON\": {\"Type\": \"DOCUMENT\"}, \"CompressionType\": \"GZIP\"}' \
--output-serialization '{\"CSV\":{}}' \
--expression \"SELECT * FROM S3Object[*].logEvents[*].message s WHERE s LIKE '%USER_START%' AND s LIKE '%sshd%' AND s LIKE '%session_open%'\" \
--expression-type SQL \"$outpath/\$(basename \$0).select\" || true"
cat $outpath/*.select > $outpath/login.csv
echo select file num $(ls -1 $outpath/*.select | wc -l)
rm $outpath/*.select
cat $outpath/login.csv | awk -F' ' -v OFS=',' '{
# Extract unixtimestamp
match($0, /msg=audit\(([^)]+)\)/, timestamp)
unixtimestamp = timestamp[1]
# Convert to JST
jsttimestamp = strftime("%Y/%m/%d/%H:%M:%S", unixtimestamp + 32400)
# Extract username
match($0, /acct=""([^"]+)"/, user)
username = user[1]
# Extract IPaddress
match($0, /addr=([^ ]+)/, ip)
IPaddress = ip[1]
# Print CSV format
print jsttimestamp, username, IPaddress
}' > $outpath/kansa.csv
echo enddate $(date "+%Y/%m/%d/%H:%M:%S")
./s3select_awk.sh s3://<auditlog bucket> <prefix> /tmp
シェルスクリプト実行時の入力として バケット名($1)、プレフィックス(フォルダ)名($2)、出力先($3)を指定します。
処理の中で、まず aws s3api list-objects-v2 を使い、指定したバケットのフォルダ配下をリスト化します。
リストアップした内容について、S3 Select で 以下の SQL により、取得したい行のみ抽出します。
"SELECT * FROM S3Object[].logEvents[].message s WHERE s LIKE '%USER_START%' AND s LIKE '%sshd%' AND s LIKE '%session_open%'"
抽出結果を1つのファイルにまとめて、awk コマンドにより、UNIXタイムを抽出し、 strftime 関数で yyyy/MM/dd/HH:mm:ss の形式にしつつ、+9時間して JST にします。
ユーザ名と接続元IPアドレスも抽出し、新しいファイルに結果をまとめます。
プレフィクス(フォルダ)名を分けて指定し、出力先も変えることで、抽出元ごとに出力ファイルを作ることができます。
ファイル数に応じて処理時間がかかりますが、S3 Select のスキャンデータ量と戻りデータ量だけの課金で済みます。
新規のデータは Athena で使いやすいようにしておき、既存データは S3 Select でまとめておけば、今後の監査にも活用しやすくなります。
取得したデータは CSV になっているので、BI 化しやすいです。
まとめ
ログ保管する際に、
- 保管されたログがどういう形式になっているか
- その形式は後で使うときに適しているか
検証して、使いやすい形式で保管することをお勧めします。
最低限不要な情報(CloudWatch Logs のヘッダとか)は含まないよう考慮しておきましょう。
ETL やデータの抽出はデータアナリストだけでなく、インフラエンジニアもできるようになるといいです。
その際に生成AI が助けてくれます。
検証時に利用した生成AI についての個人的感想
- Copilot in Bing (Microsoft)
検証時は入力文字数が少な目で、コードの改修等には少々不便でした。
しかし、その後入力文字数の問題は解消したことを確認済です。 - Gemini (Google)
適度なバランスで回答してくれるので普段使いに丁度いいです。
しかしながら、特定の機能の制限などには疎いので、ハルシネーションに注意が必要です。 - Claude 3 Sonnet (Anthropic)
日本語でのやり取りでは、3つの生成AIの中で一番回答精度が高かったです。
ただし、無料枠の使用制限が厳しいので、繰り返し質問し続けることができませんでした。
他の生成AI と組み合わせるとイイ感じでした。
後、代替案の提案スゴク助かりました。
Discussion