データナード

機械学習と自然言語処理についての備忘録 (旧ナード戦隊データマン)

Zipporahの特徴量設計をpythonで実装

ZipporahはWebクロールによって収集したパラレルコーパスのノイズを除去するツールですが、bashによる複雑なスクリプトで実装されていてモジュール性がありません。そこで、特徴量設計部分をpythonで実装してみます。

論文の要点

特徴量は2つあります。

  • adequacy
  • fluency

adequacyが確率的辞書とBoWに基づいているのに対し、fluencyは言語モデルに基づいています。

詳細は論文から確認できます。

https://www.aclweb.org/anthology/D17-1319.pdf

ノイズ生成

訓練データとしてクリーンコーパスを指定しますが、クリーンコーパスから負例(ノイズ)を生成する必要があります。論文中では、low adequacy, low fluency, bothという3種のノイズを生成していますが、より一般的なパラレルコーパスを再現するためのノイズ生成手法を適用します。

  • Misaligned: アライメントの失敗
  • Misordered: 語順がおかしい
  • Short segments: 語数が少なすぎる
  • Wrong language: ソースとターゲットの言語が一致している

実装

zipporah_features.py

import math
import kenlm


class Zipporah:
    def __init__(self, dic_src, dic_tgt, lm_src, lm_tgt):
        self._dic_src = self._load_dic(dic_src)
        self._dic_tgt = self._load_dic(dic_tgt)
        self._lm_src = self._load_lm(lm_src)
        self._lm_tgt = self._load_lm(lm_tgt)

    def _load_lm(self, lm_file):
        return kenlm.Model(lm_file)

    def _load_dic(self, dicfile):
        dic = {}
        with open(dicfile) as f:
            for line in f:
                line = line.strip().split()
                if len(line) == 3:
                    if line[1] not in dic:
                        dic[line[1]] = {}
                    else:
                        dic[line[1]][line[0]] = float(line[2])
        return dic

    def _v(self, sentence_words, word):
        count = sentence_words.count(word)
        return count / len(sentence_words)

    def _vprime(self, word_2, sentence_1_words, direction="src"):
        out = []
        for word_1 in set(sentence_1_words):
            try:
                if direction == "src":
                    p = self._dic_src[word_1][word_2]
                elif direction == "tgt":
                    p = self._dic_tgt[word_1][word_2]
            except KeyError:
                p = 0.0
            out.append(self._v(sentence_1_words, word_1) * p)
        return sum(out)

    def _xent(self, sentence_1_words, sentence_2_words, direction="src"):
        out = []
        for word2 in set(sentence_2_words):
            out.append(
                self._v(sentence_2_words, word2) *
                math.log(1/(0.0001+self._vprime(word2, sentence_1_words, direction))))
        return sum(out)

    def adequacy(self, sentence1, sentence2):
        words1 = sentence1.strip().split()
        words2 = sentence2.strip().split()
        return self._xent(words1, words2, direction="src") + self._xent(words2, words1, direction="tgt")

    def fluency(self, sentence1, sentence2):
        sscore = self._lm_src.score(sentence1, bos=True, eos=True)
        tscore = self._lm_tgt.score(sentence2, bos=True, eos=True)
        return sscore + tscore

noisegen.py

import pandas as pd
from sklearn.utils import shuffle
from tqdm import tqdm


class NoiseGen:
    def __init__(self, file_src, file_tgt):
        self._clean_data = self._load_clean_data(file_src, file_tgt)
        self._data = None
        self._len_clean = len(self._clean_data)

    def generate(self):
        if self._data is None:
            self._data = pd.concat([
                self._clean_data,
                self._gen_misaligned(),
                self._gen_short_segments(),
                self._gen_misordered(),
                self._gen_wrong_language()
            ])
        return self._data

    def _finalize(self, data):
        return shuffle(data)[:self._len_clean]

    def _load_clean_data(self, file_src, file_tgt):
        clean_data = []
        with open(file_src) as fs, open(file_tgt) as ft:
            for line_s, line_t in tqdm(zip(fs, ft)):
                clean_data.append({
                    "src": line_s.strip(),
                    "tgt": line_t.strip(),
                    "label": "Okay"
                })
        return pd.DataFrame(clean_data)

    def _gen_misaligned(self):
        misaligned = []
        for line_s, line_t in tqdm(zip(self._clean_data["src"],
                                   shuffle(self._clean_data["tgt"]))):
            misaligned.append({
                "src": line_s,
                "tgt": line_t,
                "label": "Misaligned"
            })
        return pd.DataFrame(self._finalize(misaligned))

    def _gen_short_segments(self, n_tokens=[2, 3, 4, 5]):
        short_segments = []
        for n in n_tokens:
            for line_s, line_t in tqdm(zip(self._clean_data["src"],
                                           self._clean_data["tgt"])):
                tmp_s = line_s.split()
                tmp_t = line_t.split()
                len_s = len(tmp_s)
                len_t = len(tmp_t)
                sizes = [(n, n), (n, len_t), (len_s, n)]

                for s in sizes:
                    short_segments.append({
                        "src": ' '.join(tmp_s[:s[0]]),
                        "tgt": ' '.join(tmp_t[:s[1]]),
                        "label": "Short"
                    })
        return pd.DataFrame(self._finalize(short_segments))

    def _gen_misordered(self):
        misordered = []
        for line_s, line_t in tqdm(zip(self._clean_data["src"],
                                       self._clean_data["tgt"])):
            tmp_s = line_s.split()
            tmp_t = line_t.split()
            tmp_s_s = shuffle(tmp_s)
            tmp_t_s = shuffle(tmp_t)
            data = [(tmp_s_s, tmp_t_s), (tmp_s_s, tmp_t), (tmp_s, tmp_t_s)]
            for d in data:
                misordered.append({
                    "src": ' '.join(d[0]),
                    "tgt": ' '.join(d[1]),
                    "label": "Misordered"
                })
        return pd.DataFrame(self._finalize(misordered))

    def _gen_wrong_language(self):
        wrong_language = []
        for line_s, line_t, line_s_s, line_t_s \
            in tqdm(zip(
                self._clean_data["src"],
                self._clean_data["tgt"],
                shuffle(self._clean_data["src"]),
                shuffle(self._clean_data["tgt"]))):
            data = [
                (line_s, line_s),
                (line_t, line_t),
                (line_s, line_s_s),
                (line_t, line_t_s)
            ]
            for d in data:
                wrong_language.append({
                    "src": d[0],
                    "tgt": d[1],
                    "label": "WrongLang"
                })
        return pd.DataFrame(self._finalize(wrong_language))

train.py

import sys
from tqdm import tqdm
import MeCab
import pandas as pd
from sklearn.linear_model import LogisticRegression
from sklearn.preprocessing import MinMaxScaler
from sklearn.utils import shuffle
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report
sys.path.append("module")
from zipporah_features import Zipporah
from noisegen import NoiseGen


z = Zipporah(
    "dic/dict-ja",
    "dic/dict-en",
    "lm/lm.ja",
    "lm/lm.en")
tagger = MeCab.Tagger("-Owakati")


def calc_feats(datafile_src, datafile_tgt):
    feats = []
    df = NoiseGen(datafile_src, datafile_tgt).generate()
    true_df = df[df["label"] == "Okay"]
    false_df = df[df["label"] != "Okay"].iloc[:true_df.shape[0]]
        
    for line_s, line_t in tqdm(zip(true_df["src"], true_df["tgt"])):
        s1 = tagger.parse(line_s)
        s2 = tagger.parse(line_t)
        adeq = z.adequacy(s1, s2)
        flue = z.fluency(s1, s2)
        feats.append({"adequacy": adeq, "fluency": flue, "label":True})

    for line_s, line_t in tqdm(zip(false_df["src"], false_df["tgt"])):
        s1 = tagger.parse(line_s)
        s2 = tagger.parse(line_t)
        adeq = z.adequacy(s1, s2)
        flue = z.fluency(s1, s2)
        feats.append({"adequacy": adeq, "fluency": flue, "label":False})

    return shuffle(pd.DataFrame(feats))


def build_model(X, y):
    scaler = MinMaxScaler().fit(X)
    X_scaler = scaler.transform(X)
    X_train, X_test, y_train, y_test = train_test_split(X_scaler, y, test_size=0.1)
    clf = LogisticRegression().fit(X_train, y_train)
    y_pred = clf.predict(X_test)
    print(classification_report(y_test, y_pred))
    return clf, scaler


def run():
    feats = calc_feats("/tmp/good.ja-en.ja", "/tmp/good.ja-en.en")
    X = feats[["adequacy", "fluency"]]
    y = feats["label"]
    return build_model(X, y)


if __name__ == "__main__":
    run()

考察

これらのスクリプトを実行して観察された結果は以下です。

  • 確率的辞書と言語モデルドメインに依存する。
  • ホールドアウト法でテストデータを用意した場合は精度が極端に高く出る。

ドメインへの依存性があるため、訓練データに利用するデータは複数ドメインを混在させたほうが、汎化性能は上がるかもしれません。一般的なドメインへ適用する場合、利用する辞書や言語モデルも一般的なドメインから学習される必要があります。

精度が極端に高く出る理由は、生成されたノイズの殆どが辞書的な対応の有無によって判定可能であり、adequqcyが一種のleakage化しているためだと考えられます。そのため、テストデータは辞書や言語モデルの訓練で利用されたコーパスとは異なるものを利用するべきかもしれません。

参考

  1. Zipporah: a Fast and Scalable Data Cleaning System for Noisy Web-Crawled Parallel Corpora - ACL Anthology
  2. On the Impact of Various Types of Noise on Neural Machine Translation - ACL Anthology