drill-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Jacques Nadeau <jacq...@dremio.com>
Subject Re: Partial aggregation in Drill-on-Phoenix
Date Wed, 07 Oct 2015 00:53:57 GMT
I'm not sure how to accomplish this cleanly. The concept of two-phased
agg-key distributed aggregation (and exchanges in general) seems very much
a physical concept. Since Phoenix can only do half this operation (in
parallel), I'm having trouble figuring out what the logical plan would look
like if we did this transformation in the logical planning phase. (In
general, I think part of the problem is that each phase of planning can
output only a single plan.) Since we have chosen to break the planning into
~5 phases due to performance, we have to pick where the transformations are
most appropriate.

--
Jacques Nadeau
CTO and Co-Founder, Dremio

On Tue, Oct 6, 2015 at 11:31 AM, Julian Hyde <jhyde@apache.org> wrote:

> Drill's current approach seems adequate for Drill alone but extending
> it to a heterogenous system that includes Phoenix seems like a hack.
>
> I think you should only create Prels for algebra nodes that you know
> for sure are going to run on the Drill engine. If there's a
> possibility that it would run in another engine such as Phoenix then
> they should still be logical.
>
> On Tue, Oct 6, 2015 at 11:03 AM, Maryann Xue <maryann.xue@gmail.com>
> wrote:
> > The partial aggregate seems to be working now, with one interface
> extension
> > and one bug fix in the Phoenix project. Will do some code cleanup and
> > create a pull request soon.
> >
> > Still there was a hack in the Drill project which I made to force 2-phase
> > aggregation. I'll try to fix that.
> >
> > Jacques, I have one question though, how can I verify that there are more
> > than one slice and the shuffle happens?
> >
> >
> > Thanks,
> > Maryann
> >
> > On Mon, Oct 5, 2015 at 2:03 PM, James Taylor <jamestaylor@apache.org>
> wrote:
> >
> >> Maryann,
> >> I believe Jacques mentioned that a little bit of refactoring is required
> >> for a merge sort to occur - there's something that does that, but it's
> not
> >> expected to be used in this context currently.
> >>
> >> IMHO, there's more of a clear value in getting the aggregation to use
> >> Phoenix first, so I'd recommend going down that road as Jacques
> mentioned
> >> above if possible. Once that's working, we can circle back to the
> partial
> >> sort.
> >>
> >> Thoughts?
> >> James
> >>
> >> On Mon, Oct 5, 2015 at 10:40 AM, Maryann Xue <maryann.xue@gmail.com>
> >> wrote:
> >>
> >>> I actually tried implementing partial sort with
> >>> https://github.com/jacques-n/drill/pull/4, which I figured might be a
> >>> little easier to start with than partial aggregation. But I found that
> even
> >>> though the code worked (returned the right results), the Drill side
> sort
> >>> turned out to be a ordinary sort instead of a merge which it should
> have
> >>> been. Any idea of how to fix that?
> >>>
> >>>
> >>> Thanks,
> >>> Maryann
> >>>
> >>> On Mon, Oct 5, 2015 at 12:52 PM, Jacques Nadeau <jacques@dremio.com>
> >>> wrote:
> >>>
> >>>> Right now this type of work is done here:
> >>>>
> >>>>
> >>>>
> https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java
> >>>>
> >>>>
> https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
> >>>>
> >>>> With Distribution Trait application here:
> >>>>
> >>>>
> https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DrillDistributionTraitDef.java
> >>>>
> >>>> To me, the easiest way to solve the Phoenix issue is by providing a
> rule
> >>>> that matches HashAgg and StreamAgg but requires Phoenix convention as
> >>>> input. It would replace everywhere but would only be plannable when
> it is
> >>>> the first phase of aggregation.
> >>>>
> >>>> Thoughts?
> >>>>
> >>>>
> >>>>
> >>>> --
> >>>> Jacques Nadeau
> >>>> CTO and Co-Founder, Dremio
> >>>>
> >>>> On Thu, Oct 1, 2015 at 2:30 PM, Julian Hyde <jhyde@apache.org>
wrote:
> >>>>
> >>>>> Phoenix is able to perform quite a few relational operations on
the
> >>>>> region server: scan, filter, project, aggregate, sort (optionally
> with
> >>>>> limit). However, the sort and aggregate are necessarily "local".
They
> >>>>> can only deal with data on that region server, and there needs to
be
> a
> >>>>> further operation to combine the results from the region servers.
> >>>>>
> >>>>> The question is how to plan such queries. I think the answer is
an
> >>>>> AggregateExchangeTransposeRule.
> >>>>>
> >>>>> The rule would spot an Aggregate on a data source that is split
into
> >>>>> multiple locations (partitions) and split it into a partial Aggregate
> >>>>> that computes sub-totals and a summarizing Aggregate that combines
> >>>>> those totals.
> >>>>>
> >>>>> How does the planner know that the Aggregate needs to be split?
Since
> >>>>> the data's distribution has changed, there would need to be an
> >>>>> Exchange operator. It is the Exchange operator that triggers the
rule
> >>>>> to fire.
> >>>>>
> >>>>> There are some special cases. If the data is sorted as well as
> >>>>> partitioned (say because the local aggregate uses a sort-based
> >>>>> algorithm) we could maybe use a more efficient plan. And if the
> >>>>> partition key is the same as the aggregation key we don't need a
> >>>>> summarizing Aggregate, just a Union.
> >>>>>
> >>>>> It turns out not to be very Phoenix-specific. In the Drill-on-Phoenix
> >>>>> scenario, once the Aggregate has been pushed through the Exchange
> >>>>> (i.e. onto the drill-bit residing on the region server) we can then
> >>>>> push the DrillAggregate across the drill-to-phoenix membrane and
make
> >>>>> it into a PhoenixServerAggregate that executes in the region server.
> >>>>>
> >>>>> Related issues:
> >>>>> * https://issues.apache.org/jira/browse/DRILL-3840
> >>>>> * https://issues.apache.org/jira/browse/CALCITE-751
> >>>>>
> >>>>> Julian
> >>>>>
> >>>>
> >>>>
> >>>
> >>
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message