ナード戦隊データマン

データサイエンスを用いて悪と戦うぞ

TensorFlowで多層ニューラルネットワークを実装して回帰問題を解く

TensorFlowを使って機械学習アルゴリズムについて理解したいと思ったため、多層ニューラルネットワークを実装してみます。

多層ニューラルネットワークのクラス

import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt

class nnet:
    
    def __init__(self, X, random_state=3):
        self.sess = tf.Session()
        seed = random_state
        tf.set_random_seed(seed)
        np.random.seed(seed)
        self.x_data = tf.placeholder(shape=[None, X.shape[1]], dtype=tf.float32)
        self.y_target = tf.placeholder(shape=[None, 1], dtype=tf.float32)

    def init_weight(self, shape, st_dev):
        return tf.Variable(tf.random_normal(shape, stddev=st_dev))

    def init_bias(self, shape, st_dev):
        return tf.Variable(tf.random_normal(shape, stddev=st_dev))

    def fully_connected(self, input_layer, weights, biases):
        return tf.nn.relu(tf.add(tf.matmul(input_layer, weights), biases))

    def fit(self, X, y, hidden_size, batch_size=100, iter_size=200):

        x_data = self.x_data
        y_target = self.y_target

        final_output = self.build_hidden_layer(hidden_size, X.shape[1])
    
        self.loss = tf.reduce_mean(tf.abs(y_target - final_output))
        self.opt = tf.train.AdamOptimizer(0.05)
        self.train_step = self.opt.minimize(self.loss)
    
        init = tf.global_variables_initializer()
        self.sess.run(init)
    
        loss_vec = self.train(X, y, iter_size, batch_size)
        plt.plot(loss_vec)
        return final_output

    def predict(self, final_output, X):
        x_data = self.x_data
        return [val[0] for val in self.sess.run(final_output, feed_dict={x_data: X})]


    def train(self, X, y, iter_size, batch_size):
        loss_vec = []
        x_data = self.x_data
        y_target = self.y_target
        for i in range(iter_size):
            rand_index = np.random.choice(len(X), size=batch_size)
            rand_x = X[rand_index]
            rand_y = np.transpose([y[rand_index]])
            self.sess.run(self.train_step, feed_dict={x_data:rand_x, y_target: rand_y})
            loss_vec.append(self.sess.run(self.loss, feed_dict={x_data:rand_x, y_target: rand_y}))
        
            if (i+1)%25==0:
                print('Generation:'+str(i+1)+', Loss = '+str(loss_vec[-1]))

        return loss_vec

    
    def build_hidden_layer(self, hidden_size, col_size):
        weights = []
        biases = []
        layers = []
        tmp_size = col_size
        x_data = self.x_data
        last_layer = x_data
    
        for hsize in hidden_size:
            weights.append(self.init_weight(shape=[tmp_size, hsize], st_dev=10.0))
            biases.append(self.init_bias(shape=[hsize], st_dev=10.0))
            layers.append(self.fully_connected(last_layer, weights[-1], biases[-1]))
            tmp_size = hsize
            last_layer = layers[-1]
    
        weights.append(self.init_weight(shape=[tmp_size, 1], st_dev=10.0))
        biases.append(self.init_bias(shape=[1], st_dev=10.0))
        layers.append(self.fully_connected(last_layer, weights[-1], biases[-1]))
        final_output = layers[-1]
        
        return final_output

メソッド名を見れば何をやっているのかわかると思います。

隠れ層をfitメソッドで指定できるように作りました。重みの更新はtrain_stepで指定したOptimizerによるコスト関数最小化で行ってくれるので、バックプロパゲーションを手動で構築する必要はありません。

Jupyter notebook上で出生体重予測

In[1]:

import requests
import csv
import os

birthdata_url = 'https://raw.githubusercontent.com/nfmcclure/tensorflow_cookbook/master/01_Introduction/07_Working_with_Data_Sources/birthweight_data/birthweight.dat'
birth_file = requests.get(birthdata_url)
birth_data = birth_file.text.split('\r\n')
birth_header = birth_data[0].split('\t')
birth_data = [[float(x) for x in y.split('\t') if len(x) >= 1]  
              for y in birth_data[1:] if len(y) >= 1]

with open("birth_weight.csv", "w") as f:
    writer = csv.writer(f)
    writer.writerows([birth_header])
    writer.writerows(birth_data)
    f.close()

exec(open("nnet.py").read())

tensorflow_cookbookのgithubページからデータをダウンロードします。また、先程作ったnnetクラスのファイルを実行します。

In[2]:

import pandas as pd
from sklearn.model_selection import train_test_split

def normalize_cols(m):
    col_max = m.max(axis=0)
    col_min = m.min(axis=0)
    return np.nan_to_num((m-col_min)/(col_max-col_min))

df = pd.read_csv("birth_weight.csv")
target_name = ["BWT"]
X, y = df[df.columns.drop(target_name)], df[target_name]
X, y = X.as_matrix(), y.as_matrix()
X = normalize_cols(X)
X_train, X_test, y_train, y_test = train_test_split(X,y, random_state=0)

データを読み込み、訓練データとテストデータに分けます。このとき、min-maxスケーリングをすると収束が早まるので、実装します。

In[3]:

%matplotlib inline
nnetc = nnet(X_train)
model = nnetc.fit(X_train, y_train.ravel(), hidden_size=[25, 10, 3], iter_size=200)

Out[3]: f:id:mathgeekjp:20171005133950p:plain

訓練データにフィットさせます。損失関数の出力をプロットするように作ったため、プロットされます。reluを使っているため、収束は早いです。

In[4]:

from sklearn.metrics import confusion_matrix
preds = nnetc.predict(model, X_test)
preds_binary = np.array([1.0 if val<3000.0 else 0.0 for val in preds])
y_test_binary = np.array([1.0 if val<3000.0 else 0.0 for val in y_test.ravel()])

confusion_matrix(y_test_binary, preds_binary)

Out[4]:

array([[20,  5],
       [ 8, 15]])

体重3000g以上と予測したかどうかにより、2値化します。これにより、confusion_matrixでチェックします。多くは正しく予測されていますが、13例が間違った予測をしていることがわかります。

参考

1.Documentation scikit-learn: machine learning in Python — scikit-learn 0.19.0 documentation 2.GitHub - nfmcclure/tensorflow_cookbook: Code for Tensorflow Machine Learning Cookbook