🥬

続・MeCabの分かち書きを並列処理で高速化する

2022/11/16に公開約15,500字

まとめ

  • Pythonから巨大なテキストファイルを並列に読み込み・処理・書き込みする方法を紹介
  • 読み込み: テキストファイルをバイト列として見て、プロセスごとにファイルの読み込み区間を割り振る
  • 処理: multiprocessingを用いた並列処理
  • 書き込み: プロセスごとにtmpファイルへ並列に書き込み & catによる結合

はじめに

日本語形態素解析器であるMeCabを用いると、日本語のテキストに対する解析や処理が簡単に実行できます。
特に最近は、BERTをはじめとする深層学習モデルへの入力のための前処理として、MeCabなどを用いて文を単語単位に分割する「分かち書き」を行う機会が多くなっています。

MeCabはコマンドラインから実行することもできますし、Pythonなどからプログラム的に呼び出すことも可能です。
特にコマンドラインから実行する場合は、インストールさえ終わっていれば以下のように簡単にテキストファイルに対する分かち書きを実行できて便利です。

mecab -Owakati ./in.txt -o ./out.txt

しかしこの方法だと、入力テキストファイルが非常に大きかった場合に長い時間がかかってしまいます。

ネット上にも、解析対象のテキストファイルが大量に存在する場合についての記事はいくつかあるのですが、単一の巨大なファイルに対する解析を高速化する話はあまり見当たりませんでした。

これについて、以前入力ファイルを分割して並列処理することで処理する記事を書きました。

https://zenn.dev/hpp/articles/255de192c6d4c3

この記事では、主にシェルスクリプトを書くことでいい感じに分かち書きの並列化を行いました。
しかし、分かち書き後のテキストファイルをさらにPythonで処理したい場合や、そもそもMeCabではなく他のツールを用いて巨大なテキストファイルを並列で処理したい場合について対応できていませんでした。

そこで今回は、Pythonプログラムのみで巨大なテキストファイルに対してMeCabの分かち書きを並列で実行する方法をいくつか紹介し、それらの速度を実験的に比較します。
また、MeCab以外のツールを使う場合にも汎用的に利用可能なコードスニペットを提供します。

本記事のコードと実験結果は以下のリポジトリで見ることができます。

実験設定

本記事では巨大なテキストファイルに対する処理の高速化手法を比較検討します。
そのため、ある程度大きなテキストファイルを用意したほうがより役立つ知見が得られるでしょう。
また、実行環境ごとの違いも気になります。

そこで今回は、まずテキストファイルとして、前処理済みのWikipediaデータセットであるWiki40Bの日本語テキストを用いました。
ファイルサイズは1.8GiBです。

また、実験環境として、コア数が異なる2つのマシンを用意しました。
一つ目はM1 MacBook proで、コア数は10です。補足ですが、M1系列のMacBookはファイルIOがかなり速いと言われています。
二つ目はUbuntuのサーバで、コア数は40です。注意点として、このサーバでの実験はNFS(Network File System)に載った状態で行っています。NFSを用いると、一般にファイルIOは遅くなります。

さらに、実験ではmultitimeコマンドを用いて各手法による処理を10回実施し、その統計値を用いて評価します。

前置きが長くなりましたが、早速並列処理の具体例について実際のコードを示しながら紹介します。

手法1: naive

まずはPythonからMeCabを用いてナイーブに分かち書きを行うコードを示します。
大雑把には、**「ファイルを順に読み込み、分かち書きも順に行い、書き込みも順に行う」**というコードです。

naive.py
from pathlib import Path

import MeCab

IN = Path("./in.txt")
OUT = Path("./out.txt")


def main():
    mecab = MeCab.Tagger("-Owakati")

    with IN.open() as f, OUT.open("w") as out:
        for line in f:
            line: str = mecab.parse(line.strip()).strip()
            if not line.isspace():
                out.write(f"{line}\n")


if __name__ == "__main__":
    main()

ここで、in.txtが先ほど述べた本記事で用いる巨大なテキストファイル(1.8GiB)です。

コードは非常にシンプルで、テキストファイルを開いてそれを行ごとに読み込み(for line in fの部分)ながら、MeCabによる分かち書きを実行しています。

for line in f:形式でファイルの処理を行うと、テキストファイル全体をメモリに載せず、行ごとに処理を行えるのでメモリの節約になります。

手法2: serial_read

次に、MeCabによる処理を並列化するコードを紹介します。
大雑把には、「ファイルを順に読み込み、分かち書きを並列で行い、書き込みを順に行う」 というコードです。

serial_read.py
import os
from multiprocessing import Pool
from pathlib import Path

import MeCab
from more_itertools import divide

IN = Path("./in.txt")
OUT = Path("./out.txt")


def run(text_iter):
    mecab = MeCab.Tagger("-Owakati")
    results = []

    for line in text_iter:
        line: str = mecab.parse(line.strip()).strip()
        if not line.isspace():
            results.append(line)

    return results


def main():
    num_procs = os.cpu_count()
    with IN.open() as f, OUT.open("w") as w:
        with Pool(processes=num_procs) as pool:
            for sentences in pool.map(run, divide(num_procs, f)):
                w.write("".join(f"{s}\n" for s in sentences))


if __name__ == "__main__":
    main()

more-itertoolsというライブラリのdivideという関数を用いることで、イテラブル(今回はf)を指定した数に分割することができます。
今回はコア数分にイテレータを分割し、コア数分だけプロセスを用意、それらのプロセスが並列にMeCabの処理を行うという流れです。

脱線しますが、このコードはそこまで複雑なことはしていないので、並列化処理のコードスニペットとしても使ってもらえるかなと思います。

ちなみに、MeCab.Taggerインスタンスの初期化は結構重たいので、もしMeCabを用いた並列処理をやりたい方がいたらその点に注意するとよさそうです。

今回は大丈夫ですが、例えばmultiprocessingを使っても文ごとにMeCab.Taggerの初期化をするようなコードを書いてしまうと非常に遅くなると思います。
その対策として、このコードではdivideを使ってMeCab.Taggerの初期化がコア数分しか走らないようにしています。

手法3: parallel_read

次に、分かち書きだけでなく、テキストファイルの読み込みも並列化するコードを紹介します。
大雑把には、「ファイルを並列に読み込み、分かち書きを並列で行い、書き込みを順に行う」 というコードです。

parallel_read.py
import os
from multiprocessing import Pool
from pathlib import Path

import MeCab

IN = Path("./in.txt")
OUT = Path("./out.txt")


def run(start: int, end: int):
    mecab = MeCab.Tagger("-Owakati")
    results = []

    current = start
    with IN.open() as f:
        f.seek(start)

        for line in f:
            current += len(line.encode())
            line: str = mecab.parse(line.strip()).strip()

            if not line.isspace():
                results.append(line)

            if current >= end:
                break

    return results


def main():
    file_size = IN.stat().st_size
    num_procs = os.cpu_count()

    chunk_size = file_size // num_procs
    chunks = []
    start, end = 0, chunk_size

    with IN.open(encoding="utf-8", errors="ignore") as f:
        while end < file_size:
            f.seek(end)
            f.readline()

            end = f.tell()
            chunks.append((start, end))
            start, end = end, end + chunk_size

        chunks.append((start, file_size))

    with OUT.open("w") as out:
        with Pool(processes=num_procs) as pool:
            for sentences in pool.starmap(run, chunks):
                out.write("\n".join(sentences) + "\n")


if __name__ == "__main__":
    main()

少しコードが複雑になりましたが、大事なポイントは以下です。

  • テキストファイルはバイト単位で読むこともできる
  • テキストファイルをプロセスの数だけのバイト列の区間に分割
    • まずテキストファイルのバイト数を計測して並列数で割ることで大雑把な区間(始点と終点)を算出
    • が、分かち書き処理をするにあたり、解析対象文字列の改行は保存されていてほしい
    • ので、中途半端な(改行じゃない)位置に区間の終点が来てしまったら、改行位置まで終点をずらす
  • 各プロセスに対象ファイルの読み込むべき区間(始点と終点)のみを渡す
    • テキストファイル全体の読み込みはしていない
    • 区間情報のみ
  • 各プロセスが同じテキストファイルの異なる区間を並列に読み込む

特に、f.seek()によってファイルの読み出し位置をバイト単位で指定できるので、これをうまく活用しています。
今回のコードは以下の記事を参考にしています。

https://nurdabolatov.com/parallel-processing-large-file-in-python

手法4: parallel_read_write

さらに、テキストファイルの読み込みと分かち書きだけでなく、テキストファイルの書き込みも並列化するコードを紹介します。
大雑把には、「ファイルを並列に読み込み、分かち書きも並列で行い、書き込みも並列に行う」 というコードです。

parallel_read_write.py
import os
import subprocess
import tempfile
from multiprocessing import Pool
from pathlib import Path

import MeCab

IN = Path("./in.txt")
OUT = Path("./out.txt")


def run(start: int, end: int):
    mecab = MeCab.Tagger("-Owakati")
    current = start

    with IN.open() as f, tempfile.NamedTemporaryFile(mode="w", delete=False) as tmp:
        f.seek(start)

        for line in f:
            current += len(line.encode())
            line: str = mecab.parse(line.strip()).strip()

            if not line.isspace():
                tmp.write(f"{line}\n")

            if current >= end:
                break

        return tmp.name


def main():
    file_size = IN.stat().st_size
    num_procs = os.cpu_count()

    chunk_size = file_size // num_procs
    chunks = []
    start, end = 0, chunk_size

    with IN.open(encoding="utf-8", errors="ignore") as f:
        while end < file_size:
            f.seek(end)
            f.readline()

            end = f.tell()
            chunks.append((start, end))
            start, end = end, end + chunk_size

        chunks.append((start, file_size))

    with Pool(processes=num_procs) as pool:
        paths = list(pool.starmap(run, chunks))
        paths_cat = [f'"{path}"' for path in paths]
        cmd = f"cat {' '.join(paths_cat)} > {OUT}"
        subprocess.call(cmd, shell=True)
        for path in paths:
            os.remove(path)


if __name__ == "__main__":
    main()

先ほどのコード(手法3)では、分かち書き結果をテキストファイルに書き込む部分は、順に行うようになっていました。

今回のコードでは、tmpファイルを利用することで、各プロセスが別々にファイルへ書き込みを行えるようにしつつ、最終的な結果ファイルはtmpファイルを結合することで得ています。

こうすると結構メモリも節約できて(テキストがメモリに載ってしまうようなことがなくて)良いかと思います。

手法5: parallel_read_write_at_once

さらにさらに、テキストファイルの読み込みと書き込みを並列・一気に行うコードを示します。
これは、実験環境にもよりますが、テキストファイルを順に処理するよりも、最初に一気に読み込み・最後に一気に書き込みをしてしまった方が、IOが早いのではないかと考えたためです。

大雑把には、「ファイルを並列で一気に読み込み、分かち書きを並列で行い、書き込みを並列で一気に行う」 というコードです。

parallel_read_write_at_once.py
import os
import subprocess
import tempfile
from multiprocessing import Pool
from pathlib import Path

import MeCab

IN = Path("./in.txt")
OUT = Path("./out.txt")


def run(start: int, end: int):
    mecab = MeCab.Tagger("-Owakati")

    with IN.open("rb") as f, tempfile.NamedTemporaryFile(mode="w", delete=False) as tmp:
        f.seek(start)

        results = []
        for byte in f.read(end - start).splitlines():
            line: str = mecab.parse(byte.decode().strip()).strip()
            if not line.isspace():
                results.append(line)

        tmp.write("".join(f"{line}\n" for line in results))
        return tmp.name


def main():
    file_size = IN.stat().st_size
    num_procs = os.cpu_count()

    chunk_size = file_size // num_procs
    chunks = []
    start, end = 0, chunk_size

    with IN.open(encoding="utf-8", errors="ignore") as f:
        while end < file_size:
            f.seek(end)
            f.readline()

            end = f.tell()
            chunks.append((start, end))
            start, end = end, end + chunk_size

        chunks.append((start, file_size))

    with Pool(processes=num_procs) as pool:
        paths = list(pool.starmap(run, chunks))
        paths_cat = [f'"{path}"' for path in paths]
        cmd = f"cat {' '.join(paths_cat)} > {OUT}"
        subprocess.call(cmd, shell=True)
        for path in paths:
            os.remove(path)


if __name__ == "__main__":
    main()

内容としては、手法4とほぼ変わらず、順に読み込み・書き込みを行うか、一気に読み込んでリストを作成し、結果をリストに溜めて一気に書き込むかといった違いのみです。

実験

それでは、以上5つの手法について、1.8GiBのテキストファイルを対象に速度比較を行った結果を示します。

実験環境はM1 MacBook pro (10 cores)と、Ubuntu Server (40 cores, NFS)の二つです。
これらでの実験結果を順に示します。

M1 MacBook pro (10 cores)

まず、10コアのM1 MacBook proで実験した結果が以下の通りです。

M1 MacBook pro (10 cores) mean std min median max improvement
手法1: naive 162.568 0.895 161.700 162.386 164.721 x1.000
手法2: serial_read 42.620 0.620 42.148 42.428 44.357 x3.814
手法3: parallel_read 35.527 0.106 35.345 35.511 35.732 x4.576
手法4: parallel_read_write 20.826 0.255 20.598 20.732 21.523 x7.806
手法5: parallel_read_write_at_once 20.678 0.146 20.493 20.645 21.036 x7.862

最左の列がそれぞれの手法を示しています。
また、improvement手法1: naiveと比較した時の、それぞれの手法の速度向上具合を示しています。
特に、meanimprovementに注目して表を見ていただけるとよいと思います。

結果としては、手法1: naiveが平均して162秒程度かかっていたのに対して、手法4・手法5では20秒程度で1.8GiBのテキストファイルの分かち書きを行えており、大雑把に7.8倍の高速化に成功しました。

また、MeCabによる分かち書き部分のみを並列化した手法2: serial_readと比較して、ファイルの読み込み・書き込みも並列化することで、約2倍も高速化できています。

Ubuntu Server (40 cores, NFS)

次に、40コアのマシンで実験した結果が以下の通りです。

Ubuntu Server (40 cores, NFS) mean std min median max improvement
手法1: naive 444.190 47.739 404.738 429.977 579.257 x1.000
手法2: serial_read 350.161 207.625 139.832 276.194 749.342 x1.269
手法3: parallel_read 278.844 229.585 61.472 179.473 719.069 x1.593
手法4: parallel_read_write 52.463 2.525 49.139 51.760 56.053 x8.467
手法5: parallel_read_write_at_once 56.320 11.061 46.864 52.601 82.210 x7.887

M1 MacBookと比較してファイルIOが重たいこともあり、手法1: naiveは平均して444秒程度かかってしまっていますが、それと比較して手法4: parallel_read_writeは約8.5倍高速化できています。

残念な点として、M1 MacBook proよりもコア数が増えているので、手法1: naiveと比較したときの各並列化手法の速度向上はより大きくなると思ったのですが、今回はファイルIOがボトルネックとなり、そこまでの速度向上ができなかったと思われます。

これについては、そもそもNFSに載っていない普通の計算機環境であれば、純粋に高速化しそうではあります。

また、手法5: parallel_read_write_at_onceがそこまで高速化していませんでした。
今回は、あまり一気に読み込み・書き込みをしても効果が無かったようです。

全体として、二つの実験環境のうちのどちらでも、読み込み・分かち書き・書き込みのすべてを並列化することによる大幅な高速化が確認できました。

まとめ (改)

本記事ではMeCabを用いた分かち書き処理を高速化するために、並列化手法をいくつか検討し、実際に二つの実験環境で速度比較実験を行いました。

結果としては、テキストファイルの読み込み・分かち書き・ファイルの書き込みをすべて並列化する手法が最も高速化し、並列化をしない場合と比較して7~8倍の速度向上が確認できました。

今回は純粋なPythonによる処理をいくつか比較してみましたが、より高速化するのであれば、Rustで実装されたMeCab互換の形態素解析器であるVibratoを用いたり、並列処理部分をC/C++やRustなど他の言語で実装する、などの方法が考えられそうです。

ここまでお読みいただき大変ありがとうございました。
再掲になりますが、今回のコードはGitHub上で公開しています。

https://github.com/hppRC/mecab-parallel-experiment

もし何か質問やさらなる改善案がおありでしたら、気兼ねなくコメントやTwitterでのリプライ、DMをいただけると嬉しいです。

最後に、いろいろな状況に対応できるように、巨大テキストファイルへの並列処理をある程度抽象化したコードスニペットを示して終わりにします。

コードスニペット

example.py
import MeCab

from process_file_parallel import process_file_parallel, process_file_parallel_and_save

mecab = MeCab.Tagger("-Owakati")


def fn(line):
    return mecab.parse(line.strip()).strip()


def main():
    results = process_file_parallel(input_path="in.txt", fn=fn)
    print(results[0])
    print(len(results))

    process_file_parallel_and_save(input_path="in.txt", output_path="out.txt", fn=fn)


if __name__ == "__main__":
    main()
process_file_paralle.py
import os
import subprocess
import tempfile
from multiprocessing import Pool
from pathlib import Path
from typing import Callable, List, Optional, TypeVar, Union

T = TypeVar("T")
Fn = Callable[[str], Optional[T]]


def run_and_return(
    input_path: Path,
    fn: Fn,
    start: int,
    end: int,
) -> List[T]:
    results = []
    current = start
    with input_path.open() as f:
        f.seek(start)
        for line in f:
            current += len(line.encode())
            ret: T = fn(line.strip())
            if ret is not None:
                results.append(ret)
            if current >= end:
                break

    return results


def run_and_save(
    input_path: Path,
    fn: Fn,
    start: int,
    end: int,
) -> List[T]:
    current = start
    with input_path.open() as f, tempfile.NamedTemporaryFile(mode="w", delete=False) as tmp:
        for line in f:
            current += len(line.encode())
            ret: T = fn(line.strip())

            if ret is not None:
                tmp.write(f"{str(ret)}\n")

            if current >= end:
                break

        return tmp.name


def generate_file_chunks(input_path: Path, num_chunks: int):
    file_size = input_path.stat().st_size

    chunk_size = file_size // num_chunks
    start, end = 0, chunk_size

    with input_path.open(encoding="utf-8", errors="ignore") as f:
        while end < file_size:
            f.seek(end)
            f.readline()

            end = f.tell()
            yield (start, end)
            start, end = end, end + chunk_size

        yield (start, file_size)


def process_file_parallel(
    input_path: Union[str, Path],
    fn: Fn,
    num_procs: int = None,
) -> List[T]:
    input_path: Path = Path(input_path)
    num_procs: int = num_procs or os.cpu_count()

    with Pool(processes=num_procs) as pool:
        chunks = [
            (input_path, fn, start, end)
            for start, end in generate_file_chunks(input_path, num_chunks=num_procs)
        ]
        return list(zip(*pool.starmap(run_and_return, chunks)))


def process_file_parallel_and_save(
    input_path: Union[str, Path],
    output_path: Union[str, Path],
    fn: Fn,
    num_procs: int = None,
) -> None:
    input_path: Path = Path(input_path)
    output_path: Path = Path(output_path)
    num_procs: int = num_procs or os.cpu_count()

    with Pool(processes=num_procs) as pool:
        chunks = [
            (input_path, fn, start, end)
            for start, end in generate_file_chunks(input_path, num_chunks=num_procs)
        ]
        paths = list(pool.starmap(run_and_save, chunks))
        paths_cat = [f'"{path}"' for path in paths]
        cmd = f"cat {' '.join(paths_cat)} > {str(output_path)}"
        subprocess.call(cmd, shell=True)
        for path in paths:
            os.remove(path)

Discussion

ログインするとコメントできます