spark-user mailing list archives

From Akhil Das <ak...@sigmoidanalytics.com>
Subject Re: Is it possible to save the streams to one single file?
Date Thu, 20 Nov 2014 10:56:08 GMT
Can you not use the *hadoop getmerge* option to merge the files afterwards?
Files on HDFS are immutable once closed, so on older HDFS versions you cannot
modify a file after it has been written; newer versions do support append. The
code below writes each batch's output to one directory (deleting the previous
contents first). You can modify it to make it "appendable", but for that you
will have to write through an OutputStream/BufferedWriter yourself rather than
Spark's simple saveAs*File calls.
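
For reference, merging all the part files from a saveAsTextFile output
directory into one local file can be done afterwards from the command line
(the paths here are just placeholders):

      hadoop fs -getmerge /user/akhld/streaming-output /tmp/merged-output.txt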


      import java.net.URI
      import org.apache.hadoop.fs.{FileSystem, Path}

      val hdfs = FileSystem.get(new URI("hdfs://akhld:9000"), hadoopConf)
      // Clear the output location once on startup
      try {
        hdfs.delete(new Path(this.location), true)
      } catch { case _: Throwable => }

      tmp_stream.foreachRDD(rdd => {
        try {
          // Delete the previous batch's output, then write the new one
          try {
            hdfs.delete(new Path(this.location), true)
          } catch { case e: Exception => println("Exception!!HDFS => " + e) }
          rdd.saveAsTextFile(this.location)
        } catch { case e: Exception => println("Exception!!HDFSs => " + e) }
      })
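
If you really want a single file that keeps growing with each batch, a rough
sketch of the append-based approach would look something like the below
(untested; it reuses the hdfs, hadoopConf and this.location names from above
and assumes append is enabled on your HDFS, e.g. dfs.support.append):

      tmp_stream.foreachRDD(rdd => {
        // Bring the (small) batch result to the driver and append it to one file
        val lines = rdd.collect()
        val path = new Path(this.location)
        val out = if (hdfs.exists(path)) hdfs.append(path) else hdfs.create(path)
        try {
          lines.foreach(l => out.writeBytes(l.toString + "\n"))
        } finally {
          out.close()
        }
      })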





Thanks
Best Regards

On Thu, Nov 20, 2014 at 4:02 PM, <jishnu.prathap@wipro.com> wrote:

>  Hi
>
>             My question is generic:
>
> - Is it possible to save the streams to *one* single file? If yes, can you
> give me a link or a code sample?
>
> - I tried using .saveAsTextFile, but it creates a different file for each
> stream. I need to update the same file instead of creating a different file
> for each stream.
>
>             My Use Case:
>
> - Retrieve Twitter streams, then extract each tweet and perform sentiment
> analysis on it. Count the number of +ve and -ve sentiments.
>
> - Save the count in a file. The file should get *updated* with each stream.
>
>
>
> object sparkAnalytics {
>
>   def main(args: Array[String]) {
>
>     org.apache.log4j.LogManager.getRootLogger().setLevel(org.apache.log4j.Level.ERROR)
>
>     val sentimentAnalyzer = new SentimentAnalyzer()
>
>     System.setProperty("twitter4j.oauth.consumerKey", "***")
>     System.setProperty("twitter4j.oauth.consumerSecret", "**********")
>     System.setProperty("twitter4j.oauth.accessToken", "********")
>     System.setProperty("twitter4j.oauth.accessTokenSecret", "**************")
>
>     val sparkConf = new SparkConf()
>       .setAppName("TwitterSentimentalAnalysis")
>       .setMaster("local[4]")
>       .set("spark.eventLog.enabled", "true")
>
>     val ssc = new StreamingContext(sparkConf, Seconds(2))
>
>     val stream = TwitterUtils.createStream(ssc, None) //, filters)
>
>     val statuses = stream.map(status =>
>       sentimentAnalyzer.findSentiment(
>         status.getText().replaceAll("[^A-Za-z0-9 \\#]", "")))
>
>     val line = statuses.map(tweetWithSentiment => tweetWithSentiment.getCssClass())
>
>     val pos = line.filter(s => s.contains("sentiment-positive"))
>     val k = pos.count()
>     k.print() // Instead of printing it in the console I have to update a file
>
>     val neg = line.filter(s => s.contains("sentiment-negative"))
>     val n = neg.count()
>     n.print() // Instead of printing it in the console I have to update a file
>
>     ssc.start()
>     ssc.awaitTermination()
>   }
> }
>
>
>
> Thanks & Regards
>
> Jishnu Menath Prathap
>
