spark-user mailing list archives

From Spark Enthusiast <sparkenthusi...@yahoo.in>
Subject Spark
Date Tue, 25 Aug 2015 04:52:03 GMT
I was running a Spark job to crunch a 9GB Apache log file when I saw the following error:


15/08/25 04:25:16 WARN scheduler.TaskSetManager: Lost task 99.0 in stage 37.0 (TID 4115, ip-10-150-137-100.ap-southeast-1.compute.internal): ExecutorLostFailure (executor 29 lost)
15/08/25 04:25:16 INFO scheduler.DAGScheduler: Resubmitted ShuffleMapTask(37, 40), so marking it as still running
15/08/25 04:25:16 INFO scheduler.DAGScheduler: Resubmitted ShuffleMapTask(37, 86), so marking it as still running
15/08/25 04:25:16 INFO scheduler.DAGScheduler: Resubmitted ShuffleMapTask(37, 84), so marking it as still running
15/08/25 04:25:16 INFO scheduler.DAGScheduler: Resubmitted ShuffleMapTask(37, 22), so marking it as still running
15/08/25 04:25:16 INFO scheduler.DAGScheduler: Resubmitted ShuffleMapTask(37, 48), so marking it as still running
15/08/25 04:25:16 INFO scheduler.DAGScheduler: Resubmitted ShuffleMapTask(37, 12), so marking it as still running
15/08/25 04:25:16 INFO scheduler.DAGScheduler: Executor lost: 29 (epoch 59)
15/08/25 04:25:16 INFO storage.BlockManagerMasterActor: Trying to remove executor 29 from BlockManagerMaster.
15/08/25 04:25:16 INFO storage.BlockManagerMasterActor: Removing block manager BlockManagerId(29, ip-10-150-137-100.ap-southeast-1.compute.internal, 39411)
...

Encountered exception:
An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job cancelled because SparkContext was shut down
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:699)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:698)
	at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
	at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:698)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onStop(DAGScheduler.scala:1411)
	at org.apache.spark.util.EventLoop.stop(EventLoop.scala:84)
	at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:1346)
	at org.apache.spark.SparkContext.stop(SparkContext.scala:1380)
	at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend$$anon$1.run(YarnClientSchedulerBackend.scala:143)
...
Looking further, it seems that takeOrdered (called by my application) uses collect() internally, and hence drains the driver's memory.
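The traceback below shows what takeOrdered does internally: each partition keeps only its local top-N via heapq.nsmallest, and only those small per-partition lists are collected and merged on the driver. A pure-Python sketch of that mechanism (partitions simulated as plain lists; the function name is mine, not PySpark's):

```python
import heapq

def take_ordered(partitions, num, key=None):
    """Simulate RDD.takeOrdered: per-partition top-num, then a driver-side merge."""
    # "mapPartitions": each partition independently keeps its num smallest items
    local_tops = [heapq.nsmallest(num, part, key=key) for part in partitions]
    # "reduce(merge)": fold the small per-partition lists together on the driver
    merged = []
    for top in local_tops:
        merged = heapq.nsmallest(num, merged + top, key=key)
    return merged

# Three simulated partitions of (endpoint, count) pairs
counts = [[("a", 5), ("b", 2)], [("c", 9), ("d", 1)], [("e", 7)]]
top2 = take_ordered(counts, 2, key=lambda s: -s[1])
# top2 == [("c", 9), ("e", 7)]
```

Note that in this code path only num items per partition ever reach the driver, so with many partitions the driver holds at most num * numPartitions elements at the merge step, not the full RDD.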
line 361, in top10EndPoints
    topEndpoints = endpointCounts.takeOrdered(10, lambda s: -1 * s[1])
  File "/home/hadoop/spark/python/pyspark/rdd.py", line 1174, in takeOrdered
    return self.mapPartitions(lambda it: [heapq.nsmallest(num, it, key)]).reduce(merge)
  File "/home/hadoop/spark/python/pyspark/rdd.py", line 739, in reduce
    vals = self.mapPartitions(func).collect()
  File "/home/hadoop/spark/python/pyspark/rdd.py", line 713, in collect
    port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
  File "/home/hadoop/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
    self.target_id, self.name)
  File "/home/hadoop/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
    format(target_id, '.', name), value)
How can I rewrite this code



endpointCounts = (access_logs
                  .map(lambda log: (log.endpoint, 1))
                  .reduceByKey(lambda a, b: a + b))
# endpointCounts is now an RDD of tuples: [(endpoint1, count1), (endpoint2, count2), ...]
topEndpoints = endpointCounts.takeOrdered(10, lambda s: -1 * s[1])

so that this error does not happen?
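For what it's worth, the logic of the pipeline can be sanity-checked locally in plain Python, without Spark: a Counter plays the role of map + reduceByKey, and heapq.nlargest plays the role of takeOrdered with the negated key. The log records here are made-up stand-ins; the real access_logs RDD holds objects with an .endpoint field:

```python
import heapq
from collections import Counter

# Hypothetical stand-in for the endpoints extracted from access_logs
endpoints = ["/home", "/api", "/home", "/login", "/home", "/api"]

# Equivalent of .map(lambda log: (log.endpoint, 1)).reduceByKey(lambda a, b: a + b)
endpoint_counts = Counter(endpoints)

# Equivalent of takeOrdered(10, lambda s: -1 * s[1]): top 10 by descending count
top_endpoints = heapq.nlargest(10, endpoint_counts.items(), key=lambda s: s[1])
# top_endpoints == [("/home", 3), ("/api", 2), ("/login", 1)]
```

If the local logic checks out, the failure is more likely about cluster resources (the log shows an ExecutorLostFailure before the SparkContext shut down) than about the algorithm itself.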