transformerで英語から日本語の機械翻訳を行う。
はじめに
tensorflow のチュートリアルにある transformer の記事を勉強した。
この記事では、JESC のデータセットを用いて、英語から日本語への機械翻訳を行う。チュートリアルのコードは解説のため冗長な部分があるため、この記事ではそれを省き最低限の実装で英語日本語翻訳機を実装する。コードの全体は以下に配置した。
コードの全体
データセットの説明
JESC のデータセットを以下のように加工した。
input_1: starttoken, this, is, a, pen, endtoken
input_2: starttoken, これ, は, ペン, です
output: これ, は, ペン, です, endtoken
データの加工方法については、この記事で解説した。input_1
は Encoder に入力し、input_2
には Decoder に入力する。output
を予想する問題を解く。
モデルの説明
transformer を用いた翻訳機は以下のようなモデルである。
上記の図は論文[1]から拝借した。この記事では以下のパラメーターを用いる。
パラメータ
VOCAB_SIZE=20000
ENGLISH_SEQUENCE_LENGTH=64
JAPANESE_SEQUENCE_LENGTH=64
EMBEDDING_DIM=128
NUM_LAYERS=6
NUM_HEADS=8
DROPOUT_RATE=0.1
FFN_UNITS=512
NUM_EPOCHS=20
BATCH_SIZE=512
Positional Embedding
transformer は RNN と違って input 位置情報を取得できない。そのため初めに Embedding ベクトルに対して単語の位置情報を埋め込むことで、文章中の単語の位置情報を獲得する。その具体的な方法として、Positional Embedding を用いる。
Positional Embedding では、通常の Embedding に対して、以下の三角関数を足すことによって、位置情報を Embedding ベクトルに付与する。
ここで、
Positional Embedding の実装は以下に記載した。
Positional Embedding
# positional embedding
def positional_encoding(position, d_model):
angle_rads = np.expand_dims(np.arange(position), axis=1)
angle_rads = angle_rads * (1 / np.power(10000, (2 * (
np.expand_dims(np.arange(d_model), axis=0)
)) / np.float32(d_model)))
angle_rads[:, 0::2] = np.sin(angle_rads[:, 0::2])
angle_rads[:, 1::2] = np.cos(angle_rads[:, 1::2])
pos_encoding = np.expand_dims(angle_rads, axis=0)
return tf.cast(pos_encoding, dtype=tf.float32)
class PositionalEmbedding(Layer):
def __init__(self, sequence_length, vocab_size, embedding_dim):
super(PositionalEmbedding, self).__init__()
self.vocab_size = vocab_size
self.embedding_dim = embedding_dim
self.embedding = Embedding(vocab_size, embedding_dim, mask_zero=True)
self.pos_encoding = positional_encoding(sequence_length, embedding_dim)
def call(self, inputs):
x = self.embedding(inputs)
x *= tf.math.sqrt(tf.cast(self.embedding_dim, tf.float32))
x += self.pos_encoding[:, :tf.shape(inputs)[1], :]
return x
def compute_mask(self, *args, **kwargs):
return self.embedding.compute_mask(*args, **kwargs)
compute_mask
メソッドを作ることで、mask 情報を付与する。mask に関する詳細な解説はこの記事で行った。
付与する三角関数を可視化すると以下のようになる。
pos_encoding = positional_encoding(ENGLISH_SEQUENCE_LENGTH, EMBEDDING_DIM)[0]
plt.vlines(30, 0, 128, colors='k', linestyles='solid', label='')
plt.pcolormesh(pos_encoding.numpy().T, cmap='RdBu')
plt.ylabel('d')
plt.xlabel('p')
plt.colorbar()
plt.show()
上図の黒い線は 30 番目の単語の Embedding ベクトルに足される項を表す。
次に、この三角関数同士の足し算を計算すると以下のようになる。
pos_encoding = positional_encoding(ENGLISH_SEQUENCE_LENGTH, EMBEDDING_DIM)[0]
pos_encoding/=tf.norm(pos_encoding, axis=1, keepdims=True)
plt.vlines(30, 0, 1, colors='k', linestyles='solid', label='')
p = pos_encoding[30]
dots = tf.einsum('pd,d -> p', pos_encoding, p)
plt.ylim([0,1])
plt.xlim([0,ENGLISH_SEQUENCE_LENGTH])
plt.plot(dots)
plt.show()
30 番目の単語と 1~64 番目の単語の相対的な量を計算している。確かに付近 2,3 個の単語が大きい量を獲得していることがわかる。(今回用いた長さ 64 の文字列だと、遠くの単語同士でも大きい量を獲得しているように見える。チュートリアルは文章の長さを長くして図を良く見せようとしているのかな。もしかしたら私の実装が間違ってるかもしれないので、間違ってたら教えて欲しいです。)
Encoder Transformer
Encoder に用いる transformer は以下のように実装する。
Encoder Transformer
# encoder transformer
class EncoderTransformer(Layer):
def __init__(self, num_heads):
super(EncoderTransformer, self).__init__()
self.attention = MultiHeadAttention(
num_heads = num_heads,
key_dim = EMBEDDING_DIM
)
self.feed_forward = tf.keras.Sequential([
Dense(FFN_UNITS, activation='relu'),
Dense(EMBEDDING_DIM)
])
self.layernorm_1 = LayerNormalization()
self.layernorm_2 = LayerNormalization()
self.dropout_1 = Dropout(DROPOUT_RATE)
self.dropout_2 = Dropout(DROPOUT_RATE)
self.supports_masking = True
def call(self, inputs):
att_output, att_score = self.attention(
query = inputs,
value = inputs,
key = inputs,
return_attention_scores = True
)
att_output = self.dropout_1(att_output)
ffd_input = self.layernorm_1(inputs + att_output)
ffd_output = self.feed_forward(ffd_input)
ffd_output = self.dropout_2(ffd_output)
outputs = self.layernorm_2(ffd_input + ffd_output)
return outputs
Dropout は、元論文を参考に挟んだため、チュートリアルの内容とは異なっている。self.supports_masking = True
の部分についてはこの記事で解説した。
Encoder
Encoder Transformer を複数個連ねることで Encoder を作成する。
Encoder
# encoder
class Encoder(Model):
def __init__(self, num_layers, num_heads):
super(Encoder, self).__init__()
self.num_layers = num_layers
self.transformers = [
EncoderTransformer(num_heads) for _ in range(num_layers)
]
self.dropout = Dropout(DROPOUT_RATE)
self.supports_masking = True
def call(self, x):
x = self.dropout(x)
for i in range(self.num_layers):
x = self.transformers[i](x)
return x
Decoder Transformer
Decoder に用いる transformer は以下のように実装する。
Decoder Transformer
# decoder transformer
class DecoderTransformer(Layer):
def __init__(self, num_heads):
super(DecoderTransformer, self).__init__()
self.attention_1 = MultiHeadAttention(
num_heads = num_heads,
key_dim = EMBEDDING_DIM
)
self.attention_2 = MultiHeadAttention(
num_heads = num_heads,
key_dim = EMBEDDING_DIM
)
self.feed_forward = tf.keras.Sequential([
Dense(FFN_UNITS, activation='relu'),
Dense(EMBEDDING_DIM)
])
self.layernorm_1 = LayerNormalization()
self.layernorm_2 = LayerNormalization()
self.layernorm_3 = LayerNormalization()
self.dropout_1 = Dropout(DROPOUT_RATE)
self.dropout_2 = Dropout(DROPOUT_RATE)
self.dropout_3 = Dropout(DROPOUT_RATE)
self.supports_masking = True
def call(self, inputs, encoder_outputs):
att_output_1, att_score_1 = self.attention_1(
query = inputs,
value = inputs,
key = inputs,
return_attention_scores = True,
use_causal_mask = True,
)
att_output_1 = self.dropout_1(att_output_1)
att_input_2 = self.layernorm_1(inputs + att_output_1)
att_output_2, att_score_2 = self.attention_2(
query = att_input_2,
value = encoder_outputs,
key = encoder_outputs,
return_attention_scores = True
)
att_output_2 = self.dropout_2(att_output_2)
ffd_input = self.layernorm_2(att_input_2 + att_output_2)
ffd_output = self.feed_forward(ffd_input)
ffd_output = self.dropout_3(ffd_output)
outputs = self.layernorm_3(ffd_input + ffd_output)
return outputs
attention_1
は masked MHA と呼ばれ、attention 行列の右上の三角の部分を mask している。この mask もこの記事で解説した。
attention_2
は encoder と decoder を組み合わせる attention で、decoder の input を query として encoder から情報を取り出している。このレイヤーのatt_socre_2
を可視化することで日本語と英語の相関が可視化できる。
Decoder
Decoder Transformer を複数個連ねることで Decoder を作成する。
Decoder
# decoder
class Decoder(Model):
def __init__(self, num_layers, num_heads):
super(Decoder, self).__init__()
self.num_layers = num_layers
self.transformers = [
DecoderTransformer(num_heads) for _ in range(num_layers)
]
self.dropout = Dropout(DROPOUT_RATE)
def call(self, x, encoder_outputs):
x = self.dropout(x)
for i in range(self.num_layers):
x = self.transformers[i](x, encoder_outputs)
return x
Model
モデル全体としては次のように実装される。
model
# model
class seq2seq(Model):
def __init__(self, encoder, decoder):
super(seq2seq, self).__init__()
self.english_embedding = PositionalEmbedding(ENGLISH_SEQUENCE_LENGTH, VOCAB_SIZE, EMBEDDING_DIM)
self.encoder = encoder
self.japanese_embedding = PositionalEmbedding(JAPANESE_SEQUENCE_LENGTH, VOCAB_SIZE, EMBEDDING_DIM)
self.decoder = decoder
self.dense = Dense(VOCAB_SIZE)
def call(self, inputs):
english_inputs, japanese_inputs = inputs
encoder_inputs = self.english_embedding(english_inputs)
encoder_outputs = self.encoder(encoder_inputs)
decoder_inputs = self.japanese_embedding(japanese_inputs)
decoder_outputs = self.decoder(decoder_inputs, encoder_outputs)
outputs = self.dense(decoder_outputs)
return outputs
encoder = Encoder(num_layers = NUM_LAYERS, num_heads = NUM_HEADS)
decoder = Decoder(num_layers = NUM_LAYERS, num_heads = NUM_HEADS)
model = seq2seq(encoder, decoder)
model.build(input_shape=[(None, ENGLISH_SEQUENCE_LENGTH), (None, JAPANESE_SEQUENCE_LENGTH)])
model.summary()
Model: "seq2seq"
_________________________________________________________________
Layer (type) Output Shape Param #
=================================================================
positional_embedding (Posit multiple 2560000
ionalEmbedding)
encoder (Encoder) multiple 3958272
positional_embedding_1 (Pos multiple 2560000
itionalEmbedding)
decoder (Decoder) multiple 7124736
dense_24 (Dense) multiple 2580000
=================================================================
Total params: 18,783,008
Trainable params: 18,783,008
Non-trainable params: 0
_________________________________________________________________
学習
前章で作成したモデルを学習する。
損失関数
損失関数は CrossEntropy を用いる。ただ mask された部分は学習しないように取り除く。
損失関数
# loss function
def masked_loss(label, pred):
mask = label != 0
loss_object = tf.keras.losses.SparseCategoricalCrossentropy(
from_logits=True, reduction='none')
loss = loss_object(label, pred)
mask = tf.cast(mask, dtype=loss.dtype)
loss *= mask
loss = tf.reduce_sum(loss) / tf.reduce_sum(mask)
return loss
精度
精度も mask された部分だけ取り除く。
精度
# accuracy
def masked_accuracy(label, pred):
pred = tf.argmax(pred, axis=-1)
label = tf.cast(label, pred.dtype)
match = label == pred
mask = label != 0
match = match & mask
match = tf.cast(match, dtype=tf.float32)
mask = tf.cast(mask, dtype=tf.float32)
return tf.reduce_sum(match) / tf.reduce_sum(mask)
最適化関数
最適化関数は、warm-up を行う。
最適化関数
# learning rate Warm-up,
class CustomSchedule(tf.keras.optimizers.schedules.LearningRateSchedule):
def __init__(self, d_model, warmup_steps=4000):
super().__init__()
self.d_model = d_model
self.d_model = tf.cast(self.d_model, tf.float32)
self.warmup_steps = warmup_steps
def __call__(self, step):
step = tf.cast(step, dtype=tf.float32)
arg1 = tf.math.rsqrt(step)
arg2 = step * (self.warmup_steps ** -1.5)
return tf.math.rsqrt(self.d_model) * tf.math.minimum(arg1, arg2)
# optimizer
learning_rate = CustomSchedule(EMBEDDING_DIM)
optimizer = tf.keras.optimizers.Adam(learning_rate, beta_1=0.9, beta_2=0.98, epsilon=1e-9)
ちなみに Attention is all you need の論文のレビュー[2]に面白い記載があった。
The learning rate schedule seems to really matter. Using simple SGD works fine for LSTM, but seems to fail here
訳: 学習速度のスケジュールは非常に重要なようです。 単純な SGD の使用は LSTM では正常に機能しますが、ここでは失敗するようです。
このモデルを学習させるには、ウォームアップは必須らしい。
学習
学習は以下のように行った。
model.compile(optimizer = optimizer, loss = masked_loss, metrics = [masked_accuracy])
history = model.fit(train_dataset, validation_data=test_dataset, epochs=NUM_EPOCHS, shuffle=True)
学習は綺麗に進んだ。
結果
翻訳機
def translator(english_sentence):
tokenized_english_sentence = english_vectorization_layer([
"starttoken " + english_sentence + " endtoken"
])[:, : len(english_sentence.split()) + 2]
japanese_sentences = ["starttoken"]
for i in range(JAPANESE_SEQUENCE_LENGTH):
tokenized_japanese_sentence = japanese_vectorization_layer([
" ".join(japanese_sentences)
])[:, :i+1]
predictions = model(
[tokenized_english_sentence, tokenized_japanese_sentence],
training=False
)
predicted_id = tf.argmax(tf.math.softmax(predictions[0, -1, :]))
decoder_outputs_last_word = japanese_vocabulary[predicted_id]
japanese_sentences.append(decoder_outputs_last_word)
if decoder_outputs_last_word == 'endtoken':
break
return "".join(japanese_sentences[1:-1])
print(translator("this is a pen."))
print(translator('i am a student'))
print(translator("i love you."))
print(translator("what are you doing?"))
print(translator("there are so many people around here who are enjoying their time celebrating this white night festival."))
ペンだ
私は学生です
愛してる
何をしてるの
この辺で楽しい時間を[UNK]てる人がたくさんいるのよ
おわりに
Transformer を用いた翻訳機を実装した。Transformer 自体は LSTM などと比べると非常に実装が楽で、これで精度が出るなら楽でいいなって思った。(mask の部分はしっかり考えないと学習が進まなくなるが。)
現在の Keras の MultiHeadAttention の実装を見ると普通に tensorflow の関数組み合わせて実装されている。この辺がTransformerEngineに置き換わることで高速化されるのかな。
次は Bert を実装しよう。
Discussion