spark-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Dongjoon Hyun (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (SPARK-15282) UDF functioin executed twice when filter on new column created by withColumn
Date Fri, 13 May 2016 00:24:12 GMT

    [ https://issues.apache.org/jira/browse/SPARK-15282?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15282279#comment-15282279
] 

Dongjoon Hyun commented on SPARK-15282:
---------------------------------------

Hi, [~linbojin]. 
I think I can find solution for this bug.
I'll make a PR.

> UDF functioin executed twice when filter on new column created by withColumn
> ----------------------------------------------------------------------------
>
>                 Key: SPARK-15282
>                 URL: https://issues.apache.org/jira/browse/SPARK-15282
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.6.1
>         Environment: spark 1.6.1
>            Reporter: Linbo
>
> I found this problem on spark version 1.6.1 and based on  [~tedyu] in current master
branch, the behavior is the same.
> Basically, i used udf and df.withColumn to create a "new" column, and then i filter the
values on this new columns and call show(action). I see the udf function (which is used to
by withColumn to create the new column) is called twice(duplicated). And if filter on "old"
column, udf only run once which is expected. I attached the example codes,  `filteredOnNewColumnDF.show`
shows the problem.
> {code:title=spark-shell|borderStyle=solid}
> scala> import org.apache.spark.sql.functions._
> import org.apache.spark.sql.functions._
> scala> val df = sc.parallelize(Seq(("a", "b"), ("a1", "b1"))).toDF("old","old1")
> df: org.apache.spark.sql.DataFrame = [old: string, old1: string]
> scala> val udfFunc = udf((s: String) => {println(s"running udf($s)"); s })
> udfFunc: org.apache.spark.sql.UserDefinedFunction = UserDefinedFunction(<function1>,StringType,List(StringType))
> scala> val newDF = df.withColumn("new", udfFunc(df("old")))
> newDF: org.apache.spark.sql.DataFrame = [old: string, old1: string, new: string]
> scala> newDF.show
> running udf(a)
> running udf(a1)
> +---+----+---+
> |old|old1|new|
> +---+----+---+
> |  a|   b|  a|
> | a1|  b1| a1|
> +---+----+---+
> scala> val filteredOnNewColumnDF = newDF.filter("new <> 'a1'")
> filteredOnNewColumnDF: org.apache.spark.sql.DataFrame = [old: string, old1: string, new:
string]
> scala> val filteredOnOldColumnDF = newDF.filter("old <> 'a1'")
> filteredOnOldColumnDF: org.apache.spark.sql.DataFrame = [old: string, old1: string, new:
string]
> scala> filteredOnNewColumnDF.show
> running udf(a)
> running udf(a)
> running udf(a1)
> +---+----+---+
> |old|old1|new|
> +---+----+---+
> |  a|   b|  a|
> +---+----+---+
> scala> filteredOnOldColumnDF.show
> running udf(a)
> +---+----+---+
> |old|old1|new|
> +---+----+---+
> |  a|   b|  a|
> +---+----+---+
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org


Mime
View raw message