kafka-users mailing list archives

From "Khatri, Alpa" <Alpa.Kha...@in.tesco.com>
Subject Kafka-Streams Offsets Commit - Reprocessing Very Old Messages
Date Wed, 03 May 2017 18:07:24 GMT
We are using Apache Kafka Streams in an application, with a Kafka Streams topology that passes
the processed data on to the next topic until the end of processing. We deploy the consumer
application in AWS ECS containers. We have observed that the consumer picks up very old messages
to process, although they have already been processed. This happens randomly when the service
scales up or down, or during new deployments. I understand that some messages can be reprocessed
during a consumer rebalance, but in this case a large number of messages that were successfully
processed long ago (more than 10 days old) are being reprocessed.
We are not able to understand the root cause of this issue. Is it failing to commit the offsets
properly and picking up random messages in different topologies? This leads to inconsistent
behavior, with a message being reprocessed in any of the topologies.
Here is the configuration we are using:
    Properties streamsConfiguration = new Properties();
    streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaHost);
    streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    streamsConfiguration.put(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG, 60000);
    streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 6);

Here is the code snippet for Processors:
    final KStreamBuilder builder = new KStreamBuilder();
    builder.addSource(key, Serdes.String().deserializer(), executor.getDeserializer(), key);
    builder.addProcessor(key + "_processor", () -> new KafkaProcessor(), key);
    builder.addSink(key + "_sink", key + "_sink", key + "_processor");
    final KafkaStreams streams = new KafkaStreams(builder, StreamConfigurations.getStreamsConfgurations(key,
    streams.setUncaughtExceptionHandler((t, th) -> {
        _logger.error("UncaughtException in Kafka StreamThread  " + t.getName() + " exception = ", th.getMessage());
    });
    Runtime.getRuntime().addShutdownHook(new Thread(streams::close));

I have looked into some Kafka reprocessing blog posts and am thinking of trying some more
configurations, listed below:
    streamsConfiguration.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, Integer.MAX_VALUE);
    streamsConfiguration.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 20000); //default is
    streamsConfiguration.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 5000); //default is 30000
    streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
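
For reference, two of these settings interact: per the Kafka consumer documentation, heartbeat.interval.ms must be lower than session.timeout.ms and is typically set to no more than one third of it. The following is a minimal sketch of that sanity check in plain Java. The class and method names are hypothetical and not part of any Kafka API.

```java
public class TimeoutCheckSketch {

    /**
     * Hypothetical helper: returns true when the heartbeat interval is positive
     * and no more than one third of the session timeout (integer division).
     */
    static boolean heartbeatFitsSession(int heartbeatMs, int sessionTimeoutMs) {
        return heartbeatMs > 0 && heartbeatMs <= sessionTimeoutMs / 3;
    }

    public static void main(String[] args) {
        // Values proposed above: 5000 ms heartbeat, 20000 ms session timeout.
        System.out.println(heartbeatFitsSession(5000, 20000));  // prints "true"
        // A heartbeat interval longer than the session timeout is not valid.
        System.out.println(heartbeatFitsSession(30000, 20000)); // prints "false"
    }
}
```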

I also found that Kafka Streams by default sets enable.auto.commit to false and does not let
you override the value, and that auto.offset.reset is set to earliest. Can this be the cause of
the inconsistent behavior?
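
To make the question concrete: as I understand it, auto.offset.reset only applies when the group has no committed offset for a partition, or when the committed offset is out of range on the broker. The following is a simplified model of that decision in plain Java, not Kafka's actual implementation. The class, enum, and method names are hypothetical.

```java
import java.util.OptionalLong;

public class StartOffsetSketch {

    enum ResetPolicy { EARLIEST, LATEST }

    /**
     * Simplified model: use the committed offset when it exists and lies within
     * [logStart, logEnd]; otherwise fall back to the reset policy.
     */
    static long startOffset(OptionalLong committed, long logStart, long logEnd, ResetPolicy policy) {
        if (committed.isPresent()) {
            long c = committed.getAsLong();
            if (c >= logStart && c <= logEnd) {
                return c;
            }
        }
        return policy == ResetPolicy.EARLIEST ? logStart : logEnd;
    }

    public static void main(String[] args) {
        // Committed offset found: resume from it, regardless of the policy.
        System.out.println(startOffset(OptionalLong.of(950), 100, 1000, ResetPolicy.EARLIEST)); // prints "950"
        // No committed offset with "earliest": start from the oldest retained message.
        System.out.println(startOffset(OptionalLong.empty(), 100, 1000, ResetPolicy.EARLIEST)); // prints "100"
        // No committed offset with "latest": skip to the end of the log.
        System.out.println(startOffset(OptionalLong.empty(), 100, 1000, ResetPolicy.LATEST));   // prints "1000"
    }
}
```

Under this model, reprocessing from the beginning of a topic with "earliest" would only happen if the committed offsets could not be found or were no longer valid at startup.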

I have found the following types of exceptions:

Warning when Kafka Streams sends the response to the next topic:

*       2017-05-03 17:24:52.908|INSTANCEID_IS_UNDEFINED|lampnode.devops.tesco.org|kafka-producer-network-thread | MediaDownload-StreamThread-6-producer|WARN |o.a.k.c.NetworkClient|707|Error while fetching metadata with correlation id 41 : {MediaValidate=LEADER_NOT_AVAILABLE}

Randomly, while processing messages on the consumer end:

*       org.apache.kafka.streams.errors.LockException: task [0_91] Failed to lock the state directory: /tmp/kafka-streams/Flare.MediaPreProcess/0_91

*       org.apache.kafka.streams.errors.StreamsException: stream-thread [StreamThread-32] failed to suspend stream tasks

The Kafka broker logs show this exception for different topics:

[2017-05-03 13:13:52,734] ERROR [ReplicaFetcherThread-0-4], Error for partition [__consumer_offsets,45] to broker 4:org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition. (kafka.server.ReplicaFetcherThread)

Please let me know if you need any further details.

