spark-user mailing list archives

From Zoran Jeremic <zoran.jere...@gmail.com>
Subject Re: How to restart Twitter spark stream
Date Mon, 20 Jul 2015 16:54:16 GMT
Thanks for the explanation.

If I understand this correctly, with this approach I would actually stream
everything from Twitter and perform the filtering in my application using
Spark. Isn't that too much overhead if my application only needs to listen
for a couple of hundred or a few thousand hashtags?
On the one hand, this would be the better approach, since I would no longer
need to open new streams whenever the number of hashtags goes over 400,
which is Twitter's limit for user stream filtering. On the other hand, I'm
concerned about how much it would affect application performance if I
stream everything that is posted on Twitter and filter it locally. It would
be great if somebody with experience in this could comment on these
concerns.

Thanks,
Zoran
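A minimal plain-Scala sketch of the two options weighed above, with no Spark or twitter4j dependency so it runs on its own. `HashtagStreams`, `groupForStreams` and `isFollowed` are hypothetical names, and the 400-hashtag limit is simply the figure quoted in the message, used as a default rather than checked against Twitter. Splitting the followed hashtags into groups of at most 400 shows how the number of filter streams grows; the local check is a hash-set lookup per hashtag, so matching cost per tweet stays roughly constant even with thousands of followed tags. The sketch does not address the other cost raised in the message, namely the volume of tweets the application must receive.

```scala
// Hypothetical helpers for the two options discussed in the message.
object HashtagStreams {
  // Option 1: one Twitter filter stream per group of at most `limit` hashtags
  // (400 is the per-stream limit quoted in the message, used as a default).
  def groupForStreams(hashtags: Seq[String], limit: Int = 400): List[Seq[String]] = {
    require(limit > 0, "limit must be positive")
    hashtags
      .map(_.trim.toLowerCase)
      .filter(_.nonEmpty)
      .distinct
      .grouped(limit)
      .toList
  }

  // Option 2: receive tweets unfiltered and keep one if any of its hashtags is
  // followed. Assumes `followed` is already lowercased; each check is a
  // hash-set lookup, which takes constant time on average.
  def isFollowed(tweetHashtags: Seq[String], followed: Set[String]): Boolean =
    tweetHashtags.exists(h => followed.contains(h.toLowerCase))
}
```

For example, 1,000 distinct followed hashtags would yield three groups (400, 400 and 200), i.e. three separate filter streams under option 1, while option 2 keeps a single stream regardless of the count.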

On Mon, Jul 20, 2015 at 12:19 AM, Akhil Das <akhil@sigmoidanalytics.com>
wrote:

> Jorn meant something like this:
>
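> val filteredStream = twitterStream.transform { rdd =>
>   // Runs on the driver once per batch, so the file is re-read every batch
>   val newRDD = rdd.sparkContext
>     .textFile("/this/file/will/be/updated/frequently")
>     .map(tag => (tag.trim.toLowerCase, 1))
>   // join needs key/value pairs: key each tweet by its hashtags
>   rdd.flatMap(status =>
>       status.getHashtagEntities.map(h => (h.getText.toLowerCase, status)))
>     .join(newRDD)
>     .map { case (_, (status, _)) => status }
>     .distinct()
> }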
>
> newRDD acts as the filter: join is an inner join, so only tweets with a
> hashtag listed in the file are kept.
>
>
> Thanks
> Best Regards
>
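To make the join-as-filter idea concrete without a cluster, here is a plain-Scala sketch that imitates an inner join on ordinary collections. `Tweet` is a hypothetical stand-in for `twitter4j.Status` carrying only an id and its hashtags, and `innerJoin` mimics the semantics of Spark's pair-RDD `join` rather than calling it. Keys present on only one side produce no output, which is exactly why the hashtag list acts as a filter; `distinct` removes the duplicates that appear when one tweet matches several followed hashtags.

```scala
// Hypothetical stand-in for twitter4j.Status with just the fields needed here.
final case class Tweet(id: Long, hashtags: Seq[String])

object JoinAsFilter {
  // Inner join on plain collections, mimicking the semantics of Spark's
  // pair-RDD join: a key must appear on both sides to produce output.
  def innerJoin[K, V, W](left: Seq[(K, V)], right: Seq[(K, W)]): Seq[(K, (V, W))] = {
    val rightByKey: Map[K, Seq[(K, W)]] = right.groupBy(_._1)
    for {
      (k, v) <- left
      (_, w) <- rightByKey.getOrElse(k, Seq.empty[(K, W)])
    } yield (k, (v, w))
  }

  // Key each tweet by its (lowercased) hashtags, join with the followed list,
  // and keep each matching tweet once.
  def filterByHashtags(tweets: Seq[Tweet], followed: Seq[String]): Seq[Tweet] = {
    val keyed = tweets.flatMap(t => t.hashtags.map(h => (h.toLowerCase, t)))
    val tagPairs = followed.map(tag => (tag.trim.toLowerCase, 1))
    innerJoin(keyed, tagPairs).map { case (_, (tweet, _)) => tweet }.distinct
  }
}
```

With tweets tagged `Spark`/`scala`, `java` and `spark`, and a followed list of `spark` and `scala`, only the first and third tweets survive, and the first appears once even though it matches twice.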
> On Sun, Jul 19, 2015 at 9:32 PM, Zoran Jeremic <zoran.jeremic@gmail.com>
> wrote:
>
>> Hi Jorn,
>>
>> I didn't know it was possible to change the filter without reopening the
>> Twitter stream. I had actually asked this question earlier on
>> Stack Overflow
>> <http://stackoverflow.com/questions/30960984/apache-spark-twitter-streaming>
>> and was told that it isn't possible. It would be even better if there were
>> some way both to add new hashtags and to remove old hashtags that a user
>> has stopped following; I guess removing them would be more difficult.
>>
>> However, it would be great if you could give me a short example of how to
>> do this. I didn't fully understand what you meant by joining it with an
>> RDD that loads the newest hashtags from disk at a regular interval.
>>
>> Thanks,
>> Zoran
>>
>> On Sun, Jul 19, 2015 at 5:01 AM, Jörn Franke <jornfranke@gmail.com>
>> wrote:
>>
>>> Why do you even want to stop it? You can join it with an RDD that loads
>>> the newest hashtags from disk at a regular interval.
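A sketch of the "reload the newest hashtags from disk at a regular interval" part, again in plain Scala. `HashtagReloader` is a hypothetical helper, and the one-hashtag-per-line file format is an assumption. It re-reads the file only when the cached set is older than `intervalMs`. In a Spark job, `current()` could be called inside `transform`, whose function runs on the driver once per batch, and the resulting set used in a `filter` instead of a join. The clock is injectable so the reload behavior can be tested deterministically.

```scala
import java.io.File
import scala.io.Source

// Hypothetical helper: caches the followed-hashtag set and re-reads it from
// disk only when the cached copy is older than `intervalMs`.
// Assumes a UTF-8 file with one hashtag per line; blank lines are ignored.
final class HashtagReloader(
    file: File,
    intervalMs: Long,
    now: () => Long = () => System.currentTimeMillis()) {

  private var cached: Set[String] = Set.empty
  private var loadedAt: Long = Long.MinValue // sentinel: never loaded yet

  private def load(): Set[String] = {
    val src = Source.fromFile(file, "UTF-8")
    try src.getLines().map(_.trim.toLowerCase).filter(_.nonEmpty).toSet
    finally src.close()
  }

  // Returns the cached set, reloading it first if it is missing or stale.
  def current(): Set[String] = synchronized {
    val t = now()
    if (loadedAt == Long.MinValue || t - loadedAt >= intervalMs) {
      cached = load()
      loadedAt = t
    }
    cached
  }
}
```

Hashtags added to or removed from the file show up at the first call after the interval elapses, which covers both the "add" and "remove" cases raised in the thread without restarting the stream.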
>>>
>>> On Sun, Jul 19, 2015 at 7:40, Zoran Jeremic <zoran.jeremic@gmail.com>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> I have a Twitter Spark stream initialized as follows:
>>>>
>>>>> val ssc: StreamingContext = SparkLauncher.getSparkScalaStreamingContext()
>>>>> val config = getTwitterConfigurationBuilder.build()
>>>>> val auth: Option[twitter4j.auth.Authorization] =
>>>>>   Some(new twitter4j.auth.OAuthAuthorization(config))
>>>>> val stream = TwitterUtils.createStream(ssc, auth, filters)
>>>>>
>>>>
>>>> This works fine when I first start it. However, at some point I need to
>>>> update the filters, since users might add new hashtags they want to
>>>> follow. I tried to stop the running stream and the StreamingContext
>>>> without stopping the SparkContext, e.g.:
>>>>
>>>>> stream.stop()
>>>>> ssc.stop(stopSparkContext = false)
>>>>>
>>>>
>>>> Afterwards, I try to initialize a new Twitter stream the same way as
>>>> before, but I get this exception:
>>>>
>>>>> Exception in thread "Firestorm JMX Monitor"
>>>>> java.lang.IllegalStateException: Adding new inputs, transformations, and
>>>>> output operations after stopping a context is not supported
>>>>>     at org.apache.spark.streaming.dstream.DStream.validateAtInit(DStream.scala:224)
>>>>>     at org.apache.spark.streaming.dstream.DStream.<init>(DStream.scala:64)
>>>>>     at org.apache.spark.streaming.dstream.InputDStream.<init>(InputDStream.scala:41)
>>>>>     at org.apache.spark.streaming.dstream.ReceiverInputDStream.<init>(ReceiverInputDStream.scala:41)
>>>>>     at org.apache.spark.streaming.twitter.TwitterInputDStream.<init>(TwitterInputDStream.scala:46)
>>>>>     at org.apache.spark.streaming.twitter.TwitterUtils$.createStream(TwitterUtils.scala:44)
>>>>>     at org.prosolo.bigdata.scala.twitter.TwitterHashtagsStreamsManager$.initializeNewStream(TwitterHashtagsStreamsManager.scala:113)
>>>>>     at org.prosolo.bigdata.scala.twitter.TwitterHashtagsStreamsManager$.restartHashTagsStream(TwitterHashtagsStreamsManager.scala:174)
>>>>>     at org.prosolo.bigdata.scala.twitter.TwitterHashtagsStreamsManager$.addHashTagsFromBufferAndRestartStream(TwitterHashtagsStreamsManager.scala:162)
>>>>>     at org.prosolo.bigdata.scala.twitter.HashtagsUpdatesBuffer$.processBufferEvents(HashtagsUpdatesBuffer.scala:41)
>>>>>     at org.prosolo.bigdata.scala.twitter.HashtagsUpdatesBuffer$$anon$1.run(HashtagsUpdatesBuffer.scala:19)
>>>>>     at java.util.TimerThread.mainLoop(Timer.java:555)
>>>>>     at java.util.TimerThread.run(Timer.java:505)
>>>>>  INFO    [2015-07-18 22:24:23,430] [Twitter Stream consumer-1[Disposing thread]] twitter4j.TwitterStreamImpl (SLF4JLogger.java:83)         Inflater has been closed
>>>>> ERROR    [2015-07-18 22:24:32,503] [sparkDriver-akka.actor.default-dispatcher-3] streaming.receiver.ReceiverSupervisorImpl (Logging.scala:75)         Error stopping receiver 0
>>>>> org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:116)
>>>>
>>>> Can anybody explain how to solve this issue?
>>>>
>>>> Thanks,
>>>> Zoran
>>>>
>>>
>>
>>
>> --
>>
>> *******************************************************************************
>> Zoran Jeremic, PhD
>> Senior System Analyst & Programmer
>>
>> Athabasca University
>> Tel: +1 604 92 89 944
>> E-mail: zoran.jeremic@gmail.com <zoran.jeremic@va.mod.gov.rs>
>> Homepage:  http://zoranjeremic.org
>>
>> **********************************************************************************
>>
>
>


