drill-dev mailing list archives

From Maryann Xue <maryann....@gmail.com>
Subject Re: Partial aggregation in Drill-on-Phoenix
Date Tue, 06 Oct 2015 21:10:43 GMT
Added a few fixes in the pull request. Tested with two regions, and it
turned out that half of the results were empty (count = 0).
I'm not sure whether there's anything wrong with
https://github.com/maryannxue/drill/blob/phoenix_plugin/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/rel/PhoenixHashAggPrule.java.
As Julian said, this rule looks a bit hacky.

To force a 2-phase HashAgg, I made a temporary change as well:

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
index b911f6b..58bc918 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
@@ -60,12 +60,12 @@ public abstract class AggPruleBase extends Prule {
 
   // If any of the aggregate functions are not one of these, then we
   // currently won't generate a 2 phase plan.
   protected boolean create2PhasePlan(RelOptRuleCall call, DrillAggregateRel aggregate) {
-    PlannerSettings settings = PrelUtil.getPlannerSettings(call.getPlanner());
-    RelNode child = call.rel(0).getInputs().get(0);
-    boolean smallInput = child.getRows() < settings.getSliceTarget();
-    if (! settings.isMultiPhaseAggEnabled() || settings.isSingleMode() || smallInput) {
-      return false;
-    }
+//    PlannerSettings settings = PrelUtil.getPlannerSettings(call.getPlanner());
+//    RelNode child = call.rel(0).getInputs().get(0);
+//    boolean smallInput = child.getRows() < settings.getSliceTarget();
+//    if (! settings.isMultiPhaseAggEnabled() || settings.isSingleMode() || smallInput) {
+//      return false;
+//    }
 
     for (AggregateCall aggCall : aggregate.getAggCallList()) {
       String name = aggCall.getAggregation().getName();
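As background, the two-phase plan this change forces computes partial aggregates per fragment (phase 1) and then merges them (phase 2). A minimal plain-Java sketch of that split for COUNT(*), with hypothetical names — these are not Drill's actual operator classes:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TwoPhaseCountSketch {

  // Phase 1: each fragment computes a partial COUNT(*) per group
  // over only the rows in its own partition (e.g. one HBase region).
  static Map<String, Long> partialCount(List<String> partition) {
    Map<String, Long> partial = new HashMap<>();
    for (String key : partition) {
      partial.merge(key, 1L, Long::sum);
    }
    return partial;
  }

  // Phase 2: the summarizing aggregate merges the partials by
  // summing the partial counts for each group key.
  static Map<String, Long> summarize(List<Map<String, Long>> partials) {
    Map<String, Long> total = new HashMap<>();
    for (Map<String, Long> partial : partials) {
      partial.forEach((key, count) -> total.merge(key, count, Long::sum));
    }
    return total;
  }

  public static void main(String[] args) {
    // Two "regions", analogous to the two-region test above.
    List<String> region1 = List.of("a", "a", "b");
    List<String> region2 = List.of("b", "c");
    Map<String, Long> result =
        summarize(List.of(partialCount(region1), partialCount(region2)));
    System.out.println(result); // counts: a=2, b=2, c=1 (printed order unspecified)
  }
}
```

If commenting out the small-input check proves too invasive, lowering the `planner.slice_target` session option should have a similar effect, since that check compares the estimated row count against the slice target — worth confirming against the current Drill settings.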


Thanks,
Maryann



On Tue, Oct 6, 2015 at 2:31 PM, Julian Hyde <jhyde@apache.org> wrote:

> Drill's current approach seems adequate for Drill alone, but extending
> it to a heterogeneous system that includes Phoenix seems like a hack.
>
> I think you should only create Prels for algebra nodes that you know
> for sure are going to run on the Drill engine. If there's a
> possibility that a node would run in another engine such as Phoenix,
> then it should stay logical.
>
> On Tue, Oct 6, 2015 at 11:03 AM, Maryann Xue <maryann.xue@gmail.com>
> wrote:
> > The partial aggregate seems to be working now, with one interface
> extension
> > and one bug fix in the Phoenix project. Will do some code cleanup and
> > create a pull request soon.
> >
> > There is still a hack I made in the Drill project to force 2-phase
> > aggregation. I'll try to fix that.
> >
> > Jacques, I have one question, though: how can I verify that there is
> > more than one slice and that the shuffle happens?
> >
> >
> > Thanks,
> > Maryann
> >
> > On Mon, Oct 5, 2015 at 2:03 PM, James Taylor <jamestaylor@apache.org>
> wrote:
> >
> >> Maryann,
> >> I believe Jacques mentioned that a little bit of refactoring is required
> >> for a merge sort to occur - there's something that does that, but it's
> not
> >> expected to be used in this context currently.
> >>
> >> IMHO, there's clearer value in getting the aggregation to use
> >> Phoenix first, so I'd recommend going down that road as Jacques
> >> mentioned above, if possible. Once that's working, we can circle
> >> back to the partial sort.
> >>
> >> Thoughts?
> >> James
> >>
> >> On Mon, Oct 5, 2015 at 10:40 AM, Maryann Xue <maryann.xue@gmail.com>
> >> wrote:
> >>
> >>> I actually tried implementing partial sort with
> >>> https://github.com/jacques-n/drill/pull/4, which I figured might be a
> >>> little easier to start with than partial aggregation. But I found that
> >>> even though the code worked (returned the right results), the sort on
> >>> the Drill side turned out to be an ordinary sort instead of the merge
> >>> it should have been. Any idea how to fix that?
> >>>
> >>>
> >>> Thanks,
> >>> Maryann
> >>>
> >>> On Mon, Oct 5, 2015 at 12:52 PM, Jacques Nadeau <jacques@dremio.com>
> >>> wrote:
> >>>
> >>>> Right now this type of work is done here:
> >>>>
> >>>> https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java
> >>>>
> >>>> https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
> >>>>
> >>>> With Distribution Trait application here:
> >>>>
> >>>> https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DrillDistributionTraitDef.java
> >>>>
> >>>> To me, the easiest way to solve the Phoenix issue is by providing a
> >>>> rule that matches HashAgg and StreamAgg but requires Phoenix
> >>>> convention as input. It would replace everywhere but would only be
> >>>> plannable when it is the first phase of aggregation.
> >>>>
> >>>> Thoughts?
> >>>>
> >>>>
> >>>>
> >>>> --
> >>>> Jacques Nadeau
> >>>> CTO and Co-Founder, Dremio
> >>>>
> >>>> On Thu, Oct 1, 2015 at 2:30 PM, Julian Hyde <jhyde@apache.org> wrote:
> >>>>
> >>>>> Phoenix is able to perform quite a few relational operations on the
> >>>>> region server: scan, filter, project, aggregate, sort (optionally
> >>>>> with limit). However, the sort and aggregate are necessarily
> >>>>> "local". They can only deal with data on that region server, and
> >>>>> there needs to be a further operation to combine the results from
> >>>>> the region servers.
> >>>>>
> >>>>> The question is how to plan such queries. I think the answer is an
> >>>>> AggregateExchangeTransposeRule.
> >>>>>
> >>>>> The rule would spot an Aggregate on a data source that is split
> >>>>> into multiple locations (partitions) and split it into a partial
> >>>>> Aggregate that computes sub-totals and a summarizing Aggregate that
> >>>>> combines those totals.
> >>>>>
> >>>>> How does the planner know that the Aggregate needs to be split?
> >>>>> Since the data's distribution has changed, there would need to be
> >>>>> an Exchange operator. It is the Exchange operator that triggers the
> >>>>> rule to fire.
> >>>>>
> >>>>> There are some special cases. If the data is sorted as well as
> >>>>> partitioned (say because the local aggregate uses a sort-based
> >>>>> algorithm) we could maybe use a more efficient plan. And if the
> >>>>> partition key is the same as the aggregation key we don't need a
> >>>>> summarizing Aggregate, just a Union.
> >>>>>
> >>>>> It turns out not to be very Phoenix-specific. In the
> >>>>> Drill-on-Phoenix scenario, once the Aggregate has been pushed
> >>>>> through the Exchange (i.e. onto the drill-bit residing on the
> >>>>> region server) we can then push the DrillAggregate across the
> >>>>> drill-to-phoenix membrane and make it into a PhoenixServerAggregate
> >>>>> that executes in the region server.
> >>>>>
> >>>>> Related issues:
> >>>>> * https://issues.apache.org/jira/browse/DRILL-3840
> >>>>> * https://issues.apache.org/jira/browse/CALCITE-751
> >>>>>
> >>>>> Julian
> >>>>>
> >>>>
> >>>>
> >>>
> >>
>
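The last special case in Julian's message — partition key equal to the aggregation key — can be sketched in the same spirit: when rows are hash-partitioned on the group key, partial aggregates from different partitions can never share a key, so a plain union already gives the final result and no summarizing Aggregate is needed. A hypothetical plain-Java illustration (not Calcite or Drill API):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PartitionedByGroupKeySketch {

  // Partial COUNT(*) per group key within one partition.
  static Map<String, Long> partialCount(List<String> partition) {
    Map<String, Long> partial = new HashMap<>();
    for (String key : partition) {
      partial.merge(key, 1L, Long::sum);
    }
    return partial;
  }

  // Because the data was partitioned on the group key itself, each key
  // lives in exactly one partition, so a plain union of the partials is
  // already the final result -- no summarizing Aggregate required.
  static Map<String, Long> unionAll(List<Map<String, Long>> partials) {
    Map<String, Long> result = new HashMap<>();
    for (Map<String, Long> partial : partials) {
      result.putAll(partial); // keys are disjoint by construction
    }
    return result;
  }

  // Hash-partition rows on the group key (n partitions).
  static List<List<String>> partitionByKey(List<String> rows, int n) {
    List<List<String>> parts = new ArrayList<>();
    for (int i = 0; i < n; i++) {
      parts.add(new ArrayList<>());
    }
    for (String row : rows) {
      parts.get(Math.floorMod(row.hashCode(), n)).add(row);
    }
    return parts;
  }

  public static void main(String[] args) {
    List<String> rows = List.of("a", "b", "a", "c", "b", "a");
    List<Map<String, Long>> partials = new ArrayList<>();
    for (List<String> part : partitionByKey(rows, 2)) {
      partials.add(partialCount(part));
    }
    System.out.println(unionAll(partials)); // a=3, b=2, c=1 (printed order unspecified)
  }
}
```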
