This job contains a spark output action, and is what I originally meant:

rdd.mapPartitions {
}.foreach {


This job is just a transformation, and won't do anything unless you have another output action.  Not to mention, it will exhaust the iterator, as you noticed:

rdd.mapPartitions {
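
The exhaustion is ordinary Scala Iterator behavior and can be reproduced without Spark. A minimal sketch, where `drainThenReturn` is a hypothetical stand-in for the closure passed to mapPartitions (the doubling is only illustrative):

```scala
object IteratorExhaustion {
  // Hypothetical stand-in for a mapPartitions closure: it takes the partition's
  // iterator, runs an empty foreach over the mapped results, then returns them.
  def drainThenReturn(partition: Iterator[Int]): Iterator[Int] = {
    val results = partition.map(_ * 2) // lazy: nothing is computed yet
    results.foreach(_ => ())           // computes and consumes every element
    results                            // same iterator, now exhausted
  }

  def main(args: Array[String]): Unit = {
    val out = drainThenReturn(Iterator(1, 2, 3))
    println(out.hasNext) // false: the caller receives an empty partition
  }
}
```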

On Thu, Aug 27, 2015 at 2:22 PM, Ahmed Nawar <> wrote:
Yes, of course, I am doing that. But once I added results.foreach(row => {}) I got an empty RDD.

rdd.mapPartitions(partitionOfRecords => {


val results =


results.foreach(row=> {})



On Thu, Aug 27, 2015 at 10:18 PM, Cody Koeninger <> wrote:
You need to return an iterator from the closure you provide to mapPartitions
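
One way to do per-partition work and still hand an iterator back is to materialize the partition, commit once, and return a fresh iterator over the materialized rows. The sketch below is plain Scala so it runs without Spark. `processPartition`, `writeRow` and `commit` are hypothetical names, not Spark or JDBC APIs, and buffering assumes a partition fits in memory.

```scala
object PerPartitionCommit {
  // Hypothetical helper meant to be the body of a mapPartitions closure.
  // writeRow and commit stand in for the real database calls.
  def processPartition[A](records: Iterator[A],
                          writeRow: A => Unit,
                          commit: () => Unit): Iterator[A] = {
    // toVector forces the lazy map, so every row is written before the commit.
    val buffered = records.map { r => writeRow(r); r }.toVector
    commit()          // one commit per partition, not one per row
    buffered.iterator // a fresh iterator, so downstream stages still see the rows
  }

  def main(args: Array[String]): Unit = {
    var commits = 0
    val written = scala.collection.mutable.ArrayBuffer.empty[String]
    val out = processPartition[String](
      Iterator("a", "b", "c"),
      r => { written += r; () },
      () => commits += 1)
    println(out.toList) // the rows are still available after the commit
    println(commits)    // number of commits issued for this partition
  }
}
```

With Spark this would be called roughly as rdd.mapPartitions(p => PerPartitionCommit.processPartition(p, writeRow, commit)), with the connection opened inside the closure so that it is created on the executor. Nothing runs until an action is invoked on the result.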

On Thu, Aug 27, 2015 at 1:42 PM, Ahmed Nawar <> wrote:
Thanks for the foreach idea. But once I used it, I got an empty RDD. I think that is because "results" is an iterator.

Yes, I know "Map is lazy", but I expected there to be a way to force the action.

I cannot use foreachPartition because I need to reuse the new RDD after some maps.

On Thu, Aug 27, 2015 at 5:11 PM, Cody Koeninger <> wrote:

Map is lazy.  You need an actual action, or nothing will happen.  Use foreachPartition, or do an empty foreach after the map.
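
A sketch of the "empty foreach after the map" variant, assuming a local SparkContext and toy data. `forceThenReuse` is a hypothetical name. The cache() call is an addition so that later stages can reuse the mapped RDD. Cached partitions can still be evicted and recomputed, which would rerun any side effects, so treat this as an illustration rather than a guarantee.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object ForceEvaluation {
  // Builds a mapped RDD, forces it with an empty foreach, then reuses it.
  def forceThenReuse(sc: SparkContext): Int = {
    val mapped = sc.parallelize(1 to 6, 2).mapPartitions { part =>
      val rows = part.map(_ * 10).toList // per-partition work would go here
      rows.iterator                      // hand back a fresh iterator
    }.cache()                            // keep results for later stages

    mapped.foreach(_ => ())              // the action: triggers the job
    mapped.map(_ + 1).reduce(_ + _)      // reuse the same RDD afterwards
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("force-evaluation").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try println(forceThenReuse(sc)) finally sc.stop()
  }
}
```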

On Thu, Aug 27, 2015 at 8:53 AM, Ahmed Nawar <> wrote:

I need to commit the DB transaction once per partition, not once per row. The code below didn't work for me.

rdd.mapPartitions(partitionOfRecords => {


val results =



Best regards,

Ahmed Atef Nawwar

Data Management & Big Data Consultant


On Thu, Aug 27, 2015 at 4:16 PM, Cody Koeninger <> wrote:
Your kafka broker died or you otherwise had a rebalance.

Normally spark retries take care of that.

Is there something going on with your kafka installation, that rebalance is taking especially long?

Yes, increasing backoff / max number of retries will "help", but it's better to figure out what's going on with kafka.
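
Assuming the Kafka 0.8 direct stream shown in the stack trace, the relevant knobs are likely the Spark property spark.streaming.kafka.maxRetries and the Kafka consumer property refresh.leader.backoff.ms. A sketch with illustrative values, which are assumptions rather than recommendations; `KafkaRetrySettings` is a hypothetical holder object:

```scala
object KafkaRetrySettings {
  // Spark side: how many times the driver retries fetching leader offsets.
  // The value 5 is only an example.
  val sparkSettings: Map[String, String] = Map(
    "spark.streaming.kafka.maxRetries" -> "5"
  )

  // Kafka side: how long to wait between those retries, in milliseconds.
  // The broker list is copied from the job logs; 2000 is only an example.
  val kafkaParams: Map[String, String] = Map(
    "metadata.broker.list" -> "broker1:9092,broker2:9092,broker3:9092",
    "refresh.leader.backoff.ms" -> "2000"
  )

  def main(args: Array[String]): Unit = {
    (sparkSettings ++ kafkaParams).foreach { case (k, v) => println(s"$k = $v") }
  }
}
```

The first map would be applied to the SparkConf (for example sparkSettings.foreach { case (k, v) => conf.set(k, v) }) and the second passed as the kafkaParams argument of KafkaUtils.createDirectStream.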

On Wed, Aug 26, 2015 at 9:07 PM, Shushant Arora <> wrote:

My streaming application gets killed with the error below:

15/08/26 21:55:20 ERROR kafka.DirectKafkaInputDStream: ArrayBuffer(kafka.common.NotLeaderForPartitionException, kafka.common.NotLeaderForPartitionException, kafka.common.NotLeaderForPartitionException, kafka.common.NotLeaderForPartitionException, kafka.common.NotLeaderForPartitionException, org.apache.spark.SparkException: Couldn't find leader offsets for Set([testtopic,223], [testtopic,205], [testtopic,64], [testtopic,100], [testtopic,193]))
15/08/26 21:55:20 ERROR scheduler.JobScheduler: Error generating jobs for time 1440626120000 ms
org.apache.spark.SparkException: ArrayBuffer(kafka.common.NotLeaderForPartitionException, org.apache.spark.SparkException: Couldn't find leader offsets for Set([testtopic,115]))
at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:94)
at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:116)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)

The Kafka params printed in the job logs are:
        value.serializer = class org.apache.kafka.common.serialization.StringSerializer
        key.serializer = class org.apache.kafka.common.serialization.StringSerializer
        block.on.buffer.full = true = 100
        buffer.memory = 1048576
        batch.size = 16384 = 30000 = 300000
        receive.buffer.bytes = 32768 = 30000 = 5
        bootstrap.servers = [broker1:9092, broker2:9092, broker3:9092]
        metric.reporters = [] =
        compression.type = none
        retries = 0
        max.request.size = 1048576
        send.buffer.bytes = 131072
        acks = all = 10 = 0
        metrics.num.samples = 2 = 60000

Is the Kafka broker going down and causing the job to be killed? What's the best way to handle it?
Will increasing the retries and backoff time help, and to what values should they be set so that the streaming application never fails? I would rather it keep retrying every few seconds and send an event, so that my custom code can send a notification that a Kafka broker is down, if that is the cause.