spark-dev mailing list archives

From: gen tang <gen.tan...@gmail.com>
Subject: Re: A DataFrame cache bug
Date: Wed, 22 Feb 2017 08:02:50 GMT

Hi, the example that I provided was not very clear, so I have added a clearer
example in JIRA.

Thanks

Cheers
Gen

On Wed, Feb 22, 2017 at 3:47 PM, gen tang <gen.tang86@gmail.com> wrote:

> Hi Kazuaki Ishizaki
>
> Thanks a lot for your help. It works. However, a stranger bug appears,
> as follows:
>
> import org.apache.spark.sql.DataFrame
> import org.apache.spark.sql.SparkSession
>
> def f(path: String, spark: SparkSession): DataFrame = {
>   val data = spark.read.option("mergeSchema", "true").parquet(path)
>   println(data.count)
>   val df = data.filter("id>10")
>   df.cache
>   println(df.count)
>   val df1 = df.filter("id>11")
>   df1.cache
>   println(df1.count)
>   df1
> }
>
> val dir = "/tmp/test"
> spark.range(100).write.mode("overwrite").parquet(dir)
> spark.catalog.refreshByPath(dir)
> f(dir, spark).count // output 88 which is correct
>
> spark.range(1000).write.mode("overwrite").parquet(dir)
> spark.catalog.refreshByPath(dir)
> f(dir, spark).count // output 88 which is incorrect
>
> If we move refreshByPath into f(), just before spark.read, the whole code
> works fine.
>
> Any idea? Thanks a lot
>
> Cheers
> Gen
>
>
> On Wed, Feb 22, 2017 at 2:22 PM, Kazuaki Ishizaki <ISHIZAKI@jp.ibm.com>
> wrote:
>
>> Hi,
>> Thank you for pointing out the JIRA.
>> I think that this JIRA suggests inserting
>> "spark.catalog.refreshByPath(dir)".
>>
>> val dir = "/tmp/test"
>> spark.range(100).write.mode("overwrite").parquet(dir)
>> val df = spark.read.parquet(dir)
>> df.count // output 100 which is correct
>> f(df).count // output 89 which is correct
>>
>> spark.range(1000).write.mode("overwrite").parquet(dir)
>> spark.catalog.refreshByPath(dir)  // insert a NEW statement
>> val df1 = spark.read.parquet(dir)
>> df1.count // output 1000 which is correct; in fact, operations other than
>> // df1.filter("id>10") return the correct result.
>> f(df1).count // output 89 which is incorrect
>>
>> Regards,
>> Kazuaki Ishizaki
>>
>>
>>
>> From:        gen tang <gen.tang86@gmail.com>
>> To:        dev@spark.apache.org
>> Date:        2017/02/22 15:02
>> Subject:        Re: A DataFrame cache bug
>> ------------------------------
>>
>>
>>
>> Hi All,
>>
>> I may have found a related issue on JIRA:
>>
>> https://issues.apache.org/jira/browse/SPARK-15678
>>
>> This issue is closed; maybe we should reopen it.
>>
>> Thanks
>>
>> Cheers
>> Gen
>>
>>
>> On Wed, Feb 22, 2017 at 1:57 PM, gen tang <gen.tang86@gmail.com> wrote:
>> Hi All,
>>
>> I found a strange bug related to reading data from an updated path
>> combined with the cache operation.
>> Please consider the following code:
>>
>> import org.apache.spark.sql.DataFrame
>>
>> def f(data: DataFrame): DataFrame = {
>>   val df = data.filter("id>10")
>>   df.cache
>>   df.count
>>   df
>> }
>>
>> // spark.range returns Dataset[java.lang.Long]; toDF() converts it to a DataFrame
>> f(spark.range(100).toDF()).count // output 89 which is correct
>> f(spark.range(1000).toDF()).count // output 989 which is correct
>>
>> val dir = "/tmp/test"
>> spark.range(100).write.mode("overwrite").parquet(dir)
>> val df = spark.read.parquet(dir)
>> df.count // output 100 which is correct
>> f(df).count // output 89 which is correct
>>
>> spark.range(1000).write.mode("overwrite").parquet(dir)
>> val df1 = spark.read.parquet(dir)
>> df1.count // output 1000 which is correct; in fact, operations other than
>> // df1.filter("id>10") return the correct result.
>> f(df1).count // output 89 which is incorrect
>>
>> In fact, when we use df1.filter("id>10"), Spark still uses the old
>> cached DataFrame.
>>
>> Any idea? Thanks a lot
>>
>> Cheers
>> Gen
>>
>>
>>
>
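
The workaround mentioned in the thread (calling refreshByPath inside f(), just before spark.read) might look roughly like the sketch below. The function name fWithRefresh is hypothetical and not from the original messages; the body otherwise follows f() as posted, and the snippet assumes a spark-shell session where `spark` is already defined. No output values are claimed here beyond what the thread itself reports.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical variant of f() from the thread: the only change is that the
// refresh happens inside the function, immediately before the read.
def fWithRefresh(path: String, spark: SparkSession): DataFrame = {
  // Invalidate cached data and metadata for any Dataset that reads from `path`
  spark.catalog.refreshByPath(path)
  val data = spark.read.option("mergeSchema", "true").parquet(path)
  println(data.count())
  val df = data.filter("id > 10")
  df.cache()
  println(df.count())
  val df1 = df.filter("id > 11")
  df1.cache()
  println(df1.count())
  df1
}

// Usage, mirroring the reproduction steps in the thread:
val dir = "/tmp/test"
spark.range(100).write.mode("overwrite").parquet(dir)
fWithRefresh(dir, spark).count()

spark.range(1000).write.mode("overwrite").parquet(dir)
fWithRefresh(dir, spark).count()
```

According to the report above, placing the refresh at this point makes the second call return the correct result, whereas refreshing outside f() does not; the sketch only restates that arrangement and does not explain why the ordering matters.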
