spark-user mailing list archives

From Vinti Maheshwari <>
Subject Spark Streaming, very slow processing and increasing scheduling delay of kafka input stream
Date Mon, 07 Mar 2016 22:52:57 GMT

My Spark Streaming program seems very slow. I am using Ambari for cluster
setup, and Kafka for data input.
I tried a batch interval of 2 seconds with a checkpointing interval of 10
seconds, but the scheduling delay kept increasing, so I increased the batch
interval to 5 and then 10 seconds. Nothing changed with respect to
performance.
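
For reference, this is roughly how the streaming context is set up (a minimal sketch; the app name and checkpoint directory are placeholders, and Seconds(10) is the last batch interval I tried):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Placeholder app name and checkpoint directory; Seconds(10) is the batch interval.
val conf = new SparkConf().setAppName("kafka-aggregation")
val ssc = new StreamingContext(conf, Seconds(10))

// Checkpointing is required for updateStateByKey; a different checkpoint
// interval can be set per-DStream with stream.checkpoint(Seconds(10)).
ssc.checkpoint("checkpoint-dir")
```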

*My program is doing two tasks:*

1) Data aggregation

2) Data insertion into Hbase
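
The aggregation in step 1 boils down, I believe, to an element-wise sum of the per-file count arrays. In plain Scala (outside Spark) it amounts to something like this hypothetical helper:

```scala
// Element-wise sum of two count arrays (hypothetical standalone helper;
// in the job this happens inside the updateStateByKey function).
def addCounts(a: Array[Long], b: Array[Long]): Array[Long] =
  a.zip(b).map { case (x, y) => x + y }

// e.g. summing Array(1, 1, 1) and Array(2, 0, 3) gives Array(3, 1, 4)
```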

The action that takes the most time is the foreachRDD call on the DStream:

*state.foreachRDD(rdd => rdd.foreach(Blaher.blah))*

*Sample program input coming from Kafka:*
test_id, file1, 1,1,1,1,1

*Code snippets:*

import scala.util.Try

val parsedStream = inputStream
  .map(line => {
    val splitLines = line.split(",")
    (splitLines(1), splitLines.slice(2, splitLines.length).map(_.trim.toLong))
  })

// Merge each key's new count arrays into the running state via an element-wise sum.
val state: DStream[(String, Array[Long])] = parsedStream.updateStateByKey(
  (current: Seq[Array[Long]], prev: Option[Array[Long]]) => {
    prev.map(_ +: current).orElse(Some(current))
      .flatMap(as => Try(as.reduce((a, b) => a.zip(b).map { case (x, y) => x + y })).toOption)
  })

state.foreachRDD(rdd => rdd.foreach(Blaher.blah))

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{HTable, Put}
import org.apache.hadoop.hbase.util.Bytes

object Blaher {
  def blah(tup: (String, Array[Long])) {
    val hConf = HBaseConfiguration.create()
    val hTable = new HTable(hConf, tableName)  // tableName defined elsewhere
    val thePut = new Put(Bytes.toBytes("file_data"))
    thePut.add(Bytes.toBytes("file_counts"), Bytes.toBytes(tup._1),
      Bytes.toBytes(tup._2.mkString(",")))
    hTable.put(thePut)
    hTable.close()
  }
}
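
One thing I am considering (an untested sketch, not my current code): opening the HBase connection once per partition instead of once per record, since blah creates a new HTable for every tuple:

```scala
// Untested sketch: one HTable per partition rather than per record.
// tableName is the same value Blaher uses above.
state.foreachRDD { rdd =>
  rdd.foreachPartition { iter =>
    val hConf = HBaseConfiguration.create()
    val hTable = new HTable(hConf, tableName)
    iter.foreach { case (key, counts) =>
      val thePut = new Put(Bytes.toBytes("file_data"))
      thePut.add(Bytes.toBytes("file_counts"), Bytes.toBytes(key),
        Bytes.toBytes(counts.mkString(",")))
      hTable.put(thePut)
    }
    hTable.close()
  }
}
```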


*My Cluster Specifications:*
16 executors (1 core and 2 GB memory each)

I have attached some screenshots of running execution.

Does anyone have an idea what changes I should make to speed up the processing?

Thanks & Regards,

