spark-dev mailing list archives

From gen tang <gen.tan...@gmail.com>
Subject Re: A DataFrame cache bug
Date Wed, 22 Feb 2017 07:47:29 GMT
Hi Kazuaki Ishizaki

Thanks a lot for your help. It works. However, an even stranger bug now
appears:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.SparkSession

def f(path: String, spark: SparkSession): DataFrame = {
  val data = spark.read.option("mergeSchema", "true").parquet(path)
  println(data.count)
  val df = data.filter("id>10")
  df.cache
  println(df.count)
  val df1 = df.filter("id>11")
  df1.cache
  println(df1.count)
  df1
}

val dir = "/tmp/test"
spark.range(100).write.mode("overwrite").parquet(dir)
spark.catalog.refreshByPath(dir)
f(dir, spark).count // output 88 which is correct

spark.range(1000).write.mode("overwrite").parquet(dir)
spark.catalog.refreshByPath(dir)
f(dir, spark).count // output 88 which is incorrect

If we move refreshByPath into f(), just before spark.read, the whole code
works fine.
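
For reference, a minimal sketch of that variant. The name fRefreshed is only
for illustration, and the snippet assumes spark-shell, where spark is already
defined:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.SparkSession

// Hypothetical variant of f(): refresh the path before every read.
def fRefreshed(path: String, spark: SparkSession): DataFrame = {
  // Invalidate cached data and metadata for any Dataset reading this path.
  spark.catalog.refreshByPath(path)
  val data = spark.read.option("mergeSchema", "true").parquet(path)
  println(data.count())
  val df = data.filter("id>10")
  df.cache()
  println(df.count())
  val df1 = df.filter("id>11")
  df1.cache()
  println(df1.count())
  df1
}

val dir = "/tmp/test"
spark.range(1000).write.mode("overwrite").parquet(dir)
fRefreshed(dir, spark).count()
```

The only difference from f() is that the refresh happens inside the function,
so it always runs immediately before the read.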

Any idea? Thanks a lot

Cheers
Gen


On Wed, Feb 22, 2017 at 2:22 PM, Kazuaki Ishizaki <ISHIZAKI@jp.ibm.com>
wrote:

> Hi,
> Thank you for pointing out the JIRA.
> I think that this JIRA suggests inserting
> "spark.catalog.refreshByPath(dir)".
>
> val dir = "/tmp/test"
> spark.range(100).write.mode("overwrite").parquet(dir)
> val df = spark.read.parquet(dir)
> df.count // output 100 which is correct
> f(df).count // output 89 which is correct
>
> spark.range(1000).write.mode("overwrite").parquet(dir)
> spark.catalog.refreshByPath(dir)  // insert a NEW statement
> val df1 = spark.read.parquet(dir)
> df1.count // output 1000 which is correct; in fact, other operations except
> // df1.filter("id>10") return the correct result.
> f(df1).count // output 89 which is incorrect
>
> Regards,
> Kazuaki Ishizaki
>
>
>
> From:        gen tang <gen.tang86@gmail.com>
> To:        dev@spark.apache.org
> Date:        2017/02/22 15:02
> Subject:        Re: A DataFrame cache bug
> ------------------------------
>
>
>
> Hi All,
>
> I may have found a related issue on JIRA:
>
> https://issues.apache.org/jira/browse/SPARK-15678
>
> This issue is closed; maybe we should reopen it.
>
> Thanks
>
> Cheers
> Gen
>
>
> On Wed, Feb 22, 2017 at 1:57 PM, gen tang <gen.tang86@gmail.com> wrote:
> Hi All,
>
> I found a strange bug related to reading data from an updated path
> combined with a cache operation.
> Please consider the following code:
>
> import org.apache.spark.sql.DataFrame
>
> def f(data: DataFrame): DataFrame = {
>   val df = data.filter("id>10")
>   df.cache
>   df.count
>   df
> }
>
> f(spark.range(100).toDF).count // output 89 which is correct
> f(spark.range(1000).toDF).count // output 989 which is correct
>
> val dir = "/tmp/test"
> spark.range(100).write.mode("overwrite").parquet(dir)
> val df = spark.read.parquet(dir)
> df.count // output 100 which is correct
> f(df).count // output 89 which is correct
>
> spark.range(1000).write.mode("overwrite").parquet(dir)
> val df1 = spark.read.parquet(dir)
> df1.count // output 1000 which is correct; in fact, other operations except
> // df1.filter("id>10") return the correct result.
> f(df1).count // output 89 which is incorrect
>
> In fact, when we call df1.filter("id>10"), Spark still uses the old cached
> DataFrame.
>
> Any idea? Thanks a lot
>
> Cheers
> Gen
>
>
>
