spark-user mailing list archives

From S Krishna <skrishna...@gmail.com>
Subject Re: Spark Streaming: Sentiment Analysis of Twitter streams
Date Wed, 15 Oct 2014 11:55:44 GMT
Hi,

I am using 1.1.0. I did set my Twitter credentials, and I am using the full
path; I left those details out of the public post. I am running on a cluster
and getting the exception. Are you running in local or standalone mode?

Thanks
On Oct 15, 2014 3:20 AM, "Akhil Das" <akhil@sigmoidanalytics.com> wrote:

> I just ran the same code and it is running perfectly fine on my machine.
> These are the things on my end:
>
> - Spark version: 1.1.0
> - Gave full path to the negative and positive files
> - Set twitter auth credentials in the environment.
>
> And here's the code:
>
>> import org.apache.spark.SparkContext
>> import org.apache.spark.SparkContext._
>> import org.apache.spark.SparkConf
>> import org.apache.spark.streaming.twitter.TwitterUtils
>> import org.apache.spark.streaming.{Seconds, StreamingContext}
>>
>> object Sentimenter {
>>   def main(args: Array[String]) {
>>     System.setProperty("twitter4j.oauth.consumerKey", "XXXXXXXXXXXXXXXXX")
>>     System.setProperty("twitter4j.oauth.consumerSecret", "XXXXXXXXXXXXXXXXXXXXXXXXX")
>>     System.setProperty("twitter4j.oauth.accessToken", "XXXXXXXXXXXXXXXXXXXXXXXXXXXX")
>>     System.setProperty("twitter4j.oauth.accessTokenSecret", "XXXXXXXXXXXXXXXXXXXXXXX")
>>
>>     val filters = new Array[String](2)
>>     filters(0) = "ebola"
>>     filters(1) = "isis"
>>     val sparkConf = new SparkConf().setAppName("TweetSentiment").setMaster("local[2]")
>>     val sc = new SparkContext(sparkConf)
>>
>>     // get the list of positive words
>>     val pos_list = sc.textFile("file:///home/akhld/positive-words.txt")
>>       .filter(line => !line.isEmpty())
>>       .collect()
>>       .toSet
>>
>>     // get the list of negative words
>>     val neg_list = sc.textFile("file:///home/akhld/negative-words.txt")
>>       .filter(line => !line.isEmpty())
>>       .collect()
>>       .toSet
>>
>>     // create twitter stream
>>     val ssc = new StreamingContext(sparkConf, Seconds(5))
>>     val stream = TwitterUtils.createStream(ssc, None, filters)
>>     val tweets = stream.map(r => r.getText)
>>     tweets.print() // print tweet text
>>
>>     ssc.start()
>>     ssc.awaitTermination()
>>   }
>> }
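
The snippet above prints raw tweet text but never uses pos_list and neg_list to score anything. A minimal sketch of one way such a scorer could look (plain Scala; the word sets and the +1/-1 scoring rule are illustrative assumptions, not from the thread):

```scala
object SentimentScore {
  // Hypothetical scorer: +1 per positive word, -1 per negative word.
  def score(text: String, pos: Set[String], neg: Set[String]): Int = {
    // Lowercase and split on non-word characters to get individual words.
    val words = text.toLowerCase.split("\\W+").filter(_.nonEmpty)
    words.count(pos.contains) - words.count(neg.contains)
  }

  def main(args: Array[String]) {
    val pos = Set("good", "great", "happy") // stand-in for positive-words.txt
    val neg = Set("bad", "awful", "sad")    // stand-in for negative-words.txt
    println(score("What a great, happy day", pos, neg)) // 2
    println(score("bad news, awful and sad", pos, neg)) // -3
  }
}
```

Inside the streaming job it might be applied as something like stream.map(t => (t.getText, SentimentScore.score(t.getText, pos_list, neg_list))).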
>
> Thanks
> Best Regards
>
> On Wed, Oct 15, 2014 at 1:43 AM, SK <skrishna.id@gmail.com> wrote:
>
>> Hi,
>>
>> I am trying to implement simple sentiment analysis of Twitter streams in
>> Spark/Scala. I am getting an exception, and it appears when I combine a
>> SparkContext with a StreamingContext in the same program. When I read the
>> positive and negative words using only SparkContext.textFile (without
>> creating a StreamingContext) and analyze static text files, the program
>> works. Likewise, when I just create the Twitter stream using a
>> StreamingContext (and don't create a SparkContext to build the vocabulary),
>> the program also works. The exception appears only when I combine both in
>> the same program, and I am not sure whether having both simultaneously is
>> allowed. All the examples in the streaming module use only a
>> StreamingContext. The error transcript and my code appear below. I would
>> appreciate your guidance in fixing this error, the right way to read static
>> files and streams in the same program, or any pointers to relevant examples.
>> Thanks.
>>
>>
>> ---------------------- Error transcript ----------------------
>> Lost task 0.0 in stage 2.0 (TID 70, mesos4-dev.sccps.net):
>> java.io.IOException: unexpected exception type
>>     java.io.ObjectStreamClass.throwMiscException(ObjectStreamClass.java:1538)
>>     java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1025)
>>     java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
>>     java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>     java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>     java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>>     java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>>     java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>     java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>     java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>>     org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
>>     org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87)
>>     org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:159)
>>     java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>     java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>     java.lang.Thread.run(Thread.java:745)
>> ---------------------- My code below ----------------------
>> import org.apache.spark.{SparkConf, SparkContext}
>> import org.apache.spark.streaming.twitter.TwitterUtils
>> import org.apache.spark.streaming.{Seconds, StreamingContext}
>>
>> object TweetSentiment {
>>   def main(args: Array[String]) {
>>     val filters = args
>>     val sparkConf = new SparkConf().setAppName("TweetSentiment")
>>     val sc = new SparkContext(sparkConf)
>>
>>     // get the list of positive words
>>     val pos_list = sc.textFile("positive-words.txt")
>>                      .filter(line => !line.isEmpty())
>>                      .collect()
>>                      .toSet
>>
>>     // get the list of negative words
>>     val neg_list = sc.textFile("negative-words.txt")
>>                      .filter(line => !line.isEmpty())
>>                      .collect()
>>                      .toSet
>>
>>     // create twitter stream
>>     val ssc = new StreamingContext(sparkConf, Seconds(60))
>>     val stream = TwitterUtils.createStream(ssc, None, filters)
>>     val tweets = stream.map(r => r.getText)
>>     tweets.print() // print tweet text
>>
>>     ssc.start()
>>     ssc.awaitTermination()
>>     sc.stop() // I tried commenting this out, but the exception still appeared.
>>   }
>> }
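
The code above builds a SparkContext from sparkConf and then passes the same conf to the StreamingContext constructor, which creates a second, independent context. In Spark 1.x a StreamingContext can instead be constructed from the existing SparkContext, so the static file reads and the stream share one context. A minimal sketch of the restructured driver follows; whether this resolves the serialization exception above is an assumption, not confirmed in the thread:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.twitter.TwitterUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object TweetSentimentSingleContext {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("TweetSentiment")
    val sc = new SparkContext(sparkConf)

    // Batch reads go through the single SparkContext as before.
    val pos_list = sc.textFile("positive-words.txt")
      .filter(line => !line.isEmpty())
      .collect()
      .toSet

    // The StreamingContext wraps the existing SparkContext instead of
    // constructing a second context from the conf.
    val ssc = new StreamingContext(sc, Seconds(60))
    val stream = TwitterUtils.createStream(ssc, None, args)
    stream.map(_.getText).print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```

Calling sc.stop() is then unnecessary before termination, since stopping the StreamingContext (with its default stopSparkContext behavior) also stops the underlying SparkContext.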
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Sentiment-Analysis-of-Twitter-streams-tp16410.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>
