📝

TensorFlow の使い方練習:自動微分と Sequential モデル

2023/03/21に公開

はじめに

numpy だといろいろ機能が足りないことが増えてきてメインウェポンを tensorflow に移行したいので勉強したことまとめ。この記事は Sequential モデルを用いた単純なフィードフォワードニューラルネットの作成まで。

Google Colab のノートブックはこちら。

よい子のみなさんはこんな記事を読まずに公式のガイドやチュートリアルを読みましょう。

https://www.tensorflow.org/guide

参考文献

tensorflow のインポート

import tensorflow as tf

演算に GPU を使用しているか否かのチェック

# 環境に存在している物理デバイスを列挙する
tf.config.list_physical_devices()
output
[PhysicalDevice(name='/physical_device:CPU:0', device_type='CPU'),
 PhysicalDevice(name='/physical_device:GPU:0', device_type='GPU')]
# 環境に存在している物理 GPU を列挙する
tf.config.list_physical_devices('GPU')
output
[PhysicalDevice(name='/physical_device:GPU:0', device_type='GPU')]

余計なことをしなければ、物理デバイスとして認識されている GPU は自動的に計算のアクセラレーションに使用される。よって以下のように判定できる。

if tf.config.list_physical_devices('GPU'):
  print("TensorFlow **IS** using the GPU")
else:
  print("TensorFlow **IS NOT** using the GPU")
output
TensorFlow **IS NOT** using the GPU

演算に GPU を使いたくないとき

余計なことをする。

Google Colab のランタイムを GPU に切り替える


tf.constant

tensorflow で定数を定義するには tf.constant() を用いる。定数はイミュータブルであり、一度定義するとあとから中身を書き換えることはできない。コンストラクタには numpy 配列を渡してもよい。

x = tf.constant([[1., 2., 3.],
                 [4., 5., 6.]])

print(x)
output
tf.Tensor(
[[1. 2. 3.]
 [4. 5. 6.]], shape=(2, 3), dtype=float32)
print(x.shape)
print(x.dtype)
output
(2, 3)
<dtype: 'float32'>

他の形式から変換

リストや numpy の配列など直感的に変換できそうなやつを変換してくれる。

x = tf.convert_to_tensor([1, 2, 3])

print(x)
output
tf.Tensor([1 2 3], shape=(3,), dtype=int32)

numpy 配列に変換

x = tf.constant([[1., 2., 3.],
                 [4., 5., 6.]])

A = x.numpy()

print(A)
output
[[1. 2. 3.]
 [4. 5. 6.]]

スライス

スライスによる参照は可能。

print(x[:, 2])
output
<tf.Tensor: shape=(2,), dtype=float32, numpy=array([3., 6.], dtype=float32)>

演算

各種演算が可能。イミュータブルオブジェクトなので行列(テンソル)の一部を後から書き換えることは不可能。行列の shape が異なる場合、numpy と同様にブロードキャストが行われる( numpy と完全に同じかどうかは確認していない)。

x = tf.constant([[1., 2., 3.],
                 [4., 5., 6.]])
y = tf.constant([[1., 3., 5.],
                 [2., 4., 6.]])

# スカラー倍
print(5 * x)
# 和
print(x + y)
# 差
print(x - y)
# 積
print(x * y)
# 商
print(x / y)
# 転置
print(tf.transpose(x))
# 行列積
print(tf.transpose(x) @ y)
output
tf.Tensor(
[[ 5. 10. 15.]
 [20. 25. 30.]], shape=(2, 3), dtype=float32)
tf.Tensor(
[[ 2.  5.  8.]
 [ 6.  9. 12.]], shape=(2, 3), dtype=float32)
tf.Tensor(
[[ 0. -1. -2.]
 [ 2.  1.  0.]], shape=(2, 3), dtype=float32)
tf.Tensor(
[[ 1.  6. 15.]
 [ 8. 20. 36.]], shape=(2, 3), dtype=float32)
tf.Tensor(
[[1.        0.6666667 0.6      ]
 [2.        1.25      1.       ]], shape=(2, 3), dtype=float32)
tf.Tensor(
[[1. 4.]
 [2. 5.]
 [3. 6.]], shape=(3, 2), dtype=float32)
tf.Tensor(
[[ 9. 19. 29.]
 [12. 26. 40.]
 [15. 33. 51.]], shape=(3, 3), dtype=float32)

結合

x = tf.constant([[1., 2., 3.],
                 [4., 5., 6.]])
y = tf.constant([[1., 3., 5.],
                 [2., 4., 6.]])

z = tf.concat([x, y], axis=0)

print(z)
output
tf.Tensor(
[[1. 2. 3.]
 [4. 5. 6.]
 [1. 3. 5.]
 [2. 4. 6.]], shape=(4, 3), dtype=float32)

tf.Variables

tf.Variables はミュータブルな変数であり、ニューラルネットの重みなどを格納するのに使う。コンストラクタには numpy 配列を渡してもよい。スライス、演算、結合など上で紹介した tf.constant でできる操作は大抵できる[1]。(→ ガイド

var = tf.Variable([[0., 0., 0.],
                   [0., 0., 0.]])
print(var)
output
<tf.Variable 'Variable:0' shape=(2, 3) dtype=float32, numpy=
array([[0., 0., 0.],
       [0., 0., 0.]], dtype=float32)>

代入

値を代入するには assign を使う。メモリは使い回し(のはず)。

var.assign([[1., 2., 3.],
            [4., 5., 6.]])

print(var)
output
<tf.Variable 'Variable:0' shape=(2, 3) dtype=float32, numpy=
array([[1., 2., 3.],
       [4., 5., 6.]], dtype=float32)>

スライスに対して assign を行うことで一部を上書きできる。

var[:, 1].assign([0., 0.])

print(var)
output
<tf.Variable 'Variable:0' shape=(2, 3) dtype=float32, numpy=
array([[1., 0., 3.],
       [4., 0., 6.]], dtype=float32)>

名前をつける

デバッグなどを容易にするためにテンソルには名前をつけることができる。同じ名前をつけたテンソルを複数作成することができる(同じ名前をつけたからといって同じ実体が参照されるわけではない)。

var = tf.Variable([[0., 0., 0.],
                   [0., 0., 0.]], name='mytensor')

print(var)
output
<tf.Variable 'mytensor:0' shape=(2, 3) dtype=float32, numpy=
array([[0., 0., 0.],
       [0., 0., 0.]], dtype=float32)>

自動微分をオフにする

詳しくは知らないが、tf.Variable にはあとで紹介する自動微分を計算するための機能がデフォルトで組み込まれている。初期化時に trainable=False を指定することで機能をオフにできる(はず)。自動微分が実際にどういう仕組みで実現されているか知らないが、二重数を使って計算されているとしたらオフにしたほうがメモリや計算量を節約できるはずである。

var = tf.Variable([[0., 0., 0.],
                   [0., 0., 0.]], trainable=False)

print(var)
output
<tf.Variable 'Variable:0' shape=(2, 3) dtype=float32, numpy=
array([[0., 0., 0.],
       [0., 0., 0.]], dtype=float32)>
print(var.trainable)
output
False

テンソルを CPU/GPU に配置する

TensorFlow ではテンソルは互換性のある最速のデバイス(大抵は GPU)に配置される。配置場所を明示的に指定する方法はここを参照。

自動微分

自動微分するためには tf.GradientTape で括ったコンテキストの中で計算を定義する。tape.gradient(出力, 入力)で指定すると、入力の微分値が求まる。

以下では

y = x ^ 2 + 2x + 3

に対してx = 1における微分を計算しているので

\left. \frac{dy}{dx} \right| _ {x = 1} = \left. (2x + 2) \right| _ {x = 1} = 4

となるはずであり、出力が 4.0 なので正しく求まっている。

x = tf.Variable(1.)

def f(x):
  return x ** 2 + 2 * x + 3

with tf.GradientTape() as tape:
  y = f(x)

dy_dx = tape.gradient(y, x)

print(dy_dx)
output
tf.Tensor(4.0, shape=(), dtype=float32)

入力が複数の変数に分かれている場合

入力が複数の変数に分かれている場合は、入力をリストなり辞書なりにして渡す。 戻り値は入力をリストで渡せばリスト、辞書で渡せば辞書になる。

以下ではy = ax + bより

\left. \frac{dy}{da} \right| _ {x = 2} = 2, \quad \left. \frac{dy}{db} \right| _ {x = 2} = 1

であり、これも正しく求まっている。

a = tf.Variable(1.)
b = tf.Variable(0.)

x = tf.Variable(2.)

def f(x):
  return a * x + b

with tf.GradientTape() as tape:
  y = f(x)

dy_da, dy_db = tape.gradient(y, [a, b])

print(dy_da.numpy(), dy_db.numpy())
output
2.0 1.0

最適化

TensorFlow での最適化フレームワークは自動微分により計算された目的関数(または損失関数ともいう)の勾配に基づく連続最適化(確率的最適化)である。損失関数、オプティマイザは自前で実装してもよいが、有名どころに関しては Keras に実装されているものを用いる。

モデル

ここでは簡単に線形回帰モデルを最小二乗法で最適化してみる。出力Y \in \mathbb{R} ^ {m \times d}、入力X \in \mathbb{R} ^ {m \times n}、重みW \in \mathbb{R} ^ {n \times d}、切片B \in \mathbb{R} ^ {m \times d}として、予測値\hat{Y}

\hat{Y} = XW + B

により定義する。損失関数は

L(W, B) = \| Y - \hat{Y} \| _ F ^ 2 = \sum _ {i, j = 1} ^ {m, d} \bigl(Y _ {ij} - (XW + B) _ {ij} \bigr) ^ 2

とする。

モデル、損失関数、オプティマイザの定義

ドキュメントはコレ

データセット生成
import tensorflow as tf

# 全体のシードを固定
# TensorFlow はなぜか全体、個別、両方のシードを指定しないと結果が変わる
tf.random.set_seed(6)

# 行列の形式を指定
m = 100
n = 1
d = 3

# 入力
X = tf.random.normal((m, n), 0, 1, seed=0)
# 重み
W_true = tf.random.normal((n, d), 0, 10, seed=1)
# 切片
B_true = tf.random.normal((1, d), 0, 10, seed=2)

# 観測ノイズ
noise = tf.random.normal((m, d), 0, 1, seed=3)

# 出力
Y = (X @ W_true) + B_true + noise

# データセットは (X, Y)、パラメータは (W, B)
データセットの描画
from matplotlib import pyplot as plt

# データセットのプロット
plt.figure(figsize=(8, 6))
plt.scatter(X[:, 0], Y[:, 0], c='r', label=f'w={W_true[0,0]:.02f}, b={B_true[0,0]:.02f}')
plt.scatter(X[:, 0], Y[:, 1], c='g', label=f'w={W_true[0,1]:.02f}, b={B_true[0,1]:.02f}')
plt.scatter(X[:, 0], Y[:, 2], c='b', label=f'w={W_true[0,2]:.02f}, b={B_true[0,2]:.02f}')
plt.grid(True)
plt.legend()
plt.show()

最適化
import tensorflow as tf

# 実行結果を再現するために一応シードを固定しておく
tf.random.set_seed(10)

# パラメータの初期値
W = tf.Variable(tf.random.normal((n, d), 0, 10, seed=11), name='W')
B = tf.Variable(tf.random.normal((1, d), 0, 10, seed=12), name='B')

# 損失関数の定義
def loss_function(X, Y, params):
    mse = tf.keras.losses.MeanSquaredError()
    loss = mse(Y, (X @ params[0]) + params[1])
    return loss

# 各ステップにおける更新
def training_step(X, Y, params, optimizer):
    # 損失を計算
    with tf.GradientTape() as tape:
        loss = loss_function(X, Y, params)
    
    # 勾配を計算
    grads = tape.gradient(loss, params)
    
    # 勾配を用いてパラメータを更新
    optimizer.apply_gradients(zip(grads, params))
    
    # プロットのために損失を返しておく
    # (このような処理は本来コールバック関数を用いて実装するようだが簡単のため)
    return loss.numpy()

# オプティマイザの定義
optimizer = tf.keras.optimizers.SGD(learning_rate=1e-1)

# 最適化を実行
T = 100  # ステップ数

losses = []  # 各ステップにおける損失
for _ in range(T):
    losses.append(training_step(X, Y, [W, B], optimizer))

losses = tf.constant(losses)
損失の減少
from matplotlib import pyplot as plt

# 各ステップにおける損失をプロット
plt.figure(figsize=(8, 6))
plt.plot(losses)
plt.yscale('log')
plt.xlabel('step', fontsize=16)
plt.ylabel('log loss', fontsize=16)
plt.grid()
plt.show()

豆知識として、確率的勾配降下法(SGD: Stochastic Gradient descent)の収束の速さはO(\varepsilon)である。簡単に言えば対数を取った損失がステップ数に比例して減少する(損失が適当なt回の更新につき1桁小さくなる)ということなので、対数損失をプロットしたときに最初のほうのステップで直線的に減少していれば最適化がうまくいっていることになる[2]。Adam など Keras に実装されているオプティマイザはどれも収束の速さがO(\varepsilon)のアルゴリズムであるから、どれを使っても解釈の仕方は同じである[3]

from matplotlib import pyplot as plt

# 直線描画のために端点を取得
X_end = tf.constant([[tf.math.reduce_max(X).numpy()],
                      [tf.math.reduce_min(X).numpy()]])

# 真の関数にしたがったときの出力
Y_true = (X_end @ W_true) + B_true

# 出力の予測
Y_hat = (X @ W) + B

# 結果を描画
plt.figure(figsize=(8, 6))

# 真の関数をプロット
plt.plot(X_end[:,0], Y_true[:,0], color='r', label=f'Y_true: w={W_true[0,0]:.2f}, b={B_true[0,0]:.2f}')
plt.plot(X_end[:,0], Y_true[:,1], color='g', label=f'Y_true: w={W_true[0,1]:.2f}, b={B_true[0,1]:.2f}')
plt.plot(X_end[:,0], Y_true[:,2], color='b', label=f'Y_true: w={W_true[0,2]:.2f}, b={B_true[0,2]:.2f}')

# 予測値をプロット
plt.scatter(X[:, 0], Y_hat[:, 0], c='r', label=f'w={W[0,0]:.02f}, b={B[0,0]:.02f}')
plt.scatter(X[:, 0], Y_hat[:, 1], c='g', label=f'w={W[0,1]:.02f}, b={B[0,1]:.02f}')
plt.scatter(X[:, 0], Y_hat[:, 2], c='b', label=f'w={W[0,2]:.02f}, b={B[0,2]:.02f}')

plt.grid(True)
plt.legend()
plt.title('Result', fontsize=16)
plt.show()

Keras

深層学習を取り扱う場合はさらに Keras という深層学習フレームワークを使うことができる。Keras は Python で書かれた高水準の深層学習ライブラリで、バックエンドとして TensorFlow を使用できる(他にも CNTK と Theano を使用できる)。以下のようなケースで Keras を使うとよいと書いてある。

  • 容易に素早くプロトタイプの作成が可能(ユーザーフレンドリー,モジュール性,および拡張性による)
  • CNNとRNNの両方,およびこれらの2つの組み合わせをサポート
  • CPUとGPU上でシームレスな動作

RNN が使えるということはフィードフォワードだけでなくフィードバックを持つような有向グラフを定義できるということである。

単純なフィードフォワードNNなら Sequential モデルを用いて簡単に作成できる。スキップコネクション、フィードバック、複数のインプットといったより複雑な構造を持つネットワークのグラフを定義するには FunctionalAPI を使う。

記事が長くなってしまうので FuctionalAPI を使ったネットワーク構築の解説はここではしない。

Sequentialモデルを用いた多値分類

単純なフィードフォワードNNであれば Keras の Sequential モデルで簡単に作ることができる。ここでは簡単な多値分類NNを作成してみる。

https://keras.io/ja/getting-started/sequential-model-guide/

データセットの読み込み

sklearn にある covertype dataset を使う。covertype dataset はアメリカの森林の植生を土地の情報から予測する問題のデータセットである。54の特徴量があり、7クラスの植生に分類する。

データセットの説明取得
from sklearn.datasets import fetch_covtype

print(fetch_covtype().DESCR)
output
.. _covtype_dataset:

Forest covertypes
-----------------

The samples in this dataset correspond to 30×30m patches of forest in the US,
collected for the task of predicting each patch's cover type,
i.e. the dominant species of tree.
There are seven covertypes, making this a multiclass classification problem.
Each sample has 54 features, described on the
`dataset's homepage <https://archive.ics.uci.edu/ml/datasets/Covertype>`__.
Some of the features are boolean indicators,
while others are discrete or continuous measurements.

**Data Set Characteristics:**

    =================   ============
    Classes                        7
    Samples total             581012
    Dimensionality                54
    Features                     int
    =================   ============

:func:`sklearn.datasets.fetch_covtype` will load the covertype dataset;
it returns a dictionary-like 'Bunch' object
with the feature matrix in the ``data`` member
and the target values in ``target``. If optional argument 'as_frame' is
set to 'True', it will return ``data`` and ``target`` as pandas
data frame, and there will be an additional member ``frame`` as well.
The dataset will be downloaded from the web if necessary.
データセットの読み込み
from sklearn.datasets import fetch_covtype
from sklearn.model_selection import train_test_split

# データセットの読み込み
X, y = fetch_covtype(return_X_y=True, as_frame=True)

# データセットの分割
X_train, X_test, y_train, y_test = train_test_split(X, y, train_size=0.02, test_size=0.01, shuffle=True, random_state=3)

# クラスが 1-7 番なので 0-6 番に変換
y_train -= 1
y_test -= 1

# 表示
display(X_train)
display(X_test)
display(y_train)
display(y_test)
出力の加工(整数 → one-hot表現)
from keras.utils import to_categorical

y_train_cat = to_categorical(y_train.values)
y_test_cat = to_categorical(y_test.values)

# 表示
display(y_train_cat)
display(y_test_cat)

# 各クラスの偏り確認
print(y_train_cat.sum(axis=0))
print(y_test_cat.sum(axis=0))
output
array([[1., 0., 0., ..., 0., 0., 0.],
       [0., 1., 0., ..., 0., 0., 0.],
       [0., 1., 0., ..., 0., 0., 0.],
       ...,
       [0., 1., 0., ..., 0., 0., 0.],
       [0., 1., 0., ..., 0., 0., 0.],
       [0., 0., 0., ..., 0., 1., 0.]], dtype=float32)
array([[1., 0., 0., ..., 0., 0., 0.],
       [1., 0., 0., ..., 0., 0., 0.],
       [0., 1., 0., ..., 0., 0., 0.],
       ...,
       [1., 0., 0., ..., 0., 0., 0.],
       [0., 1., 0., ..., 0., 0., 0.],
       [0., 1., 0., ..., 0., 0., 0.]], dtype=float32)
[4125. 5782.  729.   66.  179.  363.  376.]
[2104. 2839.  382.   27.   98.  179.  182.]

モデルの定義・学習・予測

import keras
from keras.models import Sequential
from keras.layers import Dense, maximum

# ネットワークの定義
model = Sequential()
model.add(Dense(units=32, activation='relu', input_dim=54, name='layer1'))
model.add(Dense(units=32, activation='relu', name='layer2'))
model.add(Dense(units=32, activation='relu', name='layer3'))
model.add(Dense(units=7, activation='softmax', name='layer4'))

# 損失関数とオプティマイザを定義
model.compile(loss='categorical_crossentropy',
              optimizer=tf.keras.optimizers.Adam(learning_rate=0.01),
              metrics=['accuracy'])

# 学習
# X_train と y_train は sklearn の API と同様の形式
result = model.fit(X_train.values, y_train_cat, epochs=5, batch_size=32)

# テストデータに対する当てはまり
loss_and_metrics = model.evaluate(X_test.values, y_test_cat, batch_size=32)

# 予測結果の取得
classes = model.predict(X_test.values, batch_size=128)

学習結果の表示

データセット内のラベルが 0, 1 に偏っているので全部 0 とか全部 1 で予測しても accuracy が 0.49 くらい行ってしまう。そういうズルをしてないか確認しておく[4]

y_pred = classes.argmax(axis=1)
print(np.unique(y_pred))
output
[0 1 2 3]

4, 5, 6 のラベルはまったく予測してないけどまぁいいや(練習でそこまでやるの不毛だし)。

from matplotlib import pyplot as plt

# Accuracyのプロット
plt.figure()
plt.title('Accuracy')
plt.xlabel('epoch')
plt.ylabel('accuracy')
plt.plot(result.history['accuracy'], label='train')
plt.legend()

# Lossのプロット
plt.figure()
plt.title('categorical_crossentropy Loss')
plt.xlabel('epoch')
plt.ylabel('loss')
plt.plot(np.log(result.history['loss']), label='train')
plt.legend()
plt.show()

Sequential

モデルは学習済みであるとする。

Sequential 内の各レイヤーを取得

レイヤーは番号または名前でアクセスすることができる。すべてのレイヤを取得するには layers プロパティを使う。layers はリストで返るのでインデックスでアクセスすれば浅いほうの層から順に取得できる。名前でレイヤーを取得するには get_layer メソッドを用いる。

インデックスによるアクセスはネットワーク構造を変えたときに意図した層を指さなくなる可能性があるので、なるべくならアクセスしたい層には名前をつける。

# すべてのレイヤーを取得
print(model.layers)

# 各レイヤーの名前を取得
for layer in model.layers:
    print(layer.name)

# レイヤーに名前でアクセスする
print(model.get_layer('layer1'))
output
[<keras.layers.core.dense.Dense object at 0x7f707cb2a760>, <keras.layers.core.dense.Dense object at 0x7f70856b72e0>, <keras.layers.core.dense.Dense object at 0x7f70856b7250>]
layer1
layer2
layer3
<keras.layers.core.dense.Dense object at 0x7f707cb2a760>

Dense レイヤーの重みとバイアス

Dense レイヤーの重みとバイアスは get_weights メソッドで取得可能。タプルで返り、ひとつ目が重み、ふたつ目がバイアス。

weights = model.get_layer('layer2').get_weights()[0]
biases = model.get_layer('layer2').get_weights()[1]

print(weights)
print(biases)

Dense レイヤーへのインプットとアウトプットは以下で取得できる。

layer_input = model.get_layer('layer2').input
layer_output = model.get_layer('layer2').output

print(layer_input)
print(layer_output)
output
KerasTensor(type_spec=TensorSpec(shape=(None, 32), dtype=tf.float32, name=None), name='layer1/Relu:0', description="created by layer 'layer1'")
KerasTensor(type_spec=TensorSpec(shape=(None, 32), dtype=tf.float32, name=None), name='layer2/Relu:0', description="created by layer 'layer2'")

おしまい

Functional API を使ったりオプティマイザをカスタイマイズしたり、まだやりたいことがあるのでそのうちまた勉強したら記事書くと思います。

脚注
  1. どこまで同じことができてどこからができないのかは調べていない。使ってるうちにエラーが出たときに考えればいいかなって。 ↩︎

  2. 各ステップごとに適切な学習係数を選択し続ければ線形な減少傾向が続く(Armijo条件について調べてみるとよい)。直線的に減少しないときは学習係数の設定が適切ではない。実際、今回の実験では学習係数が固定なのでステップ数が 40 を超えたあたりから直線的に減少していない。また、GPU で効率的に扱える浮動小数点数の表現はfloat32であり、有向桁数は 6, 7 ケタしかないので表現力の限界によりある程度のところで損失の減少は止まる。損失の減少が止まった原因が学習係数の設定不備によるものと表現力の限界によるものとのどちらなのか判断することは一般に面倒臭い(計算の過程でケタ落ちが発生していないかどうか確かめることになる)。今回はデータセットをバッチに分割していない単なる勾配降下法なので綺麗に直線的に減少しているが、データセットをバッチに分割する確率的勾配降下法で最適化した場合は損失が多少振動しながら全体の傾向としては直線的に減少する。個人的には、エラーをプロットするときに対数を取ってないやつは連続最適化を学んでいないモグリだと思っている。 ↩︎

  3. ちなみにニュートン法は2次収束O(\varepsilon ^ 2)であり、損失が二次関数を描いて収束する。ハレー法は3次収束O(\varepsilon ^ 3)で三次関数を描いて収束する。準ニュートン法はO(\varepsilon)O(\varepsilon ^ 2)の中間くらいの収束の速さを持つ。 ↩︎

  4. 3層だと頑張って調整してもズルしがちだった。4層に増やしたら割と賢くなった。これくらいの規模のネットワークでもうまくいかないときに何が悪くてうまくいかないのかわからないのがニューラルネットの悪いとこやね…… ↩︎

Discussion