spark-user mailing list archives

From Imran Rajjad <raj...@gmail.com>
Subject union of multiple twitter streams [spark-streaming-twitter_2.11]
Date Mon, 02 Jul 2018 10:44:00 GMT
Hello,

Has anybody tried to union two streams of Twitter statuses? I am
instantiating two Twitter streams with two different sets of credentials
and passing them through a union function, but the console does not show
any activity, nor are there any errors.


--static function that returns JavaReceiverInputDStream<Status>--



public static JavaReceiverInputDStream<Status> getTwitterStream(
    JavaStreamingContext spark, String consumerKey, String consumerSecret,
    String accessToken, String accessTokenSecret, String[] filter) {
  // Enable OAuth
  ConfigurationBuilder cb = new ConfigurationBuilder();
  cb.setDebugEnabled(false)
    .setOAuthConsumerKey(consumerKey)
    .setOAuthConsumerSecret(consumerSecret)
    .setOAuthAccessToken(accessToken)
    .setOAuthAccessTokenSecret(accessTokenSecret)
    .setJSONStoreEnabled(true);
  TwitterFactory tf = new TwitterFactory(cb.build());
  Twitter twitter = tf.getInstance();

  // Create stream
  return TwitterUtils.createStream(spark, twitter.getAuthorization(), filter);
}

---trying to union two twitter streams---

JavaStreamingContext jssc =
    new JavaStreamingContext(conf, Durations.minutes(5));

jssc.sparkContext().setLogLevel("ERROR");


JavaReceiverInputDStream<Status> twitterStreamByHashtag =
    TwitterUtil.getTwitterStream(jssc, consumerKey1, consumerSecret1,
        accessToken1, accessTokenSecret1, new String[]{"#Twitter"});
JavaReceiverInputDStream<Status> twitterStreamByUser =
    TwitterUtil.getTwitterStream(jssc, consumerKey2, consumerSecret2,
        accessToken2, accessTokenSecret2, new String[]{"@Twitter"});


JavaDStream<String> statuses = twitterStreamByHashtag
    .union(twitterStreamByUser)
    .map(s -> s.getText());


regards,
Imran

-- 
I.R
