Juliaで遊ぶセキュリティエンジニアのための機械学習その2
最近zennをあえてvim上で書いているのですが,ショートカット使いこなせないため書きづらいです.
以前書いた記事の続きです.
やったこと(概要)
前回同様にPythonで実装された書籍の内容をJuliaでやってみたという内容になります.
前回はフィッシングサイトの分類でしたが,今回はSQLインジェクションの分類です.
今回もですが,SQLインジェクションについて詳しく書いているわけでも,Juliaの仕様について詳しく書いているわけでも,機械学習について詳しく書いているわけでもないです.現在自分も絶賛勉強中なので…
実施に至った経緯
- Julia最近書いてないなぁ
- 前回の記事の続きをやろうと思って,やってなかったなぁ
- 年末年始にSQLの勉強をしようと思った(けど,SQLの勉強にならなかったし,結局気づいたら連休終わってた)
環境
Julia 1.11.2
Jupyter lab上で実施
MacBook Air(前回Windows10のノートPC上で実施したけど液晶壊れてしまったのとWin10もサポート終わりそうなので)
SQLインジェクション検出器
前処理
サンプルデータを落としてきます.
#データ取得
cm = @cmd "git clone https://github.com/Morzeux/HttpParamsDataset"
run(cm)
CSVとDataFramesとPlotsとScikitLearnを使います.
using CSV
using DataFrames
using Plots
using ScikitLearn
データを読み込みます.落としてきたCSVはいくつかあるけど,最初は以下のtrainデータ.
※ipynbファイルと同じ階層に落としてきたフォルダがある想定.
df = CSV.read("HttpParamsDataset/payload_train.csv",DataFrame)
この状態のデータだと,HTTPクエリにあたるpayloadとその文字列の長さ,攻撃の種類のattack_type,異常か普通か0,1に分類できるlabelの4列だけです.
書籍の通りに特徴量を増やします.使用するのは平均情報量です.お久しぶりですね.学生時代以来です.
ということで,定義しました.
function H_entropy(x)
N = length(x)
P = [count(==(c),x) for c in Set(x)]./N
H = -sum([p * log2(p) for p in P])
return H
end
引数にはHTTPクエリの文字列が入る予定です.SQLインジェクションの場合,SQLという文法に則ったクエリの文字列が入るから,平均情報量が通常の文字列と異なるという仮説に基づいたものらしいです.
書籍だとPythonのdict.fromkeys(list(x))が使われていたのですが,馴染みがなさすぎたのと,Python環境でいじってみた感じ,Setでもいいのでは?と思いSetにしました.
書籍に記載されていたデータセットに対する算出値とJuliaで算出した値が一致していたので,Setを使って大丈夫だと思います.
df_norm = df[df[!,:attack_type].=="norm",:]
norm_entropies = []
for i in df_norm[!,:payload]
push!(norm_entropies,H_entropy(i))
end
sum(norm_entropies)/length(norm_entropies)
Juliaのデータフレームの書き方に相変わらず慣れないですが,通常のHTTPクエリ文字列の行を取り出し,payload列について平均情報量を計算していき,最後に平均をとっています.
同様にSQLインジェクションのデータについても計算します.
df_sqli = df[df[!,:attack_type].=="sqli",:]
sqli_entropies = []
for i in df_sqli[!,:payload]
push!(sqli_entropies,H_entropy(i))
end
sum(sqli_entropies)/length(sqli_entropies)
平均値だけではデータの特徴は掴めないとのことで,書籍通りにヒストグラムにします.
JuliaではPlotsを使います.バックエンドでmatplotlibも選べるとのことですが,今回はデフォルトで.
histogram(norm_entropies,bin=range(0,6,length=30))
title!("Entropies of nromal HTTP query string")
xlabel!("Entropy")
ylabel!("Numbers")
histogram(sqli_entropies,bin=range(0,6,length=30))
title!("Entropies of SQLi HTTP query string")
xlabel!("Entropy")
ylabel!("Numbers")
SQLインジェクションと通常のクエリでは,ヒストグラムも異なりました.
書籍ではSQLインジェクションの場合,クエリ内に閉じカッコ")"があると仮説を立て,その有無の特徴量を追加しているので真似します.
余談ですがセミコロンがクエリ内にあるのかなと思って,数えてみたんですけど,今回のデータセットでは閉じカッコの方が通常のクエリとの違いになりそうでした.
function func_preprocessing(df)
df = df[(df[!,:attack_type].=="sqli") .|| (df[!,:attack_type].=="norm"),:]
entropies = Float64[]
closing_parenthesis = Int64[]
for i in df[!,:payload]
push!(entropies,H_entropy(i))
if ')' ∈ i
push!(closing_parenthesis,1)
else
push!(closing_parenthesis,0)
end
end
#作成した平均情報量と閉じカッコの有無についてデータフレームに追加
insertcols!(df,:entropy=>entropies)
insertcols!(df,:closing_parenthesis=>closing_parenthesis)
replace!(df[!,:label], "norm"=>"0")
replace!(df[!,:label], "anom"=>"1")
return df
end
df = func_preprocessing(df)
この関数もJuliaのデータフレームに慣れてなさすぎて,Pythonから書きかえるのに時間がかかりました.ロジックはシンプルなのですが…ぐぬぬ
学習および予測
ここからは,前回の記事と同じ感じになります.書籍だとoptunaでハイパーパラメータの探索をしていますが,今回も特にせず.(やり方含めて調べる気力が足りず)
test_data = CSV.read("HttpParamsDataset/payload_test.csv",DataFrame)
test_data = func_preprocessing(test_data)
df_x = Array(df[!,[:length,:entropy,:closing_parenthesis]])
test_x = Array(test_data[!,[:length,:entropy,:closing_parenthesis]])
df_y = parse.(Int64,df[!,:label])
test_y = parse.(Int64,test_data[!,:label])
検証用データを読み込み,学習データとともに整備していきます.
データフレームのままだとエラーが出てしまったため,Arrayにします.label列もこのまま使うと型がstringでエラーになってしまうため,Int64に変換します.
でもって学習させます.
@sk_import linear_model: LogisticRegression
classifier = LogisticRegression(solver="lbfgs")
classifier.fit(df_x,df_y)
ちなみに重みを見てみると以下のようで,おそらく閉じカッコが強そうです…
終わったら予測して,正答率を確認.
ŷ = classifier.predict(test_x)
accuracy = 100 * count(ŷ .== test_y) / length(test_y)
書籍の決定木による分類だと正答率は96%くらいで,こっちは98%…
用意されたデータが良いのか,正答率高すぎない?と身構えてしまいつつ,セキュリティ系も医療ほどではないと思いますが,誤検知とかで結構デカめの問題になりかねないと最近思っており.
偽陽性の場合でも,アラートからメールならまだしも夜中に電話引っ切り無しに鳴られてもしんどいですね…
ということで,前回では算出しなかった混同行列も算出してみます.
探せばScikitLearnの中に算出してくれる関数があるかもしれませんが,見つけることができず.
正答率と同じく,自前で用意します.
function confusionmatrix(ŷ,y)
TP = count(y .== ŷ .== 1)
TN = count(y .== ŷ .== 0)
FP = count(y .!== ŷ .== 1)
FN = count(y .!== ŷ .== 0)
[TN FP; FN TP]
end
confusionmatrix(ŷ,test_y)
もっといい書き方あったかなと思いつつ…
countの使い方を忘れておりしばらく書いてないと思い出せないものですね…
思い出せたのですが,業務で別言語でreturn書き忘れて想定通り動かないみたいなことやらかしていたのですが,Juliaの場合returnを端折ってもちゃんと返してくれるはずです.上記で動きます.
偽陰性の方が偽陽性より多そうということはわかりました.
どっちもないに越したことはないですけど,個人的には偽陰性の方がまずいと思っているので,うーん…
Ngram
書籍と同じくNgramを使います.今回はユニグラムです.
閉じカッコがポイントのような仮説を立てていたのですが,必ずしもデータを分析するデータサイエンティストや機械学習エンジニアがセキュリティドメインの知識を持っているわけでもないとのことでNgramです.
書籍だとTF-IDFによるベクトル表現を使っていて,Juliaにもパッケージ等はあるようでしたが,学生時代の講義の課題でスクラッチでPythonで実装したなぁと思い手組みしました.正しく実装できている自信はないので,悪しからず…
なお,colabに提出した課題のコードが残っているかと思ったら,残ってなかったです.さて単位は取れていたのか?
上ではtrainとtestが使用データのCSVの時点で分けられていましたが,書籍では合体させた後にランダムに分割していたので,自分も真似します.
前回の記事でも分割を手組みしましたが,今回も結局Scikit-Learnの機能を使いこなせなかったため,手組みしました.
以上よりRandomを使います.
using Random
前処理です.
df = CSV.read("HttpParamsDataset/payload_train.csv",DataFrame)
df_test = CSV.read("HttpParamsDataset/payload_test.csv",DataFrame)
#SQLインジェクション通常のクエリの行を抽出
train_rows = df[(df[!,:attack_type].=="sqli") .|| (df[!,:attack_type].=="norm"),:]
test_train_rows = df_test[(df_test[!,:attack_type].=="sqli") .|| (df_test[!,:attack_type].=="norm"),:]
#ラベルを0,1に変換
replace!(train_rows[!,:label], "norm"=>"0")
replace!(train_rows[!,:label], "anom"=>"1")
df_y = parse.(Int64,train_rows[!,:label])
replace!(test_train_rows[!,:label], "norm"=>"0")
replace!(test_train_rows[!,:label], "anom"=>"1")
test_y = parse.(Int64,test_train_rows[!,:label])
#使用するのはHTTPクエリのみのためpayloadのみ抽出
train_rows = train_rows[!,:payload]
test_train_rows = test_train_rows[!,:payload]
#分割されていたCSVをまとめる
X = [train_rows;test_train_rows]
y = [df_y;test_y]
TF-IDFを組みます.min_dfなるものが書籍では設定されていました.ChatGPTに聞いたところ,文書内の単語の出現割合の最小閾値,それ以下の場合削る?ようなので,filter関数でごまかすことにしました.
function tfidf(X)
#重複無くすためSetを用意
Y = Set(Char[])
for x in X
for w in Set(x)
push!(Y, w)
end
end
#文字と今回のデータ内の出現頻度を辞書で格納
wordcount = Dict{Char,Int64}()
for y in Y
cnt = 0
for x in X
cnt += y ∈ x ? 1 : 0
end
push!(wordcount,y=>cnt)
end
#出現頻度の割合でフィルター
wordcount = filter(x->x.second/length(Y)>0.1, wordcount)
# 全文書数
N = length(X)
Y = zeros(N,length(wordcount))
for i in range(1,N)
idf = log.(N ./ values(wordcount))
tf = count.(keys(wordcount),X[i])
Y[i,:] = tf .* idf
end
return Y
end
X = tfidf(X)
学習用データとテストデータに分割していきます.
N = length(y)
#ランダムに8:2に分割
θ = Int(floor(N*0.8))
Shuffle_Index = shuffle(1:N)
n1 = Shuffle_Index[1:θ]
n2 = Shuffle_Index[θ+1:end]
x_train = Array(X[n1,:])
y_train = Array(y[n1])
x_test = Array(X[n2,:])
y_test = Array(y[n2])
上記と同じくLogisticRegressionをこすりました.
@sk_import linear_model: LogisticRegression
classifier = LogisticRegression(solver="lbfgs")
classifier.fit(x_train,y_train)
予測します.
ŷ = classifier.predict(x_test)
結果を見ます
accuracy = 100 * count(ŷ .== y_test) / length(y_test)
ほぉ…99%…
書籍でもLightGBMつかって99%出ていましたので,上のコード間違っていなかったのか?
はたまた偶然変換していったらうまくできてしまったのか?
confusionmatrix(ŷ,y_test)
混同行列も偽陽性0!
ヨシ!
感想
@sk_importしてしまったので,Juliaの機械学習パッケージを活用して,別モデル使ってみるのもありだなと思いました.とはいえ,99%の正答率を出されちゃうとモチベ的に…ですが.
あとはドメイン知識なくしても予測できるのはセキュリティエンジニアが不在の現場だと良いと感じました.
シチュエーション的にセキュリティシステムに対して,こういう手組みで実装することはセキュリティベンダーでもない限りあまり想像できないのと,今回のようなシステムはWAFや次世代ファイアウォールなどに組み込まれている気がしますし,そのベンダーでは機械学習に明るい人材とセキュリティに明るい人材両方確保できていると思うので,メリットってほどメリットになるかな?とも思いつつ…
こういうゼロから実装するパターンではないにしても,以前現場でセキュリティ,機械学習どちらにも明るい人材がいない中,機械学習を使ったセキュリティシステムを構築していたところを見たことがあり,なかなか悲惨だったので,現場の負担を少しはやわらげる可能性があると言う意味でも,Ngramによる分類で精度が出ることは良いなと思いました.
最後にコーディング面について.
型システムをもっと使っていこうと思いつつ,今回も使ったり使わなかったりで統一取れなかったです.
可読性もですね.
業務でも感じているのですが,他の人が読みやすいコードにするために,むやみにコメント入れるのがベストでもないし,グローバル変数は作りすぎないとかリーダブルコードにある内容を実践しようと思いつつ,なかなか上手くいかないという感じです.個人で書いていく際にも今後もっと意識的に書けていけたらなぁと思います.
Discussion