spark-user mailing list archives

From Gerard Casey <gerardhughca...@gmail.com>
Subject Shortest path performance in Graphx with Spark
Date Tue, 10 Jan 2017 18:20:55 GMT
Hello everyone,

I am creating a graph from two gzip-compressed JSON files, one of edges and one of vertices.

I have put the files in a Dropbox folder [here][1].

I load these JSON records and map them to the vertex and `Edge` RDDs required by GraphX
like this:

    import org.apache.spark.graphx._
    import org.apache.spark.rdd.RDD

    // Vertex id = numeric part of the "osgb..." TOID; attribute = the "index" field
    val vertices_raw = sqlContext.read.json("path/vertices.json.gz")
    val verticesRDD: RDD[(VertexId, Long)] = vertices_raw.rdd.map(row =>
      (row.getAs[String]("toid").stripPrefix("osgb").toLong, row.getAs[Long]("index")))
    // Directed edge positiveNode -> negativeNode, weighted by "length"
    val edges_raw = sqlContext.read.json("path/edges.json.gz")
    val edgesRDD: RDD[Edge[Double]] = edges_raw.rdd.map(row => Edge(
      row.getAs[String]("positiveNode").stripPrefix("osgb").toLong,
      row.getAs[String]("negativeNode").stripPrefix("osgb").toLong, row.getAs[Double]("length")))
    val my_graph: Graph[Long, Double] =
      Graph(verticesRDD, edgesRDD).partitionBy(PartitionStrategy.RandomVertexCut)
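
The vertex and edge IDs above rely on every TOID being `osgb` followed by digits that fit in a signed 64-bit integer, because GraphX's `VertexId` is a type alias for `Long`; a single malformed value makes `toLong` throw a `NumberFormatException` and fails the job. Below is a minimal sketch of a more defensive conversion, assuming that TOID format. `ToidIds` and `toVertexId` are hypothetical names, not part of Spark. The helper returns `None` instead of throwing, so bad records could be dropped with `flatMap` rather than `map`.

```scala
import scala.util.Try

object ToidIds {
  // Hypothetical helper mirroring the stripPrefix("osgb").toLong mapping above,
  // but returning None instead of throwing NumberFormatException when the
  // remainder is not a valid signed 64-bit Long (GraphX's VertexId is a Long).
  def toVertexId(toid: String): Option[Long] =
    Try(toid.stripPrefix("osgb").toLong).toOption

  def main(args: Array[String]): Unit = {
    println(toVertexId("osgb4000000028222916"))     // Some(4000000028222916)
    println(toVertexId("osgb99999999999999999999")) // None: exceeds Long.MaxValue
    println(toVertexId("osgbXYZ"))                  // None: not numeric
  }
}
```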
    
I then use this `dijkstra` implementation I found to compute a shortest path between two vertices:
    
    import org.apache.spark.graphx._

    // Vertex state in g2: (visited, tentative distance from origin, predecessors on the path)
    def dijkstra[VD](g: Graph[VD, Double], origin: VertexId) = {
      var g2 = g.mapVertices(
        (vid, vd) => (false, if (vid == origin) 0.0 else Double.MaxValue, List[VertexId]())
      )

      // One iteration per vertex: each iteration settles exactly one vertex
      for (i <- 1L to g.vertices.count - 1) {
        // Spark job: find the unvisited vertex with the smallest tentative distance
        val currentVertexId: VertexId = g2.vertices.filter(!_._2._1)
          .fold((0L, (false, Double.MaxValue, List[VertexId]())))(
            (a, b) => if (a._2._2 < b._2._2) a else b)
          ._1

        // Relax the outgoing edges of the current vertex, keeping the best offer per target
        val newDistances: VertexRDD[(Double, List[VertexId])] =
          g2.aggregateMessages[(Double, List[VertexId])](
            ctx => if (ctx.srcId == currentVertexId) {
              ctx.sendToDst((ctx.srcAttr._2 + ctx.attr, ctx.srcAttr._3 :+ ctx.srcId))
            },
            (a, b) => if (a._1 < b._1) a else b
          )

        // Mark the current vertex visited and keep the shorter of the old and new distances
        g2 = g2.outerJoinVertices(newDistances)((vid, vd, newSum) => {
          val newSumVal = newSum.getOrElse((Double.MaxValue, List[VertexId]()))
          (
            vd._1 || vid == currentVertexId,
            math.min(vd._2, newSumVal._1),
            if (vd._2 < newSumVal._1) vd._3 else newSumVal._2
          )
        })
      }

      // Attach (distance, path) to each original vertex attribute
      g.outerJoinVertices(g2.vertices)((vid, vd, dist) =>
        (vd, dist.getOrElse((false, Double.MaxValue, List[VertexId]()))
          .productIterator.toList.tail))
    }
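
For comparison, a single-machine Dijkstra with a priority queue settles vertices in order of distance and can stop as soon as the target is settled. The GraphX version above instead always runs `g.vertices.count - 1` iterations. Each iteration triggers a Spark job (the `fold`) and adds an `aggregateMessages`/`outerJoinVertices` step to the lineage of `g2`, which is never cached or checkpointed. A lineage that grows this long is a plausible cause of the `StackOverflowError`. The sketch below is plain Scala with no Spark, assuming non-negative weights and a hypothetical in-memory adjacency map. `LocalDijkstra`, `Adj` and `shortestPath` are illustrative names, not part of GraphX or igraph.

```scala
import scala.collection.mutable

object LocalDijkstra {
  // Hypothetical adjacency representation: vertex id -> outgoing (neighbour, weight) pairs.
  type Adj = Map[Long, List[(Long, Double)]]

  // Single-pair Dijkstra with a binary heap, assuming non-negative weights.
  // Returns (distance, path including both endpoints), or None if target is unreachable.
  def shortestPath(adj: Adj, origin: Long, target: Long): Option[(Double, List[Long])] = {
    val dist = mutable.Map(origin -> 0.0)     // best known tentative distances
    val prev = mutable.Map.empty[Long, Long]  // predecessor on the best known path
    val settled = mutable.Set.empty[Long]     // vertices whose distance is final
    // An entry ranks higher when its distance is smaller, so dequeue() returns the closest vertex.
    val heap = mutable.PriorityQueue((0.0, origin))(Ordering.fromLessThan[(Double, Long)](_._1 > _._1))

    while (heap.nonEmpty) {
      val (d, u) = heap.dequeue()
      if (!settled(u)) {                      // skip stale entries for already-settled vertices
        settled += u
        if (u == target) {                    // target settled: stop early and rebuild the path
          var path = List(target)
          while (path.head != origin) path = prev(path.head) :: path
          return Some((d, path))
        }
        for ((v, w) <- adj.getOrElse(u, Nil) if !settled(v)) {
          val nd = d + w
          if (nd < dist.getOrElse(v, Double.PositiveInfinity)) {
            dist(v) = nd
            prev(v) = u
            heap.enqueue((nd, v))
          }
        }
      }
    }
    None
  }

  def main(args: Array[String]): Unit = {
    // Small directed graph: 1->2 (1.0), 1->3 (4.0), 2->3 (1.5), 3->4 (1.0)
    val adj: Adj = Map(
      1L -> List((2L, 1.0), (3L, 4.0)),
      2L -> List((3L, 1.5)),
      3L -> List((4L, 1.0))
    )
    println(shortestPath(adj, 1L, 4L)) // Some((3.5,List(1, 2, 3, 4)))
    println(shortestPath(adj, 4L, 1L)) // None
  }
}
```

Each query touches only the vertices closer to the origin than the target, which is why a local library can answer many queries per second on a graph that fits in memory.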

I pick two random vertex IDs:

    val v1 = 4000000028222916L
    val v2 = 4000000031019012L
    
and compute the path between them:
    
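    // dijkstra computes distances from v1 to every vertex; keep only the entry for v2
    val results = dijkstra(my_graph, v1).vertices.filter(_._1 == v2).map(_._2).collect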
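
Because `dijkstra` builds its result with `productIterator.toList.tail`, each collected entry has the form `(originalAttr, List[Any])`. The list holds the distance and the path as untyped values, the path contains the origin and intermediate vertices but not the vertex itself, and unreachable vertices keep `Double.MaxValue`. Below is a minimal sketch of turning one entry back into typed values, assuming exactly that `List(distance, path)` layout. `DecodeResult` and `decodePath` are hypothetical names.

```scala
object DecodeResult {
  // Hypothetical helper for one element of `results`: (originalAttr, List(distance, path)).
  // Returns None for an unexpected layout or for unreachable vertices (distance Double.MaxValue).
  def decodePath(attr: (Long, List[Any])): Option[(Double, List[Long])] = attr._2 match {
    case List(d: Double, path: List[_]) if d < Double.MaxValue =>
      Some((d, path.collect { case id: Long => id }))
    case _ => None
  }

  def main(args: Array[String]): Unit = {
    val reached: (Long, List[Any]) = (42L, List(3.5, List(1L, 2L, 3L)))
    val unreached: (Long, List[Any]) = (7L, List(Double.MaxValue, Nil))
    println(decodePath(reached))   // Some((3.5,List(1, 2, 3)))
    println(decodePath(unreached)) // None
  }
}
```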

I am unable to compute this locally on my laptop without getting a `StackOverflowError`. The
laptop has 8 GB of RAM and a 2.6 GHz Intel Core i5 processor, and I can see that Spark is using
3 of the 4 available cores. With the `igraph` library in Python, on exactly the same graph, I
can load the graph and compute on average around 10 shortest paths per second. Is this an
inefficient way of computing paths? At scale, on multiple nodes, the paths do compute (no
`StackOverflowError`), but each path computation still takes 30 to 40 seconds. I must be
missing something.

Thanks 

  [1]: https://www.dropbox.com/sh/9ug5ikr6j357q7j/AACDBR9UdM0g_ck_ykB8KXPXa?dl=0