spark-user mailing list archives

From Hao Ren <inv...@gmail.com>
Subject Re: [SPARK-2.0][SQL] UDF containing non-serializable object does not work as expected
Date Mon, 08 Aug 2016 21:05:07 GMT
@Reynold

A few questions to clarify things:

1. Since nothing is really final in the JVM, is the generated code during the
execution of `df.show()` different from that of `df.filter($"key" ===
2).show()` in my snippet?
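
To make this concrete, here is a minimal sketch (my own illustration, not
Spark code) of why the compiler cannot treat a captured `val` as a constant:
even its final backing field can be mutated through reflection.

====
import java.lang.reflect.Field

class A(val value: Int) // 'value' compiles to a private final field

object FinalNotFinal extends App {
  val a = new A(2)
  val f: Field = a.getClass.getDeclaredField("value")
  f.setAccessible(true) // bypass the private/final access checks
  f.setInt(a, 42)       // mutate the supposedly immutable field
  println(a.value)      // should print 42, not 2
}
====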

2. When `df.show()` is executed, it seems that the 'notSer' object is
not serialized (since no exception is thrown); instead, only the Int value
inside it is serialized. Is this correct?
To me, this behavior is counterintuitive.
The analogous problem would be an `RDD.map` whose closure contains
`notSer.value`. For example:
====
// assuming 'rdd' is an RDD[(Int, Int)] defined elsewhere
rdd.map {
  case (key, value) => value + notSer.value
}.count()
====
This should throw a "Task not serializable" exception. But for a DataFrame,
that is not the case, apparently because of reflection or Unsafe.

3. I am wondering whether this "feature" of the Scala compiler makes the
DataFrame API unpredictable. Is there any roadmap on this?
As a user, I would not expect a `filter` call before `show` to crash
while a simple `show` on the original df works.

The workaround I can imagine is just to cache and materialize `df` via
`df.cache.count()`, and then call `df.filter(...).show()`.
It should work, but it is a bit tedious.
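
A minimal sketch of that workaround (untested, reusing the `df` from my
snippet below):

====
// materialize the DataFrame once, so the UDF is evaluated up front
df.cache()
df.count()

// this query now reads cached rows and should not need to
// serialize the UDF's closure again
df.filter($"key" === 2).show()
====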



On Mon, Aug 8, 2016 at 10:00 PM, Reynold Xin <rxin@databricks.com> wrote:

> That is unfortunately the way the Scala compiler captures (and defines)
> closures. Nothing is really final in the JVM: you can always use reflection
> or Unsafe to modify the value of fields.
>
> On Mon, Aug 8, 2016 at 8:16 PM, Simon Scott
> <Simon.Scott@viavisolutions.com> wrote:
>
>> But does the “notSer” object have to be serialized?
>>
>> The object is immutable by the definition of A, so the only thing that
>> needs to be serialized is the (immutable) Int value? And Ints are
>> serializable?
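>>
>> For example, capturing the Int in a local val first should leave nothing
>> non-serializable in the closure (an untested sketch, and `intValue` is
>> just an illustrative name):
>>
>> val intValue = notSer.value // a plain Int, which is serializable
>> val add = udf {
>>   (a: Int) => a + intValue
>> }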
>>
>> Just thinking out loud
>>
>> Simon Scott
>>
>> Research Developer @ viavisolutions.com
>>
>> From: Hao Ren [mailto:invkrh@gmail.com]
>> Sent: 08 August 2016 09:03
>> To: Muthu Jayakumar <babloo80@gmail.com>
>> Cc: user <user@spark.apache.org>; dev <dev@spark.apache.org>
>> Subject: Re: [SPARK-2.0][SQL] UDF containing non-serializable object
>> does not work as expected
>>
>> Yes, it is.
>>
>> You can define a udf like that.
>>
>> Basically, it's a UDF `Int => Int` whose closure contains a
>> non-serializable object. The latter should cause a "Task not
>> serializable" exception.
>>
>> Hao
>>
>> On Mon, Aug 8, 2016 at 5:08 AM, Muthu Jayakumar <babloo80@gmail.com>
>> wrote:
>>
>> Hello Hao Ren,
>>
>> Doesn't the code...
>>
>> val add = udf {
>>   (a: Int) => a + notSer.value
>> }
>>
>> ...mean a UDF of type Int => Int?
>>
>> Thanks,
>> Muthu
>>
>> On Sun, Aug 7, 2016 at 2:31 PM, Hao Ren <invkrh@gmail.com> wrote:
>>
>> I am playing with Spark 2.0. What I tried to test is:
>>
>> Create a UDF that contains a non-serializable object.
>>
>> What I expected is that when this UDF is called while materializing the
>> DataFrame where the UDF is used in a "select", a "Task not serializable"
>> exception should be thrown. It turns out this also depends on which
>> "action" is called on that DataFrame.
>>
>> Here is the code to reproduce the problem:
>>
>> ============
>> import org.apache.spark.sql.SparkSession
>>
>> object DataFrameSerDeTest extends App {
>>
>>   class A(val value: Int) // It is not serializable
>>
>>   def run() = {
>>     val spark = SparkSession
>>       .builder()
>>       .appName("DataFrameSerDeTest")
>>       .master("local[*]")
>>       .getOrCreate()
>>
>>     import org.apache.spark.sql.functions.udf
>>     import spark.sqlContext.implicits._
>>
>>     val notSer = new A(2)
>>     val add = udf {
>>       (a: Int) => a + notSer.value
>>     }
>>
>>     val df = spark.createDataFrame(Seq(
>>       (1, 2),
>>       (2, 2),
>>       (3, 2),
>>       (4, 2)
>>     )).toDF("key", "value")
>>       .select($"key", add($"value").as("added"))
>>
>>     // It should not work because the UDF contains a
>>     // non-serializable object, but it works
>>     df.show()
>>
>>     // It does not work as expected
>>     // (org.apache.spark.SparkException: Task not serializable)
>>     df.filter($"key" === 2).show()
>>   }
>>
>>   run()
>> }
>> ============
>>
>> Also, I tried collect(), count(), first(), and limit(). All of them
>> worked without a serialization exception; it seems that only filter()
>> throws one. (Feature or bug?)
>>
>> Any ideas? Or have I just messed something up?
>>
>> Any help is highly appreciated.
>>
>> --
>> Hao Ren
>>
>> Data Engineer @ leboncoin
>>
>> Paris, France
>>
>>
>
>


-- 
Hao Ren

Data Engineer @ leboncoin

Paris, France
