ElixirでAho Corasick法を書いてみた
仕事でAho Corasick法のコードレビューをする機会があり、調べてみたところ面白かったのでElixirで実装してみました✨
Aho Corasickとは
エイホ–コラシック法は、入力テキストについて有限の文字列群(辞書)の各要素を探す辞書式マッチングアルゴリズムの一種である。全パターンのマッチングを一斉に探索するため、そのアルゴリズムの計算量はパターン群の長さに対しても対象テキストの長さに対しても線形であり、さらに見つかったマッチングの数に対しても線形である。全てのマッチングを検出するため、パターン群にサブ文字列があれば、重複して検出される点に注意されたい(つまり、辞書が a, aa, aaa, aaaa で、入力テキストが aaaa の場合など)。
https://ja.wikipedia.org/wiki/エイホ–コラシック法
例えば Zennはプログラマーのための新しい情報共有コミュニティです。
という文章があったときに、任意の辞書に載っている プログラマー
情報
情報共有
コミュニティ
といった単語を出現位置と共に抽出することができます。
計算量はO(N+M)となり、辞書データが増えても、高速で単語の抽出が可能になります。
過去のISUCONでも出題されて、話題になったこともありますね。
Trie木
Aho CorasickはTrie木を用いたパターンマッチングオートマトン(PMA)を構築し
文字列を探索しています。
例えば BEAM
BEACH
CHECK
といった3単語からTrie木を作ると以下のようなTrie木を作成することができます。
青字はそのノードまで遷移したときの出力文字列を表しています。
このTrie木に BEACHECK
という文字列を入力すると以下の様に文字列を探索します。
- 根→B→E→A→C→Hで遷移し、その次のEで遷移失敗。
BEACH
を出力し、根に移動して、次の入力文字へ。 - 根→Eで遷移失敗。次の入力文字へ。
- 根→Aで遷移失敗。次の入力文字へ
- 根→C→H→E→C→Kで遷移し、
CHECK
を出力
これで BEACH
CHECK
を出力できます。
次にAho Corasick法で構築した遷移失敗時の遷移先を変えたTrie木を見てみます
BEAC → CHECK
BEAH → CHECK
と遷移失敗したときの矢印を追加しました。
このときの移動は以下のようになります
- 根→B→E→A→C→Hで遷移し、その次のEで遷移失敗。
BEACH
を出力し、CHに移動。 - CH→E→CKで遷移し、
CHECK
を出力
先程あった一文字ずつずらす手順が消えています。
BEACH と CHECK でサフィックスとプレフィックスが同一なため、ノード遷移の手間が減らせるわけですね✨
実装
今回Aho Corasickを実装した上で主な要素は以下の3つになります
- State構造体: ノードを表す
- make_goto関数: Trie木を作成する
- make_failure関数: 作成したTrie木から遷移失敗時の遷移先を決定する
State構造体
defmodule State do
defstruct id: 0, next: %{}, failure: 0, outputs: []
def update_failure(%State{} = state, new_id) do
Map.update!(state, :failure, fn _ -> new_id end)
end
def add_next(%State{} = state, character, id) do
Map.update!(state, :next, &Map.put(&1, character, id))
end
def add_outputs(%State{} = state, output) when is_binary(output) do
Map.update!(state, :outputs, &[output | &1])
end
def add_outputs(%State{} = state, outputs) when is_list(outputs) do
Map.update!(state, :outputs, &(&1 ++ outputs))
end
def next_id(%State{} = state, character) do
state.next
|> Map.get(character)
end
end
-
id
Trie木はStateの配列になり、そのインデックスを格納する -
next
通常の遷移先。文字をキー, 遷移先のIDを値として持つ -
failure
遷移失敗時の遷移先ID -
outputs
遷移してきたとこの出力文字 (先程の図の青文字部分)
make_goto関数
# termsに辞書文字列が入力されます
def make_goto(terms, states \\ [%State{}])
def make_goto([], states), do: states
def make_goto([term | tail], states) do
{cur_id, states} =
term
|> String.codepoints()
|> Enum.reduce({0, states}, fn char, acc ->
make_goto_process(char, acc)
end)
states = List.update_at(states, cur_id, &State.add_outputs(&1, term))
make_goto(tail, states)
end
defp make_goto_process(char, {cur_id, states}) do
cur = Enum.at(states, cur_id)
unless Map.has_key?(cur.next, char) do
new = %State{id: length(states)}
states = List.update_at(states, cur.id, &State.add_next(&1, char, new.id))
{new.id, states ++ [new]}
else
cur_id = Map.get(cur.next, char)
{cur_id, states}
end
end
割とやっていることは単純で辞書キーワードを一文字ずつ見ていき、next
にその文字に対応した遷移先がない場合はnext
に新しくノードを追加する、ということをしていっています。
Elixirでは配列などはImmutableなので、配列を引き回して更新していくようにしています。
この辺がもう少し綺麗にかけるようになりたいですね。。
make_failure関数
def make_failure(states) do
q =
states
|> List.first()
|> Map.fetch!(:next)
|> Map.values()
|> Enum.reduce(:queue.new(), fn index, queue ->
:queue.in(index, queue)
end)
make_failure_process(:queue.out(q), states)
end
def make_failure_process({:empty, _}, states), do: states
def make_failure_process({{:value, i}, next_q}, states) do
cur_state = Enum.at(states, i)
{new_states, q} =
cur_state
|> Map.fetch!(:next)
|> Map.keys()
|> Enum.reduce({states, next_q}, fn character, {states, q} ->
target_state_id = State.next_id(cur_state, character)
q = :queue.in(target_state_id, q)
f = trace_states(cur_state.failure, states, character)
new_states =
List.update_at(states, target_state_id, fn s ->
s
|> State.update_failure(f.id)
|> State.add_outputs(f.outputs)
end)
{new_states, q}
end)
make_failure_process(:queue.out(q), new_states)
end
def trace_states(id, states, character) when is_integer(id) do
cur = Enum.at(states, id)
case State.next_id(cur, character) do
nil ->
case id do
0 -> Enum.at(states, 0)
_ -> trace_states(cur.failure, states, character)
end
next_id ->
Enum.at(states, next_id)
end
end
こちらの関数では先程の BEACH → CHECK を見つける処理を行っていきます。
こちらでは
- 根ノードの遷移先ノードすべてをキューに追加する
- キューからノードを取り出す。(ノードAとする)
- ノードAの遷移先をキューに追加する(ノードBとする)
- ノードBについて、ノードAの
failure
から辿れる一番深いノードを見つける(ノードFとする) - 見つけたノードFを新たにノードBの
failure
に追加する - ノードAの遷移先すべてに 3, 4, 5の処理を行ったら2にもどる
といった処理を行っています。
ここまででAho CorasickによるPMAの構築が出来ました。
match関数
構築したPMAを用いた実際の文字列探索を行うロジックが以下になります
def match(states, word) do
{_, _, matches} =
(word <> <<0>>)
|> String.codepoints()
|> Enum.reduce({0, 0, []}, fn character, {cur_id, i, matches} ->
s = Enum.at(states, cur_id)
next_id =
case State.next_id(s, character) do
nil ->
case Enum.at(states, s.failure) |> State.next_id(character) do
nil -> 0
next_id -> next_id
end
next_id ->
next_id
end
matches =
case s.outputs do
[] ->
matches
outputs ->
matches ++
Enum.reduce(outputs, [], fn o, acc -> [{i - String.length(o), i - 1, o} | acc] end)
end
{next_id, i + 1, matches}
end)
matches
end
最後に必ず遷移失敗させるために終端文字を <<0>>
を追加しています
(余談ですが、Elixirのこの辺の文字列操作大好きです)
一文字ずつ見ていき、現在のノードのnext
に対応する遷移先がなければ、failureの遷移先を見るようにしています。
あまり洗練できていませんが、結構単純な作りになっていますね。
ExAhocora.build(["beam", "beach", "check"])
|> ExAhocora.match("thebeamtargethistisbeacheck")
[{3, 6, "beam"}, {18, 22, "beach"}, {21, 25, "check"}]
実際に動作させてみるとこんな感じで出力することができました🎉
まとめ
Aho Corasick法、最初はめっちゃ難しいアルゴリズムだと思いましたが、実際に調べながら実装することで意外とコード量はそこまで多くなく実装することができました✨
Elixirにまだまだ慣れていないため、変な書き方しているところやもっとこうしたほうがいいとかありましたら、コメントいただけると嬉しいです!
(あとAho Corasickの説明でおかしいところもあれば・・・🙇♂️)
コード全文はこちらになります
Discussion