spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Reynold Xin <r...@databricks.com>
Subject Re: [SPARK-2.0][SQL] UDF containing non-serializable object does not work as expected
Date Mon, 08 Aug 2016 22:02:24 GMT
The show thing was the result of an optimization that short-circuited any
real Spark computation when the input is a local collection, and the result
was simply the first few rows. That's why it completed without serializing
anything.

It is somewhat inconsistent. One way to eliminate the inconsistency is to
always serialize the query plan even for local execution. We did that back
in the days for the RDD code path, and we can do similar things for the SQL
code path. However, serialization is not free and it will slow down the
execution by small percentage.



On Tue, Aug 9, 2016 at 5:05 AM, Hao Ren <invkrh@gmail.com> wrote:

> @Reynold
>
> Some questions to make things clear:
>
> 1. As nothing is really final in the JVM, is the generated code during
> the execution of `df.show()` different from the one of `df.filter($"key"
> === 2).show()` in my snippet ?
>
> 2. When `df.show()` is being executed, it seems that the 'notSer' object
> is not serialized (since no exception), instead the Int value in it is
> serialized. Is this correct ?
> As for me, this behavior is counterintuitive.
> The analogical problem would be a `RDD.map` 's closure contains
> 'notSer.value'. For example:
> ====
> rdd.map {
>       case (key, value) => value + notSer.value
>     }
> rdd.count
> ====
> It should thrown a "Task not serializable" exception. But for dataframe,
> it is not the case because of reflection or unsafe.
>
> 3. I am wondering whether this "feature" of scala complier makes the
> DataFrame API unpredictable ? Any roadmap on this ?
> As a user, I can not expect that a `fitler` call before `show` crashes,
> while a simple `show` on the original df works.
>
> The workaround I can imagine is just to cache and materialize `df` by
> `df.cache.count()`, and then call `df.filter(...).show()`.
> It should work, just a little bit tedious.
>
>
>
> On Mon, Aug 8, 2016 at 10:00 PM, Reynold Xin <rxin@databricks.com> wrote:
>
>> That is unfortunately the way how Scala compiler captures (and defines)
>> closures. Nothing is really final in the JVM. You can always use reflection
>> or unsafe to modify the value of fields.
>>
>> On Mon, Aug 8, 2016 at 8:16 PM, Simon Scott <
>> Simon.Scott@viavisolutions.com> wrote:
>>
>>> But does the “notSer” object have to be serialized?
>>>
>>>
>>>
>>> The object is immutable by the definition of A, so the only thing that
>>> needs to be serialized is the (immutable) Int value? And Ints are
>>> serializable?
>>>
>>>
>>>
>>> Just thinking out loud
>>>
>>>
>>>
>>> Simon Scott
>>>
>>>
>>>
>>> Research Developer @ viavisolutions.com
>>>
>>>
>>>
>>> *From:* Hao Ren [mailto:invkrh@gmail.com]
>>> *Sent:* 08 August 2016 09:03
>>> *To:* Muthu Jayakumar <babloo80@gmail.com>
>>> *Cc:* user <user@spark.apache.org>; dev <dev@spark.apache.org>
>>> *Subject:* Re: [SPARK-2.0][SQL] UDF containing non-serializable object
>>> does not work as expected
>>>
>>>
>>>
>>> Yes, it is.
>>>
>>> You can define a udf like that.
>>>
>>> Basically, it's a udf Int => Int which is a closure contains a non
>>> serializable object.
>>>
>>> The latter should cause Task not serializable exception.
>>>
>>>
>>>
>>> Hao
>>>
>>>
>>>
>>> On Mon, Aug 8, 2016 at 5:08 AM, Muthu Jayakumar <babloo80@gmail.com>
>>> wrote:
>>>
>>> Hello Hao Ren,
>>>
>>>
>>>
>>> Doesn't the code...
>>>
>>>
>>>
>>> val add = udf {
>>>
>>>       (a: Int) => a + notSer.value
>>>
>>>     }
>>>
>>> Mean UDF function that Int => Int ?
>>>
>>>
>>>
>>> Thanks,
>>>
>>> Muthu
>>>
>>>
>>>
>>> On Sun, Aug 7, 2016 at 2:31 PM, Hao Ren <invkrh@gmail.com> wrote:
>>>
>>> I am playing with spark 2.0
>>>
>>> What I tried to test is:
>>>
>>>
>>>
>>> Create a UDF in which there is a non serializable object.
>>>
>>> What I expected is when this UDF is called during materializing the
>>> dataFrame where the UDF is used in "select", an task non serializable
>>> exception should be thrown.
>>>
>>> It depends also which "action" is called on that dataframe.
>>>
>>>
>>>
>>> Here is the code for reproducing the pb:
>>>
>>>
>>>
>>> ============
>>>
>>> object DataFrameSerDeTest extends App {
>>>
>>>
>>>
>>>   class A(val value: Int) // It is not serializable
>>>
>>>
>>>
>>>   def run() = {
>>>
>>>     val spark = SparkSession
>>>
>>>       .builder()
>>>
>>>       .appName("DataFrameSerDeTest")
>>>
>>>       .master("local[*]")
>>>
>>>       .getOrCreate()
>>>
>>>
>>>
>>>     import org.apache.spark.sql.functions.udf
>>>
>>>     import spark.sqlContext.implicits._
>>>
>>>
>>>
>>>     val notSer = new A(2)
>>>
>>>     val add = udf {
>>>
>>>       (a: Int) => a + notSer.value
>>>
>>>     }
>>>
>>>     val df = spark.createDataFrame(Seq(
>>>
>>>       (1, 2),
>>>
>>>       (2, 2),
>>>
>>>       (3, 2),
>>>
>>>       (4, 2)
>>>
>>>     )).toDF("key", "value")
>>>
>>>       .select($"key", add($"value").as("added"))
>>>
>>>
>>>
>>>     df.show() // *It should not work because the udf contains a
>>> non-serializable object, but it works*
>>>
>>>
>>>
>>>     df.filter($"key" === 2).show() // *It does not work as expected
>>> (org.apache.spark.SparkException: Task not serializable)*
>>>
>>>   }
>>>
>>>
>>>
>>>   run()
>>>
>>> }
>>>
>>> ============
>>>
>>>
>>>
>>> Also, I tried collect(), count(), first(), limit(). All of them worked
>>> without non-serializable exceptions.
>>>
>>> It seems only filter() throws the exception. (feature or bug ?)
>>>
>>>
>>>
>>> Any ideas ? Or I just messed things up ?
>>>
>>> Any help is highly appreciated.
>>>
>>>
>>>
>>> --
>>>
>>> Hao Ren
>>>
>>>
>>>
>>> Data Engineer @ leboncoin
>>>
>>>
>>>
>>> Paris, France
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> --
>>>
>>> Hao Ren
>>>
>>>
>>>
>>> Data Engineer @ leboncoin
>>>
>>>
>>>
>>> Paris, France
>>>
>>
>>
>
>
> --
> Hao Ren
>
> Data Engineer @ leboncoin
>
> Paris, France
>

Mime
View raw message