🥤

Locality Sensitive Hashingを用いた大規模コーパスの準重複文書排除

0. はじめに

こんにちは、株式会社D2Cデータサイエンティストの董です。

D2Cでは、広告配信を効率よく効果よく行うために様々な機械学習モデルを活用しています。
今回の記事では、大規模テキストコーパスを用いた言語モデルの学習にあたり、学習データにある準重複文書の除外テクニックについてご紹介します。

1. 開発環境

  • AWS EC2 (インスタンスタイプ: r5.8xlarge)
  • Python 3.10系

2. Pythonパッケージ

3. 広告文の準重複問題

テキスト広告では、キャッチコピーや宣伝文を少しだけ修正して複数回配信し、その効果を測定することがよくあります。また、シリーズ商品の説明文を同じテンプレートに従って大量に作成することも一般的です。
それゆえに、広告文を収集してテキストコーパスを作ると、準重複サンプル、つまり完全には重複しないけれども60%以上、場合によっては90%以上の言葉が一致してしまう例が大量に発生します。これはモデルの汎用性に悪影響を及ぼす可能性があります。

例えば、以下のキャンペーン宣伝文をご覧ください:

【各店先着30名】XXXX 1か月無料!お腹すっきり1か月無料体験チケット!10秒のかんたんアンケート回答だけでもらえます!受付は9月30日までなので、この機会にぜひ!残り枠あと少し!先着30名のためお急ぎ下さい
【各店先着30名】XXXX 5回無料!お腹すっきり5回無料体験チケット!10秒のかんたんアンケート回答だけでもらえます!受付は11月30日までなので、この機会にぜひ!かんたん1分申し込み、先着30名のためお急ぎ下さい

どれも基本同じ内容ですが、ところどころ違う文言になっているのはお分かりいただけると思います。
このように、準重複のパターンは無限に存在するため、全てをルール化するのは至って困難です。

従って、何かしらの類似度を定義し、閾値を設けて排除するしかありません。
しかし、簡単な方法では、大量のサンプルに対してペアごとの類似度を計算する必要があるため、時間複雑度と空間複雑度はどちらもO(N^2)となり、計算コストが非常に高くなります。

これらの問題点を考慮し、今回はハッシュ関数の力を借りてサンプルをグループに分ける、分割統治法的なアプローチを採用します。

4. Locality sensitive hashing(局所性鋭敏型ハッシュ)

ハッシュ関数と言えば、MD5やSHA256などの関数がよく知られています。これらのハッシュ関数は、異なるアイテムが同じハッシュバケットに入る確率(ハッシュ値の衝突)を最小限に抑えるように設計されています。
しかし、類似するデータポイントをなるべく同じバケットに格納することを目指すハッシュ関数も存在します。
「類似性」の定義によって使用される具体的な手法はさまざまですが、その関数が局所性を重視する性質を持っていれば、それは局所性鋭敏ハッシュ関数(LSH)と呼ばれます。
上記に従い、ここではLSHは特定のハッシュ関数名を指すわけではなく、特定の種類の関数を指しています。

LSHによって大量のデータをハッシュ化すれば、各バケットには高い確率で類似したデータが含まれます。
そのため、LSHは以下のような場面で効果的に使用できます:

  • 最近傍検索:特定データの最近傍は、高い確率でそのデータが属しているバケット内に存在する
  • 類似性検出:複数のデータが同じバケット、もしくは似たようなバケットに含まれているかどうかで、類似性は判定できる

全てのサンプルについて一度だけ処理すればハッシュバケットを作成できるため、ハッシュ計算の時間複雑度と空間複雑度はともにO(N)となります。
各ハッシュバケット内での類似度計算は、平均的にO(N^2/K^2)になります。
つまり、今回の準重複サンプル問題も、このLSHを使えば楽に解決することができます。

5. 大規模コーパス用のエンコーディング手法

文書のハッシュを計算する上でまず必要なのは、文書をベクトル形式に変換することです。
文書をバイト配列として扱い、その上でハッシュ関数を設計することも可能ではありますが、あらかじめ固定長のベクトルにしておくことで計算が楽になります。

BERTなどのTransformerモデルを使えば高品質なベクトル表現は得られますが、千万・億単位のサンプルを全部処理するには大量なGPU計算資源が必要になります。
よって、大規模コーパスを効率的にエンコーディングするためのおすすめの手順は以下の通りです:

  1. トークン化(Tokenization)
  2. TF-IDF(Term Frequency - Inverse Document Frequency)
  3. 切り捨てSVD(Truncated-SVD)

この3ステップのプロセスは、まず文書をトークン(単語)に分割し、そのトークンの頻度情報を使用してベクトルのドラフトを作成し、次にそのベクトルを低次元に変換するというものです。

TF-IDFは、文書中の単語の出現頻度で単語の重要度を評価する統計的手法で、情報検索やテキストマイニングで広く使用されています。
しかし、TF-IDFには以下のような欠点もあります:

  • 「単語」を事前に定義する必要がある
  • 結果として得られるベクトルの次元数が非常に大きくなる

特に日本語では、英語のようなスペース区切りがありません。したがって、「単語」の定義を事前に設定する必要があります。これをサポートしているプロセスがトークン化です。
また、高次元のベクトルのままだと「類似度」の計算が難しくなる恐れがあり、LSHの精度にも影響を及ぼすため、切り捨てSVDを使用して次元数を圧縮します。

5.1 Text cleaning

生の文書をクリーニングし、listに格納します。

import re
import pandas as pd
from unicodedata import normalize, category


def clean(text:str):
    if pd.isna(text):
        return ""
    text = normalize("NFKC", text).replace("\n", "")
    while re.search(r"<[^<>]*\/*[ \n]*>", text):
        text = re.sub(r"<[^<>]*\/*[ \n]*>", "", text)
    while re.search(r"([\u3000-\u30ff]|[\u4e00-\u9faf]|[\uff00-\uffef]) +(.)", text):
        text = re.sub(r"([\u3000-\u30ff]|[\u4e00-\u9faf]|[\uff00-\uffef]) +(.)", "\\1\\2", text)
    while re.search(r"(.) +([\u3000-\u30ff]|[\u4e00-\u9faf]|[\uff00-\uffef])", text):
        text = re.sub(r"(.) +([\u3000-\u30ff]|[\u4e00-\u9faf]|[\uff00-\uffef])", "\\1\\2", text)
    text = re.sub(r"(・{3,}|\.{3,}|。{3,})", "…", text)
    return text
    
    
corpus = [clean(t) for t in df.text]

5.2 Tokenization

multiprocessingモジュールでtransformersライブラリの日本語Tokenizerを並列化します。

from glob import glob
from transformers import BertJapaneseTokenizer
from multiprocessing import cpu_count, Process
from itertools import chain


def get_func(index):
    def func(corpus_slice, tokenizer):
        import pickle
        ret = []
        for text in corpus_slice:
            ret.append(tokenizer.tokenize(text)) # as tokens
        with open(f"data/interim/tokenized_corpus_part{index}.pkl", "wb") as f:
            pickle.dump(ret, f, protocol=5)
    return func
        
def tokenize_parallel(corpus):
    # Example-based.
    # Some of the processes will finish much faster than those that received larger average text size.
    num_processes = cpu_count() // 2
    batch_size = len(corpus) // num_processes
    slices = []
    for i in range(num_processes):
        if i < num_processes - 1:
            slices.append(slice(i * batch_size, (i+1) * batch_size))
        else:
            slices.append(slice(i * batch_size, None))
    # 1 tokenizer instance for each process to prevent race conditions
    tokenizers = {i:BertJapaneseTokenizer.from_pretrained(model_name_or_path) for i in range(num_processes)}
    pool = [Process(target=get_func(i), args=(corpus[slices[i]], tokenizers[i], )) for i in range(num_processes)]
    [p.start() for p in pool]
    [p.join() for p in pool]
    
    
tokenize_parallel(corpus)

# preserve order
tokenized_files = sorted(glob("data/interim/tokenized_*"), key=lambda x: int(re.search('part(\d+)\.pkl', x).group(1)))
tokenized_corpus = []
for file in tokenized_files:
    with open(file, "rb") as f:
        tokenized_corpus.append(pickle.load(f))
tokenized_corpus = list(chain.from_iterable(tokenized_corpus))

冒頭の例文のTokenization結果はこちらです:

['【', '各', '##店', '先着', '30', '名', '】', 'XX', '##X', '##X', '1', 'か月', '無料', '!', 'お', '##腹', 'す', '##っきり', '1', 'か月', '無料', '体験', 'チケット', '!', '10', '秒', 'の', 'かん', 'た', 'ん', 'アンケート', '回答', 'だけ', 'で', 'もらえ', 'ます', '!', '受付', 'は', '9', '月', '30', '日', 'まで', 'な', 'ので', '、', 'この', '機会', 'に', 'ぜ', '##ひ', '!', '残り', '枠', 'あと', '少し', '!', '先着', '30', '名', 'の', 'ため', 'お', '急', '##ぎ', '下さい']
['【', '各', '##店', '先着', '30', '名', '】', 'XX', '##X', '##X', '5', '回', '無料', '!', 'お', '##腹', 'す', '##っきり', '5', '回', '無料', '体験', 'チケット', '!', '10', '秒', 'の', 'かん', 'た', 'ん', 'アンケート', '回答', 'だけ', 'で', 'もらえ', 'ます', '!', '受付', 'は', '11', '月', '30', '日', 'まで', 'な', 'ので', '、', 'この', '機会', 'に', 'ぜ', '##ひ', '!', 'かん', 'た', 'ん', '1', '分', '申し込み', '、', '先着', '30', '名', 'の', 'ため', 'お', '急', '##ぎ', '下さい']

5.3 TF-IDF & Truncated-SVD

前節で使われたTokenizerは最大32000通りの「単語」を定義しています。
全ての単語が学習サンプルに出現することはないので、結果的に、ここでのtfidfベクトルは29900次元となりました。

from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.decomposition import TruncatedSVD


vectorizer = TfidfVectorizer(tokenizer=lambda x:x, use_idf=True, lowercase=False) # pre-tokenized, use identity function as tokenizer
tfidf = vectorizer.fit_transform(tokenized_corpus)
# PCA requires centering data before processing, and a dense input matrix.
# Our tfidf is a sparse, frequency based matrix that better suits TruncatedSVD
svd = TruncatedSVD(n_components=128, algorithm="arpack")
svd_array = svd.fit_transform(tfidf)

冒頭の「各店先着30名」の宣伝文のtfidfベクトルは下記になります:

matrix([[0.02397062, 0.        , 0.        , ..., 0.        , 0.        , 0.        ]])
matrix([[0.02457615, 0.        , 0.        , ..., 0.        , 0.        , 0.        ]])

このままでcosine類似度を計算してみると:

cosine_similarity(np.asarray(tfidf[12348].todense()), 
                  np.asarray(tfidf[12350].todense()))
>>> array([[0.94960952]])

高次元ベクトルのままでも似たもの同士のcosine類似度は高いです。
そしてSVDベクトルは:

array([[ 2.79604404e-01,  7.58619315e-02, -1.92817591e-01,
        -3.52833199e-01,  5.38037408e-01,  1.15110795e-01,
         9.77922234e-02,  4.13360342e-01,  4.46535270e-01,
         1.33619913e-01,  5.45200885e-02, -3.13688481e-02,
         ...
         8.18030884e-03,  1.41359004e-02,  1.39400246e-02,
        -8.32018568e-03,  6.99780418e-03]])
array([[ 2.38633145e-01,  5.99854623e-02, -1.85314429e-01,
        -3.43566073e-01,  5.65663506e-01,  1.30826111e-01,
         1.04960216e-01,  3.88130688e-01,  4.40069084e-01,
         1.44562342e-01,  8.95789818e-02, -1.68612216e-02,
         ...
        -3.64673899e-03, -9.81768509e-04,  9.47556247e-03,
         1.95639331e-02,  1.77177246e-03]])

cosine類似度を計算してみると:

cosine_similarity(svd_array[12348][None,:],
                  svd_array[12350][None,:])
>>> array([[0.98271723]])

低次元ベクトルに類似度の情報がちゃんと埋め込まれているのが分かりました。

6. Cosine類似度に対応したLSH関数で準重複を排除

Cosine類似度はベクトル間の角度を基に類似度を計算するため、角度情報を用いる局所性鋭敏ハッシュ(LSH)関数を使用する必要があります。この場合、「Random Binary Projections」が一つの選択肢となります。
この手法では、データ空間にランダムに平面(ハイパープレーン)を導入します。次に、各ベクトルがこの平面の「上側」に位置するか「下側」に位置するかを判定します。位置情報はバイナリ値(0または1)として表されます。
このプロセスを複数の平面に対して繰り返すことで、それぞれのベクトルに対するバイナリ値の系列(ハッシュ値)が得られます。これにより、高次元ベクトル間の類似度をバイナリハッシュ値で評価することが可能になります。

注:最後のバイナリ化処理を除外すればこれは通常の「Random Projections」という次元数削減手法になります。

import numpy as np


def LSH_np(vecs: np.array, n_bits=4, return_integer=True):
    """
    Locality sensitive hashing implemented as random binary projections.
    :param vecs: input row vectors of shape [n, d]
    :param n_bits: output number of binary bits, there are effectively 2 ** n_bits buckets
    :return: integer array of shape [n, n_bits]
    """
    vecs = vecs - vecs.mean()  # center vectors
    normals_shape = (
        1,
        vecs.shape[-1],
        n_bits)
    normal_vectors = np.random.randn(*normals_shape)  # randn is rotationally invariant
    inner_products = np.einsum('nd,ndr->nr', vecs, normal_vectors)  # projection, auto broadcast on n, r=n_bits
    buckets = (inner_products > 0).astype(int)  # binary orientation
    if return_integer:
        buckets = buckets.dot(1 << np.arange(buckets.shape[-1]))  # endianess does not matter
    return buckets
    
    
hashes = LSH_np(svd_array, 15) # 32768 bins

上記広告文のハッシュ値を確認してみると、ちゃんと同じバケットに入っているのが分かりました。

print(hashes[12348], hashes[12350])
>>> 468 468

このハッシュ情報を使えば、バケットごとにCosine類似度による準重複サンプルの排除が可能になります。
しかし、注意点が一つあります。LSHは、データ空間を概ね等分割していますが、データポイントの密度が非常に高い(特定データポイントに類似するものが極端に多い)場合、それら全てが一つのバケットに入ってしまう確率も高くなります。
したがって、バケット内の類似度計算は平均的にO(N^2/K^2)の複雑度となりますが、最悪の場合ではO(N^2)のままになる可能性があることを認識しておいてください。

from random import shuffle
from tqdm import tqdm


hash_table = pd.DataFrame({"hash":hashes}) # this has index 0,1,2,...
hash_groups = hash_table.groupby("hash")
hash_indices = hash_groups.indices
group_sizes = hash_groups.size()

threshold = 0.8
size_upperbound = group_sizes.quantile(0.999)
# ignore groups with top 0.1% size because it may be too large to compute a cosine similarity matrix

failed_indices = []
keep_indices = []

for h in tqdm(hash_indices.keys()):
    try:
        if len(hash_indices[h]) > size_upperbound:
            failed_indices.append(hash_indices[h])
            continue
        sim = cosine_similarity(svd_array[hash_indices[h]])
        upper = np.triu(sim) - np.identity(sim.shape[0])
        upper = upper > threshold
        mask = upper.any(axis=1)
        keep_indices.append(hash_indices[h][~mask])
    except MemoryError: # just a failsafe
        gc.collect()
        failed_indices.append(hash_indices[h])
        continue
	
	
out = []
for idx in tqdm(keep_idx_list):
    out.append(corpus[idx])
pd.DataFrame({"text":out}).to_parquet("data/processed/train.parquet")

7. 終わりに

私が実務で扱っていたデータセットは、1300万ドキュメントに及びます。
そこまでのデータ量でCosine類似度の計算や、最近傍検索(KNN)などを行うのは非常に困難で、TBオーダーのメモリが必要になります。
しかし、局所性鋭敏ハッシュ(LSH)関数を導入することで、同じハッシュバケット内でCosine類似度を計算する際のメモリ消費量を大幅に抑えることができます。実際、メモリ使用量はおよそ50~100GBと、TF-IDFのメモリ消費量以下にまで削減されます。
LSHは確率的な手法で、完全な精度を求める場合には適していませんが、学習データのクリーニングなどでは非常にうまく機能します。
実際、Metaが公開しているオープンソースライブラリFaissや他の多くの人気のあるテキスト検索ライブラリでは、インデックス作成時にLSHを活用しています。ご興味がある方は、ぜひ調査してみてください。

D2C m-tech

Discussion