💬

【週末研究】05. 学習済word2vecモデルの構築 - word2vec の追加学習と記事のクロール

2023/02/13に公開

学習済word2vecモデルの構築

今回は、前回 使った word2vec モデルを作る方法を解説していきます。

ざっくり、以下のような構成でバッチを構成しています。

コードは、こちら に配置しています。

  • UPDATE: 記事のクロールとクロールした記事での追加学習について更新しました!

Word2vec の Wikipedia train dataset のミニバッチ学習

まずは、word2vec モデルを、 Wikipedia train dataset を使ってミニバッチ学習する方法を紹介します。

Wikipedia dataset のデータベース化

ミニバッチ学習するためのデータを整備していきましょう。

実施することは、TensorFlow Datasets として利用可能な wiki40b/ja をパースして、sqlite3 に保存しておきます。

github のコード では、

cd backend && make wikidata

で動作確認ができます。

では、実際に、どんなコードで実現しているかを見ていきましょう。
尚、コードのすべてを確認されたい方は、github のコード を参照ください。

backend/app/auto_topic/executable/make_wikidata.py WikiDbMaker
class WikiDbMaker(object):
    def __init__(self, mode: str) -> None:
        self.mode = mode  # train|valid|test

        self.n_intervals = 10000
        self.db = WikiDb(mode)
        self._init_db()

    def _init_db(self) -> WikiDbMaker:
        self.db.connect()
        self.db.create_tables()
        return self

    def execute(self, ds):
        try:
            self._populate(ds)
        except Exception as e:
            print(e, tb.format_exc())
        finally:
            self.db.close()
        return self

    def _populate(self, ds):
        records = []
        for rec in ds.as_numpy_iterator():
            lines = rec["text"].decode().split("\n")
            parsed = self.parse(lines)

            doc_id = build_ulid(prefix="Doc")
            recs = [
                WikiRecord(document_id=doc_id).from_dict(prg).to_record()
                for prg in parsed
            ]
            records.extend(recs)
            if len(records) >= self.n_intervals:
                self.db.insert_many(records)
                records = []
        if len(records) > 0:
            self.db.insert_many(records)
        return self

    def parse(self, lines: list[str]) -> WikiDbMaker:
        """parse to each paragraph"""
        parsed = []
        status = "none"
        record: dict[str, str] = dict(
            article="_NO_ARTICLE_",
            section="_NO_SECTION_",
            paragraph="_NO_PARAGRAPH_",
        )

        for line in lines:
            if line.strip() == "":
                continue
            if line == "_START_ARTICLE_":
                status = "article"
                continue
            if line == "_START_SECTION_":
                # end of paragraph
                if status == "paragraph":
                    parsed.append(record)

                # initialize record
                assert record["article"]
                record = dict(
                    article=record["article"],
                    section="_NO_SECTION_",
                    paragraph="_NO_PARAGRAPH_",
                )

                # update status
                status = "section"
                continue
            if line == "_START_PARAGRAPH_":
                status = "paragraph"
                continue

            assert status != "none"
            if status == "paragraph":
                line = line.replace("_NEWLINE_", "\n")
            record[status] = line

        assert status == "paragraph"
        parsed.append(record)
        return parsed

これは、TensorFlow Datasets をパラグラフ単位にパースして、データベースに保存するクラスです。
メインは、parse メソッドです。
_populate メソッドでは、データセットの文書単位で処理し、行に分割して parse し、結果をDBにINSERTします。
parse では、"START_ARTICLE", "START_SECTION", "START_PARAGRAPH" を参考に、処理対象の行がどこの部分かを把握しながら、データレコード化していきます。

このクラスのメソッドを実行することで、TensorFlow Datasets の wiki40b/ja をデータベースに保存できます。

Wikipedia データのミニバッチ学習

次は、Wikipedia データの学習について見ていきましょう。

github のコード では、

cd backend && make wordvector

で動作確認ができます。

さて、実際にどうやってミニバッチをしているかを見ていきます。
(各処理の実行時間を計測するために、ログ出力を混ぜています。)

backend/app/auto_topic/executable/train_wordvector.py main
def main(
    n_limit: int = -1,
    mode: str = "train",
    batch_size: int = 10000,
    pipe_file: str = "data/pipe_wikivec.gz",
):
    log_info("Start", "train_wordvector")

    # pickup wiki data
    wdb = WikiDb(mode=mode)
    log_info("Start", "Select WikiDb")
    records = wdb.select(n_limit=n_limit)
    log_info("End", "Select WikiDb")

    log_info("Start", "Make WikiRecord")
    X: TextSequences = [WikiRecord(*rec).paragraph.splitlines() for rec in records]
    log_info("End", "Make WikiRecord")

    log_info("Start", "Create Pipeline")
    pipe_vectorizer = Pipeline(
        steps=[
            (
                TokenizerWord(
                    use_stoppoes=False, filterpos=["名詞", "動詞"], use_orgform=True
                ),
                None,
            ),
            (VectorizerWord2vec(min_count=1), None),
        ]
    )
    log_info("End", "Create Pipeline")

    log_info("Start", "Fit Wiki data")
    n = len(X)
    for bch_idx, offset in enumerate(tqdm(range(0, n, batch_size))):
        log_info("Processing ...", f"{bch_idx=}")
        bch = X[offset : offset + batch_size]
        pipe_vectorizer.fit(bch)
    log_info("End", "Fit Wiki data")

    log_info("Start", "Dump Pipeline for Wiki vectorizer")
    joblib.dump(pipe_vectorizer, pipe_file, compress=("gzip", 3))
    log_info("End", "Dump Pipeline for Wiki vectorizer")

    log_info("End", "train_wordvector")

ざっくりと説明しますと、DBに保存していたWikipedia データのパラグラフを行分割して、TextSequence のリストとしてデータ化します。
次に、ベクトル化するためのパイプラインを構築します。特に、「名詞」、「動詞」(原形に正規化)だけに絞ってトークナイズするようにしています。
そして、バッチサイズ単位でループし、追加学習を繰り返します。

ここで、追加学習(word2vecの追加学習)をどうやってやっているのかを見ておきましょう。

backend/app/auto_topic/component/models/vectorizer.py VectorizerWord2vec
class VectorizerWord2vec(Vectorizer):
    def __init__(
        self,
        vector_size=128,
        sg=1,
        max_vocab_size=1000 * 1000,
        min_count=10,
        window=7,
        epochs=5,
    ) -> None:
        super().__init__()  # must be called at first

        self.params = dict(
            vector_size=vector_size,
            sg=sg,
            max_vocab_size=max_vocab_size,
            min_count=min_count,
            window=window,
            epochs=epochs,
        )

        self.__dict__.update(self.params)
        self.model = None

    def clear_model(self) -> Self:
        # NOTE: if you would like to train as the first take
        self.model = None
        return self

    def fit(self, X: TextSequences, **kwargs) -> Tensor:
        if self.model is None:
            # the first training
            print("[DEBUG] train firstly")
            self.model = word2vec.Word2Vec(X, **self.params)
        else:
            # the updating training
            print("[DEBUG] train updately")
            self.model.build_vocab(X, update=True)
            self.model.train(
                X, total_examples=self.model.corpus_count, epochs=self.model.epochs
            )
        return self

    def transform(self, X: TextSequences, **kwargs) -> Tensor:
        ws = self.window
        y = []
        for s in X:
            for idw, w in enumerate(s):
                v = None
                if w in self.model.wv:
                    v = self.model.wv[w]
                else:
                    # 辞書にないトークンは、モデルのwindowサイズを前後の文脈語彙として類似ベクトルを推定
                    context = s[max(idw - ws, 0) : idw + ws]
                    tokens = [tkn for tkn in context if tkn in self.model.wv]
                    try:
                        v = self.model.wv.most_similar(tokens)
                    except Exception as e:
                        # most_similar を取得できなかったら、完全な unknown としてゼロベクトルをセット
                        # # このケースは、対象の文(s) のすべてのトークンが min_count 未満の時に発生する
                        print(
                            f"[WARNING] {e.args[0]} : couldn't get most_similar({tokens=}) / {context=}"
                        )
                        v = numpy.zeros(self.vector_size)
                assert v is not None
                y.append(v)

        return y

    def __getstate__(self):
        state = super().__getstate__()
        state.update(dict(model=self.model))
        return state

    def __setstate__(self, state):
        model = state.pop("model", None)
        super().__setstate__(state)
        self.model = model

今回のミニバッチ学習のポイントは、fit メソッドです。

fit()
    def fit(self, X: TextSequences, **kwargs) -> Tensor:
        if self.model is None:
            # the first training
            print("[DEBUG] train firstly")
            self.model = word2vec.Word2Vec(X, **self.params)
        else:
            # the updating training
            print("[DEBUG] train updately")
            self.model.build_vocab(X, update=True)
            self.model.train(
                X, total_examples=self.model.corpus_count, epochs=self.model.epochs
            )
        return self

model は、word2vec.Word2Vec クラスのインスタンスであることに注意しておきましょう。
else 分岐の build_vocab() メソッドに、update=True を指定して、語彙集合を再構築しておくことが追加学習のポイントです。
そのあとに、train メソッドをコールすることで、これまで未知語だった語彙に対してもベクトル化ができるように学習する、という構成になっています。

以上の仕組みにより、Wikipedia データセットを使って、学習済の word2vec モデルを構築(保存)することができます。


Scrapy によるニュース記事のクロール

次に、Wikipedia のように時流に依存しないデータの学習に加えて、時流に合わせたトレンド情報も学習させるようにしましょう。
ここでは、scrapy を使ってニュース記事をクロールする方法を紹介します。

github のコード では、

cd backend && make news

で動作確認ができます。

さて、クロールするのに必要なコードを見ていきましょう。

まずは、パイプラインクラス (NewsPipeline) を定義します。
このパイプラインでは、sqlite3 データベースに記録するコードになっています。
今回の記事では、使ってないです(⌒-⌒; )

backend/app/scrapy/newspy/pipelines.py NewsPipeline
class NewsPipeline:
    def process_item(self, item: NewsItem, spider) -> NewsItem:
        self._init_db()
        try:
            self._store_item(item)
        except sqlite3.IntegrityError as e:
            # possibly for unique constraint
            item["html"] = "(omitted)"  # for more simplification of log output
            raise DropItem(f"{e}: skipped [{item['url']}]")
        finally:
            self._term_db()
        return item

    def _init_db(self) -> Self:
        self.cnn = sqlite3.connect("data/news.db")

        # テーブル作成
        self.csr = self.cnn.cursor()
        self.csr.execute(
            """CREATE TABLE IF NOT EXISTS news_data(
                url TEXT PRIMARY KEY,
                doc_id TEXT UNIQUE NOT NULL,
                html TEXT NOT NULL,
                created_at DATE NOT NULL,
                updated_at DATE
            );
            """
        )

        return self

    def _term_db(self) -> Self:
        if self.csr is not None:
            self.csr.close()
            self.csr = None
        if self.cnn is not None:
            self.cnn.close()
            self.cnn = None
        return self

    def _store_item(self, item: NewsItem):
        doc_id = build_ulid(prefix="Nws")
        values = [item["url"], doc_id, item["html"]]

        self.csr.execute(
            """INSERT INTO news_data (
            url, doc_id, html, created_at
        ) VALUES (?, ?, ?, datetime('now', 'localtime'))
        """,
            values,
        )
        self.cnn.commit()

        return

次に、ニュースレコードとして保持するクラスを定義します。
url, html だけを保持するシンプルな Item クラスです。

backend/app/scrapy/newspy/items.py NewsItem
class NewsItem(scrapy.Item):
    url = scrapy.Field()
    html = scrapy.Field()

そして、クロールの仕方(何を抽出して、どうリンクを辿るか)を、Spider として定義します。

backend/app/scrapy/newspy/spiders/news.py NewsSpider
class NewsSpider(scrapy.Spider):
    name = "news"
    start_urls = [
        # 一般
        # "https://news.yahoo.co.jp/",
        "https://www.nikkei.com/",
        # "https://www3.nhk.or.jp/news/",
        # "https://www.jiji.com/",
        # "https://diamond-rm.net/",
        # IT
        "https://xtech.nikkei.com/top/it/",
        "https://www.itmedia.co.jp/news/",
        "https://japan.zdnet.com/",
        # AI
        "https://ledge.ai/theme/news/",
        "https://ainow.ai/",
        "https://news.mynavi.jp/techplus/tag/artificial_intelligence/",
        "https://ja.stateofaiguides.com/",
        "https://www.itmedia.co.jp/news/subtop/aiplus/",
    ]
    allowed_domains = [urlparse(u).netloc for u in start_urls]

    def parse(self, response):

        item = NewsItem()
        item["url"] = response.url
        item["html"] = response.body.decode(response.encoding)

        yield item

        for ank in response.css("a::attr(href)"):
            url: str = ank.get().strip()
            if not url:
                continue

            if url[0] in ["/"]:
                url = urljoin(response.url, url)

            scheme = "http"
            if url[: len(scheme)] != scheme:
                g_logger.warning(
                    f"Found non {scheme} scheme: skipped [{url=}][parent.url={response.url}]"
                )
                continue

            yield scrapy.Request(url, callback=self.parse)

        return

このSpiderでは、parse メソッドで、リンク(a タグの href 属性)を抽出して、相対パスを絶対パスに変換しておき、http から始まるurl を再起的にアクセスしていきます。
response の url, html をセットしてNewsItem を生成していきます。

最後に、settings.py です。見やすくなるようにコメントを除いています。

backend/app/scrapy/newspy/settings.py
BOT_NAME = "newspy"

SPIDER_MODULES = ["newspy.spiders"]
NEWSPIDER_MODULE = "newspy.spiders"

ROBOTSTXT_OBEY = True

DOWNLOAD_DELAY = 3
ITEM_PIPELINES = {
   'newspy.pipelines.NewsPipeline': 300,
}

HTTPCACHE_ENABLED = True
HTTPCACHE_EXPIRATION_SECS = 0
HTTPCACHE_DIR = "httpcache"
HTTPCACHE_IGNORE_HTTP_CODES = []
HTTPCACHE_STORAGE = "scrapy.extensions.httpcache.FilesystemCacheStorage"

REQUEST_FINGERPRINTER_IMPLEMENTATION = "2.7"
TWISTED_REACTOR = "twisted.internet.asyncioreactor.AsyncioSelectorReactor"

DEPTH_LIMIT = 2

特に、最後の DEPTH_LIMIT を 3 にすると、1週間かかっても終わらなかったりしたので、注意が必要ですね。

以上で、scrapy に必要なコード、設定が完了です。

以下のような感じで、クロールできるようになります。

scrapy crawl news --loglevel=INFO -O data/news.json

ニュース記事の追加学習

最後に、クロールしたニュース記事を追加学習させましょう。

github のコード では、

cd backend && make tuned-wordvector

で動作確認ができます。

追加学習は、先ほどのミニバッチ学習とほぼ同じですので、復習感覚で見ていきましょう。

backend/app/auto_topic/executable/tune_wordvector.py
def _filter_text(body) -> str:
    soup: BeautifulSoup = BeautifulSoup(body, "lxml-xml")
    for tg in ["script", "noscript", "meta"]:
        try:
            soup.find(tg).replace_with(" ")
        except Exception:
            # NOTE: Not Found `tg` tag
            pass
    return soup.get_text("\n\n")


def clean_text(text: str):
    contents = []
    for txt in re.split(r"(。|\n)", text):
        txt = txt.strip().replace("\u200b", "").replace("\u3000", " ")
        txt = re.sub(r"\n+", "\n", txt)
        txt = re.sub(r"([\W])\1+", " ", txt)
        if not txt:
            continue
        # contents.append(txt)
        # contents.extend(txt.split("\n"))
        contents.append(txt.split("\n")[-1])
    return contents


def make_sentences(json_file: str):
    with open(json_file, "r") as f:
        news = json.load(f)

    cleaned_texts = []
    for itm in news:
        html = itm["html"]
        text = _filter_text(html)
        texts = clean_text(text)
        cleaned_texts.append(texts)

    tokenizer = TokenizerWord(use_stoppoes=False, filterpos=[], use_orgform=False)
    tidy_sentences = []
    for idx, texts in enumerate(cleaned_texts):
        for text in texts:
            tokenized = tokenizer([[text]])
            if not tokenized:
                continue
            snt = tokenized[0]
            if len(snt) < 3:
                continue
            # print(idx, snt)       # for debugging
            tidy_sentences.append("".join(snt))
    return tidy_sentences


def main(
    batch_size: int = 10000,
    base_pipe_file: str = "data/pipe_wikivec.gz",
    news_json_file: str = "app/scrapy/data/news.json",
):
    log_info("Start", "retrain_wordvector")
    log_info("Start", "loading trained vectors")
    pipe_wikivec = joblib.load(base_pipe_file)
    log_info("End", "loading trained vectors")

    log_info("Start", "make sentences")
    tidy_sentences = make_sentences(news_json_file)
    log_info("End", "make sentences")

    log_info("Start", "tune model")
    for idx, offset in enumerate(tqdm(range(0, len(tidy_sentences), batch_size))):
        X = tidy_sentences[offset : offset + batch_size]
        print(offset, offset + batch_size)
        pipe_wikivec.fit([X])
    log_info("End", "tune model")

    log_info("Start", "dumping retrained model")
    joblib.dump(pipe_wikivec, "data/pipe_wikivec.tuned.gz", compress=("gzip", 3))
    log_info("End", "dumping retrained model")

    log_info("End", "retrain_wordvector")

make_sentences では、クロールした html を簡易的にクレンジングして、概ね文単位になるように文書(文リスト)を作成します。
main では、make_sentences で作成した文リストを使ってミニバッチ学習をしていきます。
この時に、すでに学習済のパイプライン pipe_wikivec に対してミニバッチ単位で追加学習します。

まとめ

以上により、Wikipedia をベースとして、時事情報・トレンド情報を反映した簡易的なベクトル化モデルを構築することができます。
ここで、昨今のLLMs による embeddings と比較すると、word2vec によるベクトル化は、あくまで表層形によるベクトル化であり、
文脈を踏まえたベクトル化ができないという点には注意が必要でしょう。いわゆる、"bank" が、銀行なのか土手なのか問題ですね。

一般的に、企業内での自然言語処理では、業界固有、企業固有の言い回しや方言があることは、一般的でしょう。
あくまで汎用的なベクトル化モデルを構築するという観点では、このような文脈によって意味が異なる単語は、通常の文脈における単語の意味が異なるため、異なるベクトル値を持つように変換すべきでしょう。
word2vec は仕組み上、文脈に応じたベクトル化はできないため、汎用性が高い(マルチドメイン、マルチタスクを狙う) ベクトル化モデルを構築するには、BERT や LLMs で利用されるベクトル化手法を使う方が有望そうです。


次回は、言語モデルについてなにか書く予定です。
ちなみに、前回はこちら

GitHubで編集を提案

Discussion