spark-user mailing list archives

From Akhil Das <ak...@sigmoidanalytics.com>
Subject Re: about write mongodb in mapPartitions
Date Fri, 07 Nov 2014 10:02:32 GMT
Why not saveAsNewAPIHadoopFile?


//Define your mongoDB confs

import org.apache.hadoop.conf.Configuration

val config = new Configuration()

config.set("mongo.output.uri", "mongodb://127.0.0.1:27017/sigmoid.output")

//Write everything to mongo
//(saveAsNewAPIHadoopFile is defined on pair RDDs, so rdd must hold (key, value) tuples)

rdd.saveAsNewAPIHadoopFile("file:///some/random", classOf[Any], classOf[Any],
  classOf[com.mongodb.hadoop.MongoOutputFormat[Any, Any]], config)
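
On why the quoted mapPartitions code never reaches MongoDB: Scala's Iterator.map is lazy, and the quoted code discards the mapped iterator and returns the original args, so the function containing coll.insert is never invoked. The client is also closed before any element is consumed. Below is a minimal sketch of that behaviour in plain Scala, with no Spark or MongoDB involved. The names fakeInsert, lazyVersion and eagerVersion are hypothetical stand-ins for coll.insert and the partition function, not anything from the original code.

```scala
import scala.collection.mutable.ArrayBuffer

object LazyIteratorDemo {
  // Hypothetical stand-in for coll.insert: records what would have been written.
  val written = ArrayBuffer.empty[String]
  def fakeInsert(doc: String): Unit = written += doc

  // Mirrors the quoted code: the mapped iterator is built but never consumed,
  // and the original iterator is returned, so fakeInsert never runs.
  def lazyVersion(args: Iterator[String]): Iterator[String] = {
    args.map { arg => fakeInsert(arg); arg }
    args
  }

  // Forces the side effects before returning (and before any client would be closed).
  def eagerVersion(args: Iterator[String]): Iterator[String] = {
    val items = args.toList      // materialize the partition
    items.foreach(fakeInsert)    // the writes happen here
    items.iterator
  }

  def main(cmdArgs: Array[String]): Unit = {
    val out1 = lazyVersion(Iterator("a", "b")).toList
    println(s"lazy:  returned=$out1 written=${written.toList}")
    written.clear()
    val out2 = eagerVersion(Iterator("a", "b")).toList
    println(s"eager: returned=$out2 written=${written.toList}")
  }
}
```

eagerVersion buffers the whole partition with toList, which is only reasonable when a partition fits in memory. If the writes are the only goal, foreachPartition is the usual choice.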


Thanks
Best Regards

On Fri, Nov 7, 2014 at 2:53 PM, qinwei <wei.qin@dewmobile.net> wrote:

> Hi, everyone
>
>     I have come across a problem writing data to MongoDB in
> mapPartitions; my code is below:
>
>         val sourceRDD = sc.textFile("hdfs://host:port/sourcePath")
>         // some transformations
>         val rdd = sourceRDD.map(mapFunc).filter(filterFunc)
>         val newRDD = rdd.mapPartitions(args => {
>             val mongoClient = new MongoClient("host", port)
>             val db = mongoClient.getDB("db")
>             val coll = db.getCollection("collectionA")
>
>             args.map(arg => {
>                 coll.insert(new BasicDBObject("pkg", arg))
>                 arg
>             })
>
>             mongoClient.close()
>             args
>         })
>
>         newRDD.saveAsTextFile("hdfs://host:port/path")
>
>     The application saved the data to HDFS correctly, but not to MongoDB.
> Is there something wrong?
>     I know that collecting newRDD to the driver and then saving it to
> MongoDB will succeed, but will the subsequent saveAsTextFile read the
> filesystem again?
>
>     Thanks
>
>
> ------------------------------
> qinwei
>
