ナード戦隊データマン

データサイエンスを用いて悪と戦うぞ

Scrapy Cloudのデータをmongodbへ自動的に格納

Scrapy Cloudとは、scrapyやportiaをscrapinghubのクラウド上で実行・監視するサービスです。スクレイピングをやっていると、スクレイピングのプロフェッショナル集団が作り上げたシステム"scrapinghub"が如何に便利で優れているか思い知らされます。

今回は、Scrapy CloudのジョブをScrapy Cloud APIを使って監視し、終了したジョブのアイテムを自動的にmongodbへ格納するモジュールを作成します。

手順

  1. ジョブリストを取得。
  2. ジョブリストの中でDBに格納していない&実行完了しているIDを取得。
  3. 取得したID名でコレクションを作成。
  4. 取得したIDのスクレイピング済みアイテムをjsonで取得。
  5. jsonデータを作成したコレクションにbulk insertする。

※ これらの手順をモジュール化・スクリプト化し、cronから実行できるようにすると自動化できる。

 コード

import logging
import pymongo
import requests

log = logging.getLogger("jobwatcher")


def watch_job(token, project):
    headers = {'Accept': 'application/json'}
    url_template = "https://{}:@app.scrapinghub.com/api/jobs/list.json?project={}"
    r = requests.get(url_template.format(token, project), headers=headers)
    return eval(r.content)


def download_data(token, job_id):
    headers = {'Accept': 'application/json'}
    r = requests.get("https://{}:@storage.scrapinghub.com/items/{}".format(token, job_id), headers=headers)
    return eval(r.content)


def insert_scrapinghub(token, db, project):
    col_names = db.collection_names()
    inserted_ids = []
    for job in watch_job(token, project)['jobs']:
        if job['id'] in col_names or job['state'] != "finished":
            continue
        else:
            try:
                db.create_collection(job['id'])
                col = db[job['id']]
                data = download_data(token, job['id'])
                col.insert_many(data)
                inserted_ids.append(job['id'])
            except Exception as e:
                log.exception("insert_scrapinghub: fail")
                log.exception(e)
        log.info("{} inserted.".format(job['id']))
    return inserted_ids


if __name__ == "__main__":
    import sys
    token = "YOUR SCRAPINGHUB TOKEN"
    client = pymongo.MongoClient("mongodb://localhost:27017/")
    db = client["scrapinghub"]
    project_id = sys.argv[1]
    insert_scrapinghub(token, db, project_id)

参考

[0] https://doc.scrapinghub.com/scrapy-cloud.html

scrapinghub/portiaの使い方

ビジュアルスクレイピングツールとは、スクレイピングGUIから行うことのできるツールのことです。ここでは、scrapinghubの提供するportiaの使い方を書きます。

2種類の使い方

使い方には2種類あります。

  1. scrapinghubにサインアップし、SaaSとして使う。
  2. portiaを自前のサーバに入れて使う。

scrapinghubから使う場合、portiaで作成したスパイダーをscrapy cloudから実行する形態なので、料金プランはscrapy cloudの料金になります。

無料プランの制約は以下です。

  1. 同時クロール数: 1
  2. データ保存期間: 7日
  3. クロールの最大継続実行時間: 1時間

有料プランは、unitの追加という形態をとります。

1 unit = 1GB RAM + 同時クロール数プラス1 + 最大継続時間のリミット解除 = $9/月

https://scrapinghub.com/scrapy-cloud#pricing

それに対し、portiaを自前のサーバに入れて使う場合は、scrapinghubの料金を支払う必要もなく、制約もありません。

portiaを実行する

それでは、portiaの最も簡単な実行方法を書きます。 dockerをインストールしてから、以下のコマンドを実行してください。

$ mkdir ~/data
$ docker pull scrapinghub/portia
$ docker run -i -t --rm -v ~/data:/app/data/:rw -p 9001:9001 scrapinghub/portia

http://localhost:9001 にアクセスすればportiaが使えます。

portiaの基本

portiaの基本を説明します。

基本1: プロジェクトとスパイダー

portiaは、「プロジェクト」と「スパイダー」から構成されます。各プロジェクト内に複数のスパイダーを作成できますが、スパイダーとは、あるドメインに対してクローリング・スクレイピングする方法を定義したものだと考えてください。

基本2: スタートページとリンク抽出方法

スタートページとは、クローリングの起点です。スパイダーは起点からリンクを辿ることでデータを抽出します。典型的な方法として、サイトの記事一覧ページ等にページ番号を渡すことで起点を定義する方法があります。

リンク抽出方法には、以下の種類があります。

  • 自動
  • 内部リンクをすべてたどる
  • 正規表現に合致するリンクをたどる
  • たどらない

基本3: サンプルページとデータ抽出

サンプルページとは、データ抽出を行いたいページを定義するものです。サンプルページの例としては、ニュース記事の記事ページ、ショッピングサイトの特定の商品ページ、などがあります。

サンプルページを特定すると、サンプルページからどのデータを抽出するのか選ぶことができます。Web UIベースのツールなので、表示されたサンプルページから抽出したい箇所をクリックすると抽出箇所として選ばれます。例えば、商品名、価格、出版日、記事本文などを抽出します。

抽出箇所として選ばれたフィールドは、正規化することができます。出版日をdate型として定義すると、日付として正規化されます。価格をprice型として定義すれば、数値だけが抽出されます。

基本4: スパイダーの実行

スパイダーの実行方法は、Scrapy Cloudを使う方法と、portiacrawlコマンドを使う方法、scrapyスクリプトを実行する方法があります。

実行方法の例

GUIベースのツールであるため、静的なスクリーンショットを貼るよりも、動画を見たほうが早いと思います。 https://learn.scrapinghub.com/portia/

portiacrawlでスパイダーを実行

スパイダーを実行するには、portiaのdockerコンテナにログインして以下のコマンドを実行します。

$ mkdir /app/data/exported_data
$ portiacrawl /app/data/projects/PROJECT_NAME SPIDER_NAME -o /app/data/exported_data/SPIDER_NAME.jl

参考

[1] https://www.youtube.com/channel/UCYb6YWTBfD0EB53shkN_6vA [2] https://learn.scrapinghub.com/portia/ [3] https://scrapinghub.com/ [4] https://github.com/scrapinghub/portia [5] http://portia.readthedocs.io/en/latest/installation.html

スクレイピングのための2つの課題とブラックハット的解決

スクレイピングとは、Web上のコンテンツを自動的に抽出するテクニックの総称です。ここでは、スクレイピング時に生じる2つの技術的問題についてまとめ、「ブラックハット的解決策」を考えていきます。

課題1: 速度が遅い

スクレイピングの一つの課題は、「速度」です。ちまちまと、一つ一つ情報取得していては、いくらCPUが優れていても無意味です。なぜならこれはCPUの問題ではなく、ネットワークの問題だからです。

一般的に、ネットワークの並列的アクセスは、ダウンロード速度を向上させます。しかし、これはDoS攻撃のように、対象サイトへと大量アクセスをすることを意味しています。

ともあれ、速度の問題を一旦解決する単純なコードの例を見てみましょう。

import pymp
import pandas as pd
import diffbot
import json

token = "diffbotのトークンを設定してください"
urls = pd.read_csv("example_urls.csv")['url'].tolist()

with pymp.Parallel(100) as p:
    for index in p.range(0, len(urls)):
        try:
            with open("{}.json".format(index), "w") as f:
                json.dump(diffbot.article(urls[index], token=token), f) 
        except:
            print("ERROR:{}".format(index))

これは、ターゲットのurlリストを用意した上で、そのURLから並列にコンテンツ抽出を実行するコードです。urllistが特定のドメインに限定されている場合、大量アクセスを仕掛けることになります。しかし、速度はとんでもなく向上します。

課題2: アクセスブロックされる

Google検索のスクレイピングは、困難なタスクの一つです。Googleは異常なトラフィックを即座に検出し、ボットをブロックしてしまいます。

トラフィックの検出では、以下の要素が考慮されます:

  1. 単位時間あたりのリクエスト数
  2. 同時アクセス数
  3. 通常のリクエストとの挙動の差 (トラフィックパターン)
  4. HTTP Request Header (UAなど)
  5. Cookie
  6. JavaScript

ブロックされないためには、通常のリクエスト、つまり人間が普通に使うトラフィックと似せる必要があります。

例えば、ヘッドレスブラウザを使うのが一つの方法です。ヘッドレスブラウザを使ったコードの例は以下です:

from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from random import randrange
import time
import pickle
import pandas as pd

url = "https://example.com/?q={}"
querys = pd.read_csv("queries.csv")['query'].tolist()

options = Options()
options.binary_location = '/usr/bin/chromium-browser'
options.add_argument('--headless')
options.add_argument('--window-size=1280,1024')
options.add_argument('--no-sandbox')

with open("proxylist.pkl", "rb") as f:
    proxies = pickle.load(f)

def build_driver(proxies, options, executable_path)
    proxy = proxies[randrange(0, len(proxies))]
    options.add_argument('--proxy-server=http://%s' % proxy['server'])
    options.add_argument('--proxy-auth=%s' % proxy['auth'])
    return webdriver.Chrome(executable_path=executable_path, chrome_options=options)

results = []
for query in queries:
   driver = build_driver(proxies, options, "chromedriverへのパス")
   driver.get(url.format(query))
   results.append(driver.page_source)
   time.sleep(randrange(10, 20))

ここで、「プロキシリスト」が用いられています。シングルIPの場合、ボットだと検出されやすくなります。一方で、プロキシのIPが150件ぐらい用意されていれば、ボットだと検出されにくくなります。

また、スリープ時間(インターバル)を設定し、ランダムな時間にすれば挙動を人間と区別することが困難になります。スリープ時間が長いほど、ブロックされる可能性が減るので、速度と検出される確率はトレードオフになります。

まとめ

  1. 速度を向上させるにはマルチスレッドが良い。
  2. アクセスブロックを回避するには、1)プロキシリストを使う, 2)ヘッドレスブラウザを使う, 3)インターバルの設定, が有効。

カンニングした生徒の割合を調べる方法

ベイズ推定を使えば、生徒から集めたアンケートを「正しく」用いてカンニングした生徒の割合を求めることができます。ここでは、そのアルゴリズムと算出方法を書いておきます。

プライバシーを守るアルゴリズム

各学生に面接をする方法を取る場合、生徒に直接「カンニングしましたか?」と聞いてしまっては、プライバシーが守られません。そこで、次のアルゴリズムでアンケートを収集します。

  1. 学生は面接者(先生)に"見られないように"、コイントスする。
  2. 表なら正直に答えてもらう。
  3. 裏が出たら、再度コイントスし、表なら「カンニングした」と言ってもらう。裏なら「カンニングしていない」と言ってもらう。

これにより先生は、その生徒が二回目のコイントスの結果として「カンニングした」といったのか、正直に告白したのかがわからなくなります。

モデル化

それでは、問題をモデル化します。

P(YES) = P(1回目表)P(カンニングした) + P(1回目裏)P(2回目表)
= p/2 + 1/4

pymcを使います。カンニングした割合は一様分布に従うと仮定します。また、アンケートデータは適当に収集済みであると仮定し、生徒100人中35人が「カンニングした」と答えたと仮定します。アンケートは二項分布であるとします。

モデル化したら、MCMCで事後分布からサンプリングし、カンニングした確率をトレースしてプロットします。このプロットされた確率は、事前分布の一様分布に、証拠である「アンケート」を取り入れた事後分布です。

with pm.Model() as model:
    p = pm.Uniform("freq_cheating", 0, 1)
    p_skewed = pm.Deterministic("p_skewed", 0.5*p + 0.25)
    yes_responses = pm.Binomial("number_cheaters", 100, p_skewed, observed=35)
    step = pm.Metropolis()
    trace = pm.sample(25000, step=step)
    burned_trace = trace[2500:]

figsize(12.5, 3)
p_trace = burned_trace["freq_cheating"]
plt.hist(p_trace, histtype="stepfilled", normed=True, alpha=0.85, bins=30, 
         label="posterior distribution", color="#348ABD")
plt.vlines([.05, .35], [0, 0], [5, 5], alpha=0.2)
plt.xlim(0, 1)
plt.legend();

download.png

事後分布からわかること

この事後分布からわかることは、カンニングはあった可能性が高いということです。観測データは、p=0の可能性を排除したので、カンニングが存在したという確信が高いと言えます。

参考

[1] https://github.com/CamDavidsonPilon/Probabilistic-Programming-and-Bayesian-Methods-for-Hackers

機械学習を使わずにニュースイベントを分類する

前回前々回、マルコフクラスタリングによってニュースイベントを分類しました。今回は、機械学習を使わず、nnlm-ja-dim128による分散表現と類似度に対するスレッショルドだけを用いてどの程度分類できるのか試します。

ニュースイベントの分類についての概要

ニュースの分類では、分類の抽象度を決める必要があります。

最も具体的なレベルの分類は「同一イベント」による分類です。例えば、「18日, 九州で地震」と「18日, 熊本で地震」はおそらく同一イベントとして考えることができますが、「22日, 関東で地震」は別のイベントです。

しかし、抽象度を上げて「日本の地震」という分類で考えると、前述の3つは同じ「タイプ」として分類されることになります。さらに「地震」という分類ならより抽象的になり、世界中の地震情報が同じタイプとして分類されます。さらに抽象度を上げれば「自然災害」ということになるでしょう。

一般的には、トピックモデリングやkmeansのような手法を用いる場合、トピック数やkの数が少ないほど抽象度が上がります。しかし、これらの手法で「同一イベント」という具体的なイベントを分類しようとすると、事前に「イベント数」をわかっている必要があるため、非現実的です。

そこで類似度の行列(を時間減衰関数と演算したもの)に対してマルコフクラスタリングを行うような手法を使えば同一イベントで分類できます。ただ、類似度の行列は記事数×記事数なので、計算量はO(n2)となり、さらに必要なメモリ領域もαn2だけ必要になります。

今回書く手法も、O(n2)の手法ですが、「機械学習を使わずに」どの程度分類できるかを試します。

パイプライン

  1. イベントチェーンを生成する。
  2. 各チェーンの中で同一のイベントがあればマージする。
  3. マージされなかった同一イベントを再グルーピングする。

モジュール

import tensorflow as tf
import tensorflow_hub as hub
import numpy as np
import pandas as pd


def cossim(v1, v2):
    return np.dot(v1, v2) / (np.linalg.norm(v1) * np.linalg.norm(v2))


def texts_encoder(texts):
    with tf.Graph().as_default():
        embed = hub.Module("https://tfhub.dev/google/nnlm-ja-dim128-with-normalization/1")
        embeddings = embed(texts)
        with tf.Session() as sess:
            sess.run(tf.global_variables_initializer())
            sess.run(tf.tables_initializer())
            result = sess.run(embeddings)
    return result


def similar_event(word_vectors, source_idx, range_idx, threshold=0.85):
    similar_events = []
    source_vector = word_vectors[source_idx]
    for i in range(range_idx[0], range_idx[0]+range_idx[1]):
        if cossim(source_vector, word_vectors[i]) > threshold:
            similar_events.append(i)
    return similar_events


def grouping_events(events, word_vectors, threshold=0.8):
    results = []
    current_group = []
    
    for i, e in enumerate(events):
        current_group.append(events[i])
        if i == len(events)-1:
            results.append(current_group)
            current_group = []
        else:        
            if cossim(word_vectors[events[i]], word_vectors[events[i+1]]) > threshold:
                current_group.append(events[i+1])
            else:
                results.append(current_group)
                current_group = []

    if current_group:
        results.append(current_group)
        
    return results


def compute_instances_and_chains(df, word_vectors, size=None, day_column="day_fixed", time_decay=30, 
                                 chain_threshold=0.8, grouping_threshold=0.87):
    all_events = []
    event_instances = []
    event_chains = []
    
    if size is None:
        size = df.shape[0]
    
    for i in range(size):
        if i%1000 == 0:
            print("{}".format(i*100/df.shape[0]))
        if i in all_events:
            continue
        after_week = df.iloc[i][day_column] + pd.DateOffset(time_decay)
        today = df.iloc[i][day_column]
        len_data = df[today <= df[day_column]][df[day_column] <= after_week].shape[0]
        events = similar_event(word_vectors, i, (i, len_data), threshold=chain_threshold)
        if events == []:
            continue
        grouped_events = grouping_events(events, word_vectors, threshold=grouping_threshold)
        event_instances += grouped_events
        event_chains.append(events)
        all_events += events
        all_events = list(set(all_events))
    return event_chains, event_instances


def regrouping(grouped_events, word_vectors, threshold=0.87):
    results = []
    all_events = []
    current_group = []
    for i, ge1 in enumerate(grouped_events):
        if i in all_events:
            continue
        current_group = []
        current_group += ge1
        for j, ge2 in enumerate(grouped_events[i+1:]):
            if i+j+1 in all_events:
                continue
            if cossim(word_vectors[ge1[0]], word_vectors[ge2[0]]) > threshold:
                current_group += ge2
                all_events += [i, i+j+1]
        results.append(list(set(current_group)))
    return results


def execute(df, word_vectors, size=None, day_column="day_fixed", time_decay=14, 
            chain_threshold=0.8, grouping_threshold=0.87, 
            regrouping_threshold=0.87, regrouping_level=1):

    event_chains, grouped_events = compute_instances_and_chains(
        df, word_vectors, size, day_column, time_decay, chain_threshold, grouping_threshold)

    regrouped_events = grouped_events
    for i in range(regrouping_level):
        regrouped_events = regrouping(regrouped_events, word_vectors, regrouping_threshold)
    
    return event_chains, regrouped_events

jupyterで実行

データは以下をダウンロードして展開してから使います。 https://github.com/sugiyamath/credibility_analysis/blob/master/notebook/data2.7z

In[1]:

import evolution
import pandas as pd
import MeCab
import numpy as np

df = pd.read_csv("../data/fixed_data.csv")
df['day_fixed'] = pd.to_datetime(df['day_fixed'])

tagger = MeCab.Tagger("-Owakati")

def data_define(d, tokenize=tagger.parse, c1="title", c2="body", sep="。", size=1):
    return tokenize('。'.join([d[1][c1]] + d[1][c2].split(sep)[:size]))

df['data'] = list(map(data_define, df.iterrows()))
df = df[['title', 'data', 'day_fixed']].sort_values(by="day_fixed")
word_vectors = evolution.texts_encoder(df['data'].tolist())
chains, instances = evolution.execute(df, word_vectors, size=5000, chain_threshold=0.85, regrouping_level=1)

In[2]:

for i, e in enumerate(regrouped_instances):
    if df.iloc[e].shape[0] > 1:
        print("[event {}]".format(i))
        print(df.iloc[e].sort_values(by="day_fixed").drop_duplicates()['title'])
        print()

Out[2]:

...[略]...

[event 1553]
38502       日本経済にグローバルな追い風、YCCが効果を増幅=日銀総裁
37729      日銀総裁、トランプ政策で世界経済加速 物価目標に追い風=報道
37727      米製造業改善で雇用引き締まり、経済の健全さ示唆=地区連銀報告
37552    判断維持「一部に改善遅れも、緩やかな回復基調」=1月月例経済報告
Name: title, dtype: object

[event 1560]
38577      米政権移行で米中関係は新たな不確実性に直面へ=中国外相
37458    米新政権と対話の用意、「一つの中国」尊重など前提=王毅外相
15440      米大統領報道官、中国への対抗措置を示唆 南シナ海問題で
36698      公平な対中貿易競争環境、トランプ氏「かなり早く実現へ」
Name: title, dtype: object

[event 1562]
12111    沖縄でオスプレイ「事故」 米海兵隊5人負傷
12089      オスプレイ、「事故」受け飛行を一時停止
Name: title, dtype: object

[event 1569]
12109        訪日前のプーチン氏、秋田犬「ゆめ」をお披露目
15676    プーチン大統領、日本から贈られた秋田犬を連れて会見に
Name: title, dtype: object

[event 1575]
38654    ベルリンのトラック突入、イスラム国が犯行声明=AMAQ通信
11982          IS、独トラック突入で犯行声明 「兵士が実行」
15639      独トラック突入、実行犯は依然逃走か ISISが犯行声明
15559        トルコ銃乱射、ISISが犯行声明 容疑者の捜索続く
Name: title, dtype: object

[event 1585]
12100    IS、シリア・パルミラで政府軍が放棄した戦闘装備を入手 米国防総省
12093              IS、パルミラで防空兵器入手か 米軍幹部が指摘
Name: title, dtype: object

[event 1591]
38888    米鉱工業生産予想以上の落ち込み、公益・製造業重しに
38563         米個人消費支出伸び鈍化、景気減速の可能性
Name: title, dtype: object

[event 1592]
38887             11月米小売売上高は微増、消費依然底堅く
38563             米個人消費支出伸び鈍化、景気減速の可能性
38091       米11月の卸売在庫は前月比1%増、2年ぶりの高い伸び
37755    米12月CPIは前年比2.1%上昇 2年半ぶりの大きな伸び
Name: title, dtype: object

[event 1594]
38892    南シナ海の人工島すべてに防衛施設、中国設置か=米シンクタンク
15678             中国、南シナ海の人工島に兵器システムを配備
38765      アングル:南シナ海の防衛施設、中国は反撃準備する権利主張
Name: title, dtype: object

[event 1596]
12119    チュニジア裁判所、妊娠した13歳少女の結婚認める NGOは怒り
15670          妊娠の13歳少女の結婚承認、親類男性と チュニジア
Name: title, dtype: object

[event 1600]
38893    米FOMCが0.25%利上げ、来年は3回の利上げ予想
38863     米FOMC、1年ぶりの利上げを決定:識者はこうみる
38890               イエレン米FRB議長の会見要旨
37114       米FOMC、金利据え置き 景気判断は依然前向き
37112           FOMC、金利据え置き:識者はこうみる
Name: title, dtype: object

[event 1602]
38891            米国株式市場は反落、来年の利上げペース加速を懸念
38836            米国株式市場は反発、利上げ加速観測で金融株に買い
38709              米国株は反発、独のトラック突入受け上げ幅縮小
38394                  米国株式市場は反落、幅広い銘柄に売り
38375                  米国株式市場は小幅続落、金融株に売り
38348    米国株が続落、アップルなどハイテク株に売り 年間ではプラスで終了
38319              米国株式市場は反発、ハイテク株や通信株に買い
38277              米国株式市場は続伸、消費関連株主導で買われる
37945          米国株は反落、決算発表前の買い手控えや政策不透明感で
37803             米国株は下落、米大統領選後の勝ち組銘柄売られる
37485           米国株式市場は反発、S&Pとナスダックが最高値更新
Name: title, dtype: object

...[略]...

考察

コードが行っているのは

1)起点記事との類似度が一定を超える記事をチェーンに追加 2)チェーン内の隣接ノードの類似度が一定を超えれば同一イベントとしてマージ 3)その他にマージしたほうがよさそうなものを類似度で評価してregroupingする

という三段階をとっています。

つまり、2つの文のベクトル表現の類似度と、スレッショルドの値だけで分類してこれだけ分類できてしまうわけです。

スレッショルドの値を変えてみればわかりますが、precisionとrecallのトレードオフ問題が生じます。thresholdが低すぎると、分類の抽象性が上がり、同一イベントではないものも含まれてしまいます。しかしthresholdが高すぎると、同一イベントが検出されなくなってしまいます。

試してみた感覚では、チェーン生成の段階ではthresholdを少し低めに設定して、同一イベント検出の段階で高いthresholdを設定すると良い結果が出ます。

ただ、計算量がO(n2)なので、この点では改善が必要です。特にregroupingの処理で結構時間がかかってしまうみたいです。

ちなみに、コサイン類似度にscikit-learnのものを使ってみましたが、速度が遅かったので私の先輩が実装した高速版の関数に変更しました。

参考

[1] http://kbcom.org/papers/KBCOM_2018_paper_1.pdf

MCLによるクラスタに後から割り当てる

前回の記事では、マルコフクラスタリングによってニュースイベントを分類しました。今回は、こちらの論文 (Unsupervised Event Clustering and Aggregation from Newswire and Web Articles) を日本語で実行します。

パイプライン

[クラスタリングフェーズ] 1. 記事をスクレイピングしておく。 2. 記事をBoWでベクトル化。 3. ベクトル化した記事をAPSSで行列化。 4. 類似度行列をMCLに渡す。

[割り当てフェーズ] 1. 割り当てたい記事をmonolingual word alignment systemでスコアリング。 2. 各クラスタに対し、スコアの最も高い記事を、クラスタリングフェーズで求めたクラスタに割り当てる。

monolingual word alignment system (MWAS)

まず、このMWALをイチから実装する必要があります。実装のために、TDDを使いました。テストケースから先に書いて、小さなステップを繰り返す方法です。

MWASは、論文内の以下の図と論文の説明を見て、おおよそ推測しました:

Screenshot_2018-07-13_19-36-52.png

まあ、テストケースを見てもらえばわかると思います。

テストケースは何度か修正しましたが、リファクタリングを繰り返した結果として以下のようなモジュールが完成しました。

import tensorflow as tf
import tensorflow_hub as hub
import numpy as np
from functools import partial
from sklearn.metrics.pairwise import cosine_similarity


def texts_encoder(texts):
    with tf.Graph().as_default():
        embed = hub.Module("https://tfhub.dev/google/nnlm-ja-dim128/1")
        embeddings = embed(texts)
        with tf.Session() as sess:
            sess.run(tf.global_variables_initializer())
            sess.run(tf.tables_initializer())
            result = sess.run(embeddings)
    return result


def align(base, target, threshold=0.50):
    word_vectors = texts_encoder(base+target)
    base_vectors = word_vectors[:len(base)]
    target_vectors = word_vectors[len(base):]
    return solve(target, base_vectors, target_vectors, threshold)


def solve(target, base_vectors, target_vectors, threshold=0.50):
    similarities = cosine_similarity(base_vectors, target_vectors)
    results = []
    for similarity in similarities:
        if max(similarity) > threshold:
            results.append(target[np.argmax(similarity)])
        else:
            results.append(None)
    return results


def aligns(base, targets, threshold=0.50):
    ls = [len(base)] + [len(target) for target in targets]
    pt = 0

    data = base
    for target in targets:
        data += target
    
    word_vectors = texts_encoder(data)
    target_vectors = []
    for i, l in enumerate(ls):
        if i == 0:
            base_vector = word_vectors[:l]
        else:
            target_vectors.append(word_vectors[pt:pt+l])
        pt += l

    return [solve(target, base_vector, target_vector, threshold)
                 for target, target_vector in zip(targets, target_vectors)]


def sum_aligns(Bs):
    return np.sum(Bs, axis=0)


def W(A, Bs):
    A = np.array(A)
    tmp_W = []
    for B in Bs:
        tmp_W.append(A * B)
    return np.array(tmp_W)
    

def aligns2Bs(aligns):
    return [list(map(lambda x: x is not None, align)) for align in aligns]


def score(Ws):
    return np.sum(Ws, axis=1)


def execute(base_sentence, target_sentences, tokenize, stps, threshold=0.5):
    func = partial(fmt_sentence, tokenize=tokenize, stps=stps)
    base = func(base_sentence)
    targets = list(map(func, target_sentences))
    als = aligns(base, targets, threshold=threshold)
    Bs = aligns2Bs(als)
    A = sum_aligns(Bs)
    Ws = W(A, Bs).tolist()
    return score(Ws)


def fmt_sentence(sentence, tokenize, stps):
    return [term for term in tokenize(sentence).split() if term not in stps]

最終的に、テストケースが以下のようになりました:

import unittest
import numpy as np
import sys
sys.path.append("..")
import align
from nltk.corpus import stopwords
import MeCab


class TestAlign(unittest.TestCase):
    def test_align_1(self):
        base_sentence = "京都市 洪水 被害".split(" ")
        target_sentence = "洪水 3人 死亡 京都市".split(" ")
        aligned = align.align(base_sentence, target_sentence)
        true_align = ["京都市", "洪水", None]
        self.assertEqual(aligned, true_align)

    def test_align_2(self):
        base_sentence = "アメリカ 保護貿易 蔓延 関税 引き上げ".split(" ")
        target_sentence = "関税 問題 米国 保護貿易 推進".split(" ")
        aligned = align.align(base_sentence, target_sentence)
        true_align = ["米国", "保護貿易", None, "関税", None]
        self.assertEqual(aligned, true_align)

    def test_aligns(self):
        base_sentence = "アメリカ 保護貿易 蔓延 関税 引き上げ".split(" ")
        targets =[
            "関税 問題 米国 保護貿易 推進".split(),
            "アメリカ 主導 世界 経済".split(),
            "保護貿易 自由貿易 比較".split()
        ]
        aligns = align.aligns(base_sentence, targets, threshold=0.50)
        true_aligns = [
            ["米国", "保護貿易", None, "関税", None],
            ["アメリカ", None, None, None, None],
            [None, "保護貿易", None, None, None]
        ]

        for aligned, true_aligns in zip(aligns, true_aligns):
            self.assertListEqual(aligned, true_aligns)
        
    def test_aligns2Bs(self):
        base_sentence = "アメリカ 保護貿易 蔓延 関税 引き上げ".split(" ")
        targets =[
            "関税 問題 米国 保護貿易 推進".split(),
            "アメリカ 主導 世界 経済".split(),
            "保護貿易 自由貿易 比較".split()
        ]
        aligns = align.aligns(base_sentence, targets, threshold=0.50)
        Bs = align.aligns2Bs(aligns)
        true_Bs = [
            [True, True, False, True, False],
            [True, False, False, False, False],
            [False, True, False, False, False]
        ]

        for B, true_B in zip(Bs, true_Bs):
            self.assertListEqual(B, true_B)

    def test_sum_aligns(self):
        base_sentence = "アメリカ 保護貿易 蔓延 関税 引き上げ".split(" ")
        targets =[
            "関税 問題 米国 保護貿易 推進".split(),
            "アメリカ 主導 世界 経済".split(),
            "保護貿易 自由貿易 比較".split()
        ]
        aligns = align.aligns(base_sentence, targets, threshold=0.50)
        Bs = align.aligns2Bs(aligns)
        A = align.sum_aligns(Bs).tolist()
        true_A = [2,2,0,1,0]

        self.assertListEqual(A, true_A)

    def test_W(self):
        base_sentence = "アメリカ 保護貿易 蔓延 関税 引き上げ".split(" ")
        targets =[
            "関税 問題 米国 保護貿易 推進".split(),
            "アメリカ 主導 世界 経済".split(),
            "保護貿易 自由貿易 比較".split()
        ]
        aligns = align.aligns(base_sentence, targets, threshold=0.50)
        Bs = align.aligns2Bs(aligns)
        A = align.sum_aligns(Bs)
        Ws = align.W(A, Bs).tolist()
        true_Ws = [
            [2,2,0,1,0],
            [2,0,0,0,0],
            [0,2,0,0,0]
        ]
        for W, true_W in zip(Ws, true_Ws):
            self.assertListEqual(W, true_W)

    def test_fmt_sentence(self):
        tagger = MeCab.Tagger("-Owakati")
        tokenize = tagger.parse
        stps = stopwords.words("japanese")        
        test_sentence = "アメリカが主導する世界の経済の行方は!?"
        sent_fmt = align.fmt_sentence(test_sentence, tokenize, stps)
        true_fmt = ["アメリカ", "主導", "世界", "経済", "行方"]
        self.assertListEqual(sent_fmt, true_fmt)

    def test_score(self):
        Ws = [
            [2,2,0,1,0],
            [2,0,0,0,0],
            [0,2,0,0,0]
        ]
        score = align.score(Ws).tolist()
        true_score = [5, 2, 2]
        self.assertListEqual(score, true_score)

    def test_execute(self):
        tagger = MeCab.Tagger("-Owakati")
        tokenize = tagger.parse
        stps = stopwords.words("japanese")        
        base_sentence = "アメリカ、保護貿易蔓延、関税引き上げか"
        targets =[
            "関税問題で米国、保護貿易を推進",
            "アメリカ主導の世界経済",
            "保護貿易と自由貿易を比較"
        ]
        ss = align.execute(base_sentence, targets, tokenize, stps, threshold=0.5).tolist()
        self.assertListEqual([9, 5, 7], ss)


if __name__ == "__main__":
    unittest.main()

jupyter notebook

クラスタリングフェーズでは、前回の記事とほぼ同じことを実行しますが、時間減衰関数をかけ合わせます。記事数が多い場合にも対応できるよう、スパース化します。

from sklearn.feature_extraction.text import CountVectorizer
from sklearn.metrics.pairwise import cosine_similarity
from nltk.corpus import stopwords
import itertools
import markov_clustering as mc
import pandas as pd
import numpy as np

target_afp = pd.read_csv("target_afp.csv")

def mul_sim(similarities, days):
    for i, d1 in enumerate(tqdm_notebook(days)):
        d = np.array([1/np.exp(diff_date(d1, d2)/24.0) for d2 in days])
        similarities[i, :] = similarities[i, :].multiply(d)
    return similarities

days =targets_afp['day_fixed'].tolist()
stps = stopwords.words("japanese")
stps = stps + [''.join(list(stp)) for stp in list(itertools.product(stps, stps))]
count_vectors = CountVectorizer(stop_words=stps).fit_transform(targets_afp['data'])
similarities = cosine_similarity(count_vectors, dense_output=False)
similarities = mul_sim(similarities, days)
result = mc.run_mcl(similarities)
clusters = mc.get_clusters(result)

クラスタ番号10は以下のようになりました:

[[cluster 10]]]
0) :金 正男 氏 殺害 で 女 1 人 を 逮捕 遺体 安置 の 病院 に は 北朝鮮 大使館 員 の 姿 も 北朝鮮 の 金 正 恩 ( キム ・ ジョン ウン 、 Kim Jong - Un ) 朝鮮 労働党 委員 長 の 異母 兄 、 金 正男 ( キム ・ ジョンナム 、 Kim Jong - Nam ) 氏 が マレーシア ・ クアラルンプール ( Kuala Lumpur ) の 空港 で 殺害 さ れ た 事件 で 、 マレーシア の 警察 当局 は 15 日 、 容疑 者 の 女 1 人 を 逮捕 し た 

1) :韓国 政府 、 金 正男 氏 の 暗殺 を 確認 大統領 代行 「 北 の 残忍 性 示す 」 ( 更新 ・ 写真 追加 ) 韓国 政府 は 15 日 、 北朝鮮 の 金 正 恩 ( キム ・ ジョン ウン 、 Kim Jong - Un ) 朝鮮 労働党 委員 長 の 異母 兄 、 金 正男 ( キム ・ ジョンナム 、 Kim Jong - Nam ) 氏 が マレーシア ・ クアラルンプール ( Kuala Lumpur ) の 空港 で 暗殺 さ れ た と の 報道 は 事実 だ と 確認 し た 

2) :北朝鮮 の 金 正男 氏 、 マレーシア で 暗殺 か 韓国 メディア 報道 ( 更新 、 写真 追加 ) 北朝鮮 の 金 正 恩 ( キム ・ ジョン ウン 、 Kim Jong - Un ) 朝鮮 労働党 委員 長 の 異母 兄 、 金 正男 ( キム ・ ジョンナム 、 Kim Jong - Nam ) 氏 が 、 マレーシア で 北朝鮮 の 工作 員 の 女 により 毒 針 を 使っ て 暗殺 さ れ た と 、 韓国 メディア が 14 日 、 報じ た 

このクラスタ10に、新たな記事を割り当てます。MWASを使います。

import sys
sys.path.append("../module/")
import align

targets = pd.read_csv("targets.csv")

scores = align.execute(base_sentence=targets_afp['title'].iloc[clusters[10][0]], target_sentences=targets['title'].tolist(), tokenize=tagger.parse, stps=stps, threshold=0.5).tolist()
scores = np.array(scores)

threshold = 1500

targets['title'][scores > threshold]

クラスタ10に新しく割り当てた記事↓

10989    金正男氏殺害で女1人を逮捕 遺体安置の病院には北朝鮮大使館員の姿も
36523           金正男氏は北朝鮮の女性工作員2人が殺害=韓国情報当局
36545            金正男氏の死亡、米は北朝鮮が殺害との見方=米政府筋

どうやら、イベントは概ね正しく分類できたようです。

おわりに

実は、スクレイピングしたデータのサイズが大きすぎてgithubにアップロードできなかったので、再現してもらうには自前でスクレイピングしてもらう必要があります。ここでは、マルコフクラスタリングによるクラスタリング結果に対し、新たなデータを割り当てる方法を試していき、それなりの結果を出すことがわかりました。

参考

[0] http://www.aclweb.org/anthology/W17-4211

追記

2018/07/13 20:03

何気に、nltkのストップワードでjapaneseを指定していますが、これはnltk_dataに以下を入れて実行しています。 https://github.com/sugiyamath/credibility_analysis/blob/master/japanese