spark-user mailing list archives

From Spico Florin <spicoflo...@gmail.com>
Subject [Spark Streaming] Apply multiple ML pipelines(Models) to the same stream
Date Thu, 31 Oct 2019 09:16:12 GMT
Hello!

I have a use case where I need to apply multiple already-trained models
(e.g. M1, M2, ..., Mn) to the same Spark stream (fetched from Kafka).

The models were trained using the isolation forest algorithm from here:
https://github.com/titicaca/spark-iforest


I have found something similar to my case here:
https://www.youtube.com/watch?v=EhRHQPCdldI, but unfortunately I don't know
whether the company Genesys (formerly AltoCloud) has made this API (StreamPipeline,
Heterogenous Pipeline) open source.

I handled this with the code sketched below, but I don't know how optimal it is.

// Assumes `spark` (a SparkSession) and `broker` are already defined.
import org.apache.spark.ml.PipelineModel
import org.apache.spark.sql.DataFrame

// read the stream
val kafkaStreamDF = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", broker)
  .option("subscribe", "topic")
  .load()
val myModels = Array("m1", "m2", "m3", "m4")
// parallelize over the input models in order to have multiple threads
// handling the same stream (otherwise blocked?)
myModels.par.foreach { lm =>
  // load the model
  val model = PipelineModel.load(lm)

  kafkaStreamDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    // apply the model and write the scored batch out as JSON
    model.transform(batchDF)
      .selectExpr("CAST(to_json(struct(*)) AS STRING) AS value")
      .write
      .format("json")
      .save("anom/" + lm + System.currentTimeMillis())
  }.start().awaitTermination() // blocks this thread until the query stops
}
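
For comparison, one possible alternative to starting one query per model is a single streaming query whose foreachBatch caches each micro-batch and applies every model to it in turn. The following is only a minimal, untested sketch under assumptions: the object name MultiModelStream, the helpers outputPath and scoreBatch, the output layout, and taking the broker from args(0) are all hypothetical, and parsing the Kafka value column into feature columns is omitted here just as in the code above.

```scala
import org.apache.spark.ml.PipelineModel
import org.apache.spark.sql.{DataFrame, SparkSession}

object MultiModelStream {
  // Hypothetical helper: one output directory per model and micro-batch.
  def outputPath(modelPath: String, batchId: Long): String =
    s"anom/$modelPath/batch=$batchId"

  // Applies every model to the same micro-batch, caching the batch once
  // so that it is not recomputed from Kafka for each model.
  def scoreBatch(models: Map[String, PipelineModel])
                (batchDF: DataFrame, batchId: Long): Unit = {
    batchDF.persist()
    try {
      models.foreach { case (path, model) =>
        model.transform(batchDF)
          .write
          .mode("overwrite") // rerunning a batch replaces its earlier output
          .json(outputPath(path, batchId))
      }
    } finally {
      batchDF.unpersist()
    }
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("multi-model-stream").getOrCreate()
    val broker = args(0) // assumption: broker address passed as first argument

    // Load each model once, up front, keyed by its path.
    val models = Seq("m1", "m2", "m3", "m4")
      .map(p => p -> PipelineModel.load(p))
      .toMap

    // An explicitly typed function value avoids overload ambiguity
    // between the Scala and Java variants of foreachBatch.
    val handler: (DataFrame, Long) => Unit = scoreBatch(models) _

    val query = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", broker)
      .option("subscribe", "topic")
      .load()
      .writeStream
      .foreachBatch(handler)
      .start()

    query.awaitTermination()
  }
}
```

With this layout the stream is read from Kafka once per micro-batch instead of once per model, and there is a single query to monitor. The trade-off is that the models run one after another inside each batch, so a slow model delays the others. Whether that is better than independent queries depends on the workload.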

Questions:
1. Is there any Spark API for handling such a use case?

2. If yes, where can I find it?

3. If no, how can I optimally implement this?

Any ideas or suggestions are highly appreciated.

Thanks.
 Florin
