ナード戦隊データマン

データサイエンスを用いて悪と戦うぞ

CapsuleNetをMNISTで試す

CapsuleNetとは、Google Brainの研究者たち(Sabour, Frosst, Hinton)によって考案されたモデルです。ここでは、kaggleのdigit recognizerのコンペでCapsuleNetを使う方法を紹介します。ただ、参考にしたgithubリンクのソースをほとんど真似ているだけなので、日本語を読むのが面倒くさい人はリンクへ飛んでください。

(Jupyter Notebookで実行)

実装について

CapsuleNetの実装は以下に基づいています。

レイヤーを定義する

まず、CapsuleNetレイヤーを定義し、いくつかの便利な関数を定義します。今の所、まだ最適化はされていません。

In[1]:

import keras.backend as K
import tensorflow as tf
from keras import initializers, layers

class Length(layers.Layer):
    """Compute the Euclidean norm of each capsule vector.

    Collapses the trailing vector axis to a scalar length; in CapsNet these
    lengths play the role of class probabilities.
    """

    def call(self, inputs, **kwargs):
        # ||v|| = sqrt(sum_i v_i^2) along the last axis.
        squared_sum = K.sum(K.square(inputs), -1)
        return K.sqrt(squared_sum)

    def compute_output_shape(self, input_shape):
        # The vector dimension disappears; everything else is unchanged.
        return input_shape[:-1]

class Mask(layers.Layer):
    """Mask capsule vectors so that only one capsule per sample survives.

    Two call modes:
      * list input ``[capsules, one_hot_mask]`` -- mask with the provided
        one-hot labels (used during training in this file, see CapsNet()).
      * single tensor input -- derive a one-hot mask from the inputs
        themselves via the max-trick below.
    """

    def call(self, inputs, **kwargs):
        if type(inputs) is list:  # true label supplied as a one-hot mask
            inputs, mask = inputs
        else:  # no label: build a one-hot mask from the inputs
            x = inputs
            # (x - max)/epsilon + 1 is ~1 at the argmax and hugely negative
            # elsewhere; clipping to [0, 1] then yields a one-hot mask.
            # NOTE(review): the reference implementation computes vector
            # lengths before this trick; here it is applied to the raw
            # tensor. Only the list-input path is exercised in this file.
            x = (x - K.max(x, 1, True)) / K.epsilon() + 1
            mask = K.clip(x, 0, 1)  # one-hot over axis 1

        # Contract the capsule axis (axis 1 of both tensors): only the
        # selected capsule's vector survives.
        inputs_masked = K.batch_dot(inputs, mask, [1, 1])
        return inputs_masked

    def compute_output_shape(self, input_shape):
        if type(input_shape[0]) is tuple:  # called with [capsules, mask]
            return tuple([None, input_shape[0][-1]])
        else:
            return tuple([None, input_shape[-1]])


def squash(vectors, axis=-1):
    """Squashing non-linearity from the CapsNet paper.

    Rescales each vector so its length lies in [0, 1) while preserving
    direction: v * ||v||^2 / ((1 + ||v||^2) * ||v||).

    # Args
        vectors: tensor of capsule vectors.
        axis: axis holding the vector components.

    # Returns
        Tensor of the same shape with every vector squashed.
    """
    s_squared_norm = K.sum(K.square(vectors), axis, keepdims=True)
    # Add K.epsilon() under the sqrt: without it a zero vector produces
    # 0 / 0 = NaN (and NaN gradients), silently poisoning training.
    scale = s_squared_norm / (1 + s_squared_norm) / K.sqrt(s_squared_norm + K.epsilon())
    return scale * vectors


class CapsuleLayer(layers.Layer):
    """Fully-connected capsule layer with dynamic routing.

    Maps (batch, input_num_capsule, input_dim_vector) capsules to
    (batch, num_capsule, dim_vector) capsules: each input capsule is
    transformed by a learned matrix W, then `num_routing` iterations of
    routing-by-agreement decide how strongly each prediction contributes
    to each output capsule.
    """

    def __init__(self, num_capsule, dim_vector, num_routing=3,
                 kernel_initializer='glorot_uniform',
                 bias_initializer='zeros',
                 **kwargs):
        super(CapsuleLayer, self).__init__(**kwargs)
        self.num_capsule = num_capsule    # number of output capsules
        self.dim_vector = dim_vector      # dimension of each output capsule
        self.num_routing = num_routing    # routing iterations
        self.kernel_initializer = initializers.get(kernel_initializer)
        self.bias_initializer = initializers.get(bias_initializer)

    def build(self, input_shape):
        # Expected input: (batch, input_num_capsule, input_dim_vector).
        self.input_num_capsule = input_shape[1]
        self.input_dim_vector = input_shape[2]

        # One transform matrix per (input capsule, output capsule) pair.
        self.W = self.add_weight(shape=[self.input_num_capsule, self.num_capsule, self.input_dim_vector, self.dim_vector],
                                 initializer=self.kernel_initializer,
                                 name='W')

        # Routing logits b_ij. NOTE(review): stored as a non-trainable
        # weight and mutated in call(), so routing state leaks across
        # batches — inherited from the reference implementation; the paper
        # resets the logits every forward pass.
        self.bias = self.add_weight(shape=[1, self.input_num_capsule, self.num_capsule, 1, 1],
                                    initializer=self.bias_initializer,
                                    name='bias',
                                    trainable=False)
        self.built = True

    def call(self, inputs, training=None):

        # (batch, in_caps, dim) -> (batch, in_caps, 1, 1, dim).
        inputs_expand = K.expand_dims(K.expand_dims(inputs, 2), 2)

        # Replicate along the output-capsule axis:
        # (batch, in_caps, num_capsule, 1, dim).
        inputs_tiled = K.tile(inputs_expand, [1, 1, self.num_capsule, 1, 1])

        # Prediction vectors u_hat = u * W, computed per sample; tf.scan
        # iterates over the batch axis (the accumulator is unused).
        inputs_hat = tf.scan(lambda ac, x: K.batch_dot(x, self.W, [3, 2]),
                             elems=inputs_tiled,
                             initializer=K.zeros([self.input_num_capsule, self.num_capsule, 1, self.dim_vector]))

        # Routing by agreement.
        for i in range(self.num_routing):
            # Coupling coefficients: softmax of the logits over the
            # output-capsule axis (dim=2).
            c = tf.nn.softmax(self.bias, dim=2)
            # Weighted sum of predictions over input capsules, squashed.
            outputs = squash(K.sum(c * inputs_hat, 1, keepdims=True))

            if i != self.num_routing - 1:
                # Increase logits for predictions that agree with the output.
                self.bias += K.sum(inputs_hat * outputs, -1, keepdims=True)
        return K.reshape(outputs, [-1, self.num_capsule, self.dim_vector])

    def compute_output_shape(self, input_shape):
        return tuple([None, self.num_capsule, self.dim_vector])

def PrimaryCap(inputs, dim_vector, n_channels, kernel_size, strides, padding):
    """First capsule layer: a Conv2D whose channels are grouped into capsules.

    Convolves with dim_vector * n_channels filters, reshapes the feature map
    into (-1, dim_vector) capsule vectors, and applies the squash
    non-linearity.
    """
    conv = layers.Conv2D(
        filters=dim_vector * n_channels,
        kernel_size=kernel_size,
        strides=strides,
        padding=padding,
    )(inputs)
    capsules = layers.Reshape(target_shape=[-1, dim_vector])(conv)
    return layers.Lambda(squash)(capsules)

モデルをビルドする

次に、モデルをビルドします。以下が参考となるネットワーク構造です。

Screenshot from 2017-11-11 19-57-27.png

注意点としては、X->yではなく、(X, y) -> (y, X)という方法を使っていることです。これはGANではなく、オートエンコーダのようにデコーダで入力画像を再構成する正則化手法で、クラス予測と同時に画像の再構成も学習します。

In[2]:

from sklearn.model_selection import train_test_split
import pandas as pd
from keras import layers, models, optimizers
from keras import backend as K
from keras.utils import to_categorical
import numpy as np

def CapsNet(input_shape, n_class, num_routing):
    """Build the two-headed CapsNet model.

    Inputs are [image, one-hot label]; outputs are [capsule lengths
    (class scores), reconstructed image]. The label input selects which
    digit capsule feeds the reconstruction decoder.
    """
    image = layers.Input(shape=input_shape)

    # Encoder: conv features -> primary capsules -> digit capsules -> lengths.
    features = layers.Conv2D(filters=256, kernel_size=9, strides=1,
                             padding='valid', activation='relu',
                             name='conv1')(image)
    primary = PrimaryCap(features, dim_vector=8, n_channels=32,
                         kernel_size=9, strides=2, padding='valid')
    digit_caps = CapsuleLayer(num_capsule=n_class, dim_vector=16,
                              num_routing=num_routing,
                              name='digitcaps')(primary)
    class_scores = Length(name='out_caps')(digit_caps)

    # Decoder: mask by the true label, then reconstruct the input image.
    label = layers.Input(shape=(n_class,))
    selected = Mask()([digit_caps, label])
    recon = layers.Dense(512, activation='relu')(selected)
    recon = layers.Dense(1024, activation='relu')(recon)
    recon = layers.Dense(np.prod(input_shape), activation='sigmoid')(recon)
    recon = layers.Reshape(target_shape=input_shape, name='out_recon')(recon)

    return models.Model([image, label], [class_scores, recon])


def margin_loss(y_true, y_pred):
    """Margin loss from the CapsNet paper (m+ = 0.9, m- = 0.1, lambda = 0.5).

    Penalizes short capsules for present classes and long capsules for
    absent classes, summed over classes and averaged over the batch.
    """
    present_term = y_true * K.square(K.maximum(0., 0.9 - y_pred))
    absent_term = 0.5 * (1 - y_true) * K.square(K.maximum(0., y_pred - 0.1))
    per_sample = K.sum(present_term + absent_term, 1)
    return K.mean(per_sample)


def train(model, data, epoch_size=100):
    """Compile and fit a CapsNet model.

    data is ((x_train, y_train), (x_test, y_test)); the model maps
    [images, labels] -> [class scores, reconstructions], so labels appear
    on both sides of fit().
    """
    (train_x, train_y), (val_x, val_y) = data

    # Margin loss drives classification; the reconstruction MSE is scaled
    # way down so it acts only as a regularizer.
    model.compile(optimizer="adam",
                  loss=[margin_loss, 'mse'],
                  loss_weights=[1., 0.0005],
                  metrics={'out_caps': 'accuracy'})

    model.fit([train_x, train_y], [train_y, train_x],
              batch_size=100, epochs=epoch_size,
              validation_data=[[val_x, val_y], [val_y, val_x]])

    return model


def combine_images(generated_images):
    """Tile a batch of single-channel images into one near-square grid.

    generated_images: array of shape (num, height, width, channels); only
    channel 0 of each image is used. Returns the 2-D tiled grid.
    """
    count = generated_images.shape[0]
    cols = int(np.sqrt(count))
    rows = int(np.ceil(float(count) / cols))
    tile_h, tile_w = generated_images.shape[1:3]
    canvas = np.zeros((rows * tile_h, cols * tile_w),
                      dtype=generated_images.dtype)
    for idx, tile in enumerate(generated_images):
        # Row-major placement: `cols` tiles per grid row.
        r, c = divmod(idx, cols)
        canvas[r * tile_h:(r + 1) * tile_h,
               c * tile_w:(c + 1) * tile_w] = tile[:, :, 0]
    return canvas


def test(model, data):
    """Evaluate a trained CapsNet on held-out data.

    Prints test accuracy, saves a grid of real and reconstructed digits to
    ./real_and_recon.png, and displays the saved image.
    """
    x_test, y_test = data
    y_pred, x_recon = model.predict([x_test, y_test], batch_size=100)

    separator = '-' * 50
    accuracy = np.sum(np.argmax(y_pred, 1) == np.argmax(y_test, 1)) / y_test.shape[0]
    print(separator)
    print('Test acc:', accuracy)

    import matplotlib.pyplot as plt
    from PIL import Image

    # First 50 originals stacked above their 50 reconstructions.
    grid = combine_images(np.concatenate([x_test[:50], x_recon[:50]]))
    scaled = grid * 255
    Image.fromarray(scaled.astype(np.uint8)).save("real_and_recon.png")
    print()
    print('Reconstructed images are saved to ./real_and_recon.png')
    print(separator)
    plt.imshow(plt.imread("real_and_recon.png"))
    plt.show()


def load_mnist(filename):
    """Load the kaggle digit-recognizer training CSV and split it.

    Returns ((x_train, y_train), (x_test, y_test)): images reshaped to
    (28, 28, 1) floats in [0, 1], labels one-hot encoded; 30% held out.
    """
    frame = pd.read_csv(filename)
    pixels = frame.iloc[:, 1:]   # columns 1.. are the 784 pixel values
    labels = frame.iloc[:, :1]   # column 0 is the digit label
    x_train, x_test, y_train, y_test = train_test_split(pixels, labels, test_size=0.3)

    def as_images(df):
        # Flat 784-pixel rows -> (28, 28, 1) float images scaled to [0, 1].
        return df.values.reshape(-1, 28, 28, 1).astype('float32') / 255.

    x_train = as_images(x_train)
    x_test = as_images(x_test)
    y_train = to_categorical(y_train.astype('float32'))
    y_test = to_categorical(y_test.astype('float32'))
    return (x_train, y_train), (x_test, y_test)

訓練とテスト

上記のtrain関数を使って訓練します。訓練データはkaggleからダウンロードしたものです。

In[3]:

# Load the kaggle training CSV and split it into train/validation sets.
(x_train, y_train), (x_test, y_test) = load_mnist("../input/train.csv")
    
# Build the CapsNet and train it briefly (4 epochs) as a sanity check.
model = CapsNet(input_shape=[28, 28, 1], n_class=10, num_routing=3)
train(model=model, data=((x_train, y_train), (x_test, y_test)), epoch_size=4)

訓練にかなり時間がかかります。注意してください。次いでテストします。

In[4]:

# Report held-out accuracy and save/show the reconstructed digits.
test(model=model, data=(x_test, y_test))

Out[4]:

--------------------------------------------------
Test acc: 0.99

Reconstructed images are saved to ./real_and_recon.png
--------------------------------------------------

予測する

訓練して、精度もそれなりだったので検証としてはもう十分ですが、submissionデータを生成する方法を一応書きます(kaggleなので)

In[5]:

# Load the kaggle test images and scale them like the training data.
data_test = pd.read_csv('../input/test.csv')
data_test = data_test.values.reshape(-1, 28, 28, 1).astype('float32') / 255.

# The model expects a label input; an all-zero batch stands in, and the
# reconstruction output is discarded.
dummy_labels = np.zeros((data_test.shape[0], 10))
y_pred, _ = model.predict([data_test, dummy_labels],
                          batch_size=32, verbose=True)

# One "ImageId,Label" row per image, ids starting at 1.
with open('submission.csv', 'w') as out_file:
    out_file.write('ImageId,Label\n')
    for img_id, guess_label in enumerate(np.argmax(y_pred, 1), 1):
        out_file.write('%d,%d\n' % (img_id, guess_label))

おわりに

CapsuleNetを使ってみる、ということだけが目標でしたが、高い精度が達成できました。訓練に数時間要しますが、再構成用のデコーダ部分を外せばもっと訓練が早いと思います。最近のディープラーニング界隈は、こういった論文が多いので、それをコード化することができるスキルが重要かもしれません。

参考

  1. https://github.com/XifengGuo/CapsNet-Keras/
  2. https://arxiv.org/pdf/1710.09829.pdf