spark-issues mailing list archives

From "Jungtaek Lim (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (SPARK-26880) dataDF.queryExecution.toRdd corrupt rows
Date Fri, 15 Feb 2019 00:18:00 GMT

    [ https://issues.apache.org/jira/browse/SPARK-26880?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16768824#comment-16768824 ]

Jungtaek Lim commented on SPARK-26880:
--------------------------------------

The failure is not caused by `queryExecution.toRdd` itself but by `queryExecution.toRdd.collect()`.
You can modify your code to also print the values before collect (via mapPartitions) and see
that they are correct.
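
For example, a minimal sketch of that check, reusing the reporter's `dataDF` from the test below (the println placement is my own, not from the report):

{code}
// Print each key while iterating inside the partition, before any collect().
dataDF.queryExecution.toRdd.mapPartitions { internalRows =>
  internalRows.map { row =>
    println(row.getInt(0)) // values observed here, during iteration, are correct
    row
  }
}.count() // force evaluation without storing the rows in a collection
{code}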

Internally, Spark optimizes by reusing the same row object - as you may have experienced in
MapReduce - which is fine if you iterate over and process the rows one at a time, but it
produces hard-to-track issues when you try to store them in a collection.

Since the problem comes from object reuse, once you apply a conversion a new row (object) is
produced per iteration, so the results are safe to store in a collection.
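
For example (a sketch of mine, not something from this issue), copying each InternalRow before it is stored has the same effect as the conversions shown in the report:

{code}
// Copy each reused InternalRow into a fresh object so collect() is safe.
val safeRows = dataDF.queryExecution.toRdd
  .map(_.copy())
  .collect()
{code}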

I agree there's no way to know this without digging into Spark internals, so ideally we would
either document or restrict this approach, but I'm not sure which is the best way to do that.

> dataDF.queryExecution.toRdd corrupt rows
> ----------------------------------------
>
>                 Key: SPARK-26880
>                 URL: https://issues.apache.org/jira/browse/SPARK-26880
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.4.0
>            Reporter: Grant Henke
>            Priority: Major
>
> I have seen a simple case where InternalRows returned by `queryExecution.toRdd` are corrupt. Some rows are duplicated while others are missing.
> This simple test illustrates the issue:
> {code}
> import org.apache.spark.SparkConf
> import org.apache.spark.sql.Row
> import org.apache.spark.sql.SparkSession
> import org.apache.spark.sql.catalyst.encoders.RowEncoder
> import org.apache.spark.sql.types.IntegerType
> import org.apache.spark.sql.types.StringType
> import org.apache.spark.sql.types.StructField
> import org.apache.spark.sql.types.StructType
> import org.junit.Assert._
> import org.junit.Test
> import org.scalatest.Matchers
> import org.scalatest.junit.JUnitSuite
> import org.slf4j.Logger
> import org.slf4j.LoggerFactory
> class SparkTest extends JUnitSuite with Matchers {
>   val Log: Logger = LoggerFactory.getLogger(getClass)
>   @Test
>   def testSparkRowCorruption(): Unit = {
>     val conf = new SparkConf()
>       .setMaster("local[*]")
>       .setAppName("test")
>       .set("spark.ui.enabled", "false")
>     val ss = SparkSession.builder().config(conf).getOrCreate()
>     // Setup a DataFrame for testing.
>     val data = Seq(
>       Row.fromSeq(Seq(0, "0")),
>       Row.fromSeq(Seq(25, "25")),
>       Row.fromSeq(Seq(50, "50")),
>       Row.fromSeq(Seq(75, "75")),
>       Row.fromSeq(Seq(99, "99")),
>       Row.fromSeq(Seq(100, "100")),
>       Row.fromSeq(Seq(101, "101")),
>       Row.fromSeq(Seq(125, "125")),
>       Row.fromSeq(Seq(150, "150")),
>       Row.fromSeq(Seq(175, "175")),
>       Row.fromSeq(Seq(199, "199"))
>     )
>     val dataRDD = ss.sparkContext.parallelize(data)
>     val schema = StructType(
>       Seq(
>         StructField("key", IntegerType),
>         StructField("value", StringType)
>       ))
>     val dataDF = ss.sqlContext.createDataFrame(dataRDD, schema)
>     // Convert to an RDD.
>     val rdd = dataDF.queryExecution.toRdd
>     
>     // Collect the data to compare.
>     val resultData = rdd.collect
>     resultData.foreach { row =>
>       // Log for visualizing the corruption.
>       Log.error(s"${row.getInt(0)}")
>     }
>     // Ensure the keys in the original data and resulting data match.
>     val dataKeys = data.map(_.getInt(0)).toSet
>     val resultKeys = resultData.map(_.getInt(0)).toSet
>     assertEquals(dataKeys, resultKeys)
>   }
> }
> {code}
> That test fails with the following:
> {noformat}
> 10:38:26.967 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 0
> 10:38:26.967 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 25
> 10:38:26.967 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 75
> 10:38:26.968 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 75
> 10:38:26.968 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 99
> 10:38:26.968 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 100
> 10:38:26.969 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 125
> 10:38:26.969 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 125
> 10:38:26.969 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 150
> 10:38:26.970 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 199
> 10:38:26.970 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 199
> expected:<Set(101, 0, 25, 125, 150, 50, 199, 175, 99, 75, 100)> but was:<Set(0, 25, 125, 150, 199, 99, 75, 100)>
> Expected :Set(101, 0, 25, 125, 150, 50, 199, 175, 99, 75, 100)
> Actual   :Set(0, 25, 125, 150, 199, 99, 75, 100)
> {noformat}
> If I map from an InternalRow to a Row, the issue goes away:
> {code}
> val rdd = dataDF.queryExecution.toRdd.mapPartitions { internalRows =>
>    val encoder = RowEncoder.apply(schema).resolveAndBind()
>    internalRows.map(encoder.fromRow)
> }
> {code}
> Converting with CatalystTypeConverters also appears to resolve the issue:
> {code}
> val rdd = dataDF.queryExecution.toRdd.mapPartitions { internalRows =>
>    val typeConverter = CatalystTypeConverters.createToScalaConverter(schema)
>    internalRows.map(ir => typeConverter(ir).asInstanceOf[Row])
> }
> {code}



