From: David Ortiz <dpo5003@gmail.com>
Date: Tue, 07 Apr 2015 13:25:01 +0000
Subject: Re: Percentile rank
To: user@crunch.apache.org

That would be the expectation. Depending on the number of records, though, it's possible to start getting OutOfMemoryErrors thrown by the Hadoop framework during the shuffle/sort phase. I had to completely rework a section of one of my pipelines because that was happening once we ran it on production-level data. Depending on what else you're running on the cluster, that particular issue will also be very disruptive to other jobs.

On Tue, Apr 7, 2015 at 3:27 AM André Pinto <andredasilvapinto@gmail.com> wrote:
Hi Josh,

Yes. I guess the reasoning for not having the Iterable on Sort.sort but having it on the Secondary Sort was to avoid people using it on the complete data set (it is assumed that there will never be that many records with the same key, so it is OK to iterate over those few records). Seems reasonable.
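
For reference, the two shapes in org.apache.crunch.lib look roughly like this (signatures quoted from memory, so treat them as approximate):

// Sort.sort hands back the sorted PCollection itself; there is no
// per-record hook where a global Iterable could be exposed:
public static <T> PCollection<T> sort(PCollection<T> collection);

// SecondarySort.sortAndApply does give the DoFn an Iterable, but only
// over the values that share a single key -- expected to be a small group:
public static <K, V1, V2, U> PCollection<U> sortAndApply(
    PTable<K, Pair<V1, V2>> input,
    DoFn<Pair<K, Iterable<Pair<V1, V2>>>, U> doFn,
    PType<U> ptype);
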
Yes, using the Iterable on a single reducer is certainly not the best way to do this, but considering that there is no (simple) access to the global index, I think there is really no other way. At least iterating over the Iterable will not move all the data into memory, right? It does lazy loading, so it will just take a lot longer than doing it in parallel.
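
Something like this rough, untested sketch is what I have in mind (assuming the input is a PCollection<Double> named values, that the total count can be materialized up front, and that there are at least two records; I use a constant key rather than a randomly generated one -- any single shared key works):

import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;
import org.apache.crunch.PCollection;
import org.apache.crunch.PTable;
import org.apache.crunch.Pair;
import org.apache.crunch.lib.SecondarySort;
import org.apache.crunch.types.writable.Writables;

public class PercentileRank {
  public static PCollection<Pair<Double, Double>> rank(PCollection<Double> values) {
    // Materializing the count runs the pipeline up to this point.
    final long total = values.length().getValue();

    // Tag every value with the same constant key so everything goes to one
    // reducer; the value doubles as the secondary-sort key.
    PTable<Integer, Pair<Double, Double>> keyed = values.parallelDo(
        new DoFn<Double, Pair<Integer, Pair<Double, Double>>>() {
          @Override
          public void process(Double v,
              Emitter<Pair<Integer, Pair<Double, Double>>> out) {
            out.emit(Pair.of(0, Pair.of(v, v)));
          }
        }, Writables.tableOf(Writables.ints(),
            Writables.pairs(Writables.doubles(), Writables.doubles())));

    // The Iterable arrives in sorted order and is streamed, so only a
    // running counter is held in memory; rank = 100 * i / (total - 1).
    return SecondarySort.sortAndApply(keyed,
        new DoFn<Pair<Integer, Iterable<Pair<Double, Double>>>, Pair<Double, Double>>() {
          @Override
          public void process(Pair<Integer, Iterable<Pair<Double, Double>>> in,
              Emitter<Pair<Double, Double>> out) {
            long i = 0;
            for (Pair<Double, Double> p : in.second()) {
              out.emit(Pair.of(p.first(), 100.0 * i++ / (total - 1)));
            }
          }
        }, Writables.pairs(Writables.doubles(), Writables.doubles()));
  }
}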

Thanks.

On Tue, Apr 7, 2015 at 4:06 AM, Josh Wills <jwills@cloudera.com> wrote:
Hey Andre,

Not sure what you mean precisely-- do you mean an option or method in the Sort API that would include the rank of each item?

In general, I like to avoid assuming that one reducer can handle all of the data in a PCollection on API methods, which I think is what you're saying (i.e., just stream all of the data in sorted order to a single reducer).

J
On Mon, Apr 6, 2015 at 3:19 PM, André Pinto <andredasilvapinto@gmail.com> wrote:
Hi Josh,
Thanks for replying.

That really sounds very hacky. I was expecting something with a little more support from the API.

I guess we could also use sortAndApply with a randomly generated singleton key for the entire set of values and then use the Iterable on the values to obtain the sorted index. It still looks bad though...

Just out of curiosity, why isn't the Iterable approach also supported on the simple Sort.sort? Sorry if this looks obvious to you, but I'm still new to Crunch and Hadoop.

Thanks.

On Thu, Apr 2, 2015 at 6:36 PM, Josh Wills <jwills@cloudera.com> wrote:
I can't think of a great way to do it-- knowing exactly which record you're processing (in any kind of order) in a distributed processing job is always somewhat fraught. Gun to my head, I would do it in two phases:

1) Get the name of the FileSplit for the current task-- which can be retrieved, although we don't make it easy. You can do it via something like this from inside of a map-side DoFn:

InputSplit split = ((MapContext) getContext()).getInputSplit();
FileSplit baseSplit = (FileSplit) ((Supplier<InputSplit>) split).get();

Then count up the number of records inside of each FileSplit. I'm not sure if you should disable combine files when you do this, but it seems like a good idea.
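
Fleshed out a little, that counting pass could look like the following (just a sketch -- SplitCountFn and the path-plus-offset key are my own naming, not anything Crunch ships):

import com.google.common.base.Supplier;
import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;
import org.apache.crunch.Pair;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.MapContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

// Emits (split name, 1) for every record; summing per key gives the
// record count of each FileSplit.
public class SplitCountFn<T> extends DoFn<T, Pair<String, Long>> {
  @Override
  public void process(T input, Emitter<Pair<String, Long>> emitter) {
    InputSplit split = ((MapContext) getContext()).getInputSplit();
    FileSplit baseSplit = (FileSplit) ((Supplier<InputSplit>) split).get();
    // Path plus start offset, so two splits of the same file stay distinct.
    String splitName = baseSplit.getPath() + ":" + baseSplit.getStart();
    emitter.emit(Pair.of(splitName, 1L));
  }
}

// Summing the counts, e.g. for a text input:
// PTable<String, Long> counts = lines
//     .parallelDo(new SplitCountFn<String>(),
//         Writables.tableOf(Writables.strings(), Writables.longs()))
//     .groupByKey()
//     .combineValues(Aggregators.SUM_LONGS());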

2) Create a new DoFn that takes the output of the previous job and uses it to determine exactly where in the overall order the currently processing record falls, based on the sorted order of the FileSplit names and an internal counter that gets reset to zero for each new FileSplit.
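
A sketch of that second DoFn (again, the names are mine; the offsets map gets built on the client by sorting the phase 1 split names and accumulating their counts):

import java.util.TreeMap;
import com.google.common.base.Supplier;
import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;
import org.apache.crunch.Pair;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.MapContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class GlobalIndexFn<T> extends DoFn<T, Pair<Long, T>> {
  // First global index of each split, keyed by the same split name that
  // phase 1 emitted.
  private final TreeMap<String, Long> startIndexBySplit;
  private transient String currentSplit;
  private transient long counter;

  public GlobalIndexFn(TreeMap<String, Long> startIndexBySplit) {
    this.startIndexBySplit = startIndexBySplit;
  }

  @Override
  public void process(T input, Emitter<Pair<Long, T>> emitter) {
    String split = splitName();
    if (!split.equals(currentSplit)) {  // new split: reset the counter
      currentSplit = split;
      counter = 0;
    }
    emitter.emit(Pair.of(startIndexBySplit.get(split) + counter++, input));
  }

  private String splitName() {
    // Same unwrapping trick as above.
    InputSplit split = ((MapContext) getContext()).getInputSplit();
    FileSplit baseSplit = (FileSplit) ((Supplier<InputSplit>) split).get();
    return baseSplit.getPath() + ":" + baseSplit.getStart();
  }
}

// With the total record count in hand as well, the percentile rank of a
// record is then 100.0 * globalIndex / (total - 1).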

J

On Thu, Apr 2, 2015 at 7:39 AM, André Pinto <andredasilvapinto@gmail.com> wrote:
Hi,

I'm trying to calculate the percentile ranks for the values of a sorted PTable (i.e. at which % rank each element is within the whole data set). Is there a way to do this with Crunch? Seems that we would only need to have access to the global index of the record during an iteration over the data set.
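
(To be concrete about the definition I have in mind: with N records in ascending order, the record at zero-based index i would get percentile rank 100 * i / (N - 1), so 5 sorted values would get ranks 0, 25, 50, 75, 100.)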

Thanks in advance,
André




--
Director of Data Science
Cloudera
Twitter: @josh_wills




