ナード戦隊データマン

A blog about machine learning and natural language processing

What is XNLI?

The Cross-lingual Natural Language Inference (XNLI) corpus is an evaluation dataset built as an extension of the MultiNLI corpus. Each example consists of a premise, a hypothesis, and a label, and the data has been translated into 14 languages (15 including English). The corpus was created to evaluate approaches that use only English NLI data at training time and can then run inference in any of the languages.
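
For intuition, here is a made-up example in the column layout the code below uses (an illustration, not an actual corpus row):

example = {
    "prem": "A man is playing a guitar on stage.",  # premise
    "hyp": "Someone is performing music.",          # hypothesis
    "cl": "entailment",                             # entailment / neutral / contradiction
}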

In this post, I fine-tune BERT on the (English) MultiNLI training data, then use the resulting model as-is on XNLI to see how well it handles the other languages.

Code

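# Requires TensorFlow 1.x, tensorflow-hub, and the bert-tensorflow package
# (which provides bert.run_classifier and bert.optimization).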
import json
import os
 
import bert
import pandas as pd
import tensorflow as tf
import tensorflow_hub as hub
from bert import run_classifier
 
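# The 15 XNLI evaluation languages (English plus 14 translations).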
langs = "ar bg de el en es fr hi ru sw th tr ur vi zh".split()
BERT_MODEL_HUB = "https://tfhub.dev/google/bert_multi_cased_L-12_H-768_A-12/1"
 
LABELS = {"entailment": 0, "neutral": 1, "contradiction": 2}
 
def makedir():
    outdir = "model"
    tf.gfile.MakeDirs(outdir)
    return outdir
 
 
def load_dataset(directory="./XNLI_fixed/", lang="en", test=False):
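    """Load the XNLI dev files for `lang` when test=True, else the MultiNLI train set."""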
    if test:
        with open(os.path.join(directory,
                               "xnli.dev.cl.{}".format(lang))) as f1:
            with open(os.path.join(directory,
                                   "xnli.dev.hyp.{}".format(lang))) as f2:
                with open(
                        os.path.join(directory,
                                     "xnli.dev.prem.{}".format(lang))) as f3:
                    test_df = pd.DataFrame([{
                        "cl": int(cl.strip()),
                        "hyp": hyp.strip(),
                        "prem": prem.strip()
                    } for cl, hyp, prem in zip(f1, f2, f3)])
        return test_df
 
    else:
        train_df = pd.read_csv(os.path.join(
            directory, "multinli_1.0_train.txt"),
                               sep="\t", error_bad_lines=False)
        train_df = train_df[["sentence1", "sentence2", "gold_label"]]
        train_df.columns = ["prem", "hyp", "cl"]
        train_df["cl"] = [LABELS[x] for x in train_df["cl"]]
        train_df["hyp"] = [str(x) for x in train_df["hyp"]]
        train_df["prem"] = [str(x) for x in train_df["prem"]]
        return train_df
 
 
def create_tokenizer_from_hub_module():
    """Get the vocab file and casing info from the Hub module."""
    with tf.Graph().as_default():
        bert_module = hub.Module(BERT_MODEL_HUB)
        tokenization_info = bert_module(signature="tokenization_info",
                                        as_dict=True)
        with tf.Session() as sess:
            vocab_file, do_lower_case = sess.run([
                tokenization_info["vocab_file"],
                tokenization_info["do_lower_case"]
            ])
 
    return bert.tokenization.FullTokenizer(vocab_file=vocab_file,
                                           do_lower_case=do_lower_case)
 
 
def create_model(is_predicting, input_ids, input_mask, segment_ids, labels,
                 num_labels):
    """Creates a classification model."""
 
    bert_module = hub.Module(BERT_MODEL_HUB, trainable=True)
    bert_inputs = dict(input_ids=input_ids,
                       input_mask=input_mask,
                       segment_ids=segment_ids)
    bert_outputs = bert_module(inputs=bert_inputs,
                               signature="tokens",
                               as_dict=True)
 
    # Use "pooled_output" for classification tasks on an entire sentence.
    # Use "sequence_output" for token-level output.
    output_layer = bert_outputs["pooled_output"]
 
    hidden_size = output_layer.shape[-1].value
 
    # Create our own classification layer on top of the pooled output.
    output_weights = tf.get_variable(
        "output_weights", [num_labels, hidden_size],
        initializer=tf.truncated_normal_initializer(stddev=0.02))
 
    output_bias = tf.get_variable("output_bias", [num_labels],
                                  initializer=tf.zeros_initializer())
 
    with tf.variable_scope("loss"):
        # Dropout helps prevent overfitting
        output_layer = tf.nn.dropout(output_layer, keep_prob=0.9)
 
        logits = tf.matmul(output_layer, output_weights, transpose_b=True)
        logits = tf.nn.bias_add(logits, output_bias)
        log_probs = tf.nn.log_softmax(logits, axis=-1)
 
        # Convert labels into one-hot encoding
        one_hot_labels = tf.one_hot(labels, depth=num_labels, dtype=tf.float32)
 
        predicted_labels = tf.squeeze(
            tf.argmax(log_probs, axis=-1, output_type=tf.int32))
        # If we're predicting, we want predicted labels and the probabilities.
        if is_predicting:
            return (predicted_labels, log_probs)
 
        # If we're train/eval, compute loss between predicted and actual label
        per_example_loss = -tf.reduce_sum(one_hot_labels * log_probs, axis=-1)
        loss = tf.reduce_mean(per_example_loss)
        return (loss, predicted_labels, log_probs)
 
 
def model_fn_builder(num_labels, learning_rate, num_train_steps,
                     num_warmup_steps):
    """Returns `model_fn` closure for TPUEstimator."""
 
    def model_fn(features, labels, mode, params):  # pylint: disable=unused-argument
        """The `model_fn` for TPUEstimator."""
 
        input_ids = features["input_ids"]
        input_mask = features["input_mask"]
        segment_ids = features["segment_ids"]
        label_ids = features["label_ids"]
 
        is_predicting = (mode == tf.estimator.ModeKeys.PREDICT)
 
        # TRAIN and EVAL
        if not is_predicting:
 
            (loss, predicted_labels,
             log_probs) = create_model(is_predicting, input_ids, input_mask,
                                       segment_ids, label_ids, num_labels)
 
            train_op = bert.optimization.create_optimizer(loss,
                                                          learning_rate,
                                                          num_train_steps,
                                                          num_warmup_steps,
                                                          use_tpu=False)
 
            # Calculate evaluation metrics.
            def metric_fn(label_ids, predicted_labels):
                accuracy = tf.metrics.accuracy(label_ids, predicted_labels)
                return {"eval_accuracy": accuracy}
 
            eval_metrics = metric_fn(label_ids, predicted_labels)
 
            if mode == tf.estimator.ModeKeys.TRAIN:
                return tf.estimator.EstimatorSpec(mode=mode,
                                                  loss=loss,
                                                  train_op=train_op)
            else:
                return tf.estimator.EstimatorSpec(mode=mode,
                                                  loss=loss,
                                                  eval_metric_ops=eval_metrics)
        else:
            (predicted_labels,
             log_probs) = create_model(is_predicting, input_ids, input_mask,
                                       segment_ids, label_ids, num_labels)
 
            predictions = {
                'probabilities': log_probs,
                'labels': predicted_labels
            }
            return tf.estimator.EstimatorSpec(mode, predictions=predictions)
 
    # Return the actual model function in the closure
    return model_fn
 
 
def load_examples(langs,
                  tokenizer,
                  texta_col="hyp",
                  textb_col="prem",
                  label_col="cl"):
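    """Convert the MultiNLI train set and each language's XNLI dev set
    into BERT input features (max sequence length 128)."""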
    test_datas = []
    test_features_list = []
    train = load_dataset()
    train_InputExamples = train.apply(lambda x: bert.run_classifier.
                                      InputExample(guid=None,
                                                   text_a=x[texta_col],
                                                   text_b=x[textb_col],
                                                   label=x[label_col]),
                                      axis=1)
    train_features = bert.run_classifier.convert_examples_to_features(
        train_InputExamples, [0, 1, 2], 128, tokenizer)
 
    for lang in langs:
        test = load_dataset(lang=lang, test=True)
        test_InputExamples = test.apply(lambda x: bert.run_classifier.
                                        InputExample(guid=None,
                                                     text_a=x[texta_col],
                                                     text_b=x[textb_col],
                                                     label=x[label_col]),
                                        axis=1)
        test_datas.append(test_InputExamples)
        test_features = bert.run_classifier.convert_examples_to_features(
            test_InputExamples, [0, 1, 2], 128, tokenizer)
        test_features_list.append(test_features)
    return train_features, test_features_list
 
 
def setup_model(train_features, test_features_list, outdir="model"):
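    """Build the Estimator plus the train and per-language eval input functions."""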
    test_input_fns = []
    BATCH_SIZE = 2
    LEARNING_RATE = 2e-5
    NUM_TRAIN_EPOCHS = 1.0
    WARMUP_PROPORTION = 0.1
    SAVE_CHECKPOINTS_STEPS = 500
    SAVE_SUMMARY_STEPS = 100
    num_train_steps = int(len(train_features) / BATCH_SIZE * NUM_TRAIN_EPOCHS)
    num_warmup_steps = int(num_train_steps * WARMUP_PROPORTION)
    run_config = tf.estimator.RunConfig(
        model_dir=outdir,
        save_summary_steps=SAVE_SUMMARY_STEPS,
        save_checkpoints_steps=SAVE_CHECKPOINTS_STEPS)
    model_fn = model_fn_builder(num_labels=3,
                                learning_rate=LEARNING_RATE,
                                num_train_steps=num_train_steps,
                                num_warmup_steps=num_warmup_steps)
 
    estimator = tf.estimator.Estimator(model_fn=model_fn,
                                       config=run_config,
                                       params={"batch_size": BATCH_SIZE})
 
    train_input_fn = bert.run_classifier.input_fn_builder(
        features=train_features,
        seq_length=128,
        is_training=True,
        drop_remainder=False)
 
    for test_features in test_features_list:
        test_input_fn = run_classifier.input_fn_builder(features=test_features,
                                                        seq_length=128,
                                                        is_training=False,
                                                        drop_remainder=False)
        test_input_fns.append(test_input_fn)
 
    return estimator, train_input_fn, test_input_fns, num_train_steps, num_warmup_steps
 
 
def train_it(estimator, train_input_fn, num_train_steps):
    estimator.train(input_fn=train_input_fn, max_steps=num_train_steps)
    return estimator
 
 
def evaluate_them(estimator, test_input_fns):
    return [
        estimator.evaluate(input_fn=test_input_fn, steps=None)
        for test_input_fn in test_input_fns
    ]
 
 
def save_results(results, langs):
    # Map each language to its eval accuracy so the saved output matches
    # the table shown below.
    accuracies = {lang: float(result["eval_accuracy"])
                  for lang, result in zip(langs, results)}
    print(accuracies)
    with open("results.json", "w") as f:
        json.dump(accuracies, f, indent=4, sort_keys=True)
 
 
if __name__ == "__main__":
    tokenizer = create_tokenizer_from_hub_module()
    outdir = makedir()
    train_features, test_features_list = load_examples(langs, tokenizer)
    estimator, train_input_fn, test_input_fns, num_train_steps, _ = setup_model(
        train_features, test_features_list, outdir)
    estimator = train_it(estimator, train_input_fn, num_train_steps)
    results = evaluate_them(estimator, test_input_fns)
    save_results(results, langs)

Results

{'ar': 0.6265060305595398,
 'bg': 0.6502007842063904,
 'de': 0.6931726932525635,
 'el': 0.6397590637207031,
 'en': 0.7971887588500977,
 'es': 0.7240963578224182,
 'fr': 0.7104417681694031,
 'hi': 0.5831325054168701,
 'ru': 0.6522088646888733,
 'sw': 0.511646568775177,
 'th': 0.5004016160964966,
 'tr': 0.5963855385780334,
 'ur': 0.5598393678665161,
 'vi': 0.6775100231170654,
 'zh': 0.676706850528717}
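
To summarize the table, here is a minimal sketch that macro-averages the per-language accuracies (it assumes results.json holds the language-to-accuracy mapping written by save_results above):

import json

with open("results.json") as f:
    accuracies = json.load(f)
# Macro-average accuracy across all 15 languages.
print(sum(accuracies.values()) / len(accuracies))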

As these numbers show, a model trained only on English appears to transfer to the other languages. Note, however, that facebookresearch/LASER achieves higher accuracy.

References

[0] https://github.com/google-research/bert/blob/master/run_classifier_with_tfhub.py
[1] https://github.com/google-research/bert/blob/master/run_classifier.py
[2] https://colab.research.google.com/github/google-research/bert/blob/master/predicting_movie_reviews_with_bert_on_tf_hub.ipynb