drill-dev mailing list archives

From Maryann Xue <maryann....@gmail.com>
Subject Re: Partial aggregation in Drill-on-Phoenix
Date Wed, 07 Oct 2015 01:52:54 GMT
Yes, but the partially aggregated results will not contain any duplicate
rowkeys, since they are also the group-by keys. What we need is the aggregators,
and to call aggregate once for each row. We could write a new, simpler
ResultIterator to replace this, but for now it should work correctly.
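
Just to illustrate the idea (this is only a sketch with made-up minimal types,
not Phoenix's actual ResultIterator/Tuple/Aggregators classes): the simpler
iterator would treat every row coming back from the server-side scan as its own
group, so it resets the aggregators, feeds the row in exactly once, and emits
it, with no merging of adjacent rows by key.

import java.util.Iterator;

// Hypothetical per-row aggregating iterator (all names are placeholders).
class PerRowAggregatingIterator<R> implements Iterator<R> {

  // Minimal stand-in for the client-side aggregators.
  interface RowAggregator<T> {
    void reset();            // clear aggregator state for the next group
    void aggregate(T row);   // evaluate the aggregators on one row
    T emit(T row);           // attach the aggregated values to the row's key
  }

  private final Iterator<R> scanResults;      // partially aggregated rows from one scan
  private final RowAggregator<R> aggregator;

  PerRowAggregatingIterator(Iterator<R> scanResults, RowAggregator<R> aggregator) {
    this.scanResults = scanResults;
    this.aggregator = aggregator;
  }

  @Override public boolean hasNext() { return scanResults.hasNext(); }

  @Override public R next() {
    R row = scanResults.next();
    aggregator.reset();        // each row is its own group
    aggregator.aggregate(row);
    return aggregator.emit(row);
  }
}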

On Tue, Oct 6, 2015 at 9:45 PM, James Taylor <jamestaylor@apache.org> wrote:

> The results we get back from the server-side scan are already the partially
> aggregated values we need. GroupedAggregatingResultIterator will collapse
> adjacent Tuples together which happen to have the same row key. I'm not
> sure we want/need this to happen. Instead I think we just need to decode
> the aggregated values directly from the result of the scan.
>
> On Tue, Oct 6, 2015 at 6:07 PM, Maryann Xue <maryann.xue@gmail.com> wrote:
>
>> Hi James,
>>
>> bq. A few questions for you: not sure I understand the changes you made
>> to PhoenixRecordReader. Is it necessary to wrap the server-side scan
>> results in a GroupedAggregatingResultIterator? Each server-side scan will
>> produce results with a single tuple per group by key. In Phoenix, the
>> GroupedAggregatingResultIterator's function in life is to do the final
>> merge. Note too that the results that come back from the aggregated scan
>> aren't sorted (while GroupedAggregatingResultIterator needs tuples sorted
>> by the group by key). Or is this just to help in decoding the values coming
>> back from the scan?
>>
>> It is necessary. I suppose what we should return as a partial result from
>> PhoenixRecordReader is exactly the same as what we do in standalone
>> Phoenix+Calcite, except that the result is partial, i.e. incomplete. For
>> example, if we have "select a, count(*) from t group by a", we should return
>> rows that have "a" as the first expression value and "count(*)" as the
>> second expression value. The "count" expression actually needs a
>> ClientAggregator for evaluation, and that is what this
>> GroupedAggregatingResultIterator is used for.
>> Since "each server-side scan will produce results with a single tuple
>> per group by key", and PhoenixRecordReader is only dealing with one
>> server-side result each time, we don't care how the group-by keys are
>> arranged (ordered or unordered). Actually, GroupedAggregatingResultIterator
>> is not the group-by iterator we use for AggregatePlan. It does not
>> "combine". It treats every row as a different group, by returning its
>> rowkey as the group-by key (GroupedAggregatingResultIterator.java:56).
>>
>> In short, this iterator is for decoding the server-side values. So we may
>> want to optimize this logic in the future by removing the serialization and
>> deserialization and keeping only one set of aggregators.
>>
>> bq. Also, not sure what impact it has in the way we "combine" the scans
>> in our Drill parallelization code (PhoenixGroupScan.applyAssignments()),
>> as each of our scans could include duplicate group by keys. Is it ok to
>> combine them in this case?
>>
>> It should not matter, or at least it is not related to the problem I'm
>> seeing now.
>>
>> bq. One more question: how is the group by key communicated back to Drill?
>>
>> According to the HashAggPrule, if it decides to create a two-phase
>> aggregate, the first phase is now handled by Phoenix (after applying the
>> PhoenixHashAggPrule). I assume the partial results then get shuffled based
>> on the hash of their group-by keys (returned by PhoenixRecordReader). The
>> final step is the Drill hash aggregation.
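>>
>> To make that concrete, here is a toy, stand-alone sketch of the final phase
>> for "SELECT e1, count(*) FROM a.beer GROUP BY e1" -- not Drill's actual
>> operator, and the per-region partial counts below are made up: the final
>> hash aggregation only has to sum the partial counts that arrive for the
>> same key.
>>
>> import java.util.HashMap;
>> import java.util.Map;
>>
>> public class FinalMergeSketch {
>>   public static void main(String[] args) {
>>     // One (key, partial count) row per group-by key from each region scan;
>>     // the numbers are invented just to show the shape of the merge.
>>     long[][] scan1 = { {0, 40}, {1, 40}, {2, 40} };
>>     long[][] scan2 = { {0, 60}, {1, 60}, {2, 60} };
>>
>>     Map<Long, Long> finalCounts = new HashMap<>();
>>     for (long[][] scan : new long[][][] { scan1, scan2 }) {
>>       for (long[] row : scan) {
>>         // row[0] = group-by key e1, row[1] = partial count
>>         finalCounts.merge(row[0], row[1], Long::sum);
>>       }
>>     }
>>     System.out.println(finalCounts);  // e.g. {0=100, 1=100, 2=100}
>>   }
>> }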
>>
>>
>> This is my test table "A.BEER", which has four columns, "B", "E1", "E2", and
>> "R", all of INTEGER type. The data is generated like this:
>> for (x=1 to N) { //currently N=1000
>>  UPSERT INTO A.BEER VALUES (x, x % 10, x % 100, x);
>> }
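>>
>> In case it helps to reproduce, roughly the same load can be driven through
>> the Phoenix JDBC driver, as in the sketch below. The connection URL and the
>> pre-created A.BEER table are assumptions on my side.
>>
>> import java.sql.Connection;
>> import java.sql.DriverManager;
>> import java.sql.PreparedStatement;
>>
>> public class LoadBeer {
>>   public static void main(String[] args) throws Exception {
>>     try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost");
>>          PreparedStatement ps = conn.prepareStatement(
>>              "UPSERT INTO A.BEER VALUES (?, ?, ?, ?)")) {
>>       int n = 1000;
>>       for (int x = 1; x <= n; x++) {
>>         ps.setInt(1, x);        // B
>>         ps.setInt(2, x % 10);   // E1 -> 10 groups of 100 rows each
>>         ps.setInt(3, x % 100);  // E2 -> 100 groups of 10 rows each
>>         ps.setInt(4, x);        // R
>>         ps.executeUpdate();
>>       }
>>       conn.commit();            // Phoenix buffers upserts until commit
>>     }
>>   }
>> }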
>>
>> The group-by query for testing is "SELECT e1, count(*) FROM a.beer GROUP
>> BY e1".
>> The expected result should be:
>> 0 100
>> 1 100
>> 2 100
>> 3 100
>> 4 100
>> 5 100
>> 6 100
>> 7 100
>> 8 100
>> 9 100
>> The actual result was:
>> 6 0
>> 7 0
>> 8 0
>> 9 0
>> 0 0
>> 1 100
>> 2 100
>> 3 100
>> 4 100
>> 5 100
>>
>> I just tried another query, "SELECT e2, count(*) FROM a.beer GROUP BY e2".
>> Similarly, the expected result should have group-by keys from 0 to 99, each
>> with a count of 10, while in the actual result the group-by keys 86 through
>> 99, together with 0, all had a count of 0; the rest (1 to 85) had the
>> correct count of 10.
>>
>> It looks to me like the scans were good, but there was a problem with one of
>> the hash buckets.
>>
>> Thanks,
>> Maryann
>>
>>
>> On Tue, Oct 6, 2015 at 6:45 PM, James Taylor <jamestaylor@apache.org>
>> wrote:
>>
>>> Nice progress, Maryann.
>>>
>>> A few questions for you: not sure I understand the changes you made to
>>> PhoenixRecordReader. Is it necessary to wrap the server-side scan results
>>> in a GroupedAggregatingResultIterator? Each server-side scan will produce
>>> results with a single tuple per group by key. In Phoenix, the
>>> GroupedAggregatingResultIterator's function in life is to do the final
>>> merge. Note too that the results that come back from the aggregated scan
>>> aren't sorted (while GroupedAggregatingResultIterator needs tuples sorted
>>> by the group by key). Or is this just to help in decoding the values coming
>>> back from the scan?
>>>
>>> Also, not sure what impact it has in the way we "combine" the scans in
>>> our Drill parallelization code (PhoenixGroupScan.applyAssignments()), as
>>> each of our scans could include duplicate group by keys. Is it ok to
>>> combine them in this case?
>>>
>>> One more question: how is the group by key communicated back to Drill?
>>>
>>> Thanks,
>>> James
>>>
>>>
>>> On Tue, Oct 6, 2015 at 2:10 PM, Maryann Xue <maryann.xue@gmail.com>
>>> wrote:
>>>
>>>> Added a few fixes in the pull request. Tested with two regions; it turned
>>>> out that half of the results were empty (count = 0).
>>>> Not sure if there's anything wrong with
>>>> https://github.com/maryannxue/drill/blob/phoenix_plugin/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/rel/PhoenixHashAggPrule.java
>>>> .
>>>> Like Julian said, this rule looks a bit hacky.
>>>>
>>>> To force a 2-phase HashAgg, I made a temporary change as well:
>>>>
>>>> diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
>>>> index b911f6b..58bc918 100644
>>>> --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
>>>> +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
>>>> @@ -60,12 +60,12 @@ public abstract class AggPruleBase extends Prule {
>>>>    // If any of the aggregate functions are not one of these, then we
>>>>    // currently won't generate a 2 phase plan.
>>>>    protected boolean create2PhasePlan(RelOptRuleCall call, DrillAggregateRel aggregate) {
>>>> -    PlannerSettings settings = PrelUtil.getPlannerSettings(call.getPlanner());
>>>> -    RelNode child = call.rel(0).getInputs().get(0);
>>>> -    boolean smallInput = child.getRows() < settings.getSliceTarget();
>>>> -    if (! settings.isMultiPhaseAggEnabled() || settings.isSingleMode() || smallInput) {
>>>> -      return false;
>>>> -    }
>>>> +//    PlannerSettings settings = PrelUtil.getPlannerSettings(call.getPlanner());
>>>> +//    RelNode child = call.rel(0).getInputs().get(0);
>>>> +//    boolean smallInput = child.getRows() < settings.getSliceTarget();
>>>> +//    if (! settings.isMultiPhaseAggEnabled() || settings.isSingleMode() || smallInput) {
>>>> +//      return false;
>>>> +//    }
>>>>
>>>>      for (AggregateCall aggCall : aggregate.getAggCallList()) {
>>>>        String name = aggCall.getAggregation().getName();
>>>>
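>>>> A possibly less invasive way to get much the same effect while testing,
>>>> instead of commenting out the check, might be to lower the slice target
>>>> through session options so that the input no longer counts as "small" --
>>>> sketch below, issued through the Drill JDBC driver. The option names are
>>>> the stock Drill session options as I remember them, the embedded-mode
>>>> zk=local URL is an assumption, and this does not touch the isSingleMode()
>>>> condition.
>>>>
>>>> import java.sql.Connection;
>>>> import java.sql.DriverManager;
>>>> import java.sql.Statement;
>>>>
>>>> public class ForceTwoPhaseAgg {
>>>>   public static void main(String[] args) throws Exception {
>>>>     try (Connection conn = DriverManager.getConnection("jdbc:drill:zk=local");
>>>>          Statement stmt = conn.createStatement()) {
>>>>       // Make every input look "large" so create2PhasePlan() is not short-circuited.
>>>>       stmt.execute("ALTER SESSION SET `planner.slice_target` = 1");
>>>>       // Multi-phase aggregation is on by default; set it explicitly anyway.
>>>>       stmt.execute("ALTER SESSION SET `planner.enable_multiphase_agg` = true");
>>>>       // Then run the test query, e.g. SELECT e1, count(*) FROM a.beer GROUP BY e1
>>>>     }
>>>>   }
>>>> }
>>>>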
>>>> Thanks,
>>>> Maryann
>>>>
>>>>
>>>>
>>>> On Tue, Oct 6, 2015 at 2:31 PM, Julian Hyde <jhyde@apache.org> wrote:
>>>>
>>>>> Drill's current approach seems adequate for Drill alone but extending
>>>>> it to a heterogeneous system that includes Phoenix seems like a hack.
>>>>>
>>>>> I think you should only create Prels for algebra nodes that you know
>>>>> for sure are going to run on the Drill engine. If there's a
>>>>> possibility that it would run in another engine such as Phoenix then
>>>>> they should still be logical.
>>>>>
>>>>> On Tue, Oct 6, 2015 at 11:03 AM, Maryann Xue <maryann.xue@gmail.com>
>>>>> wrote:
>>>>> > The partial aggregate seems to be working now, with one interface
>>>>> > extension and one bug fix in the Phoenix project. Will do some code
>>>>> > cleanup and create a pull request soon.
>>>>> >
>>>>> > There is still a hack I made in the Drill project to force 2-phase
>>>>> > aggregation. I'll try to fix that.
>>>>> >
>>>>> > Jacques, I have one question though: how can I verify that there is
>>>>> > more than one slice and that the shuffle happens?
>>>>> >
>>>>> >
>>>>> > Thanks,
>>>>> > Maryann
>>>>> >
>>>>> > On Mon, Oct 5, 2015 at 2:03 PM, James Taylor <jamestaylor@apache.org>
>>>>> > wrote:
>>>>> >
>>>>> >> Maryann,
>>>>> >> I believe Jacques mentioned that a little bit of refactoring is
>>>>> >> required for a merge sort to occur - there's something that does
>>>>> >> that, but it's not expected to be used in this context currently.
>>>>> >>
>>>>> >> IMHO, there's clearer value in getting the aggregation to use Phoenix
>>>>> >> first, so I'd recommend going down that road as Jacques mentioned
>>>>> >> above, if possible. Once that's working, we can circle back to the
>>>>> >> partial sort.
>>>>> >>
>>>>> >> Thoughts?
>>>>> >> James
>>>>> >>
>>>>> >> On Mon, Oct 5, 2015 at 10:40 AM, Maryann Xue <maryann.xue@gmail.com>
>>>>> >> wrote:
>>>>> >>
>>>>> >>> I actually tried implementing partial sort with
>>>>> >>> https://github.com/jacques-n/drill/pull/4, which I figured might be
>>>>> >>> a little easier to start with than partial aggregation. But I found
>>>>> >>> that even though the code worked (returned the right results), the
>>>>> >>> Drill-side sort turned out to be an ordinary sort instead of the
>>>>> >>> merge it should have been. Any idea how to fix that?
>>>>> >>>
>>>>> >>>
>>>>> >>> Thanks,
>>>>> >>> Maryann
>>>>> >>>
>>>>> >>> On Mon, Oct 5, 2015 at 12:52 PM, Jacques Nadeau <jacques@dremio.com>
>>>>> >>> wrote:
>>>>> >>>
>>>>> >>>> Right now this type of work is done here:
>>>>> >>>>
>>>>> >>>> https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java
>>>>> >>>>
>>>>> >>>> https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
>>>>> >>>>
>>>>> >>>> With Distribution Trait application here:
>>>>> >>>>
>>>>> >>>> https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DrillDistributionTraitDef.java
>>>>> >>>>
>>>>> >>>> To me, the easiest way to solve the Phoenix issue is by providing a
>>>>> >>>> rule that matches HashAgg and StreamAgg but requires Phoenix
>>>>> >>>> convention as input. It would replace everywhere but would only be
>>>>> >>>> plannable when it is the first phase of aggregation.
>>>>> >>>>
>>>>> >>>> Thoughts?
>>>>> >>>>
>>>>> >>>>
>>>>> >>>>
>>>>> >>>> --
>>>>> >>>> Jacques Nadeau
>>>>> >>>> CTO and Co-Founder, Dremio
>>>>> >>>>
>>>>> >>>> On Thu, Oct 1, 2015 at 2:30 PM, Julian Hyde <jhyde@apache.org>
>>>>> >>>> wrote:
>>>>> >>>>
>>>>> >>>>> Phoenix is able to perform quite a few relational operations on
>>>>> >>>>> the region server: scan, filter, project, aggregate, sort
>>>>> >>>>> (optionally with limit). However, the sort and aggregate are
>>>>> >>>>> necessarily "local". They can only deal with data on that region
>>>>> >>>>> server, and there needs to be a further operation to combine the
>>>>> >>>>> results from the region servers.
>>>>> >>>>>
>>>>> >>>>> The question is how to plan such queries. I think the answer is
>>>>> >>>>> an AggregateExchangeTransposeRule.
>>>>> >>>>>
>>>>> >>>>> The rule would spot an Aggregate on a data source that is split
>>>>> >>>>> into multiple locations (partitions) and split it into a partial
>>>>> >>>>> Aggregate that computes sub-totals and a summarizing Aggregate
>>>>> >>>>> that combines those totals.
>>>>> >>>>>
>>>>> >>>>> How does the planner know that the Aggregate needs to be split?
>>>>> >>>>> Since the data's distribution has changed, there would need to be
>>>>> >>>>> an Exchange operator. It is the Exchange operator that triggers
>>>>> >>>>> the rule to fire.
>>>>> >>>>>
>>>>> >>>>> There are some special cases. If the data is sorted as well as
>>>>> >>>>> partitioned (say because the local aggregate uses a sort-based
>>>>> >>>>> algorithm) we could maybe use a more efficient plan. And if the
>>>>> >>>>> partition key is the same as the aggregation key, we don't need a
>>>>> >>>>> summarizing Aggregate, just a Union.
>>>>> >>>>>
>>>>> >>>>> It turns out not to be very Phoenix-specific. In the
>>>>> >>>>> Drill-on-Phoenix scenario, once the Aggregate has been pushed
>>>>> >>>>> through the Exchange (i.e. onto the drill-bit residing on the
>>>>> >>>>> region server), we can then push the DrillAggregate across the
>>>>> >>>>> drill-to-phoenix membrane and make it into a
>>>>> >>>>> PhoenixServerAggregate that executes in the region server.
>>>>> >>>>>
>>>>> >>>>> Related issues:
>>>>> >>>>> * https://issues.apache.org/jira/browse/DRILL-3840
>>>>> >>>>> * https://issues.apache.org/jira/browse/CALCITE-751
>>>>> >>>>>
>>>>> >>>>> Julian
>>>>> >>>>>
>>>>> >>>>
>>>>> >>>>
>>>>> >>>
>>>>> >>
>>>>>
>>>>
>>>>
>>>
>>
>
