spark-user mailing list archives

From "Nick Pentreath" <nick.pentre...@gmail.com>
Subject Re: OOM - Help Optimizing Local Job
Date Tue, 21 Jan 2014 20:14:45 GMT
mapPartitions expects an iterable return value. If within each partition you're aggregating
into the hashmap, you can use your foreach operation (or map) to aggregate values in the hashmap,
and then return Iterator.single(hashmap) (I think, can't quite remember the exact syntax).


Thus each mapPartitions call will return an iterator containing a single hashmap, which in turn
contains the aggregate for the partition. Then perform a reduceByKey operation on the (Int,
Int) keys from those hashmaps to get a combined count.
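
For reference, a minimal sketch (untested) of what that might look like, reusing the textFile
and getArray definitions quoted further down the thread. It assumes the ids within a line are
distinct (the original per-line hashmap deduplicated repeats, while this version counts
occurrences):

import scala.collection.mutable.HashMap
import org.apache.spark.SparkContext._   // for reduceByKey outside the shell

val counts = textFile.mapPartitions { lines =>
  // one hashmap per partition accumulates counts for every line in it
  val hashmap = new HashMap[(Int, Int), Int]()
  lines.foreach { line =>
    val a = getArray(line).sorted.toArray  // sort once, keep the (smaller, larger) key convention
    for (x <- 0 until a.length; y <- x + 1 until a.length) {
      val key = (a(x), a(y))
      hashmap(key) = hashmap.getOrElse(key, 0) + 1
    }
  }
  Iterator(hashmap)                        // a single-element iterator, as described above
}.flatMap(_.iterator)                      // flatten each map into ((Int, Int), Int) pairs
 .reduceByKey(_ + _)                       // combine the per-partition counts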




You may want to take a look at Algebird (https://github.com/twitter/algebird), which contains
monoids for map aggregation that may be useful.
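
For example, a rough sketch (untested, and assuming Algebird's built-in Map monoid is on the
classpath):

import com.twitter.algebird.Monoid

val m1 = Map((1, 2) -> 1, (2, 3) -> 1)
val m2 = Map((2, 3) -> 1, (2, 5) -> 1)

// Monoid.plus merges the maps, summing the values of shared keys:
// Map((1,2) -> 1, (2,3) -> 2, (2,5) -> 1)
val merged = Monoid.plus(m1, m2)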



—
Sent from Mailbox for iPhone

On Tue, Jan 21, 2014 at 10:05 PM, Brad Ruderman <bradruderman@gmail.com>
wrote:

> Hi Tathagata-
> Thanks for your help. I appreciate your suggestions. I am having trouble
> following the code for mapPartitions.
> According to the documentation I need to pass a function of type
> Iterator[T] => Iterator[U], and thus I receive an error on the foreach.
> var textFile = sc.textFile("input.txt")
> textFile.mapPartitions(lines => {
>   lines.foreach(line => {
>     println(line)
>   })
> })
> <console>:18: error: type mismatch;
>  found   : Unit
>  required: Iterator[?]
> Error occurred in an application involving default arguments.
>               lines.foreach( line =>  {
> Therefore I think I need to run a map, but I'm not sure if I should reduce
> within the mapPartitions lambda or outside.
> val counts = textFile.mapPartitions(lines => {
>   val hashmap = new HashMap[(Int,Int),Int]()
>   lines.map(line => {
>     val array = getArray(line)
>     for (x <- 0 to array.length - 1) {
>       for (y <- 0 to array.length - 1) {
>         hashmap((array(x), array(y))) = 1
>       }
>     }
>   })
> })
> In this case the hashmap is not accessible outside the partitions, since
> when I save to file it is empty. Could you please clarify how to use
> mapPartitions()?
> Thanks,
> Brad
> On Mon, Jan 20, 2014 at 6:41 PM, Tathagata Das
> <tathagata.das1565@gmail.com> wrote:
>> Well, right now it is quite parallelized, as each line is processed
>> independently of the others, with a single reduce doing the final counting.
>> You can optimize the combination generation in several ways.
>>
>> 1. I may be missing something, but do you need to sort the list into
>> aSorted? That may be costly for large lists.
>>
>> 2. Furthermore, dropping from the sorted sequence is probably costly. Just
>> walking through the list with indices is likely to be much more efficient:
>>
>> for (xi <- 0 until length) {
>>   for (yi <- xi + 1 until length) {
>>     ...
>>   }
>> }
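>>
>> For example, something along these lines (just a sketch, untested) keeps your
>> getCombinations semantics but avoids both the repeated drop() calls and the
>> linear-time List indexing (the sort is kept so keys stay in (smaller, larger) order):
>>
>> def getCombinations(a: List[Int]): HashMap[(Int, Int), Int] = {
>>   val comboMap = new HashMap[(Int, Int), Int]()
>>   val sorted = a.sorted.toArray   // sort once; Array indexing is O(1), unlike List
>>   for (x <- 0 until sorted.length) {
>>     for (y <- x + 1 until sorted.length) {
>>       comboMap((sorted(x), sorted(y))) = 1
>>     }
>>   }
>>   comboMap
>> }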
>>
>> 3. Creating and destroying hashmaps for every array of numbers, that is,
>> for every line in the file, is not very efficient. Using mapPartitions is
>> better, as you can create one hashmap for the entire partition of data, and
>> process the whole partition of the file in one go.
>>
>> textFile.mapPartitions(lines => {
>>   val hashmap = ...
>>   lines.foreach(line => {
>>     val array = getArray(line)
>>     // add combinations from array
>>   })
>>   hashmap
>> })
>>
>> Hope this helps.
>>
>> TD
>>
>>
>> On Mon, Jan 20, 2014 at 4:53 PM, Brad Ruderman <bradruderman@gmail.com> wrote:
>>
>>> Thanks Again Tathagata.
>>>
>>> I successfully launched an EC2 cluster with 5 nodes. I have split the
>>> tasks out and found that a lot of time is being spent running:
>>>
>>> def getCombinations(a: List[Int]): HashMap[(Int,Int),Int] = {
>>>   val comboMap = new HashMap[(Int,Int),Int]()
>>>   var aSorted = a.sorted
>>>   for (x <- 0 to aSorted.length - 1) {
>>>     var b = aSorted(0)
>>>     aSorted = aSorted.drop(1)
>>>     for (y <- 0 to aSorted.length - 1) {
>>>       comboMap((b, aSorted(y))) = 1
>>>     }
>>>   }
>>>   return comboMap
>>> }
>>>
>>> from:
>>> val counts = textFile.map(line => getArray(line)).flatMap(a => getCombinations(a)).reduceByKey(_ + _)
>>>
>>> Essentially I am trying to perform a "semi-cartesian" product: each element
>>> contained within a larger array is joined against all other elements within
>>> that array. So if I have [[1,2,3],[2,3,5]] that would become:
>>> 1,2
>>> 1,3
>>> 2,3
>>> 2,3
>>> 2,5
>>> 3,5
>>>
>>> which would reduce down to
>>> 1,2 1
>>> 1,3 1
>>> 2,3 2
>>> 2,5 1
>>> 3,5 1
>>>
>>> I am wondering if there is a faster method that could be better
>>> parallelized than what I am doing.
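>>>
>>> For reference, the same pair generation can also be written compactly with the
>>> standard library's combinations (just a sketch, untested; I wouldn't expect this
>>> alone to be faster):
>>>
>>> val counts = textFile
>>>   .map(line => getArray(line))
>>>   .flatMap(a => a.sorted.combinations(2).map { case List(x, y) => ((x, y), 1) })
>>>   .reduceByKey(_ + _)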
>>>
>>> Thanks,
>>> Brad
>>>
>>> On Sun, Jan 19, 2014 at 2:25 PM, Tathagata Das <
>>> tathagata.das1565@gmail.com> wrote:
>>>
>>>> 1) System properties must be set before creating the SparkContext. When
>>>> using the spark shell (which creates the context automatically), you have
>>>> to set it before starting the shell. See the configuration page
>>>> (http://spark.incubator.apache.org/docs/latest/configuration.html#environment-variables)
>>>> to see how to set Java properties through environment variables.
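>>>>
>>>> For example (just a sketch), when building your own context in a standalone
>>>> program, the property only needs to be set before the SparkContext is
>>>> constructed:
>>>>
>>>> // hypothetical master URL; memory values are typically written as e.g. "512m" or "4g"
>>>> System.setProperty("spark.executor.memory", "4g")
>>>> val sc = new SparkContext("spark://master:7077", "oom-test")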
>>>>
>>>> 2) Regarding correctness of computation, if it works locally it should
>>>> work on the cluster. Regarding parallelism, Spark runs only as many tasks
>>>> simultaneously as the number of cores that the SparkContext has asked
>>>> for. So on your local machine, it doesn't help much to increase the number
>>>> of tasks too much, as it only has 4 cores (or 8 hyperthreaded ones, maybe).
>>>> Having 10x more tasks with each task doing 1/10th the computation ends up
>>>> taking more or less the same amount of processing time. With more cores in
>>>> a cluster, it will parallelize further. The number of tasks should be
>>>> configured to be about 2-3x the number of cores in the cluster. See the
>>>> tuning page (http://spark.incubator.apache.org/docs/latest/tuning.html#level-of-parallelism).
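>>>>
>>>> For example (just a sketch; the numbers are illustrative), the reduce-side task
>>>> count can be passed directly to reduceByKey. With, say, 16 cores across the
>>>> cluster, something in the 32-48 range would be in that 2-3x band:
>>>>
>>>> val counts = textFile.map(line => getArray(line)).flatMap(a => getCombinations(a)).reduceByKey(_ + _, 48)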
>>>>
>>>> TD
>>>>
>>>>
>>>> On Sun, Jan 19, 2014 at 1:30 PM, Brad Ruderman <bradruderman@gmail.com> wrote:
>>>>
>>>>> Hi Tathagata-
>>>>> Thanks for your response. I appreciate your help. A few follow-up
>>>>> questions:
>>>>> 1) I updated to master and tried to set the system properties. However,
>>>>> when I view the properties in the web console, I don't see these listed.
>>>>> Also I see in Activity Monitor that org.apache.spark.repl.Main is
>>>>> only taking 758mb.
>>>>>
>>>>> scala> System.setProperty("spark.executor.memory","4gb")
>>>>> res0: String = null
>>>>>
>>>>> scala> System.setProperty("spark.executor.memory","4gb")
>>>>> res1: String = 4gb
>>>>>
>>>>> Did something change in master that would prevent this from being set
>>>>> higher in standalone mode? My exact setup is:
>>>>>
>>>>> git clone git@github.com:apache/incubator-spark.git
>>>>> ./sbt/sbt assembly
>>>>> cd conf
>>>>> cp spark-env.sh.template spark-env.sh
>>>>> cp log4j.properties.template log4j.properties
>>>>> ./bin/spark-shell
>>>>>
>>>>> 2) My next steps are going to be to test this job in an AMZ EC2
>>>>> cluster, but is there anything I can do locally to ensure that it is
>>>>> properly built so that it can be heavily parallelized? I tried increasing
>>>>> the level of parallelism by passing a parameter to reduceByKey,
>>>>> but I didn't see any difference in performance. My local computer
>>>>> also doesn't seem to suffer as a result (which I think it would; I am
>>>>> running a MB Pro Retina i7 + 16gb RAM). I am worried that using
>>>>> ListBuffer or HashMap will significantly slow things down and that there
>>>>> are better ways to do this.
>>>>>
>>>>>
>>>>> Thanks,
>>>>> Brad
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Sat, Jan 18, 2014 at 7:11 PM, Tathagata Das <
>>>>> tathagata.das1565@gmail.com> wrote:
>>>>>
>>>>>> Hello Brad,
>>>>>>
>>>>>> The shuffle operation seems to be taking too much memory, more than
>>>>>> what your Java program can provide. I am not sure whether you have
>>>>>> already tried these or not, but there are a few basic things you can try.
>>>>>> 1. If you are running a local standalone Spark cluster, you can set
>>>>>> the amount of memory that the worker can use by setting spark.executor.memory.
>>>>>> See the Spark configuration page
>>>>>> (http://spark.incubator.apache.org/docs/latest/configuration.html).
>>>>>> 2. You can increase the number of partitions created from the text
>>>>>> file, and increase the number of reducers. This lowers the memory usage
>>>>>> of each task. See the Spark tuning guide
>>>>>> (http://spark.incubator.apache.org/docs/latest/tuning.html#other-considerations).
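>>>>>>
>>>>>> For example (just a sketch; the numbers are illustrative), both knobs can be
>>>>>> set explicitly:
>>>>>>
>>>>>> // ask for more input splits so each task holds less state in memory
>>>>>> val textFile = sc.textFile("slices.txt", 64)
>>>>>> // and give the shuffle an explicit number of reduce tasks
>>>>>> val counts = textFile.map(line => getArray(line)).flatMap(a => getCombinations(a)).reduceByKey(_ + _, 64)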
>>>>>> 3. If you are still running out of memory, you can try our latest Spark
>>>>>> 0.9 (https://github.com/apache/incubator-spark/tree/branch-0.9) or
>>>>>> the master branch. Shuffles have been modified to automatically spill to
>>>>>> disk when they need more than the available memory. 0.9 is undergoing
>>>>>> community voting and will be released very soon.
>>>>>>
>>>>>> TD
>>>>>>
>>>>>>
>>>>>> On Sat, Jan 18, 2014 at 4:58 PM, Brad Ruderman <bradruderman@gmail.com> wrote:
>>>>>>
>>>>>>> Hi All-
>>>>>>> I am very new to Spark (and Scala) but I was hoping to get some help
>>>>>>> creating a distributed job. Essentially I have a set of files (8k at
>>>>>>> 300mb each in HDFS) which were created in Hive (Hadoop) with the form:
>>>>>>> Int
>>>>>>> Array<Map<int,string>>
>>>>>>>
>>>>>>> I want to create a spark job that reads these files and creates the
>>>>>>> combinations of the int array. Meaning it iterates over every int,
>>>>>>> ignoring the string, in the Array<Map<int,string>> and produces a
>>>>>>> [k,v] of ((int_1, int_2), 1). Then it reduces by the k and sums the v
>>>>>>> to produce a final result over a large dataset of:
>>>>>>> (int_1, int_2), count
>>>>>>>
>>>>>>> It is producing a set of all possible combinations for that given
>>>>>>> record. In order to do this I have locally downloaded a single file
>>>>>>> and am running Spark locally on my computer to try to process it. The
>>>>>>> file is 382715 lines (~400mb), however some of the arrays can be quite
>>>>>>> big.
>>>>>>>
>>>>>>> My Spark Job looks like the following:
>>>>>>>
>>>>>>> import collection.mutable.HashMap
>>>>>>> import collection.mutable.ListBuffer
>>>>>>>
>>>>>>> def getCombinations(a: List[Int]): HashMap[(Int,Int),Int] = {
>>>>>>>   val comboMap = new HashMap[(Int,Int),Int]()
>>>>>>>   var aSorted = a.sorted
>>>>>>>   for (x <- 0 to aSorted.length - 1) {
>>>>>>>     var b = aSorted(0)
>>>>>>>     aSorted = aSorted.drop(1)
>>>>>>>     for (y <- 0 to aSorted.length - 1) {
>>>>>>>       comboMap((b, aSorted(y))) = 1
>>>>>>>     }
>>>>>>>   }
>>>>>>>   return comboMap
>>>>>>> }
>>>>>>>
>>>>>>> def getArray(line: String): List[Int] = {
>>>>>>>   var a = line.split("\\x01")(1).split("\\x02")
>>>>>>>   var ids = new ListBuffer[Int]
>>>>>>>   for (x <- 0 to a.length - 1) {
>>>>>>>     ids += Integer.parseInt(a(x).split("\\x03")(0))
>>>>>>>   }
>>>>>>>   return ids.toList
>>>>>>> }
>>>>>>>
>>>>>>> val textFile = sc.textFile("slices.txt")
>>>>>>> val counts = textFile.map(line => getArray(line)).flatMap(a => getCombinations(a)).reduceByKey(_ + _)
>>>>>>> counts.saveAsTextFile("spark_test")
>>>>>>>
>>>>>>>
>>>>>>> I am using getArray() to process the text file from its Hive storage
>>>>>>> form, and then getCombinations() to create the k,v list, then finally
>>>>>>> reducing over this list. Eventually I will move this to a distributed
>>>>>>> cluster, but I was hoping to determine if there is any way to
>>>>>>> parallelize or optimize this job for my local computer. Currently it
>>>>>>> fails with:
>>>>>>>
>>>>>>> java.lang.OutOfMemoryError (java.lang.OutOfMemoryError: Java heap space)
>>>>>>>
>>>>>>> org.apache.spark.util.AppendOnlyMap.growTable(AppendOnlyMap.scala:200)
>>>>>>> org.apache.spark.util.AppendOnlyMap.incrementSize(AppendOnlyMap.scala:180)
>>>>>>> org.apache.spark.util.AppendOnlyMap.changeValue(AppendOnlyMap.scala:130)
>>>>>>> org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:42)
>>>>>>> org.apache.spark.rdd.PairRDDFunctions$$anonfun$1.apply(PairRDDFunctions.scala:91)
>>>>>>> org.apache.spark.rdd.PairRDDFunctions$$anonfun$1.apply(PairRDDFunctions.scala:91)
>>>>>>> org.apache.spark.rdd.RDD$$anonfun$1.apply(RDD.scala:444)
>>>>>>> org.apache.spark.rdd.RDD$$anonfun$1.apply(RDD.scala:444)
>>>>>>> Thanks,
>>>>>>> Brad
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>