spark-dev mailing list archives

From Andy Dang <nam...@gmail.com>
Subject Re: How to hint Spark to use HashAggregate() for UDAF
Date Mon, 09 Jan 2017 14:13:56 GMT
Hi Takeshi,

Thanks for the answer. My UDAF aggregates data into an array of rows.

Apparently this makes it ineligible for hash-based aggregation, based on
the logic at:
https://github.com/apache/spark/blob/master/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java#L74
https://github.com/apache/spark/blob/master/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java#L108

Unfortunately, the list of supported data types is VERY limited.
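
To make the restriction concrete, here is a minimal, self-contained sketch of the kind of check the two linked files perform. It is a simplified model, not Spark's code: the FieldType enum and the class name are hypothetical stand-ins for Spark's DataType classes, and the set of mutable types reflects my reading of UnsafeRow around this time, so treat the exact list as an assumption and check the linked source for your version.

```java
import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;
import java.util.Set;

// Simplified model of the hash-aggregate eligibility check.
// None of these types are Spark classes; they are illustrative stand-ins.
class HashAggEligibilitySketch {

    // Hypothetical stand-in for Spark SQL data types.
    public enum FieldType {
        NULL, BOOLEAN, BYTE, SHORT, INTEGER, LONG, FLOAT, DOUBLE,
        DATE, TIMESTAMP, DECIMAL, STRING, BINARY, ARRAY, MAP, STRUCT
    }

    // Assumption: types that can be updated in place inside an UnsafeRow.
    // Variable-length types (strings, arrays, maps, structs) are excluded.
    public static final Set<FieldType> MUTABLE_TYPES = EnumSet.of(
        FieldType.NULL, FieldType.BOOLEAN, FieldType.BYTE, FieldType.SHORT,
        FieldType.INTEGER, FieldType.LONG, FieldType.FLOAT, FieldType.DOUBLE,
        FieldType.DATE, FieldType.TIMESTAMP, FieldType.DECIMAL);

    // Models the per-field test: can this field be mutated in place?
    public static boolean isMutable(FieldType type) {
        return MUTABLE_TYPES.contains(type);
    }

    // Models the buffer-schema test: the hash-based path is only possible
    // when every field of the aggregation buffer is mutable.
    public static boolean supportsAggregationBufferSchema(List<FieldType> bufferSchema) {
        for (FieldType type : bufferSchema) {
            if (!isMutable(type)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // A count-like buffer holds a single long: eligible.
        System.out.println(supportsAggregationBufferSchema(
            Arrays.asList(FieldType.LONG)));   // prints "true"
        // A buffer holding an array of rows, like my UDAF: not eligible.
        System.out.println(supportsAggregationBufferSchema(
            Arrays.asList(FieldType.ARRAY)));  // prints "false"
    }
}
```

Under this model, a single non-mutable field in the buffer is enough to rule out the hash-based path, which matches the SortAggregate nodes in the plan quoted below.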

It doesn't make sense to me that the data type must be mutable for the UDAF
to use hash-based aggregation, but I could be missing something here :). I
could get hash-based aggregation by rewriting this query with RDDs, but that
is counterintuitive IMO.

-------
Regards,
Andy

On Mon, Jan 9, 2017 at 2:05 PM, Takeshi Yamamuro <linguin.m.s@gmail.com>
wrote:

> Hi,
>
> Spark always uses hash-based aggregates if the types of the aggregated data
> are supported there; otherwise, it falls back to sort-based aggregates.
> See: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggUtils.scala#L38
>
> I'm not sure about your query, but it seems the types of the aggregated
> data in it are not supported for hash-based aggregates.
>
> // maropu
>
>
>
> On Mon, Jan 9, 2017 at 10:52 PM, Andy Dang <namd88@gmail.com> wrote:
>
>> Hi all,
>>
>> It appears to me that Dataset.groupBy().agg(udaf) requires a full sort,
>> which is very inefficient for certain aggregations.
>>
>> The code is very simple:
>> - I have a UDAF
>> - What I want to do is: dataset.groupBy(cols).agg(udaf).count()
>>
>> The physical plan I got was:
>> *HashAggregate(keys=[], functions=[count(1)], output=[count#67L])
>> +- Exchange SinglePartition
>>    +- *HashAggregate(keys=[], functions=[partial_count(1)], output=[count#71L])
>>       +- *Project
>>          +- Generate explode(internal_col#31), false, false, [internal_col#42]
>>             +- SortAggregate(key=[key#0], functions=[aggregatefunction(key#0, nested#1, nestedArray#2, nestedObjectArray#3, value#4L, com.[...]uDf@108b121f, 0, 0)], output=[internal_col#31])
>>                +- *Sort [key#0 ASC], false, 0
>>                   +- Exchange hashpartitioning(key#0, 200)
>>                      +- SortAggregate(key=[key#0], functions=[partial_aggregatefunction(key#0, nested#1, nestedArray#2, nestedObjectArray#3, value#4L, com.[...]uDf@108b121f, 0, 0)], output=[key#0,internal_col#37])
>>                         +- *Sort [key#0 ASC], false, 0
>>                            +- Scan ExistingRDD[key#0,nested#1,nestedArray#2,nestedObjectArray#3,value#4L]
>>
>> How can I make Spark use HashAggregate (as it does for the count(*)
>> expression) instead of SortAggregate with my UDAF?
>>
>> Is this behavior intentional? Is there an issue tracking it?
>>
>> -------
>> Regards,
>> Andy
>>
>
>
>
> --
> ---
> Takeshi Yamamuro
>
