drill-dev mailing list archives

From Jacques Nadeau <jacq...@dremio.com>
Subject Re: Partial aggregation in Drill-on-Phoenix
Date Wed, 07 Oct 2015 16:19:15 GMT
I just filed a jira for the merge issue:

https://issues.apache.org/jira/browse/DRILL-3907

--
Jacques Nadeau
CTO and Co-Founder, Dremio

On Wed, Oct 7, 2015 at 8:54 AM, Jacques Nadeau <jacques@dremio.com> wrote:

> Drill doesn't currently have a merge-sort operation available outside the
> context of an exchange. See here:
>
>
> https://github.com/apache/drill/tree/master/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver
>
> We'll need to do a bit of refactoring to provide this functionality
> outside the context of an exchange. The other thing we'll have to think
> about in this context is how to avoid doing an n-way merge in cases
> where we're not using the collation.
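>
> For what it's worth, the core of such a standalone merge is small; here
> is a minimal sketch (plain Java, illustrative only - not the
> MergingRecordBatch code) of an n-way merge over already-sorted inputs:
>
> import java.util.ArrayList;
> import java.util.Iterator;
> import java.util.List;
> import java.util.PriorityQueue;
>
> class NWayMerge {
>   /** Merges already-sorted inputs into a single sorted output. */
>   static <T extends Comparable<T>> List<T> merge(List<Iterator<T>> inputs) {
>     // One heap entry per non-exhausted input: its head value plus the source.
>     PriorityQueue<Entry<T>> heap = new PriorityQueue<>();
>     for (Iterator<T> it : inputs) {
>       if (it.hasNext()) {
>         heap.add(new Entry<>(it.next(), it));
>       }
>     }
>     List<T> out = new ArrayList<>();
>     while (!heap.isEmpty()) {
>       Entry<T> e = heap.poll();
>       out.add(e.value);
>       if (e.source.hasNext()) {
>         heap.add(new Entry<>(e.source.next(), e.source));
>       }
>     }
>     return out;
>   }
>
>   private static final class Entry<T extends Comparable<T>>
>       implements Comparable<Entry<T>> {
>     final T value;
>     final Iterator<T> source;
>     Entry(T value, Iterator<T> source) { this.value = value; this.source = source; }
>     @Override public int compareTo(Entry<T> o) { return value.compareTo(o.value); }
>   }
> }
>
> The refactoring question is then mostly about operator lifecycle and
> batch handling, not the merge logic itself.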
>
> --
> Jacques Nadeau
> CTO and Co-Founder, Dremio
>
> On Wed, Oct 7, 2015 at 8:18 AM, Maryann Xue <maryann.xue@gmail.com> wrote:
>
>> One thing I asked James about offline yesterday, which maybe we can
>> discuss a little bit in today's meeting:
>>
>> Phoenix uses a list of lists of Scan objects to indicate Region
>> boundaries and guideposts; if the top-level list contains more than one
>> element, it means that the results from the different
>> Scanners/ResultIterators should be merge-sorted. We now use this list in
>> the Drill integration to generate the different batches or slices. I see
>> from the Drill plan of a simple select like "SELECT * FROM A.BEER" that a
>> Drill Sort node sits on top of the PhoenixTableScan, and I suspect this is
>> a real sort rather than a merge-sort. So optimally:
>> 1) this should be a merge-sort (to be more accurate, a merge);
>> 2) furthermore, if Drill has something to indicate the order among slices
>> and batches, we could even turn it into a concat.
>>
>> The structure of this Scan list might be helpful for 2), or we may have
>> some logical representation for it. Otherwise, we can simply flatten this
>> list into a one-dimensional list, as we do now (in my check-in
>> yesterday); see the sketch below.
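>>
>> To make 1) and 2) concrete, the flattening we do today amounts to
>> something like this (sketch only; Scan is the HBase client class, but
>> the class and method names here are made up):
>>
>> import java.util.ArrayList;
>> import java.util.List;
>> import org.apache.hadoop.hbase.client.Scan;
>>
>> class ScanGroups {
>>   // The top-level list separates groups whose results must be
>>   // merge-sorted against each other; the inner lists are the
>>   // per-region/guidepost splits.
>>   static List<Scan> flatten(List<List<Scan>> scanGroups) {
>>     List<Scan> flat = new ArrayList<>();
>>     for (List<Scan> group : scanGroups) {
>>       flat.addAll(group);  // loses the boundary, hence the real Sort on top
>>     }
>>     return flat;
>>   }
>> }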
>>
>>
>>
>> Thanks,
>> Maryann
>>
>> On Tue, Oct 6, 2015 at 9:52 PM, Maryann Xue <maryann.xue@gmail.com>
>> wrote:
>>
>>> Yes, but the partially aggregated results will not contain any duplicate
>>> rowkeys, since they are also the group-by keys. What we need is to get
>>> the aggregators and call aggregate for each row. We could write a new,
>>> simpler ResultIterator to replace this, but for now it should work
>>> correctly.
>>>
>>> On Tue, Oct 6, 2015 at 9:45 PM, James Taylor <jamestaylor@apache.org>
>>> wrote:
>>>
>>>> The results we get back from the server-side scan are already the
>>>> partially aggregated values we need. GroupedAggregatingResultIterator
>>>> will collapse adjacent Tuples that happen to have the same row key. I'm
>>>> not sure we want/need this to happen. Instead, I think we just need to
>>>> decode the aggregated values directly from the result of the scan.
>>>>
>>>> On Tue, Oct 6, 2015 at 6:07 PM, Maryann Xue <maryann.xue@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi James,
>>>>>
>>>>> bq. A few questions for you: not sure I understand the changes you
>>>>> made to PhoenixRecordReader. Is it necessary to wrap the server-side
>>>>> scan results in a GroupedAggregatingResultIterator? Each server-side
>>>>> scan will produce results with a single tuple per group by key. In
>>>>> Phoenix, the GroupedAggregatingResultIterator's function in life is to
>>>>> do the final merge. Note too that the results that come back from the
>>>>> aggregated scan aren't sorted (while GroupedAggregatingResultIterator
>>>>> needs tuples sorted by the group by key). Or is this just to help in
>>>>> decoding the values coming back from the scan?
>>>>>
>>>>> It is necessary. I suppose what we should return as a partial result
>>>>> from PhoenixRecordReader is exactly the same as what we return in
>>>>> standalone Phoenix+Calcite, except that the result is partial or, say,
>>>>> incomplete. For example, for "select a, count(*) from t group by a", we
>>>>> should return rows that have "a" as the first expression value and
>>>>> "count(*)" as the second expression value. This "count" expression
>>>>> actually needs a ClientAggregator for evaluation, and that's what this
>>>>> GroupedAggregatingResultIterator is used for.
>>>>> Since "each server-side scan will produce results with a single tuple
>>>>> per group by key", and PhoenixRecordReader is only dealing with one
>>>>> server-side result each time, we don't care how the group-by keys are
>>>>> arranged (ordered or unordered). Actually,
>>>>> GroupedAggregatingResultIterator is not the group-by iterator we use
>>>>> for AggregatePlan. It does not "combine"; it treats every row as a
>>>>> different group, by returning its rowkey as the group-by key
>>>>> (GroupedAggregatingResultIterator.java:56).
>>>>>
>>>>> In short, this iterator is for decoding the server-side values, so we
>>>>> may want to optimize this logic in the future by removing the extra
>>>>> serialization and deserialization and having only one set of
>>>>> aggregators.
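>>>>>
>>>>> For reference, the grouping-key behavior I mentioned is roughly this
>>>>> (paraphrased from memory, so treat it as a sketch rather than the
>>>>> exact Phoenix source):
>>>>>
>>>>> // Each row's key is returned as its own group-by key, so no two rows
>>>>> // are ever combined; the iterator just drives the ClientAggregators
>>>>> // that decode the server-side aggregate values.
>>>>> @Override
>>>>> protected ImmutableBytesWritable getGroupingKey(Tuple tuple,
>>>>>     ImmutableBytesWritable ptr) throws SQLException {
>>>>>   tuple.getKey(ptr);
>>>>>   return ptr;
>>>>> }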
>>>>>
>>>>> bq. Also, not sure what impact it has in the way we "combine" the
>>>>> scans in our Drill parallelization code
>>>>> (PhoenixGroupScan.applyAssignments()), as each of our scans could
>>>>> include duplicate group by keys. Is it ok to combine them in this case?
>>>>>
>>>>> It should not matter, or at least it is not related to the problem I'm
>>>>> now having.
>>>>>
>>>>> bq. One more question: how is the group by key communicated back to
>>>>> Drill?
>>>>>
>>>>> According to the HashAggPrule, if it decides to create a two-phase
>>>>> aggregate, the first phase is now handled by Phoenix (after applying
>>>>> the PhoenixHashAggPrule). I assume the partial results then get
>>>>> shuffled based on the hash of their group-by keys (returned by
>>>>> PhoenixRecordReader). The final step is the Drill hash aggregation.
>>>>>
>>>>>
>>>>> This is my test table "A.BEER", which has four columns, "B", "E1",
>>>>> "E2", and "R", all of INTEGER type. The data is generated like this:
>>>>> for (int x = 1; x <= N; x++) { // currently N = 1000
>>>>>   UPSERT INTO A.BEER VALUES (x, x % 10, x % 100, x);
>>>>> }
>>>>>
>>>>> The group-by query for testing is "SELECT e1, count(*) FROM a.beer
>>>>> GROUP BY e1".
>>>>> The expected result should be:
>>>>> 0 100
>>>>> 1 100
>>>>> 2 100
>>>>> 3 100
>>>>> 4 100
>>>>> 5 100
>>>>> 6 100
>>>>> 7 100
>>>>> 8 100
>>>>> 9 100
>>>>> The actual result was:
>>>>> 6 0
>>>>> 7 0
>>>>> 8 0
>>>>> 9 0
>>>>> 0 0
>>>>> 1 100
>>>>> 2 100
>>>>> 3 100
>>>>> 4 100
>>>>> 5 100
>>>>>
>>>>> Here I just tried another one, "SELECT e2, count(*) FROM a.beer GROUP
>>>>> BY e2".
>>>>> Similarly, the expected result should have group-by keys from 0 to 99,
>>>>> each with a count of 10. In the actual result, the group-by keys 86 to
>>>>> 99, together with 0, all had a count of 0, while the rest (1 to 85) all
>>>>> had the correct value, 10.
>>>>>
>>>>> It looks to me like the scans were good but there was a problem with
>>>>> one of the hash buckets.
>>>>>
>>>>> Thanks,
>>>>> Maryann
>>>>>
>>>>>
>>>>> On Tue, Oct 6, 2015 at 6:45 PM, James Taylor <jamestaylor@apache.org>
>>>>> wrote:
>>>>>
>>>>>> Nice progress, Maryann.
>>>>>>
>>>>>> A few questions for you: not sure I understand the changes you made
>>>>>> to PhoenixRecordReader. Is it necessary to wrap the server-side scan
>>>>>> results in a GroupedAggregatingResultIterator? Each server-side scan
>>>>>> will produce results with a single tuple per group by key. In Phoenix,
>>>>>> the GroupedAggregatingResultIterator's function in life is to do the
>>>>>> final merge. Note too that the results that come back from the
>>>>>> aggregated scan aren't sorted (while GroupedAggregatingResultIterator
>>>>>> needs tuples sorted by the group by key). Or is this just to help in
>>>>>> decoding the values coming back from the scan?
>>>>>>
>>>>>> Also, not sure what impact it has in the way we "combine" the scans
>>>>>> in our Drill parallelization code
>>>>>> (PhoenixGroupScan.applyAssignments()), as each of our scans could
>>>>>> include duplicate group by keys. Is it ok to combine them in this
>>>>>> case?
>>>>>>
>>>>>> One more question: how is the group by key communicated back to Drill?
>>>>>>
>>>>>> Thanks,
>>>>>> James
>>>>>>
>>>>>>
>>>>>> On Tue, Oct 6, 2015 at 2:10 PM, Maryann Xue <maryann.xue@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Added a few fixes in the pull request. Tested with two regions, and
>>>>>>> it turned out that half of the results were empty (count = 0).
>>>>>>> Not sure if there's anything wrong with
>>>>>>> https://github.com/maryannxue/drill/blob/phoenix_plugin/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/rel/PhoenixHashAggPrule.java
>>>>>>> .
>>>>>>> Like Julian said, this rule looks a bit hacky.
>>>>>>> To force a 2-phase HashAgg, I made a temporary change as well:
>>>>>>>
>>>>>>> diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
>>>>>>> index b911f6b..58bc918 100644
>>>>>>> --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
>>>>>>> +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
>>>>>>> @@ -60,12 +60,12 @@ public abstract class AggPruleBase extends Prule {
>>>>>>>    // If any of the aggregate functions are not one of these, then we
>>>>>>>    // currently won't generate a 2 phase plan.
>>>>>>>    protected boolean create2PhasePlan(RelOptRuleCall call, DrillAggregateRel aggregate) {
>>>>>>> -    PlannerSettings settings = PrelUtil.getPlannerSettings(call.getPlanner());
>>>>>>> -    RelNode child = call.rel(0).getInputs().get(0);
>>>>>>> -    boolean smallInput = child.getRows() < settings.getSliceTarget();
>>>>>>> -    if (! settings.isMultiPhaseAggEnabled() || settings.isSingleMode() || smallInput) {
>>>>>>> -      return false;
>>>>>>> -    }
>>>>>>> +//    PlannerSettings settings = PrelUtil.getPlannerSettings(call.getPlanner());
>>>>>>> +//    RelNode child = call.rel(0).getInputs().get(0);
>>>>>>> +//    boolean smallInput = child.getRows() < settings.getSliceTarget();
>>>>>>> +//    if (! settings.isMultiPhaseAggEnabled() || settings.isSingleMode() || smallInput) {
>>>>>>> +//      return false;
>>>>>>> +//    }
>>>>>>>
>>>>>>>      for (AggregateCall aggCall : aggregate.getAggCallList()) {
>>>>>>>        String name = aggCall.getAggregation().getName();
>>>>>>> Thanks,
>>>>>>> Maryann
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Oct 6, 2015 at 2:31 PM, Julian Hyde <jhyde@apache.org>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Drill's current approach seems adequate for Drill alone, but
>>>>>>>> extending it to a heterogeneous system that includes Phoenix seems
>>>>>>>> like a hack.
>>>>>>>>
>>>>>>>> I think you should only create Prels for algebra nodes that you
>>>>>>>> know for sure are going to run on the Drill engine. If there's a
>>>>>>>> possibility that a node would run in another engine such as Phoenix,
>>>>>>>> then it should still be logical.
>>>>>>>>
>>>>>>>> On Tue, Oct 6, 2015 at 11:03 AM, Maryann Xue <maryann.xue@gmail.com>
>>>>>>>> wrote:
>>>>>>>> > The partial aggregate seems to be working now, with one interface
>>>>>>>> > extension and one bug fix in the Phoenix project. Will do some code
>>>>>>>> > cleanup and create a pull request soon.
>>>>>>>> >
>>>>>>>> > Still, there was a hack in the Drill project which I made to force
>>>>>>>> > 2-phase aggregation. I'll try to fix that.
>>>>>>>> >
>>>>>>>> > Jacques, I have one question though: how can I verify that there
>>>>>>>> > is more than one slice and that the shuffle happens?
>>>>>>>> >
>>>>>>>> >
>>>>>>>> > Thanks,
>>>>>>>> > Maryann
>>>>>>>> >
>>>>>>>> > On Mon, Oct 5, 2015 at 2:03 PM, James Taylor <jamestaylor@apache.org>
>>>>>>>> > wrote:
>>>>>>>> >
>>>>>>>> >> Maryann,
>>>>>>>> >> I believe Jacques mentioned that a little bit of refactoring is
>>>>>>>> >> required for a merge sort to occur - there's something that does
>>>>>>>> >> that, but it's not expected to be used in this context currently.
>>>>>>>> >>
>>>>>>>> >> IMHO, there's clearer value in getting the aggregation to use
>>>>>>>> >> Phoenix first, so I'd recommend going down that road as Jacques
>>>>>>>> >> mentioned above, if possible. Once that's working, we can circle
>>>>>>>> >> back to the partial sort.
>>>>>>>> >>
>>>>>>>> >> Thoughts?
>>>>>>>> >> James
>>>>>>>> >>
>>>>>>>> >> On Mon, Oct 5, 2015 at 10:40 AM, Maryann Xue <maryann.xue@gmail.com>
>>>>>>>> >> wrote:
>>>>>>>> >>
>>>>>>>> >>> I actually tried implementing partial sort with
>>>>>>>> >>> https://github.com/jacques-n/drill/pull/4, which I figured might
>>>>>>>> >>> be a little easier to start with than partial aggregation. But I
>>>>>>>> >>> found that even though the code worked (returned the right
>>>>>>>> >>> results), the Drill-side sort turned out to be an ordinary sort
>>>>>>>> >>> instead of the merge it should have been. Any idea of how to fix
>>>>>>>> >>> that?
>>>>>>>> >>>
>>>>>>>> >>>
>>>>>>>> >>> Thanks,
>>>>>>>> >>> Maryann
>>>>>>>> >>>
>>>>>>>> >>> On Mon, Oct 5, 2015 at 12:52 PM, Jacques Nadeau <jacques@dremio.com>
>>>>>>>> >>> wrote:
>>>>>>>> >>>
>>>>>>>> >>>> Right now this type of work is done here:
>>>>>>>> >>>>
>>>>>>>> >>>> https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java
>>>>>>>> >>>> https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
>>>>>>>> >>>>
>>>>>>>> >>>> With Distribution Trait application here:
>>>>>>>> >>>>
>>>>>>>> >>>> https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DrillDistributionTraitDef.java
>>>>>>>> >>>>
>>>>>>>> >>>> To me, the easiest way to solve the Phoenix issue is to provide
>>>>>>>> >>>> a rule that matches HashAgg and StreamAgg but requires Phoenix
>>>>>>>> >>>> convention as input. It would apply everywhere but would only be
>>>>>>>> >>>> plannable when it is the first phase of aggregation.
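>>>>>>>> >>>>
>>>>>>>> >>>> A skeleton of what I have in mind (the Phoenix-side names are
>>>>>>>> >>>> hypothetical, just to show the shape of the rule):
>>>>>>>> >>>>
>>>>>>>> >>>> public class PhoenixAggRule extends Prule {
>>>>>>>> >>>>   public PhoenixAggRule() {
>>>>>>>> >>>>     super(RelOptHelper.any(DrillAggregateRel.class), "PhoenixAggRule");
>>>>>>>> >>>>   }
>>>>>>>> >>>>
>>>>>>>> >>>>   @Override
>>>>>>>> >>>>   public void onMatch(RelOptRuleCall call) {
>>>>>>>> >>>>     DrillAggregateRel agg = call.rel(0);
>>>>>>>> >>>>     RelNode input = agg.getInput();
>>>>>>>> >>>>     // Only plannable as the first phase: the input must already
>>>>>>>> >>>>     // be in Phoenix convention, i.e. runnable on the region server.
>>>>>>>> >>>>     if (input.getConvention() != PhoenixRel.CONVENTION) {
>>>>>>>> >>>>       return;
>>>>>>>> >>>>     }
>>>>>>>> >>>>     call.transformTo(new PhoenixServerAggregate(agg.getCluster(),
>>>>>>>> >>>>         input.getTraitSet(), input, agg.getGroupSet(),
>>>>>>>> >>>>         agg.getAggCallList()));
>>>>>>>> >>>>   }
>>>>>>>> >>>> }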
>>>>>>>> >>>>
>>>>>>>> >>>> Thoughts?
>>>>>>>> >>>>
>>>>>>>> >>>>
>>>>>>>> >>>>
>>>>>>>> >>>> --
>>>>>>>> >>>> Jacques Nadeau
>>>>>>>> >>>> CTO and Co-Founder, Dremio
>>>>>>>> >>>>
>>>>>>>> >>>> On Thu, Oct 1, 2015 at 2:30 PM, Julian Hyde <jhyde@apache.org>
>>>>>>>> >>>> wrote:
>>>>>>>> >>>>
>>>>>>>> >>>>> Phoenix is able to perform quite a few relational operations on
>>>>>>>> >>>>> the region server: scan, filter, project, aggregate, sort
>>>>>>>> >>>>> (optionally with limit). However, the sort and aggregate are
>>>>>>>> >>>>> necessarily "local". They can only deal with data on that region
>>>>>>>> >>>>> server, and there needs to be a further operation to combine the
>>>>>>>> >>>>> results from the region servers.
>>>>>>>> >>>>>
>>>>>>>> >>>>> The question is how to plan such queries. I think the answer is
>>>>>>>> >>>>> an AggregateExchangeTransposeRule.
>>>>>>>> >>>>>
>>>>>>>> >>>>> The rule would spot an Aggregate on a data source that is split
>>>>>>>> >>>>> into multiple locations (partitions) and split it into a partial
>>>>>>>> >>>>> Aggregate that computes sub-totals and a summarizing Aggregate
>>>>>>>> >>>>> that combines those totals.
>>>>>>>> >>>>>
>>>>>>>> >>>>> How does the planner know that the Aggregate needs to be split?
>>>>>>>> >>>>> Since the data's distribution has changed, there would need to
>>>>>>>> >>>>> be an Exchange operator. It is the Exchange operator that
>>>>>>>> >>>>> triggers the rule to fire.
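>>>>>>>> >>>>>
>>>>>>>> >>>>> For a query like "SELECT e1, COUNT(*) FROM t GROUP BY e1", the
>>>>>>>> >>>>> rewrite would conceptually be (my sketch, not actual planner
>>>>>>>> >>>>> output):
>>>>>>>> >>>>>
>>>>>>>> >>>>>   Aggregate(group=[e1], agg=[COUNT()])
>>>>>>>> >>>>>     Exchange(hash[e1])
>>>>>>>> >>>>>       Scan(t)
>>>>>>>> >>>>>
>>>>>>>> >>>>> becomes
>>>>>>>> >>>>>
>>>>>>>> >>>>>   Aggregate(group=[e1], agg=[SUM(c)])            // summarizing
>>>>>>>> >>>>>     Exchange(hash[e1])
>>>>>>>> >>>>>       Aggregate(group=[e1], agg=[COUNT() AS c])  // partial
>>>>>>>> >>>>>         Scan(t)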
>>>>>>>> >>>>>
>>>>>>>> >>>>> There are some special cases. If the data is sorted as well as
>>>>>>>> >>>>> partitioned (say, because the local aggregate uses a sort-based
>>>>>>>> >>>>> algorithm), we could maybe use a more efficient plan. And if the
>>>>>>>> >>>>> partition key is the same as the aggregation key, we don't need
>>>>>>>> >>>>> a summarizing Aggregate, just a Union.
>>>>>>>> >>>>>
>>>>>>>> >>>>> It turns out not to be very Phoenix-specific. In the
>>>>>>>> >>>>> Drill-on-Phoenix scenario, once the Aggregate has been pushed
>>>>>>>> >>>>> through the Exchange (i.e. onto the drill-bit residing on the
>>>>>>>> >>>>> region server), we can then push the DrillAggregate across the
>>>>>>>> >>>>> drill-to-phoenix membrane and make it into a
>>>>>>>> >>>>> PhoenixServerAggregate that executes on the region server.
>>>>>>>> >>>>>
>>>>>>>> >>>>> Related issues:
>>>>>>>> >>>>> * https://issues.apache.org/jira/browse/DRILL-3840
>>>>>>>> >>>>> * https://issues.apache.org/jira/browse/CALCITE-751
>>>>>>>> >>>>>
>>>>>>>> >>>>> Julian
>>>>>>>> >>>>>
>>>>>>>> >>>>
>>>>>>>> >>>>
>>>>>>>> >>>
>>>>>>>> >>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
