Probably this is the issue -

Hmm.... interesting... Wondering what happens if I set it to "largest"...?
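For reference, the one-line change being considered here (assuming the kafkaConf map from the code further down the thread) would be:

```java
// Hypothetical fragment of the consumer config below: with "largest", a
// consumer group that has no committed offset starts from the newest
// message instead of replaying the partition from the beginning.
kafkaConf.put("auto.offset.reset", "largest");
```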

On Fri, Oct 10, 2014 at 3:47 PM, Abraham Jacob <> wrote:
Sure... I do set the group id for all the consumers to be the same. Here is the code ---

SparkConf sparkConf = new SparkConf().setMaster("yarn-cluster").setAppName("Streaming WordCount");
sparkConf.set("spark.shuffle.manager", "SORT");
sparkConf.set("spark.streaming.unpersist", "true");
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(1000));

Map<String, String> kafkaConf = new HashMap<String, String>();
kafkaConf.put("zookeeper.connect", zookeeper);
kafkaConf.put("group.id", consumerGrp);
kafkaConf.put("auto.offset.reset", "smallest");
kafkaConf.put("", "1000");  // property name lost in the email formatting
kafkaConf.put("rebalance.max.retries", "4");
kafkaConf.put("", "3000");  // property name lost in the email formatting

Map<String, Integer> topicMap = new HashMap<String, Integer>();
topicMap.put(topic, 1);

List<JavaPairDStream<byte[], String>> kafkaStreams = new ArrayList<JavaPairDStream<byte[], String>>();
for (int i = 0; i < numPartitions; i++) {
    kafkaStreams.add(KafkaUtils.createStream(jssc, byte[].class, String.class,
            DefaultDecoder.class, PayloadDeSerializer.class,
            kafkaConf, topicMap, StorageLevel.MEMORY_ONLY_SER())
        .mapToPair(new PairFunction<Tuple2<byte[], String>, byte[], String>() {

            private static final long serialVersionUID = -1936810126415608167L;

            public Tuple2<byte[], String> call(Tuple2<byte[], String> tuple2) throws Exception {
                return tuple2;
            }
        }));
}

JavaPairDStream<byte[], String> unifiedStream;
if (kafkaStreams.size() > 1) {
    unifiedStream = jssc.union(kafkaStreams.get(0), kafkaStreams.subList(1, kafkaStreams.size()));
} else {
    unifiedStream = kafkaStreams.get(0);
}

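For context (an assumption about the mechanics, not something confirmed in this thread): with the Kafka 0.8 high-level consumer, the resume position comes from the offsets the group has committed to ZooKeeper, and "auto.offset.reset" only applies when no committed offset is found for the group. A minimal sketch of the commit-related properties, using the stock 0.8 consumer names:

```java
// Hypothetical fragment of the same kafkaConf map: committed offsets are
// what let a restarted consumer group resume where it left off.
kafkaConf.put("auto.commit.enable", "true");       // defaults to true for the high-level consumer
kafkaConf.put("auto.commit.interval.ms", "1000");  // commit consumed offsets to ZooKeeper every second
```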

On Fri, Oct 10, 2014 at 3:37 PM, Sean McNamara <> wrote:
Would you mind sharing the code leading to your createStream?  Are you also setting the group id?



On Oct 10, 2014, at 4:31 PM, Abraham Jacob <> wrote:

> Hi Folks,
> I am seeing some strange behavior when using the Spark Kafka connector in Spark streaming.
> I have a Kafka topic which has 8 partitions. I have a kafka producer that pumps some messages into this topic.
> On the consumer side I have a spark streaming application that has 8 executors on 8 worker nodes and 8 ReceiverInputDStreams with the same kafka group id, connected to the 8 partitions I have for the topic. Also the kafka consumer property "auto.offset.reset" is set to "smallest".
> Now here is the sequence of steps -
> (1) Start the spark streaming app.
> (2) Start the producer.
> At this point I see the messages that are being pumped from the producer in Spark Streaming.  Then I -
> (1) Stop the producer.
> (2) Wait for all the messages to be consumed.
> (3) Stop the spark streaming app.
> Now when I restart the spark streaming app (note - the producer is still down and no messages are being pumped into the topic) - I observe the following -
> (1) Spark Streaming starts reading from each partition right from the beginning.
> This is not what I was expecting. I was expecting the consumers started by spark streaming to start from where it left off....
> Is my assumption incorrect that the consumers (the kafka/spark connector) should start reading from the topic where they last left off?
> Has anyone else seen this behavior? Is there a way to make it such that it starts from where it left off?
> Regards,
> - Abraham