[Spark Streaming] Apply multiple ML pipelines(Models) to the same stream

spicoflorin
Hello!

I have a use case where I have to apply multiple already trained models (e.g. M1, M2, ..., Mn) to the same Spark stream (fetched from Kafka).

The models were trained using the isolation forest algorithm from here: 
 

I have found something similar to my case here: https://www.youtube.com/watch?v=EhRHQPCdldI, but unfortunately I don't know whether the company Genesys (formerly AltoCloud) made this API (StreamPipeline, Heterogenous Pipeline) open source.

I handled this with the code sketched below, but I don't know how optimal it is.

import org.apache.spark.ml.PipelineModel
import org.apache.spark.sql.DataFrame

// read the stream
val kafkaStreamDF = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", broker)
  .option("subscribe", "topic")
  .load()

val myModels = Array("m1", "m2", "m3", "m4")

// parallelize over the input models so that multiple threads handle the same stream; otherwise blocked??
myModels.par.foreach { lm =>
  // load the model
  val model = PipelineModel.load(lm)

  kafkaStreamDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    // apply the model and write the scored batch as JSON
    model.transform(batchDF)
      .selectExpr("CAST(to_json(struct(*)) AS STRING) AS value")
      .write
      .format("json")
      .save("anom/" + lm + System.currentTimeMillis())
  }.start().awaitTermination()
}
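
For comparison, here is a minimal sketch of one possible alternative: a single streaming query whose foreachBatch function applies every model to the same micro-batch, so the Kafka topic is read once instead of once per model. It is not a confirmed best practice for this case, only an illustration. The object name MultiModelStream, the helper outputPath, the broker address, and the checkpoint directory are assumptions made for the example. As in the code above, the parsing of the Kafka value column into the features the models expect is left out.

```scala
import org.apache.spark.ml.PipelineModel
import org.apache.spark.sql.{DataFrame, SparkSession}

object MultiModelStream {

  // Hypothetical helper: one output directory per model and micro-batch.
  def outputPath(modelName: String, batchId: Long): String =
    s"anom/$modelName/batch=$batchId"

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("multi-model-stream").getOrCreate()
    val broker = "localhost:9092" // assumption: replace with the real broker list

    val kafkaStreamDF = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", broker)
      .option("subscribe", "topic")
      .load()

    // Load every model once on the driver, before the query starts.
    val models: Seq[(String, PipelineModel)] =
      Seq("m1", "m2", "m3", "m4").map(path => path -> PipelineModel.load(path))

    // A function value with an explicit type, which sidesteps the overload
    // ambiguity some Scala 2.12 builds report for foreachBatch lambdas.
    val scoreBatch: (DataFrame, Long) => Unit = (batchDF, batchId) => {
      batchDF.persist() // cache the batch so it is not recomputed per model
      try {
        models.foreach { case (name, model) =>
          model.transform(batchDF)
            .selectExpr("CAST(to_json(struct(*)) AS STRING) AS value")
            .write
            .mode("overwrite") // a retried batch rewrites the same directory
            .format("json")
            .save(outputPath(name, batchId))
        }
      } finally {
        batchDF.unpersist()
      }
    }

    val query = kafkaStreamDF.writeStream
      .option("checkpointLocation", "checkpoints/multi-model") // assumed path
      .foreachBatch(scoreBatch)
      .start()

    query.awaitTermination()
  }
}
```

If the models must instead run as independent queries, another option would be to call start() for each model without awaitTermination() inside the loop and then block once with spark.streams.awaitAnyTermination(). Each query would then need its own checkpoint location.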

Questions:
1. Is there any Spark API for handling such a use case?

2. If yes, where can I find it?

3. If no, how can I optimally implement this?

Any ideas or suggestions are highly appreciated.

Thanks.
 Florin