[ https://issues.apache.org/jira/browse/SPARK-15389?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Joseph K. Bradley updated SPARK-15389:
--------------------------------------
Description:
h2. Problem description
I have the code below (part of GraphFrames prototyping), which does the following:
* Given: DataFrames for vertices (column "id") and edges (columns "src" and "dst")
* Adds an index to the vertices, starting from 0
* Does several joins with the edges to replace the original src and dst values in the edges with the new vertex indices
* The complicating factor is that high-degree vertices (HDVs) are handled separately from low-degree vertices: a broadcast join is used for the small number of HDVs, and a default join for the others.
The bug appears as follows:
* At one point, I create a DataFrame "edgesWithHDVmark" which has a column containing many null values.
* I filter the DataFrame to select the rows where that column is not null.
* The resulting DataFrame has the same number of rows; i.e., the filter does nothing.
The strange thing is that this works on a graph with a small number of vertices & edges (100 - 10000), but it fails on a large graph (100 million edges).
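Stripped of the surrounding machinery, the misbehaving pattern is essentially the following (a minimal, illustrative sketch with hypothetical stand-in DataFrames, not the exact code; the full code is in the sections below):
{code}
import org.apache.spark.sql.functions._

// Illustrative stand-ins (hypothetical tiny data; the real DataFrames come from Parquet below).
val edges = sqlContext.range(0, 100).toDF("src").withColumn("dst", lit(0L))
val hdvIndex = sqlContext.range(0, 1).toDF("src").withColumn("long_src", lit(0L))

// Left outer join of a large edge table against a small broadcast table, then an
// isNotNull filter on a column that comes from the broadcast side.
val marked = edges.join(broadcast(hdvIndex), edges("src") === hdvIndex("src"), "left_outer")
val matched = marked.where(hdvIndex("src").isNotNull)
// Expected: the filter keeps only the rows that matched the broadcast side.
// Observed on the large graph: the row count is unchanged.
println(s"before filter: ${marked.count()}, after isNotNull filter: ${matched.count()}")
{code}
On small data like this the filter behaves as expected; the incorrect behavior only shows up at the scale described above.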
I'm listing 3 sections below:
* data generation
* code which does the joins
* output which shows incorrect values
h2. Data generation
Below, look at the "Create a star" section in the code. The following settings should reproduce the problem; with them, the code in the next section fails:
* numPartitions = 512
* numStars = 10
* starSize = 10000000
{code}
import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import sqlContext.implicits._  // needed for .toDF on RDDs when not auto-imported by the shell
def shiftVertexIds(edges: DataFrame, i: Long): DataFrame = {
  require(Seq(DataTypes.LongType, DataTypes.IntegerType).contains(edges.schema("src").dataType))
  val newEdges = edges.select(
    (col("src") + i).as("src"),
    (col("dst") + i).as("dst"))
  newEdges
}

def star(n: Int, numPartitions: Int): DataFrame = {
  val edges = sqlContext.range(start = 1, end = n + 1, step = 1, numPartitions = numPartitions)
    .toDF("src")
    .select(col("src"), lit(0L).as("dst"))
  edges.where(col("src").isNotNull && col("dst").isNotNull)
}
case class Edge(src: Int, dst: Int)
/** n: side of grid */
def grid(n: Int, numPartitions: Int): DataFrame = {
  val edges = sc.parallelize(n until n * n, numPartitions).flatMap { i =>
    Seq(
      Edge(i, i - n),
      Edge(i, i - 1),
      Edge(i, i + 1),
      Edge(i, i + n))
  }.toDF
  edges.where(col("src").isNotNull && col("dst").isNotNull)
}
def chainOfStars(numStars: Int, starSize: Int, numPartitions: Int): DataFrame = {
  val edges0 = Range(0, numStars).map { i =>
    val edges = star(starSize, numPartitions)
    shiftVertexIds(edges, starSize * i)
  }.reduce((e0, e1) => e0.unionAll(e1))
  edges0.repartition(numPartitions)
}

def verticesFromEdges(e: DataFrame, numPartitions: Int): DataFrame = {
  val srcs = e.select(e("src").as("id"))
  val dsts = e.select(e("dst").as("id"))
  val v = srcs.unionAll(dsts).distinct
  v.repartition(numPartitions)
}
val dataPath = "/tmp/joseph/graphs"
// Create a star
val numPartitions = 16
val numStars = 3
val starSize = 10
val edges = chainOfStars(numStars, starSize, numPartitions)
val vertices = verticesFromEdges(edges, numPartitions)
val path = dataPath + s"/star-$numStars-$starSize"
dbutils.fs.rm(path, recurse=true)
dbutils.fs.mkdirs(path)
edges.write.format("parquet").save(path + "/edges")
vertices.write.format("parquet").save(path + "/vertices")
{code}
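As an optional sanity check on the generated data (not part of the original repro; it just reuses `path` and `sqlContext` from the block above), one can read the Parquet output back and confirm the sizes:
{code}
// Hypothetical sanity check (not in the original code): read the written Parquet back
// and confirm the sizes. For a chain of stars, chainOfStars should produce
// numStars * starSize edges and numStars * starSize + 1 distinct vertices.
val writtenEdges = sqlContext.read.parquet(path + "/edges")
val writtenVertices = sqlContext.read.parquet(path + "/vertices")
println(s"edges: ${writtenEdges.count()}, vertices: ${writtenVertices.count()}")
{code}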
h2. Code which does the joins
{code}
import org.apache.spark.sql.{Column, DataFrame, Row}  // Row is used in zipWithUniqueIdFrom0 below
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.storage.StorageLevel
val DST = "dst"
val ID = "id"
val SRC = "src"
val LONG_ID = "long_id"
val LONG_SRC = "long_src"
val LONG_DST = "long_dst"
val ORIG_ID = "ORIG_ID"
val COMPONENT_ID = "component"
import scala.collection.mutable
def zipWithUniqueIdFrom0(df: DataFrame): DataFrame = {
  val sqlContext = df.sqlContext
  val schema = df.schema
  val outputSchema = StructType(Seq(
    StructField("row", schema, false), StructField("uniq_id", DataTypes.IntegerType, false)))
  val rdd = df.rdd.zipWithIndex().map { case (row: Row, id: Long) => Row(row, id.toInt) }
  sqlContext.createDataFrame(rdd, outputSchema)
}
def getIndexedEdges(idxV: DataFrame, edges0: DataFrame): DataFrame = {
  def summary(name: String, df: DataFrame): Unit = {
    //return
    println(name)
    df.printSchema()
    println(s"${name}.count: ${df.count()}")
    df.show(20)
  }
  val edges = edges0.select(SRC, DST)
  summary("idxV", idxV)
  summary("edges", edges)
  // Separately index high-degree vertices and other vertices.
  val degrees = edges.select(explode(array(SRC, DST)).as(ID)).groupBy(ID)
    .agg(count("*").cast("int").as("degree"))
  val LARGE_DEGREE = 2000000
  val highDegreeVertices = degrees.where(col("degree") > LARGE_DEGREE).select(ID)
  summary("highDegreeVertices", highDegreeVertices)
  val idxHDV = idxV.join(highDegreeVertices, ID) // columns LONG_ID, ID
  summary("idxHDV", idxHDV)
  // Indexed low-degree vertices
  val idxLDV = idxV.join(highDegreeVertices, idxV(ID) === highDegreeVertices(ID), "left_outer")
    .where(highDegreeVertices(ID).isNull).select(idxV(ID), idxV(LONG_ID))
  summary("idxLDV", idxLDV)
  def indexEdgeSide(LONG_SIDE: String, SIDE: String, OTHER: String): DataFrame = {
    // Use broadcast join to index high-degree vertices and filter HDVs from edges
    val tmpIdxHDV = broadcast(idxHDV.select(col(LONG_ID).as(LONG_SIDE), col(ID).as(SIDE)))
    summary("tmpIdxHDV", tmpIdxHDV)
    val edgesWithHDVmark = edges.join(tmpIdxHDV, edges(SIDE) === tmpIdxHDV(SIDE), "left_outer")
    // SIDEx2, LONG_SIDE, OTHER
    summary("edgesWithHDVmark", edgesWithHDVmark)
    val idxHDVEdges = edgesWithHDVmark.where(tmpIdxHDV(SIDE).isNotNull)
      .select(edges(SIDE), edges(OTHER), col(LONG_SIDE))
    summary("idxHDVEdges", idxHDVEdges)
    val ldvEdges = edgesWithHDVmark.where(tmpIdxHDV(SIDE).isNull)
      .select(edges(SIDE), edges(OTHER))
    summary("ldvEdges", ldvEdges)
    // Use hash join to index low-degree vertices
    val tmpIdxLDV = idxLDV.select(col(LONG_ID).as(LONG_SIDE), col(ID).as(SIDE))
    summary("tmpIdxLDV", tmpIdxLDV)
    val idxLDVEdges = ldvEdges.join(tmpIdxLDV, ldvEdges(SIDE) === tmpIdxLDV(SIDE), "inner")
      .drop(tmpIdxLDV(SIDE))
    summary("idxLDVEdges", idxLDVEdges)
    // Join ldv, hdv edges
    val indexedSideEdges = idxHDVEdges.unionAll(idxLDVEdges)
    summary("indexedSideEdges", indexedSideEdges)
    // Sanity check; remove later
    // assert(indexedSideEdges.where(col(LONG_SIDE).isNull).count() == 0)
    // println("yay not null")
    indexedSideEdges
  }
  println("COMPUTING indexedSrcEdges")
  val indexedSrcEdges = indexEdgeSide(LONG_SRC, SRC, DST).select(LONG_SRC, SRC, DST)
  println("COMPUTING indexedDstEdges")
  val indexedDstEdges = indexEdgeSide(LONG_DST, DST, SRC).select(LONG_DST, SRC, DST)
  val indexedEdges = indexedSrcEdges.join(indexedDstEdges,
      (indexedSrcEdges(SRC) === indexedDstEdges(SRC)) && (indexedSrcEdges(DST) === indexedDstEdges(DST)),
      "inner")
    .select(indexedSrcEdges(SRC), indexedSrcEdges(LONG_SRC), indexedSrcEdges(DST), indexedDstEdges(LONG_DST))
  summary("indexedEdges", indexedEdges)
  indexedEdges
}
def getIndexedGraph(vertices: DataFrame, edges: DataFrame, numVertices: Long): (DataFrame, DataFrame) = {
  assert(numVertices < Int.MaxValue,
    s"This impl only works for numVertices < ${Int.MaxValue}, but this was given $numVertices vertices.")
  val indexedV0: DataFrame = {
    val indexedVertices = zipWithUniqueIdFrom0(vertices)
    indexedVertices.select(col("uniq_id").as(LONG_ID), col("row")(ID).as(ID))
  }
  // remove self-edges, and index edges
  val indexedE0: DataFrame = getIndexedEdges(indexedV0, edges.where(col(SRC) !== col(DST)))
  val indexedVertices: DataFrame = indexedV0
    .select(col(ID).as(ORIG_ID), col(LONG_ID).as(ID), col(LONG_ID).as(COMPONENT_ID))
  val indexedEdges: DataFrame = indexedE0
    .select(col(LONG_SRC).as(SRC), col(LONG_DST).as(DST))
  (indexedVertices, indexedEdges)
}
val internalNumPartitions = 32
// Adjust the data path for whatever you are testing on
val dataPath = "/tmp/joseph/graphs" + "/star-10-10000000"
val edges = sqlContext.read.parquet(dataPath + "/edges")
val vertices = sqlContext.read.parquet(dataPath + "/vertices")
(vertices.cache().count(), edges.cache().count())
// This runs the code and prints out a bunch of stuff which shows the bug.
val numOrigVertices = vertices.count()
val (origVertices0: DataFrame, edges0: DataFrame) =
getIndexedGraph(vertices, edges, numOrigVertices)
origVertices0.cache()
edges0.cache()
println(s"origVertices0.count: ${origVertices0.count()}")
println(s"edges0.count: ${edges0.count()}")
origVertices0.groupBy().agg(min(col("id")), max(col("id")), min(col(ORIG_ID)), max(col(ORIG_ID)),
min(col("component")), max(col("component"))).show()
edges0.groupBy().agg(min(col("src")), max(col("src")), min(col("dst")), max(col("dst"))).show()
{code}
h2. Output which shows incorrect values
I'm attaching a text file with the output (final.spark1.6.1.hadoop2.bug). Search for "HERE IS THE BUG" and "ALSO SHOWS BUG".
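For reference, the discrepancy can also be checked directly with counts (a sketch only, reusing the names that are in scope inside indexEdgeSide in the code above; this is not in the attached log):
{code}
// Sketch: compare the null count of the broadcast-side column against the row counts
// before and after the isNotNull filter (names from indexEdgeSide above).
val total = edgesWithHDVmark.count()
val nulls = edgesWithHDVmark.where(tmpIdxHDV(SIDE).isNull).count()
val kept  = edgesWithHDVmark.where(tmpIdxHDV(SIDE).isNotNull).count()
// Expected: kept == total - nulls.
// Observed on the large graph: kept == total, i.e. the isNotNull filter removes nothing.
println(s"total=$total, nulls=$nulls, kept-after-isNotNull=$kept")
{code}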
> DataFrame filter by isNotNull fails in complex, large case
> ----------------------------------------------------------
>
> Key: SPARK-15389
> URL: https://issues.apache.org/jira/browse/SPARK-15389
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 1.6.1
> Reporter: Joseph K. Bradley
> Attachments: final.spark1.6.1.hadoop2.bug