spark-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Gen TANG (JIRA)" <j...@apache.org>
Subject [jira] [Comment Edited] (SPARK-15678) Not use cache on appends and overwrites
Date Fri, 24 Feb 2017 10:44:44 GMT

    [ https://issues.apache.org/jira/browse/SPARK-15678?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15877735#comment-15877735
] 

Gen TANG edited comment on SPARK-15678 at 2/24/17 10:44 AM:
------------------------------------------------------------

Hi, All 

It seems that refreshByPath(_path_) should be called 2 times if there are n (n > 1) cache
operation on dataFrame from the _path_. 

Therefore please reopen this issue

{code:title=not work code}
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.SparkSession

def f(path: String, spark: SparkSession): DataFrame = {
  spark.catalog.refreshByPath(path)
  val data = spark.read.option("mergeSchema", "true").parquet(path)
  println(data.count)  // always correct
  val df = data.filter("id>10")
  df.cache
  println(df.count) // always correct
  val df1 = df.filter("id>11")
  df1.cache
  println(df1.count)
  df1
}

val dir = "/tmp/test"
spark.range(100).write.mode("overwrite").parquet(dir)
f(dir, spark).count // output 88 which is correct

spark.range(1000).write.mode("overwrite").parquet(dir)
f(dir, spark).count // output 88 which is incorrect
{code}

{code:title=work code}
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.SparkSession

def f(path: String, spark: SparkSession): DataFrame = {
  spark.catalog.refreshByPath(path)
  spark.catalog.refreshByPath(path)
  val data = spark.read.option("mergeSchema", "true").parquet(path)
  println(data.count)
  val df = data.filter("id>10")
  df.cache
  println(df.count)
  val df1 = df.filter("id>11")
  df1.cache
  println(df1.count)
  df1
}

val dir = "/tmp/test"
spark.range(100).write.mode("overwrite").parquet(dir)
f(dir, spark).count // output 88 which is correct

spark.range(1000).write.mode("overwrite").parquet(dir)
f(dir, spark).count // output 988 which is incorrect
{code}



was (Author: gen):
Hi, All 

It seems that refreshByPath(_path_) should be called 2 times if there are n (n > 1) cache
operation on dataFrame from the _path_. So please reopen this issue

{code:title=not work code}
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.SparkSession

def f(path: String, spark: SparkSession): DataFrame = {
  spark.catalog.refreshByPath(path)
  val data = spark.read.option("mergeSchema", "true").parquet(path)
  println(data.count)  // always correct
  val df = data.filter("id>10")
  df.cache
  println(df.count) // always correct
  val df1 = df.filter("id>11")
  df1.cache
  println(df1.count)
  df1
}

val dir = "/tmp/test"
spark.range(100).write.mode("overwrite").parquet(dir)
f(dir, spark).count // output 88 which is correct

spark.range(1000).write.mode("overwrite").parquet(dir)
f(dir, spark).count // output 88 which is incorrect
{code}

{code:title=work code}
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.SparkSession

def f(path: String, spark: SparkSession): DataFrame = {
  spark.catalog.refreshByPath(path)
  spark.catalog.refreshByPath(path)
  val data = spark.read.option("mergeSchema", "true").parquet(path)
  println(data.count)
  val df = data.filter("id>10")
  df.cache
  println(df.count)
  val df1 = df.filter("id>11")
  df1.cache
  println(df1.count)
  df1
}

val dir = "/tmp/test"
spark.range(100).write.mode("overwrite").parquet(dir)
f(dir, spark).count // output 88 which is correct

spark.range(1000).write.mode("overwrite").parquet(dir)
f(dir, spark).count // output 988 which is incorrect
{code}


> Not use cache on appends and overwrites
> ---------------------------------------
>
>                 Key: SPARK-15678
>                 URL: https://issues.apache.org/jira/browse/SPARK-15678
>             Project: Spark
>          Issue Type: Bug
>    Affects Versions: 2.0.0
>            Reporter: Sameer Agarwal
>            Assignee: Sameer Agarwal
>             Fix For: 2.0.0
>
>
> SparkSQL currently doesn't drop caches if the underlying data is overwritten.
> {code}
> val dir = "/tmp/test"
> sqlContext.range(1000).write.mode("overwrite").parquet(dir)
> val df = sqlContext.read.parquet(dir).cache()
> df.count() // outputs 1000
> sqlContext.range(10).write.mode("overwrite").parquet(dir)
> sqlContext.read.parquet(dir).count() // outputs 1000 instead of 10 <---- We are still
using the cached dataset
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org


Mime
View raw message