spark-user mailing list archives

From Zoran Jeremic <zoran.jere...@gmail.com>
Subject How to restart Twitter spark stream
Date Sun, 19 Jul 2015 05:39:57 GMT
Hi,

I have a twitter spark stream initialized in the following way:

> val ssc: StreamingContext = SparkLauncher.getSparkScalaStreamingContext()
> val config = getTwitterConfigurationBuilder.build()
> val auth: Option[twitter4j.auth.Authorization] =
>   Some(new twitter4j.auth.OAuthAuthorization(config))
> val stream = TwitterUtils.createStream(ssc, auth, filters)
>

This works fine when I initially start it. However, at some point I need to
update the filters, since users might add new hashtags they want to follow. I
tried to stop the running stream and the Spark streaming context without
stopping the Spark context, e.g.:


> stream.stop()
> ssc.stop(false)
>

Afterward, I tried to initialize a new Twitter stream the same way I did
previously. However, I got this exception:

> Exception in thread "Firestorm JMX Monitor" java.lang.IllegalStateException: Adding new inputs, transformations, and output operations after stopping a context is not supported
>     at org.apache.spark.streaming.dstream.DStream.validateAtInit(DStream.scala:224)
>     at org.apache.spark.streaming.dstream.DStream.<init>(DStream.scala:64)
>     at org.apache.spark.streaming.dstream.InputDStream.<init>(InputDStream.scala:41)
>     at org.apache.spark.streaming.dstream.ReceiverInputDStream.<init>(ReceiverInputDStream.scala:41)
>     at org.apache.spark.streaming.twitter.TwitterInputDStream.<init>(TwitterInputDStream.scala:46)
>     at org.apache.spark.streaming.twitter.TwitterUtils$.createStream(TwitterUtils.scala:44)
>     at org.prosolo.bigdata.scala.twitter.TwitterHashtagsStreamsManager$.initializeNewStream(TwitterHashtagsStreamsManager.scala:113)
>     at org.prosolo.bigdata.scala.twitter.TwitterHashtagsStreamsManager$.restartHashTagsStream(TwitterHashtagsStreamsManager.scala:174)
>     at org.prosolo.bigdata.scala.twitter.TwitterHashtagsStreamsManager$.addHashTagsFromBufferAndRestartStream(TwitterHashtagsStreamsManager.scala:162)
>     at org.prosolo.bigdata.scala.twitter.HashtagsUpdatesBuffer$.processBufferEvents(HashtagsUpdatesBuffer.scala:41)
>     at org.prosolo.bigdata.scala.twitter.HashtagsUpdatesBuffer$$anon$1.run(HashtagsUpdatesBuffer.scala:19)
>     at java.util.TimerThread.mainLoop(Timer.java:555)
>     at java.util.TimerThread.run(Timer.java:505)
> INFO    [2015-07-18 22:24:23,430] [Twitter Stream consumer-1[Disposing thread]] twitter4j.TwitterStreamImpl (SLF4JLogger.java:83)         Inflater has been closed
> ERROR    [2015-07-18 22:24:32,503] [sparkDriver-akka.actor.default-dispatcher-3] streaming.receiver.ReceiverSupervisorImpl (Logging.scala:75)         Error stopping receiver 0
> org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:116)
>

Can anybody explain how to solve this issue?

Thanks,
Zoran
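
A possible direction, offered as a sketch under assumptions rather than a confirmed fix: the exception message says that new inputs cannot be added after a context has been stopped, and a stopped StreamingContext cannot be restarted. If SparkLauncher.getSparkScalaStreamingContext() hands back the same (already stopped) instance, the new stream would need to be attached to a freshly created StreamingContext built on the still-running SparkContext. The object name TwitterStreamRestarter, the restart method, the registerOutputs callback, and the 10-second batch interval below are all hypothetical and not taken from the original code.

```scala
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.twitter.TwitterUtils
import twitter4j.Status
import twitter4j.auth.Authorization

// Hypothetical helper: not part of Spark or of the code in the message above.
object TwitterStreamRestarter {

  /** Stops oldSsc (keeping its SparkContext alive) and returns a new,
    * already started StreamingContext that follows newFilters.
    */
  def restart(
      oldSsc: StreamingContext,
      auth: Option[Authorization],
      newFilters: Seq[String],
      batchSeconds: Long = 10L // assumed batch interval
  )(registerOutputs: ReceiverInputDStream[Status] => Unit): StreamingContext = {
    // Keep a handle on the shared SparkContext before stopping.
    val sc = oldSsc.sparkContext

    // Stop only the streaming layer; the SparkContext keeps running.
    oldSsc.stop(stopSparkContext = false)

    // A stopped StreamingContext cannot be reused, so build a new one.
    val newSsc = new StreamingContext(sc, Seconds(batchSeconds))

    // Attach the Twitter input to the NEW context, not the stopped one.
    val stream = TwitterUtils.createStream(newSsc, auth, newFilters)

    // Output operations must be registered before start() is called.
    registerOutputs(stream)

    newSsc.start()
    newSsc
  }
}
```

A call might look like `ssc = TwitterStreamRestarter.restart(ssc, auth, updatedFilters)(s => s.print())`, where the caller replaces its stored reference with the returned context so that later code never touches the stopped one.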
