spark-user mailing list archives

From Svend Vanderveken <svend.vanderve...@gmail.com>
Subject Re: Systematic error when re-starting Spark stream unless I delete all checkpoints
Date Fri, 26 Sep 2014 13:09:09 GMT
Hi all,

I apologise for re-posting this; I realise some mail systems filtered out
all the code samples from the original post.

I would greatly appreciate any pointers regarding this issue, which
basically renders Spark Streaming non-fault-tolerant for us.

Thanks in advance,

S



---

"
I am experiencing Spark Streaming restart issues similar to those discussed
in the two threads below (in which I failed to find a solution). Could
anybody let me know whether something is wrong in the way I start/stop the
stream, or whether this could be a Spark bug?

http://apache-spark-user-list.1001560.n3.nabble.com/RDD-data-checkpoint-cleaning-td14847.html
http://apache-spark-user-list.1001560.n3.nabble.com/KafkaReciever-Error-when-starting-ssc-Actor-name-not-unique-tc3978.html

My stream reads a Kafka topic, does some processing involving
updateStateByKey, and saves the results to HDFS.

The context is (re)-created at startup as follows:

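import org.apache.spark.streaming.{Duration, StreamingContext}

// sparkConf is the application's SparkConf, defined elsewhere
def streamContext(): StreamingContext = {

  def newContext(): StreamingContext = {
    val ctx = new StreamingContext(sparkConf, Duration(10000))
    ctx.checkpoint("hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/")
    ctx
  }

  // Recovers the context from the checkpoint directory if one can be read,
  // otherwise calls newContext to create a fresh one
  StreamingContext.getOrCreate("hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/",
    newContext _)
}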
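For comparison, the Spark Streaming programming guide describes a recovery pattern in which the function passed to StreamingContext.getOrCreate builds the whole DStream graph, not just the context: when a checkpoint is found, that function is not called and the graph is deserialized from the checkpoint instead. Below is a minimal sketch of that pattern for Spark 1.1, assuming the spark-streaming-kafka artifact is on the classpath. The ZooKeeper quorum, consumer group, topic, output path and the per-value count done in updateStateByKey are hypothetical placeholders, not the actual job from this post, and whether building the stream after getOrCreate (as in the start-up code in this post) explains the FilteredDStream error is not confirmed here.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Duration, StreamingContext}
import org.apache.spark.streaming.StreamingContext._ // pair-DStream implicits (required in Spark 1.1)
import org.apache.spark.streaming.kafka.KafkaUtils

object TabulationStreamSketch {
  // Checkpoint directory taken from the post.
  val checkpointDir = "hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/"

  // Creates a fresh context AND the whole DStream graph. getOrCreate calls this
  // only when no checkpoint can be read; otherwise the graph is restored from it.
  def newContext(sparkConf: SparkConf): StreamingContext = {
    val ssc = new StreamingContext(sparkConf, Duration(10000))
    ssc.checkpoint(checkpointDir)

    // Hypothetical ZooKeeper quorum, consumer group and topic (1 receiver thread).
    val messages = KafkaUtils.createStream(ssc, "zkhost:2181", "tabulation-group", Map("events" -> 1))

    // Hypothetical state update: running count of each distinct message value.
    val updateCount = (newValues: Seq[Long], state: Option[Long]) =>
      Option(state.getOrElse(0L) + newValues.sum)

    val counts = messages
      .map { case (_, value) => (value, 1L) }
      .updateStateByKey[Long](updateCount)

    // Hypothetical output prefix; one output directory is written per batch.
    counts.saveAsTextFiles("hdfs://vm28-hulk-priv:8020/tmp/tabulation-output/counts")
    ssc
  }

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("tabulation-stream")
    // Either restores the checkpointed context (newContext is NOT called) or builds a new one.
    val ssc = StreamingContext.getOrCreate(checkpointDir, () => newContext(sparkConf))
    // No DStreams are created here, so a recovered graph is left untouched.
    ssc.start()
    ssc.awaitTermination()
  }
}
```

With this layout, main only starts and awaits whatever context getOrCreate returns, so a restarted job never adds new DStreams to a recovered graph.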


Start-up and shutdown of the stream are handled as follows:

// Created outside the try block so that it is still in scope in the catch
// clause (log is the application's logger, defined elsewhere)
val sparkContext = streamContext()

try {
  // [.. build stream here...]

  sparkContext.start()
  sparkContext.awaitTermination()
} catch {
  case e: Throwable =>
    log.error("shutting down tabulation stream...", e)
    sparkContext.stop()
    log.info("...waiting termination...")
    sparkContext.awaitTermination()
    log.info("...tabulation stream stopped")
}
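One detail about the shutdown path: when the job is stopped with Ctrl-C, the JVM runs its registered shutdown hooks rather than throwing an exception into the thread blocked in awaitTermination(), so the catch block above is probably not what stops the stream in that case. A hedged sketch of stopping through a shutdown hook instead, using StreamingContext.stop(stopSparkContext, stopGracefully), which exists in Spark 1.x. The object and method names are hypothetical, and whether a graceful stop from a hook completes cleanly alongside Spark 1.1's own shutdown handling has not been verified here.

```scala
import org.apache.spark.streaming.StreamingContext

object GracefulShutdownSketch {
  // Hypothetical helper: call it once on the StreamingContext before start().
  // On Ctrl-C (SIGINT) or SIGTERM the JVM runs registered shutdown hooks;
  // it does not throw an exception into the thread blocked in awaitTermination().
  def installShutdownHook(ssc: StreamingContext): Unit = {
    sys.addShutdownHook {
      // stopGracefully = true waits until all received data has been processed;
      // stopSparkContext = true also stops the underlying SparkContext.
      ssc.stop(stopSparkContext = true, stopGracefully = true)
    }
  }
}
```

It would be called as GracefulShutdownSketch.installShutdownHook(sparkContext), right before sparkContext.start().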



When starting the stream for the first time (with spark-submit), processing
succeeds, new folders are created in the target HDFS folder, and streaming
statistics are visible at http://sparkhost:4040/streaming.

After letting the stream run for several minutes and then stopping it
(Ctrl-C on the command line), the checkpoint folder contains the following:

mnubohadoop@vm28-hulk-priv:~/streamingtabulate$ hdfs dfs -ls hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/
14/09/25 09:39:13 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found 11 items
drwxr-xr-x   - mnubohadoop hadoop          0 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/0d6a18a7-6d00-407a-bfd7-e40829f6d2a8
-rw-r--r--   3 mnubohadoop hadoop       5479 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-1411652290000
-rw-r--r--   3 mnubohadoop hadoop       5512 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-1411652290000.bk
-rw-r--r--   3 mnubohadoop hadoop       5479 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-1411652300000
-rw-r--r--   3 mnubohadoop hadoop       5507 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-1411652300000.bk
-rw-r--r--   3 mnubohadoop hadoop       5476 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-1411652310000
-rw-r--r--   3 mnubohadoop hadoop       5504 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-1411652310000.bk
-rw-r--r--   3 mnubohadoop hadoop       5477 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-1411652320000
-rw-r--r--   3 mnubohadoop hadoop       5506 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-1411652320000.bk
-rw-r--r--   3 mnubohadoop hadoop       5484 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-1411652330000
-rw-r--r--   3 mnubohadoop hadoop       5504 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-1411652330000.bk
mnubohadoop@vm28-hulk-priv:~/streamingtabulate$ hdfs dfs -ls hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/0d6a18a7-6d00-407a-bfd7-e40829f6d2a8
14/09/25 09:42:08 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found 2 items
drwxr-xr-x   - mnubohadoop hadoop          0 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/0d6a18a7-6d00-407a-bfd7-e40829f6d2a8/rdd-8438
drwxr-xr-x   - mnubohadoop hadoop          0 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/0d6a18a7-6d00-407a-bfd7-e40829f6d2a8/rdd-8542


(Checkpoint clean-up does seem to happen, since the stream ran for much
longer than 5 times 10 seconds.)
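The listing can be checked against the 10-second batch duration: Spark names each checkpoint file checkpoint-<batch time in epoch milliseconds>, with a .bk backup next to it. A small pure-Scala sketch (no Spark calls; the file names are copied from the listing above) that decodes the names:

```scala
object CheckpointTimes {
  // Checkpoint file names copied from the HDFS listing (the .bk backups are left out).
  val names: Seq[String] = Seq(
    "checkpoint-1411652290000",
    "checkpoint-1411652300000",
    "checkpoint-1411652310000",
    "checkpoint-1411652320000",
    "checkpoint-1411652330000")

  // The numeric suffix is the batch time in milliseconds since the epoch.
  def batchTime(name: String): Long = name.stripPrefix("checkpoint-").toLong

  val times: Seq[Long] = names.map(batchTime)

  // Gaps between consecutive checkpoints, in milliseconds.
  val gaps: Seq[Long] = times.zip(times.tail).map { case (earlier, later) => later - earlier }

  def main(args: Array[String]): Unit = {
    println(s"gaps (ms): ${gaps.mkString(", ")}")
    println(s"time span covered: ${(times.last - times.head) / 1000} s")
    println(s"latest batch: ${java.time.Instant.ofEpochMilli(times.last)}")
  }
}
```

Running main prints gaps of 10000 ms each, a span of 40 s, and 2014-09-25T13:38:50Z as the latest batch time, i.e. 09:38:50 at UTC-4, which matches the 09:38 modification times in the listing if the cluster's local time is UTC-4. So only the last five batches (plus their backups) remain, consistent with older checkpoint files having been cleaned up.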

When restarting the stream, startup fails with the error below;
http://sparkhost:4040/streaming shows no statistics, no new HDFS folder is
added to the target folder, and no new checkpoints are created:

09:45:05.038 [main] ERROR c.mnubo.analytic.tabulate.StreamApp - shutting down tabulation stream...
org.apache.spark.SparkException: org.apache.spark.streaming.dstream.FilteredDStream@e8949a1 has not been initialized
        at org.apache.spark.streaming.dstream.DStream.isTimeValid(DStream.scala:263)
~[spark-assembly-1.1.0-hadoop2.3.0-mr1-cdh5.1.0.jar:1.1.0]
        at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:290)
~[spark-assembly-1.1.0-hadoop2.3.0-mr1-cdh5.1.0.jar:1.1.0]
        at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
~[spark-assembly-1.1.0-hadoop2.3.0-mr1-cdh5.1.0.jar:1.1.0]
        at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:115)
~[spark-assembly-1.1.0-hadoop2.3.0-mr1-cdh5.1.0.jar:1.1.0]
        at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:115)
~[spark-assembly-1.1.0-hadoop2.3.0-mr1-cdh5.1.0.jar:1.1.0]
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
~[mnubo-analytic-tabulatestreaming-assembly-0.1.jar:0.1]
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
~[mnubo-analytic-tabulatestreaming-assembly-0.1.jar:0.1]
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
~[mnubo-analytic-tabulatestreaming-assembly-0.1.jar:0.1]
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
~[mnubo-analytic-tabulatestreaming-assembly-0.1.jar:0.1]
        at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
~[mnubo-analytic-tabulatestreaming-assembly-0.1.jar:0.1]
        at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
~[mnubo-analytic-tabulatestreaming-assembly-0.1.jar:0.1]
        at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:115)
~[spark-assembly-1.1.0-hadoop2.3.0-mr1-cdh5.1.0.jar:1.1.0]
        at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:210)
~[spark-assembly-1.1.0-hadoop2.3.0-mr1-cdh5.1.0.jar:1.1.0]
        at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:209)
~[spark-assembly-1.1.0-hadoop2.3.0-mr1-cdh5.1.0.jar:1.1.0]
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
~[mnubo-analytic-tabulatestreaming-assembly-0.1.jar:0.1]
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
~[mnubo-analytic-tabulatestreaming-assembly-0.1.jar:0.1]
        at org.apache.spark.streaming.scheduler.JobGenerator.restart(JobGenerator.scala:209)
~[spark-assembly-1.1.0-hadoop2.3.0-mr1-cdh5.1.0.jar:1.1.0]
        at org.apache.spark.streaming.scheduler.JobGenerator.start(JobGenerator.scala:80)
~[spark-assembly-1.1.0-hadoop2.3.0-mr1-cdh5.1.0.jar:1.1.0]
        at org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:66)
~[spark-assembly-1.1.0-hadoop2.3.0-mr1-cdh5.1.0.jar:1.1.0]
        at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:444)
~[spark-assembly-1.1.0-hadoop2.3.0-mr1-cdh5.1.0.jar:1.1.0]



After this failed restart, the checkpoint folder contains:

mnubohadoop@vm28-hulk-priv:~/streamingtabulate$ hdfs dfs -ls hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/
14/09/25 09:48:39 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found 12 items
drwxr-xr-x   - mnubohadoop hadoop          0 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/0d6a18a7-6d00-407a-bfd7-e40829f6d2a8
drwxr-xr-x   - mnubohadoop hadoop          0 2014-09-25 09:43 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/a9dded7b-a288-44da-89d0-0309a73fab3a
-rw-r--r--   3 mnubohadoop hadoop       5479 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-1411652290000
-rw-r--r--   3 mnubohadoop hadoop       5512 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-1411652290000.bk
-rw-r--r--   3 mnubohadoop hadoop       5479 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-1411652300000
-rw-r--r--   3 mnubohadoop hadoop       5507 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-1411652300000.bk
-rw-r--r--   3 mnubohadoop hadoop       5476 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-1411652310000
-rw-r--r--   3 mnubohadoop hadoop       5504 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-1411652310000.bk
-rw-r--r--   3 mnubohadoop hadoop       5477 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-1411652320000
-rw-r--r--   3 mnubohadoop hadoop       5506 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-1411652320000.bk
-rw-r--r--   3 mnubohadoop hadoop       5484 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-1411652330000
-rw-r--r--   3 mnubohadoop hadoop       5504 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-1411652330000.bk


Now if I delete all older checkpoints and keep only the most recent one:

mnubohadoop@vm28-hulk-priv:~/streamingtabulate$ hdfs dfs -ls hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/
14/09/25 10:06:08 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found 3 items
drwxr-xr-x   - mnubohadoop hadoop          0 2014-09-25 09:43 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/a9dded7b-a288-44da-89d0-0309a73fab3a
-rw-r--r--   3 mnubohadoop hadoop       5484 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-1411652330000
-rw-r--r--   3 mnubohadoop hadoop       5504 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-1411652330000.bk


I end up with this (Kafka?) non-unique actor name error:

10:07:25.088 [Result resolver thread-0] WARN o.a.spark.scheduler.TaskSetManager - Lost task 1.0 in stage 3.0 (TID 73, vm21-hulk-priv.mtl.mnubo.com):
akka.actor.InvalidActorNameException: actor name [Receiver-0-1411654045063] is not unique!
        akka.actor.dungeon.ChildrenContainer$NormalChildrenContainer.reserve(ChildrenContainer.scala:130)
        akka.actor.dungeon.Children$class.reserveChild(Children.scala:77)
        akka.actor.ActorCell.reserveChild(ActorCell.scala:338)
        akka.actor.dungeon.Children$class.makeChild(Children.scala:186)
        akka.actor.dungeon.Children$class.attachChild(Children.scala:42)
        akka.actor.ActorCell.attachChild(ActorCell.scala:338)
        akka.actor.ActorSystemImpl.actorOf(ActorSystem.scala:518)
        org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.<init>(ReceiverSupervisorImpl.scala:67)
        org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:263)
        org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:257)
        org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)
        org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)
        org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
        org.apache.spark.scheduler.Task.run(Task.scala:54)
        org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)


If I delete the checkpoint folder, the stream starts successfully (but I
obviously lose my ongoing stream state).

We're running Spark 1.1.0 on Mesos 0.20. Our Spark assembly is packaged
with CDH 5.1.0 and Hive:

sbt/sbt clean assembly/assembly -Dhadoop.version=2.3.0-mr1-cdh5.1.0 -Phive
./make-distribution.sh --tgz --skip-java-test -Dhadoop.version=2.3.0-mr1-cdh5.1.0 -Phive


Any comment or suggestion would be greatly appreciated.
"

On Thu, Sep 25, 2014 at 4:20 PM, Svend <svend.vanderveken@gmail.com> wrote:

> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Systematic-error-when-re-starting-Spark-stream-unless-I-delete-all-checkpoints-tp15142.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
