spark-issues mailing list archives

From "Tathagata Das (JIRA)" <>
Subject [jira] [Commented] (SPARK-4835) Streaming saveAs*HadoopFiles() methods may throw FileAlreadyExistsException during checkpoint recovery
Date Sat, 13 Dec 2014 10:07:13 GMT


Tathagata Das commented on SPARK-4835:

I wonder whether there is a semantically cleaner way of doing this. In streaming, we know which batches might have already been processed before a failure, because the checkpoint stores the batches that were generated and queued. So upon recovery, the validation could be disabled only for those batches that were queued before the failure, and not for any of the subsequent batches.
What do you think?
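
To make that concrete, a rough sketch of the idea (the names {{recoveredBatchTimes}}, {{pendingBatchTimes}}, and {{runWithoutOutputValidation}} below are hypothetical, not existing Spark APIs):

{code}
// Batch times that were generated and queued before the failure,
// restored from the checkpoint on recovery (hypothetical field).
val recoveredBatchTimes: Set[Time] = checkpoint.pendingBatchTimes.toSet

def runJob(job: Job): Unit = {
  if (recoveredBatchTimes.contains(job.time)) {
    // Re-run of a pre-failure batch: its output may already exist,
    // so skip the output-directory validation for this job only.
    runWithoutOutputValidation { job.run() }
  } else {
    // Batches generated after recovery keep the normal validation.
    job.run()
  }
}
{code}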

> Streaming saveAs*HadoopFiles() methods may throw FileAlreadyExistsException during checkpoint recovery
> ------------------------------------------------------------------------------------------------------
>                 Key: SPARK-4835
>                 URL:
>             Project: Spark
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 1.3.0
>            Reporter: Josh Rosen
>            Assignee: Tathagata Das
>            Priority: Critical
> While running (a slightly modified version of) the "recovery with saveAsHadoopFiles operation" test in the streaming CheckpointSuite, I noticed the following error message in the streaming driver log:
> {code}
> 14/12/12 17:42:50.687 pool-1-thread-1-ScalaTest-running-CheckpointSuite INFO JobScheduler: Added jobs for time 1500 ms
> 14/12/12 17:42:50.687 pool-1-thread-1-ScalaTest-running-CheckpointSuite INFO RecurringTimer: Started timer for JobGenerator at time 2000
> 14/12/12 17:42:50.688 INFO JobScheduler: Starting job streaming job 1500 ms.0 from job set of time 1500 ms
> 14/12/12 17:42:50.688 pool-1-thread-1-ScalaTest-running-CheckpointSuite INFO JobGenerator: Restarted JobGenerator at 2000 ms
> 14/12/12 17:42:50.688 pool-1-thread-1-ScalaTest-running-CheckpointSuite INFO JobScheduler: Started JobScheduler
> 14/12/12 17:42:50.689 INFO JobScheduler: Starting job streaming job 1500 ms.1 from job set of time 1500 ms
> 14/12/12 17:42:50.689 ERROR JobScheduler: Error running job streaming job 1500 ms.0
> org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory file:/var/folders/0k/2qp2p2vs7bv033vljnb8nk1c0000gn/T/1418434967213-0/-1500.result already exists
> 	at org.apache.hadoop.mapred.FileOutputFormat.checkOutputSpecs(
> 	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1045)
> 	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:944)
> 	at org.apache.spark.streaming.dstream.PairDStreamFunctions$$anonfun$9.apply(PairDStreamFunctions.scala:677)
> 	at org.apache.spark.streaming.dstream.PairDStreamFunctions$$anonfun$9.apply(PairDStreamFunctions.scala:675)
> 	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:42)
> 	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
> 	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
> 	at scala.util.Try$.apply(Try.scala:161)
> 	at
> 	at org.apache.spark.streaming.scheduler.JobScheduler$
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(
> 	at java.util.concurrent.ThreadPoolExecutor$
> 	at
> 14/12/12 17:42:50.691 pool-12-thread-1 INFO SparkContext: Starting job: apply at Transformer.scala:22
> {code}
> Spark Streaming's {{saveAsHadoopFiles}} method calls Spark's {{rdd.saveAsHadoopFile}} method. The Spark method, in turn, calls {{PairRDDFunctions.saveAsHadoopDataset()}}, which performs error-checking to ensure that the output directory does not already exist:
> {code}
>     if (self.conf.getBoolean("spark.hadoop.validateOutputSpecs", true)) {
>       // FileOutputFormat ignores the filesystem parameter
>       val ignoredFs = FileSystem.get(hadoopConf)
>       hadoopConf.getOutputFormat.checkOutputSpecs(ignoredFs, hadoopConf)
>     }
> {code}
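> For reference, this validation can already be turned off globally by setting {{spark.hadoop.validateOutputSpecs}} to {{false}}, but that disables the safety check for every save in the application, not just for re-run streaming batches:
> {code}
> import org.apache.spark.SparkConf
>
> val conf = new SparkConf()
>   .setAppName("checkpoint-recovery-example") // example name
>   // Disables the output-directory existence check for ALL save operations.
>   .set("spark.hadoop.validateOutputSpecs", "false")
> {code}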
> If Spark Streaming recovers from a checkpoint and re-runs the last batch in the checkpoint, then {{saveAsHadoopDataset}} will have been called twice with the same output path. If the output path exists from the first, pre-recovery run, then the recovery will fail.
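> (The collision is deterministic: the per-batch output path is derived purely from the configured prefix/suffix and the batch time, roughly as in this simplified sketch of Spark's {{rddToFileName}} helper, which is why the re-run hits the exact same {{-1500.result}} directory seen in the log above.)
> {code}
> // Simplified sketch: the path depends only on prefix, suffix, and batch
> // time, so re-running the 1500 ms batch reproduces the same directory.
> def rddToFileName(prefix: String, suffix: String, time: Time): String =
>   prefix + "-" + time.milliseconds + "." + suffix
> {code}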
> This seems like it could be a pretty serious issue: imagine that a streaming job fails partway through a save() operation and then recovers. In that case, the existing directory will prevent us from ever recovering and finishing the save().
> Fortunately, this should be simple to fix: we should disable the existing-directory checks for output operations called by streaming jobs.
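> One way to scope the fix to streaming (a sketch only; {{OutputSpecValidation}} is a hypothetical name, built on Scala's {{scala.util.DynamicVariable}}): the streaming scheduler sets a thread-local flag around the jobs it runs, and {{saveAsHadoopDataset}} consults that flag in addition to the conf:
> {code}
> import scala.util.DynamicVariable
>
> // Hypothetical flag that the streaming scheduler sets around the
> // output operations it (re-)runs.
> object OutputSpecValidation {
>   val disabled = new DynamicVariable[Boolean](false)
> }
>
> // In saveAsHadoopDataset: respect both the conf and the dynamic flag.
> if (self.conf.getBoolean("spark.hadoop.validateOutputSpecs", true)
>     && !OutputSpecValidation.disabled.value) {
>   val ignoredFs = FileSystem.get(hadoopConf)
>   hadoopConf.getOutputFormat.checkOutputSpecs(ignoredFs, hadoopConf)
> }
>
> // In the streaming scheduler: wrap each job's body.
> OutputSpecValidation.disabled.withValue(true) { job.run() }
> {code}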
