spark-user mailing list archives

From Mich Talebzadeh <mich.talebza...@gmail.com>
Subject Re: Using MongoDB as an Operational Data Store (ODS) with Spark Streaming
Date Sun, 09 Sep 2018 18:57:20 GMT
Hi,

Thanks to the colleagues who made great suggestions.

In my notes below I raised a concern about the speed of writes from Spark
to MongoDB (standalone version).

I was looping over the RDD rows, selecting the high value trades
(messages) and posting them into the MongoDB collection individually.

This turned out to be inefficient in a distributed environment with Spark
Streaming.

Hence I decided to modify the code and write the qualifying rows of the RDD
in one go, by filtering the wanted rows at the DStream level. In simple terms,
we moved away from a cursor-style row-by-row approach to treating the result
set as one.
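
For context, dstream in the code below is the Kafka direct stream feeding the
prices; its creation is not shown here. A minimal sketch, assuming the
spark-streaming-kafka-0-8 direct API (which yields (key, value) tuples as used
below) and purely illustrative broker and topic names, might look like this:

    import kafka.serializer.StringDecoder
    import org.apache.spark.streaming.kafka.KafkaUtils

    // streamingContext is the StreamingContext with the 2 second batch interval
    // Broker list and topic name are illustrative, not from the original post
    val kafkaParams = Map[String, String]("metadata.broker.list" -> "kafka1:9092,kafka2:9092,kafka3:9092")
    val topics = Set("prices")
    // DStream[(String, String)] of (key, csvValue) pairs
    val dstream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      streamingContext, kafkaParams, topics)

The modified write path then becomes: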

    dstream.foreachRDD { pricesRDD =>
      if (!pricesRDD.isEmpty)  // data exists in RDD
      {
        val op_time = System.currentTimeMillis.toString
        val spark = SparkSessionSingleton.getInstance(pricesRDD.sparkContext.getConf)
        import spark.implicits._
        // Convert RDD[(String, String)] to RDD[case class] to DataFrame
        val RDDString = pricesRDD.map { case (_, value) => value.split(',') }
                                 .map(p => columns(p(0).toString, p(1).toString, p(2).toString, p(3).toDouble,
                                                   currency, op_type, op_time))
        val df = spark.createDataFrame(RDDString)
        // Keep only the high value trades and write them to MongoDB in one call
        val document = df.filter('price > 90.0)
        MongoSpark.save(document, writeConfig)
…..
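
For completeness, writeConfig above is the MongoDB Spark connector's
WriteConfig, which is not shown in the snippet. A minimal sketch of how it
could be built, assuming mongo-spark-connector 2.x and reusing the
connectionString from the original post (the write concern value is
illustrative):

    import com.mongodb.spark.config.WriteConfig

    // connectionString already carries user, password, host, port, database and collection
    val writeConfig = WriteConfig(Map(
      "uri"            -> connectionString,
      "writeConcern.w" -> "1"   // illustrative; tune to your durability needs
    ))

Saving the filtered DataFrame in a single MongoSpark.save call lets the
connector bulk insert documents per partition, rather than issuing one insert
per message as in the original loop.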

This immediately improved the performance of the streaming job, with
processing time going down from 605 ms to 71 ms and scheduling delay reduced
from 261 ms to 2 ms. These are shown in the plot below from the Spark GUI.

[image: Spark GUI streaming statistics after switching to the filtered DataFrame write]

Contrast this with the graph from the same operation using MongoDB while
looping over individual messages:


[image: Spark GUI streaming statistics when looping over individual messages]
I am now looking at other options to streamline the process. Also note
that MongoDB Compass has a GUI that allows basic monitoring of reads and
writes, network and memory usage. Having said that, I did not find it
particularly useful. A snapshot is shown below.

[image: MongoDB Compass monitoring snapshot]

HTH

Mich Talebzadeh



LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Fri, 7 Sep 2018 at 19:34, Mich Talebzadeh <mich.talebzadeh@gmail.com>
wrote:

> Hi,
>
> Anyone in the Spark community has had any exposure to this work please?
>
> thanks
>
>
>
> ---------- Forwarded message ---------
> From: Mich Talebzadeh <mich.talebzadeh@gmail.com>
> Date: Thu, 6 Sep 2018 at 21:12
> Subject: Using MongoDB as an Operational Data Store (ODS) with Spark
> Streaming
> To: <mongodb-user@googlegroups.com>
>
>
> Hi,
>
> I thought you may find the below useful.
>
> Versions:
>
>
>    - Hadoop 3.1
>    - Spark 2.3
>    - MongoDB 4.0.1
>    - ZooKeeper on docker version zookeeper-3.4.11
>    - Three Kafka dockers running kafka version kafka_2.12-0.10.2.1
>
> I send trade data comprising 100 securities to the Kafka topic every 2
> seconds, so in every batch interval of 2 seconds we deal with 100 rows.
>
> I then go through every RDD and look at the individual rows, comprising:
>
> case class columns(KEY: String, TICKER: String, TIMEISSUED: String, PRICE: Double)
>
> and examine every security for high value prices.
>
> This loop seems to work OK
>
>         // Work on individual messages
>          for(line <- pricesRDD.collect.toArray)
>          {
>            // Each Kafka value is a CSV string: key, ticker, timeissued, price
>            var key = line._2.split(',').view(0).toString
>            var ticker = line._2.split(',').view(1).toString
>            var timeissued = line._2.split(',').view(2).toString
>            var price = line._2.split(',').view(3).toString.toDouble
>            var priceToString = line._2.split(',').view(3)
>            var CURRENCY = "GBP"
>            var op_type = "1"
>            var op_time = System.currentTimeMillis.toString
>            if (price > 90.0)
>            {
>              //println ("price > 90.0, saving to MongoDB collection!")
>              // Build a one-document RDD for this single message
>              var document = sparkContext.parallelize((1 to 1).map(i =>
>                Document.parse(s"{key:'$key',ticker:'$ticker',timeissued:'$timeissued',price:$price,CURRENCY:'$CURRENCY',op_type:$op_type,op_time:'$op_time'}")))
>              //
>              // Writing document to MongoDB collection
>              //
>              MongoSpark.save(document, writeConfig)
>              if(ticker == "VOD" && price > 99.0)
>              {
>                sqltext = Calendar.getInstance.getTime.toString + ", Price on " + ticker + " hit " + price.toString
>                //java.awt.Toolkit.getDefaultToolkit().beep()
>                println(sqltext)
>              }
>            }
>          }
>       }
>
> I collected 30,000 trades for this streaming run and, as you see, I write
> them to MongoDB. In this case MongoDB is a standalone deployment.
>
> The performance is good, as shown in the Spark GUI below:
>
> [image: Spark GUI streaming statistics with MongoDB as the data store]
> In general, if your average processing time (here around 600 ms) is below
> the batch interval of 2 seconds, then you are OK. However, when I compare
> this using HBase as the data store (in place of MongoDB), I end up with a
> processing time of 52 ms for HBase, as shown below:
>
> [image: Spark GUI streaming statistics with HBase as the data store]
>
> The number of batches in both runs is pretty similar, so I am wondering
> what factors influence this delay in MongoDB. In both cases Spark is
> running in standalone mode with the same configuration for both runs. It is
> possible that the way I write documents to MongoDB is not particularly
> efficient, or that the connection, set up through the following connection
> string and sparkConf settings in Spark, is the culprit:
>
> connectionString =
> dbConnection+"://"+dbUsername+":"+dbPassword+"@"+mongodbHost+":"+mongodbPort+"/"+dbDatabase+"."+collectionName
>
>  sparkConf.set("spark.mongodb.input.uri", connectionString)
>  sparkConf.set("spark.mongodb.output.uri", connectionString)
>
> Of course HBase is native to Hadoop in this case and uses HDFS for
> storage, whereas MongoDB is configured externally to Hadoop.
>
> My concern at the moment is the speed of writes to MongoDB as opposed to
> any reads/queries etc.
>
> I would appreciate it if someone could share their experiences or suggestions.
>
> Thanks
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
