Probably this is the issue - 

http://www.michael-noll.com/blog/2014/10/01/kafka-spark-streaming-integration-example-tutorial/



Hmm... interesting. Wondering what happens if I set it to "largest"?
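
For reference, this is the one-line experiment I have in mind - same kafkaConf map as in the code below; with the 0.8 high-level consumer, "largest" means the consumer jumps to the end of the log whenever it has no usable committed offset:

// hypothetical tweak - start from the latest offsets instead of the earliest
kafkaConf.put("auto.offset.reset", "largest");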


On Fri, Oct 10, 2014 at 3:47 PM, Abraham Jacob <abe.jacobs@gmail.com> wrote:
Sure... I do set group.id to the same value for all the consumers. Here is the code ---

SparkConf sparkConf = new SparkConf()
        .setMaster("yarn-cluster")
        .setAppName("Streaming WordCount");
sparkConf.set("spark.shuffle.manager", "SORT");
sparkConf.set("spark.streaming.unpersist", "true");
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(1000));

// Kafka consumer configuration - every receiver uses the same group.id.
Map<String, String> kafkaConf = new HashMap<String, String>();
kafkaConf.put("zookeeper.connect", zookeeper);
kafkaConf.put("group.id", consumerGrp);
kafkaConf.put("auto.offset.reset", "smallest");
kafkaConf.put("zookeeper.connection.timeout.ms", "1000");
kafkaConf.put("rebalance.max.retries", "4");
kafkaConf.put("rebalance.backoff.ms", "3000");

Map<String, Integer> topicMap = new HashMap<String, Integer>();
topicMap.put(topic, 1);

// One receiver-based input stream per partition; the mapToPair is just an identity map.
List<JavaPairDStream<byte[], String>> kafkaStreams = new ArrayList<JavaPairDStream<byte[], String>>();
for (int i = 0; i < numPartitions; i++) {
    kafkaStreams.add(
        KafkaUtils.createStream(jssc, byte[].class, String.class,
                DefaultDecoder.class, PayloadDeSerializer.class,
                kafkaConf, topicMap, StorageLevel.MEMORY_ONLY_SER())
            .mapToPair(new PairFunction<Tuple2<byte[], String>, byte[], String>() {

                private static final long serialVersionUID = -1936810126415608167L;

                public Tuple2<byte[], String> call(Tuple2<byte[], String> tuple2) throws Exception {
                    return tuple2;
                }
            }));
}

// Union the per-partition streams into a single DStream.
JavaPairDStream<byte[], String> unifiedStream;
if (kafkaStreams.size() > 1) {
    unifiedStream = jssc.union(kafkaStreams.get(0), kafkaStreams.subList(1, kafkaStreams.size()));
} else {
    unifiedStream = kafkaStreams.get(0);
}

unifiedStream.print();

jssc.start();
jssc.awaitTermination();
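
As a side note, here is how I plan to sanity-check what happens to the committed offsets across a restart - a rough sketch only, assuming the standard 0.8 high-level-consumer ZooKeeper layout /consumers/<group>/offsets/<topic>/<partition> (the OffsetDump class name and the command-line arguments are just placeholders):

import java.util.List;

import org.apache.zookeeper.ZooKeeper;

public class OffsetDump {
    public static void main(String[] args) throws Exception {
        String zkConnect = args[0];   // same value as "zookeeper.connect" above
        String group = args[1];       // same value as "group.id" above
        String topic = args[2];

        ZooKeeper zk = new ZooKeeper(zkConnect, 10000, null);
        String base = "/consumers/" + group + "/offsets/" + topic;

        // Each child znode is a partition id; its data is the committed offset as a string.
        for (String partition : zk.getChildren(base, false)) {
            byte[] data = zk.getData(base + "/" + partition, false, null);
            System.out.println("partition " + partition + " -> offset " + new String(data, "UTF-8"));
        }
        zk.close();
    }
}

If the offsets are still there after a clean shutdown but look reset (or are gone) right after the app comes back up, that would point at the connector resetting them on startup rather than anything on the Kafka/ZooKeeper side.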


-abe


On Fri, Oct 10, 2014 at 3:37 PM, Sean McNamara <Sean.McNamara@webtrends.com> wrote:
Would you mind sharing the code leading to your createStream?  Are you also setting group.id?

Thanks,

Sean


On Oct 10, 2014, at 4:31 PM, Abraham Jacob <abe.jacobs@gmail.com> wrote:

> Hi Folks,
>
> I am seeing some strange behavior when using the Spark Kafka connector in Spark streaming.
>
> I have a Kafka topic which has 8 partitions. I have a kafka producer that pumps some messages into this topic.
>
> On the consumer side I have a spark streaming application that has 8 executors on 8 worker nodes and 8 ReceiverInputDStreams with the same kafka group id, connected to the 8 partitions I have for the topic. Also the kafka consumer property "auto.offset.reset" is set to "smallest".
>
>
> Now here is the sequence of steps -
>
> (1) Start the spark streaming app.
> (2) Start the producer.
>
> At this point I see the messages that are being pumped by the producer show up in Spark Streaming.  Then I -
>
> (1) Stop the producer.
> (2) Wait for all the messages to be consumed.
> (3) Stop the spark streaming app.
>
> Now when I restart the spark streaming app (note - the producer is still down and no messages are being pumped into the topic) - I observe the following -
>
> (1) Spark Streaming starts reading from each partition right from the beginning.
>
>
> This is not what I was expecting. I was expecting the consumers started by spark streaming to pick up from where they left off....
>
> Is my assumption incorrect - that the consumers (the kafka/spark connector) should start reading from the topic where they last left off?
>
> Has anyone else seen this behavior? Is there a way to make it such that it starts from where it left off?
>
> Regards,
> - Abraham



