added the correct attachment.


------------------ Original Message ------------------
From: "净`L" <lj_51fun@foxmail.com>
Sent: Friday, December 26, 2014, 12:26 PM
To: "user" <user@spark.apache.org>
Cc: "净`L" <lj_51fun@foxmail.com>
Subject: An exception about broadcast in concurrent environment

Hi, all

When I use broadcast variables in a concurrent environment, an exception saying the broadcast was not loaded is occasionally thrown, and it occurs more frequently when more threads are used.

Following is a source example in Python.

from pyspark import SparkContext

def do_process(x, w):
    return x * w.value

def func(name, rdd, conf):
    # Each thread runs its own job against the same RDD,
    # using its own broadcast variable.
    new_rdd = rdd.map(lambda x: do_process(x, conf))
    total = new_rdd.reduce(lambda x, y: x + y)
    count = rdd.count()
    print name, 1.0 * total / count

if __name__ == "__main__":
    import threading

    sc = SparkContext(appName="SparkServerMode")
    data_rdd = sc.parallelize(range(0, 1000), 1)

    conf1 = sc.broadcast(1)
    conf2 = sc.broadcast(2)
    conf3 = sc.broadcast(3)

    # Submit three jobs concurrently, each depending on a different broadcast.
    t1 = threading.Thread(target=func, args=["T1", data_rdd, conf1])
    t2 = threading.Thread(target=func, args=["T2", data_rdd, conf2])
    t3 = threading.Thread(target=func, args=["T3", data_rdd, conf3])

    t1.start()
    t2.start()
    t3.start()

    t1.join()
    t2.join()
    t3.join()

If I run this program, unfortunately, many times I get an error like the following:

Exception: (Exception("Broadcast variable '1' not loaded!",), <function _from_id at 0x02987430>, (1L,))

       org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:124)
       org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:154)
       org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:87)
       org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
       org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
       org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
       org.apache.spark.scheduler.Task.run(Task.scala:54)
       org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
       java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
       java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
       java.lang.Thread.run(Thread.java:744)

To track down the origin of the exception, I debugged the pyspark and spark source files by adding log output. I found a possible cause: while a task is being serialized, its dependent broadcast variable is not serialized correctly.
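If the race really is in concurrent task serialization, one workaround I am considering is to serialize job submissions from the driver with a lock, so that only one thread serializes a task at a time. Below is a minimal sketch of that pattern in plain Python without Spark; the `run_job` helper and `_submit_lock` are hypothetical names, and `compute` merely stands in for an RDD action such as `rdd.map(...).reduce(...)`:

```python
import threading

# Hypothetical guard: one lock serializing all job submissions, so the
# task-serialization step never runs from two threads at the same time.
_submit_lock = threading.Lock()

def run_job(name, compute):
    # compute() stands in for the RDD action; only one thread may be
    # inside the lock (and hence serializing a task) at any moment.
    with _submit_lock:
        return name, compute()

results = []

def worker(name, w):
    # sum(x * w ...) mirrors do_process(x, w) from the example above.
    results.append(run_job(name, lambda: sum(x * w for x in range(1000))))

threads = [threading.Thread(target=worker, args=(n, w))
           for n, w in [("T1", 1), ("T2", 2), ("T3", 3)]]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(sorted(results))
```

This trades away the concurrency of the submissions themselves, of course, so it is only a stopgap while the underlying serialization issue is unresolved.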

The picture below describes why I think the exception is caused by serializing the task.

 

The attachment contains the Python source, an example log, and the picture mentioned above.