spark-user mailing list archives

From Penny Espinoza <pesp...@societyconsulting.com>
Subject spark-streaming "Could not compute split" exception
Date Tue, 09 Sep 2014 20:13:53 GMT
Hey - I have a Spark 1.0.2 job (using spark-streaming-kafka) that runs successfully using master
= local[4].  However, when I run it on a Hadoop 2.2 EMR cluster using master yarn-client,
it fails after running for about 5 minutes.  My main method does something like this (a rough sketch follows the list):


  1.  gets streaming context
  2.  creates a DStream from KafkaUtils.createStream (let’s call this a)
  3.  creates another DStream from a.flatMap (let’s call this b)
  4.  creates another DStream from b.updateStateByKey (c)
  5.  creates another DStream from c.flatMap (d)
  6.  runs d.foreachRDD, and uses Tuplejump’s Calliope cql3SaveToCassandra to save data
to Cassandra
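
In case it helps, here is a rough Scala sketch of that pipeline. The batch interval, checkpoint directory, Kafka connection details, and the parse/update logic are simplified placeholders, and the Calliope save is stubbed out; only the shape of the DAG matches the actual job.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._   // pair-DStream operations (updateStateByKey)
import org.apache.spark.streaming.kafka.KafkaUtils

object StreamingJobSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("foo")
    val ssc  = new StreamingContext(conf, Seconds(10))   // 1. streaming context
    ssc.checkpoint("hdfs:///user/hadoop/checkpoint")     // required by updateStateByKey

    // 2. receiver-based Kafka DStream (spark-streaming-kafka, Spark 1.0.x API)
    val a = KafkaUtils.createStream(ssc, "zk-host:2181", "consumer-group", Map("topic" -> 1))

    // 3. flatMap each message into (key, count) pairs; real parsing logic omitted
    val b = a.flatMap { case (_, msg) => msg.split("\\s+").map(w => (w, 1L)) }

    // 4. running state per key
    val c = b.updateStateByKey[Long] { (values: Seq[Long], state: Option[Long]) =>
      Some(state.getOrElse(0L) + values.sum)
    }

    // 5. flatMap the state into the rows to be persisted
    val d = c.flatMap { case (key, total) => Seq((key, total)) }

    // 6. output action per batch; the real job calls Calliope's cql3SaveToCassandra here
    d.foreachRDD { rdd => rdd.count() }

    ssc.start()
    ssc.awaitTermination()
  }
}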

Here’s the error message:

14/09/09 20:06:04 WARN scheduler.TaskSetManager: Loss was due to java.lang.Exception
java.lang.Exception: Could not compute split, block input-0-1410293154000 not found
        at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
        at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
        at org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:158)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
        at org.apache.spark.scheduler.Task.run(Task.scala:51)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:744)

According to the logs, the block referenced in the error message was added to memory only about 10 seconds before the error appeared, and there is no evidence in the log of any blocks being cleared from memory or of the executor running out of memory.  Below is a snippet of the log:

14/09/09 20:05:54 INFO storage.BlockManagerInfo: Added input-0-1410293154000 in memory on
domU-12-31-39-07-B0-11.compute-1.internal:55682 (size: 214.4 KB, free: 490.8 MB)
… ( several other messages like the one above, but for different blocks)
14/09/09 20:05:58 INFO storage.BlockManagerInfo: Added input-0-1410293158000 in memory on
domU-12-31-39-07-B0-11.compute-1.internal:55682 (size: 3.1 MB, free: 457.3 MB)
14/09/09 20:06:00 INFO scheduler.ReceiverTracker: Stream 0 received 21 blocks
14/09/09 20:06:00 INFO dstream.StateDStream: Persisting RDD 57 for time 1410293160000 ms to
StorageLevel(false, true, false, false, 1) at time 1410293160000 ms
14/09/09 20:06:00 INFO dstream.StateDStream: Marking RDD 57 for time 1410293160000 ms for
checkpointing at time 1410293160000 ms
14/09/09 20:06:00 INFO scheduler.JobScheduler: Added jobs for time 1410293160000 ms
14/09/09 20:06:00 INFO scheduler.JobGenerator: Checkpointing graph for time 1410293160000
ms
14/09/09 20:06:00 INFO streaming.DStreamGraph: Updating checkpoint data for time 1410293160000
ms
14/09/09 20:06:00 INFO streaming.DStreamGraph: Updated checkpoint data for time 1410293160000
ms
14/09/09 20:06:00 INFO scheduler.JobScheduler: Starting job streaming job 1410293160000 ms.0
from job set of time 1410293160000 ms
14/09/09 20:06:00 INFO streaming.CheckpointWriter: Saving checkpoint for time 1410293160000
ms to file 'hdfs://10.240.49.124:9000/user/hadoop/checkpoint/checkpoint-1410293160000'
14/09/09 20:06:00 INFO spark.SparkContext: Starting job: saveAsNewAPIHadoopFile at CassandraRDDFunctions.scala:203
14/09/09 20:06:00 INFO scheduler.DAGScheduler: Registering RDD 54 (flatMap at FlatMappedDStream.scala:35)
14/09/09 20:06:00 INFO scheduler.DAGScheduler: Got job 12 (saveAsNewAPIHadoopFile at CassandraRDDFunctions.scala:203)
with 2 output partitions (allowLocal=false)
14/09/09 20:06:00 INFO scheduler.DAGScheduler: Final stage: Stage 23(saveAsNewAPIHadoopFile
at CassandraRDDFunctions.scala:203)
14/09/09 20:06:00 INFO scheduler.DAGScheduler: Parents of final stage: List(Stage 24)
14/09/09 20:06:00 INFO scheduler.DAGScheduler: Missing parents: List(Stage 24)
14/09/09 20:06:00 INFO scheduler.DAGScheduler: Submitting Stage 24 (FlatMappedRDD[54] at flatMap
at FlatMappedDStream.scala:35), which has no missing parents
14/09/09 20:06:00 INFO streaming.CheckpointWriter: Deleting hdfs://10.240.49.124:9000/user/hadoop/checkpoint/checkpoint-1410293110000.bk
14/09/09 20:06:00 INFO streaming.CheckpointWriter: Checkpoint for time 1410293160000 ms saved
to file 'hdfs://10.240.49.124:9000/user/hadoop/checkpoint/checkpoint-1410293160000', took
4351 bytes and 55 ms
14/09/09 20:06:00 INFO streaming.DStreamGraph: Clearing checkpoint data for time 1410293160000
ms
14/09/09 20:06:00 INFO streaming.DStreamGraph: Cleared checkpoint data for time 1410293160000
ms
14/09/09 20:06:00 INFO scheduler.DAGScheduler: Submitting 21 missing tasks from Stage 24 (FlatMappedRDD[54]
at flatMap at FlatMappedDStream.scala:35)
14/09/09 20:06:00 INFO cluster.YarnClientClusterScheduler: Adding task set 24.0 with 21 tasks
14/09/09 20:06:04 INFO scheduler.TaskSetManager: Starting task 24.0:0 as TID 91 on executor
2: domU-12-31-39-0B-F1-D1.compute-1.internal (RACK_LOCAL)
14/09/09 20:06:04 INFO scheduler.TaskSetManager: Serialized task 24.0:0 as 1437 bytes in 0
ms
14/09/09 20:06:04 INFO scheduler.TaskSetManager: Starting task 24.0:1 as TID 92 on executor
2: domU-12-31-39-0B-F1-D1.compute-1.internal (RACK_LOCAL)
14/09/09 20:06:04 INFO scheduler.TaskSetManager: Serialized task 24.0:1 as 1437 bytes in 0
ms
14/09/09 20:06:04 WARN scheduler.TaskSetManager: Lost TID 91 (task 24.0:0)
14/09/09 20:06:04 WARN scheduler.TaskSetManager: Loss was due to java.lang.Exception
java.lang.Exception: Could not compute split, block input-0-1410293154000 not found
        at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
        at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
        at org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:158)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
        at org.apache.spark.scheduler.Task.run(Task.scala:51)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:744)


I have seen earlier threads recommend working around this problem by setting spark.streaming.unpersist
to false, but that had no effect on the error.  I have also tried changing some shuffle-related
settings.  My conf from the logs is below, followed by a sketch of how these settings are applied.

14/09/09 20:04:32 INFO spark.SparkContext: Spark configuration:
spark.app.name=foo
spark.cleaner.ttl=86400
spark.home=/home/hadoop/spark
spark.jars=file:/home/hadoop/rna/rna-spark-streaming-assembly-1.0-SNAPSHOT.jar
spark.logConf=true
spark.master=yarn-client
spark.shuffle.consolidateFiles=true
spark.shuffle.memoryFraction=0.4
spark.storage.memoryFraction=0.5
spark.streaming.unpersist=false
spark.yarn.dist.archives=file:/home/hadoop/rna/lib/jackson-core-asl-1.8.9.jar,file:/home/hadoop/rna/lib/jackson-mapper-asl-1.8.9.jar
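
For completeness, here is roughly how those settings are applied to the driver's SparkConf (values copied from the dump above; this is just a sketch of the configuration mechanism, not the actual job code):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("foo")
  .set("spark.cleaner.ttl", "86400")
  .set("spark.shuffle.consolidateFiles", "true")
  .set("spark.shuffle.memoryFraction", "0.4")
  .set("spark.storage.memoryFraction", "0.5")
  .set("spark.streaming.unpersist", "false")
// spark.master, spark.jars and spark.yarn.dist.archives can equally be supplied
// at submit time instead of being set here.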

Any idea what might be happening to this block, or how I might eliminate this error?


thanks
p


