spark-user mailing list archives

From Hao Ren <inv...@gmail.com>
Subject Re: [SPARK-2.0][SQL] UDF containing non-serializable object does not work as expected
Date Mon, 08 Aug 2016 08:02:31 GMT
Yes, it is.
You can define a UDF like that.
Basically, it's an Int => Int UDF whose closure captures a non-serializable
object.
Capturing that object should cause a "Task not serializable" exception.
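To see why such a closure should not serialize, here is a minimal sketch outside Spark. It assumes plain Java serialization, which is what Spark's default closure serializer is built on. `NotSer`, `ClosureSerCheck` and `isSerializable` are hypothetical names used only for illustration; `NotSer` plays the role of `class A` in the quoted code.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Stands in for `class A` from the thread: it does NOT extend Serializable.
class NotSer(val value: Int)

object ClosureSerCheck {
  // Hypothetical helper: returns true if `obj` survives Java serialization.
  def isSerializable(obj: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try { out.writeObject(obj); true }
    catch { case _: NotSerializableException => false }
    finally out.close()
  }

  def main(args: Array[String]): Unit = {
    val notSer = new NotSer(2)
    val add: Int => Int = a => a + notSer.value // captures the NotSer instance
    val plain: Int => Int = a => a + 2          // captures nothing
    println(isSerializable(add))   // false
    println(isSerializable(plain)) // true
  }
}
```

The function value itself is serializable; it is the captured `NotSer` instance that makes `writeObject` fail, which is the same failure Spark reports as "Task not serializable".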

Hao

On Mon, Aug 8, 2016 at 5:08 AM, Muthu Jayakumar <babloo80@gmail.com> wrote:

> Hello Hao Ren,
>
> Doesn't the code...
>
> val add = udf {
>       (a: Int) => a + notSer.value
>     }
> mean that the UDF is a function Int => Int?
>
> Thanks,
> Muthu
>
> On Sun, Aug 7, 2016 at 2:31 PM, Hao Ren <invkrh@gmail.com> wrote:
>
>> I am playing with Spark 2.0.
>> What I tried to test is:
>>
>> Create a UDF that contains a non-serializable object.
>> What I expected is that when this UDF is called while materializing the
>> DataFrame that uses it in "select", a "Task not serializable"
>> exception should be thrown.
>> Whether it is thrown also seems to depend on which "action" is called on
>> that DataFrame.
>>
>> Here is the code to reproduce the problem:
>>
>> ============
>> import org.apache.spark.sql.SparkSession
>> import org.apache.spark.sql.functions.udf
>>
>> object DataFrameSerDeTest extends App {
>>
>>   class A(val value: Int) // It is not serializable
>>
>>   def run() = {
>>     val spark = SparkSession
>>       .builder()
>>       .appName("DataFrameSerDeTest")
>>       .master("local[*]")
>>       .getOrCreate()
>>
>>     import spark.implicits._
>>
>>     val notSer = new A(2)
>>     val add = udf {
>>       (a: Int) => a + notSer.value // the closure captures notSer
>>     }
>>     val df = spark.createDataFrame(Seq(
>>       (1, 2),
>>       (2, 2),
>>       (3, 2),
>>       (4, 2)
>>     )).toDF("key", "value")
>>       .select($"key", add($"value").as("added"))
>>
>>     // Expected to fail because the UDF captures a non-serializable
>>     // object, but it works:
>>     df.show()
>>
>>     // Fails as expected with
>>     // org.apache.spark.SparkException: Task not serializable
>>     df.filter($"key" === 2).show()
>>
>>     spark.stop()
>>   }
>>
>>   run()
>> }
>> ============
>>
>> I also tried collect(), count(), first() and limit(); all of them worked
>> without a "Task not serializable" exception.
>> It seems that only filter() throws the exception. Is this a feature or a
>> bug?
>>
>> Any ideas? Or did I just mess things up?
>> Any help is highly appreciated.
>>
>> --
>> Hao Ren
>>
>> Data Engineer @ leboncoin
>>
>> Paris, France
>>
>
>
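A common way to avoid the exception, sketched here under the assumption that only the `Int` inside `A` is needed, is to copy that value into a local `val` on the driver so the UDF closure captures a plain `Int` instead of the `A` instance. `DataFrameSerDeFix` and `run` are hypothetical names; the data and column names follow the reproduction above.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf

object DataFrameSerDeFix {
  class A(val value: Int) // still NOT serializable, as in the original

  def run(): Seq[(Int, Int)] = {
    val spark = SparkSession.builder()
      .appName("DataFrameSerDeFix")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._
    try {
      val notSer = new A(2)
      val v = notSer.value             // read the Int on the driver
      val add = udf((a: Int) => a + v) // closure captures only an Int
      val df = Seq((1, 2), (2, 2), (3, 2), (4, 2)).toDF("key", "value")
        .select($"key", add($"value").as("added"))
      // The filter path that failed in the original now has a serializable closure.
      df.filter($"key" === 2).as[(Int, Int)].collect().toSeq
    } finally spark.stop()
  }

  def main(args: Array[String]): Unit = println(run())
}
```

Alternatively, declaring `class A(val value: Int) extends Serializable` would also let the closure serialize, at the cost of serializing the whole object along with the closure.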


