spark-user mailing list archives

From Mich Talebzadeh <>
Subject Fwd: Using MongoDB as an Operational Data Store (ODS) with Spark Streaming
Date Fri, 07 Sep 2018 18:34:51 GMT

Has anyone in the Spark community had any exposure to this work, please?


---------- Forwarded message ---------
From: Mich Talebzadeh <>
Date: Thu, 6 Sep 2018 at 21:12
Subject: Using MongoDB as an Operational Data Store (ODS) with Spark
To: <>


I thought you may find the below useful.


   - Hadoop 3.1
   - Spark 2.3
   - MongoDB 4.0.1
   - ZooKeeper on docker version zookeeper-3.4.11
   - Three Kafka dockers running kafka version kafka_2.12-

I send trade data comprising 100 securities to the Kafka topic every 2
seconds, so in each batch interval of 2 seconds we deal with 100 rows.
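
For completeness, the feed side is nothing special: a plain Kafka producer
pushing one CSV line per security every 2 seconds. A minimal sketch (the
broker address, topic name and random prices here are illustrative, not the
actual generator):

    import java.util.Properties
    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")  // illustrative broker address
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](props)

    while (true) {
      // one CSV message per security: KEY,TICKER,TIMEISSUED,PRICE
      for (i <- 1 to 100) {
        val msg = s"$i,TICKER$i,${java.time.LocalTime.now},${90.0 + scala.util.Random.nextDouble * 10}"
        producer.send(new ProducerRecord[String, String]("prices", i.toString, msg))
      }
      Thread.sleep(2000)  // matches the 2-second batch interval
    }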

I then go through every RDD and look at the individual rows comprising:

case class columns(KEY: String, TICKER: String, TIMEISSUED: String, PRICE: Double)

and examine every security for high-value prices.
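
For context, the pricesRDD in the loop below comes out of a direct Kafka
stream. A rough sketch of that wiring, assuming the 0.8-style direct stream
API (which is what the (key, value) tuples behind line._2 suggest; broker
address and topic name are illustrative):

    import kafka.serializer.StringDecoder
    import org.apache.spark.streaming.kafka.KafkaUtils
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val streamingContext = new StreamingContext(sparkContext, Seconds(2))
    val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")
    val dstream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
                    streamingContext, kafkaParams, Set("prices"))

    dstream.foreachRDD { pricesRDD =>
      // each element is a (key, value) tuple; line._2 below is the CSV payload
      // ... per-message loop as shown next ...
    }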

This loop seems to work OK

        // Work on individual messages
        // (needs: import com.mongodb.spark.MongoSpark and import org.bson.Document)
        for (line <- pricesRDD.collect) {
          // split the CSV payload once and pick out the fields
          val cols = line._2.split(',')
          val key = cols(0)
          val ticker = cols(1)
          val timeissued = cols(2)
          val price = cols(3).toDouble
          val priceToString = cols(3)
          val CURRENCY = "GBP"
          val op_type = "1"
          val op_time = System.currentTimeMillis.toString
          if (price > 90.0) {
            //println("price > 90.0, saving to MongoDB collection!")
            // build a one-document RDD for this trade (document body
            // reconstructed for this post; exact fields are illustrative)
            val document = sparkContext.parallelize((1 to 1).map(i =>
              Document.parse(s"""{"key": "$key", "ticker": "$ticker",
                "timeissued": "$timeissued", "price": $price,
                "currency": "$CURRENCY", "op_type": $op_type, "op_time": $op_time}""")))
            // Writing document to MongoDB collection
            MongoSpark.save(document, writeConfig)
            if (ticker == "VOD" && price > 99.0) {
              sqltext = Calendar.getInstance.getTime.toString + ", Price on " + ticker + " hit " + price.toString
            }
          }
        }

I collected 30,000 trades over this streaming run and, as you can see, I
write them to MongoDB. In this case MongoDB is a standalone cluster.

The performance is good, as shown in the Spark GUI below:

[image: Spark Streaming UI — processing times with MongoDB as the store]
In general, if your average processing time (here around 600 ms) is below
the batch interval of 2 seconds, then you are OK. However, when I compare
this with Hbase as the data store (in place of MongoDB), I end up with a
processing time of 52 ms for Hbase, as shown below:

[image: Spark Streaming UI — processing times with Hbase as the store]

The number of batches in both runs is pretty similar, so I am wondering
what factors influence this delay in MongoDB. In both cases Spark is
running in standalone mode with the same configuration. It is possible
that the way I write documents to MongoDB is not particularly efficient,
or that the connection set up through the following connection string and
sparkConf settings in Spark is the bottleneck:

connectionString =

 sparkConf.set("spark.mongodb.input.uri", connectionString)
 sparkConf.set("spark.mongodb.output.uri", connectionString)
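
One thing I may try is building the WriteConfig explicitly and saving the
whole filtered batch with a single MongoSpark.save per batch interval,
instead of a one-document parallelize per message. A sketch of that idea
(database and collection names are illustrative):

    import com.mongodb.spark.MongoSpark
    import com.mongodb.spark.config.WriteConfig
    import org.bson.Document

    // database and collection names here are illustrative
    val writeConfig = WriteConfig(Map(
      "uri"        -> connectionString,
      "database"   -> "trading",
      "collection" -> "prices"))

    dstream.foreachRDD { pricesRDD =>
      // one bulk save per batch rather than one save per message
      val docs = pricesRDD.map(_._2.split(','))
                          .filter(cols => cols(3).toDouble > 90.0)
                          .map(cols => Document.parse(
                            s"""{"key": "${cols(0)}", "ticker": "${cols(1)}",
                                 "timeissued": "${cols(2)}", "price": ${cols(3)}}"""))
      MongoSpark.save(docs, writeConfig)
    }

The connector groups inserts into bulk writes per partition, so this should
avoid both the collect back to the driver and a Mongo round trip for every
single message, though I have not measured whether it closes the gap.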

Of course Hbase is native to Hadoop in this case and uses HDFS for
storage, whereas MongoDB is configured externally to Hadoop.

My concern at the moment is the speed of writes to MongoDB as opposed to
any reads/queries etc.

I would appreciate it if someone could share their experiences or suggestions.


Dr Mich Talebzadeh

LinkedIn

*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.
