ナード戦隊データマン

機械学習と自然言語処理についてのブログ

warc_crawler: warcファイルでWebをクロール

Web ARChive(WARC)1アーカイブ形式は、複数のデジタルリソースを関連情報とともに集約アーカイブファイルに結合する方法です。 WARC形式は、インターネットアーカイブのARCファイル形式の改訂版で、World Wide Webから収集したコンテンツブロックのシーケンスとして「ウェブクロール」を保存するために伝統的に使用されてきました。WARC形式は、古い形式を一般化して、アーカイブ組織の収集、アクセス、および交換のニーズをより適切にサポートします。 現在記録されているプライマリコンテンツの他に、割り当てられたメタデータ、短縮された重複検出イベント、後の日付の変換など、関連するセカンダリコンテンツに対応します。

github.com

概要

bitextor2のようなツールでは、クロールしたデータをwarc形式で保持しています。そのため、自前の外部クローラを使う場合、warcで出力すれば、bitextorに読み込むことが可能です。

warc_crawler

warc_crawler3というクローラを作ってみましたが、これはWebをwarc形式で保存しながらクロールするツールです。実行フローは以下のようになっています:

  1. urls.txtを読み込む。
  2. urls.txtをスレッド数だけ分割する。
  3. それぞれのスレッドに分割されたurls.txtの最初のURLを読み込む。最初のURLに対応するwarcファイルが存在する場合は次を読み込む。
  4. wgetでwarcファイルとしてミラーを取得。timeoutも指定。
  5. warcファイル内に存在するすべてのホスト名を各スレッドのurls.txtに追加する。
  6. 3へ戻る。

コード

extract_links.py

warcからリンクを抽出するモジュール。

import re

regex = re.compile(r"https?://(?:[-\w.]|(?:%[\da-fA-F]{2}))+")
regex2 = re.compile(r"https?://")


def extract(text):
    return {re.sub(regex2, "", x) for x in re.findall(regex, text)}


def extract_from_warc(warc_file):
    with open(warc_file, encoding="utf-8", errors="ignore") as f:
        urls = extract(f.read())
    return urls

warc_crawler.py

クローラ本体。

import os
import subprocess
import pymp
import extract_links as el
from shutil import copyfile

template_wget = 'timeout 100 wget --mirror -4 -q {HOST} -P {DPATH} --warc-file "{WPATH}/{HOST}" -A.html -o wget.log'
template_sed = 'sed -i \'1d\' "{}"'


def wget(host, data_path="data", warc_pach="warcs"):
    cmd = template_wget.format(HOST=host, DPATH=data_path, WPATH=warc_pach)
    p = subprocess.Popen(cmd, stdout=subprocess.PIPE, shell=True)
    (output, err) = p.communicate()
    p_status = p.wait()
    return output, err


def update_urlfile(urlfile, urls):
    first_line = False
    with open(urlfile, "a") as f:
        f.write('\n'.join(list(urls)))
    with open(urlfile) as f:
        for line in f:
            first_line = line.strip()
            break
    cmd = template_sed.format(urlfile)
    p = subprocess.Popen(cmd, stdout=subprocess.PIPE, shell=True)
    (output, err) = p.communicate()
    p_status = p.wait()
    return first_line


def process(i, urls, tmp_dir, url_prefix, data_path, warc_path):
    urlfile = os.path.join(tmp_dir, url_prefix + str(i) + ".txt")
    first_line = update_urlfile(urlfile, urls)
    print(first_line)
    warc_file = os.path.join(warc_path, first_line + ".warc")
    if not os.path.isfile(warc_file):
        wget(first_line, data_path, warc_path)
        urls = el.extract_from_warc(warc_file)
    else:
        process(i, [], tmp_dir, url_prefix, data_path, warc_path)
    return urls


def crawl(tmp_dir="tmp",
          url_prefix="urls",
          data_path="data",
          warc_path="warcs",
          n_thread=8):
    urlfile = url_prefix + ".txt"
    for i in range(n_thread):
        copyfile(urlfile, os.path.join(tmp_dir, url_prefix + str(i) + ".txt"))
    urls_list = [[] for i in range(n_thread)]
    with pymp.Parallel(n_thread) as p:
        while (True):
            for index in p.range(0, n_thread):
                urls_list[index] = process(index, urls_list[index], tmp_dir,
                                           url_prefix, data_path, warc_path)


if __name__ == "__main__":
    crawl()

Note: 初期urls.txtは、スレッド数以上のurlを記述する必要があります。

参考