Hi Buntu,

You could something similar to the following:

 val receiver_stream = new ReceiverInputDStream(ssc) {
      override def getReceiver(): Receiver[Nothing] = ??? //Whatever
    }.map((x : String) => (null, x))
    val config = new Configuration()
    config.set("mongo.output.uri", "mongodb://akhld:27017/sigmoid.output")
    receiver_stream.foreachRDD(rdd => {
      val pair_rdd = new PairRDDFunctions[Null, String](rdd) // make sure your rdd contains a key, value
      pair_rdd.saveAsNewAPIHadoopFile("/home/akhld/sigmoid/beta/", classOf[Any], classOf[Any], classOf[com.mongodb.hadoop.MongoOutputFormat[Any, Any]], config)

Best Regards

On Tue, Oct 21, 2014 at 11:59 PM, Buntu Dev <buntudev@gmail.com> wrote:
Thanks Akhil, 

I tried this but running into similar error:

 val stream = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap).map(_._2)            
 stream.map(message => {
     (null, message)                                                                                                                                                    }).saveAsNewAPIHadoopFile (destination, classOf[Void], classOf[Group], classOf[ExampleOutputFormat], conf)

value saveAsNewAPIHadoopFile is not a member of org.apache.spark.rdd.RDD[(Null, String)]

How do I go about converting to PairRDDFunctions?

On Fri, Oct 10, 2014 at 12:01 AM, Akhil Das <akhil@sigmoidanalytics.com> wrote:
You can convert this ReceiverInputDStream into PairRDDFuctions and call the saveAsNewAPIHadoopFile.

Best Regards

On Fri, Oct 10, 2014 at 11:28 AM, Buntu Dev <buntudev@gmail.com> wrote:
Basically I'm attempting to convert a JSON stream to Parquet and I get this error without the .values or .map(_._2) :

 value saveAsNewAPIHadoopFile is not a member of org.apache.spark.streaming.dstream.ReceiverInputDStream[(String, String)]

On Thu, Oct 9, 2014 at 10:15 PM, Sean Owen <sowen@cloudera.com> wrote:
Your RDD does not contain pairs, since you ".map(_._2)" (BTW that can
just be ".values"). "Hadoop files" means "SequenceFiles" and those
store key-value pairs. That's why the method only appears for

On Fri, Oct 10, 2014 at 3:50 AM, Buntu Dev <buntudev@gmail.com> wrote:
> Thanks Sean, but I'm importing org.apache.spark.streaming.StreamingContext._
> Here are the spark imports:
> import org.apache.spark.streaming._
> import org.apache.spark.streaming.StreamingContext._
> import org.apache.spark.streaming.kafka._
> import org.apache.spark.SparkConf
> ....
>     val stream = KafkaUtils.createStream(ssc, zkQuorum, group,
> topicpMap).map(_._2)             stream.saveAsNewAPIHadoopFile (destination,
> classOf[Void], classOf[Group], classOf[ExampleOutputFormat], conf)
> ....
> Anything else I might be missing?