kafka-users mailing list archives

From Patrik Kleindl <pklei...@gmail.com>
Subject Re: Global/Restore consumer use auto.offset.reset = none vs. OffsetOutOfRangeException
Date Wed, 03 Oct 2018 07:20:46 GMT
Hello Matthias
Thank you for the explanation.

Version used is 2.0.0-cp1

The stacktrace:
2018-10-02 10:51:52,575 ERROR [org.apache.kafka.streams.processor.internals.GlobalStreamThread$StateConsumer] (...-077dce27-40fe-47bd-86dd-1615395782af-GlobalStreamThread) - [short-component-name:; transaction-id:; user-id:; creation-time:] global-stream-thread [...-077dce27-40fe-47bd-86dd-1615395782af-GlobalStreamThread] Updating global state failed. You can restart KafkaStreams to recover from this error.: org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions: {...=51247974}
    at org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:970)
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:490)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1259)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1187)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1154)
    at org.apache.kafka.streams.processor.internals.GlobalStreamThread$StateConsumer.pollAndUpdate(GlobalStreamThread.java:239)
    at org.apache.kafka.streams.processor.internals.GlobalStreamThread.run(GlobalStreamThread.java:290)

Fetcher.parseCompletedFetch:

else if (error == Errors.OFFSET_OUT_OF_RANGE) {
    if (fetchOffset != subscriptions.position(tp)) {
        log.debug("Discarding stale fetch response for partition {} since the fetched offset {} " +
                "does not match the current offset {}", tp, fetchOffset, subscriptions.position(tp));
    } else if (subscriptions.hasDefaultOffsetResetPolicy()) {
        log.info("Fetch offset {} is out of range for partition {}, resetting offset", fetchOffset, tp);
        subscriptions.requestOffsetReset(tp);
    } else {
        throw new OffsetOutOfRangeException(Collections.singletonMap(tp, fetchOffset));
    }
}
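To make the three branches of the excerpt above concrete, here is a small self-contained Java model of the same decision. The class, enum, and method names are hypothetical; the model only mirrors the branch structure quoted from Fetcher.parseCompletedFetch and does not call the Kafka client. The boolean parameter stands in for subscriptions.hasDefaultOffsetResetPolicy(), which is false when auto.offset.reset is "none".

```java
// Hypothetical, simplified model of the OFFSET_OUT_OF_RANGE branch quoted from
// Fetcher.parseCompletedFetch. It is not Kafka code and has no Kafka dependency.
public class OutOfRangeDecision {

    // Possible outcomes of an out-of-range fetch error in this model.
    enum Outcome { DISCARD_STALE, RESET_OFFSET, THROW }

    // fetchOffset: offset the fetch was issued for
    // currentPosition: the consumer's current position for the partition
    // hasDefaultResetPolicy: true for "earliest"/"latest", false for "none"
    static Outcome decide(long fetchOffset, long currentPosition, boolean hasDefaultResetPolicy) {
        if (fetchOffset != currentPosition) {
            // The response belongs to an older position and is simply ignored.
            return Outcome.DISCARD_STALE;
        } else if (hasDefaultResetPolicy) {
            // "earliest"/"latest": an automatic offset reset is scheduled.
            return Outcome.RESET_OFFSET;
        } else {
            // "none": the error surfaces to the caller as an exception.
            return Outcome.THROW;
        }
    }

    public static void main(String[] args) {
        System.out.println(decide(51247974L, 51247974L, false)); // prints THROW
        System.out.println(decide(51247974L, 51247974L, true));  // prints RESET_OFFSET
        System.out.println(decide(100L, 200L, false));           // prints DISCARD_STALE
    }
}
```

With "none", every out-of-range fetch that is not stale takes the throw branch, which is consistent with the stack trace above.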

So does this mean that, for the global and restore consumers, this exception
will always be thrown unless there is some special handling?
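The "special handling" asked about here, and the automatic recovery Matthias describes in the quoted reply, could look roughly like the following self-contained Java sketch. Everything in it is hypothetical: an in-memory partition whose early records were deleted (for example by retention), a stand-in exception instead of OffsetOutOfRangeException, and a restore routine that, on that exception, wipes the local store and rebuilds it from the earliest offset that still exists. It is not the Kafka Streams implementation.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch, not Kafka Streams code: recovering a global store whose
// checkpointed offset has fallen below the partition's log-start offset.
public class GlobalStoreRecovery {

    // Stand-in for OffsetOutOfRangeException so the sketch has no Kafka dependency.
    static class OutOfRange extends RuntimeException {
        OutOfRange(long offset) {
            super("offset out of range: " + offset);
        }
    }

    // In-memory partition: records before logStartOffset have been deleted.
    // Entry i holds {key, value} for offset logStartOffset + i.
    static class Partition {
        final long logStartOffset;
        final List<String[]> records;

        Partition(long logStartOffset, List<String[]> records) {
            this.logStartOffset = logStartOffset;
            this.records = records;
        }

        long endOffset() {
            return logStartOffset + records.size();
        }
    }

    // Replays [offset, endOffset) into the store. Like a consumer with
    // auto.offset.reset = none, it throws instead of silently resetting.
    static void replay(Partition p, long offset, Map<String, String> store) {
        if (offset < p.logStartOffset || offset > p.endOffset()) {
            throw new OutOfRange(offset);
        }
        for (long o = offset; o < p.endOffset(); o++) {
            String[] kv = p.records.get((int) (o - p.logStartOffset));
            store.put(kv[0], kv[1]);
        }
    }

    // Returns true if recovery was needed: on OutOfRange the store is wiped
    // and rebuilt from the earliest offset that still exists.
    static boolean restore(Partition p, long checkpoint, Map<String, String> store) {
        try {
            replay(p, checkpoint, store);
            return false;
        } catch (OutOfRange e) {
            store.clear();
            replay(p, p.logStartOffset, store);
            return true;
        }
    }

    public static void main(String[] args) {
        // Offsets 0..99 were deleted; offsets 100 and 101 remain.
        Partition p = new Partition(100L, List.of(new String[] {"a", "1"}, new String[] {"b", "2"}));
        Map<String, String> store = new HashMap<>();
        store.put("stale", "x"); // state built from records that no longer exist
        boolean recovered = restore(p, 42L, store);
        System.out.println("recovered=" + recovered + ", store=" + store);
    }
}
```

Wiping before re-reading is the conservative choice in this sketch: state derived from deleted records can no longer be checked against the topic, so it is discarded rather than merged.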

best regards

Patrik

On Tue, 2 Oct 2018 at 22:26, Matthias J. Sax <matthias@confluent.io> wrote:

> Setting the reset policy to "none" is by design
> (https://issues.apache.org/jira/browse/KAFKA-6121), and overriding it is
> deliberately not allowed (there might be a workaround for you, though).
> However, Streams should not die; it should catch the exception and recover
> from it automatically.
>
> What version do you use? Can you share the full stack trace so we can see
> why Streams failed to recover from this exception?
>
>
> -Matthias
>
> On 10/2/18 4:54 AM, Patrik Kleindl wrote:
> > Hi
> >
> > We had several incidents where a streams application crashed while
> > maintaining a global state store.
> > Updating global state failed. You can restart KafkaStreams to recover from
> > this error.: org.apache.kafka.clients.consumer.OffsetOutOfRangeException:
> > Offsets out of range with no configured reset policy for partitions: ...
> >
> > As we never set this to none, I checked the code and found that
> > StreamsConfig getGlobalConsumerConfigs and getRestoreConsumerConfigs both
> > set it explicitly:
> > baseConsumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
> >
> > The logs confirm this:
> > 2018-10-02 11:07:06,057 INFO [org.apache.kafka.common.utils.AppInfoParser] (ServerService Thread Pool -- 70) - [short-component-name:; transaction-id:; user-id:; creation-time:]  Kafka version : 2.0.0-cp1
> > 2018-10-02 11:07:06,057 INFO [org.apache.kafka.common.utils.AppInfoParser] (ServerService Thread Pool -- 70) - [short-component-name:; transaction-id:; user-id:; creation-time:]  Kafka commitId : a8c648ff08b9235d
> > 2018-10-02 11:07:06,104 INFO [org.apache.kafka.clients.consumer.ConsumerConfig] (ServerService Thread Pool -- 72) - [short-component-name:; transaction-id:; user-id:; creation-time:]  ConsumerConfig values:
> > auto.commit.interval.ms = 5000
> > auto.offset.reset = none
> > bootstrap.servers = [...]
> > check.crcs = true
> > client.id = ...-3f809a8a-3915-4ae4-9a37-f7a392e3dff3-global-consumer
> >
> > ...
> >
> > 2018-10-02 11:07:06,418 INFO [org.apache.kafka.streams.processor.internals.StreamThread] (ServerService Thread Pool -- 72) - [short-component-name:; transaction-id:; user-id:; creation-time:]  stream-thread [...-3f809a8a-3915-4ae4-9a37-f7a392e3dff3-StreamThread-1] Creating restore consumer client
> > 2018-10-02 11:07:06,419 INFO [org.apache.kafka.clients.consumer.ConsumerConfig] (ServerService Thread Pool -- 72) - [short-component-name:; transaction-id:; user-id:; creation-time:]  ConsumerConfig values:
> > auto.commit.interval.ms = 5000
> > auto.offset.reset = none
> > bootstrap.servers = [...]
> > check.crcs = true
> > client.id = ...-3f809a8a-3915-4ae4-9a37-f7a392e3dff3-StreamThread-1-restore-consumer
> >
> > Is this intentional? If so, why can't these consumers use the default
> > policy and recover?
> >
> > best regards
> >
> > Patrik
> >
>
>
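The effect of the StreamsConfig line quoted in the original message can be modeled in a few lines of plain Java. The class and method names below are hypothetical, and the real getGlobalConsumerConfigs/getRestoreConsumerConfigs build a larger configuration; the point is only that a value put into the map after the user's settings have been copied in replaces them, which matches the auto.offset.reset = none seen in both ConsumerConfig dumps.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of why a user-supplied auto.offset.reset does
// not reach the global/restore consumers. Not the real StreamsConfig code.
public class ResetPolicyOverride {

    // Same key string as ConsumerConfig.AUTO_OFFSET_RESET_CONFIG.
    static final String AUTO_OFFSET_RESET = "auto.offset.reset";

    static Map<String, Object> globalConsumerConfigs(Map<String, Object> userProps) {
        Map<String, Object> base = new HashMap<>(userProps); // user settings copied first
        base.put(AUTO_OFFSET_RESET, "none");                 // then forced, overwriting them
        return base;
    }

    public static void main(String[] args) {
        Map<String, Object> user = new HashMap<>();
        user.put(AUTO_OFFSET_RESET, "earliest");
        System.out.println(globalConsumerConfigs(user).get(AUTO_OFFSET_RESET)); // prints none
    }
}
```

Because the forced put comes last, setting auto.offset.reset in the application's own properties has no effect on these two consumers in this model.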
