🧠
Sentinel で任意のログと脅威インテリジェンスを突合する
はじめに
Sentinel では脅威インテリジェンスを統合し、分析ルールなどに使用することが可能です。脅威インテリジェンス マッピング用の分析ルール テンプレート (TI map... の名称で始まる) が多数準備されていますが、それらをカスタマイズして、正規表現を使用して任意のログと脅威インテリジェンスをマッピングするサンプル クエリを作成します。
サンプルクエリ
let Extracted_Table = <テーブル名>
と | extend Target_Column = <カラム名>
の箇所にそれぞれチェックしたいログのテーブル名・カラム名を指定します。
指定したカラムのログから正規表現で IP アドレス (IPv4)、メール アドレス、URL、ドメイン名を抽出し、脅威インテリジェンスと突合します。
IP アドレス
// Matches IPv4 addresses found in an arbitrary log column against active
// "ipv4-addr" threat intelligence indicators.
// Output: one row per matched indicator (latest matching log record), with
// TimeGenerated, ThreatType, Confidence, ExtractedIP and Target_Column.
// Look-back window for the log records to inspect.
let dt_lookBack = 1h;
// Look-back window for threat intelligence indicators.
let ioc_lookBack = 14d;
// Specify the target table
let Extracted_Table = <テーブル名>
// Specify the target column
| extend Target_Column = <カラム名>
| where TimeGenerated >= ago(dt_lookBack)
// Extract every IPv4-looking token, not just the first one: a single record
// often carries several addresses (e.g. source and destination), and any of
// them may be the malicious one.
| extend IPv4 = extract_all(@"(\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b)", Target_Column)
// One row per extracted address so each can be joined independently.
| mv-expand IPv4 to typeof(string)
// Keep only records that actually contain an IP address
| where isnotempty(IPv4)
| extend ExtractedIP = IPv4
// Preserve the log timestamp; the indicator table has its own TimeGenerated.
| extend Target_TimeGenerated = TimeGenerated;
let IP_Indicators = ThreatIntelIndicators
// Apply the time filter first so later operators work on less data.
| where TimeGenerated >= ago(ioc_lookBack)
// Extract the key part of ObservableKey (e.g. "ipv4-addr") and strip the
// brackets/quotes left over from stringifying the split() result.
| extend IndicatorType = replace(@"\[|\]|\""", "", tostring(split(ObservableKey, ":", 0)))
| where isnotempty(IndicatorType) and IndicatorType == "ipv4-addr"
| extend NetworkSourceIP = toupper(ObservableValue)
| extend TrafficLightProtocolLevel = tostring(parse_json(AdditionalFields).TLPLevel)
| where isnotempty(NetworkSourceIP)
| extend TI_ipEntity = NetworkSourceIP
// Drop private, link-local, unspecified and loopback addresses.
| where ipv4_is_private(TI_ipEntity) == false and TI_ipEntity !startswith "fe80" and TI_ipEntity !startswith "::" and TI_ipEntity !startswith "127."
// Keep only the latest version of each indicator, then require it to be live.
| summarize LatestIndicatorTime = arg_max(TimeGenerated, *) by Id
| where IsActive == true and ValidUntil > now();
IP_Indicators
| join kind=innerunique (Extracted_Table) on $left.TI_ipEntity == $right.ExtractedIP
// The log record must predate the indicator's expiry.
| where Target_TimeGenerated < ValidUntil
// Report the most recent matching log record per indicator.
| summarize arg_max(Target_TimeGenerated, *) by Id
| extend ThreatType = parse_json(tostring(Data.indicator_types))[0]
| project TimeGenerated = Target_TimeGenerated, ThreatType, Confidence, ExtractedIP, Target_Column
メール アドレス
// Matches email addresses found in an arbitrary log column against active
// "email-addr" threat intelligence indicators.
// Output: one row per matched indicator (latest matching log record), with
// timestamp, ThreatType, Confidence, ExtractedEmail and Target_Column.
// Look-back window for the log records to inspect.
let dt_lookBack = 1h;
// Look-back window for threat intelligence indicators.
let ioc_lookBack = 14d;
// Specify the target table
let Extracted_Table = <テーブル名>
// Specify the target column
| extend Target_Column = <カラム名>
| where TimeGenerated >= ago(dt_lookBack)
// Extract every email address in the record, not just the first one.
| extend ExtractedEmail = extract_all(@"(\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b)", Target_Column)
// One row per extracted address so each can be joined independently.
| mv-expand ExtractedEmail to typeof(string)
// Normalize case; the join below is case-sensitive.
| extend ExtractedEmail = tolower(ExtractedEmail)
| where isnotempty(ExtractedEmail)
// Preserve the log timestamp; the indicator table has its own TimeGenerated.
| extend Target_TimeGenerated = TimeGenerated;
let Mail_Indicators = ThreatIntelIndicators
// Apply the time filter first so later operators work on less data.
| where TimeGenerated >= ago(ioc_lookBack)
// Filter the table for email-related IOCs
| extend IndicatorType = replace(@"\[|\]|\""", "", tostring(split(ObservableKey, ":", 0)))
| where isnotempty(IndicatorType) and IndicatorType == "email-addr"
// Lowercase the indicator value so it compares equal to the lowercased
// ExtractedEmail; without this, indicators containing uppercase characters
// would never match.
| extend EmailSenderAddress = tolower(ObservableValue)
// Everything after the "@" sign.
| extend EmailSourceDomain = substring(EmailSenderAddress, indexof(EmailSenderAddress, "@") + 1, strlen(EmailSenderAddress) - indexof(EmailSenderAddress, "@") - 1)
| extend TrafficLightProtocolLevel = tostring(parse_json(AdditionalFields).TLPLevel)
| extend IndicatorId = tostring(split(Id, "--")[2])
| where isnotempty(EmailSenderAddress)
// Keep only the latest version of each indicator, then require it to be live.
| summarize LatestIndicatorTime = arg_max(TimeGenerated, *) by Id
| where IsActive == true and ValidUntil > now()
| project-reorder *, TrafficLightProtocolLevel, EmailSenderAddress, EmailSourceDomain, Type;
Mail_Indicators
| join kind=innerunique (Extracted_Table) on $left.EmailSenderAddress == $right.ExtractedEmail
// The log record must predate the indicator's expiry.
| where Target_TimeGenerated < ValidUntil
// Report the most recent matching log record per indicator.
| summarize arg_max(Target_TimeGenerated, *) by Id
| extend ThreatType = parse_json(tostring(Data.indicator_types))[0]
| project timestamp = Target_TimeGenerated, ThreatType, Confidence, ExtractedEmail, Target_Column
URL
// Matches URLs found in an arbitrary log column against active "url" threat
// intelligence indicators.
// Output: one row per matched indicator (latest matching log record), with
// timestamp, ThreatType, Confidence, ExtractedUrl and Target_Column.
// Look-back window for the log records to inspect.
let dt_lookBack = 1h;
// Look-back window for threat intelligence indicators.
let ioc_lookBack = 14d;
// Specify the target table
let Extracted_Table = <テーブル名>
// Specify the target column
| extend Target_Column = <カラム名>
| where TimeGenerated >= ago(dt_lookBack)
// Extract every http/https URL in the record, not just the first one.
| extend ExtractedUrl = extract_all(@"(http[s]?://[^\s]+)", Target_Column)
// One row per extracted URL so each can be joined independently.
| mv-expand ExtractedUrl to typeof(string)
// Indicator URLs are lowercased below and the join is case-sensitive, so the
// extracted URL must be lowercased too or mixed-case URLs would never match.
// The original casing remains available in Target_Column.
| extend ExtractedUrl = tolower(ExtractedUrl)
| where isnotempty(ExtractedUrl)
// Preserve the log timestamp; the indicator table has its own TimeGenerated.
| extend Target_TimeGenerated = TimeGenerated;
let URL_Indicators = ThreatIntelIndicators
// Apply the time filter first so later operators work on less data.
| where TimeGenerated >= ago(ioc_lookBack)
// Extract the key part of the ObservableKey key/value pair
| extend IndicatorType = replace(@"\[|\]|\""", "", tostring(split(ObservableKey, ":", 0)))
| where IndicatorType == "url"
| extend Url = tolower(ObservableValue)
| extend TrafficLightProtocolLevel = tostring(parse_json(AdditionalFields).TLPLevel)
| where isnotempty(Url)
// Keep only the latest version of each indicator, then require it to be live.
| summarize LatestIndicatorTime = arg_max(TimeGenerated, *) by Id
| where IsActive == true and ValidUntil > now();
URL_Indicators
| join kind=innerunique (Extracted_Table) on $left.Url == $right.ExtractedUrl
// The log record must predate the indicator's expiry.
| where Target_TimeGenerated < ValidUntil
// Report the most recent matching log record per indicator.
| summarize arg_max(Target_TimeGenerated, *) by Id
| extend ThreatType = parse_json(tostring(Data.indicator_types))[0]
| project timestamp = Target_TimeGenerated, ThreatType, Confidence, ExtractedUrl, Target_Column
ドメイン名
// Matches domain names found in an arbitrary log column against active
// "domain-name" threat intelligence indicators.
// Output: one row per matched indicator (latest matching log record), with
// timestamp, ThreatType, Confidence, ExtractedDomain and Target_Column.
// Look-back window for the log records to inspect.
let dt_lookBack = 1h;
// Look-back window for threat intelligence indicators.
let ioc_lookBack = 14d;
// Specify the target table
let Extracted_Table = <テーブル名>
// Specify the target column
| extend Target_Column = <カラム名>
| where TimeGenerated >= ago(dt_lookBack)
// Extract every domain-looking token in the record, not just the first one.
| extend ExtractedDomain = extract_all(@"([a-zA-Z0-9.-]+\.[a-zA-Z]{2,})", Target_Column)
// One row per extracted domain so each can be joined independently.
| mv-expand ExtractedDomain to typeof(string)
// Indicator domains are lowercased below and the join is case-sensitive, so
// the extracted domain must be lowercased too or mixed-case host names would
// never match.
| extend ExtractedDomain = tolower(ExtractedDomain)
| where isnotempty(ExtractedDomain)
// Preserve the log timestamp; the indicator table has its own TimeGenerated.
| extend Target_TimeGenerated = TimeGenerated;
let Domain_Indicators = ThreatIntelIndicators
// Apply the time filter first so later operators work on less data.
| where TimeGenerated >= ago(ioc_lookBack)
// Extract the key part of ObservableKey (e.g. "domain-name") and strip the
// brackets/quotes left over from stringifying the split() result.
| extend IndicatorType = replace(@"\[|\]|\""", "", tostring(split(ObservableKey, ":", 0)))
| where isnotempty(IndicatorType) and IndicatorType == "domain-name"
| extend DomainName = tolower(ObservableValue)
| extend TI_DomainEntity = DomainName
| where isnotempty(DomainName)
// DomainName is already lowercase, so it can be reused as-is.
| extend TI_Domain = DomainName
| extend IndicatorId = tostring(split(Id, "--")[2])
// Keep only the latest version of each indicator, then require it to be live.
| summarize LatestIndicatorTime = arg_max(TimeGenerated, *) by Id
| where IsActive == true and ValidUntil > now()
| extend TrafficLightProtocolLevel = tostring(parse_json(AdditionalFields).TLPLevel)
| project-reorder *, IsActive, Tags, TrafficLightProtocolLevel, DomainName, Type, TI_Domain;
Domain_Indicators
| join kind=innerunique (Extracted_Table) on $left.DomainName == $right.ExtractedDomain
// The log record must predate the indicator's expiry.
| where Target_TimeGenerated < ValidUntil
// Report the most recent matching log record per indicator.
| summarize arg_max(Target_TimeGenerated, *) by Id
| extend ThreatType = parse_json(tostring(Data.indicator_types))[0]
| project timestamp = Target_TimeGenerated, ThreatType, Confidence, ExtractedDomain, Target_Column
実行結果
実行結果は以下のようになります。(Syslog テーブルと SyslogMessage カラムを使用しています)
Discussion