spark-user mailing list archives

From dgoldenberg <>
Subject Behavior of the spark.streaming.kafka.maxRatePerPartition config param?
Date Tue, 02 Jun 2015 22:28:25 GMT

Could someone explain the behavior of the
spark.streaming.kafka.maxRatePerPartition parameter? The doc says "An
important (configuration) is spark.streaming.kafka.maxRatePerPartition which
is the maximum rate at which each Kafka partition will be read by (the)
direct API."

What is the default behavior for this parameter? From some testing, it
appears that when it is not set, the RDD sizes tend to be quite small. With
it set, e.g. -Dspark.streaming.kafka.maxRatePerPartition=1000, we see the
consumer picking up items off the topic much more actively.

Does this parameter set the RDD size to a very low value? 

The parameter seems to default to 0 in the code below, but what is the effect of that?
  protected val maxMessagesPerPartition: Option[Long] = {
    val ratePerSec = context.sparkContext.getConf.getInt(
      "spark.streaming.kafka.maxRatePerPartition", 0)
    if (ratePerSec > 0) {
      val secsPerBatch = context.graph.batchDuration.milliseconds.toDouble / 1000
      Some((secsPerBatch * ratePerSec).toLong)
    } else {
      None
    }
  }

  // limits the maximum number of messages per partition
  protected def clamp(
      leaderOffsets: Map[TopicAndPartition, LeaderOffset]): Map[TopicAndPartition, LeaderOffset] = {
    maxMessagesPerPartition.map { mmp =>
      leaderOffsets.map { case (tp, lo) =>
        tp -> lo.copy(offset = Math.min(currentOffsets(tp) + mmp, lo.offset))
      }
    }.getOrElse(leaderOffsets)
  }
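For what it's worth, the arithmetic in that snippet can be illustrated in isolation. This is a hypothetical standalone helper (not a Spark API) mirroring the cap computation: messages per partition per batch = batch duration in seconds times the configured rate, with no cap at all when the rate is 0 or unset.

```scala
// Hypothetical helper mirroring the arithmetic in maxMessagesPerPartition:
// cap = (batchDurationMs / 1000.0) * ratePerSec, or no cap when ratePerSec <= 0.
def maxMessagesPerBatch(batchDurationMs: Long, ratePerSec: Int): Option[Long] =
  if (ratePerSec > 0) Some((batchDurationMs.toDouble / 1000 * ratePerSec).toLong)
  else None

// A 2-second batch with maxRatePerPartition=1000 caps each partition
// at 2000 messages per batch; an unset rate (the 0 default) means no cap.
println(maxMessagesPerBatch(2000, 1000)) // Some(2000)
println(maxMessagesPerBatch(2000, 0))    // None
```

So if I'm reading it right, a None here means clamp() leaves the leader offsets untouched, i.e. the batch reads everything available.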

What would we limit by default? And once Spark Streaming does pick up
messages, would it read them at the maximum rate? Does the rate ever fall
below the max even if there are max or more messages in the topic? Thanks.

