spark-issues mailing list archives

From "Ryan Blue (JIRA)" <>
Subject [jira] [Created] (SPARK-25060) PySpark UDF in case statement is always run
Date Wed, 08 Aug 2018 20:38:00 GMT
Ryan Blue created SPARK-25060:

             Summary: PySpark UDF in case statement is always run
                 Key: SPARK-25060
             Project: Spark
          Issue Type: Bug
          Components: PySpark
    Affects Versions: 2.3.1
            Reporter: Ryan Blue

When evaluating a case statement that contains a Python UDF, Spark always runs the UDF, even
for rows where the case doesn't take the branch with the UDF call. Here's a repro case:

from pyspark.sql.types import StringType

def fail_if_x(s):
    assert s != 'x'
    return s

spark.udf.register("fail_if_x", fail_if_x, StringType())

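df = spark.createDataFrame([(1, 'x'), (2, 'y')], ['id', 'str'])
# Register the DataFrame as a temp view so the query below can refer to it as "data".
df.createOrReplaceTempView("data")

spark.sql("select id, case when str <> 'x' then fail_if_x(str) else null end from data").show()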

This produces the following error:
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):

  File "/mnt3/yarn/usercache/rblue/appcache/application_1533057049763_100912/container_1533057049763_100912_01_000002/", line 189, in main
  File "/mnt3/yarn/usercache/rblue/appcache/application_1533057049763_100912/container_1533057049763_100912_01_000002/", line 184, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/mnt3/yarn/usercache/rblue/appcache/application_1533057049763_100912/container_1533057049763_100912_01_000002/", line 104, in <lambda>
    func = lambda _, it: map(mapper, it)
  File "<string>", line 1, in <lambda>
  File "/mnt3/yarn/usercache/rblue/appcache/application_1533057049763_100912/container_1533057049763_100912_01_000002/", line 71, in <lambda>
    return lambda *a: f(*a)
  File "<ipython-input-1-91ba29d7e46f>", line 4, in fail_if_x 

This is because Python UDFs are extracted from expressions and run in the BatchEvalPython
node inserted as the child of the expression node:

== Physical Plan ==
CollectLimit 21
+- *Project [id#0L, CASE WHEN NOT (str#1 = x) THEN pythonUDF0#14 ELSE null END AS CASE WHEN (NOT (str = x)) THEN fail_if_x(str) ELSE CAST(NULL AS STRING) END#6]
   +- BatchEvalPython [fail_if_x(str#1)], [id#0L, str#1, pythonUDF0#14]
      +- Scan ExistingRDD[id#0L,str#1]
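
The effect of this plan shape can be imitated in plain Python. The sketch below is only a toy model, not Spark code: the function names batch_eval_then_project and branch_local_eval are made up for illustration, and the row data is copied from the repro above. The first function mirrors the plan (UDF on every row, then the case picks a value); the second mirrors what the query text suggests (UDF called only inside the matching branch).

```python
# Toy model in plain Python. This is NOT Spark's implementation; the two
# helper names below are hypothetical and exist only for illustration.
rows = [(1, 'x'), (2, 'y')]


def fail_if_x(s):
    assert s != 'x'
    return s


def batch_eval_then_project(rows):
    # Step 1 (stands in for BatchEvalPython): the UDF runs on every input
    # row, before the case condition is looked at.
    with_udf = [(id_, s, fail_if_x(s)) for id_, s in rows]
    # Step 2 (stands in for Project): the case only chooses between the
    # precomputed UDF value and null.
    return [(id_, udf_val if s != 'x' else None) for id_, s, udf_val in with_udf]


def branch_local_eval(rows):
    # The UDF is called only for rows where the condition holds.
    return [(id_, fail_if_x(s) if s != 'x' else None) for id_, s in rows]
```

In this model, batch_eval_then_project(rows) raises AssertionError on the row with 'x' even though the case would have returned null for it, while branch_local_eval(rows) completes.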

This doesn't affect correctness, but the behavior doesn't match the Scala API, where case can
be used to keep data that would cause a UDF to fail from ever being passed to that UDF.
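
Until the behavior changes, one possible way to sidestep the failure is to repeat the case condition inside the Python function, so that it tolerates rows the case would have filtered out. This is only a sketch and not something proposed in this issue; guarded_fail_if_x is a hypothetical name, and the check simply duplicates the condition from the repro query.

```python
def fail_if_x(s):
    assert s != 'x'
    return s


def guarded_fail_if_x(s):
    # Hypothetical wrapper: repeat the case condition here, because the UDF
    # may be handed rows that the case branch would have excluded.
    if s is None or s == 'x':
        return None
    return fail_if_x(s)
```

The wrapper could then be registered in place of the original function, in the same way as in the repro, e.g. spark.udf.register("guarded_fail_if_x", guarded_fail_if_x, StringType()).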
