ナード戦隊データマン

データサイエンスを用いて悪と戦うぞ

csvに出力されたグラフをTitanに読み込んでPageRankを求める

Titanとは、分散型のグラフデータベースです。グラフ操作フレームワークのtinkerpopと組み合わせて使うことができます。ここでは、csvに出力されたグラフ(ノードとエッジ)をTitanに読み込んでからpagerankを求めます。

注意: Titanは開発が放置状態!

2018/05/19現在、Titanのgithubページを見てみると、開発が3年ほど停止していました。そのため、JanusGraphというForkバージョン(The Linux Foundationが開発)を使うことをおすすめします。ただし、ここではあえてTitanを使って説明します。(titanのほうが検索されやすいので)

ダウンロードと展開

wget http://s3.thinkaurelius.com/downloads/titan/titan-1.0.0-hadoop1.zip
unzip titan-1.0.0-hadoop1.zip
cd titan-1.0.0-hadoop1

gremlinについて

gremlinとは、グラフを操作するための専用の言語のことです。groovyによって操作が可能ですが、gremlin-serverを立ち上げればpythonからアクセスできます。gremlin-serverを立ち上げるには、以下を実行します:

bin/gremlin-server.sh

gremlinpythonを使えば、pythonからgremlin-serverへアクセスできます:

pip install --user gremlinpython==3.0.1

groovyスクリプトを実行したい場合は、以下のようなコマンドを実行します:

bin/gremlin.sh -e targetscript.groovy

pythonからアクセスするには、以下のようなコードを実行します:

from gremlin_python.structure.graph import Graph
from gremlin_python.driver.driver_remote_connection import DriverRemoteConnection
g = Graph().traversal().withRemote(DriverRemoteConnection('ws://localhost:8182/gremlin','g'))
print(g.V().count().next())

今回は、PageRankを求めるだけなので、gremlinについて知りたい人は以下の記事などを見てください. https://qiita.com/wan-liner/items/737741805f4c5cfb0cb9

csvをTitanに読み込むgroovyスクリプトの作成

ノードとエッジがcsvに出力されていると仮定します。ノードリストはノードのidを改行で区切ったもの、エッジは "fromノードid,toノードid"を改行で区切ったものとします。例えば、クロールしたデータからurlとoutlinksを抽出してエッジとして出力します。

import com.thinkaurelius.titan.core.TitanFactory
import com.thinkaurelius.titan.core.Cardinality
import com.thinkaurelius.titan.core.Multiplicity
import com.thinkaurelius.titan.core.schema.ConsistencyModifier
import com.thinkaurelius.titan.core.TitanTransaction
import org.apache.tinkerpop.gremlin.structure.T
import java.util.logging.Logger

Logger logger = Logger.getLogger("")

graph = TitanFactory.open('/home/shun/work/titan-1.0.0-hadoop1/conf/titan-berkeleyje-es.properties')
m = graph.openManagement()
k = m.makePropertyKey('urlId').dataType(String.class).cardinality(Cardinality.SINGLE).make()
m.buildIndex('byid', Vertex.class).addKey(k).buildCompositeIndex()
m.makeEdgeLabel('linkTo').multiplicity(Multiplicity.MULTI).make()
m.commit()

Logger logger = Logger.getLogger("")

graph = TitanFactory.open('/home/shun/work/titan-1.0.0-hadoop1/conf/titan-berkeleyje-es.properties')
g = graph.traversal()
tx = graph.newTransaction() 


counter = 0
added = false
Vertex getVertexById(String id){
    p = g.V().has('urlId', id)
    if(p.hasNext()){
    v = p.next()
    } else {
        v = tx.addVertex(T.label, 'website', 'urlId', id)
        added = true
    }
    return v
}

logger.info("Step1")
new File('/home/shun/work/titan-1.0.0-hadoop1/mydata/data/nodes.csv').eachLine {
    theid = String.valueOf(it)
    v = getVertexById(theid)
}

if(added){
  tx.commit()
}


counter = 0
e_added = false
logger.info("Step2")
new File('/home/shun/work/titan-1.0.0-hadoop1/mydata/data/edges.csv').eachLine {
    if(!it.startsWith("from")) {
      (fid, tid) = it.split(',')
      fid = String.valueOf(fid)
      tid = String.valueOf(tid)

      fromVertex = getVertexById(fid)
      toVertex = getVertexById(tid)

      if(!g.V(fromVertex).outE('linkTo').filter(inV().is(toVertex)).hasNext()){
         fromVertex.addEdge('linkTo', toVertex)
         e_added = true
      }
   }
}

if(e_added){
  graph.tx().commit()
}

logger.info("Step3")
prvp = PageRankVertexProgram.build().create()
result = graph.compute().program(prvp).submit().get()

graph.io(IoCore.graphson()).writeGraph("/home/shun/work/titan-1.0.0-hadoop1/mydata/data/graph_with_pagerank.json")

logger.info("DONE")

フローは以下です:

  1. グラフの読み込み。
  2. プロパティキー、エッジ名、インデクスを追加。
  3. ノードの読み込み
  4. エッジの読み込み。
  5. ページランクを求める。
  6. ページランクを求めたグラフをjson出力。

SparkGraphComputerも使える

ページランクをSparkGraphComputerを使って求めるには、graph.computeで指定します。

graph = GraphFactory.open('/usr/iop/current/titan-client/conf/hadoop-graph/hadoop-gryo.properties')
prvp = PageRankVertexProgram.build().create()
result = graph.compute(SparkGraphComputer).program(prvp).submit().get()
result.memory().getRuntime()
result.memory().asMap()
g = result.graph().traversal(computer(SparkGraphComputer))
g.V().valueMap('name', PageRankVertexProgram.PAGE_RANK)

参考

[1] JanusGraph : http://janusgraph.org/ [2] Titan spark graph computing in IBM Open Platform : https://developer.ibm.com/hadoop/2017/05/21/titan-spark-graph-computing-ibm-open-platform/ [3] TITAN Distributed Graph Database : http://titan.thinkaurelius.com/

P.S. groovyなんて初めてですが、とりあえず実行はできました。