spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Sadhan Sood <sadhan.s...@gmail.com>
Subject Re: Too many failed collects when trying to cache a table in SparkSQL
Date Wed, 12 Nov 2014 19:50:47 GMT
This is the log output:

2014-11-12 19:07:16,561 INFO  thriftserver.SparkExecuteStatementOperation
(Logging.scala:logInfo(59)) - Running query 'CACHE TABLE xyz_cached AS
SELECT * FROM xyz where date_prefix = 20141112'

2014-11-12 19:07:17,455 INFO  Configuration.deprecation
(Configuration.java:warnOnceIfDeprecated(1009)) - mapred.map.tasks is
deprecated. Instead, use mapreduce.job.maps

2014-11-12 19:07:17,756 INFO  spark.SparkContext
(Logging.scala:logInfo(59)) - Created broadcast 0 from broadcast at
TableReader.scala:68

2014-11-12 19:07:18,292 INFO  spark.SparkContext
(Logging.scala:logInfo(59)) - Starting job: collect at SparkPlan.scala:84

2014-11-12 19:07:22,801 INFO  mapred.FileInputFormat
(FileInputFormat.java:listStatus(253)) - Total input paths to process : 200

2014-11-12 19:07:22,835 INFO  scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - Registering RDD 12 (mapPartitions at
Exchange.scala:86)

2014-11-12 19:07:22,837 INFO  scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - Got job 0 (collect at SparkPlan.scala:84)
with 1 output partitions (allowLocal=false)

2014-11-12 19:07:22,838 INFO  scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - Final stage: Stage 1(collect at
SparkPlan.scala:84)

2014-11-12 19:07:22,838 INFO  scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - Parents of final stage: List(Stage 0)

2014-11-12 19:07:22,842 INFO  scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - Missing parents: List(Stage 0)

2014-11-12 19:07:22,871 INFO  scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - Submitting Stage 0 (MapPartitionsRDD[12] at
mapPartitions at Exchange.scala:86), which has no missing parents

2014-11-12 19:07:22,916 INFO  spark.SparkContext
(Logging.scala:logInfo(59)) - Created broadcast 1 from broadcast at
DAGScheduler.scala:838

2014-11-12 19:07:22,963 INFO  scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - Submitting 461 missing tasks from Stage 0
(MapPartitionsRDD[12] at mapPartitions at Exchange.scala:86)

2014-11-12 19:10:04,088 INFO  scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - Stage 0 (mapPartitions at Exchange.scala:86)
finished in 161.113 s

2014-11-12 19:10:04,089 INFO  scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - looking for newly runnable stages

2014-11-12 19:10:04,089 INFO  scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - running: Set()

2014-11-12 19:10:04,090 INFO  scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - waiting: Set(Stage 1)

2014-11-12 19:10:04,090 INFO  scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - failed: Set()

2014-11-12 19:10:04,094 INFO  scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - Missing parents for Stage 1: List()

2014-11-12 19:10:04,097 INFO  scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - Submitting Stage 1 (MappedRDD[16] at map at
SparkPlan.scala:84), which is now runnable

2014-11-12 19:10:04,112 INFO  spark.SparkContext
(Logging.scala:logInfo(59)) - Created broadcast 2 from broadcast at
DAGScheduler.scala:838

2014-11-12 19:10:04,115 INFO  scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - Submitting 1 missing tasks from Stage 1
(MappedRDD[16] at map at SparkPlan.scala:84)

2014-11-12 19:10:08,541 ERROR cluster.YarnClientClusterScheduler
(Logging.scala:logError(75)) - Lost executor 52 on
ip-10-61-175-167.ec2.internal: remote Akka client disassociated

2014-11-12 19:10:08,543 WARN  remote.ReliableDeliverySupervisor
(Slf4jLogger.scala:apply$mcV$sp(71)) - Association with remote system
[akka.tcp://sparkExecutor@ip-10-61-175-167.ec2.internal:50918] has failed,
address is now gated for [5000] ms. Reason is: [Disassociated].

2014-11-12 19:10:08,548 ERROR cluster.YarnClientSchedulerBackend
(Logging.scala:logError(75)) - Asked to remove non-existent executor 52

2014-11-12 19:10:08,550 INFO  scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - Executor lost: 52 (epoch 1)

2014-11-12 19:10:08,555 INFO  scheduler.Stage (Logging.scala:logInfo(59)) -
Stage 0 is now unavailable on executor 52 (460/461, false)

2014-11-12 19:10:08,686 INFO  scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - Marking Stage 1 (collect at
SparkPlan.scala:84) as failed due to a fetch failure from Stage 0
(mapPartitions at Exchange.scala:86)

2014-11-12 19:10:08,686 INFO  scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - Stage 1 (collect at SparkPlan.scala:84)
failed in 4.571 s

2014-11-12 19:10:08,687 INFO  scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - Resubmitting Stage 0 (mapPartitions at
Exchange.scala:86) and Stage 1 (collect at SparkPlan.scala:84) due to fetch
failure

2014-11-12 19:10:08,908 INFO  scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - Resubmitting failed stages

2014-11-12 19:10:08,974 INFO  scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - Submitting Stage 0 (MapPartitionsRDD[12] at
mapPartitions at Exchange.scala:86), which has no missing parents

2014-11-12 19:10:08,989 INFO  spark.SparkContext
(Logging.scala:logInfo(59)) - Created broadcast 3 from broadcast at
DAGScheduler.scala:838

2014-11-12 19:10:08,990 INFO  scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - Submitting 1 missing tasks from Stage 0
(MapPartitionsRDD[12] at mapPartitions at Exchange.scala:86)

2014-11-12 19:11:15,465 INFO  scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - Stage 0 (mapPartitions at Exchange.scala:86)
finished in 66.475 s

2014-11-12 19:11:15,465 INFO  scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - looking for newly runnable stages

2014-11-12 19:11:15,465 INFO  scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - running: Set()

2014-11-12 19:11:15,465 INFO  scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - waiting: Set(Stage 1)

2014-11-12 19:11:15,465 INFO  scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - failed: Set()

2014-11-12 19:11:15,466 INFO  scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - Missing parents for Stage 1: List()

2014-11-12 19:11:15,466 INFO  scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - Submitting Stage 1 (MappedRDD[16] at map at
SparkPlan.scala:84), which is now runnable

2014-11-12 19:11:15,482 INFO  spark.SparkContext
(Logging.scala:logInfo(59)) - Created broadcast 4 from broadcast at
DAGScheduler.scala:838

2014-11-12 19:11:15,482 INFO  scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - Submitting 1 missing tasks from Stage 1
(MappedRDD[16] at map at SparkPlan.scala:84)

2014-11-12 19:11:21,655 ERROR cluster.YarnClientClusterScheduler
(Logging.scala:logError(75)) - Lost executor 372 on
ip-10-95-163-84.ec2.internal: remote Akka client disassociated

2014-11-12 19:11:21,655 WARN  remote.ReliableDeliverySupervisor
(Slf4jLogger.scala:apply$mcV$sp(71)) - Association with remote system
[akka.tcp://sparkExecutor@ip-10-95-163-84.ec2.internal:20998] has failed,
address is now gated for [5000] ms. Reason is: [Disassociated].

2014-11-12 19:11:21,655 ERROR cluster.YarnClientSchedulerBackend
(Logging.scala:logError(75)) - Asked to remove non-existent executor 372

2014-11-12 19:11:21,655 INFO  scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - Executor lost: 372 (epoch 3)




On Wed, Nov 12, 2014 at 12:31 PM, Sadhan Sood <sadhan.sood@gmail.com> wrote:

> We are running spark on yarn with combined memory > 1TB and when trying to
> cache a table partition(which is < 100G), seeing a lot of failed collect
> stages in the UI and this never succeeds. Because of the failed collect, it
> seems like the mapPartitions keep getting resubmitted. We have more than
> enough memory so its surprising we are seeing this issue. Can someone
> please help. Thanks!
>
> The stack trace of the failed collect from UI is:
>
> org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for
shuffle 0
> 	at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:386)
> 	at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:383)
> 	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> 	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> 	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> 	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
> 	at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
> 	at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
> 	at org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:382)
> 	at org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:178)
> 	at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.fetch(BlockStoreShuffleFetcher.scala:42)
> 	at org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:40)
> 	at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
> 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
> 	at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
> 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
> 	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
> 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
> 	at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
> 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
> 	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
> 	at org.apache.spark.scheduler.Task.run(Task.scala:56)
> 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:195)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> 	at java.lang.Thread.run(Thread.java:745)
>
>

Mime
View raw message