spark-user mailing list archives

From Peyman Mohajerian <mohaj...@gmail.com>
Subject Re: Twitter streaming with apache spark stream only a small amount of tweets
Date Wed, 29 Jul 2015 17:40:33 GMT
The subject is 'How to restart Twitter spark stream'.
It may not be exactly what you are looking for, but I thought it touched
on some aspects of your question.

On Wed, Jul 29, 2015 at 10:26 AM, Zoran Jeremic <zoran.jeremic@gmail.com>
wrote:

> Can you send me the subject of that email? I can't find any email
> suggesting a solution to that problem. There is an email "*Twitter4j
> streaming question*", but it doesn't have any sample code. It just
> confirms what I explained earlier: without filtering, Twitter limits you
> to 1% of tweets, and if you use the filter API, Twitter limits you to
> following 400 hashtags.
>
> Thanks,
> Zoran
>
> On Wed, Jul 29, 2015 at 8:40 AM, Peyman Mohajerian <mohajeri@gmail.com>
> wrote:
>
>> This question was answered with sample code a couple of days ago;
>> please look back through the list.
>>
>> On Sat, Jul 25, 2015 at 11:43 PM, Zoran Jeremic <zoran.jeremic@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> I discovered what the problem is. The Twitter public stream is limited
>>> to 1% of all tweets (https://goo.gl/kDwnyS), which is why I can't
>>> access all the tweets posted with a specific hashtag using the approach
>>> from my previous email, so this approach will not work for me.
>>> The other problem is that filtering has a limit of 400 hashtags
>>> (https://goo.gl/BywrAk), so to follow more than 400 hashtags I
>>> need several parallel streams.
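A minimal sketch of that split, assuming the 400-hashtag limit applies to each filter stream. `HashtagGroups` and `MaxTrackTerms` are hypothetical names, and only the Scala standard library is used:

```scala
object HashtagGroups {
  // Assumed per-stream limit on tracked hashtags, as described above.
  val MaxTrackTerms = 400

  // Normalize (trim, lowercase), drop blanks and duplicates, then split the
  // hashtags into groups that each fit into a single filter stream.
  def split(hashtags: Seq[String], maxPerGroup: Int = MaxTrackTerms): List[Seq[String]] = {
    require(maxPerGroup > 0, "maxPerGroup must be positive")
    hashtags
      .map(_.trim.toLowerCase)
      .filter(_.nonEmpty)
      .distinct
      .grouped(maxPerGroup)
      .toList
  }
}
```

For example, 950 distinct hashtags give three groups of 400, 400 and 150. Each group could then be passed as the `filters` argument of `TwitterUtils.createStream(ssc, auth, group)` and the resulting streams merged with `ssc.union(...)`; as described above, each additional stream may need its own credentials (`auth`).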
>>>
>>> This brings me back to my previous question (https://goo.gl/bVDkHx). In
>>> my application I need to follow more than 400 hashtags and collect every
>>> tweet that has one of them. Another complication is that users can add
>>> new hashtags or remove old ones, so I have to update the stream in real
>>> time.
>>> My earlier approach, without Apache Spark, was to create a twitter4j user
>>> stream with an initial filter and, each time a new hashtag had to be
>>> added, stop the stream, add the hashtag and start it again. When a stream
>>> reached 400 hashtags, I initialized a new stream with new credentials.
>>> This was really complex, and I was hoping that Apache Spark would make it
>>> simpler. However, I have been trying for days to find a solution, without
>>> success.
>>>
>>> If I have to use the same approach I used with twitter4j, I need to
>>> solve two problems:
>>> - how to run multiple Twitter streams in the same Spark context
>>> - how to add new hashtags to an existing filter
>>> I hope that somebody has a more elegant solution or idea and will tell
>>> me that I missed something obvious.
>>>
>>> Thanks,
>>> Zoran
>>>
>>> On Sat, Jul 25, 2015 at 8:44 PM, Zoran Jeremic <zoran.jeremic@gmail.com>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> I've implemented Twitter streaming as in the code at the bottom of
>>>> this email. It finds some tweets based on the hashtags I'm following.
>>>> However, it seems that a large number of tweets are missing. I posted
>>>> some tweets with hashtags that the application follows, and none of
>>>> them was received. I also checked some hashtags (e.g. #android) on
>>>> Twitter using Live, and I could see something being posted with that
>>>> hashtag almost every second, while my application received only 3-4
>>>> posts per minute.
>>>>
>>>> I didn't have this problem in the earlier, non-Spark version of the
>>>> application, which used twitter4j to access the user stream API. I guess
>>>> this is some trending stream, but I couldn't find anything that explains
>>>> which Twitter API Spark Twitter Streaming uses, or how to create a
>>>> stream that accesses everything posted on Twitter.
>>>>
>>>> I hope somebody can explain what the problem is and how to solve it.
>>>>
>>>> Thanks,
>>>> Zoran
>>>>
>>>>
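>>>>> import org.apache.spark.streaming.dstream.DStream
>>>>> import org.apache.spark.streaming.twitter.TwitterUtils
>>>>> import twitter4j.Status
>>>>> import twitter4j.auth.{Authorization, OAuthAuthorization}
>>>>>
>>>>>   // ssc, hashTagsList and getTwitterConfigurationBuilder are defined
>>>>>   // elsewhere in the application.
>>>>>   def initializeStreaming(): Unit = {
>>>>>     val config = getTwitterConfigurationBuilder.build()
>>>>>     val auth: Option[Authorization] = Some(new OAuthAuthorization(config))
>>>>>     // No filters are passed here, so the receiver reads Twitter's sample
>>>>>     // stream rather than the filter API.
>>>>>     val stream: DStream[Status] = TwitterUtils.createStream(ssc, auth)
>>>>>     // Keep only statuses whose lowercased text contains a followed hashtag.
>>>>>     val filteredStatuses = stream.filter { status =>
>>>>>       val text = status.getText.toLowerCase
>>>>>       hashTagsList.exists(tag => text.contains(tag))
>>>>>     }
>>>>>     // collect() brings each batch to the driver; fine for debugging only.
>>>>>     filteredStatuses.foreachRDD { rdd =>
>>>>>       rdd.collect().foreach(println)
>>>>>     }
>>>>>     ssc.start()
>>>>>   }
>>>>>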
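The predicate above uses substring matching, so a followed tag such as `#android` would also match `#androiddev`. A minimal sketch of whole-hashtag matching, assuming a hashtag is `#` followed by word characters (a simplification of Twitter's own hashtag rules); `HashtagMatcher` is a hypothetical name:

```scala
object HashtagMatcher {
  // Simplified hashtag syntax: '#' followed by ASCII letters, digits or '_'.
  private val HashtagPattern = """#(\w+)""".r

  // Lowercased hashtags found in the text, without the leading '#'.
  def hashtagsIn(text: String): Set[String] =
    HashtagPattern.findAllMatchIn(text).map(_.group(1).toLowerCase).toSet

  // True if the text contains at least one followed hashtag as a whole tag.
  def matchesAny(text: String, followed: Set[String]): Boolean = {
    val wanted = followed.map(_.stripPrefix("#").toLowerCase)
    hashtagsIn(text).exists(wanted.contains)
  }
}
```

Inside the job this could replace the substring check, e.g. `stream.filter(s => HashtagMatcher.matchesAny(s.getText, tags))`, where `tags` is a hypothetical `Set[String]` of followed hashtags. Passing the same tags as the `filters` argument of `TwitterUtils.createStream(ssc, auth, tags.toSeq)` makes Spark's receiver use Twitter's filter API, so the local check becomes a second pass rather than the only selection. twitter4j's `Status.getHashtagEntities` is an alternative to parsing the text.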
>>>>
>>>
>>>
>>>
>>>
>>
>
>
> --
>
> *******************************************************************************
> Zoran Jeremic, PhD
> Senior System Analyst & Programmer
>
> Athabasca University
> Tel: +1 604 92 89 944
> E-mail: zoran.jeremic@gmail.com <zoran.jeremic@va.mod.gov.rs>
> Homepage:  http://zoranjeremic.org
>
> **********************************************************************************
>
