spark-user mailing list archives

From Sean McNamara <>
Subject Re: Spark Streaming KafkaUtils Issue
Date Fri, 10 Oct 2014 22:53:47 GMT
How long do you let the consumers run for? Is it less than 60 seconds, by chance?
<> defaults to 60000 (60 seconds), so if they run for less than that, it may explain the behavior you are seeing.
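Here is a rough way to see why run time matters. The Java sketch below is a standalone simulation, not Kafka or Spark code. It makes three assumptions. First, the setting with the 60000 ms default is the consumer's offset auto-commit interval (auto.commit.interval.ms in the Kafka 0.8 consumer config). Second, offsets are written only when a full interval has elapsed. Third, the consumer is stopped without a final commit. AutoCommitSketch and resumeOffset are hypothetical names used only for illustration.

```java
/**
 * Hypothetical simulation (not Kafka or Spark code) of periodic offset auto-commit.
 * Assumptions: offsets are committed only at each full commit interval, and the
 * consumer is stopped without a final commit (for example, the app is killed).
 */
class AutoCommitSketch {

    /**
     * Offset a restarted consumer in the same group would resume from.
     *
     * @param consumedAtMs     time (ms since start) at which message i was consumed
     * @param stopAtMs         time (ms since start) at which the consumer stopped
     * @param commitIntervalMs how often offsets are committed
     */
    static long resumeOffset(long[] consumedAtMs, long stopAtMs, long commitIntervalMs) {
        long committed = -1; // -1 means no offset has ever been committed
        for (long tick = commitIntervalMs; tick <= stopAtMs; tick += commitIntervalMs) {
            // At each commit tick, record the offset of the next unconsumed message
            long next = 0;
            for (long t : consumedAtMs) {
                if (t <= tick) {
                    next++;
                }
            }
            committed = next;
        }
        // No commit ever happened: with auto.offset.reset=smallest the consumer
        // falls back to the earliest offset (0 in this model)
        return committed < 0 ? 0 : committed;
    }

    public static void main(String[] args) {
        long[] consumed = {1_000, 2_000, 3_000, 4_000, 5_000};
        System.out.println(resumeOffset(consumed, 30_000, 60_000)); // stopped after 30 s, prints 0
        System.out.println(resumeOffset(consumed, 90_000, 60_000)); // stopped after 90 s, prints 5
    }
}
```

With a 60-second interval, a consumer stopped after 30 seconds never commits, so a restart with auto.offset.reset=smallest rereads from offset 0. A consumer stopped after 90 seconds resumes at offset 5.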



On Oct 10, 2014, at 4:47 PM, Abraham Jacob <<>>

Sure. I do set the <> to the same value for all the consumers.
Here is the code ---

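import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import kafka.serializer.DefaultDecoder;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import scala.Tuple2;

SparkConf sparkConf = new SparkConf().setMaster("yarn-cluster").setAppName("Streaming"); // rest of the app name is cut off in the original message
sparkConf.set("spark.shuffle.manager", "SORT");
sparkConf.set("spark.streaming.unpersist", "true");
// 1-second batch interval
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(1000));

// Kafka consumer properties shared by every receiver
Map<String, String> kafkaConf = new HashMap<String, String>();
kafkaConf.put("zookeeper.connect", zookeeper);
kafkaConf.put("<>", consumerGrp);
kafkaConf.put("auto.offset.reset", "smallest");
kafkaConf.put("rebalance.max.retries", "4");
kafkaConf.put("<>", "3000");

// One consumer thread per receiver for the topic
Map<String, Integer> topicMap = new HashMap<String, Integer>();
topicMap.put(topic, 1);

// One receiver per Kafka partition, all in the same consumer group
List<JavaPairDStream<byte[], String>> kafkaStreams = new ArrayList<JavaPairDStream<byte[], String>>();
for (int i = 0; i < numPartitions; i++) {
    kafkaStreams.add(KafkaUtils.createStream(jssc, byte[].class, String.class,
            DefaultDecoder.class, PayloadDeSerializer.class,
            kafkaConf, topicMap, StorageLevel.MEMORY_ONLY_SER())
        .mapToPair(new PairFunction<Tuple2<byte[], String>, byte[], String>() {

            private static final long serialVersionUID = -1936810126415608167L;

            // Identity mapping: returns each (key, message) pair unchanged
            public Tuple2<byte[], String> call(Tuple2<byte[], String> tuple2) throws Exception {
                return tuple2;
            }
        }));
}

// Merge all receiver streams into a single DStream
JavaPairDStream<byte[], String> unifiedStream;
if (kafkaStreams.size() > 1) {
    unifiedStream = jssc.union(kafkaStreams.get(0), kafkaStreams.subList(1, kafkaStreams.size()));
} else {
    unifiedStream = kafkaStreams.get(0);
}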

On Fri, Oct 10, 2014 at 3:37 PM, Sean McNamara <<>>
Would you mind sharing the code leading up to your createStream call? Are you also setting <>?



On Oct 10, 2014, at 4:31 PM, Abraham Jacob <<>>

> Hi Folks,
> I am seeing some strange behavior when using the Spark Kafka connector in Spark Streaming.
> I have a Kafka topic with 8 partitions, and a Kafka producer that pumps messages into this topic.
> On the consumer side, I have a Spark Streaming application with 8 executors on 8 worker nodes
> and 8 ReceiverInputDStreams with the same Kafka group ID, connected to the 8 partitions of the topic.
> The Kafka consumer property "auto.offset.reset" is set to "smallest".
> Here is the sequence of steps:
> (1) Start the Spark Streaming app.
> (2) Start the producer.
> At this point I see the messages pumped by the producer arriving in Spark Streaming. Then I:
> (1) Stop the producer.
> (2) Wait for all the messages to be consumed.
> (3) Stop the Spark Streaming app.
> Now when I restart the Spark Streaming app (note: the producer is still down and no messages
> are being pumped into the topic), I observe the following:
> (1) Spark Streaming starts reading each partition from the very beginning.
> This is not what I was expecting. I expected the consumers started by Spark Streaming to resume
> from where they left off.
> Am I wrong to assume that the consumers (the Kafka/Spark connector) resume reading the topic
> from where they last left off?
> Has anyone else seen this behavior? Is there a way to make it resume from where it left off?
> Regards,
> - Abraham
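The expectation in the question comes down to how the high-level consumer picks a starting offset for each partition. Below is a simplified sketch. It assumes Kafka 0.8 semantics, where auto.offset.reset is consulted only when the group has no committed offset for a partition, or the committed offset is out of range. StartOffsetSketch and its parameters are hypothetical, and the real consumer reads committed offsets from ZooKeeper rather than from an in-memory map.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Simplified model (not the real consumer code) of how a Kafka 0.8-style
 * high-level consumer chooses where to start reading a partition.
 */
class StartOffsetSketch {

    /**
     * @param committed   offsets committed for this consumer group, keyed by partition
     * @param partition   partition being (re)opened
     * @param earliest    earliest offset still retained for the partition
     * @param latest      offset the next produced message will get
     * @param offsetReset value of auto.offset.reset: "smallest" or "largest"
     */
    static long startOffset(Map<Integer, Long> committed, int partition,
                            long earliest, long latest, String offsetReset) {
        Long saved = committed.get(partition);
        if (saved != null && saved >= earliest && saved <= latest) {
            // A valid committed offset exists: resume where the group left off
            return saved;
        }
        // No committed offset (or it is out of range): fall back to auto.offset.reset
        if ("smallest".equals(offsetReset)) {
            return earliest;
        }
        if ("largest".equals(offsetReset)) {
            return latest;
        }
        throw new IllegalArgumentException("unsupported auto.offset.reset: " + offsetReset);
    }

    public static void main(String[] args) {
        Map<Integer, Long> committed = new HashMap<>();
        // Nothing committed yet for partition 0: "smallest" rereads from the beginning
        System.out.println(startOffset(committed, 0, 0L, 500L, "smallest")); // 0
        committed.put(0, 500L);
        // Offset 500 committed for partition 0: resume there
        System.out.println(startOffset(committed, 0, 0L, 500L, "smallest")); // 500
    }
}
```

Under this model, seeing every partition reread from the beginning with "smallest" means no valid offset was committed for the group before the restart, which is what Sean's question about the commit interval is probing.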

