Hi Jörn,

I didn't know that it was possible to change the filter without re-opening the Twitter stream. Actually, I had already asked that question on Stack Overflow and was told that it's not possible, but it would be even better if there were some other way to add new hashtags or to remove old hashtags that users have stopped following. I guess the second request would be more difficult.

However, it would be great if you could give me a short example of how to do this. I didn't quite understand from your explanation what you mean by joining the stream with an RDD that loads the newest hashtags from disk at a regular interval.


On Sun, Jul 19, 2015 at 5:01 AM, Jörn Franke <jornfranke@gmail.com> wrote:

Why do you even want to stop it? You can join it with an RDD that loads the newest hashtags from disk at a regular interval.
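A minimal sketch of that idea, assuming Spark 1.x with the spark-streaming-twitter module and twitter4j on the classpath. The file path in `hashtagsPath` and the names `HashtagJoin`, `keepFollowed`, and `followedTweets` are hypothetical. The function passed to `transform` runs on the driver once per batch, so the hashtag file is re-read every batch interval and adding or removing a line takes effect without restarting the stream. Tags are compared case-insensitively, and a tweet matching several followed tags is kept once.

```scala
import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.DStream
import twitter4j.Status

object HashtagJoin {
  // Hypothetical location: a text file (or directory) with one hashtag per line.
  val hashtagsPath = "/data/followed-hashtags.txt"

  // Keep only the items of one batch that carry at least one followed hashtag.
  def keepFollowed[T: ClassTag](batch: RDD[T], followed: RDD[String],
                                idOf: T => Long, tagsOf: T => Seq[String]): RDD[T] = {
    // Normalize the followed tags: trim, drop a leading '#', lower-case, skip blanks.
    val tagKeys = followed
      .map(_.trim.stripPrefix("#").toLowerCase)
      .filter(_.nonEmpty)
      .distinct()
      .map(tag => (tag, ()))

    batch
      .flatMap(item => tagsOf(item).map(tag => (tag.toLowerCase, item)))
      .join(tagKeys)                                   // inner join on the hashtag
      .map { case (_, (item, _)) => (idOf(item), item) }
      .reduceByKey((first, _) => first)                // one copy per tweet id
      .values
  }

  // Apply the join to a Twitter stream; the hashtag file is re-read in every batch.
  def followedTweets(stream: DStream[Status]): DStream[Status] =
    stream.transform { rdd =>
      val followed = rdd.sparkContext.textFile(hashtagsPath)
      keepFollowed[Status](rdd, followed, _.getId, _.getHashtagEntities.map(_.getText).toSeq)
    }
}
```

One caveat: the receiver still gets only what Twitter delivers for the `filters` given to `createStream`, and with an empty list the Spark 1.x receiver falls back to Twitter's sample stream. The join therefore handles removing hashtags cleanly, but tweets for a newly added hashtag only arrive if the receiver's filters already cover them. For a small hashtag list, collecting the file to the driver and using a plain `filter` is a simpler alternative to the join.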

On Sun, Jul 19, 2015 at 7:40, Zoran Jeremic <zoran.jeremic@gmail.com> wrote:

I have a Twitter Spark stream initialized in the following way:

              val ssc:StreamingContext = SparkLauncher.getSparkScalaStreamingContext()
              val config = getTwitterConfigurationBuilder.build()
              val auth: Option[twitter4j.auth.Authorization] = Some(new                    
              val stream = TwitterUtils.createStream(ssc, auth, filters)  

This works fine when I initially start it. However, at some point I need to update the filters, since users might add new hashtags they want to follow. I tried to stop the running stream and the Spark streaming context without stopping the Spark context.

Afterwards, I tried to initialize a new Twitter stream the same way as before. However, I got this exception:

Exception in thread "Firestorm JMX Monitor" java.lang.IllegalStateException: Adding new inputs, transformations, and output operations after stopping a context is not supported
    at org.apache.spark.streaming.dstream.DStream.validateAtInit(DStream.scala:224)
    at org.apache.spark.streaming.dstream.DStream.<init>(DStream.scala:64)
    at org.apache.spark.streaming.dstream.InputDStream.<init>(InputDStream.scala:41)
    at org.apache.spark.streaming.dstream.ReceiverInputDStream.<init>(ReceiverInputDStream.scala:41)
    at org.apache.spark.streaming.twitter.TwitterInputDStream.<init>(TwitterInputDStream.scala:46)
    at org.apache.spark.streaming.twitter.TwitterUtils$.createStream(TwitterUtils.scala:44)
    at org.prosolo.bigdata.scala.twitter.TwitterHashtagsStreamsManager$.initializeNewStream(TwitterHashtagsStreamsManager.scala:113)
    at org.prosolo.bigdata.scala.twitter.TwitterHashtagsStreamsManager$.restartHashTagsStream(TwitterHashtagsStreamsManager.scala:174)
    at org.prosolo.bigdata.scala.twitter.TwitterHashtagsStreamsManager$.addHashTagsFromBufferAndRestartStream(TwitterHashtagsStreamsManager.scala:162)
    at org.prosolo.bigdata.scala.twitter.HashtagsUpdatesBuffer$.processBufferEvents(HashtagsUpdatesBuffer.scala:41)
    at org.prosolo.bigdata.scala.twitter.HashtagsUpdatesBuffer$$anon$1.run(HashtagsUpdatesBuffer.scala:19)
    at java.util.TimerThread.mainLoop(Timer.java:555)
    at java.util.TimerThread.run(Timer.java:505)
 INFO    [2015-07-18 22:24:23,430] [Twitter Stream consumer-1[Disposing thread]] twitter4j.TwitterStreamImpl (SLF4JLogger.java:83)         Inflater has been closed
ERROR    [2015-07-18 22:24:32,503] [sparkDriver-akka.actor.default-dispatcher-3] streaming.receiver.ReceiverSupervisorImpl (Logging.scala:75)         Error stopping receiver 0
org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:116)
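The exception message states the cause: once a `StreamingContext` has been stopped, no new inputs can be added to it, so creating the Twitter stream again on the old `ssc` fails. A minimal sketch of a restart, assuming Spark 1.4 or later: stop the old context with `stopSparkContext = false`, then build a new `StreamingContext` on the same `SparkContext` and register the new stream there. The names `StreamingRestart`, `restart`, `restartTwitter`, and the 10-second `batchInterval` are hypothetical; if `SparkLauncher.getSparkScalaStreamingContext()` always returns the same instance, it would likewise have to create a fresh context after a stop.

```scala
import org.apache.spark.streaming.{Duration, Seconds, StreamingContext}
import org.apache.spark.streaming.twitter.TwitterUtils
import twitter4j.auth.Authorization

object StreamingRestart {
  // Hypothetical batch interval; reuse whatever the first context was created with.
  val batchInterval: Duration = Seconds(10)

  // Stop `old` without stopping its SparkContext, then start a new StreamingContext
  // on the same SparkContext with the DStreams registered by `setup`.
  def restart(old: StreamingContext, interval: Duration)
             (setup: StreamingContext => Unit): StreamingContext = {
    val sc = old.sparkContext
    // stopGracefully = true lets already received data finish processing first.
    old.stop(stopSparkContext = false, stopGracefully = true)

    // A stopped StreamingContext rejects new DStreams, so create a fresh one.
    val ssc = new StreamingContext(sc, interval)
    setup(ssc) // must register at least one output operation before start()
    ssc.start()
    ssc
  }

  // Example: rebuild the Twitter stream with an updated list of hashtag filters.
  def restartTwitter(old: StreamingContext, auth: Option[Authorization],
                     filters: Seq[String]): StreamingContext =
    restart(old, batchInterval) { ssc =>
      val stream = TwitterUtils.createStream(ssc, auth, filters)
      stream.map(_.getText).print() // placeholder; plug in the real processing here
    }
}
```

The caller keeps the returned context and passes it in as `old` at the next filter update, so only one streaming context is active at a time.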

Can anybody explain how to solve this issue?


Zoran Jeremic, PhD
Senior System Analyst & Programmer

Athabasca University
Tel: +1 604 92 89 944
E-mail: zoran.jeremic@gmail.com
Homepage:  http://zoranjeremic.org