spark-dev mailing list archives

From "Kazuaki Ishizaki" <ISHIZ...@jp.ibm.com>
Subject Re: A DataFrame cache bug
Date Wed, 22 Feb 2017 06:22:40 GMT
Hi,
Thank you for pointing out the JIRA.
I think that this JIRA suggests inserting
"spark.catalog.refreshByPath(dir)":

val dir = "/tmp/test"
spark.range(100).write.mode("overwrite").parquet(dir)
val df = spark.read.parquet(dir)
df.count // output 100 which is correct
f(df).count // output 89 which is correct

spark.range(1000).write.mode("overwrite").parquet(dir)
spark.catalog.refreshByPath(dir)  // insert a NEW statement
val df1 = spark.read.parquet(dir)
df1.count // output 1000, which is correct; in fact, every operation except
          // df1.filter("id>10") returns a correct result
f(df1).count // output 89, which is incorrect
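
Even with refreshByPath in place, f(df1) still hits the stale plan cached
inside f during the first run. A minimal workaround sketch, assuming that
stale entry is the culprit (spark.catalog.clearCache() and Dataset.unpersist()
are the standard eviction APIs; using them here is my suggestion, not
something the JIRA prescribes):

spark.catalog.clearCache()        // evict ALL cached plans (blunt but effective)
val df2 = spark.read.parquet(dir)
f(df2).count                      // recomputed from the new files: 989

If the original df handle is still in scope, df.unpersist() is a more
targeted alternative that evicts only that one cached plan.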

Regards,
Kazuaki Ishizaki



From: gen tang <gen.tang86@gmail.com>
To: dev@spark.apache.org
Date: 2017/02/22 15:02
Subject: Re: A DataFrame cache bug



Hi All,

I might have found a related issue on JIRA:

https://issues.apache.org/jira/browse/SPARK-15678

This issue is closed; maybe we should reopen it.

Thanks 

Cheers
Gen


On Wed, Feb 22, 2017 at 1:57 PM, gen tang <gen.tang86@gmail.com> wrote:
Hi All,

I found a strange bug related to reading data from an updated path combined
with the cache operation.
Please consider the following code:

import org.apache.spark.sql.DataFrame

def f(data: DataFrame): DataFrame = {
  val df = data.filter("id>10")
  df.cache   // mark the filtered result for caching (lazy; nothing computed yet)
  df.count   // first action: materializes the filtered rows into the cache
  df
}

f(spark.range(100).toDF).count  // output 89, which is correct
f(spark.range(1000).toDF).count // output 989, which is correct

val dir = "/tmp/test"
spark.range(100).write.mode("overwrite").parquet(dir)
val df = spark.read.parquet(dir)
df.count // output 100 which is correct
f(df).count // output 89 which is correct

spark.range(1000).write.mode("overwrite").parquet(dir)
val df1 = spark.read.parquet(dir)
df1.count // output 1000, which is correct; in fact, every operation except
          // df1.filter("id>10") returns a correct result
f(df1).count // output 89, which is incorrect

In fact, when we use df1.filter("id>10"), Spark silently reuses the old
cached DataFrame.
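
One way to confirm this, as a sketch (assuming the usual plan-based cache
lookup, where Spark matches a query's logical plan against previously cached
plans):

df1.filter("id>10").explain
// If the stale cache is being reused, the physical plan shows an
// InMemoryTableScan over an InMemoryRelation (the data cached inside f(df))
// instead of a FileScan over the new parquet files under /tmp/test.

Because df and df1 read the same path, their filtered plans compare as equal,
so the cache manager serves df1's filter from the rows cached for df.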

Any ideas? Thanks a lot.

Cheers
Gen



