データナード

機械学習と自然言語処理についての備忘録 (旧ナード戦隊データマン)

記事に対するローカルクローラの抽象化

ローカルクローラは、「特定のサイト内に限定したクローラ」とここでは定義するとすると、さまざまなサイトに対して異なるクローラが存在するはずです。しかし、それらのローカルクローラの共通の特徴はなんでしょうか。

グローバルクローラとローカルクローラ

グローバルクローラ、ローカルクローラという用語は、単にここで勝手に私が思いつきで名付けたものです。

以下の違いがあります。

  • グローバルクローラ: 外部へのリンクを含むすべてのリンクをたどる。
  • ローカルクローラ: ターゲットサイトの内部リンクのみをたどる。

今回考えるのは、内部リンクのみをたどるローカルクローラです。

サイトの種類

サイトにはいくつかの種類があります。

自然言語処理で必要になるデータは、主にテキストデータなので、「記事」という形態のサイトをクロールすることはよくあります。今回考えるのは、記事に対するローカルクローラについてです。

一般的な流れ

  1. ニュースサイト内で記事を検索すると、記事の一覧を見ることができます。
  2. 記事の一覧ページでは、各々の記事のユニークURLを取得することができます。
  3. ユニークURLへ移動すると、記事をみることができます。
  4. この記事URLから必要なコンテンツを選び出してダウンロードします。ただし、コンテンツから必要な部分を選び出す、という処理はダウンロード後にできるので、HTMLをそのままダウンロードします。
  5. これを、すべての記事に対して繰り返します。
  6. もしすべての記事がダウンロードできたら、最新のコンテンツを監視し、更新があればダウンロードします。

抽象化

今回行った最初の設計は、以下のようになっています:

f:id:mathgeekjp:20191202081206j:plain

これは単なるラフスケッチですが、"agent"という用語を選択した理由がわかると思います。この場合、agentはターゲットサイトとのやり取りの代理者として書かれています。クラス図として詳細化すると、以下のようになります。

f:id:mathgeekjp:20191129135034g:plain

  • サイトを徘徊するには、ブラウザへの接続が必要です。それをWebDriverと呼びます。WebDriverはDriverManagerによって管理されます。
  • マルチリンガルなサイトは言語ごとに表示の仕組みが異なります。これをlang_idで制御します。
  • 全件検索を行う際には、ページ番号が存在するため、これをpageで制御します。
  • サイト全体から記事URLを抽出する仕組みをSearchと呼びます。SearchはSearchAgentの責務です。
  • 記事URLからHTMLを取得することをDownloadと呼びます。DownloadはDownloadAgentの責務です。
  • SearchとDownloadを繰り返すことをCrawlと言います。Crawlは継続的に実行されます。Crawlerの責務です。

ただし、lang_idとpageについては具体的すぎるため、もっと抽象化する必要があるでしょう。おそらく、「query」という用語はより抽象的です。

重要な点はSearchとDownloadを繰り返すことでクロールする、という点です。これは多くのクローラの共通の動作です。

もし、クロールの目的が一般的なサーチエンジンを作ることであれば、Searchの仕組みは単にすべてのリンクを抽出することを意味します。

しかし、ローカルクローラの場合、往々にして特定のコンテンツだけが欲しい場合があるため、特定の条件を満たすURLのみを知りたいわけです。

コード

主要なのロジックは以下のように書かれています:

import os
import sys
import traceback

from selenium.common.exceptions import WebDriverException
from datetime import datetime
from kyotocabinet import DB
from downloader import DownloadAgent
from driver_manager import DriverManager
from searcher import SearchAgent


class Crawler:
    def __init__(self, outdir, logdir, opts=None, wtime=30):
        self._visited_db = DB()
        self._visited_db.open(os.path.join(logdir, "visited.kch"))
        self._drvmng = DriverManager(opts, wtime)
        self._dagent = DownloadAgent(outdir, self._drvmng.get_driver())
        self._sagent = SearchAgent(self._drvmng.get_driver())
        self._logdir = logdir
        self._outdir = outdir
        self._opts = opts
        self._wtime = wtime

    def crawl(self):
        while True:
            lang_id = 0
            while lang_id < self._sagent.len_langs:
                should_inc, max_page = self._maxpage(lang_id)
                if should_inc:
                    lang_id += 1
                page = 1
                while page <= max_page:
                    should_inc, urls = self._search(lang_id, page)
                    if should_inc:
                        page += 1
                    i = 0
                    while i < len(urls):
                        url = urls[i]
                        should_inc = self._download(lang_id, page, url)
                        if should_inc:
                            i += 1

    def _maxpage(self, lang_id):
        try:
            max_page = self._sagent.extract_max_pagenum(lang_id)
        except WebDriverException:
            self._write_log("lang_id={}".format(lang_id))
            self._refresh()
            return False, 0
        except Exception:
            self._write_log("lang_id={}".format(lang_id))
            return True, 0
        return True, max_page

    def _search(self, lang_id, page):
        try:
            urls = list(self._sagent.search(lang_id, page))
        except WebDriverException:
            self._write_log("lang_id={}, page={}".format(lang_id, page))
            self._refresh()
            return False, []
        except Exception:
            self._write_log("lang_id={}, page={}".format(lang_id, page))
            return True, []
        return True, urls

    def _download(self, lang_id, page, url):
        if self._visited_db.get(url) is None:
            try:
                data = self._dagent.download(url)
            except WebDriverException:
                self._write_log("lang_id={}, page={}, url={}".format(
                    lang_id, page, url))
                self._refresh()
                return False
            except Exception:
                self._write_log("lang_id={}, page={}, url={}".format(
                    lang_id, page, url))
                return True
            for key in data.keys():
                self._visited_db.set(key, 1)
                self._write_progress(key)
        return True

    def _refresh(self):
        self._drvmng.refresh_driver(self._opts, self._wtime)
        self._dagent = DownloadAgent(self._outdir, self._drvmng.get_driver())
        self._sagent = SearchAgent(self._drvmng.get_driver())

    def _write_progress(self, x):
        current_time = datetime.now().strftime("%Y-%m-%d_%H:%M:%S.%f")
        progress_path = os.path.join(self._logdir, "progress.log")
        with open(progress_path, "a") as f:
            f.write("{}\t{}\n".format(current_time, x))

    def _write_log(self, x):
        current_time = datetime.now().strftime("%Y-%m-%d_%H:%M:%S.%f")
        error_path = os.path.join(self._logdir, "exceptions.log")
        with open(error_path, "a") as f:
            f.write("datetime:{}\n".format(current_time))
            f.write("info:{}\n".format(x))
            exc_type, exc_value, exc_traceback = sys.exc_info()
            traceback.print_exception(exc_type,
                                      exc_value,
                                      exc_traceback,
                                      file=f)
            f.write("\n")


if __name__ == "__main__":
    crawler = Crawler(outdir=sys.argv[1], logdir=sys.argv[2])
    try:
        crawler.crawl()
    finally:
        crawler._drvmng.close_driver()

継続運用性について

ローカルクローラを動かす場合、基本的には「停止せずに動き続ける」ものを作りたいと考えます。しかし、継続的に動作させることの課題には以下のようなものがあります。

  • 同じノードへの再訪コストがかかる。
  • 予期せぬエラーで止まる。
  • 進捗が不明。

まず、おなじノードへの再訪を防ぐための方法の一例として、key-value ストアを使うことができます。膨大なノードをメモリ上に置くのはコストが高いため、ストレージにおけるようにします。今回使っているのは、kyotocabinetというツールです。これを使えば、再訪ノードのスキップがかんたんに行なえます。しかも、仮になんらかの理由で停止させたとしても、再実行させたときに再訪ノードがスキップされるので、すぐに実行状態を復活させることができます。

次に、予期せぬエラーで止まってしまうと継続運用ができません。そこで、あらゆる例外を補足しておき、それをログに書き込むだけにとどめておきます。実のところ、重大なエラーが発生した場合は即座に実行停止したほうがいいのですが、ログに書き込んでおけばそれを通知する仕組みをつくることもできるので、重大な通知が来たら止めるようにして、それ以外は無視します。

進捗が不明であるという問題は、単に進捗をログに書き込めばよいだけです。この進捗ログが増加していれば、クローラは継続運用できています。進捗ログが止まる可能性は、エラーが発生したか、または訪問していないノードが見つからないかのどちらかです。

フィルター&パイプ手法の是非

SearchAgentとDownloadAgentをそれぞれ独立のフィルターとして作成し、

cat search_keyword.txt | python3 searcher.py | python3 downloader.py

のような実行方法も考えられなくはありません。しかし、この実行方法には「それぞれのプログラムが異なるドライバーを持つ」「リクエストが増加する」というデメリットがあります。

もし、グローバルクローラであれば並列的に実行させることは合理的かもしれませんが、ローカルクローラの場合、特定のサイトに対してリクエストが送られるため、リクエストの増加がDoS攻撃的な挙動になってしまうことになります。それを防ぐには、Crawlerというようなクラスを作ることで、SearchとDownloadを統合するほうが良いでしょう。統合によってリクエストは減り、time.sleepのタイミングもとりやすくなります。