spark-user mailing list archives

From Vinti Maheshwari <vinti.u...@gmail.com>
Subject Re: Spark Streaming, very slow processing and increasing scheduling delay of kafka input stream
Date Thu, 10 Mar 2016 21:00:50 GMT
Hi Todd,

Thanks for replying to my email, and thanks for sharing the link; I will read
it as well.

The info you asked for is as follows:
It was running on YARN. Overall memory is 256 GB (2 nodes, 128 GB each).
I have 16 cores in total (8 cores on each node), so I was running 16 executors
with one core each. I was not sure how much memory I should allocate to
each executor; I gave 6 GB to each executor while running these tests.

One more thing I wanted to mention: when I was running with small data
sets it finished very quickly, but when I send more data the
performance starts decreasing.
I enabled backpressure for the Kafka input, so as the scheduling delay
increased, backpressure automatically reduced the ingestion rate
from 4000 records/sec to 160 records/sec.
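For reference, the backpressure behaviour described above corresponds to the
following streaming settings (a hedged sketch: the property names are from the
Spark 1.x configuration docs, and the values shown are illustrative
assumptions, not the actual configuration used here):

```scala
import org.apache.spark.SparkConf

// Sketch of the backpressure-related settings; app name and the
// per-partition rate cap below are illustrative placeholders.
val conf = new SparkConf()
  .setAppName("KafkaStreamingApp")
  // Let Spark throttle ingestion automatically when scheduling delay grows
  .set("spark.streaming.backpressure.enabled", "true")
  // Optional hard upper bound per Kafka partition (direct stream), records/sec
  .set("spark.streaming.kafka.maxRatePerPartition", "1000")
```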

I have attached two more screenshots, which might explain this behaviour
further.
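One likely contributor to the slowdown is the per-record HBase write in the
quoted Blaher.blah code below, which creates a new HTable (and connection) for
every record. A common pattern is to use foreachPartition so one table handle
serves a whole partition. A minimal sketch, assuming the HBase 1.x-era client
API from the snippet; "tableName" is a placeholder, and the row key and column
family mirror the quoted code:

```scala
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{HTable, Put}
import org.apache.hadoop.hbase.util.Bytes

state.foreachRDD { rdd =>
  rdd.foreachPartition { iter =>
    // One HTable per partition instead of one per record
    val hConf = HBaseConfiguration.create()
    val hTable = new HTable(hConf, "tableName")
    try {
      iter.foreach { case (file, counts) =>
        val thePut = new Put(Bytes.toBytes("file_data"))
        thePut.add(Bytes.toBytes("file_counts"), Bytes.toBytes(file),
          Bytes.toBytes(counts.toList.toString))
        hTable.put(thePut)
      }
    } finally {
      hTable.close() // flush buffered puts and release the connection
    }
  }
}
```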

Thanks & Regards,
Vinti



On Thu, Mar 10, 2016 at 6:03 AM, Todd Nist <tsindotg@gmail.com> wrote:

> Hi Vinti,
>
> All of your tasks are failing based on the screen shots provided.
>
> I think a few more details would be helpful.  Is this YARN or a Standalone
> cluster?  How much overall memory is on your cluster?  On each machine
> where workers and executors are running?  Are you using the Direct
> (KafkaUtils.createDirectStream) or Receiver (KafkaUtils.createStream)?
>
> You may find this discussion of value on SO:
>
> http://stackoverflow.com/questions/28901123/org-apache-spark-shuffle-metadatafetchfailedexception-missing-an-output-locatio
>
> -Todd
>
> On Mon, Mar 7, 2016 at 5:52 PM, Vinti Maheshwari <vinti.uiet@gmail.com>
> wrote:
>
>> Hi,
>>
>> My Spark Streaming program seems very slow. I am using Ambari for cluster
>> setup and Kafka for data input.
>> I tried a batch interval of 2 seconds and a checkpointing duration of 10
>> seconds, but as the scheduling delay kept increasing, I tried raising the
>> batch interval to 5 and then 10 seconds. Nothing seems to have changed
>> with respect to performance.
>>
>> *My program is doing two tasks:*
>>
>> 1) Data aggregation
>>
>> 2) Data insertion into Hbase
>>
>> The action that took the most time was calling foreachRDD on the DStream
>> object (state).
>>
>> *state.foreachRDD(rdd => rdd.foreach(Blaher.blah))*
>>
>>
>>
>>
>> *Program sample input coming from kafka:*
>> test_id, file1, 1,1,1,1,1
>>
>> *Code snippets:*
>>
>> val parsedStream = inputStream
>>   .map(line => {
>>     val splitLines = line.split(",")
>>     (splitLines(1), splitLines.slice(2, splitLines.length).map((_.trim.toLong)))
>>   })
>> val state: DStream[(String, Array[Long])] = parsedStream.updateStateByKey(
>>         (current: Seq[Array[Long]], prev: Option[Array[Long]]) =>  {
>>           prev.map(_ +: current).orElse(Some(current))
>>             .flatMap(as => Try(as.map(BDV(_)).reduce(_ + _).toArray).toOption)
>> })
>> *state.foreachRDD(rdd => rdd.foreach(Blaher.blah))*
>>
>>
>>
>> object Blaher {
>>   def blah(tup: (String, Array[Long])) {
>>     val hConf = HBaseConfiguration.create()
>>     ------
>>     val hTable = new HTable(hConf, tableName)
>>     val thePut = new Put(Bytes.toBytes("file_data"))
>>     thePut.add(Bytes.toBytes("file_counts"), Bytes.toBytes(tup._1), Bytes.toBytes(tup._2.toList.toString))
>>     // note: this ImmutableBytesWritable is constructed but never used
>>     new ImmutableBytesWritable(Bytes.toBytes("file_data"))
>>
>>     hTable.put(thePut)
>>   }
>> }
>>
>>
>> *My Cluster Specifications:*
>> 16 executors (1 core each, 2 GB memory)
>>
>> I have attached some screenshots of running execution.
>>
>> Does anyone have an idea what changes I should make to speed up the
>> processing?
>>
>> Thanks & Regards,
>>
>> Vinti
>>
>>
>>
>> ---------------------------------------------------------------------
>> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
>> For additional commands, e-mail: user-help@spark.apache.org
>>
>
>
