drill-dev mailing list archives

From James Taylor <jamestay...@apache.org>
Subject Re: Partial aggregation in Drill-on-Phoenix
Date Thu, 22 Oct 2015 00:22:37 GMT
Changes look great, Maryann. Would you mind pulling those in, Jacques?

Thanks,
James

On Mon, Oct 19, 2015 at 11:50 AM, Maryann Xue <maryann.xue@gmail.com> wrote:

> Made another two check-ins to https://github.com/jacques-n/drill/pull/5,
> the first one for the changes James had suggested. The second check-in
> included some test cases that failed to use the Phoenix partial aggregate
> because of https://issues.apache.org/jira/browse/CALCITE-926.
>
> I also reproduced the problem with Phoenix+Calcite, but will make a new
> patch for CALCITE-926 to add some standalone test cases for Calcite.
>
>
> Thanks,
> Maryann
>
> On Fri, Oct 9, 2015 at 1:30 PM, James Taylor <jamestaylor@apache.org>
> wrote:
>
>> Thanks for the updates to the patch, Maryann. It's looking very good -
>> I believe this will perform better. I made a few comments on the pull
>> request.
>>
>> FYI, I filed PHOENIX-2316 to add the missing information (namely the
>> region server that the parallelized scan will go to) so that I can improve
>> the assignment logic.
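>>
>> The gist of what I'm after is simple affinity grouping, roughly this sketch
>> (hostOf() below stands in for the scan-to-region-server mapping that
>> PHOENIX-2316 would expose; the helper class itself is made up):
>>
>> import java.util.ArrayList;
>> import java.util.HashMap;
>> import java.util.List;
>> import java.util.Map;
>> import java.util.function.Function;
>> import org.apache.hadoop.hbase.client.Scan;
>>
>> // Sketch only: group each parallelized scan by the region server it will
>> // hit, so the drillbit running on that host can be handed the local scans.
>> public final class ScanAssigner {
>>   public static Map<String, List<Scan>> groupByRegionServer(
>>       List<Scan> scans, Function<Scan, String> hostOf) {
>>     Map<String, List<Scan>> byHost = new HashMap<>();
>>     for (Scan scan : scans) {
>>       byHost.computeIfAbsent(hostOf.apply(scan), h -> new ArrayList<>()).add(scan);
>>     }
>>     return byHost;
>>   }
>>
>>   private ScanAssigner() {}
>> }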
>>
>>      James
>>
>> On Wed, Oct 7, 2015 at 1:11 PM, Maryann Xue <maryann.xue@gmail.com>
>> wrote:
>>
>>> Made another check-in for the pull request. All good now.
>>>
>>> In order to compile and run, be sure to update the Phoenix project under
>>> Julian's branch.
>>>
>>>
>>> Thanks,
>>> Maryann
>>>
>>> On Wed, Oct 7, 2015 at 12:19 PM, Jacques Nadeau <jacques@dremio.com>
>>> wrote:
>>>
>>>> I just filed a jira for the merge issue:
>>>>
>>>> https://issues.apache.org/jira/browse/DRILL-3907
>>>>
>>>> --
>>>> Jacques Nadeau
>>>> CTO and Co-Founder, Dremio
>>>>
>>>> On Wed, Oct 7, 2015 at 8:54 AM, Jacques Nadeau <jacques@dremio.com>
>>>> wrote:
>>>>
>>>>> Drill doesn't currently have a merge-sort operation available outside
>>>>> the context of an exchange. See here:
>>>>>
>>>>>
>>>>> https://github.com/apache/drill/tree/master/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver
>>>>>
>>>>> We'll need to do a bit of refactoring to provide this functionality
>>>>> outside the context of an exchange. The one other thing we'll have to
>>>>> think about in this context is how we avoid doing an n-way merge in the
>>>>> case that we're not using the collation.
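>>>>>
>>>>> For reference, the n-way merge such an operator performs boils down to
>>>>> this sketch (plain Java iterators and a made-up helper class, not Drill's
>>>>> actual record-batch code):
>>>>>
>>>>> import java.util.ArrayList;
>>>>> import java.util.Arrays;
>>>>> import java.util.Comparator;
>>>>> import java.util.Iterator;
>>>>> import java.util.List;
>>>>> import java.util.PriorityQueue;
>>>>>
>>>>> // Sketch only: merge N already-sorted iterators into one sorted stream
>>>>> // using a priority queue keyed on each iterator's current head element.
>>>>> public class NWayMerge {
>>>>>   private static class Head<T> {
>>>>>     final T value;
>>>>>     final Iterator<T> rest;
>>>>>     Head(T value, Iterator<T> rest) { this.value = value; this.rest = rest; }
>>>>>   }
>>>>>
>>>>>   public static <T> List<T> merge(List<Iterator<T>> inputs, Comparator<T> cmp) {
>>>>>     PriorityQueue<Head<T>> pq =
>>>>>         new PriorityQueue<>((a, b) -> cmp.compare(a.value, b.value));
>>>>>     for (Iterator<T> it : inputs) {
>>>>>       if (it.hasNext()) pq.add(new Head<>(it.next(), it));
>>>>>     }
>>>>>     List<T> out = new ArrayList<>();
>>>>>     while (!pq.isEmpty()) {
>>>>>       Head<T> h = pq.poll();
>>>>>       out.add(h.value);
>>>>>       if (h.rest.hasNext()) pq.add(new Head<>(h.rest.next(), h.rest));
>>>>>     }
>>>>>     return out;
>>>>>   }
>>>>>
>>>>>   public static void main(String[] args) {
>>>>>     List<Iterator<Integer>> sorted = Arrays.asList(
>>>>>         Arrays.asList(1, 4, 7).iterator(),
>>>>>         Arrays.asList(2, 5, 8).iterator(),
>>>>>         Arrays.asList(3, 6, 9).iterator());
>>>>>     System.out.println(merge(sorted, Integer::compare)); // [1, 2, ..., 9]
>>>>>   }
>>>>> }
>>>>>
>>>>> When the downstream plan doesn't use the collation, the priority queue is
>>>>> pure overhead, and a plain concatenation of the inputs would do.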
>>>>>
>>>>> --
>>>>> Jacques Nadeau
>>>>> CTO and Co-Founder, Dremio
>>>>>
>>>>> On Wed, Oct 7, 2015 at 8:18 AM, Maryann Xue <maryann.xue@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> One thing from what I asked James offline yesterday, and maybe we can
>>>>>> discuss it a little bit in today's meeting:
>>>>>>
>>>>>> Phoenix uses a list of lists of Scan objects to indicate Region
>>>>>> boundaries and guideposts, and if the top-level list contains more than
>>>>>> one element it means that the results from the different
>>>>>> Scanner/ResultIterators should be merge-sorted. We now use this list in
>>>>>> the Drill integration to generate different batches or slices. I see
>>>>>> from the Drill plan of a simple select like "SELECT * FROM A.BEER" that
>>>>>> a Drill Sort node sits on top of the PhoenixTableScan. I guess this is a
>>>>>> real sort rather than a merge-sort. So optimally,
>>>>>> 1) this should be a merge-sort (to be more accurate, a merge);
>>>>>> 2) furthermore, if Drill has something to indicate the order among
>>>>>> slices and batches, we could even turn it into a concat.
>>>>>>
>>>>>> The structure of this Scan list might be helpful for 2), or we may have
>>>>>> some logical representation for this. Otherwise, we can simply flatten
>>>>>> this list to a one-dimensional list as we do now (in my check-in
>>>>>> yesterday).
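>>>>>>
>>>>>> To be concrete, the flattening we do today is essentially just this
>>>>>> (a standalone sketch using HBase's Scan type; the helper class is made
>>>>>> up for illustration):
>>>>>>
>>>>>> import java.util.ArrayList;
>>>>>> import java.util.List;
>>>>>> import org.apache.hadoop.hbase.client.Scan;
>>>>>>
>>>>>> // Sketch only: collapsing the nested structure drops the ordering
>>>>>> // relationship among the inner lists, which is exactly the information
>>>>>> // that would let Drill do a merge (or even a concat) instead of a sort.
>>>>>> public final class ScanFlattener {
>>>>>>   public static List<Scan> flatten(List<List<Scan>> nestedScans) {
>>>>>>     List<Scan> flat = new ArrayList<>();
>>>>>>     for (List<Scan> group : nestedScans) {
>>>>>>       flat.addAll(group);
>>>>>>     }
>>>>>>     return flat;
>>>>>>   }
>>>>>>
>>>>>>   private ScanFlattener() {}
>>>>>> }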
>>>>>>
>>>>>>
>>>>>>
>>>>>> Thanks,
>>>>>> Maryann
>>>>>>
>>>>>> On Tue, Oct 6, 2015 at 9:52 PM, Maryann Xue <maryann.xue@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Yes, but the partially aggregated results will not contain any
>>>>>>> duplicate rowkeys, since they are also group-by keys. What we need is
>>>>>>> the aggregators and to call aggregate for each row. We can write a new,
>>>>>>> simpler ResultIterator to replace this, but for now it should work
>>>>>>> correctly.
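>>>>>>>
>>>>>>> Shape-wise, the simpler replacement I have in mind is something like
>>>>>>> this sketch (the Row and RowAggregator interfaces here are made up just
>>>>>>> to show the idea; the real thing would sit on Phoenix's ResultIterator
>>>>>>> and its client aggregators):
>>>>>>>
>>>>>>> import java.util.Iterator;
>>>>>>>
>>>>>>> // Hypothetical types: a per-row "decode the aggregate state" pass with
>>>>>>> // no grouping or collapsing, since every input row is already a
>>>>>>> // distinct group-by key carrying partially aggregated state.
>>>>>>> interface Row { byte[] groupKey(); byte[] aggregateState(); }
>>>>>>> interface RowAggregator { Object[] decode(byte[] aggregateState); }
>>>>>>>
>>>>>>> final class DecodingIterator implements Iterator<Object[]> {
>>>>>>>   private final Iterator<Row> rows;
>>>>>>>   private final RowAggregator aggregator;
>>>>>>>
>>>>>>>   DecodingIterator(Iterator<Row> rows, RowAggregator aggregator) {
>>>>>>>     this.rows = rows;
>>>>>>>     this.aggregator = aggregator;
>>>>>>>   }
>>>>>>>
>>>>>>>   @Override public boolean hasNext() { return rows.hasNext(); }
>>>>>>>
>>>>>>>   @Override public Object[] next() {
>>>>>>>     // Decode this single row's serialized aggregate values; no attempt
>>>>>>>     // is made to merge adjacent rows with the same key.
>>>>>>>     return aggregator.decode(rows.next().aggregateState());
>>>>>>>   }
>>>>>>> }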
>>>>>>>
>>>>>>> On Tue, Oct 6, 2015 at 9:45 PM, James Taylor <jamestaylor@apache.org> wrote:
>>>>>>>
>>>>>>>> The results we get back from the server-side scan are already the
>>>>>>>> partially aggregated values we need. GroupedAggregatingResultIterator
>>>>>>>> will collapse adjacent Tuples that happen to have the same row key.
>>>>>>>> I'm not sure we want/need this to happen. Instead I think we just need
>>>>>>>> to decode the aggregated values directly from the result of the scan.
>>>>>>>>
>>>>>>>> On Tue, Oct 6, 2015 at 6:07 PM, Maryann Xue <maryann.xue@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi James,
>>>>>>>>>
>>>>>>>>> bq. A few questions for you: not sure I understand the changes you
>>>>>>>>> made to PhoenixRecordReader. Is it necessary to wrap the server-side
>>>>>>>>> scan results in a GroupedAggregatingResultIterator? Each server-side
>>>>>>>>> scan will produce results with a single tuple per group by key. In
>>>>>>>>> Phoenix, the GroupedAggregatingResultIterator's function in life is to
>>>>>>>>> do the final merge. Note too that the results that come back from the
>>>>>>>>> aggregated scan aren't sorted (while GroupedAggregatingResultIterator
>>>>>>>>> needs tuples sorted by the group by key). Or is this just to help in
>>>>>>>>> decoding the values coming back from the scan?
>>>>>>>>>
>>>>>>>>> It is necessary. I suppose what we should return as a partial
>>>>>>>>> result from PhoenixRecordReader is exactly the same as what we do in
>>>>>>>>> standalone Phoenix+Calcite, except that the result is partial, or say
>>>>>>>>> incomplete. For example, if we have "select a, count(*) from t group
>>>>>>>>> by a", we should return rows that have "a" as the first expression
>>>>>>>>> value and "count(*)" as the second expression value. This "count"
>>>>>>>>> expression actually needs a ClientAggregator for evaluation, and
>>>>>>>>> that's what this GroupedAggregatingResultIterator is used for.
>>>>>>>>> Since "each server-side scan will produce results with
a single
>>>>>>>>> tuple per group by key", and PhoenixRecordReader is only
dealing with one
>>>>>>>>> server-side result each time, we don't care how the group-by
keys are
>>>>>>>>> arranged (ordered or unordered"). Actually
>>>>>>>>> GroupedAggregatingResultIterator is not the group-by
iterator we
>>>>>>>>> use for AggregatePlan. It does not "combine". It treats
every row as a
>>>>>>>>> different group, by returning its rowkey as the group-by
key (
>>>>>>>>> GroupedAggregatingResultIterator.java:56).
>>>>>>>>>
>>>>>>>>> In short, this iterator is for decoding the server-side values. So
>>>>>>>>> we may want to optimize this logic by removing this serialization and
>>>>>>>>> deserialization and having only one set of aggregators in future.
>>>>>>>>>
>>>>>>>>> bq. Also, not sure what impact it has in the way we "combine" the
>>>>>>>>> scans in our Drill parallelization code
>>>>>>>>> (PhoenixGroupScan.applyAssignments()), as each of our scans could
>>>>>>>>> include duplicate group by keys. Is it ok to combine them in this
>>>>>>>>> case?
>>>>>>>>>
>>>>>>>>> It should not matter, or at least it is not related to the problem
>>>>>>>>> I'm now having.
>>>>>>>>>
>>>>>>>>> bq. One more question: how is the group by key communicated back
>>>>>>>>> to Drill?
>>>>>>>>>
>>>>>>>>> According to the HashAggPrule, if it decides to create a two-phase
>>>>>>>>> aggregate, the first phase is now handled by Phoenix (after applying
>>>>>>>>> the PhoenixHashAggPrule). I assume the partial results then get
>>>>>>>>> shuffled based on the hash of their group-by keys (returned by
>>>>>>>>> PhoenixRecordReader). The final step is the Drill hash aggregation.
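>>>>>>>>>
>>>>>>>>> Just to make the assumption explicit, the routing I have in mind is
>>>>>>>>> something like this (toy code, not Drill's actual exchange; the
>>>>>>>>> bucket count is arbitrary):
>>>>>>>>>
>>>>>>>>> import java.util.Arrays;
>>>>>>>>> import java.util.List;
>>>>>>>>>
>>>>>>>>> // Toy model of the shuffle: every partial result with the same
>>>>>>>>> // group-by key must land in the same bucket, otherwise the final
>>>>>>>>> // hash aggregation sees an incomplete set of partials for that key.
>>>>>>>>> public class ShuffleDemo {
>>>>>>>>>   static int bucketOf(int groupKey, int numBuckets) {
>>>>>>>>>     return Math.floorMod(Integer.hashCode(groupKey), numBuckets);
>>>>>>>>>   }
>>>>>>>>>
>>>>>>>>>   public static void main(String[] args) {
>>>>>>>>>     List<Integer> e1Values = Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
>>>>>>>>>     for (int key : e1Values) {
>>>>>>>>>       System.out.println("e1=" + key + " -> bucket " + bucketOf(key, 2));
>>>>>>>>>     }
>>>>>>>>>   }
>>>>>>>>> }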
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> This is my test table "A.BEER", which has four columns: "B", "E1",
>>>>>>>>> "E2", "R", all of type INTEGER. The data is generated like this:
>>>>>>>>> for (x = 1 to N) { // currently N = 1000
>>>>>>>>>   UPSERT INTO A.BEER VALUES (x, x % 10, x % 100, x);
>>>>>>>>> }
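>>>>>>>>>
>>>>>>>>> For anyone who wants to reproduce it, the loader is essentially this
>>>>>>>>> (the JDBC URL is a placeholder - point it at your own cluster, and the
>>>>>>>>> table is assumed to already exist):
>>>>>>>>>
>>>>>>>>> import java.sql.Connection;
>>>>>>>>> import java.sql.DriverManager;
>>>>>>>>> import java.sql.PreparedStatement;
>>>>>>>>>
>>>>>>>>> // Sketch of the data generator described above.
>>>>>>>>> public class BeerLoader {
>>>>>>>>>   public static void main(String[] args) throws Exception {
>>>>>>>>>     try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost");
>>>>>>>>>          PreparedStatement stmt =
>>>>>>>>>              conn.prepareStatement("UPSERT INTO A.BEER VALUES (?, ?, ?, ?)")) {
>>>>>>>>>       for (int x = 1; x <= 1000; x++) {
>>>>>>>>>         stmt.setInt(1, x);
>>>>>>>>>         stmt.setInt(2, x % 10);
>>>>>>>>>         stmt.setInt(3, x % 100);
>>>>>>>>>         stmt.setInt(4, x);
>>>>>>>>>         stmt.executeUpdate();
>>>>>>>>>       }
>>>>>>>>>       conn.commit(); // Phoenix buffers upserts until commit
>>>>>>>>>     }
>>>>>>>>>   }
>>>>>>>>> }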
>>>>>>>>>
>>>>>>>>> The group-by query for testing is "SELECT e1, count(*)
FROM a.beer
>>>>>>>>> GROUP BY e1".
>>>>>>>>> The expected result should be:
>>>>>>>>> 0 100
>>>>>>>>> 1 100
>>>>>>>>> 2 100
>>>>>>>>> 3 100
>>>>>>>>> 4 100
>>>>>>>>> 5 100
>>>>>>>>> 6 100
>>>>>>>>> 7 100
>>>>>>>>> 8 100
>>>>>>>>> 9 100
>>>>>>>>> The actual result was:
>>>>>>>>> 6 0
>>>>>>>>> 7 0
>>>>>>>>> 8 0
>>>>>>>>> 9 0
>>>>>>>>> 0 0
>>>>>>>>> 1 100
>>>>>>>>> 2 100
>>>>>>>>> 3 100
>>>>>>>>> 4 100
>>>>>>>>> 5 100
>>>>>>>>>
>>>>>>>>> Here I just tried another one, "SELECT e2, count(*) FROM a.beer
>>>>>>>>> GROUP BY e2".
>>>>>>>>> Similarly, the expected result should have group-by keys from 0 to
>>>>>>>>> 99, each with a count of 10. In the actual result, the counts for
>>>>>>>>> group-by keys 86 through 99, together with 0, were all 0; the rest
>>>>>>>>> (1 to 85) all had the correct value of 10.
>>>>>>>>>
>>>>>>>>> It looks to me like the scans were good but there was a problem
>>>>>>>>> with one of the hash buckets.
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Maryann
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Tue, Oct 6, 2015 at 6:45 PM, James Taylor <jamestaylor@apache.org> wrote:
>>>>>>>>>
>>>>>>>>>> Nice progress, Maryann.
>>>>>>>>>>
>>>>>>>>>> A few questions for you: not sure I understand the changes you
>>>>>>>>>> made to PhoenixRecordReader. Is it necessary to wrap the server-side
>>>>>>>>>> scan results in a GroupedAggregatingResultIterator? Each server-side
>>>>>>>>>> scan will produce results with a single tuple per group by key. In
>>>>>>>>>> Phoenix, the GroupedAggregatingResultIterator's function in life is
>>>>>>>>>> to do the final merge. Note too that the results that come back from
>>>>>>>>>> the aggregated scan aren't sorted (while
>>>>>>>>>> GroupedAggregatingResultIterator needs tuples sorted by the group by
>>>>>>>>>> key). Or is this just to help in decoding the values coming back from
>>>>>>>>>> the scan?
>>>>>>>>>>
>>>>>>>>>> Also, not sure what impact it has in the way we "combine" the
>>>>>>>>>> scans in our Drill parallelization code
>>>>>>>>>> (PhoenixGroupScan.applyAssignments()), as each of our scans could
>>>>>>>>>> include duplicate group by keys. Is it ok to combine them in this
>>>>>>>>>> case?
>>>>>>>>>>
>>>>>>>>>> One more question: how is the group by key communicated back to
>>>>>>>>>> Drill?
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>> James
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Tue, Oct 6, 2015 at 2:10 PM, Maryann Xue <maryann.xue@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Added a few fixes in the pull request. Tested with two regions;
>>>>>>>>>>> it turned out that half of the result is empty (count = 0).
>>>>>>>>>>> Not sure if there's anything wrong with
>>>>>>>>>>> https://github.com/maryannxue/drill/blob/phoenix_plugin/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/rel/PhoenixHashAggPrule.java
>>>>>>>>>>> .
>>>>>>>>>>> Like Julian said, this rule looks a bit hacky.
>>>>>>>>>>>
>>>>>>>>>>> To force a 2-phase HashAgg, I made a temporary
change as well:
>>>>>>>>>>>
>>>>>>>>>>> diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
>>>>>>>>>>> index b911f6b..58bc918 100644
>>>>>>>>>>> --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
>>>>>>>>>>> +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
>>>>>>>>>>> @@ -60,12 +60,12 @@ public abstract class AggPruleBase extends Prule {
>>>>>>>>>>>
>>>>>>>>>>>    // If any of the aggregate functions are not one of these, then we
>>>>>>>>>>>    // currently won't generate a 2 phase plan.
>>>>>>>>>>>    protected boolean create2PhasePlan(RelOptRuleCall call, DrillAggregateRel aggregate) {
>>>>>>>>>>> -    PlannerSettings settings = PrelUtil.getPlannerSettings(call.getPlanner());
>>>>>>>>>>> -    RelNode child = call.rel(0).getInputs().get(0);
>>>>>>>>>>> -    boolean smallInput = child.getRows() < settings.getSliceTarget();
>>>>>>>>>>> -    if (! settings.isMultiPhaseAggEnabled() || settings.isSingleMode() || smallInput) {
>>>>>>>>>>> -      return false;
>>>>>>>>>>> -    }
>>>>>>>>>>> +//    PlannerSettings settings = PrelUtil.getPlannerSettings(call.getPlanner());
>>>>>>>>>>> +//    RelNode child = call.rel(0).getInputs().get(0);
>>>>>>>>>>> +//    boolean smallInput = child.getRows() < settings.getSliceTarget();
>>>>>>>>>>> +//    if (! settings.isMultiPhaseAggEnabled() || settings.isSingleMode() || smallInput) {
>>>>>>>>>>> +//      return false;
>>>>>>>>>>> +//    }
>>>>>>>>>>>
>>>>>>>>>>>      for (AggregateCall aggCall : aggregate.getAggCallList()) {
>>>>>>>>>>>        String name = aggCall.getAggregation().getName();
>>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>> Maryann
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Tue, Oct 6, 2015 at 2:31 PM, Julian Hyde <jhyde@apache.org>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Drill's current approach seems adequate for Drill alone, but
>>>>>>>>>>>> extending it to a heterogeneous system that includes Phoenix seems
>>>>>>>>>>>> like a hack.
>>>>>>>>>>>>
>>>>>>>>>>>> I think you should only create Prels for algebra nodes that you
>>>>>>>>>>>> know for sure are going to run on the Drill engine. If there's a
>>>>>>>>>>>> possibility that they would run in another engine such as Phoenix,
>>>>>>>>>>>> then they should still be logical.
>>>>>>>>>>>>
>>>>>>>>>>>> On Tue, Oct 6, 2015 at 11:03 AM, Maryann Xue <maryann.xue@gmail.com> wrote:
>>>>>>>>>>>> > The partial aggregate seems to be working now, with one
>>>>>>>>>>>> > interface extension and one bug fix in the Phoenix project. Will
>>>>>>>>>>>> > do some code cleanup and create a pull request soon.
>>>>>>>>>>>> >
>>>>>>>>>>>> > There is still a hack in the Drill project which I made to force
>>>>>>>>>>>> > 2-phase aggregation. I'll try to fix that.
>>>>>>>>>>>> >
>>>>>>>>>>>> > Jacques, I have one question though: how can I verify that there
>>>>>>>>>>>> > is more than one slice and that the shuffle happens?
>>>>>>>>>>>> >
>>>>>>>>>>>> >
>>>>>>>>>>>> > Thanks,
>>>>>>>>>>>> > Maryann
>>>>>>>>>>>> >
>>>>>>>>>>>> > On Mon, Oct 5, 2015 at 2:03 PM, James Taylor <jamestaylor@apache.org> wrote:
>>>>>>>>>>>> >
>>>>>>>>>>>> >> Maryann,
>>>>>>>>>>>> >> I believe Jacques mentioned that a little bit of refactoring is
>>>>>>>>>>>> >> required for a merge sort to occur - there's something that does
>>>>>>>>>>>> >> that, but it's not expected to be used in this context currently.
>>>>>>>>>>>> >>
>>>>>>>>>>>> >> IMHO, there's more of a clear value in getting the aggregation
>>>>>>>>>>>> >> to use Phoenix first, so I'd recommend going down that road as
>>>>>>>>>>>> >> Jacques mentioned above if possible. Once that's working, we can
>>>>>>>>>>>> >> circle back to the partial sort.
>>>>>>>>>>>> >>
>>>>>>>>>>>> >> Thoughts?
>>>>>>>>>>>> >> James
>>>>>>>>>>>> >>
>>>>>>>>>>>> >> On Mon, Oct 5, 2015 at 10:40 AM, Maryann Xue <maryann.xue@gmail.com> wrote:
>>>>>>>>>>>> >>
>>>>>>>>>>>> >>> I actually tried implementing partial sort with
>>>>>>>>>>>> >>> https://github.com/jacques-n/drill/pull/4, which I figured
>>>>>>>>>>>> >>> might be a little easier to start with than partial
>>>>>>>>>>>> >>> aggregation. But I found that even though the code worked
>>>>>>>>>>>> >>> (returned the right results), the Drill-side sort turned out to
>>>>>>>>>>>> >>> be an ordinary sort instead of the merge it should have been.
>>>>>>>>>>>> >>> Any idea of how to fix that?
>>>>>>>>>>>> >>>
>>>>>>>>>>>> >>>
>>>>>>>>>>>> >>> Thanks,
>>>>>>>>>>>> >>> Maryann
>>>>>>>>>>>> >>>
>>>>>>>>>>>> >>> On Mon, Oct 5, 2015 at 12:52 PM, Jacques Nadeau <jacques@dremio.com> wrote:
>>>>>>>>>>>> >>>
>>>>>>>>>>>> >>>> Right now this type of work is done here:
>>>>>>>>>>>> >>>>
>>>>>>>>>>>> >>>> https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java
>>>>>>>>>>>> >>>> https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
>>>>>>>>>>>> >>>>
>>>>>>>>>>>> >>>> With Distribution Trait application here:
>>>>>>>>>>>> >>>>
>>>>>>>>>>>> >>>> https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DrillDistributionTraitDef.java
>>>>>>>>>>>> >>>>
>>>>>>>>>>>> >>>> To me, the easiest way to solve the Phoenix issue is by
>>>>>>>>>>>> >>>> providing a rule that matches HashAgg and StreamAgg but
>>>>>>>>>>>> >>>> requires Phoenix convention as input. It would replace them
>>>>>>>>>>>> >>>> everywhere but would only be plannable when it is the first
>>>>>>>>>>>> >>>> phase of aggregation.
>>>>>>>>>>>> >>>>
>>>>>>>>>>>> >>>> Thoughts?
>>>>>>>>>>>> >>>>
>>>>>>>>>>>> >>>>
>>>>>>>>>>>> >>>>
>>>>>>>>>>>> >>>> --
>>>>>>>>>>>> >>>> Jacques Nadeau
>>>>>>>>>>>> >>>> CTO and Co-Founder, Dremio
>>>>>>>>>>>> >>>>
>>>>>>>>>>>> >>>> On Thu, Oct 1, 2015 at 2:30 PM, Julian Hyde <jhyde@apache.org> wrote:
>>>>>>>>>>>> >>>>
>>>>>>>>>>>> >>>>> Phoenix is able to perform quite a few relational
>>>>>>>>>>>> >>>>> operations on the region server: scan, filter, project,
>>>>>>>>>>>> >>>>> aggregate, sort (optionally with limit). However, the sort
>>>>>>>>>>>> >>>>> and aggregate are necessarily "local". They can only deal
>>>>>>>>>>>> >>>>> with data on that region server, and there needs to be a
>>>>>>>>>>>> >>>>> further operation to combine the results from the region
>>>>>>>>>>>> >>>>> servers.
>>>>>>>>>>>> >>>>>
>>>>>>>>>>>> >>>>> The question is how to plan such queries. I think the
>>>>>>>>>>>> >>>>> answer is an AggregateExchangeTransposeRule.
>>>>>>>>>>>> >>>>>
>>>>>>>>>>>> >>>>> The rule would spot an Aggregate on a data source that is
>>>>>>>>>>>> >>>>> split into multiple locations (partitions) and split it into
>>>>>>>>>>>> >>>>> a partial Aggregate that computes sub-totals and a
>>>>>>>>>>>> >>>>> summarizing Aggregate that combines those totals.
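>>>>>>>>>>>> >>>>>
>>>>>>>>>>>> >>>>> Concretely, for a COUNT the split looks something like this
>>>>>>>>>>>> >>>>> toy sketch (plain Java, nothing Calcite-specific; the
>>>>>>>>>>>> >>>>> partition contents are made up):
>>>>>>>>>>>> >>>>>
>>>>>>>>>>>> >>>>> import java.util.Arrays;
>>>>>>>>>>>> >>>>> import java.util.HashMap;
>>>>>>>>>>>> >>>>> import java.util.List;
>>>>>>>>>>>> >>>>> import java.util.Map;
>>>>>>>>>>>> >>>>>
>>>>>>>>>>>> >>>>> // Each partition computes a partial COUNT per key (the local
>>>>>>>>>>>> >>>>> // Aggregate); the summarizing Aggregate then SUMs the partial
>>>>>>>>>>>> >>>>> // counts - note the aggregate function changes from COUNT to SUM.
>>>>>>>>>>>> >>>>> public class AggregateSplitDemo {
>>>>>>>>>>>> >>>>>   public static void main(String[] args) {
>>>>>>>>>>>> >>>>>     List<List<String>> partitions = Arrays.asList(
>>>>>>>>>>>> >>>>>         Arrays.asList("a", "b", "a"),   // region server 1
>>>>>>>>>>>> >>>>>         Arrays.asList("b", "b", "a"));  // region server 2
>>>>>>>>>>>> >>>>>
>>>>>>>>>>>> >>>>>     Map<String, Long> total = new HashMap<>();
>>>>>>>>>>>> >>>>>     for (List<String> partition : partitions) {
>>>>>>>>>>>> >>>>>       Map<String, Long> partial = new HashMap<>();
>>>>>>>>>>>> >>>>>       for (String key : partition) {
>>>>>>>>>>>> >>>>>         partial.merge(key, 1L, Long::sum);  // partial COUNT(*)
>>>>>>>>>>>> >>>>>       }
>>>>>>>>>>>> >>>>>       partial.forEach((k, v) -> total.merge(k, v, Long::sum)); // final SUM
>>>>>>>>>>>> >>>>>     }
>>>>>>>>>>>> >>>>>     System.out.println(total); // {a=3, b=3}
>>>>>>>>>>>> >>>>>   }
>>>>>>>>>>>> >>>>> }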
>>>>>>>>>>>> >>>>>
>>>>>>>>>>>> >>>>> How does the planner know that the Aggregate needs to be
>>>>>>>>>>>> >>>>> split? Since the data's distribution has changed, there would
>>>>>>>>>>>> >>>>> need to be an Exchange operator. It is the Exchange operator
>>>>>>>>>>>> >>>>> that triggers the rule to fire.
>>>>>>>>>>>> >>>>>
>>>>>>>>>>>> >>>>> There are some special cases. If the data is sorted as
>>>>>>>>>>>> >>>>> well as partitioned (say because the local aggregate uses a
>>>>>>>>>>>> >>>>> sort-based algorithm) we could maybe use a more efficient
>>>>>>>>>>>> >>>>> plan. And if the partition key is the same as the aggregation
>>>>>>>>>>>> >>>>> key we don't need a summarizing Aggregate, just a Union.
>>>>>>>>>>>> >>>>>
>>>>>>>>>>>> >>>>> It turns out not to be very Phoenix-specific. In the
>>>>>>>>>>>> >>>>> Drill-on-Phoenix scenario, once the Aggregate has been pushed
>>>>>>>>>>>> >>>>> through the Exchange (i.e. onto the drill-bit residing on the
>>>>>>>>>>>> >>>>> region server) we can then push the DrillAggregate across the
>>>>>>>>>>>> >>>>> drill-to-phoenix membrane and make it into a
>>>>>>>>>>>> >>>>> PhoenixServerAggregate that executes in the region server.
>>>>>>>>>>>> >>>>>
>>>>>>>>>>>> >>>>> Related issues:
>>>>>>>>>>>> >>>>> * https://issues.apache.org/jira/browse/DRILL-3840
>>>>>>>>>>>> >>>>> * https://issues.apache.org/jira/browse/CALCITE-751
>>>>>>>>>>>> >>>>>
>>>>>>>>>>>> >>>>> Julian
>>>>>>>>>>>> >>>>>
>>>>>>>>>>>> >>>>
>>>>>>>>>>>> >>>>
>>>>>>>>>>>> >>>
>>>>>>>>>>>> >>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
