データナード

機械学習と自然言語処理についての備忘録 (旧ナード戦隊データマン)

sentence embeddingとdeep metric learning

paraphrase retrievalは、大量の文を保存したシステムに対してクエリを投げて、パラフレーズを高速に検索するタスクを指します。今回は、ベクトル検索のためにパラフレーズコーパスからsentence embeddingを学習する方法についてのメモを書きます。

triplet loss

triplet lossは、次のように表される損失関数です:

 L(A, P, N) = max(|| \phi(A) - \phi(P)||^2 - || \phi(A) -\phi(N)||^2 + \alpha, 0)

 \phi はエンコーダです。パラフレーズからsentence embeddingを学習する場合、Aは入力文で、Pは入力文に対応する正例です。Nは負例を意味します。Lは損失関数を意味します。

Nの負例のとり方として最も単純なものは、ランダムに負例をサンプリングすることです。ただ、このような方法を使ってエンコーダを学習すると、入力文ベクトルと負例ベクトルの距離を最大化することを中心に学習する恐れがあり、エンコーダの出力が均一化する可能性があります。そのようなエンコーダは文エンコーダとして無価値です。

そこで、Nの負例として「負例であることを判定するのが難しい」ようなものを使います。このような方法を使えば、PとNを見分けるのが難しくなるため、学習が正しい形で収束する可能性が高まります。「正しい形で」とは、「出力の均一化による見せかけの損失最小化ではない」という意味です。

コード

ダメだったコード

以下のコードは、文エンコーダとして機能しなかったものです。文エンコーダの出力が均一化してしまい、類似度をとるとどんな文ペアでも0.9以上になるような状態になってしまいました。

import sys
import random

import numpy as np
import tensorflow as tf
import tensorflow.keras.backend as K
from sklearn.utils import shuffle
from tensorflow.keras import Sequential
from tensorflow.keras.layers import Embedding, SpatialDropout1D, SeparableConv1D, GlobalMaxPooling1D, MaxPooling1D, Input, Dense, Dropout, Flatten, BatchNormalization, GRU

bs = 5

X_t = np.load("./prepared_data/qvecs_train_features.npy")
Y_t = np.load("./prepared_data/dvecs_train_features.npy")
X_train = np.vstack([X_t, Y_t])
Y_train = np.vstack([Y_t, X_t])

tsize = (X_train.shape[0] // bs) * bs
X_train = X_train[:tsize]
Y_train = Y_train[:tsize]
Y_rand = shuffle(Y_train)
print(X_train.shape)
print(Y_train.shape)
X_d = np.load("./prepared_data/qvecs_dev_features.npy")
Y_d = np.load("./prepared_data/dvecs_dev_features.npy")
X_dev = np.vstack([X_d, Y_d])
Y_dev = np.vstack([Y_d, X_d])

ns = [None for _ in range(bs)]


def generate_vdata():
    while True:
        x, y = shuffle(X_dev, Y_dev)
        yield x[:bs], y[:bs]


def dummy_metrics(x, y):
    return 0


def triplet_loss(model, data, margin, ns):
    def pred(model, x):
        x_cp = x
        for i in range(len(model.layers)):
            x_cp = model.layers[i](x_cp)
        return x_cp

    def loss(y_true, y_pred):
        for j in range(bs):
            ns[j] = random.choice(data)
        va = pred(model, y_true)
        vp = y_pred
        vn = pred(model, np.array(ns))
        dx = tf.norm(va - vp)
        dy = tf.norm(va - vn)
        T = dx - dy
        T = tf.add(T, tf.constant(margin))
        T = tf.maximum(T, 0.0)
        valid_triplets = tf.cast(tf.math.greater(T, 1e-16), dtype=tf.float32)
        num_positive_triplets = tf.reduce_sum(valid_triplets)
        T = tf.reduce_sum(T) / (num_positive_triplets + 1e-16)
        return T

    return loss


def trainB(model, X, y, loss_function, epochs=3, outdir="./logs"):
    writer = tf.summary.create_file_writer(logdir=outdir)

    with writer.as_default():
        for j in range(epochs):
            print("epoch:", j + 1)
            for i in range(X.shape[0] // bs):
                for j in range(bs):
                    ns[j] = random.choice(Y_rand)
                X_t = X[i * bs:(i + 1) * bs]
                y_t = y[i * bs:(i + 1) * bs]
                loss = model.train_on_batch(X_t, y_t)[0]
                tf.summary.scalar("train_loss", loss,
                                  i + (X.shape[0] // bs) * j)
                writer.flush()
                if i % 10 == 0:
                    sys.stdout.write("Iter:{0: >5}, loss:{1: >20}\r".format(
                        i, loss))
                    sys.stdout.flush()
            print("\nloss:{}".format(
                model.test_on_batch(*next(generate_vdata()))))


def build_model(data,
                margin=0.5,
                max_features=8000,
                max_len=300,
                dim1=50,
                dim2=200,
                drate=0.25):
    global ns
    model = Sequential()
    model.add(Input(shape=(300, )))
    model.add(
        Embedding(max_features + 1,
                  dim1,
                  input_length=max_len,
                  embeddings_initializer="he_normal"))
    model.add(Flatten())
    model.add(Dense(dim2))
    model.add(Dropout(drate))
    model.add(Dense(dim2))
    model.add(Dropout(drate))
    model.add(Dense(dim2))
    lf = triplet_loss(model, data, margin, ns)
    model.compile(loss=lf,
                  optimizer=tf.keras.optimizers.Adam(lr=1e-6),
                  metrics=[dummy_metrics])
    return model, lf


if __name__ == "__main__":
    model, lf = build_model(X_train, margin=5.0)
    trainB(model, X_train, Y_train, lf, epochs=1)
    model.save("model.h5")

これは、ランダムに負例をサンプリングしたものです。

前述の方法よりも文エンコーダとして機能したコード

hard-negativeを用意して訓練した以下のコードは、一応もう少し正しく文エンコーダとして機能しました。

import sys
import random

import numpy as np
import tensorflow as tf
import tensorflow.keras.backend as K
from sklearn.utils import shuffle
from tensorflow.keras import Sequential
from tensorflow.keras.layers import Embedding, SpatialDropout1D, SeparableConv1D, GlobalMaxPooling1D, MaxPooling1D, Input, Dense, Dropout, Flatten, BatchNormalization, GRU

bs = 5
lr = 1e-6

X_t = np.load("./prepared_data/qvecs_train_features.npy")
Y_t = np.load("./prepared_data/dvecs_train_features.npy")
X_n = np.load("./prepared_data/qvecs_n_train_features.npy")
Y_n = np.load("./prepared_data/dvecs_n_train_features.npy")

X_train = np.vstack([X_t, Y_t])
Y_train = np.vstack([Y_t, X_t])

tsize = (X_train.shape[0] // bs) * bs
X_train = X_train[:tsize]
Y_train = Y_train[:tsize]
Y_rand = shuffle(Y_train)
print(X_train.shape)
print(Y_train.shape)
X_d = np.load("./prepared_data/qvecs_dev_features.npy")
Y_d = np.load("./prepared_data/dvecs_dev_features.npy")
X_dev = np.vstack([X_d, Y_d])
Y_dev = np.vstack([Y_d, X_d])


def generate_vdata():
    while True:
        x, y = shuffle(X_dev, Y_dev)
        yield x[:bs], y[:bs]


def dummy_metrics(x, y):
    return 0


def triplet_loss(model, X_n, Y_n, margin):
    def pred(model, x):
        x_cp = x
        for i in range(len(model.layers)):
            x_cp = model.layers[i](x_cp)
        return x_cp

    def loss(y_true, y_pred):
        ns_x, ns_y = shuffle(X_n, Y_n)
        ns_x = ns_x[:bs]
        ns_y = ns_y[:bs]
        va = pred(model, y_true)
        vp = y_pred
        vn_x = pred(model, np.array(ns_x))
        vn_y = pred(model, np.array(ns_y))
        dx = tf.norm(va - vp)
        dy = tf.norm(vn_x - vn_y)
        T = dx - dy
        T = tf.add(T, tf.constant(margin))
        T = tf.maximum(T, 0.0)
        valid_triplets = tf.cast(tf.math.greater(T, 1e-16), dtype=tf.float32)
        num_positive_triplets = tf.reduce_sum(valid_triplets)
        T = tf.reduce_sum(T) / (num_positive_triplets + 1e-16)
        return T

    return loss


def trainB(model, X, y, loss_function, epochs=3, outdir="./logs"):
    writer = tf.summary.create_file_writer(logdir=outdir)

    with writer.as_default():
        for j in range(epochs):
            print("epoch:", j + 1)
            for i in range(X.shape[0] // bs):
                X_t = X[i * bs:(i + 1) * bs]
                y_t = y[i * bs:(i + 1) * bs]
                loss = model.train_on_batch(X_t, y_t)[0]
                tf.summary.scalar("train_loss", loss,
                                  i + (X.shape[0] // bs) * j)
                writer.flush()
                if i % 10 == 0:
                    sys.stdout.write("Iter:{0: >5}, loss:{1: >20}\r".format(
                        i, loss))
                    sys.stdout.flush()
            print("\nloss:{}".format(
                model.test_on_batch(*next(generate_vdata()))))


def build_model(data,
                margin=0.5,
                max_features=8000,
                max_len=300,
                dim1=50,
                dim2=200,
                drate=0.25):
    model = Sequential()
    model.add(Input(shape=(300, )))
    model.add(
        Embedding(max_features + 1,
                  dim1,
                  input_length=max_len,
                  embeddings_initializer="he_normal"))
    model.add(Flatten())
    model.add(Dense(dim2))
    model.add(Dropout(drate))
    model.add(Dense(dim2))
    model.add(Dropout(drate))
    model.add(Dense(dim2))
    lf = triplet_loss(model, X_n, Y_n, margin)
    model.compile(loss=lf,
                  optimizer=tf.keras.optimizers.Adam(lr=lr),
                  metrics=[dummy_metrics])
    return model, lf


if __name__ == "__main__":
    model, lf = build_model(X_train, margin=5.0)
    trainB(model, X_train, Y_train, lf, epochs=1)
    model.save("model.h5")

Note: 学習経過の収束具合、訓練したモデルから文をエンコードして類似度をとったらどうなるか、などは来週加筆します。

考察

後者のコードは、厳密にはtriplet lossの数式と違います。負例は確かにhard-negativeですが、Aに対応したものではなく、Aとは対応しない「ペア」です。このようなコードを書かざるを得なかったのは、訓練データ内に入力文の重複がなかったためです。重複が存在する場合は、入力文に対応したPとNを使うことができますが、代替的な方法として(正しいかは不明ですが)この方法を試しました。

paraphraseから学習されたsentence embeddingがほしい理由は、laserなどの手法よりも高い精度で同義文を見つけたい、というようなタスクが存在したためです。laserは多言語への対応性が高く、同義文検索である程度は使うことが可能ですが、どちらかといえばそれは「同義」というより「類似」と言えます。「同義」というレベルで文を検索したい場合はもっと精密なsentence embeddingが必要になります。

ちなみに、paraphrase retrievalは検索効率も要求されるため、効率面ではベクトル検索を使うことは理にかなっていると考えられます。faissのようなライブラリと組み合わせれば、大量の文を持つシステムに対するベクトル検索やクラスタリングを高速に行うことができます。

一般に、今回のtriplet lossを使った方法は「パラフレーズ」というドメインへの適応をしたsentence embeddingと言えそうなので、別のタイプのドメインに対してtriplet lossを使ってsentence embeddingを学習できれば、そのドメインへの適応性が高いような表現を獲得できるかもしれません。例えば、含意タスクでこの手法を使えるかどうか、というのは検証したい事柄です。

参考

  1. [1905.12786] Large Scale Question Paraphrase Retrieval with Smoothed Deep Metric Learning
  2. GitHub - omoindrot/tensorflow-triplet-loss: Implementation of triplet loss in TensorFlow