There is no direct way of doing that. If you need a single file for every batch duration, you can repartition the data to one partition before saving. Another way would be to use Hadoop's copyMerge command/API (available from the 2.0 versions).
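A rough sketch of the repartition approach in the Java streaming API (the stream name wordCounts and the output path here are placeholders, not from this thread):

    // Collapse each batch to one partition so each batch directory holds a
    // single part file. Uses org.apache.spark.api.java.function.Function2
    // and org.apache.spark.streaming.Time.
    wordCounts.foreachRDD(new Function2<JavaPairRDD<String, Integer>, Time, Void>() {
      @Override
      public Void call(JavaPairRDD<String, Integer> rdd, Time time) {
        rdd.repartition(1).saveAsTextFile(
            "hdfs://localhost:8020/user/hue/output/batch-" + time.milliseconds());
        return null;
      }
    });

For a single merged file across batches, Hadoop's FileUtil.copyMerge (in org.apache.hadoop.fs) can concatenate the per-batch part files afterwards.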

On 13 Jan 2015 01:08, "Su She" <suhshekar52@gmail.com> wrote:
Hello Everyone, 

Quick follow-up: is there any way I can append output to one file rather than create a new directory/file every X milliseconds? 

Thanks!

Suhas Shekar

University of California, Los Angeles
B.A. Economics, Specialization in Computing 2014

On Thu, Jan 8, 2015 at 11:41 PM, Su She <suhshekar52@gmail.com> wrote:
1) Thank you, everyone, for the help once again... the support here is really amazing, and I hope to contribute soon!


2) In case the thread ever goes down, here is the solution provided by Matei:

plans.saveAsHadoopFiles("hdfs://localhost:8020/user/hue/output/completed", "csv", String.class, String.class, (Class) TextOutputFormat.class);
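(Note: the raw (Class) cast is there to satisfy Java's generics when passing TextOutputFormat.class; without an explicit output format class, Scala infers scala.runtime.Nothing$ and the job fails at runtime with the exception shown further down the thread.)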

I had browsed a lot of similar threads that did not have answers, but found this one from quite some time ago, so I apologize for posting a question that had already been answered.

3) Akhil, I was specifying the format as "txt", but it was not compatible.

Thanks for the help!


On Thu, Jan 8, 2015 at 11:23 PM, Akhil Das <akhil@sigmoidanalytics.com> wrote:
saveAsHadoopFiles requires you to specify the output format, which I believe you are not specifying anywhere, and hence the program crashes.

You could try something like this:

// The double (Class<?>) cast works around Java generics when passing the output format class:
Class<? extends OutputFormat<?, ?>> outputFormatClass = (Class<? extends OutputFormat<?, ?>>) (Class<?>) SequenceFileOutputFormat.class;

yourStream.saveAsNewAPIHadoopFiles(hdfsUrl, "/output-location", Text.class, Text.class, outputFormatClass);
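(Here Text is org.apache.hadoop.io.Text, while OutputFormat and SequenceFileOutputFormat come from the new org.apache.hadoop.mapreduce API, which is what the saveAsNewAPIHadoopFiles variant expects; hdfsUrl would be your HDFS prefix.)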


Thanks
Best Regards

On Fri, Jan 9, 2015 at 10:22 AM, Su She <suhshekar52@gmail.com> wrote:
Yes, I am calling saveAsHadoopFiles on the DStream. However, when I call print on the DStream, it works. If I have to use foreachRDD for saveAsHadoopFiles, why does it work for print?

Also, if I am using foreachRDD, do I need to set up connections as in the design-patterns examples, or can I simply put saveAsHadoopFiles inside the foreachRDD function? 

Thanks, Yana, for the help! I will play around with foreachRDD and report back with my results. 



On Thu, Jan 8, 2015 at 6:06 PM, Yana Kadiyska <yana.kadiyska@gmail.com> wrote:
Are you calling saveAsTextFiles on the DStream? It looks like it. Look at the section called "Design Patterns for using foreachRDD" in the link you sent; you want to do dstream.foreachRDD(rdd => rdd.saveAs...)
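A minimal sketch of that pattern in the Java API, assuming a JavaPairDStream<String, Integer> named wordCounts (the names and HDFS path are illustrative):

    // Save each batch's RDD explicitly; the RDD save methods are available
    // even when the corresponding DStream method is not.
    // TextOutputFormat here is org.apache.hadoop.mapred.TextOutputFormat,
    // matching the old-API saveAsHadoopFile.
    wordCounts.foreachRDD(new Function2<JavaPairRDD<String, Integer>, Time, Void>() {
      @Override
      public Void call(JavaPairRDD<String, Integer> rdd, Time time) {
        rdd.saveAsHadoopFile("hdfs://.../output/wordcounts-" + time.milliseconds(),
            String.class, Integer.class, (Class) TextOutputFormat.class);
        return null;
      }
    });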

On Thu, Jan 8, 2015 at 5:20 PM, Su She <suhshekar52@gmail.com> wrote:
Hello Everyone,

Thanks in advance for the help!

I successfully got my Kafka/Spark WordCount app to print locally. However, I want to run it on a cluster, which means I will have to save the output to HDFS if I want to be able to read it. 

I am running Spark 1.1.0, which means, according to this document: 

I should be able to use methods such as saveAsTextFiles/saveAsHadoopFiles. 

1) When I try saveAsTextFiles, it says: 
cannot find symbol
[ERROR] symbol  : method saveAsTextFiles(java.lang.String,java.lang.String)
[ERROR] location: class org.apache.spark.streaming.api.java.JavaPairDStream<java.lang.String,java.lang.Integer>

This makes some sense, as saveAsTextFiles is not included here: 

2) When I try saveAsHadoopFiles("hdfs://ip....us-west-1.compute.internal:8020/user/testwordcount", "txt"), it builds, but when I run it, it throws this exception: 

Exception in thread "main" java.lang.RuntimeException: java.lang.RuntimeException: class scala.runtime.Nothing$ not org.apache.hadoop.mapred.OutputFormat
        at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2079)
        at org.apache.hadoop.mapred.JobConf.getOutputFormat(JobConf.java:712)
        at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1021)
        at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:940)
        at org.apache.spark.streaming.dstream.PairDStreamFunctions$$anonfun$8.apply(PairDStreamFunctions.scala:632)
        at org.apache.spark.streaming.dstream.PairDStreamFunctions$$anonfun$8.apply(PairDStreamFunctions.scala:630)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:42)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
        at scala.util.Try$.apply(Try.scala:161)
        at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:171)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:724)
Caused by: java.lang.RuntimeException: class scala.runtime.Nothing$ not org.apache.hadoop.mapred.OutputFormat
        at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2073)
        ... 14 more


Any help is really appreciated! Thanks.