spark-issues mailing list archives

From "Joseph K. Bradley (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (SPARK-15389) DataFrame filter by isNotNull fails in complex, large case
Date Wed, 18 May 2016 19:22:13 GMT

     [ https://issues.apache.org/jira/browse/SPARK-15389?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Joseph K. Bradley updated SPARK-15389:
--------------------------------------
    Attachment: final.spark1.6.1.hadoop2.bug

> DataFrame filter by isNotNull fails in complex, large case
> ----------------------------------------------------------
>
>                 Key: SPARK-15389
>                 URL: https://issues.apache.org/jira/browse/SPARK-15389
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.6.1
>            Reporter: Joseph K. Bradley
>         Attachments: final.spark1.6.1.hadoop2.bug
>
>
> h2. Problem description
> I have the following code (part of GraphFrames prototyping), which does the following:
> * Given: DataFrames for vertices (column "id") and edges (columns "src" and "dst")
> * Add an index to the vertices, starting from 0
> * Do several joins with edges to replace the original src, dst values in edges with the new vertex indices
> * The complex thing is that this handles high-degree vertices (HDV) separately from low-degree vertices.  It uses a broadcast join for the small number of HDVs and a default join for the others (see the toy sketch below; the full code is in the sections that follow).
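> Toy sketch of that split on made-up data (not the actual job; the real code is in the sections below):
> {code}
> import org.apache.spark.sql.functions._
> // Broadcast-join edges against the small set of "high-degree" vertices,
> // then route the unmatched (low-degree) rows through a default shuffle join.
> val toyEdges = sqlContext.range(1, 101).toDF("src").withColumn("dst", lit(0L))
> val toyHDV = sqlContext.range(1, 4).toDF("id")   // pretend vertices 1, 2, 3 are high-degree
> val marked = toyEdges.join(broadcast(toyHDV), toyEdges("src") === toyHDV("id"), "left_outer")
> val hdvEdges = marked.where(toyHDV("id").isNotNull)  // 3 rows: handled via the broadcast join
> val ldvEdges = marked.where(toyHDV("id").isNull)     // 97 rows: would go through the default join
> {code}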
> The bug which appears is:
> * At one point, I create a DataFrame "edgesWithHDVmark" which has a column with many null values.
> * I filter the DataFrame to select rows where that column is not null.
> * The resulting DataFrame has the same number of rows; i.e., the filter does nothing.
> The strange thing is that this works on a graph with a small number of vertices & edges (100 - 10000), but it fails on a large graph (100 million).
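> For reference, a minimal toy example (made-up data, unrelated to the job above) of the behavior I expect from isNotNull after a left outer join:
> {code}
> import org.apache.spark.sql.functions._
> val big = sqlContext.range(0, 10).toDF("id")
> val small = sqlContext.range(0, 3).toDF("rid")
> val joined = big.join(small, big("id") === small("rid"), "left_outer")
> // Rows 3..9 have no match, so "rid" is null there; the filter should keep only the 3 matched rows.
> // In the large job above, the equivalent filter keeps every row.
> assert(joined.where(col("rid").isNotNull).count() == 3)
> {code}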
> I'm listing 3 sections below:
> * data generation
> * code which does the joins
> * output which shows incorrect values
> h2. Data generation
> Below, look at the "Create a star" section in the code.  The code as written uses small values, which work fine; the settings below should reproduce the issue, i.e., they make the code in the next section fail:
> * numPartitions = 512
> * numStars = 10
> * starSize = 10000000
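> That is, to hit the bug I replace the values in the "Create a star" section of the code with:
> {code}
> // The code below ships with numPartitions = 16, numStars = 3, starSize = 10, which do not trigger the bug.
> val numPartitions = 512
> val numStars = 10
> val starSize = 10000000
> {code}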
> {code}
> import org.apache.spark.sql.{Column, DataFrame}
> import org.apache.spark.sql.functions._
> import org.apache.spark.sql.types._
> import sqlContext.implicits._  // for rdd.toDF below (pre-imported in notebooks / spark-shell)
> def shiftVertexIds(edges: DataFrame, i: Long): DataFrame = {
>   require(Seq(DataTypes.LongType, DataTypes.IntegerType).contains(edges.schema("src").dataType))
>   val newEdges = edges.select((col("src") + i).as("src"),
>     (col("dst") + i).as("dst"))
>   newEdges
> }
> def star(n: Int, numPartitions: Int): DataFrame = {
>   val edges = sqlContext.range(start=1, end=n+1, step=1, numPartitions=numPartitions).toDF("src").select(col("src"), lit(0L).as("dst"))
>   edges.where(col("src").isNotNull && col("dst").isNotNull)
> }
> case class Edge(src: Int, dst: Int)
> /** n: side of grid */
> def grid(n: Int, numPartitions: Int): DataFrame = {
>   val edges = sc.parallelize(n until n * n, numPartitions).flatMap { i =>
>     Seq(
>       Edge(i, i - n),
>       Edge(i, i - 1),
>       Edge(i, i + 1),
>       Edge(i, i + n))
>   }.toDF
>   edges.where(col("src").isNotNull && col("dst").isNotNull)
> }
> def chainOfStars(numStars: Int, starSize: Int, numPartitions: Int): DataFrame = {
>   val edges0 = Range(0, numStars).map { i =>
>     val edges = star(starSize, numPartitions)
>     shiftVertexIds(edges, starSize * i)
>   }.reduce((e0,e1) => e0.unionAll(e1))
>   edges0.repartition(numPartitions)
> }
> def verticesFromEdges(e: DataFrame, numPartitions: Int): DataFrame = {
>   val srcs = e.select(e("src").as("id"))
>   val dsts = e.select(e("dst").as("id"))
>   val v = srcs.unionAll(dsts).distinct
>   v.repartition(numPartitions)
> }
> val dataPath = "/tmp/joseph/graphs"
> // Create a star
> val numPartitions = 16
> val numStars = 3
> val starSize = 10
> val edges = chainOfStars(numStars, starSize, numPartitions)
> val vertices = verticesFromEdges(edges, numPartitions)
> val path = dataPath + s"/star-$numStars-$starSize"
> dbutils.fs.rm(path, recurse=true)
> dbutils.fs.mkdirs(path)
> edges.write.format("parquet").save(path + "/edges")
> vertices.write.format("parquet").save(path + "/vertices")
> {code}
> h2. Code which does the joins
> {code}
> import org.apache.spark.rdd.RDD                       // RDD is used in the connected components helpers below
> import org.apache.spark.sql.{Column, DataFrame, Row}  // Row is used in zipWithUniqueIdFrom0
> import org.apache.spark.sql.functions._
> import org.apache.spark.sql.types._
> import org.apache.spark.storage.StorageLevel
> val DST = "dst"
> val ID = "id"
> val SRC = "src"
> val LONG_ID = "long_id"
> val LONG_SRC = "long_src"
> val LONG_DST = "long_dst"
> val ORIG_ID = "ORIG_ID"
> val COMPONENT_ID = "component"
> import scala.collection.mutable
> def zipWithUniqueIdFrom0(df: DataFrame): DataFrame = {
>   val sqlContext = df.sqlContext
>   val schema = df.schema
>   val outputSchema = StructType(Seq(
>     StructField("row", schema, false), StructField("uniq_id", DataTypes.IntegerType, false)))
>   val rdd = df.rdd.zipWithIndex().map { case (row: Row, id: Long) => Row(row, id.toInt) }
>   sqlContext.createDataFrame(rdd, outputSchema)
> }
> def getIndexedEdges(idxV: DataFrame, edges0: DataFrame): DataFrame = {
>   def summary(name: String, df: DataFrame): Unit = {
>     //return
>     println(name)
>     df.printSchema()
>     println(s"${name}.count: ${df.count()}")
>     df.show(20)
>   }
>   val edges = edges0.select(SRC, DST)
>   summary("idxV", idxV)
>   summary("edges", edges)
>   // Separately index high-degree vertices and other vertices.
>   val degrees = edges.select(explode(array(SRC, DST)).as(ID)).groupBy(ID).agg(count("*").cast("int").as("degree"))
>   val LARGE_DEGREE = 2000000
>   val highDegreeVertices = degrees.where(col("degree") > LARGE_DEGREE).select(ID)
>   summary("highDegreeVertices", highDegreeVertices)
>   val idxHDV = idxV.join(highDegreeVertices, ID) // columns LONG_ID, ID
>   summary("idxHDV", idxHDV)
>   // Indexed low-degree vertices 
>   val idxLDV = idxV.join(highDegreeVertices, idxV(ID) === highDegreeVertices(ID), "left_outer")
>     .where(highDegreeVertices(ID).isNull).select(idxV(ID), idxV(LONG_ID))
>   summary("idxLDV", idxLDV)
>   def indexEdgeSide(LONG_SIDE: String, SIDE: String, OTHER: String): DataFrame = {
>     // Use broadcast join to index high-degree vertices and filter HDVs from edges
>     val tmpIdxHDV = broadcast(idxHDV.select(col(LONG_ID).as(LONG_SIDE), col(ID).as(SIDE)))
>     summary("tmpIdxHDV", tmpIdxHDV)
>     val edgesWithHDVmark = edges.join(tmpIdxHDV, edges(SIDE) === tmpIdxHDV(SIDE), "left_outer") // SIDEx2, LONG_SIDE, OTHER
>     summary("edgesWithHDVmark", edgesWithHDVmark)
>     val idxHDVEdges = edgesWithHDVmark.where(tmpIdxHDV(SIDE).isNotNull)
>       .select(edges(SIDE), edges(OTHER), col(LONG_SIDE))
>     summary("idxHDVEdges", idxHDVEdges)
>     val ldvEdges = edgesWithHDVmark.where(tmpIdxHDV(SIDE).isNull)
>       .select(edges(SIDE), edges(OTHER))
>     summary("ldvEdges", ldvEdges)
>     // Use hash join to index low-degree vertices
>     val tmpIdxLDV = idxLDV.select(col(LONG_ID).as(LONG_SIDE), col(ID).as(SIDE))
>     summary("tmpIdxLDV", tmpIdxLDV)
>     val idxLDVEdges = ldvEdges.join(tmpIdxLDV, ldvEdges(SIDE) === tmpIdxLDV(SIDE), "inner").drop(tmpIdxLDV(SIDE))
>     summary("idxLDVEdges", idxLDVEdges)
>     // Join ldv, hdv edges
>     val indexedSideEdges = idxHDVEdges.unionAll(idxLDVEdges)
>     summary("indexedSideEdges", indexedSideEdges)
>     // Sanity check; remove later
>     // assert(indexedSideEdges.where(col(LONG_SIDE).isNull).count() == 0)
>     // println("yay not null")
>     indexedSideEdges
>   }
>   println("COMPUTING indexedSrcEdges")
>   val indexedSrcEdges = indexEdgeSide(LONG_SRC, SRC, DST).select(LONG_SRC, SRC, DST)
>   println("COMPUTING indexedDstEdges")
>   val indexedDstEdges = indexEdgeSide(LONG_DST, DST, SRC).select(LONG_DST, SRC, DST)
>   val indexedEdges = indexedSrcEdges.join(indexedDstEdges,
>     (indexedSrcEdges(SRC) === indexedDstEdges(SRC)) && (indexedSrcEdges(DST) === indexedDstEdges(DST)),
>     "inner")
>     .select(indexedSrcEdges(SRC), indexedSrcEdges(LONG_SRC), indexedSrcEdges(DST), indexedDstEdges(LONG_DST))
>   summary("indexedEdges", indexedEdges)
>   indexedEdges
> }
> def getIndexedGraph(vertices: DataFrame, edges: DataFrame, numVertices: Long): (DataFrame, DataFrame) = {
>   assert(numVertices < Int.MaxValue, s"This impl only works for numVertices < ${Int.MaxValue}, but this was given $numVertices vertices.")
>   val indexedV0: DataFrame = {
>     val indexedVertices = zipWithUniqueIdFrom0(vertices)
>     indexedVertices.select(col("uniq_id").as(LONG_ID), col("row")(ID).as(ID))
>   }
>   // remove self-edges, and index edges
>   val indexedE0: DataFrame = getIndexedEdges(indexedV0, edges.where(col(SRC) !== col(DST)))
>   val indexedVertices: DataFrame = indexedV0
>     .select(col(ID).as(ORIG_ID), col(LONG_ID).as(ID), col(LONG_ID).as(COMPONENT_ID))
>   val indexedEdges: DataFrame = indexedE0
>     .select(col(LONG_SRC).as(SRC), col(LONG_DST).as(DST))
>   (indexedVertices, indexedEdges)
> }
> val internalNumPartitions = 32
> def split(array: Array[Int], blockSize: Int = 1 << 24): Iterator[(Int, Array[Int])] = {
>   val m = math.ceil(array.length.toDouble / blockSize).toInt
>   Iterator.tabulate(m) { j =>
>     (j, array.slice(j * blockSize, math.min((j + 1) * blockSize, array.length)))
>   }
> }
> def assemble(arrays: Array[(Int, Array[Int])]): Array[Int] = {
>   val builder = mutable.ArrayBuilder.make[Int]
>   arrays.sortBy(_._1).foldLeft(builder) { case (b, (_, array)) =>
>     builder ++= array
>   }
>   builder.result()
> }
> def sum(array: Array[Int]): Long = {
>   var s = 0L
>   var i = 0
>   val n = array.length
>   while (i < n) {
>     s += array(i)
>     i += 1
>   }
>   s
> }
> /**
>  * @param edges  (source vertices, destination vertices), in matching arrays
>  * @param numNodes  Number of vertices in graph
>  * @return  single-row RDD with Array of length numNodes containing component assignments
>  */
> def connectedComponents(edges: RDD[(Array[Int], Array[Int])], numNodes: Int): RDD[Array[Int]] = {
>   var clusters = edges.sparkContext.parallelize(1 to 1, 1).map { i =>
>     Array.tabulate(numNodes)(i => i)
>   }
>   var prevSum = numNodes.toLong * (numNodes + 1) / 2
>   var converged = false
>   var iter = 0
>   while (!converged) {
>     println(s"connectedComponents: iteration $iter")
>     clusters = update(edges, clusters).cache()
>     val s = clusters.map(x => sum(x)).first()
>     if (s == prevSum) {
>       converged = true
>     }
>     prevSum = s
>     println(s"  prevSum = $prevSum")
>     iter += 1
>   }
>   clusters
> }
> def localUpdate(ss: Array[Int], dd: Array[Int], cc: Array[Int]): Unit = {
>   val maxIter = 5
>   var converged = false
>   var i = 0
>   while (!converged && i < maxIter) {
>     converged = subgraphUpdate(ss, dd, cc)
>     if (!converged) {
>       clusterUpdate(cc)
>     }
>     i += 1
>   }
> }
> def subgraphUpdate(ss: Array[Int], dd: Array[Int], cc: Array[Int]): Boolean = {
>   var i = 0
>   val n = ss.length
>   var converged = true
>   while (i < n) {
>     val s = ss(i)
>     val d = dd(i)
>     val cs = cc(s)
>     val cd = cc(d)
>     if (cs < cd) {
>       cc(d) = cs
>       converged = false
>     } else if (cs > cd) {
>       cc(s) = cd
>       converged = false
>     }
>     i += 1
>   }
>   converged
> }
> def clusterUpdate(cc: Array[Int]): Unit = {
>   var j = cc.length - 1
>   while (j >= 0) {
>     val cj = cc(j)
>     cc(j) = cc(cj)
>     j -= 1
>   }
> }
> def minArrays(a: Array[Int], b: Array[Int]): Array[Int] = {
>   val m = a.length
>   if (m == 0) {
>     b.clone()
>   } else {
>     var i = 0
>     while (i < m) {
>       if (a(i) > b(i)) a(i) = b(i)
>       i += 1
>     }
>     a
>   }
> }
> def update(edges: RDD[(Array[Int], Array[Int])], clusters: RDD[Array[Int]]): RDD[Array[Int]] = {
>   edges.cartesian(clusters).flatMap { case ((ss, dd), cc) =>
>     localUpdate(ss, dd, cc)
>     split(cc)
>   }.aggregateByKey(Array.empty[Int])(minArrays, minArrays)
>     .map(x => (0, x)).groupByKey(1)
>     .values
>     .map { cc =>
>       assemble(cc.toArray)
>     }
> }
> def connectedComponentsNew(vertices: DataFrame, edges: DataFrame): DataFrame = {
>   val sqlContext = vertices.sqlContext
>   import sqlContext.implicits._
>   // Index vertices consecutively from 0.
>   val numOrigVertices = vertices.count()
>   val (origVertices0: DataFrame, edges0: DataFrame) =
>     getIndexedGraph(vertices, edges, numOrigVertices)
>   val edgeRDD = edges0.select("src", "dst").coalesce(internalNumPartitions).as[(Int, Int)].rdd
>   val edgeArrays = edgeRDD.mapPartitions { part =>
>     val srcArray = mutable.ArrayBuilder.make[Int]
>     val dstArray = mutable.ArrayBuilder.make[Int]
>     part.foreach { case (src, dst) =>
>       srcArray += src
>       dstArray += dst
>     }
>     Iterator.single((srcArray.result(), dstArray.result()))  // one (srcs, dsts) pair per partition
>   }.persist(StorageLevel.MEMORY_AND_DISK)
>   println(s"Beginning connected components with $numOrigVertices vertices.")
>   val components: RDD[Array[Int]] = connectedComponents(edgeArrays, numOrigVertices.toInt)
>   println(s"Finished connected components.")
>   val componentsBc = sqlContext.sparkContext.broadcast(components.first())
>   edgeArrays.unpersist(blocking = false)
>   val getComponent = udf { (vId: Int) => componentsBc.value.apply(vId).toLong }
>   val origWithComponent =
>     origVertices0.select(getComponent(col("id")).as(COMPONENT_ID), col(ORIG_ID).as("id"))
>   vertices.join(origWithComponent, "id")
> }
> // Adjust the data path for whatever you are testing on
> val dataPath = "/tmp/joseph/graphs" + "/star-10-10000000"
> val edges = sqlContext.read.parquet(dataPath + "/edges")
> val vertices = sqlContext.read.parquet(dataPath + "/vertices")
> (vertices.cache().count(), edges.cache().count())
> // This runs the code and prints out a bunch of stuff which shows the bug.
> val numOrigVertices = vertices.count()
> val (origVertices0: DataFrame, edges0: DataFrame) =
>   getIndexedGraph(vertices, edges, numOrigVertices)
> origVertices0.cache()
> edges0.cache()
> println(s"origVertices0.count: ${origVertices0.count()}")
> println(s"edges0.count: ${edges0.count()}")
> origVertices0.groupBy().agg(min(col("id")), max(col("id")), min(col(ORIG_ID)), max(col(ORIG_ID)), min(col("component")), max(col("component"))).show()
> edges0.groupBy().agg(min(col("src")), max(col("src")), min(col("dst")), max(col("dst"))).show()
> {code}
> h2. Output which shows incorrect values
> I'm attaching a text file with the output.  Search for "HERE IS THE BUG" and "ALSO SHOWS BUG".



