spark-user mailing list archives

From Sean Owen <so...@cloudera.com>
Subject Re: Is it possible to save the streams to one single file?
Date Thu, 20 Nov 2014 11:30:44 GMT
You can repartition to 1 partition to generate 1 output file, but
that has potentially bad implications for your processing: the final
stages would run on only 1 partition, and so on only 1 worker.
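
For the "update the same file" part of the question, one possible approach is to collect each batch's count on the driver and append it to a single local file. The sketch below shows only the append step, using the JDK's file API. `CountLog`, its `append` method, and the tab-separated line format are hypothetical names chosen for illustration, not part of Spark, and the approach assumes each batch yields only a handful of values, as a count does.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths, StandardOpenOption}

// Hypothetical helper (not a Spark API): appends one line per call
// to a single local file, creating the file if it does not exist.
object CountLog {
  def append(path: String, label: String, count: Long, batchTimeMs: Long): Unit = {
    // One tab-separated record: batch time, label, count.
    val line = s"$batchTimeMs\t$label\t$count\n"
    Files.write(
      Paths.get(path),
      line.getBytes(StandardCharsets.UTF_8),
      StandardOpenOption.CREATE, // create the file on first use
      StandardOpenOption.APPEND  // otherwise add to the end
    )
  }
}
```

With the `k` stream from the quoted code, this might be wired in roughly as `k.foreachRDD((rdd, time) => rdd.collect().foreach(c => CountLog.append("counts.tsv", "positive", c, time.milliseconds)))`, where `counts.tsv` is a placeholder path. The function passed to `foreachRDD` runs on the driver, so the file is written on the driver machine only.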

On Thu, Nov 20, 2014 at 11:32 AM,  <jishnu.prathap@wipro.com> wrote:
> Hi,
>
> My question is generic:
>
> - Is it possible to save the streams to one single file? If yes, can you
>   give me a link or a code sample?
>
> - I tried using .saveAsTextFile, but it creates a different file for each
>   stream. I need to update the same file instead of creating a different
>   file for each stream.
>
> My use case:
>
> - Retrieve Twitter streams, then extract each tweet and perform sentiment
>   analysis on it. Count the number of +ve and -ve sentiments.
>
> - Save the counts in a file. The file should get updated with each stream.
>
>
>
> import org.apache.spark.SparkConf
> import org.apache.spark.streaming.{Seconds, StreamingContext}
> import org.apache.spark.streaming.twitter.TwitterUtils
>
> object sparkAnalytics {
>
>   def main(args: Array[String]): Unit = {
>     // Only log errors, to keep the console output readable.
>     org.apache.log4j.LogManager.getRootLogger().setLevel(org.apache.log4j.Level.ERROR)
>
>     // SentimentAnalyzer is the poster's own class; its import is not shown.
>     val sentimentAnalyzer = new SentimentAnalyzer()
>
>     // Twitter OAuth credentials (masked).
>     System.setProperty("twitter4j.oauth.consumerKey", "***")
>     System.setProperty("twitter4j.oauth.consumerSecret", "**********")
>     System.setProperty("twitter4j.oauth.accessToken", "********")
>     System.setProperty("twitter4j.oauth.accessTokenSecret", "**************")
>
>     val sparkConf = new SparkConf()
>       .setAppName("TwitterSentimentalAnalysis")
>       .setMaster("local[4]")
>       .set("spark.eventLog.enabled", "true")
>
>     // One batch every 2 seconds.
>     val ssc = new StreamingContext(sparkConf, Seconds(2))
>     val stream = TwitterUtils.createStream(ssc, None) //, filters)
>
>     // Strip unwanted characters from each tweet, then score its sentiment.
>     val statuses = stream.map(status =>
>       sentimentAnalyzer.findSentiment(
>         status.getText().replaceAll("[^A-Za-z0-9 \\#]", "")))
>
>     val line = statuses.map(tweetWithSentiment => tweetWithSentiment.getCssClass())
>
>     // Per-batch count of positive tweets.
>     val pos = line.filter(s => s.contains("sentiment-positive"))
>     val k = pos.count
>     k.print // Instead of printing it to the console, I have to update a file
>
>     // Per-batch count of negative tweets.
>     val neg = line.filter(s => s.contains("sentiment-negative"))
>     val n = neg.count
>     n.print // Instead of printing it to the console, I have to update a file
>
>     ssc.start()
>     ssc.awaitTermination()
>   }
> }
>
>
>
> Thanks & Regards
>
> Jishnu Menath Prathap
>


