kafka-users mailing list archives

From "Matthias J. Sax" <matth...@confluent.io>
Subject Re: Kafka-Streams Offsets Commit - Reprocessing Very Old Messages
Date Wed, 03 May 2017 23:19:27 GMT
Hard to say.

A couple of things you could try: upgrade to 0.10.2.1 (released last
week) -- it contains a couple of bug fixes with regard to rebalancing
and state store locks.

Also, when your application "jumps back", is it to somewhere in the middle
of your input topic or is it to "earliest"? If it is earliest, it might
indicate that the auto.offset.reset policy was triggered, i.e., no valid
committed offset was found.
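
If you want to make the reset policy explicit instead of relying on the
default, you can set it in the StreamsConfig properties. A minimal sketch,
reusing the application id and kafkaHost placeholders from your own config
("latest" skips old data, while "earliest" re-reads from the beginning
whenever no committed offset is found):

    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.streams.StreamsConfig;

    Properties streamsConfiguration = new Properties();
    streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "UniqueKey");
    streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaHost);
    // Only used when no valid committed offset exists for the group;
    // "latest" avoids jumping back to the beginning of the topic.
    streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");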

Also, can you check if your application is committing regularly? By
default, this should happen at 30-second intervals. You can use the
bin/kafka-consumer-groups.sh command to check committed offsets.
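
The commit frequency itself is controlled by commit.interval.ms. Just a
sketch of how you could lower it while debugging so that committed offsets
show up faster when you describe the group (the 10-second value is
arbitrary, not a recommendation):

    import java.util.Properties;

    import org.apache.kafka.streams.StreamsConfig;

    Properties streamsConfiguration = new Properties();
    // Streams commits processed offsets on this interval (default 30000 ms).
    // A smaller value makes progress visible sooner when you inspect the
    // group's committed offsets with bin/kafka-consumer-groups.sh.
    streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10000);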


Hope this helps.


-Matthias


On 5/3/17 11:07 AM, Khatri, Alpa wrote:
> Hi,
> We are using Apache Kafka Streams 0.10.2.0 in an application. We are leveraging a Kafka Streams topology to pass the processed data on to the next topic until the end of processing.
> Also, we use an AWS ECS container to deploy the consumer application. We observed the consumer picking up very old messages to process, although they have been processed earlier. This issue happens randomly at the time of service scaling up/down or in the case of new deployments. I understand that at the time of consumer rebalancing, some of the messages can be reprocessed. But in this case, it is reprocessing a large number of messages which were successfully processed a long time back (more than 10 days old).
> We are not able to understand the root cause of this issue. Is it not committing the offsets properly and picking up random messages in a different topology? This leads to inconsistent behavior, with one message being re-processed in any of the topologies.
> Here are the configurations we are using:
>     Properties streamsConfiguration = new Properties();
>     streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaHost);
>    streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG,"UniqueKey");
>     streamsConfiguration.put(StreamsConfig.CLIENT_ID_CONFIG,key);
>     streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
>     streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
>     streamsConfiguration.put(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG, 60000);
>     streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 6);
> 
> Here is the code snippet for Processors:
>     final KStreamBuilder builder = new KStreamBuilder();
>     builder.addSource(key, Serdes.String().deserializer(), executor.getDeserializer(), key);
>     builder.addProcessor(key + "_processor", () -> new KafkaProcessor(), key);
>     builder.addSink(key + "_sink", key + "_sink", key + "_processor");
>     final KafkaStreams streams = new KafkaStreams(builder, StreamConfigurations.getStreamsConfgurations(key, kafkaHost));
>     streams.setUncaughtExceptionHandler((t, th) ->
>         _logger.error("UncaughtException in Kafka StreamThread " + t.getName(), th));
>     streams.start();
>     Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
> 
> I have looked into some of the Kafka re-processing blogs and am thinking of trying some more configurations, listed below:
>     streamsConfiguration.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, Integer.MAX_VALUE);
>     streamsConfiguration.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 20000); // default is 10000
>     streamsConfiguration.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 5000); // default is 3000
>     streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
>     streamsConfiguration.put(ProducerConfig.ACKS_CONFIG, "1");
>     streamsConfiguration.put(ProducerConfig.RETRIES_CONFIG, 10);
> 
> 
> I also found that Kafka Streams by default sets auto.commit.enable to false and doesn't let you override that value, and auto.offset.reset is set to earliest. Can this be an issue causing the inconsistent behavior?
> 
> 
> I have found the following types of exceptions:
> 
> Warning: when Kafka Streams sends the response to the next topic:
> 
> 
> *       2017-05-03 17:24:52.908|INSTANCEID_IS_UNDEFINED|lampnode.devops.tesco.org|kafka-producer-network-thread | MediaDownload-StreamThread-6-producer|WARN |o.a.k.c.NetworkClient|707|Error while fetching metadata with correlation id 41 : {MediaValidate=LEADER_NOT_AVAILABLE}
> 
> Randomly, while processing messages on the consumer end:
> 
> 
> *       org.apache.kafka.streams.errors.LockException: task [0_91] Failed to lock the state directory: /tmp/kafka-streams/Flare.MediaPreProcess/0_91
> 
> *       org.apache.kafka.streams.errors.StreamsException: stream-thread [StreamThread-32] failed to suspend stream tasks
> 
> 
> 
> Kafka Logs show this exception for different topics:
> 
> [2017-05-03 13:13:52,734] ERROR [ReplicaFetcherThread-0-4], Error for partition [__consumer_offsets,45] to broker 4:org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition. (kafka.server.ReplicaFetcherThread)
> 
> 
> Please let me know if you need any kind of details.
> 
> Thanks,
> Alpa
> M:9742880007
> 

