I have a native R model and I am running structured streaming with it. Data comes from Kafka into a dapply call, where my model makes predictions, and the results are written to the sink.

Problem: my model requires the caret package. Inside the dapply function, caret is loaded again for every streaming micro-batch, which adds roughly 2 s of delay per batch.


# Read the stream from Kafka and cast the value column to string
kafka <- read.stream("kafka", subscribe = "source",
                     kafka.bootstrap.servers = "10.117.172.48:9092", topic = "source")
lines <- select(kafka, cast(kafka$value, "string"))
schema <- schema(lines)

library(caret)  # attached on the driver; the executor R workers still load it themselves

df4 <- dapply(lines, function(x) {
  print(system.time(library(caret)))  # time how long the attach takes on the worker
  x                                   # the actual model prediction happens here
}, schema)

q2 <- write.stream(df4, "kafka", checkpointLocation = loc,  # loc: checkpoint directory, defined elsewhere
                   topic = "sink", kafka.bootstrap.servers = "10.117.172.48:9092")
awaitTermination(q2)

For the above code, every new micro-batch produces this in the worker logs:
18/03/23 11:08:10 INFO BufferedStreamThread: Loading required package: lattice
18/03/23 11:08:10 INFO BufferedStreamThread: 
18/03/23 11:08:10 INFO BufferedStreamThread: Attaching package: ‘lattice’
18/03/23 11:08:10 INFO BufferedStreamThread: 
18/03/23 11:08:10 INFO BufferedStreamThread: The following object is masked from ‘package:SparkR’:
18/03/23 11:08:10 INFO BufferedStreamThread: 
18/03/23 11:08:10 INFO BufferedStreamThread: histogram
18/03/23 11:08:10 INFO BufferedStreamThread: 
18/03/23 11:08:10 INFO BufferedStreamThread: Loading required package: ggplot2
18/03/23 11:08:12 INFO BufferedStreamThread: user system elapsed 
18/03/23 11:08:12 INFO BufferedStreamThread: 1.937 0.062 1.999 
18/03/23 11:08:12 INFO RRunner: Times: boot = 0.009 s, init = 0.017 s, broadcast = 0.001 s, read-input = 0.001 s, compute = 2.064 s, write-output = 0.001 s, total = 2.093 s

PFA: the rest of the log file.
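
As a sanity check in a plain R session outside Spark (timings are illustrative), attaching caret is only expensive the first time, so the ~2 s showing up in every batch suggests each batch gets a fresh R worker session:

system.time(library(caret))   # first attach in the session: loads lattice/ggplot2, roughly 2 s
system.time(library(caret))   # already attached: essentially 0 s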

Ideally, the packages shouldn't have to be loaded again for every batch. It looks like the worker environment is created and destroyed with each query. Is there a solution to this, or am I missing something here?
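
For reference, a guard like the sketch below (around the same dapply call) would skip the attach only if the worker R session were reused; given the timings above, it does not help when a new session is spawned per micro-batch:

df4 <- dapply(lines, function(x) {
  # attach caret only if it is not already on the search path
  if (!"package:caret" %in% search()) {
    library(caret)
  }
  x  # model prediction would go here
}, schema)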


Thanks,

Deepansh