spark-user mailing list archives

From Abraham Jacob <abe.jac...@gmail.com>
Subject Re: Spark Streaming KafkaUtils Issue
Date Fri, 10 Oct 2014 22:56:39 GMT
Probably this is the issue -

http://www.michael-noll.com/blog/2014/10/01/kafka-spark-streaming-integration-example-tutorial/



   - Spark’s usage of the Kafka consumer parameter auto.offset.reset
   <http://kafka.apache.org/documentation.html#consumerconfigs> is
   different from Kafka’s semantics. In Kafka, the behavior of setting
   auto.offset.reset to “smallest” is that the consumer will automatically
   reset the offset to the smallest offset when a) there is no existing offset
   stored in ZooKeeper or b) there is an existing offset but it is out of
   range. Spark however will *always* remove existing offsets and then
   start all the way from zero again. This means whenever you restart your
   application with auto.offset.reset = "smallest", your application will
   completely re-process all available Kafka data. Doh! See this discussion
   <http://apache-spark-user-list.1001560.n3.nabble.com/spark-streaming-and-the-spark-shell-tp3347p3387.html>
   and that discussion <http://markmail.org/message/257a5l3oqyftsjxj>.


Hmm.... interesting... Wondering what happens if I set it to "largest"...?
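
For reference, a minimal sketch of the two alternatives the note above suggests. The variable names (zookeeper, consumerGrp) are borrowed from the snippet further down in this thread, and the "omit the property to resume from the ZooKeeper-committed offsets" part is just my reading of the blog post, not something I have verified:

    import java.util.HashMap;
    import java.util.Map;

    // Consumer config handed to KafkaUtils.createStream(), as in the code below.
    Map<String, String> kafkaConf = new HashMap<String, String>();
    kafkaConf.put("zookeeper.connect", zookeeper);
    kafkaConf.put("group.id", consumerGrp);

    // Option A: "largest" -- per the note above, Spark still wipes the stored
    // offsets, so on restart the receiver jumps to the newest messages and
    // anything published while the app was down is skipped.
    kafkaConf.put("auto.offset.reset", "largest");

    // Option B (assumption, not verified): leave auto.offset.reset unset, so
    // the high-level consumer falls back to the offsets the group last
    // committed to ZooKeeper and resumes from there.
    // kafkaConf.remove("auto.offset.reset");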


On Fri, Oct 10, 2014 at 3:47 PM, Abraham Jacob <abe.jacobs@gmail.com> wrote:

> Sure... I do set the group.id for all the consumers to be the same. Here
> is the code ---
>
> SparkConf sparkConf = new SparkConf()
>         .setMaster("yarn-cluster")
>         .setAppName("Streaming WordCount");
> sparkConf.set("spark.shuffle.manager", "SORT");
> sparkConf.set("spark.streaming.unpersist", "true");
> JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(1000));
>
> Map<String, String> kafkaConf = new HashMap<String, String>();
> kafkaConf.put("zookeeper.connect", zookeeper);
> kafkaConf.put("group.id", consumerGrp);
> kafkaConf.put("auto.offset.reset", "smallest");
> kafkaConf.put("zookeeper.connection.timeout.ms", "1000");
> kafkaConf.put("rebalance.max.retries", "4");
> kafkaConf.put("rebalance.backoff.ms", "3000");
>
> Map<String, Integer> topicMap = new HashMap<String, Integer>();
> topicMap.put(topic, 1);
>
> List<JavaPairDStream<byte[], String>> kafkaStreams =
>         new ArrayList<JavaPairDStream<byte[], String>>();
> for (int i = 0; i < numPartitions; i++) {
>     kafkaStreams.add(
>         KafkaUtils.createStream(jssc, byte[].class, String.class,
>                 DefaultDecoder.class, PayloadDeSerializer.class,
>                 kafkaConf, topicMap, StorageLevel.MEMORY_ONLY_SER())
>             .mapToPair(new PairFunction<Tuple2<byte[], String>, byte[], String>() {
>
>                 private static final long serialVersionUID = -1936810126415608167L;
>
>                 public Tuple2<byte[], String> call(Tuple2<byte[], String> tuple2)
>                         throws Exception {
>                     return tuple2;
>                 }
>             }));
> }
>
> JavaPairDStream<byte[], String> unifiedStream;
> if (kafkaStreams.size() > 1) {
>     unifiedStream = jssc.union(kafkaStreams.get(0),
>             kafkaStreams.subList(1, kafkaStreams.size()));
> } else {
>     unifiedStream = kafkaStreams.get(0);
> }
>
> unifiedStream.print();
> jssc.start();
> jssc.awaitTermination();
>
>
> -abe
>
>
> On Fri, Oct 10, 2014 at 3:37 PM, Sean McNamara <
> Sean.McNamara@webtrends.com> wrote:
>
>> Would you mind sharing the code leading to your createStream?  Are you
>> also setting group.id?
>>
>> Thanks,
>>
>> Sean
>>
>>
>> On Oct 10, 2014, at 4:31 PM, Abraham Jacob <abe.jacobs@gmail.com> wrote:
>>
>> > Hi Folks,
>> >
>> > I am seeing some strange behavior when using the Spark Kafka connector
>> in Spark streaming.
>> >
>> > I have a Kafka topic which has 8 partitions. I have a kafka producer
>> that pumps some messages into this topic.
>> >
>> > On the consumer side I have a spark streaming application that has
>> 8 executors on 8 worker nodes and 8 ReceiverInputDStream with the same
>> kafka group id connected to the 8 partitions I have for the topic. Also the
>> kafka consumer property "auto.offset.reset" is set to "smallest".
>> >
>> >
>> > Now here is the sequence of steps -
>> >
>> > (1) I start the spark streaming app.
>> > (2) Start the producer.
>> >
>> > At this point I see the messages that are being pumped from the
>> producer in Spark Streaming.  Then I -
>> >
>> > (1) Stopped the producer.
>> > (2) Waited for all the messages to be consumed.
>> > (3) Stopped the spark streaming app.
>> >
>> > Now when I restart the spark streaming app (note - the producer is
>> still down and no messages are being pumped into the topic) - I observe the
>> following -
>> >
>> > (1) Spark Streaming starts reading from each partition right from the
>> beginning.
>> >
>> >
>> > This is not what I was expecting. I was expecting the consumers started
>> by spark streaming to pick up from where they left off....
>> >
>> > Is my assumption incorrect that the consumers (the kafka/spark
>> connector) should start reading from the topic where they last left off?
>> >
>> > Has anyone else seen this behavior? Is there a way to make it such that
>> it starts from where it left off?
>> >
>> > Regards,
>> > - Abraham
>>
>>
>
>
> --
> ~
>



-- 
~
