spark-dev mailing list archives

From Sean Owen <sro...@gmail.com>
Subject Re: [DISCUSS] Expensive deterministic UDFs
Date Thu, 07 Nov 2019 10:09:06 GMT
Interesting: what does non-deterministic do except have this effect?
Aside from the naming, it could be a fine use of this flag if that's
all it effectively does. I'm not sure I'd introduce another flag with
the same semantics just over naming. If anything, 'expensive' isn't
the right word either; it's more like 'try not to evaluate multiple times'.

Why isn't caching the answer? I realize the result is big, but you can
cache to disk. That may be faster than whatever plan reordering has to
happen to evaluate the UDF only once.
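
For instance, a sketch reusing the df and udfF from your snippet below;
DISK_ONLY keeps the large intermediate result out of memory:

    import org.apache.spark.storage.StorageLevel

    df
      .select($"id", udfF($"id").as("array"))
      .persist(StorageLevel.DISK_ONLY)  // spill the big intermediate result to disk
      .select($"array"(0).as("array0"), $"array"(1).as("array1"))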

Usually I'd say: can you redesign your UDF and code to be more
efficient too? Or use a bigger cluster, if that's really what you need.

At first look, no, I don't think this Spark-side workaround over naming
is worthwhile for your use case. There are existing, better solutions.

On Thu, Nov 7, 2019 at 2:45 AM Enrico Minack <mail@enrico.minack.dev> wrote:
>
> Hi all,
>
> Running expensive deterministic UDFs that return complex types, followed by multiple
> references to those results, causes Spark to evaluate the UDF multiple times per row. This
> has been reported and discussed before: SPARK-18748, SPARK-17728
>
>     import org.apache.spark.sql.functions.udf
>
>     val f: Int => Array[Int] = i => Array(i, i + 1)  // body illustrative; stands in for an expensive computation
>     val udfF = udf(f)
>     df
>       .select($"id", udfF($"id").as("array"))
>       .select($"array"(0).as("array0"), $"array"(1).as("array1"))
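>
> The duplicate evaluation is visible in the physical plan, where udfF appears twice in the
> collapsed projection (sketch, assuming a SparkSession in scope):
>
>     df
>       .select($"id", udfF($"id").as("array"))
>       .select($"array"(0).as("array0"), $"array"(1).as("array1"))
>       .explain()  // the UDF shows up once per reference in the Project node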
>
> A common approach to make Spark evaluate the UDF only once is to cache the intermediate
> result right after projecting the UDF:
>
>     df
>       .select($"id", udfF($"id").as("array"))
>       .cache()
>       .select($"array"(0).as("array0"), $"array"(1).as("array1"))
>
> There are scenarios where this intermediate result is too big for the cluster to cache.
> Caching solely to force a single evaluation is also bad design.
>
> The best approach is to mark the UDF as non-deterministic. Then Spark optimizes the query
> so that the UDF gets called only once per row, which is exactly what you want.
>
>     val udfF = udf(f).asNondeterministic()
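>
> Rerunning the explain() check from above now shows a single call of udfF, because the
> optimizer does not duplicate non-deterministic expressions when collapsing projections.
> You can also verify this at runtime with an accumulator (sketch, for demonstration only;
> accumulator counts can overcount if tasks are retried):
>
>     val calls = spark.sparkContext.longAccumulator("udf calls")
>     val counted = udf((i: Int) => { calls.add(1); f(i) }).asNondeterministic()
>     df
>       .select($"id", counted($"id").as("array"))
>       .select($"array"(0).as("array0"), $"array"(1).as("array1"))
>       .foreach(_ => ())  // force evaluation of the projected columns
>     // calls.value now equals the row count of df, not twice that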
>
> However, declaring a UDF non-deterministic when it clearly is deterministic is counter-intuitive
> and makes your code harder to read.
>
> Spark should provide a better way to flag such a UDF. Calling it expensive would be better
> naming here.
>
>     val udfF = udf(f).asExpensive()
>
> I understand that deterministic is a notion that Expression provides, and that there is no
> equivalent notion of expensive understood by the optimizer. However, asExpensive() could
> simply set ScalaUDF.udfDeterministic = deterministic && !expensive, which implements the
> best available approach behind better naming.
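>
> Until something like this exists in Spark, a user-side shim could provide the naming today
> (hypothetical sketch; asExpensive() is not an existing Spark API):
>
>     import org.apache.spark.sql.expressions.UserDefinedFunction
>
>     implicit class ExpensiveUdf(val underlying: UserDefinedFunction) extends AnyVal {
>       // delegates to the existing non-determinism flag, but documents the intent
>       def asExpensive(): UserDefinedFunction = underlying.asNondeterministic()
>     }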
>
> What are your thoughts on asExpensive()?
>
> Regards,
> Enrico
