spark-dev mailing list archives

From Andy Dang <nam...@gmail.com>
Subject Re: How to hint Spark to use HashAggregate() for UDAF
Date Tue, 10 Jan 2017 17:35:40 GMT
Thanks. It appears that TypedImperativeAggregate won't be available till
2.2.x. I'm stuck with my RDD approach then :(

-------
Regards,
Andy
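
For readers following the thread quoted below, here is a minimal sketch of the two aggregation strategies under discussion. It is plain Python, not Spark code, and the function names `hash_aggregate` and `sort_aggregate` are hypothetical. The hash-based version keeps an arbitrary object (a list) as per-key state and needs a single pass. The sort-based version has to order the input by key before it can group.

```python
from itertools import groupby
from operator import itemgetter


def hash_aggregate(rows):
    """Group rows by key in one pass, using a dict of per-key lists as state."""
    groups = {}
    for key, value in rows:
        groups.setdefault(key, []).append(value)
    return groups


def sort_aggregate(rows):
    """Group rows by key by sorting first, then scanning runs of equal keys."""
    ordered = sorted(rows, key=itemgetter(0))
    return {
        key: [value for _, value in run]
        for key, run in groupby(ordered, key=itemgetter(0))
    }


rows = [("b", 1), ("a", 2), ("b", 3), ("a", 4)]

# Both strategies produce the same groups; only the work done differs.
assert hash_aggregate(rows) == sort_aggregate(rows)
print(hash_aggregate(rows))
```

Both functions return the same groups. The difference is the extra sort, which is the cost the original question is trying to avoid.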

On Tue, Jan 10, 2017 at 2:01 AM, Liang-Chi Hsieh <viirya@gmail.com> wrote:

>
> Hi Andy,
>
> Hash-based aggregation uses an unsafe row to hold its aggregation state, so
> every field in the aggregation buffer schema must be a type that is mutable
> in an unsafe row.
>
> If you can implement your aggregation function with
> TypedImperativeAggregate, Spark SQL has ObjectHashAggregateExec, which
> supports hash-based aggregation with arbitrary JVM objects as aggregation
> state.
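
The constraint described above can be modelled in a few lines. This is an illustrative sketch in plain Python, not Spark's implementation: the set of type names is an assumption loosely based on what `UnsafeRow.isMutable` accepted in the Spark 2.1 sources (fixed-width primitives plus date, timestamp and decimal), and the names `supports_hash_aggregate` and `choose_aggregate` are hypothetical.

```python
# Illustrative model only; not Spark's implementation.
# Assumption: type names that an unsafe row can update in place,
# loosely following UnsafeRow.isMutable in Spark 2.1.
MUTABLE_TYPES = {
    "null", "boolean", "byte", "short", "int", "long",
    "float", "double", "date", "timestamp", "decimal",
}


def supports_hash_aggregate(buffer_schema):
    """Return True if every buffer field type can be mutated in place."""
    return all(field_type in MUTABLE_TYPES for _, field_type in buffer_schema)


def choose_aggregate(buffer_schema):
    """Pick the hash-based operator when possible, else fall back to sorting."""
    if supports_hash_aggregate(buffer_schema):
        return "HashAggregate"
    return "SortAggregate"


# A count-style buffer is a single long, so it qualifies.
count_buffer = [("count", "long")]
# A buffer holding an array of rows, as in the UDAF discussed here, does not.
rows_buffer = [("internal_col", "array<struct>")]

print(choose_aggregate(count_buffer))
print(choose_aggregate(rows_buffer))
```

Under these assumptions the count buffer maps to the hash-based operator and the array-of-rows buffer falls back to the sort-based one, which matches the physical plan quoted further down.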
>
>
>
> Andy Dang wrote
> > Hi Takeshi,
> >
> > Thanks for the answer. My UDAF aggregates data into an array of rows.
> >
> > Apparently this makes it ineligible for hash-based aggregation, based on
> > the logic at:
> > https://github.com/apache/spark/blob/master/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java#L74
> > https://github.com/apache/spark/blob/master/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java#L108
> >
> > The list of supported data types is VERY limited, unfortunately.
> >
> > It doesn't make sense to me that the data type must be mutable for the
> > UDAF to use hash-based aggregation, but I could be missing something
> > here :). I could get hash-based aggregation by rewriting this query in
> > RDD mode, but that is counterintuitive IMO.
> >
> > -------
> > Regards,
> > Andy
> >
> > On Mon, Jan 9, 2017 at 2:05 PM, Takeshi Yamamuro <linguin.m.s@> wrote:
> >
> >> Hi,
> >>
> >> Spark always uses hash-based aggregation when the types of the
> >> aggregated data are supported; otherwise it falls back to sort-based
> >> aggregation.
> >> See:
> >> https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggUtils.scala#L38
> >>
> >> I'm not sure about your query, but it seems the types of its aggregated
> >> data are not supported for hash-based aggregation.
> >>
> >> // maropu
> >>
> >>
> >>
> >> On Mon, Jan 9, 2017 at 10:52 PM, Andy Dang <namd88@> wrote:
> >>
> >>> Hi all,
> >>>
> >>> It appears to me that Dataset.groupBy().agg(udaf) requires a full sort,
> >>> which is very inefficient for certain aggregations.
> >>>
> >>> The code is very simple:
> >>> - I have a UDAF
> >>> - What I want to do is: dataset.groupBy(cols).agg(udaf).count()
> >>>
> >>> The physical plan I got was:
> >>> *HashAggregate(keys=[], functions=[count(1)], output=[count#67L])
> >>> +- Exchange SinglePartition
> >>>    +- *HashAggregate(keys=[], functions=[partial_count(1)], output=[count#71L])
> >>>       +- *Project
> >>>          +- Generate explode(internal_col#31), false, false, [internal_col#42]
> >>>             +- SortAggregate(key=[key#0], functions=[aggregatefunction(key#0, nested#1, nestedArray#2, nestedObjectArray#3, value#4L, com.[...]uDf@108b121f, 0, 0)], output=[internal_col#31])
> >>>                +- *Sort [key#0 ASC], false, 0
> >>>                   +- Exchange hashpartitioning(key#0, 200)
> >>>                      +- SortAggregate(key=[key#0], functions=[partial_aggregatefunction(key#0, nested#1, nestedArray#2, nestedObjectArray#3, value#4L, com.[...]uDf@108b121f, 0, 0)], output=[key#0,internal_col#37])
> >>>                         +- *Sort [key#0 ASC], false, 0
> >>>                            +- Scan ExistingRDD[key#0,nested#1,nestedArray#2,nestedObjectArray#3,value#4L]
> >>>
> >>> How can I make Spark use HashAggregate (as it does for the count(*)
> >>> expression) instead of SortAggregate with my UDAF?
> >>>
> >>> Is this intentional? Is there an issue tracking this?
> >>>
> >>> -------
> >>> Regards,
> >>> Andy
> >>>
> >>
> >>
> >>
> >> --
> >> ---
> >> Takeshi Yamamuro
> >>
>
>
>
>
>
> -----
> Liang-Chi Hsieh | @viirya
> Spark Technology Center
> http://www.spark.tc/
> --
> View this message in context:
> http://apache-spark-developers-list.1001551.n3.nabble.com/How-to-hint-Spark-to-use-HashAggregate-for-UDAF-tp20526p20531.html
> Sent from the Apache Spark Developers List mailing list archive at
> Nabble.com.
>
> ---------------------------------------------------------------------
> To unsubscribe e-mail: dev-unsubscribe@spark.apache.org
>
>
