【E資格】深層学習②

III 深層学習Day3

 ここでは再帰的ニューラルネットワーク(RNN)について説明したのちに、RNNの問題点の解決策としてのLSTMおよびその改良であるGRUや双方向RNNについて述べる。その後、RNNを自然言語処理に応用したSeq2SeqやWord2Vec、そして時系列データの中身に関連度をつける手法であるAttention Mechanismについて説明する。

1. 再帰型ニューラルネットワーク(RNN)の概念

 RNNとは時系列データに対応可能なニューラルネットワークを指す。ここで、時系列データとは音声・株価のように、時間的順序を追って観察され、相互に統計的依存性があるデータをさす。RNNの流れを図示化すると以下のようになる。

特徴は、中間層から出力層に渡すだけではなく、「入力のように」中間層に情報を組み込んでいる点である。つまり、ある時点の情報を保持して、次の時点の入力にも取り込むのがRNNである。上図を数式で記述すると以下のようになる。

$$\begin{eqnarray}
u^t &=& W_{(in)}x^t + Wz^{t-1} + b\\
z^t &=& f\left(W_{(in)}x^t+Wz^{t-1}+b\right)\\
v^t&=&W_{(out)}z^t + c\\
y^t &=& g\left(W_{(out)}z^t +\right)
\end{eqnarray}$$

この構造を実現するためには、初期の状態と過去の時間t-1の状態を保持し、そこから次の時点でのtを再帰的に求める再帰構造が必要になる。ソースコードのイメージとしては1学習単位(エポック)ごとに、全時系列データを学習させるため、for 学習回数〜 : for 時系列〜 :のような流れになる。
 次にBPTT(Back Propagation Through Time)について説明する。BPTTはRNNにおいてのパラメータ調整方法の一つであり、誤差逆伝播法の一種である。数学的記述は以下のようになる。

・重み3つの更新式

$$\begin{eqnarray}
\frac{\partial E}{\partial W_{(in)}} &=& \frac{\partial E}{\partial u^t}\left[\frac{\partial u^t}{\partial W_{(in)}}\right]^T=\delta^t\left[x^t\right]^T\\
\frac{\partial E}{\partial W_{(out)}} &=& \frac{\partial E}{\partial v^t}\left[\frac{\partial v^t}{\partial W_{(out)}}\right]^T=\delta^{out,t}\left[z^t\right]^T\\
\frac{\partial E}{\partial W} &=& \frac{\partial E}{\partial u^t}\left[\frac{\partial u^t}{\partial W}\right]^T=\delta^t\left[z^{t-1}\right]^T
\end{eqnarray}$$

・バイアスの更新式

$$\begin{eqnarray}
\frac{\partial E}{\partial b} &=& \frac{\partial E}{\partial u^t}\frac{\partial u^t}{\partial b} = \delta^t\\
\frac{\partial E}{\partial c} &=& \frac{\partial E}{\partial v^t}\frac{\partial v^t}{\partial c} = \delta^{out,t}
\end{eqnarray}$$

・δtについて

$$\begin{eqnarray}
\frac{\partial E}{\partial u^t} &=& \frac{\partial E}{\partial v^t}\frac{\partial v^t}{\partial u^t}=f'(u^t)W_{(out)}^T\delta^{out,t}=\delta^t\\
\delta^{t-1} &=& \frac{\partial E}{\partial u^{t-1}} = \delta^t\left(Wf'(u^{t-1})\right)\\
\delta^{t-z-1} &=& = \delta^{t-z}\left(Wf'(u^{t-z-1})\right)
\end{eqnarray}$$

・パラメータの更新

$$\begin{eqnarray}
W_{(in)}^{t+1} &=& W_{(in)}^t-\epsilon\frac{\partial E}{\partial W_{(in)}}=W_{(in)}^t-\epsilon\sum^{T_t}_{z=0}\delta^{t-z}\left[x^{t-z}\right]^T\\
W_{(out)}^{t+1}&=&W_{(out)}^t-\epsilon\frac{\partial E}{\partial W_{(out)}}=W_{(out)}^t-\epsilon\delta^{out,t}\left[z^t\right]^T\\
W^{t+1}&=&W^t-\epsilon\frac{\partial E}{\partial W}=W_{(in)}^t-\epsilon\sum^{T_t}_{z=0}\delta^{t-z}\left[z^{t-z-1}\right]^T\\
b^{t+1}&=&b^t-\epsilon\frac{\partial E}{\partial b}=b^t-\epsilon\sum^{T_t}_{z=0}\delta^{t-z}\\
c^{t+1}&=&c^t-\epsilon\frac{\partial E}{\partial c}=c^t - \epsilon\delta^{out,t}

\end{eqnarray}$$

・BPTTの全体像

$$\begin{eqnarray}
E^t &=& loss\left(y^t, d^t\right)\\
&=& loss\left(g\left(W_{(out)}z^t+c\right),d^t\right)\\
&=& loss\left(g\left(W_{(out)}f\left(W_{(in)}x^t+Wz^{t-1}+b\right)+c\right),d^t\right)
\end{eqnarray}$$

これより、誤差関数に中間層(z)の時間依存性があることがわかる。

実装演習

バイナリ加算をRNNで実装した結果を以下に示す。

import numpy as np
from common import functions
import matplotlib.pyplot as plt


def d_tanh(x):
    return 1/(np.cosh(x) ** 2)

binary_dim = 8
largest_number = pow(2, binary_dim)
binary = np.unpackbits(np.array([range(largest_number)],dtype=np.uint8).T,axis=1)

input_layer_size = 2
hidden_layer_size = 16
output_layer_size = 1

weight_init_std = 1
learning_rate = 0.1

iters_num = 10000
plot_interval = 100

# ウェイト初期化 (バイアスは簡単のため省略)
W_in = weight_init_std * np.random.randn(input_layer_size, hidden_layer_size)
W_out = weight_init_std * np.random.randn(hidden_layer_size, output_layer_size)
W = weight_init_std * np.random.randn(hidden_layer_size, hidden_layer_size)

# 勾配
W_in_grad = np.zeros_like(W_in)
W_out_grad = np.zeros_like(W_out)
W_grad = np.zeros_like(W)

u = np.zeros((hidden_layer_size, binary_dim + 1))
z = np.zeros((hidden_layer_size, binary_dim + 1))
y = np.zeros((output_layer_size, binary_dim))

delta_out = np.zeros((output_layer_size, binary_dim))
delta = np.zeros((hidden_layer_size, binary_dim + 1))

all_losses = []

for i in range(iters_num):
    
    # A, B初期化 (a + b = d)
    a_int = np.random.randint(largest_number/2)
    a_bin = binary[a_int] # binary encoding
    b_int = np.random.randint(largest_number/2)
    b_bin = binary[b_int] # binary encoding
    
    # 正解データ
    d_int = a_int + b_int
    d_bin = binary[d_int]
    
    # 出力バイナリ
    out_bin = np.zeros_like(d_bin)
    
    # 時系列全体の誤差
    all_loss = 0    
    
    # 時系列ループ
    for t in range(binary_dim):
        # 入力値
        X = np.array([a_bin[ - t - 1], b_bin[ - t - 1]]).reshape(1, -1)
        # 時刻tにおける正解データ
        dd = np.array([d_bin[binary_dim - t - 1]])
        
        u[:,t+1] = np.dot(X, W_in) + np.dot(z[:,t].reshape(1, -1), W)
        z[:,t+1] = functions.sigmoid(u[:,t+1])
        y[:,t] = functions.sigmoid(np.dot(z[:,t+1].reshape(1, -1), W_out))

        loss = functions.mean_squared_error(dd, y[:,t])
        delta_out[:,t] = functions.d_mean_squared_error(dd, y[:,t]) * functions.d_sigmoid(y[:,t])        
        all_loss += loss

        out_bin[binary_dim - t - 1] = np.round(y[:,t])
    
    
    for t in range(binary_dim)[::-1]:
        X = np.array([a_bin[-t-1],b_bin[-t-1]]).reshape(1, -1)        

        delta[:,t] = (np.dot(delta[:,t+1].T, W.T) + np.dot(delta_out[:,t].T, W_out.T)) * functions.d_sigmoid(u[:,t+1])

        # 勾配更新
        W_out_grad += np.dot(z[:,t+1].reshape(-1,1), delta_out[:,t].reshape(-1,1))
        W_grad += np.dot(z[:,t].reshape(-1,1), delta[:,t].reshape(1,-1))
        W_in_grad += np.dot(X.T, delta[:,t].reshape(1,-1))
    
    # 勾配適用
    W_in -= learning_rate * W_in_grad
    W_out -= learning_rate * W_out_grad
    W -= learning_rate * W_grad
    
    W_in_grad *= 0
    W_out_grad *= 0
    W_grad *= 0
    
    if(i % plot_interval == 0):
        all_losses.append(all_loss)        
        out_int = 0
        for index,x in enumerate(reversed(out_bin)):
            out_int += x * pow(2, index)

lists = range(0, iters_num, plot_interval)
plt.plot(lists, all_losses, label="loss")
plt.savefig("BinaryAdd.png")

損失関数は以下の図のようになり、lossが十分に小さくなっていることから正しく学習できていると考えられる。


確認テスト

確認テスト RNN①

RNNのネットワークには大きくわけて3つの重みがある。一つは入力から現在の中間層を定義する際にかけられる重み、もう一つは中間層から出力を定義する際にかけられる重みである。残り一つの重みについて説明せよ。


★解答:
一時点前の中間層の情報を現在の中間層に渡す際の重み

演習チャレンジ RNN①

★解答:2
 活性化関数にかけるのは表現leftとrightを合わせたものをに重みをかけることによって実現するため。

コード演習問題 RNN①

★解答:2
 中間層出力が過去の中間層出力に依存していることと、損失関数を重みに関して偏微分すると過去に遡るたびにUがかけられることを考慮すると、delta_t = delta_t.dot(U)となる。

2. LSTM

 RNNの課題として、時系列を遡るほど勾配が消失していくことから長い時系列の学習が困難なことが挙げられる。この解決さくとして、LSTMがある。
 別の課題として、活性化関数に恒等関数を用いた際は勾配爆発が起きうる。勾配爆発とは層を逆伝搬するごとに指数関数的に勾配が大きくなる現象である。この解決策として勾配クリッピング法がある。これは勾配のノルムが閾値より大きい時に、ノルムを閾値に正規化する方法である。
 LSTMの全体像は以下のとおりである。

以下では、LSTMの各要素について説明する。上図の中央にあるCECは入力や中間層の情報などの記憶機能を持たせた部分である。LSTMでは中間層を記憶と、考える部分で分離しており、CECは記憶を担当している。勾配消失や、勾配爆発の解決策として、勾配が1であれば解決できるため勾配を1としている。

$$\begin{eqnarray}
\delta^{t-z-1}&=&\delta^{t-z}\left(Wf'(u^{t-z-1})\right)=1\\
\frac{\partial E}{\partial c^{t-1}}&=&\frac{\partial E}{\partial c^t}\frac{\partial c^t}{\partial c^{t-1}}=\frac{\partial E}{\partial c^t}\frac{\partial}{\partial c^{t-1}}(a^t-c^{t-1})=\frac{\partial E}{\partial c^t}
\end{eqnarray}$$

ただし、CECだけでは勾配が1であるため、時間データに対して重みが一律になってしまう。つまり、NNの学習特性がないことになってしまう。そこで周りに学習機能を持たせたのが入力・出力ゲートである。
 入力ゲートは入力情報に対して、「覚え方」をCECに指示する(入力データと内積をとる)。この覚え方を学習するのが入力ゲートである。出力ゲートはCECが覚えていた情報を「どのように利用するか」をCECに指示する(出力と内積)。この利用方法を学習するのが出力ゲートである。入出力ゲートを追加することで、それぞれのゲートへの入力値の重みを重み行列W、Uで可変可能にしている。ただし、このままではCECは過去の情報全てが保管されており、過去の情報がいらなくなっても削除することができない。そこで、過去の情報をいらなくなったタイミングで忘却できるようにしたのが忘却ゲートである。
 忘却ゲートでは今回の入力と前回の出力から記憶を調整するための関数を作り、CECが自己ループを行なって削除する。

$$C(t) = i(t)\cdot a(t)+ f(t)\cdot C(t-1)$$

ただし、C(t)が今回CECが覚えている情報であり、右辺第一項が今回覚える情報、第二項が不要な情報の調整である。
 ここまでの機能でもLSTMとして機能はするが、CEC自身の値がゲート制御に影響を与えていないことから、覗き穴結合を行うことで、CEC自身の値に重み行列を介して伝搬可能にしている。ただし、実際には覗き穴結合を行っても大きな効果は得られなかったことが知られている。

実装演習

kerasを用いる場合はモデルに簡単に指定することができる。何かのモデルを生成する際にLSTMを用いたい場合は以下のようにモデルとして組み込めば良い。

def rnn_model_maker(n_samples, n_out):
    # 3層RNN(リカレントネットワーク)を定義
    model = Sequential()
    # 中間層(RNN)を定義
    model.add(LSTM(units=n_units, input_shape=(n_rnn, 1), dropout=r_dropout, return_sequences=False))
    # 出力層を定義(ニューロン数は1個)
    model.add(Dense(units=n_out, activation='linear'))
    # 回帰学習モデル作成
    model.compile(loss='mean_squared_error', optimizer='rmsprop')

    return model

確認テスト

確認テスト LSTM①

以下の文章をLSTMに入力し空欄に当てはまる単語を予測したいとする。文章の「とても」という言葉は空欄の予測において、なくなっても影響を及ぼさないと考えられる。このような場合、dのゲートが作用すると考えられるか。
「映画おもしろかったね。ところで、とてもお腹が空いたからなにか___」


★解答:忘却ゲート

演習チャレンジ LSTM①

★解答:1
 クリッピングした勾配は、勾配×(閾値/勾配のノルム)となるためgradient×rateが答えとなる。

演習チャレンジ LSTM②

★解答:3
 新しいセルの状態は、計算されたセルへの入力と1ステップ前のセルの状態に入力ゲート・忘却ゲートをかけて足し合わせたものと表現される。

3. GRU

 GRUはLSTMの改良版である。従来のLSTMではパラメータが多数存在していたため、計算負荷が大きかった。LSTMの精度を保ちつつ、パラメータ数を削減したのがGRUである。GRUの全体像は以下の図のとおりである。

h(t)以下の式のとおり、隠れ層として計算状態を保持していることがわかる。LSTMと基本的な考え方は似ているが、ゲートの名称が更新ゲート/リセットゲートとなったこと、構造がシンプルになったことが差異となっている。

実装演習

GRUもLSTMと同様にモデルとして組み込むのは簡単である。どちらかというとなぜ、GRUが必要なのか、どういう場面で用いるのかを頭に入れる方が重要。

def rnn_model_maker(n_samples, n_out):
    # 3層RNN(リカレントネットワーク)を定義
    model = Sequential()
    # 中間層(RNN)を定義
    model.add(GRU(units=n_units, input_shape=(n_rnn, 1), dropout=r_dropout, return_sequences=False))
    # 出力層を定義(ニューロン数は1個)
    model.add(Dense(units=n_out, activation='linear'))
    # 回帰学習モデル作成
    model.compile(loss='mean_squared_error', optimizer='rmsprop')

    return model

確認テスト

確認テスト GRU①

LSTMとCECが抱える課題について、それぞれ簡潔に述べよ。


★解答

LSTM:パラメータ数が多く、計算不可が高い
CEC:勾配が1であり、学習能力がない

確認テスト GRU②

LSTMとGRUの違いを簡潔に述べよ


★解答
LSTM:3つのゲートとCECがある。パラメータが多い
GRU:2つのゲート、CECはない。パラメータは少ない

演習チャレンジ GRU①

★解答:4
 新しい中間状態は、1ステップ前の中間表現と計算された中間表現の和で表現される。このため更新ゲートzを用いて(1-z)×h+z×h_barと表現される。

4. 双方向RNN

 ここまでのRNNは過去の時系列的な情報を加味した学習を行うことができるモデルであった。一方で、文章の推敲や機械翻訳を行う際には、過去の情報だけでなく、未来の情報も加味することで精度が向上することが予想させる。そこで考えられたのが双方向RNNである。イメージとしては通常のRNNの特徴である中間層から中間層への情報の受け渡しを過去→未来だけでなく逆方向にも用意したものである。図に表すと以下のようになる。


演習チャレンジ 双方向RNN①

★解答:4
 双方向RNNでは順方向と逆方向に情報を伝搬した際の中間層表現を合わせたものが特微量となるため、concatenateを用いて結合したものが答えとなる。

5. Seq2Seq

 ここまでは一般的なRNNについて述べた。以降ではRNNを用いた自然言語処理に着目していくつかアルゴリズムを説明する。ここでは、1つの時系列データから別の時系列データを得る手法であるSeq2Seqについて説明する。
 Seq2Seqの全体像は以下のとおりである。

まずEncoder RNNではインプットしたテキストデータを単語等のトークンに区切って渡す処理を行う。この際、隠れ層には単語の並びをもとに文の意味がベクトルとして注入される。具体的には以下の3つの処理が行われる。

  • Taking : 文章をトークンごとに分割し、トークンごとのIDにする(one-hot-vector)
  • Embedding : IDから、そのトークンを表す分散表現ベクトルに変換する。(単語の意味が似ていものは数字が同じになるように圧縮する→意味の抽出)
  • Encoder RNN : ベクトルを順番にRNNに入力していく

手順としては、以下の通りである。

  1. ベクトル1をRNNに入力し、隠れ層を出力
  2. 1.の隠れ層と次の入力ベクトル2をまたRNNに入力した隠れ層を出力
  3. 1., 2.を繰り返す
  4. 最後のベクトルを入力した際の隠れ層を最終状態として保持
  5. この最終状態が「thought vector」と呼ばれ、入力した文の意味を表すベクトルとなる

 次にDecoder RNNではシステムがアウトプットしたデータを単語などのトークンごとに生成する機構造をもつ。処理手順は以下のとおりである。

  1. Decoder RNN : Encoder RNNのthought vectorから、各トークンの生成確率を出力する。最終状態をDecoder RNNの初期状態として設定し、Embeddingを入力する。
  2. Sampling : 生成確率に基づいてトークンをランダムに選ぶ
  3. Embedding : 2.で選ばれたトークンをEmbedding してDecoder RNNへの次の入力とする
  4. Detokenize : 1.~3.を繰り返し、2.で得られたトークンを文字列に直す。

 Seq2Seqに関する基本的な説明は以上である。ただし、このままでは1つの文に対して1つの答えしか用意することができない。そこで過去の文脈の情報も利用できるようにしたのがHREDである。
 HREDでは過去n-1個の発話から次の発話を生成する。具体的な構造としては「Seq2Seq+Context RNN」という形となる。Context RNNはEncoderのまとめた各文章の系列をまとめて、これまでの会話コンテキスト全体を表すベクトルに変換する構造をもつ。これにより、過去の発話の履歴を加味した返答が可能となる。
 ただし、HREDでも同じコンテキストを与えられても、答えは会話の流れとしては同じものしか出せない。つまり「会話の流れ」のような多様性はない。また、短く情報量に乏しい答え(相槌のみなど)を出力することが多かった。
 そこでHREDを改良したのがVHREDである。VHREDはHREDにVAEの潜在変数の概念を追加したものである。VAEについて説明する前にオートエンコーダの概念について説明する。まず、オートエンコーダは入力データから潜在変数zに変換するNNをEncoderとして、潜在変数zを入力とし、元画像を復元するNNをDecoderとする。これにより次元削減が行われる。ただし、オートエンコーダの場合は潜在変数zの構造が不明である。そこで、潜在変数zに確率分布z〜N(0,1)を仮定したものをVAEという。VAEを用いることで、データを潜在変数zの確率分布の構造に押し込めることが可能となる。


確認テスト

確認テスト Seq2Seq①

下記の選択肢から、Seq2Seqについて説明しているものを選べ。
(1)時刻に関して順方向と逆方向のRNNを構成し、それら2つの中間層表現を特微量として利用するものである。
(2)RNNを用いたEncoder-Decoder モデルの一種であり、機械翻訳などのモデルに使われる。
(3)構文木などの木構造に対して、隣接単語から表現ベクトル(フレーズ)を作るという演算を再帰的に行い(重みは共通)、文全体の表現ベクトルを得るニューラルネットワークである。
(4)RNNの一種であり、単純なRNNにおいて問題となる勾配消失問題をCECゲートの概念を導入することで解決したものである。


★解答:(2)
 (1)は双方向RNN、(3)は構文木、(4)はLSTM

確認テスト Seq2Seq②

seq2seqとHRED、HREDとVHREDの違いを簡潔に述べよ。


★解答
seq2seqとHRED:seq2seqは一つの文に対して1つの答えを出すのに対して、過去の文脈の情報を使えるようにした。
HREDとVHRED:VHREDはHREDに潜在変数の概念が追加されたもの。

確認テスト Seq2Seq③

VAEに関する下記の説明文中の空欄に当てはまる言葉を答えよ。
「自己符号化器の潜在変数に__を導入したもの」


★解答:確率分布N(0,1)

演習チャレンジ Seq2Seq①

★解答:1
 単語wはone-hotベクトルであり、単語埋め込みにより、別の特微量に変換する。これは埋め込み行列Eと内積を取ることで実現される。

Word2Vec

 RNNでは単語のような可変長の文字をNNに与えることができない。つまり、固定長形式で単語を表す必要がある。word2vecは単語の分散表現ベクトルを得る手法である。word2vecは学習データからボキャブラリを作成する。それをボキャブラリ×任意の単語ベクトル次元で重み行列を作成することで、メモリ量を抑えることができるようにした。従来ではボキャブラリ×ボキャブラリの行列になり、ボキャブラリが増えるほど、メモリ量の確保や、計算速度にも影響が大きかった。

 文字だけでまとめてもなかなか理解しづらい部分ではあるが、イメージとしては

king(の埋め込みベクトル) - man(の埋め込みベクトル) + woman (の埋め込みベクトル) = queen

が成り立つように埋め込みベクトルを決定する仕組みである。上の例は以下の論文にも記載されている。
https://arxiv.org/pdf/1301.3781.pdf


7. Attention Mechanism

 seq2seqでは単語数によらず文章中の単語を固定次元ベクトルの中に入力しなければならなかった。つまり、文章が長くなるほど、シーケンスの内部表現の次元も大きくする仕組みが必要であった。そこで、入力と出力のどの単語が関連しているのかの関連度を学習する仕組みを用いることで有限の大きさの隠れ層でも、重要な情報を選択的に抽出可能にしたのがAttention Mechanismである。

 現実的には(単純な)Attention 機構を施しただけのモデルでは明らかに精度がよくなることはほとんどないと言われている。ただし、精度を保ちつつ、予測の根拠を明確にしたい場合などAttention モデルを組み込むことはある。


確認テスト

確認テスト Attention Mech. ①

RNNとword2vec、seq2seqとAttentionの違いを簡潔に述べよ


★解答
RNNとword2vec : RNNでは固定長形式で単語を表していたが、word2vecでは任意の単語ベクトル次元重み行列を作成可能。
seq2seqとAttention : seq2sqでは文章を固定次元ベクトルに挿入していたため、文書が長くなるほど、内部表現の次元も大きくなる仕組みが必要であった。一方で、Attentionでは有限の大きさの隠れ層でも重要な情報を選択的に抽出可能な仕組みであるため、内部表現の次元は有限でよい。

IV 深層学習Day4

 ここでは、まず強化学習について説明した後に、実装例であるAlphaGoについて概要を述べる。その後、軽量化・高速化技術についてまとめ、応用例をいくつか紹介する。さらに機械翻訳に用いられるTransformerと、物体検知・セグメンテーションについてまとめる。

1. 強化学習

 まずは強化学習について説明する。強化学習は、行動の結果として与えられる報酬をもとに、行動を決定する原理を改善していく仕組みである。

 強化学習では探索と利用がトレードオフとなる。

  • 探索が足りない状態:過去のベストな行動のみを選択すると、よりよいものが見つからない
  • 利用が足りない状態:未知の行動のみ選択すると過去が活かせない

 強化学習は、今では応用例が多いものの、過去には計算速度に課題があった。単純な計算機の性能の向上も強化学習を可能にした要因の一つであるが、よりよい計算方式も登場した。それは関数近似法とQ学習を組み合わせる手法である。
 以下では強化学習で用いられる用語について簡単にまとめる。

  • 関数近似法:価値関数や方策関数を関数近似する手法
  • Q学習:行動価値関数を行動する毎に更新して学習をすすめる手法
  • 価値関数:状態価値関数と行動価値関数
    • 状態価値関数(V(s)):ある状態の価値に着目
    • 行動価値関数(Q(s,a)):状態と価値を組み合わせた価値に注目
  • 方策関数(Π(s,a)):ある状態でどのような行動を取るのかの確率を与える関数
  • 方策勾配法:方策をモデル化して最適化する手法。ニューラルネットワークでは誤差を「小さく」するのに対して、強化学習では報酬を「大きく」するので更新時の符号が逆になる。

$$\begin{eqnarray}
\theta^{(t+1)}&=&\theta^{(t)}+\epsilon\nabla J(\theta)\\
\nabla_{\theta}J(\theta) &=& E_{\pi_{\theta}}[(\nabla_{\theta}\log\pi_{\theta}(a|s)Q^{\pi}(s,a))]
\end{eqnarray}$$

 考察:強化学習はまだまだ成長分野であると感じているため取り残されないようにしたいと考えている。自動運転やロボットアームなど、一昔前では近未来的なものであった分野に応用が効くこともあり、機械学習→深層学習→強化学習→深層強化学習と時代が進んでいるように感じた。

2. AlphaGo

 ここでは囲碁プログラムのうち、AlphaGo LeeとAlphaGo Zeroについて概要を説明する。前提として、囲碁は想定される局面の状態が多いため、チェスのようなボードゲームよりもコンピュータが人間に勝つのが困難と考えられていた。AlphaGoではNNを応用している点で従来の囲碁プログラムとは異なる。
 AlphaGo LeeのPolicyNetでは畳こみを行い、最終層でSoftMax Layerを用いることで19×19マスの着手予想確率を出力する。SoftMax Layerは多次元を総和1の確率分布に変換するLayerである。一方、ValueNetでも同様に畳み込みを行うが、最終的な出力としては現局面の勝率を出力したいので、現結合層を2層通したあとに活性化関数としてtanhを用いる。出力は勝率が-1〜1で表される。
 実際の学習手順は以下のようになる。

  1. 教師あり学習によるRollOutPolicyとPolicyNetを学習
  2. 強化学習によるPolicyNetの学習
  3. 強化学習によるValueNetの学習

 1. ではまずRollOutPolicyとしてNNではなく線形の方策関数を用いる。これにより、探索中に高速で着手確率を算出することが可能になる。計算速度としてはPolicyNetに比べると1000倍程度である。またPolicyNetでは過去の棋譜データから教師と同じ手を予測できるように学習する。
 2. ではPolicyNetの強化学習として、現状のPolicyNetとPolicyPool(強化学習の過程の記録→過学習を抑制するために用いる。)からランダムに選択されたPolicyNetとで対局シミュレーションを行い、その結果から方策勾配法で学習する。
 3. ではPolicyNetを使用して対局シミュレーションを行い、その結果の勝敗を教師として学習する。
 学習手順には載せていないが、Q関数の学習時に用いられる手法としてモンテカルロ木探索がある。これは現局面から末端局面までPlayOutと呼ばれるランダムシミュレーションを多数回行い、勝敗を集計する。シミュレーション回数が、一定回数を超えると、その手を着手したあとの局面をシミュレーション開始局面とするように、探索木を成長させる。この木の成長で、一定条件下における最善主を返す手法である。

 次に、AlphaGo Leeの改良版(正確にはAlphaGo Leeは2代目、AlphaGo Zeroは5代目)としてAlphaGo Zeroについて説明する。AlphaGO Leeとの差分は以下のとおりである。

  • 強化学習のみを行う
  • 特微入力からヒューリスティック要素を排除
  • PolicyNetとValueNetを1つに結合
  • Residual Netを導入
  • モンテカルロ木探索からRollOutシミュレーションをなくした

 Residual Networkではネットワークにショートカット構造を追加することで、勾配の爆発・消失を抑える効果を狙ったものである。シュートカットを用いることでネットワークの深さを抑えることができる。これにより、勾配消失の抑制やアンサンブル効果を得ることができる。
 Residual Network の派生系がいくつかあるが、基本的な構造は変わらないためここでは名前の紹介にとどめる。

  • Residual Blockの工夫:Bottleneck, PreActivation
  • Network構造の工夫:WideResNet, PyramidNet

 AlphaGo Zeroの学習方法は自己対局による教師データの作成である。自己対局ではモンテカルロ木探索を行う。30手まではランダムでうち、それ以降は探索を行う。学習では自己対局のデータを教師データとして学習し、NetworkのPolicy部分の教師は着手選択確率分布を用い、Value部分の教師は勝敗を用いる。損失関数としてはPolicy部分はCrossEntropy, Value部分はMSEを用いる。またネットワークの更新では、現状のネットワークと学習後のネットワークで対局テストを行う。このテストの勝率に応じてネットワークを更新する。

 考察:AlphaGoのソースを実際に見てみたが、思っていたよりは大規模ではなかった。Pythonのコードの特性もあるが、C++で実装しようとおもったら(できるかどうかは別として)10倍ぐらいになりそうだと感じた。「囲碁のAI」と効くと難しい・複雑なコードなんだろうと考えるかもしれないが、原理をしっかりと理解しておけば基本的な部分は理解できると思う。

3. 軽量化・高速化技術

 深層学習や、強化学習では効率の良い学習が必要となる。精度がいかに高くても学習に何百年もかかっていては現実に用いることはできないので。単体の計算機の性能の向上には限度があることから複数のワーカーを用いることによる高速化を行うための手法がいくつか考えられている。ここではデータ並列化・モデル並列化・GPUによる高速技術について説明する。

 まずデータ並列化では、データを分割し、親モデルを各ワーカーに子モデルとしてコピーすることで各ワーカーごとに計算させる手法である。データ並列化には同期型と非同期型がある。同期型では各ワーカーが計算が終わるのを待ち、各ワーカーの勾配が算出された後に勾配の平均を計算し、親モデルのパラメータを更新する。パラメータ更新後のモデルを子モデルとしてコピーして次の学習を始める非同期型では各ワーカーは互いの計算を待たず、子モデルごとに更新を行う。つまり、学習が終わった子モデルはパラメータサーバにPushする。新たな学習を始める際はパラメータサーバからPopしたモデルを用いる。非同期型では同期型に比べて処理は早いが、最新のモデルのパラメータを学習できないので、学習が不安定になりやすい(Scale Gradient Problem)。

 次に、モデル並列化は親モデルを各ワーカーに分割し、それぞれのモデルを学習する。その後全てのデータで学習が終わった時点で1つのモデルに復元する。実際にはモデルやデータの大きさによってデータ並列化とモデル並列化を使い分ければよく、モデルが大きい時はモデル並列化を、データが大きい時はデータ並列化を用いると良い。

 モデルやデータの工夫とともにハードウェアの性能を上げることも重要である。ゲーミングPCで使われるGPUや、グラフィック以外の用途で使用されるGPUであるGPGPUがある。GPGPUの開発環境はCUDAやOpenCLがある。現実的にはNVIDIA社のGPUを使用している場合が多いと思うのでCUDAが必要と思うかもしれないが、DLフレームワーク内で実装されているので、使用する際は指定するだけでよい。

 最後にモデルを軽量化する以下の手法について説明する。

  • 量子化
  • 蒸留
  • プルーニング

 量子化では、簡単にはパラメータの精度を落とす手法である。メモリ上に保存するのは大半は重みなので64bitもいらないことも多い。計算の高速化、省メモリ化が行える一方で精度が悪くなることには変わりはない。ただし、精度を悪くしないようにTensorFlowではbfloat16という型がある。
 次に蒸留とは、規模の大きなモデルの知識を用いて、軽量なモデルの作成を行う手法である。蒸留では教師モデルと生徒モデルの2つで構成される。

  • 教師モデル:予測精度の高い、複雑なモデルやアンサンブルされたモデル
  • 生徒モデル:教師モデルをもとに作られる軽量なモデル

学習方法としては、教師モデルの重みを固定rし、生徒モデルの重みを更新していく。誤差は教師モデルと生徒モデルのそれぞれの誤差を用いる。これにより、学習回数は少なくでも精度の良いモデルが作成可能であることが知られている。

 最後にプルーニングではモデルの精度に寄与しないニューロンを削減する手法する。モデルの精度に寄与しないとは重みが閾値以下であることを指し、これによりモデルの圧縮を行うことができる。研究結果としては9割以上のニューロンがなくても、精度には大きく影響しないことも知られている。

4. 応用モデル

 

 まずは、画像認識でDLモデルの軽量化・高速化・高精度化を実現したモデルがMobileNetである。一般的な畳み込みレイヤーは以下の通りである。

  • 入力特微マップ(チャネル数):H×W×C
  • 畳込みカーネルのサイズ:K×K×C
  • 出力チャネル数(フィルタ数):M
  • ストライド1でパディングを適用した場合の畳み込み計算の計算量
    :H×W×K×K×C×M

これでは計算量が多いため、MobileNetではDepthwise ConvolutionとPointwise Convolutionの組み合わせで軽量化を実現する。Depthwise Convolutionでは入力マップのチャネルごとに畳み込みを実施し、出力マップをそれらと結合する。用いるカーネルサイズはK×K×1, フィルタ数1であり、出力マップの計算量はH×W×K×K×Cとなる。Pointwise Convolutionでは、入力マップのポイントごとの畳み込みを実施することで、出力マップはフィルタ数分だけ作成が可能になる。これにより計算量はH×W×C×Mとなる。
 まとめると以下のとおりである。

  • Depthwise Separable Convolution : 計算量の削減
  • Depthwise Convolution : チャネルごとに空間方向に畳み込む→計算量は1/フィルタ数
  • Pointwise Convolution : チャネル方向に畳み込む→計算量は1/(カーネルサイズ)^2

 次に、画像認識のネットワークであるDenseNetについて説明する。特徴としてはConvolution層の後にDense Blockを挟む点である。入力がl×kの場合、出力は(l+1)×kとなる。kはgrowth rateと呼ばれるハイパーパラメータである。ただし、このままではチャネル数が増えてしまっているので減らす必要がある。これがTransition Layerである。Transition Layerではダウンサンプリングを行い、Dense blockをつなぐ。似たようなネットワークとしてResNetがあるが、ResNetでは前1層の入力のみを後方の層に入力しているのに対し、DenseNetは前方の各層の出力全てが後方の層に入力される。DenseNetの特徴マップの入力に対し、Batch正規化が行われる。

 最後に、生の音声波形を生成する深層学習モデルであるWaveNetについて説明する。WaveNetでは時系列データに畳み込み(Dilated convolution)を適用する。Dilated convolutionでは、層が深くなるにつれて畳み込むリンクを離す。これにより受容野を簡単に増やすことができる。

実装演習

 DenseNetについて実装。CIFAR-10で確認

from keras.layers import Conv2D, Activation, BatchNormalization, Concatenate, AveragePooling2D, Input, GlobalAveragePooling2D, Dense
from keras.models import Model
from keras.optimizers import Adam
from keras.datasets import cifar10
from keras.utils import to_categorical
from keras.preprocessing.image import ImageDataGenerator
import pickle
import numpy as np

class DenseNetSimple:
    def __init__(self, growth_rate, compression_factor=0.5, blocks=[1,2,4,3]):
        # 成長率(growth_rate):DenseBlockで増やすフィルターの数
        self.k = growth_rate
        # 圧縮率(compression_factor):Transitionレイヤーで圧縮するフィルターの比
        self.compression = compression_factor
        # モデルの作成
        self.model = self.make_model(blocks)

    # DenseBlockのLayer
    def dense_block(self, input_tensor, input_channels, nb_blocks):
        x = input_tensor
        n_channels = input_channels
        for i in range(nb_blocks):
            # 分岐前の本線
            main = x
            # DenseBlock側の分岐
            x = BatchNormalization()(x)
            x = Activation("relu")(x)
            # Bottle-Neck 1x1畳み込み
            x = Conv2D(128, (1, 1))(x)
            x = BatchNormalization()(x)
            x = Activation("relu")(x)
            # 3x3畳み込み フィルター数は成長率
            x = Conv2D(self.k, (3, 3), padding="same")(x)
            # 本線と結合
            x = Concatenate()([main, x])
            n_channels += self.k
        return x, n_channels

    # Transition Layer
    def transition_layer(self, input_tensor, input_channels):
        n_channels = int(input_channels * self.compression)
        # 1x1畳み込みで圧縮
        x = Conv2D(n_channels, (1, 1))(input_tensor)
        # AveragePooling
        x = AveragePooling2D((2, 2))(x)
        return x, n_channels

    # モデルの作成
    def make_model(self, blocks):
        # blocks=[6,12,24,16]とするとDenseNet-121の設定に準じる
        input = Input(shape=(32,32,3))
        # 端数を出さないようにフィルター数16にする
        n = 16
        x = Conv2D(n, (1,1))(input)
        # DenseBlock - TransitionLayer - DenseBlock…
        for i in range(len(blocks)):
            # Transition
            if i != 0:
                x, n = self.transition_layer(x, n)
            # DenseBlock
            x, n = self.dense_block(x, n, blocks[i])
        # GlobalAveragePooling(チャンネル単位の全平均)
        x = GlobalAveragePooling2D()(x)
        # 出力層
        output = Dense(10, activation="softmax")(x)
        # モデル
        model = Model(input, output)
        return model

    # 訓練
    def train(self, X_train, y_train, X_val, y_val):
        # コンパイル
        self.model.compile(optimizer=Adam(), loss="categorical_crossentropy", metrics=["acc"])
        # Data Augmentation
        datagen = ImageDataGenerator(
            rescale=1./255,
            rotation_range=20,
            width_shift_range=0.2,
            height_shift_range=0.2,
            channel_shift_range=50,
            horizontal_flip=True)
        # 訓練
        #history = self.model.fit(X_train, y_train, batch_size=128, epochs=1, validation_data=(X_val, y_val)).history
        # 水増しありの訓練
        history = self.model.fit_generator(datagen.flow(X_train, y_train, batch_size=128),
            steps_per_epoch=len(X_train) / 128, validation_data=(X_val, y_val), epochs=1).history
        # 保存
        with open("history.dat", "wb") as fp:
            pickle.dump(history, fp)

if __name__ == "__main__":
    # k=16の場合
    densenet = DenseNetSimple(16)
    # densenet.model.summary()

    # CIFAR-10の読み込み
    (X_train, y_train), (X_test, y_test) = cifar10.load_data()
    # X_train = (X_train / 255.0).astype("float32")
    X_test = (X_test / 255.0).astype("float32")
    y_train, y_test = to_categorical(y_train), to_categorical(y_test)

    densenet.train(X_train, y_train, X_test, y_test)

結果

原著論文(https://arxiv.org/abs/1608.06993)でもデータの水増しを行なっていたが、左図の結果が示すように水増しを行わないと過学習状態になる。データの水増し+モデルの深さの調整を行うと、右図のように比較的ましにはなった。

5. Transformer

 ここでは、機械翻訳のモデルとしてTransformerについてまとめる。機械翻訳はSeq2Seqなどのニューラル機械翻訳もあるが、翻訳元の文の内容を一つのベクトルで表現するために文が長くなると表現力が不足する。そこで統計的機械翻訳モデルが登場した。Transformerについて説明するにあたり、Attentionについて説明する。
 Attentionは翻訳先の各単語を選択する際に、翻訳元の文中の各単語の隠れ状態を利用する手法である。注意を分岐しているイメージである。Attentionは辞書オブジェクトであり、{query, key, value}に分類される。queryに一致するkeyを索引し、対応するvalueを取り出す機構である。
 TransformerはRNNを使わずにAttentionのみを用いるモデルである。Transformerで用いるAttentionには2種類ある。

  • Source Target Attention : keyとvalueをsourceとして、queryをターゲットとする。これは受け取った情報に対して、targetが近いものをattention vector として取り出して着目する。
  • Self-Attention : query, key, valueをsourceとする。これは入力を全て同じにして、注意箇所を決めていく機構である。

 Transformer-EncoderではSelf-Attentionにより、文脈を考慮して各単語をエンコードすることが可能になる。EncoderもDecoderも6層あり、各層で二種類のAttentionを用いている。AttentionyやFeed Forward層のあとにAdd, Norm層がある。

  • Add(Residual Connection)
    • 入出力の差分を学習させる。(実際は出力を入力にそのまま加算)
    • 効果:学習、テストエラーの低減
  • Norm(Layer Normalization)
    • 各層においてバイアスを除く活性化関数のへ入力を平均0、分散1に正則化
    • 効果:学習の高速化

 上で述べたようにTransformerではRNNを用いないため、単語列の語順情報を追加する必要がある。単語の位置情報を追加する必要がある。ここで用いられるのがPosition Encodingである。Position Encoding は単語の位置情報をエンコードする。

$$\begin{eqnarray}
PE_{(pos,2i)} &=& \sin\left(\frac{pos}{10000^{2i/512}}\right)\\
PE_{(pos,2i+1)} &=& \cos\left(\frac{pos}{10000^{2i/512}}\right)
\end{eqnarray}$$

 機械翻訳モデルとしてはEncoder-Decoder Model→Transformer→BERTといった流れで変遷していた。ただし、Encoder-Decoder ModelとAttentionを組み合わせたTransformerがベースとなる技術として重要であるため、ここではTransformerのみに着目してまとめた。

実装演習

pytorchで実装。ただし、テスト用の入力を出力するように学習するモデルの実装までであり、実際の(よりリアルな)文章に関して処理をしたわけではない。ここでは学習の結果として入力=出力となっていれば実装としてはOKとみなす。

import os
import sys
import time
import math
import argparse
import numpy as np
import torch
import torch.nn as nn
from torch import optim
import torch.nn.functional as F

BOS_ID = 1
EOS_ID = 2
PAD_ID = 3

class SimpleAttention(nn.Module):
    def __init__(self, dim):
        super(SimpleAttention, self).__init__()

        self.dim = dim

        self.q_lin = nn.Linear(dim, dim)
        self.k_lin = nn.Linear(dim, dim)
        self.v_lin = nn.Linear(dim, dim)
        self.out_lin = nn.Linear(dim, dim)

    def forward(self, x, memory, attension_mask):
        query = self.q_lin(x)
        key = self.k_lin(memory)
        value = self.v_lin(memory)

        logit = torch.matmul(query, torch.transpose(key, 1, 2))
        attension_weight = F.softmax(logit, dim=2)

        output = torch.matmul(attension_weight, value)

        return self.out_lin(output)

class MultiHeadAttention(nn.Module):
    def __init__(self, n_heads, dim, dropout):
        super(MultiHeadAttention, self).__init__()

        self.n_heads = n_heads
        self.dim = dim
        self.dropout = dropout

        self.q_lin = nn.Linear(dim, dim)
        self.k_lin = nn.Linear(dim, dim)
        self.v_lin = nn.Linear(dim, dim)
        self.out_lin = nn.Linear(dim, dim)

    def forward(self, x, memory, mask):
        batch_size, _, dim = x.shape
        assert dim == self.dim, 'dimension mismatched'

        dim_per_head = dim // self.n_heads

        def split(x):
            x = x.view(batch_size, -1, self.n_heads, dim_per_head)
            return x.transpose(1, 2)

        def combine(x):
            x = x.transpose(1, 2)
            return x.contiguous().view(batch_size, -1, dim)

        q = split(self.q_lin(x))
        k = split(self.k_lin(memory))
        v = split(self.v_lin(memory))

        q = q / math.sqrt(dim_per_head)

        shape = mask.shape
        mask_shape = (shape[0], 1, shape[1], shape[2]) if mask.dim() == 3 else (shape[0], 1, 1, shape[1])
        logit = torch.matmul(q, k.transpose(2, 3))
        logit.masked_fill_(mask.view(mask_shape), -float('inf'))

        weights = F.softmax(logit, dim=-1)
        weights = F.dropout(weights, p=self.dropout, training=self.training)

        output = torch.matmul(weights, v)
        output = combine(output)

        return self.out_lin(output)

class FeedForward(nn.Module):
    def __init__(self, dim_in, dim_hidden, dim_out, dropout):
        super(FeedForward, self).__init__()

        self.dropout = dropout
        self.hidden = nn.Linear(dim_in, dim_hidden)
        self.out = nn.Linear(dim_hidden, dim_out)

    def forward(self, x):
        x = self.hidden(x)
        x = F.relu(x)
        x = self.out(x)
        return F.dropout(x, p=self.dropout, training=self.training)

class ResidualNormalizationWrapper(nn.Module):
    def __init__(self, dim, layer, dropout):
        super(ResidualNormalizationWrapper, self).__init__()

        self.layer = layer
        self.dropout = dropout
        self.normal = nn.LayerNorm(dim)

    def forward(self, input, *args):
        x = self.normal(input)
        x = self.layer(x, *args)
        x = F.dropout(x, p=self.dropout, training=self.training)
        return input + x

class TransformerModel(nn.Module):
    def __init__(self, config, is_decoder):
        super(TransformerModel, self).__init__()

        self.config = config
        self.is_decoder = is_decoder
        self.n_layers = config.n_layers
        self.n_heads = config.n_heads
        self.dim = config.dim
        self.dim_hidden = config.dim * 4
        self.dropout = config.dropout

        self.token_embeddings = nn.Embedding(config.vocab_size, config.dim)
        self.position_embeddings = nn.Embedding(config.n_words, config.dim)

        self.layer_norm_emb = nn.LayerNorm(self.dim)

        self.attentions = nn.ModuleList()
        self.source_attentions = nn.ModuleList()
        self.ffns = nn.ModuleList()
        for _ in range(self.n_layers):
            attention = MultiHeadAttention(self.n_heads, self.dim, self.dropout)
            self.attentions.append(ResidualNormalizationWrapper(self.dim, attention, self.dropout))

            if (is_decoder):
                source_attention = MultiHeadAttention(self.n_heads, self.dim, self.dropout)
                self.source_attentions.append(ResidualNormalizationWrapper(self.dim, source_attention, self.dropout))

            ffn = FeedForward(self.dim, self.dim_hidden, self.dim, self.dropout)
            self.ffns.append(ResidualNormalizationWrapper(self.dim, ffn, self.dropout))

        if is_decoder:
            self.pred_layer = nn.Linear(config.dim, config.vocab_size).to(config.device)

        for p in self.parameters():
            if p.dim() > 1:
                nn.init.xavier_uniform_(p)

    def _get_mask(self, input, causal):
        device = self.config.device
        pad_tensor = torch.empty(1).fill_(PAD_ID).expand_as(input)
        mask = input.to(dtype=torch.int, device=device) == pad_tensor

        if not causal:
            return mask.to(device), mask.to(device)

        batch_size, n_words = input.size()
        shape = (batch_size, n_words, n_words)
        source_mask_np = np.triu(np.ones(shape), k=1).astype('uint8')
        source_mask = torch.from_numpy(source_mask_np) == 1
        pad_mask = mask.unsqueeze(-2)
        source_mask = pad_mask | source_mask

        return mask.to(device), source_mask.to(device)

    def forward(self, input, src_enc=None, src_mask=None, causal=False):
        batch_size, n_sentences = input.size()
        (mask, att_mask) = self._get_mask(input, causal)

        positions = torch.arange(n_sentences).to(dtype=torch.long, device=self.config.device).unsqueeze(0)
        x = self.token_embeddings(input.to(dtype=int))
        x = x + self.position_embeddings(positions).expand_as(x)

        x = self.layer_norm_emb(x)
        x = F.dropout(x, p=self.dropout, training=self.training)
        x[mask] = 0

        for i in range(self.n_layers):
            x = self.attentions[i](x, x, att_mask)

            if self.is_decoder:
                x = self.source_attentions[i](x, src_enc, src_mask)

            x = self.ffns[i](x)
            x[mask] = 0

        return x

    def predict(self, x):
        return F.log_softmax(self.pred_layer(x), dim=-1)

class Trainer(object):
    def __init__(self, config):
        self.config = config
        self.encoder = TransformerModel(config, is_decoder=False).to(config.device)
        self.decoder = TransformerModel(config, is_decoder=True).to(config.device)

        self.optimizer_enc = self._get_optimizer(self.encoder)
        self.scheduler_enc = self._get_scheduler(self.optimizer_enc)

        self.optimizer_dec = self._get_optimizer(self.decoder)
        self.scheduler_dec = self._get_scheduler(self.optimizer_dec)

        self.criterion = LabelSmoothing(config.vocab_size, 0.1).to(config.device)

        if os.path.isfile(config.model_path):
            data = torch.load(config.model_path, map_location=config.device_name)
            self.encoder.load_state_dict(data['encoder'])
            self.decoder.load_state_dict(data['decoder'])
            self.optimizer_enc.load_state_dict(data['optimizer_enc'])
            self.optimizer_dec.load_state_dict(data['optimizer_dec'])
            print(f'load model from {config.model_path}')

        self.start_time = time.time()
        self.steps = 0
        self.stats = {
            'sentences': 0,
            'words': 0,
            'loss': 0.0,
        }

    def save(self):
        data = {
            'encoder': self.encoder.state_dict(),
            'decoder': self.decoder.state_dict(),
            'optimizer_enc': self.optimizer_enc.state_dict(),
            'optimizer_dec': self.optimizer_dec.state_dict(),
        }
        torch.save(data, self.config.model_path)
        print(f'save model to {self.config.model_path}')

    def _get_optimizer(self, model):
        return optim.Adam(model.parameters(), lr=1.0, betas=(0.9, 0.98), eps=1e-9)

    def _get_scheduler(self, optimizer):
        dim = self.config.dim
        warmup = self.config.warmup_steps

        def update(step):
            current_step = step + 1
            return 2 * dim ** -0.5 * min(current_step ** -0.5, current_step * warmup ** -1.5)

        return optim.lr_scheduler.LambdaLR(optimizer, lr_lambda=update)

    def _get_batch_copy_task(self):
        vocab_size = self.config.vocab_size
        batch_size = self.config.batch_size
        n_words = self.config.n_words

        data = np.random.randint(PAD_ID+1, vocab_size, size=(batch_size, n_words))

        min_words = 5
        eos_indexes = np.random.randint(min_words, n_words, size=batch_size)
        for i in range(batch_size):
            index = eos_indexes[i]
            data[i][index] = EOS_ID
            data[i][index+1:] = PAD_ID

        data[:, 0] = BOS_ID
        data = torch.from_numpy(data).requires_grad_(False).to(self.config.device, dtype=torch.int)
        return (data.clone(), data)

    def __generate(self, x):
        self.encoder.eval()
        self.decoder.eval()

        max_len = self.config.n_words
        src_mask = x == PAD_ID
        enc_output = self.encoder(x)

        batch_size, _ = x.shape
        generated = torch.empty(batch_size, max_len).fill_(PAD_ID)
        generated[:,0] = BOS_ID

        unfinished_sents = torch.ones(batch_size)

        for i in range(1, max_len):
            dec_output = self.decoder(generated[:, :i], enc_output, src_mask, True)
            gen_output = self.decoder.predict(dec_output[:, -1])
            _, next_words = torch.max(gen_output, dim=1)

            generated[:, i] = next_words * unfinished_sents + PAD_ID * (1 - unfinished_sents)

            unfinished_sents.mul_(next_words.ne(EOS_ID).long())
            if unfinished_sents.max() == 0:
                break

        return generated

    def step(self, data=None):
        self.encoder.train()
        self.decoder.train()

        if data is None:
            x, y = self._get_batch_copy_task()
        else:
            (x, y) = data

        enc_output = self.encoder(x)
        dec_output = self.decoder(y[:, :-1], enc_output, x == PAD_ID, True)

        gen_output = self.decoder.predict(dec_output)

        self.optimizer_enc.zero_grad()
        self.optimizer_dec.zero_grad()

        nwords = (y[:, 1:] != PAD_ID).sum().item()

        loss = self.criterion(gen_output, y[:, 1:], nwords)
        loss.backward()

        self.optimizer_enc.step()
        self.optimizer_dec.step()

        self.stats['loss'] = loss.item()
        self.stats['sentences'] += x.size(0)
        self.stats['words'] += nwords

    def step_end(self, step):
        self.steps += 1
        self._print_log()

        self.scheduler_enc.step()
        self.scheduler_dec.step()

    def _print_log(self):
        if self.steps % args.log_interval != 0:
            return

        current_time = time.time()
        elapsed_time = current_time - self.start_time
        lr = self.optimizer_enc.param_groups[0]['lr']
        print('step: {}, loss: {:.2f}, tokens/sec: {:.1f}, lr: {:.6f}'.format(self.steps, self.stats['loss'], self.stats['words'] / elapsed_time, lr))

        self.start_time = current_time
        self.stats['sentences'] = 0
        self.stats['words'] = 0

    def generate_test(self):
        self.encoder.eval()
        self.decoder.eval()

        data = MTDataset(args, 'test')
        dataloader = torch.utils.data.DataLoader(data, batch_size=args.batch_size)

        x, _ = next(iter(dataloader))
        x = x.to(device)

        generated = self.__generate(x)

        for i in range(x.size(0)):
            print(' '.join([str(id) for id in x[i].tolist()]))
            print(' '.join([str(int(id)) for id in generated[i].tolist()]))
            # print('input : {}'.format(x[i]))
            # print('output: {}'.format(word_ids[i])) 
            # print('') 

class LabelSmoothing(nn.Module):
    def __init__(self, size, smoothing):
        super(LabelSmoothing, self).__init__()
        self.criterion = nn.KLDivLoss(reduction='sum')
        self.smoothing = smoothing
        self.size = size

    def forward(self, x, target, nwords):
        x = x.contiguous().view(-1, self.size)
        target = target.contiguous().view(-1)

        true_dist = x.data.clone()
        true_dist.fill_(self.smoothing / (self.size - 2))
        true_dist.scatter_(1, target.unsqueeze(1).to(dtype=torch.long), 1.0 - self.smoothing)
        true_dist[:, PAD_ID] = 0

        mask = torch.nonzero(target.data == PAD_ID)
        if mask.dim() > 0:
            true_dist.index_fill_(0, mask.squeeze(), 0.0)

        return self.criterion(x, true_dist.requires_grad_(False)) / nwords

class MTDataset(torch.utils.data.Dataset):
    def __init__(self, config, type):
        self.config = config
        self.data = {} 

        dataroot = config.dataroot
        for lang in [config.src, config.tgt]:
            path = "{}/{}.{}".format(dataroot, type, lang)
            if not os.path.isfile(path):
                continue

            with open(path, 'r') as f:
                lines = f.read().splitlines()
                data = np.empty((len(lines), self.config.n_words))
                data.fill(PAD_ID)
                for row in range(len(lines)):
                    line = lines[row]
                    if not line:
                        continue

                    array = line.split(' ')
                    word_count = len(array)
                    assert word_count <= self.config.n_words, f'the sentence that has many words we expected. row: {row}, words: {word_count}'

                    for col in range(len(array)):
                        data[row][col] = int(array[col])

            self.data[lang] = torch.from_numpy(data).to(dtype=torch.int)

    def __len__(self):
        if self.config.src in self.data:
            return len(self.data[self.config.src])

        return 0

    def __getitem__(self, idx):
        return (self.data[self.config.src][idx], self.data[self.config.tgt][idx])



if __name__ == '__main__':
    parser = argparse.ArgumentParser(add_help=True)
    parser.add_argument('--cpu', action='store_true', help='use cpu')
    parser.add_argument('--dataroot', default='data', help='path to data')
    parser.add_argument('--src', default='ja', help='source language')
    parser.add_argument('--tgt', default='en', help='target language')
    parser.add_argument('--epochs', type=int, default=10, help='epoch count')
    parser.add_argument('--batch_size', type=int, default=2, help='size of batch')
    parser.add_argument('--log_interval', type=int, default=5, help='step num to display log')
    parser.add_argument('--vocab_size', type=int, default=8, help='vocabulary size for copy task')
    parser.add_argument('--n_layers', type=int, default=3, help='number of layers')
    parser.add_argument('--n_heads', type=int, default=8, help='number of heads for multi head attention')
    parser.add_argument('--n_words', type=int, default=10, help='number of words max')
    parser.add_argument('--dim', type=int, default=8, help='dimention of word embeddings')
    parser.add_argument('--dropout', type=int, default=0.1, help='rate of dropout')
    parser.add_argument('--warmup_steps', type=int, default=4000, help='adam lr increases until this steps have passed')
    parser.add_argument('--model', default='model.pth', help='file to save model parameters')
    parser.add_argument('--generate_test', action='store_true', help='only generate translated sentences')
    parser.add_argument('--train_test', action='store_true', help='training copy task with random value')
    args = parser.parse_args()
    print(args)

    is_cpu = args.cpu or not torch.cuda.is_available()
    device_name = "cpu" if is_cpu else "cuda:0"
    device = torch.device(device_name)

    args.device_name = device_name
    args.device = device
    args.model_path = f'{args.dataroot}/{args.model}'

    trainer = Trainer(args)

    if args.generate_test:
        args.src = 'en'
        args.tgt = 'en'
        trainer.generate_test()
        sys.exit()

    data_train = MTDataset(args, 'train')
    dataloader = torch.utils.data.DataLoader(data_train, batch_size=args.batch_size)

    for epoch in range(args.epochs):
        start_time = time.time()

        if args.train_test:
            for i in range(100):
                trainer.step()
                trainer.step_end(i)
            trainer.save()
            continue

        print(f'start epoch {epoch}')
        for i, (x, y) in enumerate(dataloader):
            x = x.to(device)
            y = y.to(device)
            trainer.step((x, y))
            trainer.step_end(i)
                      
        trainer.save()

結果

#学習データ
1 5 6 7 2
1 4 5 7 7 4 5 7 5 6
input  : 1 5 6 7 2 3 3 3 3 3
output : 1 5 6 7 2 3 3 3 3 3
input  : 1 4 5 7 7 4 5 7 5 6
output : 1 4 5 7 7 4 5 7 5 2

6. 物体検知・セグメンテーション

 物体認識タスクは4種類ある。

  • 分類:クラスラベル(画像に対して単一/複数)
  • 物体検知:Bounding Boxを作成
  • Semantic Segmentation : 各ピクセルに対し単一のクラスラベル
  • Instance Segmentation : 各ピクセルに対し単一のクラスラベル

分類以外では位置が考慮されている。また、Semantic SegmentationとInstance Segmantationの違いは前者ではインスタンスの区別分類をしていないのに対して後者ではしている点である。

 物体検知を行う上で、代表的なデータセットについて紹介する。

クラスTrain+ValBox/画像Instance Annotation補足
VOC122011,5402.4
ILSVRC17200476,6681.1ImageNetのサブセット
MS COCO1880123,2877.3物体位置推定に対する評価指標追加
OICOD185001,743,0427.0Open Images V4のサブセット
サイズは一様ではない。

 物体検知の評価指標について説明する。まず、物体位置の予測精度の評価を行うIoU(Intersection over Union)の定義は以下のとおりである。

$$IoU = \frac{Area\ of\ Overlap}{Area\ of\ Union}=\frac{TP}{TP+FP+FN}$$

IoUは物体検知において、よく使われる指標であるが、実際にはconfidenceとIoUで閾値を用いる。
 次にAverage Precision; APについて説明する。confidenceの閾値を動かすことによってPrecision-Recall Curveを動かすことができる。この時の平均を考えたのがAPである。

$$ AP = \int^1_0 P(R)dR $$

厳密には書くRecallのレベルに対して最大のPrecisionにスムージングを行い、クラスラベル固定の元で計算を行う。これに対してクラスの平均をとったのがmAP(mean Average Precision)である。

$$ mAP = \frac{1}{C}\sum_{i=1}^C AP_i $$

 以上は検出精度に関わる指標であるが、応用上の問題として、検出精度に加えて検出速度も重要な指標となる。検出速度の指標として、FPS( Flames per Second )がよく用いられる。

 物体検出の手法としてSSD(; Single Shot Detector)がある。SSDでは

  1. Default Boxを用意
  2. Default Boxを変形して、Confidenceを出力する

ネットワークアーキテクチャとしては

  • 入力は300 × 300 or 512 × 512
  • VGGの全結合層のうち、2層がConv.層に変換され、最後の全結合層は削除
  • マルチスケール特徴マップ(解像度の異なる特徴マップから出力を構成)を作成

する点が特徴となる。特徴マップからの出力はDefault Boxの数をk, 特徴マップのサイズがm × nとすると「k × ( #Class + 4 ) × m × n 」となる。クラス数+4となるのはオフセット項分である。「k × m × n」が特徴マップごとに用意するDefault Boxの数であることを考えると、Box数×クラス数となっていることが分かる。
 ただし、多数のDefault Boxを用意にしたことで弊害が生じ、これによる工夫について2つ説明する。まず、Non-Maximum Suppressionについては移っている物体が1つであったとしてもDefault Boxが多いことで無駄な計算をしてしまうことの解消手法である。具体的には、IoUを計算して、IoUが一定以上重なる場合はconfidenceの大きいものだけ残すという手法である。次に、Hard Negative Miningは背景(Negative クラス)と非背景のクラスが不均等になることを防ぐ手法である。具体的には、非背景:背景 = 1 : 3になるようにNagativeのBounding Boxを用意することで、不均等を防ぐ。

 最後にSemantic Segmantationについて概要をまとめる。Semantic Segmentationで重要な点は、畳み込みをして解像度が小さくなった情報を、Unpoolingしてもとのサイズまで戻す点である。SSDではDeconvolution/ Transposed convolutionを行なっている。これは、通常のConv.と同様にkernel size, padding, strideを指定し、

  1. 特徴マップのpixel間隔をstrideだけあける
  2. 特徴マップのまわりに(kernel size -1 ) -paddingだけ余白を作成する。
  3. 畳み込み演算を行う

ことによりUp-samplingをする。ただし、単純にUp-samplingを行うと、ローカルな情報(輪郭)が失われてしまう。そこで低レイヤーPooling層の出力をelement-wise additionすることでローカルな情報を補完してからUp-samplingする手法が用いられる。これを双方実装したモデルとしてU-Netがある。

 また、これとは別の手法としてUnpoolingがある。UnpoolingではPooling時に、どこに最大値が入っていたのかを覚えておき(switch variables)、復元時はその場所に値を戻すという手法である。畳みこみを行う手法にも、需要野を畳み込み時に広げる工夫がある( Dilated Convolution)。

 物体検知の歴史について講義でも触れられていたが、以下の論文に流れやaccuracyがどのようにモデルの発展に伴って遷移してきたかがまとめられている。
https://arxiv.org/pdf/1905.05055.pdf

 常に”最新”の手法は変わっていくが、根本となる重要なモデルは何かを見極めて学習をすることが大事となる。名前は変われど似たようなものが無数ある中で、ベースとなるモデルは限りがあるはずである。ベースを覆す新しいモデルの発明にも期待しつつ、情報収集を欠かさないようにしたい。

コメント

タイトルとURLをコピーしました