spark-issues mailing list archives

From "Ankur Dave (JIRA)" <j...@apache.org>
Subject [jira] [Comment Edited] (SPARK-1931) Graph.partitionBy does not reconstruct routing tables
Date Tue, 27 May 2014 21:52:02 GMT

    [ https://issues.apache.org/jira/browse/SPARK-1931?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14010340#comment-14010340 ]

Ankur Dave edited comment on SPARK-1931 at 5/27/14 9:50 PM:
------------------------------------------------------------

Since the fix didn't make it into Spark 1.0.0, a workaround is to partition the edges before
constructing the graph, as follows:

{code}
import org.apache.spark.HashPartitioner
import org.apache.spark.rdd.RDD
import org.apache.spark.graphx._

// Assign each edge to the partition chosen by the strategy, shuffle the edges
// accordingly, and drop the partition keys afterwards.
def partitionBy[ED](
    edges: RDD[Edge[ED]], partitionStrategy: PartitionStrategy): RDD[Edge[ED]] = {
  val numPartitions = edges.partitions.size
  edges.map(e => (partitionStrategy.getPartition(e.srcId, e.dstId, numPartitions), e))
    .partitionBy(new HashPartitioner(numPartitions))
    .mapPartitions(_.map(_._2), preservesPartitioning = true)
}

val g = Graph(
  sc.parallelize(List((0L, "a"), (1L, "b"), (2L, "c"))),
  sc.parallelize(List(Edge(0L, 1L, 1), Edge(0L, 2L, 1)), 2))
assert(g.triplets.collect.map(_.toTuple).toSet ==
  Set(((0L, "a"), (1L, "b"), 1), ((0L, "a"), (2L, "c"), 1)))

val gPart = Graph(
  g.vertices,
  partitionBy(g.edges, PartitionStrategy.EdgePartition2D))
assert(gPart.triplets.collect.map(_.toTuple).toSet ==
  Set(((0L, "a"), (1L, "b"), 1), ((0L, "a"), (2L, "c"), 1)))
{code}
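
As a quick sanity check (hypothetical, not part of the original workaround), one can confirm that the helper really places every edge in the partition chosen by the strategy, assuming the g and partitionBy defined above:

{code}
// Hypothetical check: every edge should sit in the partition that the
// strategy assigns to its (srcId, dstId) pair.
val strategy = PartitionStrategy.EdgePartition2D
val repartitioned = partitionBy(g.edges, strategy)
val numParts = repartitioned.partitions.size
val misplaced = repartitioned.mapPartitionsWithIndex { (pid, iter) =>
  iter.filter(e => strategy.getPartition(e.srcId, e.dstId, numParts) != pid)
}.count
assert(misplaced == 0)
{code}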


was (Author: ankurd):
Since the fix didn't make it into Spark 1.0.0, a workaround is to partition the edges before
constructing the graph, as follows:

{code}
import org.apache.spark.HashPartitioner
import org.apache.spark.rdd.RDD
import org.apache.spark.graphx._

def partitionBy[ED](
    edges: RDD[Edge[ED]], partitionStrategy: PartitionStrategy): RDD[Edge[ED]] = {
  val numPartitions = edges.partitions.size
  edges.map(e => (partitionStrategy.getPartition(e.srcId, e.dstId, numPartitions), e))
    .partitionBy(new HashPartitioner(numPartitions))
    .mapPartitions(_.map(_._2), preservesPartitioning = true)
}

val g = Graph(
  sc.parallelize(List((0L, "a"), (1L, "b"), (2L, "c"))),
  sc.parallelize(List(Edge(0L, 1L, 1), Edge(0L, 2L, 1)), 2))
assert(g.triplets.collect.map(_.toTuple).toSet ==
  Set(((0L, "a"), (1L, "b"), 1), ((0L, "a"), (2L, "c"), 1)))

val gPart = Graph(
  sc.parallelize(List((0L, "a"), (1L, "b"), (2L, "c"))),
  partitionBy(sc.parallelize(List(Edge(0L, 1L, 1), Edge(0L, 2L, 1)), 2), PartitionStrategy.EdgePartition2D))
assert(gPart.triplets.collect.map(_.toTuple).toSet ==
  Set(((0L, "a"), (1L, "b"), 1), ((0L, "a"), (2L, "c"), 1)))
{code}

> Graph.partitionBy does not reconstruct routing tables
> -----------------------------------------------------
>
>                 Key: SPARK-1931
>                 URL: https://issues.apache.org/jira/browse/SPARK-1931
>             Project: Spark
>          Issue Type: Bug
>          Components: GraphX
>    Affects Versions: 1.0.0
>            Reporter: Ankur Dave
>            Assignee: Ankur Dave
>             Fix For: 1.0.1
>
>
> Commit 905173df57b90f90ebafb22e43f55164445330e6 introduced a bug in partitionBy where,
> after repartitioning the edges, it reuses the VertexRDD without updating the routing tables
> to reflect the new edge layout. This causes the following test to fail:
> {code}
>       import org.apache.spark.graphx._
>       val g = Graph(
>         sc.parallelize(List((0L, "a"), (1L, "b"), (2L, "c"))),
>         sc.parallelize(List(Edge(0L, 1L, 1), Edge(0L, 2L, 1)), 2))
>       assert(g.triplets.collect.map(_.toTuple).toSet ==
>         Set(((0L, "a"), (1L, "b"), 1), ((0L, "a"), (2L, "c"), 1)))
>       val gPart = g.partitionBy(PartitionStrategy.EdgePartition2D)
>       assert(gPart.triplets.collect.map(_.toTuple).toSet ==
>         Set(((0L, "a"), (1L, "b"), 1), ((0L, "a"), (2L, "c"), 1)))
> {code}
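
For anyone reproducing this on 1.0.0: per the description above, the edges do get repartitioned and only the routing tables go stale. A small diagnostic (hypothetical, not from the report) shows how many edges each partition holds before and after the call, assuming the graph g from the failing test:

{code}
import org.apache.spark.graphx._

// Hypothetical diagnostic: count the edges held by each partition.
def edgesPerPartition[VD, ED](graph: Graph[VD, ED]): Seq[(Int, Int)] =
  graph.edges
    .mapPartitionsWithIndex((pid, iter) => Iterator((pid, iter.size)))
    .collect.toSeq

println(edgesPerPartition(g))
println(edgesPerPartition(g.partitionBy(PartitionStrategy.EdgePartition2D)))
{code}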



--
This message was sent by Atlassian JIRA
(v6.2#6252)
