From dev-return-17078-apmail-drill-dev-archive=drill.apache.org@drill.apache.org Wed Oct 7 16:19:30 2015
Date: Wed, 7 Oct 2015 09:19:15 -0700
From: Jacques Nadeau
To: Maryann Xue
Cc: James Taylor, Julian Hyde, dev@drill.apache.org, Maryann Xue
Subject: Re: Partial aggregation in Drill-on-Phoenix

I just filed a jira for the merge issue:
https://issues.apache.org/jira/browse/DRILL-3907

--
Jacques Nadeau
CTO and Co-Founder, Dremio

On Wed, Oct 7, 2015 at 8:54 AM, Jacques Nadeau wrote:

> Drill doesn't currently have a merge-sort operation available outside the
> context of an exchange. See here:
>
> https://github.com/apache/drill/tree/master/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver
>
> We'll need to do a bit of refactoring to provide this functionality
> outside the context of an exchange. The one other thing we'll have to
> think about in this context is how we avoid doing an n-way merge in the
> case where we're not using the collation.
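>
> For reference, the core of such a merge is just an n-way merge over
> streams that are each already sorted. A minimal sketch, in plain Java
> with Guava's PeekingIterator (Row, rowComparator, sortedSlices and
> emit() are invented names for illustration, not Drill code):
>
> // Keep the head row of each sorted stream in a heap and repeatedly
> // emit the smallest. Needs java.util.PriorityQueue and
> // com.google.common.collect.{Iterators, PeekingIterator}.
> PriorityQueue<PeekingIterator<Row>> heap = new PriorityQueue<>(
>     (a, b) -> rowComparator.compare(a.peek(), b.peek()));
> for (Iterator<Row> slice : sortedSlices) {
>   PeekingIterator<Row> it = Iterators.peekingIterator(slice);
>   if (it.hasNext()) {
>     heap.add(it);
>   }
> }
> while (!heap.isEmpty()) {
>   PeekingIterator<Row> it = heap.poll();
>   emit(it.next());           // smallest remaining row across all slices
>   if (it.hasNext()) {
>     heap.add(it);            // re-insert with its new head row
>   }
> }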
>
> --
> Jacques Nadeau
> CTO and Co-Founder, Dremio
>
> On Wed, Oct 7, 2015 at 8:18 AM, Maryann Xue wrote:
>
>> One thing from what I asked James offline yesterday, and maybe we can
>> discuss a little bit in today's meeting:
>>
>> Phoenix uses a list of lists of Scan objects to indicate Region
>> boundaries and guideposts, and if the top-level list contains more than
>> one element, it means that the results from the different
>> Scanner/ResultIterators should be merge-sorted. We now use this list in
>> the Drill integration to generate different batches or slices. I see
>> from the Drill plan of a simple select like "SELECT * FROM A.BEER" that
>> a Drill Sort node sits on top of the PhoenixTableScan. I guess this is a
>> real sort rather than a merge-sort. So optimally,
>> 1) this should be a merge-sort (to be more accurate, a merge);
>> 2) furthermore, if Drill has something to indicate the order among
>> slices and batches, we could even turn it into a concat.
>>
>> The structure of this Scan list might be helpful for 2), or we may have
>> some Logical representation for this. Otherwise, we can simply flatten
>> this list to a one-dimensional list as we do now (in my check-in
>> yesterday).
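>>
>> In case it helps, a rough sketch of the intended semantics of that
>> structure, in plain Java (queryPlan and the concat claim for the inner
>> lists are my reading, so treat this as illustration only):
>>
>> // Results of different outer elements must be merge-sorted with each
>> // other; the scans inside one inner list cover adjacent key ranges in
>> // row-key order, so their results can simply be concatenated.
>> List<List<Scan>> scans = queryPlan.getScans();
>> boolean needsMergeSort = scans.size() > 1;
>> // The flattening we do today discards that distinction:
>> List<Scan> flat = new ArrayList<>();
>> for (List<Scan> group : scans) {
>>   flat.addAll(group);
>> }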
>>
>> Thanks,
>> Maryann
>>
>> On Tue, Oct 6, 2015 at 9:52 PM, Maryann Xue wrote:
>>
>>> Yes, but the partially aggregated results will not contain any
>>> duplicate rowkeys, since they are also group-by keys. What we need is
>>> the aggregators, to call aggregate for each row. We can write a new,
>>> simpler ResultIterator to replace this, but for now it should work
>>> correctly.
>>>
>>> On Tue, Oct 6, 2015 at 9:45 PM, James Taylor wrote:
>>>
>>>> The results we get back from the server-side scan are already the
>>>> partial aggregated values we need. GroupedAggregatingResultIterator
>>>> will collapse adjacent Tuples together which happen to have the same
>>>> row key. I'm not sure we want/need this to happen. Instead I think we
>>>> just need to decode the aggregated values directly from the result of
>>>> the scan.
>>>>
>>>> On Tue, Oct 6, 2015 at 6:07 PM, Maryann Xue wrote:
>>>>
>>>>> Hi James,
>>>>>
>>>>> bq. A few questions for you: not sure I understand the changes you
>>>>> made to PhoenixRecordReader. Is it necessary to wrap the server-side
>>>>> scan results in a GroupedAggregatingResultIterator? Each server-side
>>>>> scan will produce results with a single tuple per group by key. In
>>>>> Phoenix, the GroupedAggregatingResultIterator's function in life is
>>>>> to do the final merge. Note too that the results that come back from
>>>>> the aggregated scan aren't sorted (while
>>>>> GroupedAggregatingResultIterator needs tuples sorted by the group by
>>>>> key). Or is this just to help in decoding the values coming back from
>>>>> the scan?
>>>>>
>>>>> It is necessary. I suppose what we should return as a partial result
>>>>> from PhoenixRecordReader is exactly the same as what we do in
>>>>> standalone Phoenix+Calcite, except that the result is partial, or say
>>>>> incomplete. For example, if we have "select a, count(*) from t group
>>>>> by a", we should return rows that have "a" as the first expression
>>>>> value and "count(*)" as the second expression value. For this "count"
>>>>> expression, it actually needs a ClientAggregator for evaluation, and
>>>>> that's what this GroupedAggregatingResultIterator is used for.
>>>>> Since "each server-side scan will produce results with a single tuple
>>>>> per group by key", and PhoenixRecordReader is only dealing with one
>>>>> server-side result each time, we don't care how the group-by keys are
>>>>> arranged (ordered or unordered). Actually,
>>>>> GroupedAggregatingResultIterator is not the group-by iterator we use
>>>>> for AggregatePlan. It does not "combine". It treats every row as a
>>>>> different group, by returning its rowkey as the group-by key
>>>>> (GroupedAggregatingResultIterator.java:56).
>>>>>
>>>>> In short, this iterator is for decoding the server-side values. So we
>>>>> may want to optimize this logic in the future by removing this
>>>>> serialization and deserialization and having only one set of
>>>>> aggregators.
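>>>>>
>>>>> To make the two phases concrete: for "select a, count(*) from t group
>>>>> by a", no scan's partial result repeats a key, so the final phase
>>>>> only needs to add up the partial counts per key. A rough sketch in
>>>>> plain Java (KeyCount and partialResultsPerScan are invented names,
>>>>> illustration only -- not the actual Drill hash-aggregate code):
>>>>>
>>>>> // One (key, partialCount) pair per group and per scan comes in;
>>>>> // summing them per key yields the complete counts.
>>>>> Map<Object, Long> totals = new HashMap<>();
>>>>> for (List<KeyCount> partial : partialResultsPerScan) {
>>>>>   for (KeyCount kc : partial) {
>>>>>     totals.merge(kc.key, kc.count, Long::sum);  // combine sub-totals
>>>>>   }
>>>>> }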
>>>>>
>>>>> bq. Also, not sure what impact it has in the way we "combine" the
>>>>> scans in our Drill parallelization code
>>>>> (PhoenixGroupScan.applyAssignments()), as each of our scans could
>>>>> include duplicate group by keys. Is it ok to combine them in this
>>>>> case?
>>>>>
>>>>> It should not matter, or at least it is not related to the problem
>>>>> I'm now having.
>>>>>
>>>>> bq. One more question: how is the group by key communicated back to
>>>>> Drill?
>>>>>
>>>>> According to the HashAggPrule, if it decides to create a two-phase
>>>>> aggregate, the first phase is now handled by Phoenix (after applying
>>>>> the PhoenixHashAggPrule). I assume the partial results then get
>>>>> shuffled based on the hash of their group-by keys (returned by
>>>>> PhoenixRecordReader). The final step is the Drill hash aggregation.
>>>>>
>>>>> This is my test table "A.BEER", which has four columns: "B", "E1",
>>>>> "E2", "R", all of INTEGER type. And the data is generated like this:
>>>>> for (x = 1 to N) { // currently N = 1000
>>>>>   UPSERT INTO A.BEER VALUES (x, x % 10, x % 100, x);
>>>>> }
>>>>>
>>>>> The group-by query for testing is "SELECT e1, count(*) FROM a.beer
>>>>> GROUP BY e1".
>>>>> The expected result should be:
>>>>> 0 100
>>>>> 1 100
>>>>> 2 100
>>>>> 3 100
>>>>> 4 100
>>>>> 5 100
>>>>> 6 100
>>>>> 7 100
>>>>> 8 100
>>>>> 9 100
>>>>> The actual result was:
>>>>> 6 0
>>>>> 7 0
>>>>> 8 0
>>>>> 9 0
>>>>> 0 0
>>>>> 1 100
>>>>> 2 100
>>>>> 3 100
>>>>> 4 100
>>>>> 5 100
>>>>>
>>>>> Here I just tried another one, "SELECT e2, count(*) FROM a.beer GROUP
>>>>> BY e2". Similarly, the expected result should have group-by keys from
>>>>> 0 to 99, each with a count of 10, while in the actual result the
>>>>> counts for group-by keys 86 to 99, together with 0, were all 0; the
>>>>> rest (1 to 85) all had the correct value 10.
>>>>>
>>>>> It looks to me like the scans were good but there was a problem with
>>>>> one of the hash buckets.
>>>>>
>>>>> Thanks,
>>>>> Maryann
>>>>>
>>>>> On Tue, Oct 6, 2015 at 6:45 PM, James Taylor wrote:
>>>>>
>>>>>> Nice progress, Maryann.
>>>>>>
>>>>>> A few questions for you: not sure I understand the changes you made
>>>>>> to PhoenixRecordReader. Is it necessary to wrap the server-side scan
>>>>>> results in a GroupedAggregatingResultIterator? Each server-side scan
>>>>>> will produce results with a single tuple per group by key. In
>>>>>> Phoenix, the GroupedAggregatingResultIterator's function in life is
>>>>>> to do the final merge. Note too that the results that come back from
>>>>>> the aggregated scan aren't sorted (while
>>>>>> GroupedAggregatingResultIterator needs tuples sorted by the group by
>>>>>> key). Or is this just to help in decoding the values coming back
>>>>>> from the scan?
>>>>>>
>>>>>> Also, not sure what impact it has in the way we "combine" the scans
>>>>>> in our Drill parallelization code
>>>>>> (PhoenixGroupScan.applyAssignments()), as each of our scans could
>>>>>> include duplicate group by keys. Is it ok to combine them in this
>>>>>> case?
>>>>>>
>>>>>> One more question: how is the group by key communicated back to
>>>>>> Drill?
>>>>>>
>>>>>> Thanks,
>>>>>> James
>>>>>>
>>>>>> On Tue, Oct 6, 2015 at 2:10 PM, Maryann Xue wrote:
>>>>>>
>>>>>>> Added a few fixes in the pull request. Tested with two regions; it
>>>>>>> turned out that half of the result was empty (count = 0).
>>>>>>> Not sure if there's anything wrong with
>>>>>>> https://github.com/maryannxue/drill/blob/phoenix_plugin/contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/rel/PhoenixHashAggPrule.java
>>>>>>> Like Julian said, this rule looks a bit hacky.
>>>>>>>
>>>>>>> To force a 2-phase HashAgg, I made a temporary change as well:
>>>>>>>
>>>>>>> diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
>>>>>>> index b911f6b..58bc918 100644
>>>>>>> --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
>>>>>>> +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
>>>>>>> @@ -60,12 +60,12 @@ public abstract class AggPruleBase extends Prule {
>>>>>>>    // If any of the aggregate functions are not one of these, then we
>>>>>>>    // currently won't generate a 2 phase plan.
>>>>>>>    protected boolean create2PhasePlan(RelOptRuleCall call, DrillAggregateRel aggregate) {
>>>>>>> -    PlannerSettings settings = PrelUtil.getPlannerSettings(call.getPlanner());
>>>>>>> -    RelNode child = call.rel(0).getInputs().get(0);
>>>>>>> -    boolean smallInput = child.getRows() < settings.getSliceTarget();
>>>>>>> -    if (! settings.isMultiPhaseAggEnabled() || settings.isSingleMode() || smallInput) {
>>>>>>> -      return false;
>>>>>>> -    }
>>>>>>> +//    PlannerSettings settings = PrelUtil.getPlannerSettings(call.getPlanner());
>>>>>>> +//    RelNode child = call.rel(0).getInputs().get(0);
>>>>>>> +//    boolean smallInput = child.getRows() < settings.getSliceTarget();
>>>>>>> +//    if (! settings.isMultiPhaseAggEnabled() || settings.isSingleMode() || smallInput) {
>>>>>>> +//      return false;
>>>>>>> +//    }
>>>>>>>
>>>>>>>      for (AggregateCall aggCall : aggregate.getAggCallList()) {
>>>>>>>        String name = aggCall.getAggregation().getName();
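>>>>>>>
>>>>>>> (A guess, not verified: since the commented-out check compares the
>>>>>>> child's row count against the slice target, lowering the target via
>>>>>>> ALTER SESSION SET `planner.slice_target` = 1;
>>>>>>> might force the 2-phase plan without patching the code.)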
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Maryann
>>>>>>>
>>>>>>> On Tue, Oct 6, 2015 at 2:31 PM, Julian Hyde wrote:
>>>>>>>
>>>>>>>> Drill's current approach seems adequate for Drill alone, but
>>>>>>>> extending it to a heterogeneous system that includes Phoenix seems
>>>>>>>> like a hack.
>>>>>>>>
>>>>>>>> I think you should only create Prels for algebra nodes that you
>>>>>>>> know for sure are going to run on the Drill engine. If there's a
>>>>>>>> possibility that they would run in another engine such as Phoenix,
>>>>>>>> then they should still be logical.
>>>>>>>>
>>>>>>>> On Tue, Oct 6, 2015 at 11:03 AM, Maryann Xue wrote:
>>>>>>>> > The partial aggregate seems to be working now, with one
>>>>>>>> > interface extension and one bug fix in the Phoenix project. Will
>>>>>>>> > do some code cleanup and create a pull request soon.
>>>>>>>> >
>>>>>>>> > Still, there was a hack in the Drill project which I made to
>>>>>>>> > force 2-phase aggregation. I'll try to fix that.
>>>>>>>> >
>>>>>>>> > Jacques, I have one question though: how can I verify that there
>>>>>>>> > is more than one slice and that the shuffle happens?
>>>>>>>> >
>>>>>>>> > Thanks,
>>>>>>>> > Maryann
>>>>>>>> >
>>>>>>>> > On Mon, Oct 5, 2015 at 2:03 PM, James Taylor wrote:
>>>>>>>> >
>>>>>>>> >> Maryann,
>>>>>>>> >> I believe Jacques mentioned that a little bit of refactoring is
>>>>>>>> >> required for a merge sort to occur - there's something that does
>>>>>>>> >> that, but it's not expected to be used in this context currently.
>>>>>>>> >>
>>>>>>>> >> IMHO, there's clearer value in getting the aggregation to use
>>>>>>>> >> Phoenix first, so I'd recommend going down that road as Jacques
>>>>>>>> >> mentioned above if possible. Once that's working, we can circle
>>>>>>>> >> back to the partial sort.
>>>>>>>> >>
>>>>>>>> >> Thoughts?
>>>>>>>> >> James
>>>>>>>> >>
>>>>>>>> >> On Mon, Oct 5, 2015 at 10:40 AM, Maryann Xue wrote:
>>>>>>>> >>
>>>>>>>> >>> I actually tried implementing partial sort with
>>>>>>>> >>> https://github.com/jacques-n/drill/pull/4, which I figured
>>>>>>>> >>> might be a little easier to start with than partial
>>>>>>>> >>> aggregation. But I found that even though the code worked
>>>>>>>> >>> (returned the right results), the Drill-side sort turned out to
>>>>>>>> >>> be an ordinary sort instead of the merge it should have been.
>>>>>>>> >>> Any idea of how to fix that?
>>>>>>>> >>>
>>>>>>>> >>> Thanks,
>>>>>>>> >>> Maryann
>>>>>>>> >>>
>>>>>>>> >>> On Mon, Oct 5, 2015 at 12:52 PM, Jacques Nadeau wrote:
>>>>>>>> >>>
>>>>>>>> >>>> Right now this type of work is done here:
>>>>>>>> >>>>
>>>>>>>> >>>> https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java
>>>>>>>> >>>> https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
>>>>>>>> >>>>
>>>>>>>> >>>> With Distribution Trait application here:
>>>>>>>> >>>>
>>>>>>>> >>>> https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DrillDistributionTraitDef.java
>>>>>>>> >>>>
>>>>>>>> >>>> To me, the easiest way to solve the Phoenix issue is by
>>>>>>>> >>>> providing a rule that matches HashAgg and StreamAgg but
>>>>>>>> >>>> requires Phoenix convention as input. It would replace
>>>>>>>> >>>> everywhere but would only be plannable when it is the first
>>>>>>>> >>>> phase of aggregation.
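>>>>>>>> >>>>
>>>>>>>> >>>> Roughly the shape such a rule might take -- a sketch only;
>>>>>>>> >>>> the PhoenixRel.CONVENTION constant and the
>>>>>>>> >>>> PhoenixServerAggregate constructor are placeholders, not
>>>>>>>> >>>> actual code:
>>>>>>>> >>>>
>>>>>>>> >>>> public class PhoenixAggPrule extends RelOptRule {
>>>>>>>> >>>>   public PhoenixAggPrule() {
>>>>>>>> >>>>     super(operand(DrillAggregateRel.class,
>>>>>>>> >>>>         operand(RelNode.class, any())), "PhoenixAggPrule");
>>>>>>>> >>>>   }
>>>>>>>> >>>>
>>>>>>>> >>>>   @Override
>>>>>>>> >>>>   public void onMatch(RelOptRuleCall call) {
>>>>>>>> >>>>     DrillAggregateRel agg = call.rel(0);
>>>>>>>> >>>>     RelNode input = call.rel(1);
>>>>>>>> >>>>     // Plannable only when the input can be converted to the
>>>>>>>> >>>>     // Phoenix convention, i.e. this is the first phase.
>>>>>>>> >>>>     RelTraitSet traits =
>>>>>>>> >>>>         input.getTraitSet().replace(PhoenixRel.CONVENTION);
>>>>>>>> >>>>     RelNode converted = convert(input, traits);
>>>>>>>> >>>>     call.transformTo(new PhoenixServerAggregate(
>>>>>>>> >>>>         agg.getCluster(), traits, converted,
>>>>>>>> >>>>         agg.getGroupSet(), agg.getAggCallList()));
>>>>>>>> >>>>   }
>>>>>>>> >>>> }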
>>>>>>>> >>> >>>>>>>> >>> >>>>>>>> >>> Thanks, >>>>>>>> >>> Maryann >>>>>>>> >>> >>>>>>>> >>> On Mon, Oct 5, 2015 at 12:52 PM, Jacques Nadeau < >>>>>>>> jacques@dremio.com> >>>>>>>> >>> wrote: >>>>>>>> >>> >>>>>>>> >>>> Right now this type of work is done here: >>>>>>>> >>>> >>>>>>>> >>>> >>>>>>>> >>>> >>>>>>>> https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java >>>>>>>> >>>> >>>>>>>> >>>> >>>>>>>> https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java >>>>>>>> >>>> >>>>>>>> >>>> With Distribution Trait application here: >>>>>>>> >>>> >>>>>>>> >>>> >>>>>>>> https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DrillDistributionTraitDef.java >>>>>>>> >>>> >>>>>>>> >>>> To me, the easiest way to solve the Phoenix issue is by >>>>>>>> providing a rule >>>>>>>> >>>> that matches HashAgg and StreamAgg but requires Phoenix >>>>>>>> convention as >>>>>>>> >>>> input. It would replace everywhere but would only be plannable >>>>>>>> when it is >>>>>>>> >>>> the first phase of aggregation. >>>>>>>> >>>> >>>>>>>> >>>> Thoughts? >>>>>>>> >>>> >>>>>>>> >>>> >>>>>>>> >>>> >>>>>>>> >>>> -- >>>>>>>> >>>> Jacques Nadeau >>>>>>>> >>>> CTO and Co-Founder, Dremio >>>>>>>> >>>> >>>>>>>> >>>> On Thu, Oct 1, 2015 at 2:30 PM, Julian Hyde >>>>>>>> wrote: >>>>>>>> >>>> >>>>>>>> >>>>> Phoenix is able to perform quite a few relational operations >>>>>>>> on the >>>>>>>> >>>>> region server: scan, filter, project, aggregate, sort >>>>>>>> (optionally with >>>>>>>> >>>>> limit). However, the sort and aggregate are necessarily >>>>>>>> "local". They >>>>>>>> >>>>> can only deal with data on that region server, and there >>>>>>>> needs to be a >>>>>>>> >>>>> further operation to combine the results from the region >>>>>>>> servers. >>>>>>>> >>>>> >>>>>>>> >>>>> The question is how to plan such queries. I think the answer >>>>>>>> is an >>>>>>>> >>>>> AggregateExchangeTransposeRule. >>>>>>>> >>>>> >>>>>>>> >>>>> The rule would spot an Aggregate on a data source that is >>>>>>>> split into >>>>>>>> >>>>> multiple locations (partitions) and split it into a partial >>>>>>>> Aggregate >>>>>>>> >>>>> that computes sub-totals and a summarizing Aggregate that >>>>>>>> combines >>>>>>>> >>>>> those totals. >>>>>>>> >>>>> >>>>>>>> >>>>> How does the planner know that the Aggregate needs to be >>>>>>>> split? Since >>>>>>>> >>>>> the data's distribution has changed, there would need to be an >>>>>>>> >>>>> Exchange operator. It is the Exchange operator that triggers >>>>>>>> the rule >>>>>>>> >>>>> to fire. >>>>>>>> >>>>> >>>>>>>> >>>>> There are some special cases. If the data is sorted as well as >>>>>>>> >>>>> partitioned (say because the local aggregate uses a sort-based >>>>>>>> >>>>> algorithm) we could maybe use a more efficient plan. And if >>>>>>>> the >>>>>>>> >>>>> partition key is the same as the aggregation key we don't >>>>>>>> need a >>>>>>>> >>>>> summarizing Aggregate, just a Union. >>>>>>>> >>>>> >>>>>>>> >>>>> It turns out not to be very Phoenix-specific. In the >>>>>>>> Drill-on-Phoenix >>>>>>>> >>>>> scenario, once the Aggregate has been pushed through the >>>>>>>> Exchange >>>>>>>> >>>>> (i.e. 
>>>>>>>> >>>>>
>>>>>>>> >>>>> How does the planner know that the Aggregate needs to be
>>>>>>>> >>>>> split? Since the data's distribution has changed, there
>>>>>>>> >>>>> would need to be an Exchange operator. It is the Exchange
>>>>>>>> >>>>> operator that triggers the rule to fire.
>>>>>>>> >>>>>
>>>>>>>> >>>>> There are some special cases. If the data is sorted as well
>>>>>>>> >>>>> as partitioned (say because the local aggregate uses a
>>>>>>>> >>>>> sort-based algorithm), we could maybe use a more efficient
>>>>>>>> >>>>> plan. And if the partition key is the same as the
>>>>>>>> >>>>> aggregation key, we don't need a summarizing Aggregate, just
>>>>>>>> >>>>> a Union.
>>>>>>>> >>>>>
>>>>>>>> >>>>> It turns out not to be very Phoenix-specific. In the
>>>>>>>> >>>>> Drill-on-Phoenix scenario, once the Aggregate has been
>>>>>>>> >>>>> pushed through the Exchange (i.e. onto the drill-bit
>>>>>>>> >>>>> residing on the region server), we can then push the
>>>>>>>> >>>>> DrillAggregate across the drill-to-phoenix membrane and make
>>>>>>>> >>>>> it into a PhoenixServerAggregate that executes in the region
>>>>>>>> >>>>> server.
>>>>>>>> >>>>>
>>>>>>>> >>>>> Related issues:
>>>>>>>> >>>>> * https://issues.apache.org/jira/browse/DRILL-3840
>>>>>>>> >>>>> * https://issues.apache.org/jira/browse/CALCITE-751
>>>>>>>> >>>>>
>>>>>>>> >>>>> Julian