spark-user mailing list archives

From Mosharaf Chowdhury <mosharafka...@gmail.com>
Subject Re: Problems with broadcast large datastructure
Date Mon, 13 Jan 2014 06:04:36 GMT
The size calculation is correct, but broadcast happens from the driver to the
workers, not the other way around.

By the way, your code broadcasts 400 MB thirty times, and the old copies are
not being evicted from the cache fast enough, which, I think, is causing the
BlockManagers to run out of memory.
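As a rough sanity check on those numbers, here is a pure-JVM sketch of my own (no Spark involved; 4 bytes per element is the primitive `Int` size, and BlockManager bookkeeping adds overhead on top of it):

```scala
// Back-of-the-envelope memory math for the loop quoted below: one
// Array[Int](100000000) broadcast, re-broadcast MAX_ITER = 30 times
// without the old copies being evicted.
object BroadcastMath {
  def main(args: Array[String]): Unit = {
    val num = 100000000L                        // array length in the example
    val oneCopyMB = num * 4L / (1024L * 1024L)  // ~381 MB (the "400 MB" in decimal units)
    val totalGB = 30L * oneCopyMB / 1024.0      // ~11.2 GB across 30 retained copies
    println(s"one broadcast: ~$oneCopyMB MB")
    println(s"30 retained copies: ~$totalGB GB")
  }
}
```

So without eviction (or explicitly releasing each broadcast between iterations), the retained copies alone approach the 12 GB of SPARK_WORKER_MEMORY quoted later in this thread.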

On Sun, Jan 12, 2014 at 9:34 PM, lihu <lihu723@gmail.com> wrote:

> Yes, I am just using the code snippet from the broadcast example and running
> it in the spark-shell.
> I thought broadcast meant the driver sends to the executors and the
> executors send something back. Is there something wrong with my calculation
> of the broadcast size?
>
> val MAX_ITER = 30
> val num = 100000000
> var arr1 = new Array[Int](num)
> for (i <- 0 until arr1.length) {
>   arr1(i) = i
> }
> for (i <- 0 until MAX_ITER) {
>   println("Iteration " + i)
>   println("===========")
>   val startTime = System.nanoTime
>   val barr1 = sc.broadcast(arr1)
>   sc.parallelize(1 to 10).foreach {
>     i => println(barr1.value.size)
>   }
>   println("Iteration %d took %.0f milliseconds".format(i,
>     (System.nanoTime - startTime) / 1E6))
> }
>
> I also tried TorrentBroadcast; it is faster than the default, which is very
> helpful, thanks again!
> But I still get stuck during iteration. Here is the log info from the
> master; it seems to be a heartbeat problem.
>
> [sparkMaster-akka.actor.default-dispatcher-26] WARN
>   org.apache.spark.deploy.master.Master - Got heartbeat from unregistered
>   worker worker-20140102202153-m023.corp.typingx.me-8139
> [sparkMaster-akka.actor.default-dispatcher-26] WARN
>   org.apache.spark.deploy.master.Master - Got heartbeat from unregistered
>   worker worker-20140102202153-m023.corp.typingx.me-40447
>
> On Mon, Jan 13, 2014 at 1:01 PM, Mosharaf Chowdhury <
> mosharafkabir@gmail.com> wrote:
>
>> Broadcast is supposed to send data from the driver to the executors, not
>> in the other direction. Can you share the code snippet you are using to
>> broadcast?
>>
>> --
>> Mosharaf Chowdhury
>> http://www.mosharaf.com/
>>
>>
>> On Sun, Jan 12, 2014 at 8:52 PM, lihu <lihu723@gmail.com> wrote:
>>
>>> In my opinion, Spark is a system for big data, so 400 MB does not seem big.
>>>
>>> I read the slides about broadcast. My understanding is that each executor
>>> owns a complete copy of the broadcast variable, and that the executors
>>> send the broadcast variable back to the driver.
>>>
>>> In my experiment, I have 20 machines and each machine runs 2 executors. I
>>> used the default parallelism, which is 8, so there are 320 tasks in one
>>> stage in total.
>>>
>>> Then the workers would send 320 * (400 MB / 8) = 16 GB of data back to the
>>> driver, which seems very big. But I see from the log that, after
>>> serialization, the data size sent back to the driver is just 446 bytes per
>>> task.
>>>
>>> org.apache.spark.storage.BlockManager - Found block broadcast_5 locally
>>> org.apache.spark.executor.Executor - Serialized size of result for 1901
>>>   is 446
>>> org.apache.spark.executor.Executor - Sending result for 1901 directly
>>>   to driver
>>>
>>> So the total data sent back to the driver is just 320 * 446 bytes =
>>> 142,720 bytes, which is really small in my opinion.
>>>
>>> ---------------
>>> In summary:
>>>
>>> 1. Spark is a system for big data, so 400 MB is not big in my opinion.
>>> 2. I am not sure whether my understanding of broadcast is right; could the
>>> data sent back to the driver be bigger than this?
>>> 3. I wonder why the serialization ratio is so high: how can it serialize
>>> 400 MB / 8 = 50 MB down to just 446 bytes?
>>> 4. If it is my fault and I did not run the broadcast experiment in the
>>> right way, then I hope the Spark community can give more examples about
>>> broadcast; this may benefit many users.
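On the serialization question above: a likely reading (my interpretation, not confirmed in this thread) is that the 446 bytes in the log is the serialized task *result* of the foreach, which returns Unit plus some task metadata, and not a compressed copy of the 50 MB partition. A minimal pure-JVM sketch, with no Spark involved and helper names of my own choosing, shows Java serialization does no such magic shrinking:

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

// Hypothetical helper: measure the Java-serialized size of an object,
// to compare a large array against a tiny task-result-like value.
object SerializedSizes {
  def sizeOf(obj: AnyRef): Int = {
    val buf = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buf)
    out.writeObject(obj)
    out.close()
    buf.size()
  }

  def main(args: Array[String]): Unit = {
    val data = (0 until 1000000).toArray             // 1M Ints, ~4 MB raw
    println(s"array: ${sizeOf(data)} bytes")          // still ~4 MB serialized
    println(s"tiny result: ${sizeOf(java.lang.Boolean.TRUE)} bytes")
  }
}
```

The array serializes to roughly its raw size, while a trivial value serializes to a few dozen bytes, which is the same order as the 446-byte results in the log.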
>>>
>>> On Mon, Jan 13, 2014 at 12:22 PM, Aureliano Buendia <
>>> buendia360@gmail.com> wrote:
>>>
>>>>
>>>> On Mon, Jan 13, 2014 at 4:17 AM, lihu <lihu723@gmail.com> wrote:
>>>>
>>>>> I have run into the same problem as you.
>>>>> I have a cluster of 20 machines, and I just ran the broadcast example.
>>>>> All I did was change the data size in the example to 400M, which is
>>>>> really a small data size.
>>>>>
>>>>
>>>> Is 400 MB really a small size for broadcasting?
>>>>
>>>> I had the impression that broadcast is for objects much, much smaller,
>>>> roughly less than 10 MB.
>>>>
>>>>
>>>>> But I ran into the same problem as you.
>>>>> So I wonder: is the broadcast capability weak in the Spark system?
>>>>>
>>>>>
>>>>> Here is my config:
>>>>>
>>>>> SPARK_MEM=12g
>>>>> SPARK_MASTER_WEBUI_PORT=12306
>>>>> SPARK_WORKER_MEMORY=12g
>>>>> SPARK_JAVA_OPTS+="-Dspark.executor.memory=8g -Dspark.akka.timeout=600
>>>>>  -Dspark.local.dir=/disk3/lee/tmp -Dspark.worker.timeout=600
>>>>> -Dspark.akka.frameSize=10000 -Dspark.akka.askTimeout=300
>>>>> -Dspark.storage.blockManagerTimeoutIntervalMs=100000
>>>>> -Dspark.akka.retry.wait=600 -Dspark.blockManagerHeartBeatMs=80000 -Xms15G
>>>>> -Xmx15G -XX:+UseConcMarkSweepGC -XX:-UseGCOverheadLimit"
>>>>>
>>>>> On Sat, Jan 11, 2014 at 8:27 AM, Khanderao kand <
>>>>> khanderao.kand@gmail.com> wrote:
>>>>>
>>>>>> If your object size is > 10 MB, you may need to change
>>>>>> spark.akka.frameSize.
>>>>>>
>>>>>> What is your spark.akka.timeout?
>>>>>>
>>>>>> Did you change spark.akka.heartbeat.interval?
>>>>>>
>>>>>> BTW, given that a large object is being broadcast across 25 nodes, you
>>>>>> may want to consider the frequency of such transfers and evaluate
>>>>>> alternative patterns.
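For reference, a hedged sketch of what raising the frame size might look like (property names from this era of Spark; the values here are illustrative assumptions, not recommendations):

```shell
# Illustrative values only: raise the Akka frame size above the ~400 MB
# broadcast object (spark.akka.frameSize is given in MB in this Spark
# generation, default 10) and loosen the Akka timeout.
SPARK_JAVA_OPTS+=" -Dspark.akka.frameSize=500 -Dspark.akka.timeout=600"
```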
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Tue, Jan 7, 2014 at 12:55 AM, Sebastian Schelter <ssc@apache.org> wrote:
>>>>>>
>>>>>>> Spark repeatedly fails to broadcast a large object on a cluster of
>>>>>>> 25 machines for me.
>>>>>>>
>>>>>>> I get log messages like this:
>>>>>>>
>>>>>>> [spark-akka.actor.default-dispatcher-4] WARN
>>>>>>> org.apache.spark.storage.BlockManagerMasterActor - Removing BlockManager
>>>>>>> BlockManagerId(3, cloud-33.dima.tu-berlin.de, 42185, 0) with no recent
>>>>>>> heart beats: 134689ms exceeds 45000ms
>>>>>>>
>>>>>>> Is there something wrong with my config? Do I have to increase some
>>>>>>> timeout?
>>>>>>>
>>>>>>> Thx,
>>>>>>> Sebastian
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>>
>>>>
>>>
>>>
>>>
>>>
>>
>
>
> --
> Best Wishes!
>
> Li Hu(李浒) | Graduate Student
>
> Institute for Interdisciplinary Information Sciences (IIIS
> <http://iiis.tsinghua.edu.cn/>)
> Tsinghua University, China
>
> Email: lihu723@gmail.com
> Tel: +86 15120081920
> Homepage: http://iiis.tsinghua.edu.cn/zh/lihu/
>
>
>
