spark-dev mailing list archives

From: Liang-Chi Hsieh <vii...@gmail.com>
Subject: Re: A DataFrame cache bug
Date: Sun, 26 Feb 2017 12:25:51 GMT


Hi Gen,

I submitted a PR to fix the refreshByPath issue:
https://github.com/apache/spark/pull/17064
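
In short, the problem is that refreshByPath doesn't clear all cached plans
that contain the given path, so caches built on derived DataFrames (like the
ones created inside f() in your example quoted below) can survive a refresh
and keep serving stale rows. A minimal sketch of the behavior the fix
targets, reusing dir and f() from the quoted code (the expected count
assumes the fix is applied):

  val dir = "/tmp/test"
  spark.range(1000).write.mode("overwrite").parquet(dir)
  spark.catalog.refreshByPath(dir) // should drop every cached plan under dir
  f(dir, spark).count              // expected 988 (ids 12..999) once the
                                   // stale caches are gone

Until it is merged, calling spark.catalog.clearCache() before re-reading the
path should also work around the problem, at the cost of dropping all cached
data.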

Thanks.



tgbaggio wrote:
> Hi, the example that I provided was not very clear, so I added a clearer
> example in the JIRA.
> 
> Thanks
> 
> Cheers
> Gen
> 
> On Wed, Feb 22, 2017 at 3:47 PM, gen tang <gen.tang86@...> wrote:
> 
>> Hi Kazuaki Ishizaki,
>>
>> Thanks a lot for your help. It works. However, an even stranger bug
>> appears, as follows:
>>
>> import org.apache.spark.sql.DataFrame
>> import org.apache.spark.sql.SparkSession
>>
>> def f(path: String, spark: SparkSession): DataFrame = {
>>   val data = spark.read.option("mergeSchema", "true").parquet(path)
>>   println(data.count)
>>   val df = data.filter("id>10")
>>   df.cache
>>   println(df.count)
>>   val df1 = df.filter("id>11")
>>   df1.cache
>>   println(df1.count)
>>   df1
>> }
>>
>> val dir = "/tmp/test"
>> spark.range(100).write.mode("overwrite").parquet(dir)
>> spark.catalog.refreshByPath(dir)
>> f(dir, spark).count // output 88 which is correct
>>
>> spark.range(1000).write.mode("overwrite").parquet(dir)
>> spark.catalog.refreshByPath(dir)
>> f(dir, spark).count // output 88 which is incorrect
>>
>> If we move refreshByPath into f(), just before spark.read, the whole code
>> works fine, as in the sketch below.
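>>
>> (f is the same as above, with one line added at the top:)
>>
>> def f(path: String, spark: SparkSession): DataFrame = {
>>   spark.catalog.refreshByPath(path) // moved here, just before the read
>>   val data = spark.read.option("mergeSchema", "true").parquet(path)
>>   println(data.count)
>>   val df = data.filter("id>10")
>>   df.cache
>>   println(df.count)
>>   val df1 = df.filter("id>11")
>>   df1.cache
>>   println(df1.count)
>>   df1
>> }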
>>
>> Any idea? Thanks a lot
>>
>> Cheers
>> Gen
>>
>>
>> On Wed, Feb 22, 2017 at 2:22 PM, Kazuaki Ishizaki <ISHIZAKI@.ibm> wrote:
>>
>>> Hi,
>>> Thank you for pointing out the JIRA.
>>> I think that this JIRA suggests inserting
>>> "spark.catalog.refreshByPath(dir)":
>>>
>>> val dir = "/tmp/test"
>>> spark.range(100).write.mode("overwrite").parquet(dir)
>>> val df = spark.read.parquet(dir)
>>> df.count // output 100 which is correct
>>> f(df).count // output 89 which is correct
>>>
>>> spark.range(1000).write.mode("overwrite").parquet(dir)
>>> spark.catalog.refreshByPath(dir)  // insert a NEW statement
>>> val df1 = spark.read.parquet(dir)
>>> df1.count // output 1000 which is correct; in fact, operations other
>>>           // than df1.filter("id>10") return the correct result
>>> f(df1).count // output 89 which is incorrect
>>>
>>> Regards,
>>> Kazuaki Ishizaki
>>>
>>>
>>>
>>> From: gen tang <gen.tang86@...>
>>> To: dev@.apache
>>> Date: 2017/02/22 15:02
>>> Subject: Re: A DataFrame cache bug
>>> ------------------------------
>>>
>>>
>>>
>>> Hi All,
>>>
>>> I may have found a related issue on JIRA:
>>>
>>> https://issues.apache.org/jira/browse/SPARK-15678
>>>
>>> This issue is closed; maybe we should reopen it.
>>>
>>> Thanks
>>>
>>> Cheers
>>> Gen
>>>
>>>
>>> On Wed, Feb 22, 2017 at 1:57 PM, gen tang <gen.tang86@...> wrote:
>>> Hi All,
>>>
>>> I found a strange bug related to reading data from an updated path
>>> combined with the cache operation.
>>> Please consider the following code:
>>>
>>> import org.apache.spark.sql.DataFrame
>>>
>>> def f(data: DataFrame): DataFrame = {
>>>   val df = data.filter("id>10")
>>>   df.cache
>>>   df.count
>>>   df
>>> }
>>>
>>> f(spark.range(100).asInstanceOf[DataFrame]).count   // output 89, correct
>>> f(spark.range(1000).asInstanceOf[DataFrame]).count  // output 989, correct
>>>
>>> val dir = "/tmp/test"
>>> spark.range(100).write.mode("overwrite").parquet(dir)
>>> val df = spark.read.parquet(dir)
>>> df.count // output 100 which is correct
>>> f(df).count // output 89 which is correct
>>>
>>> spark.range(1000).write.mode("overwrite").parquet(dir)
>>> val df1 = spark.read.parquet(dir)
>>> df1.count // output 1000 which is correct; in fact, operations other
>>>           // than df1.filter("id>10") return the correct result
>>> f(df1).count // output 89 which is incorrect
>>>
>>> In fact, when we use df1.filter("id>10"), Spark nevertheless uses the
>>> old cached DataFrame.
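>>>
>>> One way to confirm this (a sketch; InMemoryTableScan is the node the
>>> physical plan typically shows when a cached plan is reused):
>>>
>>>   df1.filter("id>10").explain(true)
>>>   // if the stale cache is picked up, the physical plan contains an
>>>   // InMemoryTableScan over the old data instead of a fresh scan of dir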
>>>
>>> Any idea? Thanks a lot
>>>
>>> Cheers
>>> Gen
>>>
>>>
>>>
>>





-----
Liang-Chi Hsieh | @viirya 
Spark Technology Center 
http://www.spark.tc/ 


