ナード戦隊データマン

機械学習と自然言語処理についてのブログ

加速度センサーだけで移動手段を予測する

加速度センサーによる移動手段予測とは、スマホの加速度センサーから収集したデータを利用し、車・電車・歩行などの移動手段を予測する方法です。

tmd_experiments/e2e_model at master · sugiyamath/tmd_experiments · GitHub

※ コードは上記プロジェクトから見れます。

概要

ある記事1で紹介されていた手法を参考にします。TMDデータセット2を今回も使用します。

加速度センサーは、x, y, zの情報を持っています。これらの情報(生データ)をサイズ512程度のWindowに分割し、このWindowをそのままCNNへ入力します。

コード

前処理モジュール (preprocessing.py)

import numpy as np


ACC = "android.sensor.accelerometer"
GYRO = "android.sensor.gyroscope"
SOUND = "sound"


def data_loader(infile, sensor_type):
    with open(infile) as f:
        data = []
        for line in f:
            line = line.strip()
            line = line.split(",")
            if line[1] == sensor_type:
                try:
                    line[0] = int(line[0])
                    if line[0] < 0:
                        line[0] *= -1
                    line[2:] = list(map(float, line[2:]))
                    data.append(line)
                except ValueError:
                    print(infile)
    return data


def window_segmentation(data, window_length=512):
    out = []
    row = []
    for d in data:
        if len(row) < window_length:
            x = list(map(float, d[2:]))
            row.append(x)
        elif len(row) == window_length:
            out.append(row)
            row = []
            x = list(map(float, d[2:]))
            row.append(x)
        else:
            raise Exception("somehting wrong")
    if len(row) == window_length:
        out.append(row)
    out = np.array(out)
    print(out.shape)
    return out

データ生成器 (data_generator.py)

root_pathをTMDデータセットのパスにしてください。

import os
import numpy as np
from tqdm import tqdm
from preprocessing import data_loader, window_segmentation
from preprocessing import ACC

root_path = "/root/work/tmd/raw_data/"

DATA = [
    "U1", "U2", "U3", "U4", "U5", "U6", "U7", "U8", "U9", "U10", "U11", "U13",
    "U14", "U16", "U12", "U15"
]

label2id = {"Bus": 0, "Car": 1, "Still": 2, "Train": 3, "Walking": 4}


def data_extraction(path):
    acc = data_loader(path, sensor_type=ACC)
    acc = window_segmentation(acc)
    return acc, acc.shape[0]


def logging(path, e):
    with open("training.log", "a") as f:
        f.write("file:{}, error: {}".format(path, repr(e)))
        f.write("\n")


def define_labels(filename, window_length, label2id):
    labelid = label2id[filename.split("_")[2]]
    labels = [labelid for _ in range(window_length)]
    return labels


def data_processing(X, y, paths):
    for path, filename in tqdm(paths):
        try:
            acc, window_length = data_extraction(path)
        except Exception as e:
            logging(path, e)
            continue
        try:
            labels = define_labels(filename, window_length, label2id)
            acc = acc.tolist()
            assert len(acc) == len(labels)
        except Exception as e:
            logging(path, e)
            continue
        X += acc
        y += labels
    return X, y


def data_create(users, test=False):
    ds = [
        os.path.join(root_path, d) for d in os.listdir(root_path) if d in users
    ]
    X = []
    y = []
    for d in tqdm(ds):
        paths = [(os.path.join(d, path), path) for path in os.listdir(d)
                 if ".csv" in path]
        X, y = data_processing(X, y, paths)
    X = np.array(X)
    y = np.array(y)
    print(X.shape)
    print(y.shape)
    return X, y

訓練・テスト(train.py)

CNNの構造はこのコードを見てください。

import pandas as pd
import pickle
import numpy as np
from keras.models import Sequential, load_model
from keras.layers import SeparableConv1D, MaxPooling1D, Flatten
from keras.layers import Dropout, Dense
from keras.callbacks import ModelCheckpoint
from sklearn.metrics import classification_report
from sklearn.model_selection import train_test_split
from data_generator import data_create, DATA


def build_model(nlabels=5):
    print("prepareing model")
    model = Sequential([
        SeparableConv1D(48, 4, 1, input_shape=(512, 3)),
        SeparableConv1D(48, 4, 1),
        MaxPooling1D(2),
        SeparableConv1D(64, 4, 1),
        SeparableConv1D(64, 4, 1),
        MaxPooling1D(4),
        SeparableConv1D(80, 4, 1),
        SeparableConv1D(80, 4, 1),
        MaxPooling1D(4),
        Flatten(),
        Dense(nlabels, activation="softmax")
    ])

    model.compile(loss="sparse_categorical_crossentropy",
                  optimizer="nadam",
                  metrics=["accuracy"])

    return model


def debug_do(model):
    print("debug do")
    X_train, y_train = data_create("U1")
    model.fit(X_train, y_train)


def load_data(load=False):
    print("load data")
    if load:
        with open("training.pkl", "rb") as f:
            X, y = pickle.load(f)
    else:
        X, y = data_create(DATA)
        with open("training.pkl", "wb") as f:
            pickle.dump((X, y), f)
    return X, y


def fixing_data(X, y):
    tmp_X = []
    tmp_y = []
    min_indices = X.shape[0]
    for label in [0, 1, 2, 3, 4]:
        indices = np.where(y == label)[0]
        if indices.shape[0] < min_indices:
            min_indices = indices.shape[0]
        tmp_X.append(X[indices])
        tmp_y.append(y[indices])
    for i, (x_data, y_data) in enumerate(zip(tmp_X, tmp_y)):
        tmp_X[i] = x_data[:min_indices]
        tmp_y[i] = y_data[:min_indices]
    return np.concatenate(tmp_X), np.concatenate(tmp_y)


if __name__ == "__main__":
    debug = False
    load = False
    model = build_model()
    callbacks = [
        ModelCheckpoint("./model_best.h5",
                        monitor="val_loss",
                        save_best_only=True,
                        mode="min")
    ]
    if debug:
        debug_do(model)
    else:
        X, y = load_data(load)
        X, y = fixing_data(X, y)
        X_train, X_test, y_train, y_test = train_test_split(X,
                                                            y,
                                                            train_size=0.7)
        X_valid, X_test, y_valid, y_test = train_test_split(X_test,
                                                            y_test,
                                                            train_size=0.5)
        model.fit(X_train,
                  y_train,
                  validation_data=(X_valid, y_valid),
                  epochs=200,
                  batch_size=1000,
                  callbacks=callbacks)
        model = load_model("./model_best.h5")
        y_pred = model.predict_classes(X_test)
        print(classification_report(y_test, y_pred))

※ holdout分割を使っているので、精度が高めに出てしまいます。

精度

              precision    recall  f1-score   support

           0       0.82      0.93      0.87        29
           1       0.87      0.73      0.80        45
           2       0.76      0.88      0.81        40
           3       0.79      0.69      0.74        39
           4       0.93      0.98      0.95        43

    accuracy                           0.84       196
   macro avg       0.83      0.84      0.83       196
weighted avg       0.84      0.84      0.83       196

考察

加速度センサーのみで分類できるということは、それぞれの移動手段において、加速度の傾向に有意な差が存在している可能性があります。

例えば、以下はTrainとCarの加速度をプロットしたものです。

f:id:mathgeekjp:20190719092355p:plain
電車の加速度

f:id:mathgeekjp:20190719092524p:plain
車の加速度

要は、加速度センサーの生データだけを使って分類しても、ある程度はいけるみたいです。おそらく、ジャイロスコープなどの特徴量を追加すればより精度は上がります。また、CNNのネットワークを変えれば精度が上る可能性はあります。

このようにして、加速度センサーだけで移動手段を予測できることがわかりました。

参考