spark-user mailing list archives

From Davies Liu <dav...@databricks.com>
Subject Re: An exception about broadcast in concurrent environment
Date Wed, 31 Dec 2014 01:33:17 GMT
This looks like a thread-safety bug. Could you create a JIRA to track
it? Thanks for reporting this.

On Thu, Dec 25, 2014 at 8:29 PM, 净`L <lj_51fun@foxmail.com> wrote:

> Added the correct attachment.
>
>
> ------------------ Original Message ------------------
> *From:* "净`L";<lj_51fun@foxmail.com>;
> *Sent:* Friday, December 26, 2014, 12:26 PM
> *To:* "user"<user@spark.apache.org>;
> *Cc:* "净`L"<lj_51fun@foxmail.com>;
> *Subject:* An exception about broadcast in concurrent environment
>
> Hi, all
>
> When I use broadcast variables in a concurrent environment, an exception
> saying the broadcast was not loaded is thrown occasionally, and it occurs
> more frequently when more threads are used.
>
> Following is a source example using Python.
>
> def do_process(x, w):
>     return x * w.value
>
> def func(name, rdd, conf):
>     new_rdd = rdd.map(lambda x : do_process(x, conf))
>     total = new_rdd.reduce(lambda x, y : x + y)
>     count = rdd.count()
>     print name, 1.0 * total / count
>
> if __name__ == "__main__":
>     import threading
>     from pyspark import SparkContext
>
>     sc = SparkContext(appName="SparkServerMode")
>     data_rdd = sc.parallelize(range(0, 1000), 1)
>
>     # one broadcast variable per thread
>     conf1 = sc.broadcast(1)
>     conf2 = sc.broadcast(2)
>     conf3 = sc.broadcast(3)
>
>     # run the same job concurrently from three driver threads
>     t1 = threading.Thread(target=func, args=["T1", data_rdd, conf1])
>     t2 = threading.Thread(target=func, args=["T2", data_rdd, conf2])
>     t3 = threading.Thread(target=func, args=["T3", data_rdd, conf3])
>
>     t1.start()
>     t2.start()
>     t3.start()
>
>     t1.join()
>     t2.join()
>     t3.join()
>
> If I run the program, unfortunately, many times I will get an error like
> the following.
>
> Exception: (Exception("Broadcast variable '1' not loaded!",), <function
> _from_id at 0x02987430>, (1L,))
>
>        org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:124)
>        org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:154)
>        org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:87)
>        org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>        org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>        org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
>        org.apache.spark.scheduler.Task.run(Task.scala:54)
>        org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
>        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>        java.lang.Thread.run(Thread.java:744)
>
> To track the origin of the exception, I tried to debug the source files of
> pyspark and spark by printing logs. Finally, I found a *possible* reason for
> the exception: while a task is being serialized, the broadcast it depends on
> is not serialized correctly.
>
> The following picture describes why I think the exception is caused by the
> task serialization.
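>
> A minimal workaround sketch, assuming the race happens while the driver
> serializes tasks (and the broadcasts they depend on) concurrently from
> several threads: guard the RDD actions with a single lock, so only one job
> is serialized and submitted at a time. The function and lock names below are
> only illustrative; this reuses do_process() from the example above and gives
> up the concurrency between the threads.
>
> import threading
>
> submit_lock = threading.Lock()
>
> def func_locked(name, rdd, conf):
>     # same logic as func() above, but only one driver thread may
>     # serialize and submit a job at a time
>     new_rdd = rdd.map(lambda x : do_process(x, conf))
>     with submit_lock:
>         total = new_rdd.reduce(lambda x, y : x + y)
>         count = rdd.count()
>     print name, 1.0 * total / count
>
> With this guard the three threads still share one SparkContext and keep
> their own broadcast variables, but their jobs run one after another.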
>
> The attachment contains the Python source, a log example and the picture
> above.
>
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
> For additional commands, e-mail: user-help@spark.apache.org
>
