spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Shay Seng <>
Subject Re: Help with Initial Cluster Configuration / Tuning
Date Tue, 22 Oct 2013 15:50:50 GMT
Hey Mark, I didn't mean to say that the information isn't out there -- just
that when something goes wrong with spark, the scope of what could be wrong
is so large - some bad setting with JVM, serializer, akka, badly written
scala code, algorithm wrong, check worker logs, check executor stderrs,

When I looked at this post this morning, my initial thought wasn't that
"countByValue" would be at fault. ...probably since I've only been using
Scala/Spark for a month or so.

It was just a suggestion to help newbies come up to speed more quickly and
gain insights into how to debug issues.

On Tue, Oct 22, 2013 at 8:14 AM, Mark Hamstra <>wrote:

> There's no need to guess at that.  The docs tell you directly:
> def countByValue(): Map[T, Long]
> Return the count of each unique value in this RDD as a map of (value,
> count) pairs. The final combine step happens locally on the master,
> equivalent to running a single reduce task.
> On Tue, Oct 22, 2013 at 7:22 AM, Shay Seng <> wrote:
>> Hi Matei,
>> I've seen several memory tuning queries on this mailing list, and also
>> heard the same kinds of queries at the spark meetup. In fact the last
>> bullet point in Josh Carver(?) slides, the guy from Bizo, was "memory
>> tuning is still a mystery".
>> I certainly had lots of issues in when I first started. From memory
>> issues to gc issues, things seem to run fine until you try something with
>> 500GB of data etc.
>> I was wondering if you could write up a little white paper or some guide
>> lines on how to set memory values, and what to look at when something goes
>> wrong? Eg. I would never gave guessed that countByValue happens on a single
>> machine etc.
>>  On Oct 21, 2013 6:18 PM, "Matei Zaharia" <>
>> wrote:
>>> Hi there,
>>> The problem is that countByValue happens in only a single reduce task --
>>> this is probably something we should fix but it's basically not designed
>>> for lots of values. Instead, do the count in parallel as follows:
>>> val counts = => (str, 1)).reduceByKey((a, b) => a + b)
>>> If this still has trouble, you can also increase the level of
>>> parallelism of reduceByKey by passing it a second parameter for the number
>>> of tasks (e.g. 100).
>>> BTW one other small thing with your code, flatMap should actually work
>>> fine if your function returns an Iterator to Traversable, so there's no
>>> need to call toList and return a Seq in ngrams; you can just return an
>>> Iterator[String].
>>> Matei
>>> On Oct 21, 2013, at 1:05 PM, Timothy Perrigo <> wrote:
>>> > Hi everyone,
>>> > I am very new to Spark, so as a learning exercise I've set up a small
>>> cluster consisting of 4 EC2 m1.large instances (1 master, 3 slaves), which
>>> I'm hoping to use to calculate ngram frequencies from text files of various
>>> sizes (I'm not doing anything with them; I just thought this would be
>>> slightly more interesting than the usual 'word count' example).  Currently,
>>> I'm trying to work with a 1GB text file, but running into memory issues.
>>>  I'm wondering what parameters I should be setting (in in
>>> order to properly utilize the cluster.  Right now, I'd be happy just to
>>> have the process complete successfully with the 1 gig file, so I'd really
>>> appreciate any suggestions you all might have.
>>> >
>>> > Here's a summary of the code I'm running through the spark shell on
>>> the master:
>>> >
>>> > def ngrams(s: String, n: Int = 3): Seq[String] = {
>>> >   (s.split("\\s+").sliding(n)).filter(_.length == n).map(_.mkString("
>>> ")).map(_.trim).toList
>>> > }
>>> >
>>> > val text = sc.textFile("s3n://my-bucket/my-1gb-text-file")
>>> >
>>> > val mapped = text.filter(_.trim.length > 0).flatMap(ngrams(_, 3))
>>> >
>>> > So far so good; the problems come during the reduce phase.  With small
>>> files, I was able to issue the following to calculate the most frequently
>>> occurring trigram:
>>> >
>>> > val topNgram = (mapped countByValue) reduce((a:(String, Long),
>>> b:(String, Long)) => if (a._2 > b._2) a else b)
>>> >
>>> > With the 1 gig file, though, I've been running into OutOfMemory
>>> errors, so I decided to split the reduction to several steps, starting with
>>> simply issuing countByValue of my "mapped" RDD, but I have yet to get it to
>>> complete successfully.
>>> >
>>> > SPARK_MEM is currently set to 6154m.  I also bumped up the
>>> spark.akka.framesize setting to 500 (though at this point, I was grasping
>>> at straws; I'm not sure what a "proper" value would be).  What properties
>>> should I be setting for a job of this size on a cluster of 3 m1.large
>>> slaves? (The cluster was initially configured using the spark-ec2 scripts).
>>>  Also, programmatically, what should I be doing differently?  (For example,
>>> should I be setting the minimum number of splits when reading the text
>>> file?  If so, what would be a good default?).
>>> >
>>> > I apologize for what I'm sure are very naive questions.  I think Spark
>>> is a fantastic project and have enjoyed working with it, but I'm still very
>>> much a newbie and would appreciate any help you all can provide (as well as
>>> any 'rules-of-thumb' or best practices I should be following).
>>> >
>>> > Thanks,
>>> > Tim Perrigo

View raw message