drill-dev mailing list archives

From Maryann Xue <maryann....@gmail.com>
Subject Re: Partial aggregation in Drill-on-Phoenix
Date Wed, 07 Oct 2015 01:07:39 GMT
Hi James,

bq. A few questions for you: not sure I understand the changes you made to
PhoenixRecordReader. Is it necessary to wrap the server-side scan results
in a GroupedAggregatingResultIterator? Each server-side scan will produce
results with a single tuple per group by key. In Phoenix, the
GroupedAggregatingResultIterator's function in life is to do the final
merge. Note too that the results aren't sorted that come back from the
aggregated scan (while GroupedAggregatingResultIterator needs tuples sorted
by the group by key). Or is this just to help in decoding the values coming
back from the scan?

It is necessary. I suppose what we should return as a partial result from
PhoenixRecordReader is exactly the same as what we return in standalone
Phoenix+Calcite, except that the result is partial, i.e. incomplete. For
example, if we have "select a, count(*) from t group by a", we should return
rows that have "a" as the first expression value and "count(*)" as the
second expression value. This "count" expression actually needs a
ClientAggregator for evaluation, and that's what this
GroupedAggregatingResultIterator is used for.
Since "each server-side scan will produce results with a single tuple per
group by key", and PhoenixRecordReader is only dealing with one server-side
result each time, we don't care how the group-by keys are arranged (ordered
or unordered). Actually GroupedAggregatingResultIterator is not the
group-by iterator we use for AggregatePlan. It does not "combine". It
treats every row as a different group, by returning its rowkey as the
group-by key (GroupedAggregatingResultIterator.java:56).

In short, this iterator is for decoding the server-side values. We may want
to optimize this logic in the future by removing the serialization and
deserialization and keeping only one set of aggregators.

bq. Also, not sure what impact it has in the way we "combine" the scans in
our Drill parallelization code (PhoenixGroupScan.applyAssignments()), as
each of our scans could include duplicate group by keys. Is it ok to
combine them in this case?

It should not matter, or at least is not related to the problem I'm now
having.

bq. One more question: how is the group by key communicated back to Drill?

According to the HashAggPrule, if it decides to create a two-phase
aggregate, the first phase is now handled by Phoenix (after applying the
PhoenixHashAggPrule). I assume then the partial results get shuffled based
on the hash of their group-by keys (returned by PhoenixRecordReader). The
final step is the Drill hash aggregation.
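
To make the intended flow concrete, here is a minimal sketch in plain Java of
what I assume the two phases should do for a COUNT(*): each scan produces one
partial count per group-by key, the partial rows are routed to a bucket by the
hash of the key, and the final aggregate sums the partial counts. This is not
Drill or Phoenix code; the class, the method names, and the range-based split
of rows into "regions" are all hypothetical and only illustrate the idea.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration only; none of these names exist in Drill or Phoenix.
class TwoPhaseCountSketch {

    // Phase 1 (the part Phoenix would handle): one partial COUNT(*) per key, per scan.
    static Map<Integer, Long> partialCount(List<Integer> keys) {
        Map<Integer, Long> partial = new HashMap<>();
        for (int key : keys) {
            partial.merge(key, 1L, Long::sum);
        }
        return partial;
    }

    // Shuffle + phase 2: route each partial row by the hash of its group-by key,
    // then SUM the partial counts inside each bucket.
    static List<Map<Integer, Long>> shuffleAndMerge(List<Map<Integer, Long>> partials, int buckets) {
        List<Map<Integer, Long>> merged = new ArrayList<>();
        for (int i = 0; i < buckets; i++) {
            merged.add(new HashMap<>());
        }
        for (Map<Integer, Long> partial : partials) {
            for (Map.Entry<Integer, Long> row : partial.entrySet()) {
                int bucket = Math.floorMod(Integer.hashCode(row.getKey()), buckets);
                merged.get(bucket).merge(row.getKey(), row.getValue(), Long::sum);
            }
        }
        return merged;
    }

    // Simulates "SELECT x % modulus, count(*) ... GROUP BY 1" over x = 1..n,
    // with the rows split by range into the given number of regions (an assumption).
    static Map<Integer, Long> run(int n, int modulus, int regions, int buckets) {
        List<List<Integer>> regionKeys = new ArrayList<>();
        for (int i = 0; i < regions; i++) {
            regionKeys.add(new ArrayList<>());
        }
        for (int x = 1; x <= n; x++) {
            int region = (x - 1) * regions / n;
            regionKeys.get(region).add(x % modulus);
        }
        List<Map<Integer, Long>> partials = new ArrayList<>();
        for (List<Integer> keys : regionKeys) {
            partials.add(partialCount(keys));
        }
        // Each key lands in exactly one bucket, so the union of buckets is the final result.
        Map<Integer, Long> result = new TreeMap<>();
        for (Map<Integer, Long> bucket : shuffleAndMerge(partials, buckets)) {
            result.putAll(bucket);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(run(1000, 10, 2, 2));
    }
}
```

The point of the sketch is that the final phase must sum the partial counts
rather than count the partial rows, and that no key should ever come out with
a count of 0 as long as every bucket receives its rows.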


This is my test table "A.BEER", which has four columns: "B", "E1", "E2",
"R", all of type INTEGER. The data is generated like this:
for (x=1 to N) { //currently N=1000
 UPSERT INTO A.BEER VALUES (x, x % 10, x % 100, x);
}

The group-by query for testing is "SELECT e1, count(*) FROM a.beer GROUP BY
e1".
The expected result should be:
0 100
1 100
2 100
3 100
4 100
5 100
6 100
7 100
8 100
9 100
The actual result was:
6 0
7 0
8 0
9 0
0 0
1 100
2 100
3 100
4 100
5 100

I also just tried another query, "SELECT e2, count(*) FROM a.beer GROUP BY e2".
Similarly, the expected result should have group-by keys from 0 to 99, each
having a value of 10 as the count, while the actual result was:
from group-by key 86 to 99, together with 0, their count values were all 0;
the rest (1 to 85) all had the correct value 10.
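
For reference, the expected counts for both queries follow directly from how
the data is generated, and can be reproduced without a cluster. The small Java
sketch below is only a sanity check of the expected values; the class and
method names are made up for illustration and it does not touch Phoenix or
Drill.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper: recomputes the expected GROUP BY counts from the data generator.
class ExpectedBeerCounts {

    // Counts how many x in 1..n fall into each value of (x % modulus).
    // modulus = 10 corresponds to column E1, modulus = 100 to column E2.
    static Map<Integer, Long> expected(int n, int modulus) {
        Map<Integer, Long> counts = new TreeMap<>();
        for (int x = 1; x <= n; x++) {
            counts.merge(x % modulus, 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println("e1: " + expected(1000, 10));
        System.out.println("e2: " + expected(1000, 100));
    }
}
```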

It looks to me like the scans were good but there was a problem with one of
the hash buckets.

Thanks,
Maryann


On Tue, Oct 6, 2015 at 6:45 PM, James Taylor <jamestaylor@apache.org> wrote:

> Nice progress, Maryann.
>
> A few questions for you: not sure I understand the changes you made to
> PhoenixRecordReader. Is it necessary to wrap the server-side scan results
> in a GroupedAggregatingResultIterator? Each server-side scan will produce
> results with a single tuple per group by key. In Phoenix, the
> GroupedAggregatingResultIterator's function in life is to do the final
> merge. Note too that the results aren't sorted that come back from the
> aggregated scan (while GroupedAggregatingResultIterator needs tuples sorted
> by the group by key). Or is this just to help in decoding the values coming
> back from the scan?
>
> Also, not sure what impact it has in the way we "combine" the scans in our
> Drill parallelization code (PhoenixGroupScan.applyAssignments()), as each
> of our scans could include duplicate group by keys. Is it ok to combine
> them in this case?
>
> One more question: how is the group by key communicated back to Drill?
>
> Thanks,
> James
>
>
> On Tue, Oct 6, 2015 at 2:10 PM, Maryann Xue <maryann.xue@gmail.com> wrote:
>
>> Added a few fixes in the pull request. Tested with two regions, turned
>> out that half of the result is empty (count = 0).
>> Not sure if there's anything wrong with
>> https://github.com/maryannxue/drill/blob/phoenix_plugin/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/rel/PhoenixHashAggPrule.java
>> .
>> Like Julian said, this rule looks a bit hacky.
>>
>> To force a 2-phase HashAgg, I made a temporary change as well:
>>
>> diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
>> index b911f6b..58bc918 100644
>> --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
>> +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
>> @@ -60,12 +60,12 @@ public abstract class AggPruleBase extends Prule {
>>    // If any of the aggregate functions are not one of these, then we
>>    // currently won't generate a 2 phase plan.
>>    protected boolean create2PhasePlan(RelOptRuleCall call, DrillAggregateRel aggregate) {
>> -    PlannerSettings settings = PrelUtil.getPlannerSettings(call.getPlanner());
>> -    RelNode child = call.rel(0).getInputs().get(0);
>> -    boolean smallInput = child.getRows() < settings.getSliceTarget();
>> -    if (! settings.isMultiPhaseAggEnabled() || settings.isSingleMode() || smallInput) {
>> -      return false;
>> -    }
>> +//    PlannerSettings settings = PrelUtil.getPlannerSettings(call.getPlanner());
>> +//    RelNode child = call.rel(0).getInputs().get(0);
>> +//    boolean smallInput = child.getRows() < settings.getSliceTarget();
>> +//    if (! settings.isMultiPhaseAggEnabled() || settings.isSingleMode() || smallInput) {
>> +//      return false;
>> +//    }
>>
>>      for (AggregateCall aggCall : aggregate.getAggCallList()) {
>>        String name = aggCall.getAggregation().getName();
>>
>>
>> Thanks,
>> Maryann
>>
>>
>>
>> On Tue, Oct 6, 2015 at 2:31 PM, Julian Hyde <jhyde@apache.org> wrote:
>>
>>> Drill's current approach seems adequate for Drill alone but extending
>>> it to a heterogeneous system that includes Phoenix seems like a hack.
>>>
>>> I think you should only create Prels for algebra nodes that you know
>>> for sure are going to run on the Drill engine. If there's a
>>> possibility that it would run in another engine such as Phoenix then
>>> they should still be logical.
>>>
>>> On Tue, Oct 6, 2015 at 11:03 AM, Maryann Xue <maryann.xue@gmail.com>
>>> wrote:
>>> > The partial aggregate seems to be working now, with one interface extension
>>> > and one bug fix in the Phoenix project. Will do some code cleanup and
>>> > create a pull request soon.
>>> >
>>> > Still there was a hack in the Drill project which I made to force 2-phase
>>> > aggregation. I'll try to fix that.
>>> >
>>> > Jacques, I have one question though, how can I verify that there is more
>>> > than one slice and the shuffle happens?
>>> >
>>> >
>>> > Thanks,
>>> > Maryann
>>> >
>>> > On Mon, Oct 5, 2015 at 2:03 PM, James Taylor <jamestaylor@apache.org> wrote:
>>> >
>>> >> Maryann,
>>> >> I believe Jacques mentioned that a little bit of refactoring is required
>>> >> for a merge sort to occur - there's something that does that, but it's not
>>> >> expected to be used in this context currently.
>>> >>
>>> >> IMHO, there's more of a clear value in getting the aggregation to use
>>> >> Phoenix first, so I'd recommend going down that road as Jacques mentioned
>>> >> above if possible. Once that's working, we can circle back to the partial
>>> >> sort.
>>> >>
>>> >> Thoughts?
>>> >> James
>>> >>
>>> >> On Mon, Oct 5, 2015 at 10:40 AM, Maryann Xue <maryann.xue@gmail.com>
>>> >> wrote:
>>> >>
>>> >>> I actually tried implementing partial sort with
>>> >>> https://github.com/jacques-n/drill/pull/4, which I figured might be a
>>> >>> little easier to start with than partial aggregation. But I found that even
>>> >>> though the code worked (returned the right results), the Drill side sort
>>> >>> turned out to be an ordinary sort instead of a merge which it should have
>>> >>> been. Any idea of how to fix that?
>>> >>>
>>> >>>
>>> >>> Thanks,
>>> >>> Maryann
>>> >>>
>>> >>> On Mon, Oct 5, 2015 at 12:52 PM, Jacques Nadeau <jacques@dremio.com>
>>> >>> wrote:
>>> >>>
>>> >>>> Right now this type of work is done here:
>>> >>>>
>>> >>>> https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java
>>> >>>> https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
>>> >>>>
>>> >>>> With Distribution Trait application here:
>>> >>>>
>>> >>>> https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DrillDistributionTraitDef.java
>>> >>>>
>>> >>>> To me, the easiest way to solve the Phoenix issue is by providing a rule
>>> >>>> that matches HashAgg and StreamAgg but requires Phoenix convention as
>>> >>>> input. It would replace everywhere but would only be plannable when it is
>>> >>>> the first phase of aggregation.
>>> >>>>
>>> >>>> Thoughts?
>>> >>>>
>>> >>>>
>>> >>>>
>>> >>>> --
>>> >>>> Jacques Nadeau
>>> >>>> CTO and Co-Founder, Dremio
>>> >>>>
>>> >>>> On Thu, Oct 1, 2015 at 2:30 PM, Julian Hyde <jhyde@apache.org> wrote:
>>> >>>>
>>> >>>>> Phoenix is able to perform quite a few relational operations on the
>>> >>>>> region server: scan, filter, project, aggregate, sort (optionally with
>>> >>>>> limit). However, the sort and aggregate are necessarily "local". They
>>> >>>>> can only deal with data on that region server, and there needs to be a
>>> >>>>> further operation to combine the results from the region servers.
>>> >>>>>
>>> >>>>> The question is how to plan such queries. I think the answer is an
>>> >>>>> AggregateExchangeTransposeRule.
>>> >>>>>
>>> >>>>> The rule would spot an Aggregate on a data source that is split into
>>> >>>>> multiple locations (partitions) and split it into a partial Aggregate
>>> >>>>> that computes sub-totals and a summarizing Aggregate that combines
>>> >>>>> those totals.
>>> >>>>>
>>> >>>>> How does the planner know that the Aggregate needs to be split? Since
>>> >>>>> the data's distribution has changed, there would need to be an
>>> >>>>> Exchange operator. It is the Exchange operator that triggers the rule
>>> >>>>> to fire.
>>> >>>>>
>>> >>>>> There are some special cases. If the data is sorted as well as
>>> >>>>> partitioned (say because the local aggregate uses a sort-based
>>> >>>>> algorithm) we could maybe use a more efficient plan. And if the
>>> >>>>> partition key is the same as the aggregation key we don't need a
>>> >>>>> summarizing Aggregate, just a Union.
>>> >>>>>
>>> >>>>> It turns out not to be very Phoenix-specific. In the Drill-on-Phoenix
>>> >>>>> scenario, once the Aggregate has been pushed through the Exchange
>>> >>>>> (i.e. onto the drill-bit residing on the region server) we can then
>>> >>>>> push the DrillAggregate across the drill-to-phoenix membrane and make
>>> >>>>> it into a PhoenixServerAggregate that executes in the region server.
>>> >>>>>
>>> >>>>> Related issues:
>>> >>>>> * https://issues.apache.org/jira/browse/DRILL-3840
>>> >>>>> * https://issues.apache.org/jira/browse/CALCITE-751
>>> >>>>>
>>> >>>>> Julian
>>> >>>>>
>>> >>>>
>>> >>>>
>>> >>>
>>> >>
>>>
>>
>>
>
