🚅

transformerで英語から日本語の機械翻訳を行う。

2023/06/21に公開

はじめに

tensorflow のチュートリアルにある transformer の記事を勉強した。

https://www.tensorflow.org/text/tutorials/transformer

この記事では、JESC のデータセットを用いて、英語から日本語への機械翻訳を行う。チュートリアルのコードは解説のため冗長な部分があるため、この記事ではそれを省き最低限の実装で英語日本語翻訳機を実装する。コードの全体は以下に配置した。

コードの全体

データセットの説明

JESC のデータセットを以下のように加工した。

input_1: starttoken, this, is, a, pen, endtoken
input_2: starttoken, これ, は, ペン, です
output: これ, は, ペン, です, endtoken

データの加工方法については、この記事で解説した。input_1は Encoder に入力し、input_2には Decoder に入力する。outputを予想する問題を解く。

モデルの説明

transformer を用いた翻訳機は以下のようなモデルである。

上記の図は論文[1]から拝借した。この記事では以下のパラメーターを用いる。

パラメータ
VOCAB_SIZE=20000
ENGLISH_SEQUENCE_LENGTH=64
JAPANESE_SEQUENCE_LENGTH=64
EMBEDDING_DIM=128
NUM_LAYERS=6
NUM_HEADS=8
DROPOUT_RATE=0.1
FFN_UNITS=512
NUM_EPOCHS=20
BATCH_SIZE=512

Positional Embedding

transformer は RNN と違って input 位置情報を取得できない。そのため初めに Embedding ベクトルに対して単語の位置情報を埋め込むことで、文章中の単語の位置情報を獲得する。その具体的な方法として、Positional Embedding を用いる。

Positional Embedding では、通常の Embedding に対して、以下の三角関数を足すことによって、位置情報を Embedding ベクトルに付与する。

\begin{aligned} PE(p, 2d) &= \sin \left( \frac{p}{10000^{2d/D}} \right) \\ PE(p, 2d+1) &= \cos \left( \frac{p}{10000^{2d/D}} \right) \end{aligned}

ここで、p (= 0, 1, \cdots, \text{length} - 1)は単語がその文中で何番目に現れるかを表し、Dは Embedding ベクトルの次元を表す。また、2d, 2d +1は Embedding ベクトルの成分を表し、偶奇で\sin\cosを使い分ける。

Positional Embedding の実装は以下に記載した。

Positional Embedding
# positional embedding

def positional_encoding(position, d_model):
    angle_rads = np.expand_dims(np.arange(position), axis=1)
    angle_rads = angle_rads * (1 / np.power(10000, (2 * (
        np.expand_dims(np.arange(d_model), axis=0)
    )) / np.float32(d_model)))

    angle_rads[:, 0::2] = np.sin(angle_rads[:, 0::2])
    angle_rads[:, 1::2] = np.cos(angle_rads[:, 1::2])

    pos_encoding = np.expand_dims(angle_rads, axis=0)

    return tf.cast(pos_encoding, dtype=tf.float32)

class PositionalEmbedding(Layer):
    def __init__(self, sequence_length, vocab_size, embedding_dim):
        super(PositionalEmbedding, self).__init__()
        self.vocab_size = vocab_size
        self.embedding_dim = embedding_dim
        self.embedding = Embedding(vocab_size, embedding_dim, mask_zero=True)
        self.pos_encoding = positional_encoding(sequence_length, embedding_dim)

    def call(self, inputs):
        x = self.embedding(inputs)
        x *= tf.math.sqrt(tf.cast(self.embedding_dim, tf.float32))
        x += self.pos_encoding[:, :tf.shape(inputs)[1], :]
        return x

    def compute_mask(self, *args, **kwargs):
        return self.embedding.compute_mask(*args, **kwargs)

compute_maskメソッドを作ることで、mask 情報を付与する。mask に関する詳細な解説はこの記事で行った。

付与する三角関数を可視化すると以下のようになる。

pos_encoding = positional_encoding(ENGLISH_SEQUENCE_LENGTH, EMBEDDING_DIM)[0]

plt.vlines(30, 0, 128, colors='k', linestyles='solid', label='')

plt.pcolormesh(pos_encoding.numpy().T, cmap='RdBu')
plt.ylabel('d')
plt.xlabel('p')
plt.colorbar()
plt.show()

上図の黒い線は 30 番目の単語の Embedding ベクトルに足される項を表す。

次に、この三角関数同士の足し算を計算すると以下のようになる。

pos_encoding = positional_encoding(ENGLISH_SEQUENCE_LENGTH, EMBEDDING_DIM)[0]
pos_encoding/=tf.norm(pos_encoding, axis=1, keepdims=True)

plt.vlines(30, 0, 1, colors='k', linestyles='solid', label='')

p = pos_encoding[30]
dots = tf.einsum('pd,d -> p', pos_encoding, p)
plt.ylim([0,1])
plt.xlim([0,ENGLISH_SEQUENCE_LENGTH])
plt.plot(dots)
plt.show()

30 番目の単語と 1~64 番目の単語の相対的な量を計算している。確かに付近 2,3 個の単語が大きい量を獲得していることがわかる。(今回用いた長さ 64 の文字列だと、遠くの単語同士でも大きい量を獲得しているように見える。チュートリアルは文章の長さを長くして図を良く見せようとしているのかな。もしかしたら私の実装が間違ってるかもしれないので、間違ってたら教えて欲しいです。)

Encoder Transformer

Encoder に用いる transformer は以下のように実装する。

Encoder Transformer
# encoder transformer

class EncoderTransformer(Layer):
    def __init__(self, num_heads):
        super(EncoderTransformer, self).__init__()
        self.attention = MultiHeadAttention(
            num_heads = num_heads,
            key_dim = EMBEDDING_DIM
        )
        self.feed_forward = tf.keras.Sequential([
            Dense(FFN_UNITS, activation='relu'),
            Dense(EMBEDDING_DIM)
        ])
        self.layernorm_1 = LayerNormalization()
        self.layernorm_2 = LayerNormalization()
        self.dropout_1 = Dropout(DROPOUT_RATE)
        self.dropout_2 = Dropout(DROPOUT_RATE)

        self.supports_masking = True

    def call(self, inputs):
        att_output, att_score = self.attention(
            query = inputs,
            value = inputs,
            key = inputs,
            return_attention_scores = True
        )
        att_output = self.dropout_1(att_output)
        ffd_input = self.layernorm_1(inputs + att_output)

        ffd_output = self.feed_forward(ffd_input)

        ffd_output = self.dropout_2(ffd_output)
        outputs = self.layernorm_2(ffd_input + ffd_output)

        return outputs

Dropout は、元論文を参考に挟んだため、チュートリアルの内容とは異なっている。self.supports_masking = Trueの部分についてはこの記事で解説した。

Encoder

Encoder Transformer を複数個連ねることで Encoder を作成する。

Encoder
# encoder

class Encoder(Model):
    def __init__(self, num_layers, num_heads):
        super(Encoder, self).__init__()
        self.num_layers = num_layers
        self.transformers = [
            EncoderTransformer(num_heads) for _ in range(num_layers)
        ]
        self.dropout = Dropout(DROPOUT_RATE)

        self.supports_masking = True

    def call(self, x):
        x = self.dropout(x)
        for i in range(self.num_layers):
            x = self.transformers[i](x)
        return x

Decoder Transformer

Decoder に用いる transformer は以下のように実装する。

Decoder Transformer
# decoder transformer

class DecoderTransformer(Layer):
    def __init__(self, num_heads):
        super(DecoderTransformer, self).__init__()
        self.attention_1 = MultiHeadAttention(
            num_heads = num_heads,
            key_dim = EMBEDDING_DIM
        )
        self.attention_2 = MultiHeadAttention(
            num_heads = num_heads,
            key_dim = EMBEDDING_DIM
        )
        self.feed_forward = tf.keras.Sequential([
            Dense(FFN_UNITS, activation='relu'),
            Dense(EMBEDDING_DIM)
        ])
        self.layernorm_1 = LayerNormalization()
        self.layernorm_2 = LayerNormalization()
        self.layernorm_3 = LayerNormalization()

        self.dropout_1 = Dropout(DROPOUT_RATE)
        self.dropout_2 = Dropout(DROPOUT_RATE)
        self.dropout_3 = Dropout(DROPOUT_RATE)

        self.supports_masking = True

    def call(self, inputs, encoder_outputs):
        att_output_1, att_score_1 = self.attention_1(
            query = inputs,
            value = inputs,
            key = inputs,
            return_attention_scores = True,
            use_causal_mask = True,
        )

        att_output_1 = self.dropout_1(att_output_1)
        att_input_2 = self.layernorm_1(inputs + att_output_1)

        att_output_2, att_score_2 = self.attention_2(
            query = att_input_2,
            value = encoder_outputs,
            key = encoder_outputs,
            return_attention_scores = True
        )

        att_output_2 = self.dropout_2(att_output_2)
        ffd_input = self.layernorm_2(att_input_2 + att_output_2)

        ffd_output = self.feed_forward(ffd_input)
        ffd_output = self.dropout_3(ffd_output)

        outputs = self.layernorm_3(ffd_input + ffd_output)
        return outputs

attention_1は masked MHA と呼ばれ、attention 行列の右上の三角の部分を mask している。この mask もこの記事で解説した。
attention_2は encoder と decoder を組み合わせる attention で、decoder の input を query として encoder から情報を取り出している。このレイヤーのatt_socre_2を可視化することで日本語と英語の相関が可視化できる。

Decoder

Decoder Transformer を複数個連ねることで Decoder を作成する。

Decoder
# decoder

class Decoder(Model):
    def __init__(self, num_layers, num_heads):
        super(Decoder, self).__init__()
        self.num_layers = num_layers
        self.transformers = [
            DecoderTransformer(num_heads) for _ in range(num_layers)
        ]
        self.dropout = Dropout(DROPOUT_RATE)

    def call(self, x, encoder_outputs):
        x = self.dropout(x)
        for i in range(self.num_layers):
            x = self.transformers[i](x, encoder_outputs)
        return x

Model

モデル全体としては次のように実装される。

model
# model

class seq2seq(Model):
    def __init__(self, encoder, decoder):
        super(seq2seq, self).__init__()
        self.english_embedding = PositionalEmbedding(ENGLISH_SEQUENCE_LENGTH, VOCAB_SIZE, EMBEDDING_DIM)
        self.encoder = encoder

        self.japanese_embedding = PositionalEmbedding(JAPANESE_SEQUENCE_LENGTH, VOCAB_SIZE, EMBEDDING_DIM)
        self.decoder = decoder

        self.dense = Dense(VOCAB_SIZE)

    def call(self, inputs):
        english_inputs, japanese_inputs = inputs

        encoder_inputs = self.english_embedding(english_inputs)
        encoder_outputs = self.encoder(encoder_inputs)

        decoder_inputs = self.japanese_embedding(japanese_inputs)
        decoder_outputs = self.decoder(decoder_inputs, encoder_outputs)

        outputs = self.dense(decoder_outputs)
        return outputs

encoder = Encoder(num_layers = NUM_LAYERS, num_heads = NUM_HEADS)
decoder = Decoder(num_layers = NUM_LAYERS, num_heads = NUM_HEADS)

model = seq2seq(encoder, decoder)
model.build(input_shape=[(None, ENGLISH_SEQUENCE_LENGTH), (None, JAPANESE_SEQUENCE_LENGTH)])
model.summary()
Model: "seq2seq"
_________________________________________________________________
 Layer (type)                Output Shape              Param #
=================================================================
 positional_embedding (Posit  multiple                 2560000
 ionalEmbedding)

 encoder (Encoder)           multiple                  3958272

 positional_embedding_1 (Pos  multiple                 2560000
 itionalEmbedding)

 decoder (Decoder)           multiple                  7124736

 dense_24 (Dense)            multiple                  2580000

=================================================================
Total params: 18,783,008
Trainable params: 18,783,008
Non-trainable params: 0
_________________________________________________________________

学習

前章で作成したモデルを学習する。

損失関数

損失関数は CrossEntropy を用いる。ただ mask された部分は学習しないように取り除く。

損失関数
# loss function

def masked_loss(label, pred):
  mask = label != 0
  loss_object = tf.keras.losses.SparseCategoricalCrossentropy(
    from_logits=True, reduction='none')
  loss = loss_object(label, pred)

  mask = tf.cast(mask, dtype=loss.dtype)
  loss *= mask

  loss = tf.reduce_sum(loss) / tf.reduce_sum(mask)
  return loss

精度

精度も mask された部分だけ取り除く。

精度
# accuracy

def masked_accuracy(label, pred):
  pred = tf.argmax(pred, axis=-1)
  label = tf.cast(label, pred.dtype)

  match = label == pred
  mask = label != 0
  match = match & mask

  match = tf.cast(match, dtype=tf.float32)
  mask = tf.cast(mask, dtype=tf.float32)
  return tf.reduce_sum(match) / tf.reduce_sum(mask)

最適化関数

最適化関数は、warm-up を行う。

最適化関数
# learning rate Warm-up,

class CustomSchedule(tf.keras.optimizers.schedules.LearningRateSchedule):
  def __init__(self, d_model, warmup_steps=4000):
    super().__init__()

    self.d_model = d_model
    self.d_model = tf.cast(self.d_model, tf.float32)

    self.warmup_steps = warmup_steps

  def __call__(self, step):
    step = tf.cast(step, dtype=tf.float32)
    arg1 = tf.math.rsqrt(step)
    arg2 = step * (self.warmup_steps ** -1.5)

    return tf.math.rsqrt(self.d_model) * tf.math.minimum(arg1, arg2)

# optimizer

learning_rate = CustomSchedule(EMBEDDING_DIM)
optimizer = tf.keras.optimizers.Adam(learning_rate, beta_1=0.9, beta_2=0.98, epsilon=1e-9)

ちなみに Attention is all you need の論文のレビュー[2]に面白い記載があった。

The learning rate schedule seems to really matter. Using simple SGD works fine for LSTM, but seems to fail here
訳: 学習速度のスケジュールは非常に重要なようです。 単純な SGD の使用は LSTM では正常に機能しますが、ここでは失敗するようです。

このモデルを学習させるには、ウォームアップは必須らしい。

学習

学習は以下のように行った。

model.compile(optimizer = optimizer, loss = masked_loss, metrics = [masked_accuracy])
history = model.fit(train_dataset, validation_data=test_dataset, epochs=NUM_EPOCHS, shuffle=True)

学習は綺麗に進んだ。

結果

翻訳機
def translator(english_sentence):
    tokenized_english_sentence = english_vectorization_layer([
        "starttoken " + english_sentence + " endtoken"
    ])[:, : len(english_sentence.split()) + 2]
    japanese_sentences = ["starttoken"]

    for i in range(JAPANESE_SEQUENCE_LENGTH):
        tokenized_japanese_sentence = japanese_vectorization_layer([
            " ".join(japanese_sentences)
        ])[:, :i+1]

        predictions = model(
            [tokenized_english_sentence, tokenized_japanese_sentence],
            training=False
        )
        predicted_id = tf.argmax(tf.math.softmax(predictions[0, -1, :]))

        decoder_outputs_last_word = japanese_vocabulary[predicted_id]
        japanese_sentences.append(decoder_outputs_last_word)

        if decoder_outputs_last_word == 'endtoken':
            break

    return "".join(japanese_sentences[1:-1])
print(translator("this is a pen."))
print(translator('i am a student'))
print(translator("i love you."))
print(translator("what are you doing?"))
print(translator("there are so many people around here who are enjoying their time celebrating this white night festival."))
output
ペンだ
私は学生です
愛してる
何をしてるの
この辺で楽しい時間を[UNK]てる人がたくさんいるのよ

おわりに

Transformer を用いた翻訳機を実装した。Transformer 自体は LSTM などと比べると非常に実装が楽で、これで精度が出るなら楽でいいなって思った。(mask の部分はしっかり考えないと学習が進まなくなるが。)

現在の Keras の MultiHeadAttention の実装を見ると普通に tensorflow の関数組み合わせて実装されている。この辺がTransformerEngineに置き換わることで高速化されるのかな。

次は Bert を実装しよう。

脚注
  1. https://arxiv.org/abs/1706.03762 ↩︎

  2. https://proceedings.neurips.cc/paper_files/paper/2017/file/3f5ee243547dee91fbd053c1c4a845aa-Reviews.html ↩︎

Discussion