データナード

機械学習と自然言語処理についての備忘録 (旧ナード戦隊データマン)

wordgraph: 単語グラフを記事テキストから生成する

記事テキストのコーパスから、単語の共起関係を使って単語グラフを生成するだけの単純なスクリプトを書きました。

コード

共起関係の構築

wordgraphは共起語のDBを使うため、それを構築する必要があります。

import sys
import json
import MeCab
import pickle
from tqdm import tqdm
from collections import defaultdict

tagger = MeCab.Tagger(
    "-d /usr/local/lib/mecab/dic/mecab-ipadic-neologd/")

class DataBuilder:
    def __init__(self, dbfile="db.pkl", tokenizer=None):
        self._dbfile = dbfile
        self._db = defaultdict(dict)
        if tokenizer is None:
            self._tokenizer = self._default_tokenizer
        else:
            self._tokenizer = tokenizer

    def __call__(self, text):
        self._insert(text)

    def save(self):
        with open(self._dbfile, "wb") as f:
            pickle.dump(self._db, f, protocol=4)

    def _insert(self, text):
        words = self._tokenizer(text)
        for word1 in words:
            for word2 in words:
                if word1 and word2:
                    self._update_db(word1, word2)
                    self._update_db(word2, word1)

    def _default_tokenizer(self, text):
        out = set()
        for line in tagger.parse(text).split("\n"):
            line = line.strip()
            if "\t" in line:
                word, pos = line.split("\t")
                if "名詞" in pos:
                    out.add(word)
        return out

    def _update_db(self, word1, word2):
        if word2 in self._db[word1]:
            self._db[word1][word2] += 1
        else:
            self._db[word1][word2] = 1
            

if __name__ == "__main__":
    import sys
    import gzip

    builder = DataBuilder(sys.argv[2])

    with gzip.open(sys.argv[1], "rt", encoding='utf-8') as f:
        texts = [line.strip() for line in f]

    for text in tqdm(texts):
        builder(text)
    builder.save()
$ python3 data_builder.py ./data/data.txt.gz db.kch

data.txt.gzは単なる例なので、好きなテキストデータを使えます。(ただし、行ごとに内容を分けてください。)

グラフの構築

WordGraphクラスはnetworkxと共起語DBからグラフを構築します。パラメータは3つあります。

  1. ルート語 (key)
  2. ルート語からの深さ(デフォルト 2)
  3. 重みのしきい値 (デフォルト 0.5)

ルート語 -> 深さ1の子 -> 深さ2の子 -> ... -> 深さnの子

各々の深さの子の重み率の合計は重みのしきい値以下になるように調整されます。重みの大きい順に選択されます。

import json
import pickle
import networkx as nx


class WordGraphBuilder:
    def __init__(self, dbfile="db.pkl"):
        self.load(dbfile)

    def load(self, dbfile):
        with open(dbfile, "rb") as f:
            self._db = pickle.load(f)

    def __call__(self, key, depth=2, threshold=0.5):
        graph = nx.Graph()
        visited = {key}
        childs = self._db[key]
        if not childs:
            return graph.copy()
        stack = [(key, childs)]
        tmp_stack = []
        for i in range(depth):
            while stack:
                prev_word, childs = stack.pop(0)
                childs_tmp = sorted(childs.items(),
                                    key=lambda x: x[1],
                                    reverse=True)
                total = sum(x[1] for x in childs_tmp)
                p = 0.0
                for word, count in childs_tmp:
                    p, visited, tmp_stack = self._add_edge(
                        graph,
                        tmp_stack, visited,
                        prev_word, word,
                        p, threshold,
                        count, total)
            stack = tmp_stack[:]
            tmp_stack = []
        return graph.copy()

    def _add_edge(self,
                  graph,
                  tmp_stack,
                  visited,
                  prev_word,
                  word,
                  p,
                  threshold,
                  count,
                  total):
        if word == prev_word:
            return p, visited, tmp_stack
        is_visited = word in visited
        if p < threshold:
            child_p = float(count) / float(total)
            graph.add_edge(prev_word, word, weight=child_p)
            p += child_p
            if not is_visited:
                visited.add(word)
                tmp = self._db[word]
                if tmp:
                    tmp_stack.append((word, tmp))
        return p, visited, tmp_stack

class WordGraph:
    def __init__(self, dbfile="db.pkl"):
        self._builder = WordGraphBuilder(dbfile)
        self.graph = None

    def build_graph(self, key, depth=2, threshold=0.5):
        self.graph = self._builder(key, depth, threshold)
from wordgraph import WordGraph

g = WordGraph("db.kch")
g.build_graph("イチロー", depth=2, threshold=0.5)
g.graph # networkxによる重み付きグラフ

グラフ構築後に色々と操作したい場合はnetworkxのマニュアルを読んでください。(https://networkx.github.io/)

グラフを使う例

2つのWordGraphに対し、2つの異なるクエリによってグラフを構築し、その共通グラフを抜き出してから、両方のクエリに対する最短距離が指定値以下のノードのみを保持した新しいグラフを生成します。 その新しいグラフの中で、最も次数の多いノード名とその次数を出力します。

if __name__ == "__main__":
    import sys
    import networkx as nx
    from wordgraph import WordGraph
    
    query1 = sys.argv[1]
    query2 = sys.argv[2]
    sp_length = int(sys.argv[3])
    graph1 = WordGraph()
    graph2 = WordGraph()
    graph1.build_graph(query1, threshold=0.5)
    graph2.build_graph(query2, threshold=0.5)
    R = graph1.graph.copy()
    R.remove_nodes_from(n for n in graph1.graph if n not in graph2.graph)
    edges = [(x[0], x[1], x[2]["weight"]) for x in R.edges(data=True)
             if nx.shortest_path_length(R, query1, x[0]) < sp_length
             and nx.shortest_path_length(R, query2, x[0]) < sp_length]
    G = nx.Graph()
    for e in edges:
        G.add_edge(e[0], e[1], weight=e[2])
    print(max(G.degree(), key=lambda x: x[1]))
    nx.write_weighted_edgelist(G, "test.txt")
$ python3 example.py 大谷 ダルビッシュ 3

[output]

('大リーグ', 161)

追記

WordGraphよりもWordGraphBuilderのほうが良い気がしたので、コードを変更しました。サンプルの実行コード(example.py)は以下です:

import sys
import networkx as nx
from wordgraph import WordGraphBuilder

if __name__ == "__main__":
    query1 = sys.argv[1]
    query2 = sys.argv[2]
    sp_length = int(sys.argv[3])
    depth = int(sys.argv[4])
    threshold = float(sys.argv[5])
    builder = WordGraphBuilder()
    graph1 = builder(query1, depth=depth, threshold=threshold)
    graph2 = builder(query2, depth=depth, threshold=threshold)
    R = graph1.copy()
    R.remove_nodes_from(n for n in graph1 if n not in graph2)
    edges = [(x[0], x[1], x[2]["weight"]) for x in R.edges(data=True)
             if nx.shortest_path_length(R, query1, x[0]) < sp_length
             and nx.shortest_path_length(R, query2, x[0]) < sp_length]
    G = nx.Graph()
    for e in edges:
        G.add_edge(e[0], e[1], weight=e[2])
    print(max(G.degree(), key=lambda x: x[1]))
    pr = nx.pagerank(R)
    print(sorted(pr.items(), key=lambda x: x[1], reverse=True)[:10])
    nx.write_weighted_edgelist(G, "test.txt")

参考

  1. Kyoto Cabinet: a straightforward implementation of DBM
  2. NetworkX — NetworkX documentation