😳

べ、べつにAIに褒められたって嬉しくないんだからね///(褒めてくれるLINE Botをつくる)

2023/01/30に公開

いや、嘘です。嬉しいです。
AIであっても褒められたいですよね?
この記事は「日々こんなに辛い思いをしているのになんで誰も褒めてくれなんだ💢」と、褒めが足りなくて困っている方々向けのソリューションです。

完成品はこんな感じです↓

1. 概要

友人に何か作って欲しいものはないかと聞いたら、ただただ無条件に褒めてくれるサービスが欲しいと言っていたので「それだっ!」と思い作ってみることにしました。

他の友人も一人暮らしで寂しいから、「さっさとお風呂入りなよー」とか言ってくれるヴァーチャルオカンが欲しいと言っていたので、寂しさを埋めてくれるようなサービスの需要は結構あるのかもしれません。

まずは無条件に共感し、褒めてくれるLINE Botの作成に挑戦します。

2. AIはどうしたら褒めてくれるか

モデルを考える

まずはどんなモデルをつくればAIが褒めてくれるようになるかを考えます。いろいろ調べた結果次の2つを参考にさせていただきました。

https://qiita.com/sonoisa/items/a9af64ff641f0bbfed44

https://qiita.com/iNakamura/items/683091fb65396570a63d

テキスト入力に対してテキストを出力するThe Text-To-Text Transfer Transformerを使えば、人間からのメッセージ対してAIが共感や褒めている返事を返すということが実現できそうです。

Transformerは英語入力に対して、日本語を返すを学習させることで機械翻訳などに用いられているようです。

https://www.ogis-ri.co.jp/otc/hiroba/technical/similar-document-search/part7.html

学習データを考える

褒め言葉や褒めあっているデータセットがないものかと色々探しましたが、見つかりませんでした😭
結構需要はあると思いますが、なかなか厳しい現状です。
そこで、自分でデータを作ってしまおうと考えて、LINEのトーク履歴が使える!と思いました。LINEはトーク履歴をテキストファイルとして出力できます。

http://menulist.mb.softbank.jp/feature_20220105/

LINEのトーク履歴はナチュラルな対話の宝庫だと思います。友人の許可を得てトーク履歴を漁りはじめましたが、次なる壁にぶつかります。

どこが褒めている/褒められている部分なんだろう??

目的のBotを作る上で褒めている/褒められているとはどんな状況かという問いは最も難しい本質的であり、かつ難しい問いでした。
安直に「褒め方」でググってみるとどんなサイトにも「さしすせそ」が書いてあります。

(さしすせそを思い出そうとしたとき、「せ」ってなんだっけっとなり、センスがいいですねの無理矢理感をひしひしと感じます。)
このさしすせそに準拠して、これらの単語を含む会話を抽出し、1対話ずつ手作業でデータ化しましたが、、、めんどくさすぎる!!
私自身、複数の話題を同時にやり取りするタイプなので、どの褒め言葉が、どこの話題を受けたものなのかを1つずつ判断し、必要なデータだけ集めることは困難を極めまくりました。

そこで、こちらの記事のデータを使わせていただきます。

https://qiita.com/reppy4620/items/e4305f22cd8f6962e00a

上記の記事で使用されているTwitterの対話データのうち、褒め言葉を含む対話のみを抽出し、学習データとします。

褒め発見の様子
find_praise.ipynb
#https://github.com/reppy4620/Dialog のデータからほめを抽出
#一行ずつ読み込み
with open('train_data.txt', mode='r') as f:
    lines=f.readlines()
['一生懸命の懸!全然違う。\n',
 '桃。がね、面白いの。3本あるの。\n',
 '\n',
 'ハイレモン!ヨーグレット!大好き!\n',
 'だよね!!昔からあるよね?多分\n']
find_praise.ipynb
#['発言','返信']となるように整形
new_lines = []
for i in range(0, len(lines), 3):
    line=[lines[i], lines[i+1]]
    new_lines.append(line)
print(f'対話の数:{len(new_lines)}')
new_lines[:5]
対話の数:3512500
[['一生懸命の懸!全然違う。\n', '桃。がね、面白いの。3本あるの。\n'],
 ['ハイレモン!ヨーグレット!大好き!\n', 'だよね!!昔からあるよね?多分\n'],
 ['なるほどもみ心地のよい傾斜装甲があるから気をつけないとね\n', 'あれね重さあるから肩こるのだよ\n'],
 ['iPhoneに元からある画像。\n', '放送事故的な画素感ある。\n'],
 ['むむ氏とDbDするためにコンビニへ走るわたし非常に健気\n', '夜もあるかこれは?\n']]
find_praise.ipynb
#抽出する褒め言葉を定義する
praising_words=['さすが','最高','さいこう','知らなかった','しらなかった','すごいね','すごいな','素晴らしい','すばらしい','すてき','素敵','センス','そうなんだ','その通り','そのとおり','おつかれ','お疲れ', 'えらい', '偉い', '頑張っ', 'がんばっ', 'しんどい', '大丈夫だよ', '大変', 'わかる', '応援', 'おはよう','おやすみ','かわいい','可愛い', 'イケメン', 'かっこいい','おめでとう','いいな']

praising_count = [[i,0] for i in praising_words]
#削除したい単語
delete_words=['それはない','無理']

#褒め言葉を抜き出す
praising_lines=[]
for line in new_lines:
    for i, word in enumerate(praising_words):
        if word in line[1]:
            if praising_count[i][1] >= 10000: #1万件超えていたら
                pass
            else:
                praising_count[i][1] += 1
                #改行文字を消す
                line[0] = line[0].replace('\n', '')
                line[1] = line[1].replace('\n', '')
                praising_lines.append(line)
print(f'褒め要素を含む対話の数(削除したい言葉を消す前):{len(praising_lines)}')
#praising_lines[:5]

#削除したい言葉を消す
for i in range(len(praising_lines)):
    for word in delete_words:
        try:
            if word in praising_lines[i][1]:
                praising_lines.pop(i)
        except IndexError as e:
            pass


print(f'褒め要素を含む対話の数(削除したい言葉を消す前):{len(praising_lines)}')
print(praising_lines[:5])
print(praising_count)
褒め要素を含む対話の数(削除したい言葉を消す前):152103
褒め要素を含む対話の数:151197
[['喋れなくなって周りに迷惑かかるから応募してないww', 'まじかwwある意味えらいw'], ['それは違うw', 'かわいいくないプリクラの写真なら沢山あるお'], ['ファミマで月ちゃんの声聞けたああ!', 'よかったやーん!!笑みやゆうさんもかわいいとこある。'], ['思ってた以上に猫の写真ばかりで遡るの大変だわ', 'わかるあるよねww'], ['そうか?落ちサビよーちゃんだしね', 'マイリスかわいいからすきって歌詞あるからダイヤちゃんでもよかったかなって']]
[['さすが', 10000], ['最高', 10000], ['さいこう', 292], ['知らなかった', 1532], ['しらなかった', 75], ['すごいね', 368], ['すごいな', 718], ['素晴らしい', 2431], ['すばらしい', 177], ['すてき', 225], ['素敵', 4821], ['センス', 2952], ['そうなんだ', 6082], ['その通り', 720], ['そのとおり', 76], ['おつかれ', 1842], ['お疲れ', 10000], ['えらい', 1478], ['偉い', 1284], ['頑張っ', 10000], ['がんばっ', 1676], ['しんどい', 2346], ['大丈夫だよ', 1921], ['大変', 6926], ['わかる', 10000], ['応援', 2995], ['おはよう', 10000], ['おやすみ', 6858], ['かわいい', 10000], ['可愛い', 10000], ['イケメン', 4464], ['かっこいい', 4619], ['おめでとう', 5787], ['いいな', 9438]]
find_praise.ipynb
#csv化
with open("praise_data.csv","w") as o:
    for row in praising_lines:
        print(*row, sep=",", file=o)

さしすせそを参考にこんなメッセージが来たらほんわかしそうな単語群('さすが','最高','さいこう','知らなかった','しらなかった','すごいね','すごいな','素晴らしい','すばらしい','すてき','素敵','センス','そうなんだ','その通り','そのとおり','おつかれ','お疲れ', 'えらい', '偉い', '頑張っ', 'がんばっ', 'しんどい', '大丈夫だよ', '大変', 'わかる', '応援', 'おはよう','おやすみ','かわいい','可愛い', 'イケメン', 'かっこいい','おめでとう','いいな')を含むおよそ15万件の対話を抽出しました。

各単語の最大対話数の上限は1万件となるようにし、特定の単語が異様に多くなってしまうのを避けました。
また、何度か学習をおこなっていく中で、やたらと「それはない」や、「無理」と言いまくるモデルができてしまい、今回の趣旨に著しく反するので、学習データの時点でこれらの単語が含まれる対話は削除します。

3. 褒め機能の実装

学習データができたので既存のTransformerのモデルを転移学習していきます。学習にあたっては上記の2記事を大変参考にさせていただいているので、ご自身でやりたい方はこちらこちらをご参照下さい🙇

環境

Ubuntu 18.04.4
conda 22.9.0
GPU: RTX 2080 Ti
メモリ: 8GB

転移学習

詳細
homehome.py
#事前学習済みモデル
PRETRAINED_MODEL_NAME = "sonoisa/t5-base-japanese"

#転移学習済みモデル
#事前にmodelという名前のフォルダを用意しておき、https://huggingface.co/sonoisa/t5-base-japanese/tree/mainから
#config.json、pytorch_model.bin、special_tokens_map.json、spiece.model、tf_model.h5、tokenizer_config.jsonをダウンロードして入れておく
MODEL_DIR = "model"

#ファイルのload
import pandas as pd
data = pd.read_csv('praise_data.csv',header=None) #本記事2章でつくったやつ

#文字列の正規化の定義
#https://github.com/neologd/mecab-ipadic-neologd/wiki/Regexp.ja から引用・一部改変
from __future__ import unicode_literals
import re
import unicodedata

def unicode_normalize(cls, s):
    pt = re.compile('([{}]+)'.format(cls))

    def norm(c):
        return unicodedata.normalize('NFKC', c) if pt.match(c) else c

    s = ''.join(norm(x) for x in re.split(pt, s))
    s = re.sub('-', '-', s)
    return s

def remove_extra_spaces(s):
    s = re.sub('[  ]+', ' ', s)
    blocks = ''.join(('\u4E00-\u9FFF',  # CJK UNIFIED IDEOGRAPHS
                      '\u3040-\u309F',  # HIRAGANA
                      '\u30A0-\u30FF',  # KATAKANA
                      '\u3000-\u303F',  # CJK SYMBOLS AND PUNCTUATION
                      '\uFF00-\uFFEF'   # HALFWIDTH AND FULLWIDTH FORMS
                      ))
    basic_latin = '\u0000-\u007F'

    def remove_space_between(cls1, cls2, s):
        p = re.compile('([{}]) ([{}])'.format(cls1, cls2))
        while p.search(s):
            s = p.sub(r'\1\2', s)
        return s

    s = remove_space_between(blocks, blocks, s)
    s = remove_space_between(blocks, basic_latin, s)
    s = remove_space_between(basic_latin, blocks, s)
    return s

def normalize_neologd(s):
    s = s.strip()
    s = unicode_normalize('0-9A-Za-z。-゚', s)

    def maketrans(f, t):
        return {ord(x): ord(y) for x, y in zip(f, t)}

    s = re.sub('[˗֊‐‑‒–⁃⁻₋−]+', '-', s)  # normalize hyphens
    s = re.sub('[﹣-ー—―─━ー]+', 'ー', s)  # normalize choonpus
    s = re.sub('[~∼∾〜〰~]+', '〜', s)  # normalize tildes (modified by Isao Sonobe)
    s = s.translate(
        maketrans('!"#$%&\'()*+,-./:;<=>?@[¥]^_`{|}~。、・「」',
              '!”#$%&’()*+,-./:;<=>?@[¥]^_`{|}〜。、・「」'))

    s = remove_extra_spaces(s)
    s = unicode_normalize('!”#$%&’()*+,-./:;<>?@[¥]^_`{|}〜', s)  # keep =,・,「,」
    s = re.sub('[’]', '\'', s)
    s = re.sub('[”]', '"', s)
    return s

#文字のノーマライズ化とデータ整形を実施
import re
import pickle
from tqdm import tqdm

def normalize_text(text):
    # assert "\n" not in text and "\r" not in text
    text = text.replace("\t", " ")
    text = text.strip()
    text = normalize_neologd(text)
    text = text.lower()
    return text

all_data = []

for i in tqdm(range(len(data))):
    all_data.append({
                    "text": normalize_text(data[0][i]),
                    "response": normalize_text(data[1][i]),
                    })
#all_data[:5]

#データ分割
#データセットを90% : 5%: 5% の比率でtrain/val/testに分割します。
#trainデータ: 学習に利用するデータ/valデータ: 学習中の精度評価等に利用するデータ/testデータ: 学習結果のモデルの精度評価に利用するデータ
import random
from tqdm import tqdm

random.seed(1234)
random.shuffle(all_data)

def to_line(data):
    text = data["text"]
    response = data["response"]

    assert len(text) > 0 and len(response) > 0
    return f"{text}\t{response}\n"

data_size = len(all_data)
train_ratio, val_ratio, test_ratio = 0.9, 0.05, 0.05

with open(f"data/train.tsv", "w", encoding="utf-8") as f_train, \
    open(f"data/val.tsv", "w", encoding="utf-8") as f_val, \
    open(f"data/test.tsv", "w", encoding="utf-8") as f_test:
    
    for i, data in tqdm(enumerate(all_data)):
        line = to_line(data)
        if i < train_ratio * data_size:
            f_train.write(line)
        elif i < (train_ratio + val_ratio) * data_size:
            f_val.write(line)
        else:
            f_test.write(line)

#学習に必要なクラス等の定義
#学習にはPyTorch/PyTorch-lightning/Transformersを利用します。
import argparse
import glob
import os
import json
import time
import logging
import random
import re
from itertools import chain
from string import punctuation

import numpy as np
import torch
from torch.utils.data import Dataset, DataLoader
import pytorch_lightning as pl


from transformers import (
    AdamW,
    T5ForConditionalGeneration,
    T5Tokenizer,
    get_linear_schedule_with_warmup
)

# 乱数シードの設定
def set_seed(seed):
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed_all(seed)

set_seed(42)

# GPU利用有無
USE_GPU = torch.cuda.is_available()

# 各種ハイパーパラメータ
args_dict = dict(
    data_dir="./data",  # データセットのディレクトリ
    model_name_or_path=MODEL_DIR, #PRETRAINED_MODEL_NAME,
    tokenizer_name_or_path=PRETRAINED_MODEL_NAME,

    learning_rate=3e-4,
    weight_decay=0.0,
    adam_epsilon=1e-8,
    warmup_steps=0,
    gradient_accumulation_steps=1,

    n_gpu=1 if USE_GPU else 0,
    early_stop_callback=False,
    fp_16=False,
    opt_level='O1',
    max_grad_norm=1.0,
    seed=42,
)

#TSVデータセットクラス
#TSV形式のファイルをデータセットとして読み込みます。形式は"{text}\t{response}"です。
from tqdm import tqdm
class TsvDataset(Dataset):
    def __init__(self, tokenizer, data_dir, type_path, input_max_len=512, target_max_len=512):
        self.file_path = os.path.join(data_dir, type_path)
        
        self.input_max_len = input_max_len
        self.target_max_len = target_max_len
        self.tokenizer = tokenizer
        self.inputs = []
        self.targets = []

        self._build()
  
    def __len__(self):
        return len(self.inputs)
  
    def __getitem__(self, index):
        source_ids = self.inputs[index]["input_ids"].squeeze()
        target_ids = self.targets[index]["input_ids"].squeeze()

        source_mask = self.inputs[index]["attention_mask"].squeeze()
        target_mask = self.targets[index]["attention_mask"].squeeze()

        return {"source_ids": source_ids, "source_mask": source_mask, 
                "target_ids": target_ids, "target_mask": target_mask}

    def _make_record(self, text, response):
        # 応答生成タスク用の入出力形式に変換する。
        input = f"{text}"
        target = f"{response}"
        return input, target
  
    def _build(self):
        with open(self.file_path, "r", encoding="utf-8") as f:
            for i, line in tqdm(enumerate(f)):
                line = line.strip().split("\t")
                assert len(line) == 2
                assert len(line[0]) > 0
                assert len(line[1]) > 0

                text = line[0]
                response = line[1]

                input, target = self._make_record(text, response)

                tokenized_inputs = self.tokenizer.batch_encode_plus(
                    [input], max_length=self.input_max_len, truncation=True, 
                    padding="max_length", return_tensors="pt"
                )

                tokenized_targets = self.tokenizer.batch_encode_plus(
                    [target], max_length=self.target_max_len, truncation=True, 
                    padding="max_length", return_tensors="pt"
                )

                self.inputs.append(tokenized_inputs)
                self.targets.append(tokenized_targets)

                if (i >= 1000000): # 対話データを100万までで制限する
                    break

#学習処理クラス(PyTorch-Lightning)
import pickle
class T5FineTuner(pl.LightningModule):
    def __init__(self, hparams):
        super().__init__()
        self.hparams = hparams

        # 事前学習済みモデルの読み込み
        self.model = T5ForConditionalGeneration.from_pretrained(hparams.model_name_or_path, from_tf=True)

        # トークナイザーの読み込み
        self.tokenizer = T5Tokenizer.from_pretrained(hparams.tokenizer_name_or_path, is_fast=True)

    def forward(self, input_ids, attention_mask=None, decoder_input_ids=None, 
                decoder_attention_mask=None, labels=None):
        """順伝搬"""
        return self.model(
            input_ids,
            attention_mask=attention_mask,
            decoder_input_ids=decoder_input_ids,
            decoder_attention_mask=decoder_attention_mask,
            labels=labels
        )

    def _step(self, batch):
        """ロス計算"""
        labels = batch["target_ids"]

        # All labels set to -100 are ignored (masked), 
        # the loss is only computed for labels in [0, ..., config.vocab_size]
        labels[labels[:, :] == self.tokenizer.pad_token_id] = -100

        outputs = self(
            input_ids=batch["source_ids"],
            attention_mask=batch["source_mask"],
            decoder_attention_mask=batch['target_mask'],
            labels=labels
        )

        loss = outputs[0]
        return loss

    def training_step(self, batch, batch_idx):
        """訓練ステップ処理"""
        loss = self._step(batch)
        self.log("train_loss", loss)
        return {"loss": loss}

    def validation_step(self, batch, batch_idx):
        """バリデーションステップ処理"""
        loss = self._step(batch)
        self.log("val_loss", loss)
        return {"val_loss": loss}

    def test_step(self, batch, batch_idx):
        """テストステップ処理"""
        loss = self._step(batch)
        self.log("test_loss", loss)
        return {"test_loss": loss}

    def configure_optimizers(self):
        """オプティマイザーとスケジューラーを作成する"""
        model = self.model
        no_decay = ["bias", "LayerNorm.weight"]
        optimizer_grouped_parameters = [
            {
                "params": [p for n, p in model.named_parameters() 
                            if not any(nd in n for nd in no_decay)],
                "weight_decay": self.hparams.weight_decay,
            },
            {
                "params": [p for n, p in model.named_parameters() 
                            if any(nd in n for nd in no_decay)],
                "weight_decay": 0.0,
            },
        ]
        optimizer = AdamW(optimizer_grouped_parameters, 
                          lr=self.hparams.learning_rate, 
                          eps=self.hparams.adam_epsilon)
        self.optimizer = optimizer

        scheduler = get_linear_schedule_with_warmup(
            optimizer, num_warmup_steps=self.hparams.warmup_steps, 
            num_training_steps=self.t_total
        )
        self.scheduler = scheduler

        return [optimizer], [{"scheduler": scheduler, "interval": "step", "frequency": 1}]

    def get_dataset(self, tokenizer, type_path, args):
        """データセットを作成する"""
        return TsvDataset(
            tokenizer=tokenizer, 
            data_dir=args.data_dir, 
            type_path=type_path, 
            input_max_len=args.max_input_length,
            target_max_len=args.max_target_length)
    
    def setup(self, stage=None):
        """初期設定(データセットの読み込み)"""
        if stage == 'fit' or stage is None:
            train_dataset = self.get_dataset(tokenizer=self.tokenizer, type_path="train.tsv", args=self.hparams)
            self.train_dataset = train_dataset

            val_dataset = self.get_dataset(tokenizer=self.tokenizer, type_path="val.tsv", args=self.hparams)
            self.val_dataset = val_dataset

            self.t_total = (
                (len(train_dataset) // (self.hparams.train_batch_size * max(1, self.hparams.n_gpu)))
                // self.hparams.gradient_accumulation_steps
                * float(self.hparams.num_train_epochs)
            )

    def train_dataloader(self):
        """訓練データローダーを作成する"""
        return DataLoader(self.train_dataset, 
                          batch_size=self.hparams.train_batch_size, 
                          drop_last=True, shuffle=True, num_workers=0)#4)

    def val_dataloader(self):
        """バリデーションデータローダーを作成する"""
        return DataLoader(self.val_dataset, 
                          batch_size=self.hparams.eval_batch_size, 
                          num_workers=0)#4

# 学習に用いるハイパーパラメータを設定する
args_dict.update({
    "max_input_length":  24,  # 入力文の最大トークン数
    "max_target_length": 24,  # 出力文の最大トークン数
    "train_batch_size":  8,
    "eval_batch_size":   8,
    "num_train_epochs":  20,
    })
args = argparse.Namespace(**args_dict)

train_params = dict(
    accumulate_grad_batches=args.gradient_accumulation_steps,
    gpus=args.n_gpu,
    max_epochs=args.num_train_epochs,
    precision= 16 if args.fp_16 else 32,
    amp_level=args.opt_level,
    gradient_clip_val=args.max_grad_norm,
)
homehome.py
# 転移学習の実行(GPUを利用して1エポック6時間程度)
model = T5FineTuner(args)
trainer = pl.Trainer(**train_params)
trainer.fit(model)

# 最終エポックのモデルを保存
model.tokenizer.save_pretrained(MODEL_DIR)
model.model.save_pretrained(MODEL_DIR)

del model

実際に返答を生成してみる

詳細
homehome.py
#学習済みモデルの読み込み
import torch
from torch.utils.data import Dataset, DataLoader
from transformers import T5ForConditionalGeneration, T5Tokenizer

# トークナイザー(SentencePiece)
tokenizer = T5Tokenizer.from_pretrained(MODEL_DIR, is_fast=True)

# 学習済みモデル
trained_model = T5ForConditionalGeneration.from_pretrained(MODEL_DIR)

# GPUの利用有無
USE_GPU = torch.cuda.is_available()
if USE_GPU:
    trained_model.cuda()

trained_model.eval()

MAX_SOURCE_LENGTH = args.max_input_length   # 入力される記事本文の最大トークン数
MAX_TARGET_LENGTH = args.max_target_length  # 生成されるタイトルの最大トークン数

def preprocess_body(text):
    return normalize_text(text.replace("\n", " "))

# 推論モード設定
trained_model.eval()

# 前処理とトークナイズを行う
inputs = [preprocess_body(body)]
batch = tokenizer.batch_encode_plus(
    inputs, max_length=MAX_SOURCE_LENGTH, truncation=True, 
    padding="longest", return_tensors="pt")

input_ids = batch['input_ids']
input_mask = batch['attention_mask']
if USE_GPU:
    input_ids = input_ids.cuda()
    input_mask = input_mask.cuda()

# 生成処理を行う
outputs = trained_model.generate(
    input_ids=input_ids, attention_mask=input_mask, 
    max_length=MAX_TARGET_LENGTH,
    temperature=1.0,          # 生成にランダム性を入れる温度パラメータ
    num_beams=10,             # ビームサーチの探索幅
    diversity_penalty=1.0,    # 生成結果の多様性を生み出すためのペナルティ
    num_beam_groups=10,       # ビームサーチのグループ数
    num_return_sequences=10,  # 生成する文の数
    repetition_penalty=1.5,   # 同じ文の繰り返し(モード崩壊)へのペナルティ
)

# 生成されたトークン列を文字列に変換する
generated_titles = [tokenizer.decode(ids, skip_special_tokens=True, 
                                     clean_up_tokenization_spaces=False) 
                    for ids in outputs]

# 生成されたタイトルを表示する
for i, title in enumerate(generated_titles):
    print(f"{i+1:2}. {title}")
homehome.py
body = """
今日つらいことがあったんだよね
"""

MAX_SOURCE_LENGTH = args.max_input_length   # 入力される記事本文の最大トークン数
MAX_TARGET_LENGTH = args.max_target_length  # 生成されるタイトルの最大トークン数

def preprocess_body(text):
    return normalize_text(text.replace("\n", " "))

# 推論モード設定
trained_model.eval()

# 前処理とトークナイズを行う
inputs = [preprocess_body(body)]
batch = tokenizer.batch_encode_plus(
    inputs, max_length=MAX_SOURCE_LENGTH, truncation=True, 
    padding="longest", return_tensors="pt")

input_ids = batch['input_ids']
input_mask = batch['attention_mask']
if USE_GPU:
    input_ids = input_ids.cuda()
    input_mask = input_mask.cuda()

# 生成処理を行う
outputs = trained_model.generate(
    input_ids=input_ids, attention_mask=input_mask, 
    max_length=MAX_TARGET_LENGTH,
    temperature=1.0,          # 生成にランダム性を入れる温度パラメータ
    num_beams=10,             # ビームサーチの探索幅
    diversity_penalty=1.0,    # 生成結果の多様性を生み出すためのペナルティ
    num_beam_groups=10,       # ビームサーチのグループ数
    num_return_sequences=10,  # 生成する文の数
    repetition_penalty=1.5,   # 同じ文の繰り返し(モード崩壊)へのペナルティ
)

# 生成されたトークン列を文字列に変換する
generated_titles = [tokenizer.decode(ids, skip_special_tokens=True, 
                                     clean_up_tokenization_spaces=False) 
                    for ids in outputs]

# 生成されたタイトルを表示する
for i, title in enumerate(generated_titles):
    print(f"{i+1:2}. {title}")

「今日つらいことがあったんだよね」に対する返答

1. お疲れ様です
2. それはしんどい
3. それはしんどいね
4. そうなんだね
5. そうなんだね!
6. その通りです。
7. わかる、、
8. お疲れ様です
9. それはしんどい
10. その通りです

しんどいねって言ってあげられるの強い

「バイト終わったー」に対する返答

1. お疲れ様です!
2. お疲れ様です
3. お疲れ様です。
4. お疲れ様です
5. お疲れ様ですー
6. お疲れ様です
7. おつかれさまです
8. お疲れさまです!
9. お疲れさまです
10. お疲れ様です!!

お疲れ様一辺倒

「明日から18連勤なんだよね、、、つらい、、、、」に対する返答

1. お疲れ様です。
2. それはしんどい、、
3. お疲れ様です、、
4. お疲れ様です、、。
5. 頑張って、、
6. それはしんどい、、。
7. そうなんだよね。頑張ってね
8. それは大変だ、、
9. それな、、頑張って!!
10. お仕事頑張ってください!

これはなかなかのクオリティー

「ねえ聞いて!!逆上がりができるようになったよ!」に対する返答

1. そうなんだ!ありがとう!
2. え、いいなぁ
3. え、いいなぁ!
4. それな!かっこいいよね
5. それな!かっこいい
6. イケメンはいないよ!!
7. イケメンはいない
8. イケメンはいないよ
9. 今年は最高やで
10. イケメンすぎて好きだわ

イケメンはいない、、、?

4. LINE Bot化

モデルが完成したのでLINE Bot化します。
やはりテキストでやり取りするなら最強のUXはLINEだと思うので、このモデルがLINE上で動くようにしていきます。

「LINE Bot オウム返し」で検索するとLINE Botの作り方がたくさん出てきます。今回はそのオウム返しLINE Botに少しだけ付け足しして、褒めてくれるように改良していきます。
オウム返しの作り方↓

https://github.com/line/line-bot-sdk-python

model.py

まずは受け取ったメッセージに対して、先ほどのモデルが返答を生成してくれる関数を定義します。

model.py
from __future__ import unicode_literals
import unicodedata
import re
from transformers import T5ForConditionalGeneration, T5Tokenizer

def create_return(body):
    # 転移学習済みモデル
    MODEL_DIR = "model"

    # トークナイザー(SentencePiece)
    tokenizer = T5Tokenizer.from_pretrained(MODEL_DIR)

    # 学習済みモデル
    trained_model = T5ForConditionalGeneration.from_pretrained(MODEL_DIR,force_download=True,low_cpu_mem_usage=True)

    MAX_SOURCE_LENGTH = 24   # 入力される記事本文の最大トークン数
    MAX_TARGET_LENGTH = 24  # 生成されるタイトルの最大トークン数

    def normalize_text(text):
        # assert "\n" not in text and "\r" not in text
        text = text.replace("\t", " ")
        text = text.strip()
        text = normalize_neologd(text)
        text = text.lower()
        return text

    def unicode_normalize(cls, s):
        pt = re.compile('([{}]+)'.format(cls))

        def norm(c):
            return unicodedata.normalize('NFKC', c) if pt.match(c) else c

        s = ''.join(norm(x) for x in re.split(pt, s))
        s = re.sub('-', '-', s)
        return s

    def remove_extra_spaces(s):
        s = re.sub('[  ]+', ' ', s)
        blocks = ''.join(('\u4E00-\u9FFF',  # CJK UNIFIED IDEOGRAPHS
                        '\u3040-\u309F',  # HIRAGANA
                        '\u30A0-\u30FF',  # KATAKANA
                        '\u3000-\u303F',  # CJK SYMBOLS AND PUNCTUATION
                        '\uFF00-\uFFEF'   # HALFWIDTH AND FULLWIDTH FORMS
                        ))
        basic_latin = '\u0000-\u007F'

        def remove_space_between(cls1, cls2, s):
            p = re.compile('([{}]) ([{}])'.format(cls1, cls2))
            while p.search(s):
                s = p.sub(r'\1\2', s)
            return s

        s = remove_space_between(blocks, blocks, s)
        s = remove_space_between(blocks, basic_latin, s)
        s = remove_space_between(basic_latin, blocks, s)
        return s

    def unicode_normalize(cls, s):
        pt = re.compile('([{}]+)'.format(cls))

        def norm(c):
            return unicodedata.normalize('NFKC', c) if pt.match(c) else c

        s = ''.join(norm(x) for x in re.split(pt, s))
        s = re.sub('-', '-', s)
        return s

    def normalize_neologd(s):
        s = s.strip()
        s = unicode_normalize('0-9A-Za-z。-゚', s)

        def maketrans(f, t):
            return {ord(x): ord(y) for x, y in zip(f, t)}

        s = re.sub('[˗֊‐‑‒–⁃⁻₋−]+', '-', s)  # normalize hyphens
        s = re.sub('[﹣-ー—―─━ー]+', 'ー', s)  # normalize choonpus
        s = re.sub('[~∼∾〜〰~]+', '〜', s)  # normalize tildes (modified by Isao Sonobe)
        s = s.translate(
            maketrans('!"#$%&\'()*+,-./:;<=>?@[¥]^_`{|}~。、・「」',
                '!”#$%&’()*+,-./:;<=>?@[¥]^_`{|}〜。、・「」'))

        s = remove_extra_spaces(s)
        s = unicode_normalize('!”#$%&’()*+,-./:;<>?@[¥]^_`{|}〜', s)  # keep =,・,「,」
        s = re.sub('[’]', '\'', s)
        s = re.sub('[”]', '"', s)
        return s

    def preprocess_body(text):
        return normalize_text(text.replace("\n", " "))

    # 推論モード設定
    trained_model.eval()

    # 前処理とトークナイズを行う
    inputs = [preprocess_body(body)]
    batch = tokenizer.batch_encode_plus(
        inputs,
        max_length=MAX_SOURCE_LENGTH,
        truncation=True,
        padding="longest", return_tensors="pt")

    input_ids = batch['input_ids']
    input_mask = batch['attention_mask']

    # 生成処理を行う
    outputs = trained_model.generate(
        input_ids=input_ids,
        attention_mask=input_mask,
        max_length=MAX_TARGET_LENGTH,
        temperature=1.0,          # 生成にランダム性を入れる温度パラメータ
        num_beams=10,             # ビームサーチの探索幅
        diversity_penalty=1.0,    # 生成結果の多様性を生み出すためのペナルティ
        num_beam_groups=10,       # ビームサーチのグループ数
        num_return_sequences=1,  # 生成する文の数
        repetition_penalty=1.5,   # 同じ文の繰り返し(モード崩壊)へのペナルティ
    )

    # 生成されたトークン列を文字列に変換する
    generated_titles = [tokenizer.decode(ids, skip_special_tokens=True,
                                        clean_up_tokenization_spaces=False)
                        for ids in outputs]

    return generated_titles[0]

T5ForConditionalGeneration.from_pretrained(MODEL_DIR,force_download=True,low_cpu_mem_usage=True)においてlow_cpu_mem_usage=Trueとすることでメモリの消費量を抑えられ、後述の通りRaspberry Piで動かすことができました。

app.py

続いて、LINE Botを動かすapp.pyを作ります。

app.py
import os
import sys
from argparse import ArgumentParser

from flask import Flask, request, abort
from linebot import (
    LineBotApi, WebhookParser
)
from linebot.exceptions import (
    InvalidSignatureError
)
from linebot.models import (
    MessageEvent, TextMessage, TextSendMessage,
)

from model import create_return

app = Flask(__name__)

# get channel_secret and channel_access_token from your environment variable
# channel_secret = os.getenv('LINE_CHANNEL_SECRET', None)
# channel_access_token = os.getenv('LINE_CHANNEL_ACCESS_TOKEN', None)
channel_secret = '<<チャンネルシークレットを記入する>>'
channel_access_token = '<<チャンネルアクセストークンを記入する>>'

if channel_secret is None:
    print('Specify LINE_CHANNEL_SECRET as environment variable.')
    sys.exit(1)
if channel_access_token is None:
    print('Specify LINE_CHANNEL_ACCESS_TOKEN as environment variable.')
    sys.exit(1)

line_bot_api = LineBotApi(channel_access_token)
parser = WebhookParser(channel_secret)

@app.route("/callback", methods=['POST'])
def callback():
    signature = request.headers['X-Line-Signature']

    # get request body as text
    body = request.get_data(as_text=True)
    app.logger.info("Request body: " + body)

    # parse webhook body
    try:
        events = parser.parse(body, signature)
    except InvalidSignatureError:
        abort(400)

    # if event is MessageEvent and message is TextMessage, then echo text
    for event in events:
        if not isinstance(event, MessageEvent):
            continue
        if not isinstance(event.message, TextMessage):
            continue

        #返信処理
        line_bot_api.reply_message(
            event.reply_token,
            # TextSendMessage(text=event.message.text)
            TextSendMessage(text=create_return(event.message.text))
        )

    return 'OK'

if __name__ == "__main__":
    arg_parser = ArgumentParser(
        usage='Usage: python ' + __file__ + ' [--port <port>] [--help]'
    )
    arg_parser.add_argument('-p', '--port', type=int, default=8000, help='port')
    arg_parser.add_argument('-d', '--debug', default=False, help='debug')
    options = arg_parser.parse_args()

    app.run(debug=options.debug, port=8000)

Heroku上で動かせるようにしたかったのですが、モデルのサイズが大きすぎてダメだったので、最近買ったRaspberry Piを使います。設定方法は下記の通り。

https://nekonisi.com/raspberrypi_flask_ngrok_line-bot/

先ほどのapp.pyport=8000としたので、Rasberry Pi上で、

$ ngrok http 8000

とし、Forwardingの欄に書かれているURLhttps//なんちゃらかんちゃら.jp.ngrok.ioをコピーし、LINE DevelopersのWebhook URLに貼り付ければ(コピーしたURLの後ろに/callbackをつけてください)完成です!!

5. まとめ

(自称)全肯定型ほめほめAI搭載LINE Botを作成することができました。返答は完璧とは言い難いですが、それなりに文脈を捉え、褒めたり共感したりしてくれるLINE Botが作れたのではないかと思います。
学習データの質をなんとか改善できるとよりクオリティーの高く褒めてもらえる気がします。なんかいい方法ないかな。

Discussion