【週末研究】05. 学習済word2vecモデルの構築 - word2vec の追加学習と記事のクロール
学習済word2vecモデルの構築
今回は、前回 使った word2vec モデルを作る方法を解説していきます。
ざっくり、以下のような構成でバッチを構成しています。
コードは、こちら に配置しています。
- UPDATE: 記事のクロールとクロールした記事での追加学習について更新しました!
Word2vec の Wikipedia train dataset のミニバッチ学習
まずは、word2vec モデルを、 Wikipedia train dataset を使ってミニバッチ学習する方法を紹介します。
Wikipedia dataset のデータベース化
ミニバッチ学習するためのデータを整備していきましょう。
実施することは、TensorFlow Datasets として利用可能な wiki40b/ja
をパースして、sqlite3 に保存しておきます。
github のコード では、
cd backend && make wikidata
で動作確認ができます。
では、実際に、どんなコードで実現しているかを見ていきましょう。
尚、コードのすべてを確認されたい方は、github のコード を参照ください。
class WikiDbMaker(object):
def __init__(self, mode: str) -> None:
self.mode = mode # train|valid|test
self.n_intervals = 10000
self.db = WikiDb(mode)
self._init_db()
def _init_db(self) -> WikiDbMaker:
self.db.connect()
self.db.create_tables()
return self
def execute(self, ds):
try:
self._populate(ds)
except Exception as e:
print(e, tb.format_exc())
finally:
self.db.close()
return self
def _populate(self, ds):
records = []
for rec in ds.as_numpy_iterator():
lines = rec["text"].decode().split("\n")
parsed = self.parse(lines)
doc_id = build_ulid(prefix="Doc")
recs = [
WikiRecord(document_id=doc_id).from_dict(prg).to_record()
for prg in parsed
]
records.extend(recs)
if len(records) >= self.n_intervals:
self.db.insert_many(records)
records = []
if len(records) > 0:
self.db.insert_many(records)
return self
def parse(self, lines: list[str]) -> WikiDbMaker:
"""parse to each paragraph"""
parsed = []
status = "none"
record: dict[str, str] = dict(
article="_NO_ARTICLE_",
section="_NO_SECTION_",
paragraph="_NO_PARAGRAPH_",
)
for line in lines:
if line.strip() == "":
continue
if line == "_START_ARTICLE_":
status = "article"
continue
if line == "_START_SECTION_":
# end of paragraph
if status == "paragraph":
parsed.append(record)
# initialize record
assert record["article"]
record = dict(
article=record["article"],
section="_NO_SECTION_",
paragraph="_NO_PARAGRAPH_",
)
# update status
status = "section"
continue
if line == "_START_PARAGRAPH_":
status = "paragraph"
continue
assert status != "none"
if status == "paragraph":
line = line.replace("_NEWLINE_", "\n")
record[status] = line
assert status == "paragraph"
parsed.append(record)
return parsed
これは、TensorFlow Datasets をパラグラフ単位にパースして、データベースに保存するクラスです。
メインは、parse
メソッドです。
_populate
メソッドでは、データセットの文書単位で処理し、行に分割して parse
し、結果をDBにINSERTします。
parse
では、"START_ARTICLE", "START_SECTION", "START_PARAGRAPH" を参考に、処理対象の行がどこの部分かを把握しながら、データレコード化していきます。
このクラスのメソッドを実行することで、TensorFlow Datasets の wiki40b/ja
をデータベースに保存できます。
Wikipedia データのミニバッチ学習
次は、Wikipedia データの学習について見ていきましょう。
github のコード では、
cd backend && make wordvector
で動作確認ができます。
さて、実際にどうやってミニバッチをしているかを見ていきます。
(各処理の実行時間を計測するために、ログ出力を混ぜています。)
def main(
n_limit: int = -1,
mode: str = "train",
batch_size: int = 10000,
pipe_file: str = "data/pipe_wikivec.gz",
):
log_info("Start", "train_wordvector")
# pickup wiki data
wdb = WikiDb(mode=mode)
log_info("Start", "Select WikiDb")
records = wdb.select(n_limit=n_limit)
log_info("End", "Select WikiDb")
log_info("Start", "Make WikiRecord")
X: TextSequences = [WikiRecord(*rec).paragraph.splitlines() for rec in records]
log_info("End", "Make WikiRecord")
log_info("Start", "Create Pipeline")
pipe_vectorizer = Pipeline(
steps=[
(
TokenizerWord(
use_stoppoes=False, filterpos=["名詞", "動詞"], use_orgform=True
),
None,
),
(VectorizerWord2vec(min_count=1), None),
]
)
log_info("End", "Create Pipeline")
log_info("Start", "Fit Wiki data")
n = len(X)
for bch_idx, offset in enumerate(tqdm(range(0, n, batch_size))):
log_info("Processing ...", f"{bch_idx=}")
bch = X[offset : offset + batch_size]
pipe_vectorizer.fit(bch)
log_info("End", "Fit Wiki data")
log_info("Start", "Dump Pipeline for Wiki vectorizer")
joblib.dump(pipe_vectorizer, pipe_file, compress=("gzip", 3))
log_info("End", "Dump Pipeline for Wiki vectorizer")
log_info("End", "train_wordvector")
ざっくりと説明しますと、DBに保存していたWikipedia データのパラグラフを行分割して、TextSequence のリストとしてデータ化します。
次に、ベクトル化するためのパイプラインを構築します。特に、「名詞」、「動詞」(原形に正規化)だけに絞ってトークナイズするようにしています。
そして、バッチサイズ単位でループし、追加学習を繰り返します。
ここで、追加学習(word2vecの追加学習)をどうやってやっているのかを見ておきましょう。
class VectorizerWord2vec(Vectorizer):
def __init__(
self,
vector_size=128,
sg=1,
max_vocab_size=1000 * 1000,
min_count=10,
window=7,
epochs=5,
) -> None:
super().__init__() # must be called at first
self.params = dict(
vector_size=vector_size,
sg=sg,
max_vocab_size=max_vocab_size,
min_count=min_count,
window=window,
epochs=epochs,
)
self.__dict__.update(self.params)
self.model = None
def clear_model(self) -> Self:
# NOTE: if you would like to train as the first take
self.model = None
return self
def fit(self, X: TextSequences, **kwargs) -> Tensor:
if self.model is None:
# the first training
print("[DEBUG] train firstly")
self.model = word2vec.Word2Vec(X, **self.params)
else:
# the updating training
print("[DEBUG] train updately")
self.model.build_vocab(X, update=True)
self.model.train(
X, total_examples=self.model.corpus_count, epochs=self.model.epochs
)
return self
def transform(self, X: TextSequences, **kwargs) -> Tensor:
ws = self.window
y = []
for s in X:
for idw, w in enumerate(s):
v = None
if w in self.model.wv:
v = self.model.wv[w]
else:
# 辞書にないトークンは、モデルのwindowサイズを前後の文脈語彙として類似ベクトルを推定
context = s[max(idw - ws, 0) : idw + ws]
tokens = [tkn for tkn in context if tkn in self.model.wv]
try:
v = self.model.wv.most_similar(tokens)
except Exception as e:
# most_similar を取得できなかったら、完全な unknown としてゼロベクトルをセット
# # このケースは、対象の文(s) のすべてのトークンが min_count 未満の時に発生する
print(
f"[WARNING] {e.args[0]} : couldn't get most_similar({tokens=}) / {context=}"
)
v = numpy.zeros(self.vector_size)
assert v is not None
y.append(v)
return y
def __getstate__(self):
state = super().__getstate__()
state.update(dict(model=self.model))
return state
def __setstate__(self, state):
model = state.pop("model", None)
super().__setstate__(state)
self.model = model
今回のミニバッチ学習のポイントは、fit
メソッドです。
def fit(self, X: TextSequences, **kwargs) -> Tensor:
if self.model is None:
# the first training
print("[DEBUG] train firstly")
self.model = word2vec.Word2Vec(X, **self.params)
else:
# the updating training
print("[DEBUG] train updately")
self.model.build_vocab(X, update=True)
self.model.train(
X, total_examples=self.model.corpus_count, epochs=self.model.epochs
)
return self
model
は、word2vec.Word2Vec
クラスのインスタンスであることに注意しておきましょう。
else
分岐の build_vocab()
メソッドに、update=True
を指定して、語彙集合を再構築しておくことが追加学習のポイントです。
そのあとに、train
メソッドをコールすることで、これまで未知語だった語彙に対してもベクトル化ができるように学習する、という構成になっています。
以上の仕組みにより、Wikipedia データセットを使って、学習済の word2vec モデルを構築(保存)することができます。
Scrapy によるニュース記事のクロール
次に、Wikipedia のように時流に依存しないデータの学習に加えて、時流に合わせたトレンド情報も学習させるようにしましょう。
ここでは、scrapy を使ってニュース記事をクロールする方法を紹介します。
github のコード では、
cd backend && make news
で動作確認ができます。
さて、クロールするのに必要なコードを見ていきましょう。
まずは、パイプラインクラス (NewsPipeline
) を定義します。
このパイプラインでは、sqlite3 データベースに記録するコードになっています。
今回の記事では、使ってないです(⌒-⌒; )
class NewsPipeline:
def process_item(self, item: NewsItem, spider) -> NewsItem:
self._init_db()
try:
self._store_item(item)
except sqlite3.IntegrityError as e:
# possibly for unique constraint
item["html"] = "(omitted)" # for more simplification of log output
raise DropItem(f"{e}: skipped [{item['url']}]")
finally:
self._term_db()
return item
def _init_db(self) -> Self:
self.cnn = sqlite3.connect("data/news.db")
# テーブル作成
self.csr = self.cnn.cursor()
self.csr.execute(
"""CREATE TABLE IF NOT EXISTS news_data(
url TEXT PRIMARY KEY,
doc_id TEXT UNIQUE NOT NULL,
html TEXT NOT NULL,
created_at DATE NOT NULL,
updated_at DATE
);
"""
)
return self
def _term_db(self) -> Self:
if self.csr is not None:
self.csr.close()
self.csr = None
if self.cnn is not None:
self.cnn.close()
self.cnn = None
return self
def _store_item(self, item: NewsItem):
doc_id = build_ulid(prefix="Nws")
values = [item["url"], doc_id, item["html"]]
self.csr.execute(
"""INSERT INTO news_data (
url, doc_id, html, created_at
) VALUES (?, ?, ?, datetime('now', 'localtime'))
""",
values,
)
self.cnn.commit()
return
次に、ニュースレコードとして保持するクラスを定義します。
url, html だけを保持するシンプルな Item クラスです。
class NewsItem(scrapy.Item):
url = scrapy.Field()
html = scrapy.Field()
そして、クロールの仕方(何を抽出して、どうリンクを辿るか)を、Spider として定義します。
class NewsSpider(scrapy.Spider):
name = "news"
start_urls = [
# 一般
# "https://news.yahoo.co.jp/",
"https://www.nikkei.com/",
# "https://www3.nhk.or.jp/news/",
# "https://www.jiji.com/",
# "https://diamond-rm.net/",
# IT
"https://xtech.nikkei.com/top/it/",
"https://www.itmedia.co.jp/news/",
"https://japan.zdnet.com/",
# AI
"https://ledge.ai/theme/news/",
"https://ainow.ai/",
"https://news.mynavi.jp/techplus/tag/artificial_intelligence/",
"https://ja.stateofaiguides.com/",
"https://www.itmedia.co.jp/news/subtop/aiplus/",
]
allowed_domains = [urlparse(u).netloc for u in start_urls]
def parse(self, response):
item = NewsItem()
item["url"] = response.url
item["html"] = response.body.decode(response.encoding)
yield item
for ank in response.css("a::attr(href)"):
url: str = ank.get().strip()
if not url:
continue
if url[0] in ["/"]:
url = urljoin(response.url, url)
scheme = "http"
if url[: len(scheme)] != scheme:
g_logger.warning(
f"Found non {scheme} scheme: skipped [{url=}][parent.url={response.url}]"
)
continue
yield scrapy.Request(url, callback=self.parse)
return
このSpiderでは、parse
メソッドで、リンク(a タグの href 属性)を抽出して、相対パスを絶対パスに変換しておき、http から始まるurl を再起的にアクセスしていきます。
response の url, html をセットしてNewsItem を生成していきます。
最後に、settings.py
です。見やすくなるようにコメントを除いています。
BOT_NAME = "newspy"
SPIDER_MODULES = ["newspy.spiders"]
NEWSPIDER_MODULE = "newspy.spiders"
ROBOTSTXT_OBEY = True
DOWNLOAD_DELAY = 3
ITEM_PIPELINES = {
'newspy.pipelines.NewsPipeline': 300,
}
HTTPCACHE_ENABLED = True
HTTPCACHE_EXPIRATION_SECS = 0
HTTPCACHE_DIR = "httpcache"
HTTPCACHE_IGNORE_HTTP_CODES = []
HTTPCACHE_STORAGE = "scrapy.extensions.httpcache.FilesystemCacheStorage"
REQUEST_FINGERPRINTER_IMPLEMENTATION = "2.7"
TWISTED_REACTOR = "twisted.internet.asyncioreactor.AsyncioSelectorReactor"
DEPTH_LIMIT = 2
特に、最後の DEPTH_LIMIT
を 3 にすると、1週間かかっても終わらなかったりしたので、注意が必要ですね。
以上で、scrapy に必要なコード、設定が完了です。
以下のような感じで、クロールできるようになります。
scrapy crawl news --loglevel=INFO -O data/news.json
ニュース記事の追加学習
最後に、クロールしたニュース記事を追加学習させましょう。
github のコード では、
cd backend && make tuned-wordvector
で動作確認ができます。
追加学習は、先ほどのミニバッチ学習とほぼ同じですので、復習感覚で見ていきましょう。
def _filter_text(body) -> str:
soup: BeautifulSoup = BeautifulSoup(body, "lxml-xml")
for tg in ["script", "noscript", "meta"]:
try:
soup.find(tg).replace_with(" ")
except Exception:
# NOTE: Not Found `tg` tag
pass
return soup.get_text("\n\n")
def clean_text(text: str):
contents = []
for txt in re.split(r"(。|\n)", text):
txt = txt.strip().replace("\u200b", "").replace("\u3000", " ")
txt = re.sub(r"\n+", "\n", txt)
txt = re.sub(r"([\W])\1+", " ", txt)
if not txt:
continue
# contents.append(txt)
# contents.extend(txt.split("\n"))
contents.append(txt.split("\n")[-1])
return contents
def make_sentences(json_file: str):
with open(json_file, "r") as f:
news = json.load(f)
cleaned_texts = []
for itm in news:
html = itm["html"]
text = _filter_text(html)
texts = clean_text(text)
cleaned_texts.append(texts)
tokenizer = TokenizerWord(use_stoppoes=False, filterpos=[], use_orgform=False)
tidy_sentences = []
for idx, texts in enumerate(cleaned_texts):
for text in texts:
tokenized = tokenizer([[text]])
if not tokenized:
continue
snt = tokenized[0]
if len(snt) < 3:
continue
# print(idx, snt) # for debugging
tidy_sentences.append("".join(snt))
return tidy_sentences
def main(
batch_size: int = 10000,
base_pipe_file: str = "data/pipe_wikivec.gz",
news_json_file: str = "app/scrapy/data/news.json",
):
log_info("Start", "retrain_wordvector")
log_info("Start", "loading trained vectors")
pipe_wikivec = joblib.load(base_pipe_file)
log_info("End", "loading trained vectors")
log_info("Start", "make sentences")
tidy_sentences = make_sentences(news_json_file)
log_info("End", "make sentences")
log_info("Start", "tune model")
for idx, offset in enumerate(tqdm(range(0, len(tidy_sentences), batch_size))):
X = tidy_sentences[offset : offset + batch_size]
print(offset, offset + batch_size)
pipe_wikivec.fit([X])
log_info("End", "tune model")
log_info("Start", "dumping retrained model")
joblib.dump(pipe_wikivec, "data/pipe_wikivec.tuned.gz", compress=("gzip", 3))
log_info("End", "dumping retrained model")
log_info("End", "retrain_wordvector")
make_sentences
では、クロールした html を簡易的にクレンジングして、概ね文単位になるように文書(文リスト)を作成します。
main
では、make_sentences
で作成した文リストを使ってミニバッチ学習をしていきます。
この時に、すでに学習済のパイプライン pipe_wikivec
に対してミニバッチ単位で追加学習します。
まとめ
以上により、Wikipedia をベースとして、時事情報・トレンド情報を反映した簡易的なベクトル化モデルを構築することができます。
ここで、昨今のLLMs による embeddings と比較すると、word2vec によるベクトル化は、あくまで表層形によるベクトル化であり、
文脈を踏まえたベクトル化ができないという点には注意が必要でしょう。いわゆる、"bank" が、銀行なのか土手なのか問題ですね。
一般的に、企業内での自然言語処理では、業界固有、企業固有の言い回しや方言があることは、一般的でしょう。
あくまで汎用的なベクトル化モデルを構築するという観点では、このような文脈によって意味が異なる単語は、通常の文脈における単語の意味が異なるため、異なるベクトル値を持つように変換すべきでしょう。
word2vec は仕組み上、文脈に応じたベクトル化はできないため、汎用性が高い(マルチドメイン、マルチタスクを狙う) ベクトル化モデルを構築するには、BERT や LLMs で利用されるベクトル化手法を使う方が有望そうです。
Discussion