spark-dev mailing list archives

From "Rubén Berenguel" <rbereng...@gmail.com>
Subject Re: [DISCUSS] Expensive deterministic UDFs
Date Thu, 07 Nov 2019 10:20:02 GMT
That was very interesting, thanks Enrico.

Sean, IIRC it also prevents pushdown of the UDF in Catalyst in some cases.
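
For example (just a sketch; isEven is made up, df is the DataFrame from Enrico's example): once the UDF is flagged non-deterministic, Catalyst will not push a filter that uses it below other operators, which you can see in the plan from explain():

    import org.apache.spark.sql.functions.udf

    val isEven = udf((i: Int) => i % 2 == 0).asNondeterministic()
    df.select($"id")
      .filter(isEven($"id"))  // this Filter stays above the Project;
      .explain(true)          // a deterministic predicate could be pushed below it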

Regards, 

Ruben

> On 7 Nov 2019, at 11:09, Sean Owen <srowen@gmail.com> wrote:
> 
> Interesting: what does non-deterministic do except have this effect?
> Aside from the naming, it could be a fine use of this flag if that's
> all it effectively does. I'm not sure I'd introduce another flag with
> the same semantics just over naming. If anything, 'expensive' also
> isn't the right word; it's more like 'try not to evaluate multiple times'.
> 
> Why isn't caching the answer? I realize it's big, but you can cache to
> disk. This may be faster than whatever plan reordering has to happen
> to evaluate once.
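> 
> Something like this (a sketch, reusing udfF from your example below):
> 
>    import org.apache.spark.storage.StorageLevel
> 
>    df.select($"id", udfF($"id").as("array"))
>      .persist(StorageLevel.DISK_ONLY)  // spill the intermediate result to disk
>      .select($"array"(0).as("array0"), $"array"(1).as("array1"))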
> 
> Usually I'd say: can you redesign your UDF and code to be more
> efficient too? Or use a bigger cluster if that's really what you need.
> 
> At first look, no, I don't think this Spark-side naming workaround
> for your use case is worthwhile. There are better existing solutions.
> 
> On Thu, Nov 7, 2019 at 2:45 AM Enrico Minack <mail@enrico.minack.dev> wrote:
>> 
>> Hi all,
>> 
>> Running an expensive deterministic UDF that returns a complex type, followed by multiple
>> references to that result, causes Spark to evaluate the UDF multiple times per row. This
>> has been reported and discussed before: SPARK-18748, SPARK-17728.
>> 
>>    val f: Int => Array[Int] = ???  // placeholder for the expensive function
>>    val udfF = udf(f)
>>    df
>>      .select($"id", udfF($"id").as("array"))
>>      // both references below make Spark evaluate udfF twice per row:
>>      .select($"array"(0).as("array0"), $"array"(1).as("array1"))
>> 
>> A common approach to make Spark evaluate the UDF only once is to cache the intermediate
>> result right after projecting the UDF:
>> 
>>    df
>>      .select($"id", udfF($"id").as("array"))
>>      .cache()
>>      .select($"array"(0).as("array0"), $"array"(1).as("array1"))
>> 
>> There are scenarios where this intermediate result is too big for the cluster to
>> cache. Besides, caching just to force a single evaluation is bad design.
>> 
>> The best approach is to mark the UDF as non-deterministic. Then Spark optimizes the
>> query in a way that calls the UDF only once per row, which is exactly what you want.
>> 
>>    val udfF = udf(f).asNondeterministic()
>> 
>> However, declaring a UDF non-deterministic when it clearly is deterministic is
>> counter-intuitive and makes your code harder to read.
>> 
>> Spark should provide a better way to flag the UDF. Calling it expensive would be
>> a better name here.
>> 
>>    val udfF = udf(f).asExpensive()
>> 
>> I understand that deterministic is a notion that Expression provides, and that there
>> is no equivalent notion of expensive understood by the optimizer. However, asExpensive()
>> could simply set ScalaUDF.udfDeterministic = deterministic && !expensive, which
>> implements the best available approach behind a better name.
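>> 
>> A rough sketch of how that could look on UserDefinedFunction (the method is
>> hypothetical, nothing like it exists yet):
>> 
>>    // proposed API; delegates to the existing non-determinism machinery,
>>    // i.e. effectively sets udfDeterministic = false for this UDF
>>    def asExpensive(): UserDefinedFunction = asNondeterministic()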
>> 
>> What are your thoughts on asExpensive()?
>> 
>> Regards,
>> Enrico
> 


