spark-user mailing list archives

From Jörn Franke <jornfra...@gmail.com>
Subject Re: How to restart Twitter spark stream
Date Sun, 19 Jul 2015 12:01:38 GMT
Why do you even want to stop it? You can keep the stream running and join it
with an RDD that reloads the newest hashtags from disk at a regular interval.
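
A minimal sketch of that idea, under stated assumptions: `hashtagsPath` is a
hypothetical text file (one hashtag per line) that the application rewrites
whenever users follow new hashtags, and the names `FollowedHashtags`,
`normalize` and `matchFollowed` are made up for illustration. It only helps if
the stream itself is not already narrowed to the old hashtags by the `filters`
argument of `TwitterUtils.createStream`.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.DStream
import twitter4j.Status

object FollowedHashtags {
  // Normalize a hashtag so that " #Spark " and "spark" compare equal.
  def normalize(tag: String): String = tag.trim.stripPrefix("#").toLowerCase

  // hashtagsPath is an assumed location; the file must exist when a batch runs.
  def matchFollowed(stream: DStream[Status],
                    hashtagsPath: String): DStream[(String, Status)] = {
    // Key every tweet by each hashtag it contains.
    val byTag: DStream[(String, Status)] = stream.flatMap { status =>
      status.getHashtagEntities.toSeq.map(h => (normalize(h.getText), status))
    }
    // transform evaluates its function once per batch, so the file is
    // re-read every batch interval and newly added hashtags are picked up
    // without stopping the stream.
    byTag.transform { rdd =>
      val followed: RDD[(String, Unit)] = rdd.sparkContext
        .textFile(hashtagsPath)
        .map(normalize)
        .filter(_.nonEmpty)
        .distinct()
        .map(tag => (tag, ()))
      // Keep only tweets whose hashtag is currently followed.
      rdd.join(followed).map { case (tag, (status, _)) => (tag, status) }
    }
  }
}
```

Re-reading the file in every batch is the simplest variant; for a large hashtag
list it may be worth caching the RDD and refreshing it less often.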

On Sun, 19 Jul 2015 at 7:40, Zoran Jeremic <zoran.jeremic@gmail.com> wrote:

> Hi,
>
> I have a twitter spark stream initialized in the following way:
>
>>   val ssc: StreamingContext = SparkLauncher.getSparkScalaStreamingContext()
>>   val config = getTwitterConfigurationBuilder.build()
>>   val auth: Option[twitter4j.auth.Authorization] =
>>     Some(new twitter4j.auth.OAuthAuthorization(config))
>>   val stream = TwitterUtils.createStream(ssc, auth, filters)
>>
>
> This works fine when I initially start it. However, at some point I need to
> update the filters, since users might add new hashtags they want to follow. I
> tried to stop the running stream and the Spark streaming context without
> stopping the Spark context, e.g.:
>
>
>>                stream.stop()
>>                ssc.stop(false)
>>
>
> Afterward, I try to initialize a new Twitter stream as I did
> previously. However, I get this exception:
>
>> Exception in thread "Firestorm JMX Monitor" java.lang.IllegalStateException: Adding new inputs, transformations, and output operations after stopping a context is not supported
>>     at org.apache.spark.streaming.dstream.DStream.validateAtInit(DStream.scala:224)
>>     at org.apache.spark.streaming.dstream.DStream.<init>(DStream.scala:64)
>>     at org.apache.spark.streaming.dstream.InputDStream.<init>(InputDStream.scala:41)
>>     at org.apache.spark.streaming.dstream.ReceiverInputDStream.<init>(ReceiverInputDStream.scala:41)
>>     at org.apache.spark.streaming.twitter.TwitterInputDStream.<init>(TwitterInputDStream.scala:46)
>>     at org.apache.spark.streaming.twitter.TwitterUtils$.createStream(TwitterUtils.scala:44)
>>     at org.prosolo.bigdata.scala.twitter.TwitterHashtagsStreamsManager$.initializeNewStream(TwitterHashtagsStreamsManager.scala:113)
>>     at org.prosolo.bigdata.scala.twitter.TwitterHashtagsStreamsManager$.restartHashTagsStream(TwitterHashtagsStreamsManager.scala:174)
>>     at org.prosolo.bigdata.scala.twitter.TwitterHashtagsStreamsManager$.addHashTagsFromBufferAndRestartStream(TwitterHashtagsStreamsManager.scala:162)
>>     at org.prosolo.bigdata.scala.twitter.HashtagsUpdatesBuffer$.processBufferEvents(HashtagsUpdatesBuffer.scala:41)
>>     at org.prosolo.bigdata.scala.twitter.HashtagsUpdatesBuffer$$anon$1.run(HashtagsUpdatesBuffer.scala:19)
>>     at java.util.TimerThread.mainLoop(Timer.java:555)
>>     at java.util.TimerThread.run(Timer.java:505)
>> INFO  [2015-07-18 22:24:23,430] [Twitter Stream consumer-1[Disposing thread]] twitter4j.TwitterStreamImpl (SLF4JLogger.java:83) Inflater has been closed
>> ERROR [2015-07-18 22:24:32,503] [sparkDriver-akka.actor.default-dispatcher-3] streaming.receiver.ReceiverSupervisorImpl (Logging.scala:75) Error stopping receiver 0
>> org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:116)
>>
>
> Can anybody explain how to solve this issue?
>
> Thanks,
> Zoran
>
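
If a restart is still preferred: the exception quoted above says that inputs
cannot be added to a context that has been stopped, so a stopped
StreamingContext cannot be reused. A new one can be created on the same
SparkContext, which stays alive after `ssc.stop(false)`. The sketch below
assumes that `SparkLauncher.getSparkScalaStreamingContext()` was returning the
already stopped instance, which the thread does not confirm; the names
`TwitterStreamRestart`, `restart` and `batchSeconds` are hypothetical.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.twitter.TwitterUtils
import twitter4j.Status
import twitter4j.auth.Authorization

object TwitterStreamRestart {
  // Stops the old streaming context while keeping the SparkContext alive,
  // then builds a fresh StreamingContext and a Twitter stream on it.
  def restart(oldSsc: StreamingContext,
              auth: Option[Authorization],
              newFilters: Seq[String],
              batchSeconds: Long): (StreamingContext, ReceiverInputDStream[Status]) = {
    val sc: SparkContext = oldSsc.sparkContext
    oldSsc.stop(stopSparkContext = false)

    // A new context is required; the stopped one rejects new inputs.
    val ssc = new StreamingContext(sc, Seconds(batchSeconds))
    val stream = TwitterUtils.createStream(ssc, auth, newFilters)

    // The caller still has to register output operations on `stream`
    // and call ssc.start() on the returned context.
    (ssc, stream)
  }
}
```

Everything that was attached to the old stream (transformations and output
operations) has to be set up again on the new one before it is started.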
