spark-issues mailing list archives

From "Dongjoon Hyun (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (SPARK-15282) UDF executed twice when filter on new column created by withColumn and the final value may be not correct
Date Tue, 24 May 2016 05:39:12 GMT

    [ https://issues.apache.org/jira/browse/SPARK-15282?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15297719#comment-15297719 ]

Dongjoon Hyun commented on SPARK-15282:
---------------------------------------

Thank you, @Linbo.
My approaches (defining ScalaUDF as deterministic or changing the PushDownPredicate optimizer) were not appropriate for this PR.
Sorry about that.
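
For reference, the determinism idea later surfaced as a public API: a minimal sketch, assuming Spark 2.3+ where UserDefinedFunction.asNondeterministic exists (it is not available in 1.6.1). Marking the UDF non-deterministic stops the optimizer from duplicating or eliminating its invocations:

{code:title=scala|borderStyle=solid}
import org.apache.spark.sql.functions.udf

// Spark 2.3+ only: asNondeterministic tells the optimizer it must not
// duplicate, eliminate, or reorder invocations of this UDF, so a filter
// on the derived column is no longer pushed below the projection.
val udfFunc = udf((s: String) => { println(s"running udf($s)"); s })
  .asNondeterministic()

val newDF = df.withColumn("new", udfFunc(df("old")))
newDF.filter("new <> 'a1'").show()   // expect one invocation per row
{code}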

> UDF executed twice when filter on new column created by withColumn and the final value may be not correct
> ---------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-15282
>                 URL: https://issues.apache.org/jira/browse/SPARK-15282
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.6.1
>         Environment: spark 1.6.1
>            Reporter: Linbo
>
> I found this problem on Spark 1.6.1, and based on [~tedyu]'s check the behavior is the same on the current master branch.
> Basically, I used a UDF with df.withColumn to create a "new" column, then filtered on that new column and called show (an action). The UDF that withColumn uses to create the new column is called twice (duplicated). If I filter on an "old" column instead, the UDF runs only once, as expected. The example code is attached below; `filteredOnNewColumnDF.show` demonstrates the problem.
> {code:title=spark-shell|borderStyle=solid}
> scala> import org.apache.spark.sql.functions._
> import org.apache.spark.sql.functions._
> scala> val df = sc.parallelize(Seq(("a", "b"), ("a1", "b1"))).toDF("old","old1")
> df: org.apache.spark.sql.DataFrame = [old: string, old1: string]
> scala> val udfFunc = udf((s: String) => {println(s"running udf($s)"); s })
> udfFunc: org.apache.spark.sql.UserDefinedFunction = UserDefinedFunction(<function1>,StringType,List(StringType))
> scala> val newDF = df.withColumn("new", udfFunc(df("old")))
> newDF: org.apache.spark.sql.DataFrame = [old: string, old1: string, new: string]
> scala> newDF.show
> running udf(a)
> running udf(a1)
> +---+----+---+
> |old|old1|new|
> +---+----+---+
> |  a|   b|  a|
> | a1|  b1| a1|
> +---+----+---+
> scala> val filteredOnNewColumnDF = newDF.filter("new <> 'a1'")
> filteredOnNewColumnDF: org.apache.spark.sql.DataFrame = [old: string, old1: string, new: string]
> scala> val filteredOnOldColumnDF = newDF.filter("old <> 'a1'")
> filteredOnOldColumnDF: org.apache.spark.sql.DataFrame = [old: string, old1: string, new: string]
> scala> filteredOnNewColumnDF.show
> running udf(a)
> running udf(a)
> running udf(a1)
> +---+----+---+
> |old|old1|new|
> +---+----+---+
> |  a|   b|  a|
> +---+----+---+
> scala> filteredOnOldColumnDF.show
> running udf(a)
> +---+----+---+
> |old|old1|new|
> +---+----+---+
> |  a|   b|  a|
> +---+----+---+
> {code}
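> One way to see why (a sketch; the exact plan text varies by Spark version): explain() should show the UDF referenced both in the pushed-down Filter condition and in the Project that produces the "new" column, so the two invocations are independent:
> {code:title=spark-shell|borderStyle=solid}
> scala> filteredOnNewColumnDF.explain()
> // Expect (version-dependent) the UDF to appear twice in the physical
> // plan: once in the Filter condition and once in the Project output.
> {code}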
> *Updated: user-defined functions must be deterministic. Due to optimization, duplicate invocations may be eliminated or the function may even be invoked more times than it is present in the query.* Refer to https://github.com/apache/spark/pull/13087
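> Given that constraint, a hedged workaround sketch: persist the DataFrame that carries the UDF output before filtering, so the filter scans cached values instead of re-invoking the UDF (assumes the intermediate data fits in the cache):
> {code:title=scala|borderStyle=solid}
> // After materialization, the filter reads the stored "new" column
> // rather than re-running the UDF for the pushed-down predicate.
> val cachedDF = df.withColumn("new", udfFunc(df("old"))).persist()
> cachedDF.count()                      // force one full evaluation
> cachedDF.filter("new <> 'a1'").show() // no further udf calls expected
> {code}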
> For our specific use case, I want to add more detail. In our project, we first generate a DataFrame with a "fileName" column and a "url" column, then use a UDF (inside withColumn()) to download the file from each URL, filtering out '{}' rows before writing to HDFS (the download body below is a sketch; the original code elided it):
> {code:title=scala|borderStyle=solid}
> // df: DataFrame["fileName", "url"]
> val getDataUDF = udf((url: String) => {
>   try {
>     // placeholder for the real download logic (elided in the original);
>     // Source.fromURL is one illustrative way to fetch the contents
>     scala.io.Source.fromURL(url).mkString
>   } catch { case e: Exception =>
>     "{}"  // sentinel value for failed downloads
>   }
> })
> val df2 = df.withColumn("data", getDataUDF(df("url")))
>             .filter("data <> '{}'")
> df2.write.save("hdfs path")
> {code}
> Based on our logs, each file is downloaded twice, and the write job with the filter takes about twice as long as the one without it.
> The other problem is data correctness. Because each file is downloaded twice, we hit cases where the first invocation of getDataUDF returned real data (not '{}') while the second returned '{}' due to a connection exception. The filter only evaluated the first returned value, so Spark did not remove the row, but the value stored in the "data" column was '{}', the second returned value. *Even after the filter, the resulting DataFrame df2 looks like the following (file2, whose data is '{}', should have been removed):*
> |fileName|url|data|
> |file1|url1|sth|
> |file2|url2|'{}'|
> *So at a high level, we still get '{}' data after filtering out '{}', which is strange. I think the reason is that the UDF is executed twice when filtering on a new column created by withColumn, and the two returned values differ: the first makes the filter condition true while the second makes it false. The DataFrame keeps the second value, which should never appear after the filter operation.*



