From: Juan Rodríguez Hortalá <juan.rodriguez.hortala@gmail.com>
To: Sean Owen
Cc: Paweł Szulc, user@spark.apache.org
Date: Thu, 12 Mar 2015 13:03:58 +0100
Subject: Re: Is there a limit to the number of RDDs in a Spark context?

Hi,

It's been some time since my last message about using many RDDs in a Spark job, but I have just run into the same problem again. I have an RDD of time-tagged data that I want to 1) divide into windows according to a timestamp field; 2) compute KMeans for each time window; 3) collect the results in an RDD of pairs that contains the centroids for each time window. For 1) I generate an RDD of pairs where the key is an id for the time window, but for 2) I hit the problem that KMeans from MLlib only accepts an RDD, so I cannot call it from aggregateByKey. I think this is a reusability problem for any MLlib algorithm that takes an RDD as input, whenever we want to apply the algorithm independently to several groups of data. So the only approaches I can imagine are:

a) Generate an RDD per time window. This is easy to do, but it doesn't work because it's easy to end up with thousands of windows and hence thousands of RDDs, which freezes the Spark scheduler, as seen in my previous messages.

b) Collect the set of window ids in the driver and traverse that set, generating an RDD per window, calling KMeans on it, and storing the results with an export action. I will try that now, and I think it could work because only one per-window RDD is alive at a time; the point is to avoid creating an RDD whose lineage depends on a thousand RDDs, as in the collecting phase 3) of a). But it implies a sequential execution of the KMeans computations, which is a waste of resources: imagine I have a cluster of 200 machines and each call to KMeans takes 10 minutes on 5 machines, and I have 1000 windows and hence 1000 calls to KMeans; by sequencing the computations I would have 195 idle machines and a running time of 10 minutes * 1000 windows. Maybe this could be overcome by running not 1 but m RDDs at a time, for some m that doesn't freeze the Spark scheduler, but that's not a very clean workaround. It also makes it very difficult to reuse this per-window KMeans computation in a bigger program, because I cannot get back an RDD with the window id as key and the centroids as values. The only way I can imagine doing that is storing the pairs in a database during the export actions and then loading all the results into a single RDD, but I would prefer to do everything inside Spark if possible.
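To make b) concrete, this is roughly what I have in mind; just a minimal sketch, assuming the pair RDD from step 1) has a numeric window id as key and MLlib vectors as values (names and parameters are placeholders):

    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._
    import org.apache.spark.mllib.clustering.KMeans
    import org.apache.spark.mllib.linalg.Vector
    import org.apache.spark.rdd.RDD

    // byWindow: the pair RDD from step 1), keyed by window id
    def kMeansPerWindow(sc: SparkContext,
                        byWindow: RDD[(Long, Vector)],
                        k: Int,
                        outputDir: String): Unit = {
      byWindow.cache()  // reused once per window, so keep it in memory
      // Collect only the distinct window ids in the driver
      val windowIds = byWindow.keys.distinct().collect()
      for (windowId <- windowIds) {
        // Only one small filtered RDD is alive at a time
        val windowData = byWindow.filter { case (w, _) => w == windowId }.values
        val model = KMeans.train(windowData, k, 20)  // 20 = maxIterations, a placeholder
        // Export action: write this window's centroids out, so no big lineage is built
        sc.parallelize(model.clusterCenters.map(c => (windowId, c)).toSeq, 1)
          .saveAsTextFile(s"$outputDir/window-$windowId")
      }
    }

Each iteration only builds one small filtered RDD on top of the cached input, so nothing with a huge lineage is ever created, but of course the windows are processed one after another.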
Maybe I'm missing something here; any idea would be appreciated.

Thanks in advance for your help,

Greetings,

Juan Rodriguez


2015-02-18 20:23 GMT+01:00 Juan Rodríguez Hortalá <juan.rodriguez.hortala@gmail.com>:

> Hi Sean,
>
> Thanks a lot for your answer. That explains it: as I was creating thousands of RDDs, I guess the communication overhead was the reason why the Spark job was freezing. After changing the code to use RDDs of pairs and aggregateByKey it works just fine, and quite fast.
>
> Again, thanks a lot for your help.
>
> Greetings,
>
> Juan
>
> 2015-02-18 15:35 GMT+01:00 Sean Owen <sowen@cloudera.com>:
>
>> At some level, enough RDDs create hundreds of thousands of tiny partitions of data, each of which creates a task for each stage. The raw overhead of all the message passing can slow things down a lot. I would not design something to use an RDD per key. You would generally key by some value you want to divide and filter on, and then use a *ByKey operation to do your work.
>>
>> You don't work with individual RDDs this way, but usually that's good news. You usually have a lot more flexibility operating just in pure Java / Scala to do whatever you need inside your function.
>>
>> On Wed, Feb 18, 2015 at 2:12 PM, Juan Rodríguez Hortalá <juan.rodriguez.hortala@gmail.com> wrote:
>> > Hi Paweł,
>> >
>> > Thanks a lot for your answer. I finally got the program to work by using aggregateByKey, but I was wondering why creating thousands of RDDs doesn't work. I think that could be interesting for reusing methods that work on RDDs, like for example JavaDoubleRDD.stats() (http://spark.apache.org/docs/latest/api/java/org/apache/spark/api/java/JavaDoubleRDD.html#stats%28%29). If the groups are small then I can chain groupBy(), collect(), parallelize() and stats(), but that is quite inefficient because it implies moving data to and from the driver, and it also doesn't scale to big groups; on the other hand, if I use aggregateByKey or a similar function then I cannot use stats(), so I have to reimplement it. In general I was looking for a way to reuse other functions that I have that work on RDDs, applying them to groups of data in an RDD, because I don't see how to directly apply them to each of the groups in a pair RDD.
>> >
>> > Again, thanks a lot for your answer,
>> >
>> > Greetings,
>> >
>> > Juan Rodriguez
>> >
>> > 2015-02-18 14:56 GMT+01:00 Paweł Szulc <paul.szulc@gmail.com>:
>> >>
>> >> Maybe you can omit grouping altogether with groupByKey? What is your next step after grouping elements by key? Are you trying to reduce values? If so, then I would recommend using a reducing function like reduceByKey or aggregateByKey. Those will first reduce the values for each key locally on each node before doing the actual IO over the network. There will also be no grouping phase, so you will not run into memory issues.
>> >>
>> >> Please let me know if that helped
>> >>
>> >> Pawel Szulc
>> >> @rabbitonweb
>> >> http://www.rabbitonweb.com
>> >>
>> >> On Wed, Feb 18, 2015 at 12:06 PM, Juan Rodríguez Hortalá <juan.rodriguez.hortala@gmail.com> wrote:
>> >>>
>> >>> Hi,
>> >>>
>> >>> I'm writing a Spark program where I want to divide an RDD into different groups, but the groups are too big to use groupByKey. To cope with that, since I know in advance the list of keys for each group, I build a map from the keys to the RDDs that result from filtering the input RDD to get the records for the corresponding key. This works when I have a small number of keys, but for a big number of keys (tens of thousands) the execution gets stuck without issuing any new Spark stage. I suspect the reason is that the Spark scheduler is not able to handle so many RDDs. Does that make sense? I'm rewriting the program to use a single RDD of pairs, with cached partitions, but I wanted to be sure I understand the problem here.
>> >>>
>> >>> Thanks a lot in advance,
>> >>>
>> >>> Greetings,
>> >>>
>> >>> Juan Rodriguez
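For reference, a minimal sketch of the pair-RDD approach from the quoted thread above: the summary statistics that stats() returns for a whole JavaDoubleRDD (a StatCounter with count, mean, stdev, etc.) can also be computed per group with aggregateByKey and StatCounter, instead of reimplementing them (the key type and names are just placeholders):

    import org.apache.spark.SparkContext._
    import org.apache.spark.rdd.RDD
    import org.apache.spark.util.StatCounter

    // Per-key equivalent of JavaDoubleRDD.stats(): one StatCounter per key
    def statsPerKey(values: RDD[(Long, Double)]): RDD[(Long, StatCounter)] =
      values.aggregateByKey(new StatCounter())(
        (acc, v) => acc.merge(v),         // fold each value into the per-partition accumulator
        (acc1, acc2) => acc1.merge(acc2)  // combine accumulators across partitions
      )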