spark-user mailing list archives

From Timothy Perrigo <tperr...@gmail.com>
Subject Re: Help with Initial Cluster Configuration / Tuning
Date Tue, 22 Oct 2013 13:44:00 GMT
I didn't realize that about countByValue...thanks!  And thanks for the code
advice as well; I appreciate all the help I can get!

I've switched from countByValue to the reduceByKey method you sent; with
that as the only change, I did see much better utilization of the nodes in
the cluster, but eventually still got OutOfMemory errors.  I'm now trying
to set the number of tasks as the second parameter to reduceByKey, using
100 (as you suggested).  My question at this point is: is there any rule
of thumb for determining when you need to specify the number of tasks, or
for determining how many tasks to use?  Similarly, I see that the
SparkContext.textFile method allows setting a 'default min splits' value;
when would you want or need to set this parameter, and how would you
determine what to set it to?
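
For concreteness, here is roughly what I'm running now (the minSplits value
and the task count of 100 are just guesses on my part, not values I've
verified as sensible):

// both 100s below are guesses: minSplits for the read, number of reduce tasks for the shuffle
val text = sc.textFile("s3n://my-bucket/my-1gb-text-file", 100)
val mapped = text.filter(_.trim.length > 0).flatMap(ngrams(_, 3))
val counts = mapped.map(str => (str, 1)).reduceByKey((a, b) => a + b, 100)
val topNgram = counts.reduce((a, b) => if (a._2 > b._2) a else b)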

Sorry to overwhelm you with questions!  I've been really enjoying working
with Spark, but I haven't been able to figure out how to answer these kinds
of questions.  Your help would be very much appreciated!

Tim


On Mon, Oct 21, 2013 at 8:18 PM, Matei Zaharia <matei.zaharia@gmail.com> wrote:

> Hi there,
>
> The problem is that countByValue happens in only a single reduce task --
> this is probably something we should fix but it's basically not designed
> for lots of values. Instead, do the count in parallel as follows:
>
> val counts = mapped.map(str => (str, 1)).reduceByKey((a, b) => a + b)
>
> If this still has trouble, you can also increase the level of parallelism
> of reduceByKey by passing it a second parameter for the number of tasks
> (e.g. 100).
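>
> For example, something like this (with 100 as an illustrative number of
> reduce tasks):
>
> val counts = mapped.map(str => (str, 1)).reduceByKey((a, b) => a + b, 100)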
>
> BTW, one other small thing with your code: flatMap should actually work
> fine if your function returns an Iterator or Traversable, so there's no
> need to call toList and return a Seq in ngrams; you can just return an
> Iterator[String].
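>
> Something like this should work (an untested sketch of the same ngrams
> logic):
>
> // same logic as before, but returns an Iterator instead of building a List
> def ngrams(s: String, n: Int = 3): Iterator[String] =
>   s.split("\\s+").sliding(n).filter(_.length == n).map(_.mkString(" ").trim)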
>
> Matei
>
> On Oct 21, 2013, at 1:05 PM, Timothy Perrigo <tperrigo@gmail.com> wrote:
>
> > Hi everyone,
> > I am very new to Spark, so as a learning exercise I've set up a small
> cluster consisting of 4 EC2 m1.large instances (1 master, 3 slaves), which
> I'm hoping to use to calculate ngram frequencies from text files of various
> sizes (I'm not doing anything with them; I just thought this would be
> slightly more interesting than the usual 'word count' example).  Currently,
> I'm trying to work with a 1GB text file, but running into memory issues.
>  I'm wondering what parameters I should be setting (in spark-env.sh) in
> order to properly utilize the cluster.  Right now, I'd be happy just to
> have the process complete successfully with the 1 gig file, so I'd really
> appreciate any suggestions you all might have.
> >
> > Here's a summary of the code I'm running through the spark shell on the
> master:
> >
> > def ngrams(s: String, n: Int = 3): Seq[String] = {
> >   s.split("\\s+").sliding(n).filter(_.length == n)
> >     .map(_.mkString(" ")).map(_.trim).toList
> > }
> >
> > val text = sc.textFile("s3n://my-bucket/my-1gb-text-file")
> >
> > val mapped = text.filter(_.trim.length > 0).flatMap(ngrams(_, 3))
> >
> > So far so good; the problems come during the reduce phase.  With small
> files, I was able to issue the following to calculate the most frequently
> occurring trigram:
> >
> > val topNgram = (mapped countByValue) reduce((a: (String, Long), b: (String, Long)) =>
> >   if (a._2 > b._2) a else b)
> >
> > With the 1 gig file, though, I've been running into OutOfMemory errors,
> so I decided to split the reduction into several steps, starting with simply
> issuing countByValue of my "mapped" RDD, but I have yet to get it to
> complete successfully.
> >
> > SPARK_MEM is currently set to 6154m.  I also bumped up the
> spark.akka.frameSize setting to 500 (though at this point, I was grasping
> at straws; I'm not sure what a "proper" value would be).  What properties
> should I be setting for a job of this size on a cluster of 3 m1.large
> slaves? (The cluster was initially configured using the spark-ec2 scripts).
>  Also, programmatically, what should I be doing differently?  (For example,
> should I be setting the minimum number of splits when reading the text
> file?  If so, what would be a good default?).
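> >
> > For reference, here is roughly what I have in spark-env.sh at the moment
> > (these are just my current values, not recommendations, and I'm assuming
> > SPARK_JAVA_OPTS is the right place to set the frame size property):
> >
> > # current settings only, not recommendations
> > export SPARK_MEM=6154m
> > export SPARK_JAVA_OPTS="-Dspark.akka.frameSize=500"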
> >
> > I apologize for what I'm sure are very naive questions.  I think Spark
> is a fantastic project and have enjoyed working with it, but I'm still very
> much a newbie and would appreciate any help you all can provide (as well as
> any 'rules-of-thumb' or best practices I should be following).
> >
> > Thanks,
> > Tim Perrigo
>
>
