spark-user mailing list archives

From "净`L" <lj_51...@foxmail.com>
Subject An exception about broadcast in concurrent environment
Date Fri, 26 Dec 2014 04:26:30 GMT
Hi, all
 
When I use broadcast variables in a concurrent environment, an exception about a broadcast not being loaded is thrown occasionally, and the exception occurs more frequently when more threads are used.
 
The following is a source example using Python.
from pyspark import SparkContext
import threading

def do_process(x, w):
    # Multiply each element by the value of the broadcast variable.
    return x * w.value

def func(name, rdd, conf):
    new_rdd = rdd.map(lambda x: do_process(x, conf))
    total = new_rdd.reduce(lambda x, y: x + y)
    count = rdd.count()
    print name, 1.0 * total / count

if __name__ == "__main__":
    sc = SparkContext(appName="SparkServerMode")
    data_rdd = sc.parallelize(range(0, 1000), 1)

    # One broadcast variable per thread.
    conf1 = sc.broadcast(1)
    conf2 = sc.broadcast(2)
    conf3 = sc.broadcast(3)

    # Run the same job concurrently from three threads,
    # each referencing its own broadcast variable.
    t1 = threading.Thread(target=func, args=["T1", data_rdd, conf1])
    t2 = threading.Thread(target=func, args=["T2", data_rdd, conf2])
    t3 = threading.Thread(target=func, args=["T3", data_rdd, conf3])

    t1.start()
    t2.start()
    t3.start()

    t1.join()
    t2.join()
    t3.join()
     
  
When I run this program, unfortunately, many times I get an error like the following.
         
Exception: (Exception("Broadcast variable '1' not loaded!",), <function _from_id at 0x02987430>, (1L,))

        org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:124)
        org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:154)
        org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:87)
        org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
        org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
        org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
        org.apache.spark.scheduler.Task.run(Task.scala:54)
        org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        java.lang.Thread.run(Thread.java:744)
     
  
 
 
To track down the origin of the exception, I tried to debug the source files of PySpark and Spark by printing logs. Finally, I found a possible cause: while a task is being serialized, the broadcast variable it depends on is not serialized correctly.

The following picture describes why I think the exception is caused by task serialization.

The attachment contains the Python source, a log example, and the picture above.
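For what it is worth, the only workaround I have tried is to stop the threads from submitting jobs at the same time. This is only a sketch under my assumption that the problem is a race while concurrently serializing tasks that reference different broadcast variables; the submit_lock name and the lock itself are my own additions, and it simply replaces func in the script above:

import threading

# A single lock shared by all job-submitting threads (my own addition,
# assuming the race happens during concurrent task serialization).
submit_lock = threading.Lock()

def do_process(x, w):
    return x * w.value

def func(name, rdd, conf):
    new_rdd = rdd.map(lambda x: do_process(x, conf))
    # Hold the lock while the actions run so that two threads never
    # serialize tasks (and their broadcast variables) at the same time.
    with submit_lock:
        total = new_rdd.reduce(lambda x, y: x + y)
        count = rdd.count()
    print name, 1.0 * total / count

Of course the jobs no longer overlap with this change, so it only avoids the race rather than fixing it.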