spark-user mailing list archives

From Akhil Das <ak...@sigmoidanalytics.com>
Subject Re: Spark Streaming: Sentiment Analysis of Twitter streams
Date Wed, 15 Oct 2014 11:59:20 GMT
I ran it in both local and standalone mode, and it worked for me. It does throw a
bind exception, which is expected since we are using both a SparkContext and a
StreamingContext.

Thanks
Best Regards
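The bind exception mentioned above is usually the web UI of a second SparkContext failing to take a port already held by the first, because `new StreamingContext(sparkConf, ...)` builds its own SparkContext from the conf. One way to end up with a single context, sketched here under the assumption of Spark 1.x-or-later APIs with `spark-streaming` on the classpath, is to create the StreamingContext first and run the batch reads through its `sparkContext`. `SingleContextSketch`, `create` and `wordSet` are hypothetical names.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Hypothetical helper: the StreamingContext is created first and owns the only
// SparkContext in the program; batch work goes through ssc.sparkContext
// instead of a second `new SparkContext(...)`.
object SingleContextSketch {
  def create(master: String, appName: String = "TweetSentiment"): StreamingContext = {
    val conf = new SparkConf().setAppName(appName).setMaster(master)
    new StreamingContext(conf, Seconds(5))
  }

  // Loads a word list (one word per line, blank lines dropped) through the
  // StreamingContext's own SparkContext and collects it to the driver.
  def wordSet(ssc: StreamingContext, path: String): Set[String] =
    ssc.sparkContext.textFile(path).filter(_.nonEmpty).collect().toSet
}
```

In `main`, `val ssc = SingleContextSketch.create("local[2]")` followed by one `wordSet(ssc, ...)` call per list would replace the separate `new SparkContext(sparkConf)`; `local[2]` leaves one thread free for the Twitter receiver.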

On Wed, Oct 15, 2014 at 5:25 PM, S Krishna <skrishna.id@gmail.com> wrote:

> Hi,
>
> I am using 1.1.0. I did set my Twitter credentials and I am using the full
> path; I just did not paste them in the public post. I am running on a cluster
> and getting the exception. Are you running in local or standalone mode?
>
> Thanks
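SK's original program reads `positive-words.txt` through `sc.textFile` with a relative path. On a cluster, such paths are resolved against the default Hadoop filesystem, and a `file://` path has to exist on every worker that runs a task. Since both lists are collected to the driver anyway, one alternative (not proposed in the thread) is to read them on the driver with plain Scala I/O. A minimal sketch; `WordLists.load` is a hypothetical helper, and the file must exist on the driver machine.

```scala
import scala.io.Source

// Hypothetical helper: reads a newline-separated word list on the driver only,
// trimming whitespace and dropping blank lines.
object WordLists {
  def load(path: String, encoding: String = "UTF-8"): Set[String] = {
    val source = Source.fromFile(path, encoding)
    // toSet consumes the iterator before the finally block closes the file.
    try source.getLines().map(_.trim).filter(_.nonEmpty).toSet
    finally source.close()
  }
}
```

The resulting sets are ordinary Scala values, so they can be captured in stream closures exactly like the collected `pos_list` and `neg_list`.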
> On Oct 15, 2014 3:20 AM, "Akhil Das" <akhil@sigmoidanalytics.com> wrote:
>
>> I just ran the same code and it is running perfectly fine on my machine.
>> These are the things on my end:
>>
>> - Spark version: 1.1.0
>> - Gave full path to the negative and positive files
>> - Set twitter auth credentials in the environment.
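The quoted code hard-codes the credentials as JVM system properties under the `twitter4j.oauth.*` keys, which Twitter4J's default configuration reads when `TwitterUtils.createStream` is passed `None` for authorization. A minimal sketch of keeping the secrets out of the source by copying them from environment variables; the variable names below are hypothetical, not a Spark or Twitter4J convention.

```scala
// Hypothetical helper: copies Twitter credentials from environment variables
// into the twitter4j.oauth.* JVM system properties used in the quoted code.
object TwitterCredentials {
  // Hypothetical environment variable name -> Twitter4J property key.
  private val keys = Map(
    "TWITTER_CONSUMER_KEY" -> "twitter4j.oauth.consumerKey",
    "TWITTER_CONSUMER_SECRET" -> "twitter4j.oauth.consumerSecret",
    "TWITTER_ACCESS_TOKEN" -> "twitter4j.oauth.accessToken",
    "TWITTER_ACCESS_TOKEN_SECRET" -> "twitter4j.oauth.accessTokenSecret"
  )

  /** Sets each property whose variable is present; returns the missing variable names, sorted. */
  def configure(env: Map[String, String] = sys.env): Seq[String] = {
    keys.foreach { case (envName, property) =>
      env.get(envName).foreach(value => System.setProperty(property, value))
    }
    keys.keys.filterNot(env.contains).toSeq.sorted
  }
}
```

Calling `TwitterCredentials.configure()` at the top of `main`, before `TwitterUtils.createStream`, and failing fast when the returned list is non-empty would replace the four `System.setProperty` lines.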
>>
>> And here's the code:
>>
>> import org.apache.spark.SparkContext
>> import org.apache.spark.SparkContext._
>> import org.apache.spark.SparkConf
>> import org.apache.spark.streaming.twitter.TwitterUtils
>> import org.apache.spark.streaming.{Seconds, StreamingContext}
>>
>> object Sentimenter {
>>   def main(args: Array[String]) {
>>     System.setProperty("twitter4j.oauth.consumerKey", "XXXXXXXXXXXXXXXXX")
>>     System.setProperty("twitter4j.oauth.consumerSecret", "XXXXXXXXXXXXXXXXXXXXXXXXX")
>>     System.setProperty("twitter4j.oauth.accessToken", "XXXXXXXXXXXXXXXXXXXXXXXXXXXX")
>>     System.setProperty("twitter4j.oauth.accessTokenSecret", "XXXXXXXXXXXXXXXXXXXXXXX")
>>
>>     val filters = new Array[String](2)
>>     filters(0) = "ebola"
>>     filters(1) = "isis"
>>     val sparkConf = new SparkConf().setAppName("TweetSentiment").setMaster("local[2]")
>>     val sc = new SparkContext(sparkConf)
>>     // get the list of positive words
>>     val pos_list = sc.textFile("file:///home/akhld/positive-words.txt") //Random
>>       .filter(line => !line.isEmpty())
>>       .collect()
>>       .toSet
>>     // get the list of negative words
>>     val neg_list = sc.textFile("file:///home/akhld/negative-words.txt") //Random
>>       .filter(line => !line.isEmpty())
>>       .collect()
>>       .toSet
>>     // create twitter stream
>>     val ssc = new StreamingContext(sparkConf, Seconds(5))
>>     val stream = TwitterUtils.createStream(ssc, None, filters)
>>     val tweets = stream.map(r => r.getText)
>>     tweets.print() // print tweet text
>>     ssc.start()
>>     ssc.awaitTermination()
>>   }
>> }
>>
>>
>>
>>
>>
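Neither program in this thread uses `pos_list` or `neg_list` yet; the stream only prints raw tweet text. A minimal sketch of the missing scoring step, assuming a simple lexicon count (one point per positive word, minus one per negative word) rather than any method stated in the thread. `LexiconScore`, `tokenize` and `score` are hypothetical names.

```scala
// Hypothetical lexicon-count scorer: +1 per positive word, -1 per negative word.
object LexiconScore {
  // Lower-cases the text and splits on runs of characters that are not letters,
  // apostrophes or hyphens, so words like "can't" stay whole.
  def tokenize(text: String): Seq[String] =
    text.toLowerCase.split("[^\\p{L}'-]+").toSeq.filter(_.nonEmpty)

  def score(text: String, pos: Set[String], neg: Set[String]): Int =
    tokenize(text).map { w =>
      if (pos.contains(w)) 1 else if (neg.contains(w)) -1 else 0
    }.sum
}
```

Applied to the quoted stream it would look something like `tweets.map(t => (LexiconScore.score(t, pos_list, neg_list), t)).print()`; the two sets are captured in the closure and shipped with each task.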
>> Thanks
>> Best Regards
>>
>> On Wed, Oct 15, 2014 at 1:43 AM, SK <skrishna.id@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I am trying to implement simple sentiment analysis of Twitter streams in
>>> Spark/Scala. I am getting an exception, and it appears when I combine
>>> SparkContext with StreamingContext in the same program. When I read the
>>> positive and negative words using only SparkContext.textFile (without
>>> creating a StreamingContext) and analyze static text files, the program
>>> works. Likewise, when I just create the Twitter stream using
>>> StreamingContext (and don't create a SparkContext to build the
>>> vocabulary), the program works. The exception seems to appear when I
>>> combine SparkContext and StreamingContext in the same program, and I am
>>> not sure whether we are allowed to have both simultaneously. All the
>>> examples in the streaming module use only a StreamingContext. The error
>>> transcript and my code appear below. I would appreciate your guidance in
>>> fixing this error and on the right way to read static files and streams
>>> in the same program, or any pointers to relevant examples.
>>> Thanks.
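One way to read static files and a stream in the same program, sketched under the assumption of Spark 1.x-era APIs with `spark-streaming-twitter` on the classpath: create a single SparkContext, load the word lists through it, broadcast them, and build the StreamingContext from that same context instead of from the conf. The object name, argument layout and scoring rule are hypothetical; this is not code from the thread.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.twitter.TwitterUtils

// Sketch: one SparkContext, reused by the StreamingContext, with the word
// lists broadcast to the executors. Argument layout and scoring are hypothetical.
object TweetSentimentSketch {
  // Loads a word list (one word per line, blank lines dropped) and collects it
  // to the driver. Absolute URIs (e.g. hdfs://...) avoid path surprises on a cluster.
  def loadWords(sc: SparkContext, path: String): Set[String] =
    sc.textFile(path).map(_.trim).filter(_.nonEmpty).collect().toSet

  def main(args: Array[String]): Unit = {
    require(args.length >= 2,
      "usage: TweetSentimentSketch <positive-words> <negative-words> [filters...]")
    val filters = args.drop(2).toSeq
    val sc = new SparkContext(new SparkConf().setAppName("TweetSentiment"))

    // Broadcast the small word sets once rather than shipping them with every task closure.
    val pos = sc.broadcast(loadWords(sc, args(0)))
    val neg = sc.broadcast(loadWords(sc, args(1)))

    // Reuse sc; new StreamingContext(sparkConf, ...) would create a second SparkContext.
    val ssc = new StreamingContext(sc, Seconds(60))
    val tweets = TwitterUtils.createStream(ssc, None, filters).map(_.getText)

    // Naive score: number of positive words minus number of negative words.
    val scored = tweets.map { text =>
      val words = text.toLowerCase.split("\\W+")
      (words.count(pos.value.contains) - words.count(neg.value.contains), text)
    }
    scored.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```

Broadcasting is a design choice here, not a requirement: plain captured sets would also work for lists this small, but broadcast variables are sent to each executor once instead of with every task.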
>>>
>>>
>>> ----------------------Error transcript -----------------------------
>>> Lost task 0.0 in stage 2.0 (TID 70, mesos4-dev.sccps.net):
>>> java.io.IOException: unexpected exception type
>>>         java.io.ObjectStreamClass.throwMiscException(ObjectStreamClass.java:1538)
>>>         java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1025)
>>>         java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
>>>         java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>>         java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>>         java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>>>         java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>>>         java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>>         java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>>         java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>>>         org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
>>>         org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87)
>>>         org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:159)
>>>         java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>         java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>         java.lang.Thread.run(Thread.java:745)
>>> ------------------------------ My code below ------------------------------
>>> import org.apache.spark.{SparkConf, SparkContext}
>>> import org.apache.spark.streaming.{Seconds, StreamingContext}
>>> import org.apache.spark.streaming.twitter.TwitterUtils
>>>
>>> object TweetSentiment {
>>>   def main(args: Array[String]) {
>>>     val filters = args
>>>     val sparkConf = new SparkConf().setAppName("TweetSentiment")
>>>     val sc = new SparkContext(sparkConf)
>>>
>>>     // get the list of positive words
>>>     val pos_list = sc.textFile("positive-words.txt")
>>>       .filter(line => !line.isEmpty())
>>>       .collect()
>>>       .toSet
>>>
>>>     // get the list of negative words
>>>     val neg_list = sc.textFile("negative-words.txt")
>>>       .filter(line => !line.isEmpty())
>>>       .collect()
>>>       .toSet
>>>
>>>     // create twitter stream
>>>     val ssc = new StreamingContext(sparkConf, Seconds(60))
>>>     val stream = TwitterUtils.createStream(ssc, None, filters)
>>>     val tweets = stream.map(r => r.getText)
>>>     tweets.print() // print tweet text
>>>
>>>     ssc.start()
>>>     ssc.awaitTermination()
>>>     sc.stop() // I tried commenting this out, but the exception still appeared.
>>>   }
>>> }
>>>
>>>
>>>
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Sentiment-Analysis-of-Twitter-streams-tp16410.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>
