kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Matthias J. Sax" <matth...@confluent.io>
Subject Re: UnknownProducerIdException in Kafka streams when enabling exactly once
Date Tue, 05 Jun 2018 22:35:41 GMT
Sorry for late reply.

> The source stream contains millions of messages produced over several
months.

What is the retention time of the output topic? If it is smaller than
the message timestamp (that I expect to be multiple month old), on write
the data would be delete quitckly, because it's older than retention
time (note, that Kafka Streams preserve the timestamp of the input
records and sets them as timestamps for the output records).

At the same time, the producerId is stored in the output topic and if
the topic segments are deleted, the producerId gets lost and thus, a
alter write fails.

Thus, you would need to either set the retention time of the output
topic larger, or maybe configure AppendTime instead of default
CreateTime. Of course, configuring AppendTime will alter the semantics
of the output data as they get new timestamps assigned.



-Matthias

On 4/17/18 10:52 AM, Odin wrote:
> After enabling exactly once processing on a Kafka streams application, the following
error appears in the logs:
> 
> ERROR o.a.k.s.p.internals.StreamTask - task [0_0] Failed to close producer
> due to the following error:
> 
> org.apache.kafka.streams.errors.StreamsException: task [0_0] Abort
> sending since an error caught with a previous record (key 222222 value
> some-value timestamp 1519200902670) to topic exactly-once-test-topic-
> v2 due to This exception is raised by the broker if it could not
> locate the producer metadata associated with the producerId in
> question. This could happen if, for instance, the producer's records
> were deleted because their retention time had elapsed. Once the last
> records of the producerId are removed, the producer's metadata is
> removed from the broker, and future appends by the producer will
> return this exception.
>   at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:125)
>   at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.access$500(RecordCollectorImpl.java:48)
>   at org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:180)
>   at org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1199)
>   at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:204)
>   at org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:187)
>   at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:627)
>   at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:596)
>   at org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:557)
>   at org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:481)
>   at org.apache.kafka.clients.producer.internals.Sender.access$100(Sender.java:74)
>   at org.apache.kafka.clients.producer.internals.Sender$1.onComplete(Sender.java:692)
>   at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:101)
>   at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:482)
>   at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:474)
>   at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)
>   at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.kafka.common.errors.UnknownProducerIdException
> 
> We've reproduced the issue with a minimal test case where we move messages from a source
stream to another stream without any transformation. The source stream contains millions of
messages produced over several months. The KafkaStreams object is created with the following
StreamsConfig:
> 
> - StreamsConfig.PROCESSING_GUARANTEE_CONFIG = "exactly_once"
> - StreamsConfig.APPLICATION_ID_CONFIG = "Some app id"
> - StreamsConfig.NUM_STREAM_THREADS_CONFIG = 1
> - ProducerConfig.BATCH_SIZE_CONFIG = 102400
> 
> The app is able to process some messages before the exception occurs. After restarting
the app, the processing of messages continues for a while before the exception occurs again.
> 
> Context information:
> 
> - we're running a 5 node Kafka 1.1.0 cluster with 5 zookeeper nodes.
> - there are multiple instances of the app running
> 
> 1. Has anyone seen this problem before or can give us any hints about what might be causing
this behaviour?
> 
> 2. How should problems like this be treated?
> 
> Sincerely
> Odin
> 


Mime
View raw message