From "Mao, Wei"
Subject RE: Spark Streaming - graceful shutdown when stream has no more data
Date Fri, 26 Feb 2016 07:24:40 GMT
I would argue against making it configurable unless there is real production use case. If it’s
just for test, there are bunch of ways to achieve it. For example, you can mark if test streaming
is finished globally, and stop ssc on another thread when status of that mark changed.

Back to  original exception, blindly calling “Option.get” is always not a good practice.
It would be better to pre-validate or use getOption/getOrElse.


From: Cheng, Hao
Sent: Thursday, February 25, 2016 1:03 AM
To: Daniel Siegmann; Ashutosh Kumar
Cc: Hemant Bhanawat; Ted Yu; Femi Anthony; user
Subject: RE: Spark Streaming - graceful shutdown when stream has no more data

This is very interesting, how to shutdown the streaming job gracefully once no input data
for some time.

A doable solution probably you can count the input data by using the Accumulator, and anther
thread (in master node) will always to get the latest accumulator value, if there is no value
change from the accumulator for sometime, then shutdown the streaming job.

From: Daniel Siegmann
Sent: Wednesday, February 24, 2016 12:30 AM
To: Ashutosh Kumar <<>>
Cc: Hemant Bhanawat <<>>; Ted Yu
<<>>; Femi Anthony <<>>;
user <<>>
Subject: Re: Spark Streaming - graceful shutdown when stream has no more data

During testing you will typically be using some finite data. You want the stream to shut down
automatically when that data has been consumed so your test shuts down gracefully.
Of course once the code is running in production you'll want it to keep waiting for new records.
So whether the stream shuts down when there's no more data should be configurable.

On Tue, Feb 23, 2016 at 11:09 AM, Ashutosh Kumar
Just out of curiosity I will like to know why a streaming program should shutdown when no
new data is arriving?  I think it should keep waiting for arrival of new records.

On Tue, Feb 23, 2016 at 9:17 PM, Hemant Bhanawat
A guess - parseRecord is returning None in some case (probaly empty lines). And then entry.get
is throwing the exception.
You may want to filter the None values from accessLogDStream before you run the map function
over it.

Hemant Bhanawat

On Tue, Feb 23, 2016 at 6:00 PM, Ted Yu
Which line is line 42 in your code ?

When variable lines becomes empty, you can stop your program.


On Feb 23, 2016, at 12:25 AM, Femi Anthony

I am working on Spark Streaming API and I wish to stream a set of pre-downloaded web log files
continuously to simulate a real-time stream. I wrote a script that gunzips the compressed
logs and pipes the output to nc on port 7777.

The script looks like this:


zipped_files=`find $BASEDIR -name "*.gz"`

for zfile in $zipped_files


  echo "Unzipping $zfile..."

  gunzip -c $zfile  | nc -l -p 7777 -q 20

I have streaming code written in Scala that processes the streams. It works well for the most
part, but when its run out of files to stream I get the following error in Spark:

16/02/19 23:04:35 WARN ReceiverSupervisorImpl:

Restarting receiver with delay 2000 ms: Socket data stream had no more data

16/02/19 23:04:35 ERROR ReceiverTracker: Deregistered receiver for stream 0:

Restarting receiver with delay 2000ms: Socket data stream had no more data

16/02/19 23:04:35 WARN BlockManager: Block input-0-1455941075600 replicated to only 0 peer(s)
instead of 1 peers


16/02/19 23:04:40 ERROR Executor: Exception in task 2.0 in stage 15.0 (TID 47)

java.util.NoSuchElementException: None.get

at scala.None$.get(Option.scala:313)

at scala.None$.get(Option.scala:311)

at com.femibyte.learningsparkaddexamples.scala.StreamingLogEnhanced$$anonfun$2.apply(StreamingLogEnhanced.scala:42)

at com.femibyte.learningsparkaddexamples.scala.StreamingLogEnhanced$$anonfun$2.apply(StreamingLogEnhanced.scala:42)

How to I implement a graceful shutdown so that the program exits gracefully when it no longer
detects any data in the stream ?

My Spark Streaming code looks like this:

object StreamingLogEnhanced {

 def main(args: Array[String]) {

  val master = args(0)

  val conf = new


 // Create a StreamingContext with a n second batch size

  val ssc = new StreamingContext(conf, Seconds(10))

 // Create a DStream from all the input on port 7777

  val log = Logger.getLogger(getClass.getName)

  sys.ShutdownHookThread {<>("Gracefully stopping Spark Streaming Application")

  ssc.stop(true, true)<>("Application stopped")


  val lines = ssc.socketTextStream("localhost", 7777)

  // Create a count of log hits by ip

  var ipCounts=countByIp(lines)


  // start our streaming context and wait for it to "finish"


  // Wait for 600 seconds then exit




 def countByIp(lines: DStream[String]) = {

   val parser = new AccessLogParser

   val accessLogDStream = => parser.parseRecord(line))

   val ipDStream = =>

                    (entry.get.clientIpAddress, 1))

   ipDStream.reduceByKey((x, y) => x + y)



Thanks for any suggestions in advance.

