Data Nerd

Notes on machine learning and natural language processing (formerly "Nerd Squadron Dataman")

Running langstat_generator on slurm

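langstat is a set of statistics showing how much data in each language exists on each domain. To generate it, I am building a tool called langstat_generator. CommonCrawl contains an enormous amount of data, and one requirement was to generate the statistics the same way cc_net does, so I made the tool runnable on slurm.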
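To make the idea of langstat concrete, here is a minimal sketch that counts documents per (domain, language) pair. It assumes the input is already a stream of (url, lang) pairs from a language-identification step; build_langstat is a hypothetical name, and the real langstat_generator may count characters or paragraphs instead of documents, or define "domain" differently.

```python
from collections import Counter, defaultdict
from urllib.parse import urlparse


def build_langstat(records):
    """Count documents per (domain, language).

    records: iterable of (url, lang) pairs (assumed input format).
    Returns {domain: Counter({lang: count})}.
    """
    stats = defaultdict(Counter)
    for url, lang in records:
        domain = urlparse(url).netloc.lower()  # host part, case-normalized
        stats[domain][lang] += 1
    return stats


if __name__ == "__main__":
    sample = [
        ("https://example.com/a", "en"),
        ("https://example.com/b", "ja"),
        ("https://Example.com/c", "en"),
        ("https://example.org/x", "de"),
    ]
    for domain, counts in sorted(build_langstat(sample).items()):
        print(domain, dict(counts))
```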


Overview

I'll skip the details of how to run it, but the full code can be downloaded from my GitHub project:

https://github.com/sugiyamath/langstat_generator

The script passed to sbatch is as follows:

[run.sh]

#!/bin/bash

#SBATCH -N 7
#SBATCH -n 7
#SBATCH -c 16
#SBATCH -t 08:00:00


sudo timedatectl set-timezone UTC
sudo timedatectl set-ntp true
sudo timedatectl set-local-rtc 0

PY_PATH=./venv/bin/python
OUT_DIR=./out
WET_PATH=./wet.paths.gz
BIN_DIR=./langstat/bin
MAX_SHARD_NUM=4
MAX_NODE_NUM=6  # highest node ID; seq below is inclusive, so 7 tasks (0..6)

mkdir -p ${OUT_DIR}
mkdir -p ./cc_data
mkdir -p ./langstat

for i in $(seq 0 ${MAX_NODE_NUM}); do
    srun -N 1 -n 1 bash execute.sh ${WET_PATH} ${PY_PATH} ${i} ${MAX_SHARD_NUM} ${OUT_DIR} ${BIN_DIR} &
done
wait
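The loop in run.sh launches one srun task per node in the background and then waits for all of them. The sketch below, in Python for illustration only, builds the same argument lists so the node numbering is easy to check; srun_commands is a hypothetical helper, not part of the repository, and it does not execute anything.

```python
def srun_commands(max_node_num, wet_path, py_path, max_shard_num, out_dir, bin_dir):
    """Build the argument lists run.sh passes to srun, one per node.

    Mirrors `for i in $(seq 0 ${MAX_NODE_NUM})`: seq is inclusive,
    so node IDs run from 0 to max_node_num.
    """
    commands = []
    for node_id in range(max_node_num + 1):
        commands.append([
            "srun", "-N", "1", "-n", "1",  # one task on one node
            "bash", "execute.sh",
            wet_path, py_path, str(node_id), str(max_shard_num), out_dir, bin_dir,
        ])
    return commands


if __name__ == "__main__":
    for cmd in srun_commands(6, "./wet.paths.gz", "./venv/bin/python", 4,
                             "./out", "./langstat/bin"):
        print(" ".join(cmd))
```

With MAX_NODE_NUM=6 this yields 7 tasks, matching #SBATCH -N 7. Note that execute.sh does not pass a node count to main.py, so main.py relies on its default total_nodes=7; changing MAX_NODE_NUM without also changing that default would make the sharding inconsistent.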

The task each node runs is defined in execute.sh:

[execute.sh]

#!/bin/bash

WET_PATH=$1
PY_PATH=$2
NODE_ID=$3
MAX_SHARD_NUM=$4
OUT_DIR=$5
BIN_DIR=$6

sudo timedatectl set-timezone UTC
sudo timedatectl set-ntp true
sudo timedatectl set-local-rtc 0

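# Precautionary cleanup; these commands fail harmlessly when there is nothing to remove or unmount
rm -r /tmp/tmp*
sudo umount -l ./cc_data
sudo umount -l ./langstat
# Mount the commoncrawl and langstat S3 buckets with goofys
./goofys commoncrawl ./cc_data
./goofys langstat ./langstat
# Alternative (unused): read WET files over HTTPS instead of through the goofys mount
#zcat ${WET_PATH} | head -n 1120 | awk '{print "https://commoncrawl.s3.amazonaws.com/"$0}' | ${PY_PATH} main.py ${NODE_ID} ${MAX_SHARD_NUM} ${BIN_DIR} ${OUT_DIR}
# Take the first 1120 WET paths, point them at the mounted bucket, and process them
zcat ${WET_PATH} | head -n 1120 | awk '{print "./cc_data/"$0}' | ${PY_PATH} main.py ${NODE_ID} ${MAX_SHARD_NUM} ${BIN_DIR} ${OUT_DIR} | ${PY_PATH} scripts/timer_out.py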
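Because goofys mounts the commoncrawl S3 bucket at ./cc_data, each WET path can be read like an ordinary local file. The zcat | head | awk stage of the pipeline can be sketched in Python as follows; local_wet_paths is a hypothetical name, and main.py itself simply reads the resulting lines from stdin.

```python
import gzip
from itertools import islice


def local_wet_paths(paths_gz, limit=1120, prefix="./cc_data/"):
    """Equivalent of: zcat paths_gz | head -n limit | awk '{print prefix $0}'.

    prefix points at the goofys mount of the commoncrawl bucket, so each
    returned string is a path that can be opened like a local file.
    """
    with gzip.open(paths_gz, "rt") as f:
        return [prefix + line.rstrip("\n") for line in islice(f, limit)]
```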

main.py, which execute.sh calls, looks like this:

[main.py]

import gc
import os
import sys
import time
import shutil
import tempfile
import datetime

import hash_creator
import lang_separator
import lm_scoring
import data_downloader
import utils
import sharding
from functools import partial


def _pt(st):
    return str(datetime.timedelta(seconds=time.time() - st))


def _plog(msg, nid, sid, st):
    print("{}\t{}\tnid:{}\tsid:{}\tmsg:{}".format(
        datetime.datetime.now(), _pt(st), nid, sid, msg),
          flush=True)


def _rmall(target_dir):
    for fname in os.listdir(target_dir):
        fpath = os.path.join(target_dir, fname)
        os.unlink(fpath)


def _mvall(suffix, source_dir, target_dir):
    for fname in os.listdir(source_dir):
        if fname.endswith(suffix):
            shutil.move(os.path.join(source_dir, fname),
                        os.path.join(target_dir, fname))


def main(node_id,
         max_shard_num,
         bin_dir,
         out_dir,
         total_nodes=7,
         wet_per_shard=50,
         num_cpus=12,
         num_dl_parallel=0):
    node_id = int(node_id)
    max_shard_num = int(max_shard_num)
    total_nodes = int(total_nodes)
    wet_per_shard = int(wet_per_shard)
    num_cpus = int(num_cpus)
    num_dl_parallel = int(num_dl_parallel)
    is_target_http = None

    st = time.time()
    ss = ','.join(map(str, list(range(max_shard_num))))
    _plog_t = partial(_plog, nid=node_id, sid=ss, st=st)
    _plog_t("TASK START")
    langs = utils.available_langs(bin_dir)
    
    if num_dl_parallel > 0:
        all_targets = [x.strip() for x in sys.stdin]
        is_target_http = True
    else:
        all_targets = [x.strip() for x in sys.stdin if "CC-MAIN" in x]
        is_target_http = False

    for shard_id in range(max_shard_num):
        _plog_s = partial(_plog, nid=node_id, sid=shard_id, st=st)
        _plog_s("start shard")
        score_outpath = None
        langstat_outpath = None
        with tempfile.TemporaryDirectory(dir="/tmp") as tmp_dir:
            try:
                #targets = all_targets
                targets = list(
                    sharding.sharding(all_targets,
                                      node_id,
                                      shard_id,
                                      total_nodes,
                                      wet_per_shard))
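                if is_target_http:
                    # Download into tmp_dir, then collect the downloaded WET files
                    data_downloader.download_bulk(targets, tmp_dir,
                                                  num_dl_parallel)
                    files = [os.path.join(tmp_dir, x)
                             for x in os.listdir(tmp_dir)
                             if x.startswith("CC-MAIN")]
                else:
                    # Paths under the goofys mount are read directly
                    files = targets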

                fprefix = utils.random_string(20)

                _plog_s("hash_creator: start")
                hashes = hash_creator.create_hashes(files, num_cpus)
                _plog_s("hash_creator: done")

                _plog_s("lang_separator: start")
                lang_separator.do(
                    files, hashes, tmp_dir, fprefix, langs, bin_dir, num_cpus)
                _plog_s("lang_separator: done")

                # Free the hash table before LM scoring
                _plog_s("hash_cleaning: start")
                del hashes
                gc.collect()
                _plog_s("hash_cleaning: done")

                _plog_s("lm_scoring: start")
                score_outpath = os.path.join(
                    tmp_dir, "lmscore_{}_{}.txt".format(node_id, shard_id))
                langstat_outpath = os.path.join(
                    tmp_dir, "langstat_{}_{}.txt".format(node_id, shard_id))
                lm_scoring.do(
                    fprefix, score_outpath, langstat_outpath, bin_dir,
                    tmp_dir, num_cpus)
                _plog_s("lm_scoring: done")
            except Exception as e:
                _plog_s("Error: {}".format(str(e)))
            finally:
                # Always move the .txt outputs to out_dir, even if a step failed
                _mvall(".txt", tmp_dir, out_dir)
                _rmall(tmp_dir)
            
    _plog("TASK DONE", node_id, ss, st)


if __name__ == "__main__":
    main(*sys.argv[1:])
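main.py imports sharding.sharding, whose source is not shown here. Purely as an assumption for illustration, one simple scheme is to split the path list round-robin across nodes and then cut each node's share into consecutive chunks of wet_per_shard files. The repository's actual function may split differently.

```python
def sharding(targets, node_id, shard_id, total_nodes, wet_per_shard):
    """HYPOTHETICAL sharding scheme; the repository's sharding.sharding may differ.

    1. Round-robin: node `node_id` owns targets[node_id], targets[node_id + total_nodes], ...
    2. That node's list is cut into consecutive chunks of `wet_per_shard` files,
       and shard `shard_id` gets the chunk at that position (possibly empty).
    """
    node_targets = targets[node_id::total_nodes]
    start = shard_id * wet_per_shard
    return node_targets[start:start + wet_per_shard]


if __name__ == "__main__":
    paths = ["CC-MAIN-{:04d}".format(i) for i in range(1120)]
    print([len(sharding(paths, 0, s, 7, 50)) for s in range(4)])
```

Under this hypothetical scheme, every file is processed exactly once only if max_shard_num × wet_per_shard is at least the number of files per node (here 4 × 50 = 200 ≥ 1120 / 7 = 160); otherwise the remainder would be silently skipped.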

Final output

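The final output is saved to the out directory and consists of two kinds of files:

  • The langstat for each shard (langstat_<node_id>_<shard_id>.txt).
  • For each shard, every URL annotated with its average language-identification score and its average language-model score (lmscore_<node_id>_<shard_id>.txt).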
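A minimal sketch of the per-URL averaging, assuming the scoring step yields one (url, lid_score, lm_score) row per paragraph; the row format and the name average_scores are assumptions, not the repository's actual file format.

```python
from collections import defaultdict


def average_scores(rows):
    """Average the scores per URL.

    rows: iterable of (url, lid_score, lm_score) tuples (assumed format).
    Returns {url: (mean_lid_score, mean_lm_score)}.
    """
    acc = defaultdict(lambda: [0.0, 0.0, 0])  # url -> [sum_lid, sum_lm, count]
    for url, lid_score, lm_score in rows:
        entry = acc[url]
        entry[0] += lid_score
        entry[1] += lm_score
        entry[2] += 1
    return {url: (s_lid / n, s_lm / n) for url, (s_lid, s_lm, n) in acc.items()}
```

Keeping running sums rather than lists of scores keeps memory proportional to the number of URLs, not the number of paragraphs.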

Execution time

There are about 56,000 WET files in total, but this run used only 1/50 of them, with a comparatively small number of nodes.

  • Elapsed time: about 2 hours 20 minutes
  • Nodes: 7
  • CPUs per node: 16
  • Total CPUs: 112
  • WET files: 1120
  • WET files per shard: 50
  • Shards per node: 4 (run sequentially)
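The figures above fit together as follows. This is plain arithmetic on the listed numbers; the per-shard time is only a rough average, since shards need not contain the same number of files.

```python
# Figures taken from the run summary above.
TOTAL_WET = 56_000          # approximate number of WET files in the crawl
USED_WET = 1_120
NODES, CPUS_PER_NODE = 7, 16
SHARDS_PER_NODE, WET_PER_SHARD = 4, 50
ELAPSED_MIN = 2 * 60 + 20   # "about 2 hours 20 minutes"

total_cpus = NODES * CPUS_PER_NODE                    # 112
fraction = USED_WET / TOTAL_WET                       # 0.02, i.e. 1/50
capacity = NODES * SHARDS_PER_NODE * WET_PER_SHARD    # 1400 shard slots for 1120 files
wet_per_node = USED_WET / NODES                       # 160 files per node on average
min_per_shard = ELAPSED_MIN / SHARDS_PER_NODE         # ~35 min per shard (shards run sequentially)

print(total_cpus, fraction, capacity, wet_per_node, min_per_shard)
```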

Execution log

The execution log contains several errors, but they are all harmless.

Types of errors:

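  • Errors in the preparation phase. As a precaution, the script clears leftovers in /tmp and unmounts the S3 buckets; when there is nothing to remove and nothing is mounted, these commands report errors, which is expected.
  • An error saying that p.close does not exist. It comes from timer_out.py and occurs only after the task has fully completed, so it does no harm. If p is a multiprocessing.Process, the likely cause is that Process.close() was only added in Python 3.7, so the venv is running an older Python. Because timer_out.py is the last command in execute.sh's pipeline, its failure also becomes the script's exit status, which is why srun reports "Exited with exit code 1".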
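The first kind of error could be silenced by making the cleanup idempotent. Below is a minimal Python sketch of that idea (clean_tmp and unmount_if_mounted are hypothetical helpers). Inside execute.sh itself, rm -rf /tmp/tmp* together with a mountpoint -q check before each umount would have the same effect.

```python
import glob
import os
import shutil
import subprocess


def clean_tmp(pattern="/tmp/tmp*"):
    """Remove everything matching pattern; no error when nothing matches."""
    removed = []
    for path in glob.glob(pattern):
        if os.path.isdir(path) and not os.path.islink(path):
            shutil.rmtree(path, ignore_errors=True)
        else:
            try:
                os.unlink(path)
            except FileNotFoundError:
                pass  # already gone (e.g. removed concurrently)
        removed.append(path)
    return removed


def unmount_if_mounted(mountpoint):
    """Lazily unmount mountpoint only if something is mounted there."""
    if os.path.ismount(mountpoint):
        subprocess.run(["sudo", "umount", "-l", mountpoint], check=True)
        return True
    return False
```

One caveat: os.path.ismount returns False when stat on the path fails, so a stale FUSE mount left behind by a crashed goofys would be skipped. In that case the original unconditional umount -l is the more robust choice.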

[Execution log]

rm: cannot remove '/tmp/tmp*': No such file or directory
umount: ./cc_data: not mounted.
umount: ./langstat: not mounted.
rm: cannot remove '/tmp/tmp*': No such file or directory
rm: cannot remove '/tmp/tmp*': No such file or directory
rm: cannot remove '/tmp/tmp*': No such file or directory
rm: cannot remove '/tmp/tmp*': No such file or directory
rm: cannot remove '/tmp/tmp*': No such file or directory
umount: ./cc_data: not mounted.
umount: ./langstat: not mounted.
umount: ./cc_data: not mounted.
umount: ./langstat: not mounted.
umount: ./cc_data: not mounted.
umount: ./langstat: not mounted.
umount: ./cc_data: not mounted.
umount: ./langstat: not mounted.
umount: ./cc_data: not mounted.
umount: ./langstat: not mounted.
rm: cannot remove '/tmp/tmp*': No such file or directory
umount: ./cc_data: not mounted.
umount: ./langstat: not mounted.
2020-01-13 23:40:45.542509      0:00:00.000028  nid:0   sid:0,1,2,3     msg:TASK START
2020-01-13 23:40:45.619038      0:00:00.076558  nid:0   sid:0   msg:start shard
2020-01-13 23:40:45.619420      0:00:00.076934  nid:0   sid:0   msg:hash_creator: start
2020-01-13 23:40:45.878058      0:00:00.000022  nid:2   sid:0,1,2,3     msg:TASK START
2020-01-13 23:40:45.947057      0:00:00.069020  nid:2   sid:0   msg:start shard
2020-01-13 23:40:45.947387      0:00:00.069347  nid:2   sid:0   msg:hash_creator: start
2020-01-13 23:40:46.020819      0:00:00.000022  nid:1   sid:0,1,2,3     msg:TASK START
2020-01-13 23:40:46.092201      0:00:00.071406  nid:1   sid:0   msg:start shard
2020-01-13 23:40:46.092595      0:00:00.071795  nid:1   sid:0   msg:hash_creator: start
2020-01-13 23:40:46.533779      0:00:00.000022  nid:6   sid:0,1,2,3     msg:TASK START
2020-01-13 23:40:46.612496      0:00:00.078737  nid:6   sid:0   msg:start shard
2020-01-13 23:40:46.612823      0:00:00.079062  nid:6   sid:0   msg:hash_creator: start
2020-01-13 23:40:46.619376      0:00:00.000024  nid:3   sid:0,1,2,3     msg:TASK START
2020-01-13 23:40:46.666887      0:00:00.047534  nid:3   sid:0   msg:start shard
2020-01-13 23:40:46.667205      0:00:00.047849  nid:3   sid:0   msg:hash_creator: start
2020-01-13 23:40:46.720374      0:00:00.000022  nid:4   sid:0,1,2,3     msg:TASK START
2020-01-13 23:40:46.826267      0:00:00.000028  nid:5   sid:0,1,2,3     msg:TASK START

[]

2020-01-14 02:00:31.272302      2:19:45.251509  nid:1   sid:0,1,2,3     msg:TASK DONE
Traceback (most recent call last):
  File "scripts/timer_out.py", line 20, in <module>
    p.close()
AttributeError: 'Process' object has no attribute 'close'
srun: error: ip-10-0-6-150: task 0: Exited with exit code 1
2020-01-14 02:00:45.662089      2:19:59.128344  nid:6   sid:3   msg:lm_scoring: done
2020-01-14 02:00:46.985021      2:20:00.451269  nid:6   sid:0,1,2,3     msg:TASK DONE
Traceback (most recent call last):
  File "scripts/timer_out.py", line 20, in <module>
    p.close()
AttributeError: 'Process' object has no attribute 'close'
2020-01-14 02:01:11.392680      2:20:24.566451  nid:5   sid:3   msg:lm_scoring: done
2020-01-14 02:01:12.704224      2:20:25.877987  nid:5   sid:0,1,2,3     msg:TASK DONE
Traceback (most recent call last):
  File "scripts/timer_out.py", line 20, in <module>
    p.close()
AttributeError: 'Process' object has no attribute 'close'
srun: error: ip-10-0-11-247: task 0: Exited with exit code 1
2020-01-14 02:02:25.102237      2:21:39.224214  nid:2   sid:3   msg:lm_scoring: done
2020-01-14 02:02:26.403545      2:21:40.525515  nid:2   sid:0,1,2,3     msg:TASK DONE
Traceback (most recent call last):
  File "scripts/timer_out.py", line 20, in <module>
    p.close()
AttributeError: 'Process' object has no attribute 'close'
2020-01-14 02:02:28.611396      2:21:43.068930  nid:0   sid:3   msg:lm_scoring: done
2020-01-14 02:02:29.928488      2:21:44.386009  nid:0   sid:0,1,2,3     msg:TASK DONE
Traceback (most recent call last):
  File "scripts/timer_out.py", line 20, in <module>
    p.close()
AttributeError: 'Process' object has no attribute 'close'
srun: error: ip-10-0-0-137: task 0: Exited with exit code 1
2020-01-14 02:03:32.917409      2:22:46.298071  nid:3   sid:3   msg:lm_scoring: done
2020-01-14 02:03:34.200838      2:22:47.581489  nid:3   sid:0,1,2,3     msg:TASK DONE
Traceback (most recent call last):
  File "scripts/timer_out.py", line 20, in <module>
    p.close()
AttributeError: 'Process' object has no attribute 'close'
srun: error: ip-10-0-9-75: task 0: Exited with exit code 1
2020-01-14 02:04:39.976661      2:23:53.256321  nid:4   sid:3   msg:lm_scoring: done
2020-01-14 02:04:41.536959      2:23:54.816611  nid:4   sid:0,1,2,3     msg:TASK DONE
Traceback (most recent call last):
  File "scripts/timer_out.py", line 20, in <module>
    p.close()
AttributeError: 'Process' object has no attribute 'close'
srun: error: ip-10-0-11-72: task 0: Exited with exit code 1
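The repeated traceback at the end of the log could be avoided with a small guard. This is a minimal sketch assuming p in timer_out.py is a multiprocessing.Process (the script's source is not shown, and close_if_supported is a hypothetical helper): it calls close() only where the method exists, i.e. on Python 3.7 and later.

```python
def close_if_supported(p):
    """Call p.close() only where it exists.

    multiprocessing.Process.close() was added in Python 3.7; on older
    interpreters the attribute is missing and calling it raises AttributeError.
    Call this only after the process has been joined.
    Returns True if close() was called.
    """
    close = getattr(p, "close", None)
    if close is None:
        return False
    close()
    return True
```

Replacing p.close() on line 20 of timer_out.py with close_if_supported(p) would remove the traceback and the non-zero exit status; building the venv on Python 3.7 or later would fix it as well.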