spark-user mailing list archives

From Diwakar Dhanuskodi <diwakar.dhanusk...@gmail.com>
Subject Re: Kafka directsream receiving rate
Date Sat, 06 Feb 2016 20:09:24 GMT
Thanks Cody for trying to understand the issue.
Sorry if I am not being clear.
The scenario is to process all messages at once, in a single DStream block, when the
source system publishes them. The source system publishes x messages every 10 minutes.

By events I meant the total number of messages processed per batch interval (in my case
2000 ms) by each executor (the web UI shows each processed block as events).

DirectStream is processing only 10 messages per batch. It is the same whether 100 or
1 million messages are published.

The xyz topic has 20 partitions.
I am using the Kafka producer API to publish messages.
Below is the code that I am using:
val topics = "xyz"
val topicSet = topics.split(",").toSet
val kafkaParams = Map[String, String]("bootstrap.servers" -> "datanode4.isdp.com:9092")
val k = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topicSet)

k.foreachRDD { rdd =>
  val dstreamToRDD = rdd.cache()
  println(s"${System.currentTimeMillis} partitions: ${dstreamToRDD.partitions.length}")
  val accTran = dstreamToRDD.filter { ... }  // filter body elided in the original mail
  accTran.map { ... }                        // map body elided; map alone is lazy and needs an action
}

ssc.start()
ssc.awaitTermination()

I tried using DirectStream with map and partitions, where I had an issue with offsetRanges.
After your suggestion, the offset issue is resolved when I use the DirectStream code above
with the topic only.

The spark-submit settings I am using are in the mail chain below.
Is there any bottleneck I am hitting that prevents processing the maximum number of messages
in one batch interval using the direct stream RDD?
If this is not clear, I can take this offline and explain the scenario in more detail.
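For reference, the offset-reading pattern against the spark-streaming-kafka 1.5.x API looks
roughly like this (a sketch, not the poster's actual code; kafkaParams and topicSet are as
in the snippet above, and the cast must be done on the RDD the stream hands to foreachRDD,
before any transformation, since only that RDD carries the offset metadata):

```scala
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}

val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topicSet)

stream.foreachRDD { rdd =>
  // Grab the ranges before filtering/mapping the RDD; derived RDDs
  // no longer implement HasOffsetRanges.
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  offsetRanges.foreach { o =>
    println(s"${o.topic} partition ${o.partition}: ${o.fromOffset} -> ${o.untilOffset}")
  }
}
```

This needs a running Kafka broker and Spark cluster to execute, so it is shown here only as
the shape of the documented pattern.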

Sent from Samsung Mobile.







-------- Original message --------
From: Cody Koeninger <cody@koeninger.org>
Date: 06/02/2016 22:32 (GMT+05:30)
To: Diwakar Dhanuskodi <diwakar.dhanuskodi@gmail.com>
Cc: user@spark.apache.org
Subject: Re: Kafka directsream receiving rate

I am not at all clear on what you are saying.

"Yes, I am printing each message. It is processing all messages under each DStream
block."  If it is processing all messages, what is the problem you are having?

"The issue is that the direct stream processes 10 messages per event."  What distinction are
you making between a message and an event?

"I am expecting the direct stream to process 1 million messages"   Your first email said you
were publishing 100 messages but only processing 10.  Why are you now trying to process 1
million messages without understanding what is going on?  Make sure you can process a limited
number of messages correctly first.  The first code examples you posted to the list had some
pretty serious errors (i.e. only trying to process 1 partition, and trying to process offsets
that didn't exist).  Make sure that is all fixed first.

To be clear, I use direct Kafka RDDs to process batches with around 4 GB of messages per
partition, so you shouldn't be hitting some kind of limit with 1 million messages per batch.
You may of course hit executor resource issues depending on what you're trying to do with
each message, but that doesn't sound like the case here.

If you want help, either clarify what you are saying, or post a minimal reproducible code
example, with expected output vs actual output.






On Sat, Feb 6, 2016 at 6:16 AM, Diwakar Dhanuskodi <diwakar.dhanuskodi@gmail.com> wrote:
Cody, 
Yes, I am printing each message. It is processing all messages under each DStream block.

Source systems publish 1 million messages per 4 seconds, which is less than the batch interval.
The issue is that the direct stream processes 10 messages per event. When the topic's partitions
were increased to 20, DirectStream picks up only 200 messages (I guess 10 for each partition)
at a time for processing. I have 16 executors running for streaming (in both yarn client and
cluster mode).
I am expecting the direct stream to process the 1 million messages published within one
batch interval.

Using createStream, it could batch 150K messages and process them. createStream is better
than the direct stream in this case. Again, why only 150K?

Any clarification on directStream processing millions of messages per batch is much appreciated.




Sent from Samsung Mobile.


-------- Original message --------
From: Cody Koeninger <cody@koeninger.org>
Date:06/02/2016 01:30 (GMT+05:30)
To: Diwakar Dhanuskodi <diwakar.dhanuskodi@gmail.com>
Cc: user@spark.apache.org
Subject: Re: Kafka directsream receiving rate

Have you tried just printing each message, to see which ones are being processed?

On Fri, Feb 5, 2016 at 1:41 PM, Diwakar Dhanuskodi <diwakar.dhanuskodi@gmail.com> wrote:
I can see the number of messages processed per event in the Spark Streaming web UI. I am
also counting the messages inside foreachRDD.
I removed the settings for backpressure, but it is still the same.





Sent from Samsung Mobile.


-------- Original message --------
From: Cody Koeninger <cody@koeninger.org>
Date:06/02/2016 00:33 (GMT+05:30)
To: Diwakar Dhanuskodi <diwakar.dhanuskodi@gmail.com>
Cc: user@spark.apache.org
Subject: Re: Kafka directsream receiving rate

How are you counting the number of messages?

I'd go ahead and remove the settings for backpressure and maxRatePerPartition, just to eliminate
those as variables.
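Side note on the arithmetic here: when spark.streaming.kafka.maxRatePerPartition is set, the
direct stream caps each batch at roughly batch-seconds × partitions × rate-per-partition. A
minimal sketch of that ceiling (my own back-of-the-envelope, not Spark internals):

```scala
object BatchCap {
  // Approximate ceiling on records per batch imposed by
  // spark.streaming.kafka.maxRatePerPartition for a direct stream.
  def maxMessagesPerBatch(ratePerPartition: Long,
                          numPartitions: Int,
                          batchSeconds: Double): Long =
    (ratePerPartition * numPartitions * batchSeconds).toLong

  def main(args: Array[String]): Unit = {
    // Settings from the mail chain: rate 1,000,000 msg/s/partition,
    // 20 partitions, 2 s batches -> a cap of 40,000,000 messages per batch,
    // so this setting by itself cannot explain seeing only 10 messages
    // per partition per batch; backpressure's low starting rate can.
    println(maxMessagesPerBatch(1000000L, 20, 2.0))
  }
}
```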

On Fri, Feb 5, 2016 at 12:22 PM, Diwakar Dhanuskodi <diwakar.dhanuskodi@gmail.com> wrote:
I am using one direct stream. Below is the call to createDirectStream:

val topicSet = topics.split(",").toSet
val kafkaParams = Map[String,String]("bootstrap.servers" -> "datanode4.isdp.com:9092")
val k = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc, kafkaParams,
topicSet)

When I replace the DirectStream call with createStream, all messages are read by one
DStream block:
val k = KafkaUtils.createStream(ssc, "datanode4.isdp.com:2181","resp",topicMap ,StorageLevel.MEMORY_ONLY)

I am using the below spark-submit command to execute:
./spark-submit --master yarn-client \
  --conf "spark.dynamicAllocation.enabled=true" \
  --conf "spark.shuffle.service.enabled=true" \
  --conf "spark.sql.tungsten.enabled=false" \
  --conf "spark.sql.codegen=false" \
  --conf "spark.sql.unsafe.enabled=false" \
  --conf "spark.streaming.backpressure.enabled=true" \
  --conf "spark.locality.wait=1s" \
  --conf "spark.shuffle.consolidateFiles=true" \
  --conf "spark.streaming.kafka.maxRatePerPartition=1000000" \
  --driver-memory 2g --executor-memory 1g \
  --class com.tcs.dime.spark.SparkReceiver \
  --files /etc/hadoop/conf/core-site.xml,/etc/hadoop/conf/hdfs-site.xml,/etc/hadoop/conf/mapred-site.xml,/etc/hadoop/conf/yarn-site.xml,/etc/hive/conf/hive-site.xml \
  --jars /root/dime/jars/spark-streaming-kafka-assembly_2.10-1.5.1.jar,/root/Jars/sparkreceiver.jar \
  /root/Jars/sparkreceiver.jar




Sent from Samsung Mobile.


-------- Original message --------
From: Cody Koeninger <cody@koeninger.org>
Date:05/02/2016 22:07 (GMT+05:30)
To: Diwakar Dhanuskodi <diwakar.dhanuskodi@gmail.com>
Cc: user@spark.apache.org
Subject: Re: Kafka directsream receiving rate

If you're using the direct stream, you have 0 receivers.  Do you mean you have 1 executor?

Can you post the relevant call to createDirectStream from your code, as well as any relevant
spark configuration?

On Thu, Feb 4, 2016 at 8:13 PM, Diwakar Dhanuskodi <diwakar.dhanuskodi@gmail.com> wrote:
Adding more info

The batch interval is 2000 ms.
I expect all 100 messages to go through one DStream from the direct stream, but it receives
them at a rate of 10 messages at a time. Am I missing some configuration here? Any help appreciated.

Regards 
Diwakar.


Sent from Samsung Mobile.


-------- Original message --------
From: Diwakar Dhanuskodi <diwakar.dhanuskodi@gmail.com>
Date:05/02/2016 07:33 (GMT+05:30)
To: user@spark.apache.org 
Cc:
Subject: Kafka directsream  receiving rate

Hi,
Using Spark 1.5.1.
I have a topic with 20 partitions. When I publish 100 messages, the Spark direct stream
receives 10 messages per DStream. I have only one receiver. When I used createStream, the
receiver received the entire 100 messages at once.

Appreciate any help.

Regards 
Diwakar


Sent from Samsung Mobile.



