spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Emre Sevinc <emre.sev...@gmail.com>
Subject Re: Why can't Spark Streaming recover from the checkpoint directory when using a third party library for processingmulti-line JSON?
Date Wed, 04 Mar 2015 08:42:57 GMT
I've also tried the following:

    Configuration hadoopConfiguration = new Configuration();
    hadoopConfiguration.set("multilinejsoninputformat.member", "itemSet");

    JavaStreamingContext ssc =
JavaStreamingContext.getOrCreate(checkpointDirectory, hadoopConfiguration,
factory, false);


but I still get the same exception.

Why doesn't getOrCreate ignore that Hadoop configuration part (which
normally works, e.g. when not recovering)?

--
Emre


On Tue, Mar 3, 2015 at 3:36 PM, Emre Sevinc <emre.sevinc@gmail.com> wrote:

> Hello,
>
> I have a Spark Streaming application (that uses Spark 1.2.1) that listens
> to an input directory, and when new JSON files are copied to that directory
> processes them, and writes them to an output directory.
>
> It uses a 3rd party library to process the multi-line JSON files (
> https://github.com/alexholmes/json-mapreduce). You can see the relevant
> part of the streaming application at:
>
>   https://gist.github.com/emres/ec18ee264e4eb0dd8f1a
>
> When I run this application locally, it works perfectly fine. But then I
> wanted to test whether it could recover from failure, e.g. if I stopped it
> right in the middle of processing some files. I started the streaming
> application, copied 100 files to the input directory, and hit Ctrl+C when
> it has alread processed about 50 files:
>
> ...
> 2015-03-03 15:06:20 INFO  FileInputFormat:280 - Total input paths to
> process : 1
> 2015-03-03 15:06:20 INFO  FileInputFormat:280 - Total input paths to
> process : 1
> 2015-03-03 15:06:20 INFO  FileInputFormat:280 - Total input paths to
> process : 1
> [Stage
> 0:==========================================================================================================================>
> (65 + 4) / 100]
> ^C
>
> Then I started the application again, expecting that it could recover from
> the checkpoint. For a while it started to read files again and then gave an
> exception:
>
> ...
> 2015-03-03 15:06:39 INFO  FileInputFormat:280 - Total input paths to
> process : 1
> 2015-03-03 15:06:39 INFO  FileInputFormat:280 - Total input paths to
> process : 1
> 2015-03-03 15:06:39 WARN  SchemaValidatorDriver:145 -
>  * * * hadoopConfiguration: itemSet
> 2015-03-03 15:06:40 ERROR Executor:96 - Exception in task 0.0 in stage 0.0
> (TID 0)
> java.io.IOException: Missing configuration value for
> multilinejsoninputformat.member
>     at
> com.alexholmes.json.mapreduce.MultiLineJsonInputFormat.createRecordReader(MultiLineJsonInputFormat.java:30)
>     at
> org.apache.spark.rdd.NewHadoopRDD$$anon$1.<init>(NewHadoopRDD.scala:133)
>     at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:107)
>     at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:69)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
>     at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
>     at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
>     at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:245)
>     at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
>     at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:245)
>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>     at org.apache.spark.scheduler.Task.run(Task.scala:56)
>     at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
>     at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
>
> Since in the exception it refers to a missing configuration
> "multilinejsoninputformat.member", I think it is about the following line:
>
>    ssc.ssc().sc().hadoopConfiguration().set("
> multilinejsoninputformat.member", "itemSet");
>
> And this is why I also log the value of it, and as you can see above, just
> before it gives the exception in the recovery process, it shows that "multilinejsoninputformat.member"
> is set to "itemSet". But somehow it is not found during the recovery.
> This exception happens only when it tries to recover from a previously
> interrupted run.
>
> I've also tried moving the above line into the "createContext" method, but
> still had the same exception.
>
> Why is that?
>
> And how can I work around it?
>
> --
> Emre Sevinç
> http://www.bigindustries.be/
>
>


-- 
Emre Sevinc

Mime
View raw message