spark-issues mailing list archives

From "Sean Owen (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (SPARK-22150) PeriodicCheckpointer fails with FileNotFoundException in case of dependent RDDs
Date Mon, 02 Oct 2017 14:43:01 GMT

     [ https://issues.apache.org/jira/browse/SPARK-22150?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sean Owen updated SPARK-22150:
------------------------------
    Target Version/s:   (was: 2.2.1)

> PeriodicCheckpointer fails with FileNotFoundException in case of dependent RDDs
> -------------------------------------------------------------------------------
>
>                 Key: SPARK-22150
>                 URL: https://issues.apache.org/jira/browse/SPARK-22150
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 1.5.0, 1.5.1, 1.5.2, 1.6.0, 1.6.1, 1.6.2, 1.6.3, 2.0.0, 2.0.1, 2.0.2, 2.1.0, 2.1.1, 2.2.0
>         Environment: spark 2.2.0
> scala 2.11
>            Reporter: Sergey Zhemzhitsky
>
> PeriodicCheckpointer fails with FileNotFoundException when checkpointing dependent RDDs (as in iterative algorithms), i.e. when the RDD to be checkpointed depends on an already checkpointed RDD.
> Here is the exception:
> {code}
> Job aborted due to stage failure: Task creation failed: java.io.FileNotFoundException: File file:/tmp/spark-e2046e49-5d8a-4633-b525-52a00bab32f0/17d9b542-d48d-4f23-b368-3957a42644a6/rdd-20/part-00000 does not exist
> java.io.FileNotFoundException: File file:/tmp/spark-e2046e49-5d8a-4633-b525-52a00bab32f0/17d9b542-d48d-4f23-b368-3957a42644a6/rdd-20/part-00000 does not exist
> 	at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:539)
> 	at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:752)
> 	at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:529)
> 	at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:409)
> 	at org.apache.spark.rdd.ReliableCheckpointRDD.getPreferredLocations(ReliableCheckpointRDD.scala:89)
> 	at org.apache.spark.rdd.RDD$$anonfun$preferredLocations$1.apply(RDD.scala:274)
> 	at org.apache.spark.rdd.RDD$$anonfun$preferredLocations$1.apply(RDD.scala:274)
> 	at scala.Option.map(Option.scala:146)
> 	at org.apache.spark.rdd.RDD.preferredLocations(RDD.scala:274)
> 	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1697)
> 	at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$1.apply$mcVI$sp(DAGScheduler.scala:1708)
> 	at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$1.apply(DAGScheduler.scala:1707)
> 	at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$1.apply(DAGScheduler.scala:1707)
> 	at scala.collection.immutable.List.foreach(List.scala:381)
> 	at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1707)
> 	at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1705)
> 	at scala.collection.immutable.List.foreach(List.scala:381)
> 	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1705)
> 	at org.apache.spark.scheduler.DAGScheduler.getPreferredLocs(DAGScheduler.scala:1671)
> 	at org.apache.spark.scheduler.DAGScheduler$$anonfun$16.apply(DAGScheduler.scala:989)
> 	at org.apache.spark.scheduler.DAGScheduler$$anonfun$16.apply(DAGScheduler.scala:987)
> {code}
> The issue seems to be in this [piece of code|https://github.com/apache/spark/blob/0a7f5f2798b6e8b2ba15e8b3aa07d5953ad1c695/core/src/main/scala/org/apache/spark/util/PeriodicCheckpointer.scala#L94]
> {code:java}
> if (checkpointInterval != -1 && (updateCount % checkpointInterval) == 0
>   && sc.getCheckpointDir.nonEmpty) {
>   // Add new checkpoint before removing old checkpoints.
>   checkpoint(newData)
>   checkpointQueue.enqueue(newData)
>   // Remove checkpoints before the latest one.
>   var canDelete = true
>   while (checkpointQueue.size > 1 && canDelete) {
>     // Delete the oldest checkpoint only if the next checkpoint exists.
>     if (isCheckpointed(checkpointQueue.head)) {
>       removeCheckpointFile()
>     } else {
>       canDelete = false
>     }
>   }
> }
> {code}
> Given that _checkpointQueue.head_ is already checkpointed and materialized, and _newData_ depends on _checkpointQueue.head_, the head's checkpoint files are deleted immediately, while _newData_ is only marked for checkpointing (its files are written by the next action on it). The exception is then thrown when an action evaluates the RDDs representing _newData_, whose lineage still references the deleted checkpoint files.


--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


