Azure Functions で BLOB ストレージに格納したログファイルを Sentinel に取り込む
はじめに
前の記事で BLOB ストレージに格納したテキスト ファイルを Azure Functions に取り込む方法を確認しました。こちらを用いて、当初の目的である BLOB ストレージに格納したログ ファイルを Sentinel に取り込みます。参考となるのはこちらです。下記ドキュメントの流れに基づいて、変更ポイントを説明していこうと思います。
設定
ワークスペースの詳細を収集する
これはドキュメントの通り取得しておきます。
Microsoft Entra アプリケーションを作成する
今回、Azure Functsions のマネージド ID を使用するため、設定不要です。
データ収集エンドポイントを作成する
これもドキュメントの通り作成します。
Log Analytics ワークスペースに新しいテーブルを作成する
ログ格納先のカスタムテーブルを作成します。Azure ポータルから作成できるのですが、不要なデータ収集ルールなどをセットで作成する必要があるため、REST API からカスタムテーブルを作成します。はじめに記載したドキュメントとともに以下も参考にします。カラムはとりあえず TimeGenerated
と RawData
が含まれていれば問題ありません。
データ収集ルールを作成する
Log Ingestion API 用のデータ収集ルールは Azure ポータルから作成できないため、ドキュメントを参考に ARM テンプレートでデプロイします。デプロイ後、JSON ビューから immutableId と streamDeclarations の名前をメモしておきます。
{
"$schema": "https://schema.management.azure.com/schemas/2019-04-01/deploymentTemplate.json#",
"contentVersion": "1.0.0.0",
"parameters": {
"dataCollectionRuleName": {
"type": "String"
},
"location": {
"type": "String"
},
"workspaceResourceId": {
"type": "String"
},
"endpointResourceId": {
"type": "String"
},
"workspaceName": {
"type": "String"
},
"customTableName": {
"type": "String"
}
},
"variables": {},
"resources": [
{
"type": "Microsoft.Insights/dataCollectionRules",
"apiVersion": "2022-06-01",
"name": "[parameters('dataCollectionRuleName')]",
"location": "[parameters('location')]",
"properties": {
"dataCollectionEndpointId": "[parameters('endpointResourceId')]",
"streamDeclarations": {
"Custom-TableForLogIngesionAPI": {
"columns": [
{
"name": "TimeGenerated",
"type": "datetime"
},
{
"name": "RawData",
"type": "string"
}
]
}
},
"dataSources": {},
"destinations": {
"logAnalytics": [
{
"workspaceResourceId": "[parameters('workspaceResourceId')]",
"name": "[parameters('workspaceName')]"
}
]
},
"dataFlows": [
{
"streams": [
"Custom-TableForLogIngesionAPI"
],
"destinations": [
"[parameters('workspaceName')]"
],
"transformKql": "source",
"outputStream": "[concat('Custom-', parameters('customTableName'))]"
}
]
}
}
]
}
DCR にアクセス許可を割り当てる
今回マネージド ID を使用するため、以下の通りマネージド ID を有効化し、[監視メトリック発行者] のアクセス許可を割り当てます。
Azure Functions 設定
今回、Log Ingestion API をキックするためにトークンが必要になるので、マネージド ID からトークンを取得するために Az PowerShell の Az.Accounts モジュールを使用します。以下記事の 「3. Azure Functions に PowerShell モジュールを追加」を実施します。
※ 今回必要となるのは、Az のみです。
次に関数を作成していきます。以下を参考に Azure Storage Blob Trigger の関数を作成します。
設定するスクリプトサンプルは以下になります。$dceEndpoint
, $dcrImmutableId
, $streamName
を自身の環境に合わせて指定します。
# Input bindings are passed in via param block.
param([byte[]] $InputBlob, $TriggerMetadata)
# データ送信に必要な情報
$dceEndpoint = "https://dcep-xxxxxxx.japaneast-1.ingest.monitor.azure.com"
$dcrImmutableId = "dcr-xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
$streamName = "Custom-TableForLogIngesionAPI"
# 一度に読み込む行数
$chunkSize = 5000
# Entra ID に接続し、monitor 用の Bearaer Token を取得
Connect-AzAccount -Identity
$token = Get-AzAccessToken -ResourceUrl "https://monitor.azure.com"
$bearerToken = $token.Token
# ヘッダと URI を準備
$headers = @{
"Authorization"="Bearer $bearerToken";
"Content-Type"="application/json"
}
$uri = "$dceEndpoint/dataCollectionRules/$dcrImmutableId/streams/$($streamName)?api-version=2021-11-01-preview"
#byte array を string へ変換
$blobContent = [System.Text.Encoding]::UTF8.GetString($inputBlob)
# 改行コード (LF) で分割
$lines = $blobContent -split "`n" | Where-Object { $_ -ne "" }
# 現在の時刻を取得
$currentTime = Get-Date ([datetime]::UtcNow) -Format O
# カウンターと chunk を初期化
$chunk = @()
$i = 0
# 各行で処理
foreach ($line in $lines) {
# ログを json に変換
$jsonObject = '{ "TimeGenerated": "' + $currentTime + '", "RawData": "' + $line + '"}'
$chunk += $jsonObject
$i++
# chunkSize もしくは最終ラインの場合
if ($i -eq 500 -or $line -eq $lines[-1]) {
# json を準備
$body = '[' + ($chunk -join ',') + ']'
# API 実行
$uploadResponse = Invoke-RestMethod -Uri $uri -Method "Post" -Body $body -Headers $headers
# カウンターと chunk をリセット
$chunk = @()
$i = 0
}
}
動作確認
対象のストレージ アカウントのコンテナにファイルを格納すると Azure Functions がトリガーされ、Log Analytics ワークスペースにログが保存されます。
現状、TimeGenerated はログ送信が処理された時間、ログはすべて RawData カラムに格納されますが、変換ルールを使用することでカラムを分割することが可能です。例えば、CSV 形式であれば以下のように Parse ができます。
source
| parse RawData with DetectionTime ','
fqdn ','
category ','
url ','
sourceIP ','
destinationIP ','
Action ','
Reason ','
| extend TimeGenerated = DetectionTime
注意点など
- Azure Functions の SKU にもよると考えられますが、処理に時間がかかります。おそらく Log Ingestion API は JSON 形式でログ送信する必要があるために、ログ 1 件ずつ JSON 変換している箇所の負荷が大きいと考えられます。可能であれば JSON フォーマットでストレージ アカウントに保存されるとよいと思います。
- API 実行失敗時のリトライなど、エラー ハンドリングを実装する必要があります。
- Log Ingestion API の制限を考慮する必要があります。
https://learn.microsoft.com/ja-jp/azure/azure-monitor/service-limits#logs-ingestion-api
Discussion