ElixirでAho Corasick法を書いてみた

11 min読了の目安(約6800字TECH技術記事

仕事でAho Corasick法のコードレビューをする機会があり、調べてみたところ面白かったのでElixirで実装してみました✨

Aho Corasickとは

エイホ–コラシック法は、入力テキストについて有限の文字列群(辞書)の各要素を探す辞書式マッチングアルゴリズムの一種である。全パターンのマッチングを一斉に探索するため、そのアルゴリズムの計算量はパターン群の長さに対しても対象テキストの長さに対しても線形であり、さらに見つかったマッチングの数に対しても線形である。全てのマッチングを検出するため、パターン群にサブ文字列があれば、重複して検出される点に注意されたい(つまり、辞書が a, aa, aaa, aaaa で、入力テキストが aaaa の場合など)。
https://ja.wikipedia.org/wiki/エイホ–コラシック法

例えば Zennはプログラマーのための新しい情報共有コミュニティです。 という文章があったときに、任意の辞書に載っている プログラマー 情報 情報共有 コミュニティ といった単語を出現位置と共に抽出することができます。

計算量はO(N+M)となり、辞書データが増えても、高速で単語の抽出が可能になります。
過去のISUCONでも出題されて、話題になったこともありますね。

Trie木

Aho CorasickはTrie木を用いたパターンマッチングオートマトン(PMA)を構築し
文字列を探索しています。

例えば BEAM BEACH CHECK といった3単語からTrie木を作ると以下のようなTrie木を作成することができます。

青字はそのノードまで遷移したときの出力文字列を表しています。

このTrie木に BEACHECK という文字列を入力すると以下の様に文字列を探索します。

  1. 根→B→E→A→C→Hで遷移し、その次のEで遷移失敗。BEACH を出力し、根に移動して、次の入力文字へ。
  2. 根→Eで遷移失敗。次の入力文字へ。
  3. 根→Aで遷移失敗。次の入力文字へ
  4. 根→C→H→E→C→Kで遷移し、 CHECK を出力

これで BEACH CHECK を出力できます。

次にAho Corasick法で構築した遷移失敗時の遷移先を変えたTrie木を見てみます

BEACCHECK
BEAH → CHECK

と遷移失敗したときの矢印を追加しました。

このときの移動は以下のようになります

  1. 根→B→E→A→C→Hで遷移し、その次のEで遷移失敗。BEACH を出力し、CHに移動。
  2. CH→E→CKで遷移し、CHECK を出力

先程あった一文字ずつずらす手順が消えています。
BEACHCHECK でサフィックスとプレフィックスが同一なため、ノード遷移の手間が減らせるわけですね✨

実装

今回Aho Corasickを実装した上で主な要素は以下の3つになります

  • State構造体: ノードを表す
  • make_goto関数: Trie木を作成する
  • make_failure関数: 作成したTrie木から遷移失敗時の遷移先を決定する

State構造体

defmodule State do
  defstruct id: 0, next: %{}, failure: 0, outputs: []

  def update_failure(%State{} = state, new_id) do
    Map.update!(state, :failure, fn _ -> new_id end)
  end

  def add_next(%State{} = state, character, id) do
    Map.update!(state, :next, &Map.put(&1, character, id))
  end

  def add_outputs(%State{} = state, output) when is_binary(output) do
    Map.update!(state, :outputs, &[output | &1])
  end

  def add_outputs(%State{} = state, outputs) when is_list(outputs) do
    Map.update!(state, :outputs, &(&1 ++ outputs))
  end

  def next_id(%State{} = state, character) do
    state.next
    |> Map.get(character)
  end
end

  • id Trie木はStateの配列になり、そのインデックスを格納する
  • next 通常の遷移先。文字をキー, 遷移先のIDを値として持つ
  • failure 遷移失敗時の遷移先ID
  • outputs 遷移してきたとこの出力文字 (先程の図の青文字部分)

make_goto関数

  # termsに辞書文字列が入力されます
  def make_goto(terms, states \\ [%State{}])
  def make_goto([], states), do: states

  def make_goto([term | tail], states) do
    {cur_id, states} =
      term
      |> String.codepoints()
      |> Enum.reduce({0, states}, fn char, acc ->
        make_goto_process(char, acc)
      end)

    states = List.update_at(states, cur_id, &State.add_outputs(&1, term))

    make_goto(tail, states)
  end

  defp make_goto_process(char, {cur_id, states}) do
    cur = Enum.at(states, cur_id)

    unless Map.has_key?(cur.next, char) do
      new = %State{id: length(states)}
      states = List.update_at(states, cur.id, &State.add_next(&1, char, new.id))
      {new.id, states ++ [new]}
    else
      cur_id = Map.get(cur.next, char)
      {cur_id, states}
    end
  end

割とやっていることは単純で辞書キーワードを一文字ずつ見ていき、nextにその文字に対応した遷移先がない場合はnextに新しくノードを追加する、ということをしていっています。

Elixirでは配列などはImmutableなので、配列を引き回して更新していくようにしています。
この辺がもう少し綺麗にかけるようになりたいですね。。

make_failure関数

  def make_failure(states) do
    q =
      states
      |> List.first()
      |> Map.fetch!(:next)
      |> Map.values()
      |> Enum.reduce(:queue.new(), fn index, queue ->
        :queue.in(index, queue)
      end)

    make_failure_process(:queue.out(q), states)
  end

  def make_failure_process({:empty, _}, states), do: states

  def make_failure_process({{:value, i}, next_q}, states) do
    cur_state = Enum.at(states, i)

    {new_states, q} =
      cur_state
      |> Map.fetch!(:next)
      |> Map.keys()
      |> Enum.reduce({states, next_q}, fn character, {states, q} ->
        target_state_id = State.next_id(cur_state, character)
        q = :queue.in(target_state_id, q)

        f = trace_states(cur_state.failure, states, character)

        new_states =
          List.update_at(states, target_state_id, fn s ->
            s
            |> State.update_failure(f.id)
            |> State.add_outputs(f.outputs)
          end)

        {new_states, q}
      end)

    make_failure_process(:queue.out(q), new_states)
  end

  def trace_states(id, states, character) when is_integer(id) do
    cur = Enum.at(states, id)

    case State.next_id(cur, character) do
      nil ->
        case id do
          0 -> Enum.at(states, 0)
          _ -> trace_states(cur.failure, states, character)
        end

      next_id ->
        Enum.at(states, next_id)
    end
  end

こちらの関数では先程の BEACHCHECK を見つける処理を行っていきます。

こちらでは

  1. 根ノードの遷移先ノードすべてをキューに追加する
  2. キューからノードを取り出す。(ノードAとする)
  3. ノードAの遷移先をキューに追加する(ノードBとする)
  4. ノードBについて、ノードAのfailureから辿れる一番深いノードを見つける(ノードFとする)
  5. 見つけたノードFを新たにノードBのfailureに追加する
  6. ノードAの遷移先すべてに 3, 4, 5の処理を行ったら2にもどる

といった処理を行っています。

ここまででAho CorasickによるPMAの構築が出来ました。

match関数

構築したPMAを用いた実際の文字列探索を行うロジックが以下になります

  def match(states, word) do
    {_, _, matches} =
      (word <> <<0>>)
      |> String.codepoints()
      |> Enum.reduce({0, 0, []}, fn character, {cur_id, i, matches} ->
        s = Enum.at(states, cur_id)

        next_id =
          case State.next_id(s, character) do
            nil ->
              case Enum.at(states, s.failure) |> State.next_id(character) do
                nil -> 0
                next_id -> next_id
              end

            next_id ->
              next_id
          end

        matches =
          case s.outputs do
            [] ->
              matches

            outputs ->
              matches ++
                Enum.reduce(outputs, [], fn o, acc -> [{i - String.length(o), i - 1, o} | acc] end)
          end

        {next_id, i + 1, matches}
      end)

    matches
  end

最後に必ず遷移失敗させるために終端文字を <<0>> を追加しています
(余談ですが、Elixirのこの辺の文字列操作大好きです)

一文字ずつ見ていき、現在のノードのnextに対応する遷移先がなければ、failureの遷移先を見るようにしています。

あまり洗練できていませんが、結構単純な作りになっていますね。

ExAhocora.build(["beam", "beach", "check"])
|> ExAhocora.match("thebeamtargethistisbeacheck")

[{3, 6, "beam"}, {18, 22, "beach"}, {21, 25, "check"}]

実際に動作させてみるとこんな感じで出力することができました🎉

まとめ

Aho Corasick法、最初はめっちゃ難しいアルゴリズムだと思いましたが、実際に調べながら実装することで意外とコード量はそこまで多くなく実装することができました✨

Elixirにまだまだ慣れていないため、変な書き方しているところやもっとこうしたほうがいいとかありましたら、コメントいただけると嬉しいです!
(あとAho Corasickの説明でおかしいところもあれば・・・🙇‍♂️)

コード全文はこちらになります