kafka-users mailing list archives

From Guozhang Wang <wangg...@gmail.com>
Subject Re: Kafka Streams and broker compatibility
Date Tue, 27 Aug 2019 01:08:38 GMT
Right, the fix itself actually adds more headers even if there were none
in the source topics, and hence causes older brokers to fail. But
theoretically speaking, as long as the Streams clients are version 0.11.0+,
the broker version should also be 0.11.0+, because various features (EOS,
suppression, etc.) may require a higher message format.
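
For readers following along, the failure mode can be illustrated with a minimal, self-contained sketch. This is a simplified model and not the actual Kafka source: the class name `HeaderCompatSketch` and the helpers `magicForBroker` and `append` are hypothetical names chosen for illustration. It assumes only what the thread states plus the well-known mapping of message format versions (v1 for 0.10.x brokers, v2 from 0.11.0 on, where record headers were introduced).

```java
import java.util.Collections;
import java.util.List;

public class HeaderCompatSketch {

    // Hypothetical helper: maps a broker version string to the highest
    // message format ("magic") it understands. v0 up to 0.9.x, v1 for
    // 0.10.x, v2 from 0.11.0 onward.
    static int magicForBroker(String brokerVersion) {
        String[] parts = brokerVersion.split("\\.");
        int major = Integer.parseInt(parts[0]);
        int minor = parts.length > 1 ? Integer.parseInt(parts[1]) : 0;
        if (major > 0 || minor >= 11) {
            return 2;
        }
        return minor >= 10 ? 1 : 0;
    }

    // Hypothetical stand-in for the producer-side check: a record that
    // carries headers cannot be written in a format older than v2.
    static void append(int magic, List<String> headerKeys) {
        if (magic < 2 && !headerKeys.isEmpty()) {
            throw new IllegalArgumentException(
                "Magic v" + magic + " does not support record headers");
        }
    }

    public static void main(String[] args) {
        int oldBroker = magicForBroker("0.10.2.1");
        int newBroker = magicForBroker("0.11");

        // No headers: accepted by both the old and the new format.
        append(oldBroker, Collections.emptyList());
        append(newBroker, Collections.emptyList());

        // With a header (the key "v" is made up): only the newer format accepts it.
        append(newBroker, List.of("v"));
        try {
            append(oldBroker, List.of("v"));
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In this model a topology that never writes headers works against a 0.10.x broker, and starts failing as soon as the client attaches a header to a record, which matches the behavior reported between 2.2.0 and 2.2.1.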

On Mon, Aug 26, 2019 at 5:42 PM Sophie Blee-Goldman <sophie@confluent.io>
wrote:

> I'm pretty sure one of the Suppress bug fixes that went into 2.2.1 involved
> adding headers. Updating the compatibility matrix must have just slipped
> when that bugfix was merged -- thanks for bringing this up!
>
> On Mon, Aug 26, 2019 at 5:37 PM Alisson Sales <alisson.sales@gmail.com>
> wrote:
>
> > Hi Guozhang, thanks for your reply.
> >
> > I suspect the "problem" has to do with the fixes released on 2.2.1. I'm
> > upgrading to this version mostly because we were facing problems with
> > KTable suppress.
> >
> > I was experiencing this exact same problem:
> >
> > https://stackoverflow.com/questions/54145281/why-do-the-offsets-of-the-consumer-group-app-id-of-my-kafka-streams-applicatio/54227156
> > This was the fix: https://issues.apache.org/jira/browse/KAFKA-7895.
> >
> > When trying to confirm the fix worked for my topology/app, I encountered
> > this error: java.lang.IllegalArgumentException: Magic v1 does not support
> > record headers.
> >
> > In summary the topology works fine on 0.10.2.1 with kafka-streams 2.2.0,
> > but fails with the error above if I use 2.2.1.
> >
> > I haven't changed any part of the code; I only updated the dependency
> > version in my Gradle file.
> >
> > Thanks again
> >
> > On Tue, Aug 27, 2019 at 12:24 PM Guozhang Wang <wangguoz@gmail.com>
> > wrote:
> >
> > > Hello Alisson,
> > >
> > > The root cause you've seen is message header support, which was added
> > > to brokers in 0.11.0 (KIP-82) and to the Streams client in 2.0 (KIP-244).
> > > If your code does not add any headers of its own, it only inherits the
> > > headers from source topics when writing to intermediate / sink topics.
> > > So I think that even if you were using 2.2.0 you'd still hit this issue
> > > if you happen to have headers in some of your source topic messages.
> > >
> > > I've updated
> > > https://cwiki.apache.org/confluence/display/KAFKA/Compatibility+Matrix
> > > per the updates.
> > >
> > >
> > > Guozhang
> > >
> > > On Thu, Aug 22, 2019 at 4:12 PM Alisson Sales <alisson.sales@gmail.com>
> > > wrote:
> > >
> > > > Hi all, I've just upgraded a project that was using kafka-streams 2.2.0
> > > > to 2.2.1 and found the error shown at the end of this message.
> > > >
> > > > I was using Kafka broker 0.10.2.1, and after upgrading the broker to
> > > > 0.11 the error no longer happens.
> > > >
> > > > My question here is: where is the best place we can find the required
> > > > minimum broker version for the kafka-streams version one is using?
> > > >
> > > > This is not clear to me, and the
> > > > https://cwiki.apache.org/confluence/display/KAFKA/Compatibility+Matrix
> > > > wiki page seems outdated.
> > > >
> > > > Thanks in advance
> > > >
> > > > Exception in thread "streams-batch-opens-f5d8fdb1-db8b-415b-b812-ba448fac3dfa-StreamThread-1" org.apache.kafka.streams.errors.ProcessorStateException: task [1_2] Failed to flush state store KTABLE-SUPPRESS-STATE-STORE-0000000009
> > > >     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:251)
> > > >     at org.apache.kafka.streams.processor.internals.AbstractTask.flushState(AbstractTask.java:204)
> > > >     at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:521)
> > > >     at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:473)
> > > >     at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:461)
> > > >     at org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:286)
> > > >     at org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:412)
> > > >     at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1056)
> > > >     at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:910)
> > > >     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:804)
> > > >     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:773)
> > > > Caused by: org.apache.kafka.streams.errors.StreamsException: task [1_2] Abort sending since an error caught with a previous record (key A:2019-03-10T07:00:00Z\x00\x00\x01l\xBB\x8FZ\xB0 value [B@c28e8db timestamp null) to topic streams-batch-opens-KTABLE-SUPPRESS-STATE-STORE-0000000009-changelog due to java.lang.IllegalArgumentException: Magic v1 does not support record headers
> > > >     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:244)
> > > >     at org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer.logValue(InMemoryTimeOrderedKeyValueBuffer.java:284)
> > > >     at org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer.flush(InMemoryTimeOrderedKeyValueBuffer.java:266)
> > > >     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:248)
> > > >     ... 10 more
> > > > Caused by: java.lang.IllegalArgumentException: Magic v1 does not support record headers
> > > >     at org.apache.kafka.common.record.MemoryRecordsBuilder.appendWithOffset(MemoryRecordsBuilder.java:412)
> > > >     at org.apache.kafka.common.record.MemoryRecordsBuilder.appendWithOffset(MemoryRecordsBuilder.java:451)
> > > >     at org.apache.kafka.common.record.MemoryRecordsBuilder.append(MemoryRecordsBuilder.java:508)
> > > >     at org.apache.kafka.common.record.MemoryRecordsBuilder.append(MemoryRecordsBuilder.java:531)
> > > >     at org.apache.kafka.clients.producer.internals.ProducerBatch.tryAppend(ProducerBatch.java:106)
> > > >     at org.apache.kafka.clients.producer.internals.RecordAccumulator.append(RecordAccumulator.java:224)
> > > >     at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:907)
> > > >     at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:846)
> > > >     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:167)
> > > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>


-- 
-- Guozhang
