spark-user mailing list archives

From: Mark Hamstra <m...@clearstorydata.com>
Subject: Re: Help with Initial Cluster Configuration / Tuning
Date: Tue, 22 Oct 2013 15:14:53 GMT
There's no need to guess at that.  The docs tell you directly:

def countByValue(): Map[T, Long]

Return the count of each unique value in this RDD as a map of (value,
count) pairs. The final combine step happens locally on the master,
equivalent to running a single reduce task.
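
In other words, a call like the sketch below (using the RDD name from the
thread) brings every distinct value and its count back as a single local map
on the driver:

val freqs: Map[String, Long] = mapped.countByValue()  // final combine runs locally on the master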



On Tue, Oct 22, 2013 at 7:22 AM, Shay Seng <shay@1618labs.com> wrote:

> Hi Matei,
>
> I've seen several memory-tuning queries on this mailing list, and also
> heard the same kinds of queries at the Spark meetup. In fact, the last
> bullet point in the slides from Josh Carver(?), the guy from Bizo, was
> "memory tuning is still a mystery".
>
> I certainly had lots of issues when I first started, from memory issues to
> GC issues; things seem to run fine until you try something with 500GB of
> data, etc.
>
> I was wondering if you could write up a little white paper or some
> guidelines on how to set memory values, and what to look at when something
> goes wrong? E.g., I would never have guessed that countByValue happens on a
> single machine, etc.
> On Oct 21, 2013 6:18 PM, "Matei Zaharia" <matei.zaharia@gmail.com> wrote:
>
>> Hi there,
>>
>> The problem is that countByValue happens in only a single reduce task --
>> this is probably something we should fix but it's basically not designed
>> for lots of values. Instead, do the count in parallel as follows:
>>
>> val counts = mapped.map(str => (str, 1)).reduceByKey((a, b) => a + b)
>>
>> If this still has trouble, you can also increase the level of parallelism
>> of reduceByKey by passing it a second parameter for the number of tasks
>> (e.g. 100).
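>>
>> For example, a sketch of the higher-parallelism version (the 100 is just a
>> placeholder for the number of reduce tasks), with the final "most frequent
>> trigram" step then running on the already-aggregated pairs:
>>
>> val counts = mapped.map(str => (str, 1)).reduceByKey((a, b) => a + b, 100)
>> val topNgram = counts.reduce((a, b) => if (a._2 > b._2) a else b)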
>>
>> BTW, one other small thing with your code: flatMap should actually work
>> fine if your function returns an Iterator or Traversable, so there's no
>> need to call toList and return a Seq in ngrams; you can just return an
>> Iterator[String].
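>>
>> As a sketch, the same logic without building the intermediate List:
>>
>> def ngrams(s: String, n: Int = 3): Iterator[String] =
>>   s.split("\\s+").sliding(n).filter(_.length == n).map(_.mkString(" ").trim)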
>>
>> Matei
>>
>> On Oct 21, 2013, at 1:05 PM, Timothy Perrigo <tperrigo@gmail.com> wrote:
>>
>> > Hi everyone,
>> > I am very new to Spark, so as a learning exercise I've set up a small
>> cluster consisting of 4 EC2 m1.large instances (1 master, 3 slaves), which
>> I'm hoping to use to calculate ngram frequencies from text files of various
>> sizes (I'm not doing anything with them; I just thought this would be
>> slightly more interesting than the usual 'word count' example).  Currently,
>> I'm trying to work with a 1GB text file, but running into memory issues.
>>  I'm wondering what parameters I should be setting (in spark-env.sh) in
>> order to properly utilize the cluster.  Right now, I'd be happy just to
>> have the process complete successfully with the 1 gig file, so I'd really
>> appreciate any suggestions you all might have.
>> >
>> > Here's a summary of the code I'm running through the spark shell on the
>> master:
>> >
>> > def ngrams(s: String, n: Int = 3): Seq[String] = {
>> >   (s.split("\\s+").sliding(n)).filter(_.length == n).map(_.mkString(" ")).map(_.trim).toList
>> > }
>> >
>> > val text = sc.textFile("s3n://my-bucket/my-1gb-text-file")
>> >
>> > val mapped = text.filter(_.trim.length > 0).flatMap(ngrams(_, 3))
>> >
>> > So far so good; the problems come during the reduce phase.  With small
>> files, I was able to issue the following to calculate the most frequently
>> occurring trigram:
>> >
>> > val topNgram = (mapped countByValue) reduce((a:(String, Long), b:(String, Long)) => if (a._2 > b._2) a else b)
>> >
>> > With the 1 gig file, though, I've been running into OutOfMemory errors,
>> so I decided to split the reduction to several steps, starting with simply
>> issuing countByValue of my "mapped" RDD, but I have yet to get it to
>> complete successfully.
>> >
>> > SPARK_MEM is currently set to 6154m.  I also bumped up the
>> spark.akka.frameSize setting to 500 (though at this point, I was grasping
>> at straws; I'm not sure what a "proper" value would be).  What properties
>> should I be setting for a job of this size on a cluster of 3 m1.large
>> slaves? (The cluster was initially configured using the spark-ec2 scripts).
>>  Also, programmatically, what should I be doing differently?  (For example,
>> should I be setting the minimum number of splits when reading the text
>> file?  If so, what would be a good default?).
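>> > (For reference, I mean the optional second argument to textFile, roughly
>> > val text = sc.textFile("s3n://my-bucket/my-1gb-text-file", 100)
>> > where the 100 is just a number picked for illustration.)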
>> >
>> > I apologize for what I'm sure are very naive questions.  I think Spark
>> is a fantastic project and have enjoyed working with it, but I'm still very
>> much a newbie and would appreciate any help you all can provide (as well as
>> any 'rules-of-thumb' or best practices I should be following).
>> >
>> > Thanks,
>> > Tim Perrigo
>>
>>
