Fwd: Using MongoDB as an Operational Data Store (ODS) with Spark Streaming


Fwd: Using MongoDB as an Operational Data Store (ODS) with Spark Streaming

Mich Talebzadeh
Hi,

Has anyone in the Spark community had any exposure to this work, please?

thanks



---------- Forwarded message ---------
From: Mich Talebzadeh <[hidden email]>
Date: Thu, 6 Sep 2018 at 21:12
Subject: Using MongoDB as an Operational Data Store (ODS) with Spark Streaming
To: <[hidden email]>


Hi,

I thought you may find the below useful.

Versions:

  • Hadoop 3.1
  • Spark 2.3
  • MongoDB 4.0.1
  • ZooKeeper on docker version zookeeper-3.4.11
  • Three Kafka dockers running kafka version kafka_2.12-0.10.2.1
I send trade data for 100 securities to the Kafka topic every 2 seconds, so in every batch interval of 2 seconds we deal with 100 rows.
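
For completeness, the creation of the DStream itself is not shown in this note. Below is a minimal sketch of how it could be set up, assuming the spark-streaming-kafka-0-8 direct API (the line._2 access in the loop further down suggests a DStream of (key, value) String pairs); the broker list, topic name and variable names are placeholders:

    import kafka.serializer.StringDecoder
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    // Batch interval of 2 seconds, matching the trade feed above
    val streamingContext = new StreamingContext(sparkConf, Seconds(2))

    // Placeholder broker list and topic name
    val kafkaParams = Map[String, String]("metadata.broker.list" -> "kafka1:9092,kafka2:9092,kafka3:9092")
    val topics = Set("prices")

    // Direct stream of (key, value) String pairs from Kafka
    val dstream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
                    streamingContext, kafkaParams, topics)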

I then go through every RDD and look at the individual rows, comprising:

case class columns(KEY: String, TICKER: String, TIMEISSUED: String, PRICE: Double)

and examine every security for high-value prices.

This loop seems to work OK:

        // Work on individual messages
        for (line <- pricesRDD.collect.toArray)
        {
          // Each Kafka message value is a CSV string: key,ticker,timeissued,price
          val fields     = line._2.split(',')
          val key        = fields(0)
          val ticker     = fields(1)
          val timeissued = fields(2)
          val price      = fields(3).toDouble
          val CURRENCY   = "GBP"
          val op_type    = "1"
          val op_time    = System.currentTimeMillis.toString
          if (price > 90.0)
          {
            // Build a one-element RDD[Document] and write it to the MongoDB collection
            val document = sparkContext.parallelize((1 to 1).
                           map(i => Document.parse(s"{key:'$key',ticker:'$ticker',timeissued:'$timeissued',price:$price,CURRENCY:'$CURRENCY',op_type:$op_type,op_time:'$op_time'}")))
            MongoSpark.save(document, writeConfig)
            if (ticker == "VOD" && price > 99.0)
            {
              sqltext = Calendar.getInstance.getTime.toString + ", Price on " + ticker + " hit " + price.toString
              println(sqltext)
            }
          }
        }
      }

I collected 30,000 trades for this streaming run and, as you can see, I write them to MongoDB. In this case MongoDB is a standalone deployment.

The performance is good, as shown in the Spark GUI below.

[image: Spark GUI streaming statistics for the MongoDB run]
In general, if your average processing time (here around 600 ms) is less than the batch interval of 2 seconds, then you are OK. However, when I compare this with HBase as the data store (in place of MongoDB), I end up with a processing time of 52 ms for HBase, as shown below:

[image: Spark GUI streaming statistics for the HBase run]

The number of batches in both runs is pretty similar, so I am wondering what factors cause this delay with MongoDB. In both cases Spark is running in standalone mode with the same configuration. It is possible that the way I write documents to MongoDB is not particularly efficient, or that the connection set up in Spark through the following connection string

connectionString = dbConnection+"://"+dbUsername+":"+dbPassword+"@"+mongodbHost+":"+mongodbPort+"/"+dbDatabase+"."+collectionName

and sparkConf settings

 sparkConf.set("spark.mongodb.input.uri", connectionString)
 sparkConf.set("spark.mongodb.output.uri", connectionString)

is not particularly efficient.
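
As an aside, the writeConfig passed to MongoSpark.save above is not shown in this note. A minimal sketch of how it could be built on top of the output URI already set on sparkConf, using the mongo-spark-connector 2.x WriteConfig API (the write concern value here is only illustrative):

    import com.mongodb.spark.config.WriteConfig

    // Start from the spark.mongodb.output.uri defined on sparkConf and
    // override the target collection and write concern explicitly
    val writeConfig = WriteConfig(
      Map("collection" -> collectionName, "writeConcern.w" -> "majority"),
      Some(WriteConfig(sparkContext)))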

Of course, HBase is native to Hadoop in this case and uses HDFS for storage, whereas MongoDB is configured externally to Hadoop.

My concern at the moment is the speed of writes to MongoDB, as opposed to reads/queries etc.

I would appreciate it if someone could share their experiences or suggestions.

Thanks

Dr Mich Talebzadeh

 

LinkedIn  https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

 

http://talebzadehmich.wordpress.com


Disclaimer: Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.

 


Re: Using MongoDB as an Operational Data Store (ODS) with Spark Streaming

Mich Talebzadeh
Hi,

Thanks to the colleagues who made great suggestions.

In my note above I raised the concern about the speed of writes from Spark to MongoDB (standalone version).

I was looping over the RDD rows, selecting high-value trades (messages) and posting them into the MongoDB collection individually.

This turned out to be inefficient in a distributed environment with Spark Streaming.

Hence I decided to modify the code and post the qualifying rows in one go, by filtering the wanted rows at the DStream level. In simple language, we moved away from a cursor-style row-by-row approach to treating the result set as one:

    dstream.foreachRDD
    { pricesRDD =>
      if (!pricesRDD.isEmpty)  // data exists in RDD
      {
        val op_time = System.currentTimeMillis.toString
        val spark = SparkSessionSingleton.getInstance(pricesRDD.sparkContext.getConf)
        import spark.implicits._
        // Convert RDD[String] to RDD[case class] to DataFrame
        // (the columns case class is extended here with CURRENCY, op_type and op_time fields)
        val RDDString = pricesRDD.map { case (_, value) => value.split(',') }.
                          map(p => columns(p(0), p(1), p(2), p(3).toDouble, currency, op_type, op_time))
        val df = spark.createDataFrame(RDDString)
        // Filter the high-value rows once and write them to MongoDB as a single DataFrame
        val document = df.filter('price > 90.0)
        MongoSpark.save(document, writeConfig)
…..
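
SparkSessionSingleton is not shown in the thread; presumably it is the usual lazily instantiated singleton SparkSession pattern from the Spark Streaming examples, sketched below (the object and member names are assumptions):

    import org.apache.spark.SparkConf
    import org.apache.spark.sql.SparkSession

    // Lazily instantiated singleton SparkSession, reused across micro-batches
    object SparkSessionSingleton {

      @transient private var instance: SparkSession = _

      def getInstance(sparkConf: SparkConf): SparkSession = {
        if (instance == null) {
          instance = SparkSession
            .builder
            .config(sparkConf)
            .getOrCreate()
        }
        instance
      }
    }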

This immediately improved the streaming performance, with processing time going down from 605 ms to 71 ms and scheduling delay reduced from 261 ms to 2 ms. These are shown in the plot below from the Spark GUI.

[image: Spark GUI streaming statistics after switching to the DataFrame filter and bulk write]

Contrast this with the graph from the same operation using MongoDB with looping over individual messages:


[image: Spark GUI streaming statistics for the original per-message write approach]
I am now looking at other options to streamline the process. Also note that MongoDB Compass has a GUI that allows basic monitoring of reads and writes, network and memory usage. Having said that, I did not find it particularly useful. A snapshot is shown below:

[image: MongoDB Compass monitoring snapshot]

HTH

Mich Talebzadeh

 

LinkedIn  https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

 

http://talebzadehmich.wordpress.com


Disclaimer: Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.

 


