kafka-users mailing list archives

From Guozhang Wang <wangg...@gmail.com>
Subject Re: Kafka Streams and broker compatibility
Date Tue, 27 Aug 2019 00:24:32 GMT
Hello Alisson,

The root cause here is message header support, which was added to the brokers
in 0.11.0 (KIP-82) and to the Streams client in 2.0 (KIP-244). Even if your
code does not add any headers itself, Streams inherits the headers from
source-topic records when writing to intermediate / sink topics. So I think
that even on 2.2.0 you would still hit this issue whenever some of your
source-topic messages carry headers.
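
For illustration only (this is a sketch, not something from your app; the
topic names and the decision to drop all headers are my assumptions): with the
Processor API you could strip the inherited headers before records reach the
sink, so an old-format broker never sees them:

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.kafka.common.header.Header;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.ValueTransformer;
import org.apache.kafka.streams.processor.ProcessorContext;

public class StripHeadersSketch {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        builder.<String, String>stream("source-topic")   // placeholder topic names
               .transformValues(() -> new ValueTransformer<String, String>() {
                   private ProcessorContext context;

                   @Override
                   public void init(ProcessorContext context) {
                       this.context = context;
                   }

                   @Override
                   public String transform(String value) {
                       // Collect the keys first so we don't mutate the
                       // Headers while iterating over them.
                       List<String> keys = new ArrayList<>();
                       for (Header h : context.headers()) {
                           keys.add(h.key());
                       }
                       keys.forEach(context.headers()::remove);
                       return value;  // value passes through unchanged
                   }

                   @Override
                   public void close() { }
               })
               .to("sink-topic");
    }
}
```

Note that this only works on 2.0+ clients anyway (ProcessorContext#headers()
came with KIP-244), so upgrading the broker, as you did, is the real fix.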

I've updated
https://cwiki.apache.org/confluence/display/KAFKA/Compatibility+Matrix
accordingly.
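
As a quick local check, independent of the wiki, you can also ask the broker
itself which API versions it supports (the bootstrap address below is a
placeholder):

```shell
# Prints the API versions each broker supports; a broker new enough for the
# v2 message format (and thus record headers) advertises Produce v3 or later.
bin/kafka-broker-api-versions.sh --bootstrap-server localhost:9092
```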


Guozhang

On Thu, Aug 22, 2019 at 4:12 PM Alisson Sales <alisson.sales@gmail.com>
wrote:

> Hi all, I've just upgraded a project from kafka-streams 2.2.0 to 2.2.1 and
> ran into the error pasted at the end of this message.
>
> I was using Kafka Broker 0.10.2.1 and after upgrading the Broker to 0.11
> the error doesn't happen anymore.
>
> My question here is: where is the best place to find the minimum broker
> version required by the kafka-streams version one is using?
>
> This is not clear to me, and the
> https://cwiki.apache.org/confluence/display/KAFKA/Compatibility+Matrix
> wiki page seems outdated.
>
> Thanks in advance
>
> Exception in thread
> "streams-batch-opens-f5d8fdb1-db8b-415b-b812-ba448fac3dfa-StreamThread-1"
> org.apache.kafka.streams.errors.ProcessorStateException: task [1_2] Failed
> to flush state store KTABLE-SUPPRESS-STATE-STORE-0000000009
>     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:251)
>     at org.apache.kafka.streams.processor.internals.AbstractTask.flushState(AbstractTask.java:204)
>     at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:521)
>     at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:473)
>     at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:461)
>     at org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:286)
>     at org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:412)
>     at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1056)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:910)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:804)
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:773)
> Caused by: org.apache.kafka.streams.errors.StreamsException: task [1_2]
> Abort sending since an error caught with a previous record (key
> A:2019-03-10T07:00:00Z\x00\x00\x01l\xBB\x8FZ\xB0 value [B@c28e8db timestamp
> null) to topic
> streams-batch-opens-KTABLE-SUPPRESS-STATE-STORE-0000000009-changelog due to
> java.lang.IllegalArgumentException: Magic v1 does not support record headers
>     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:244)
>     at org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer.logValue(InMemoryTimeOrderedKeyValueBuffer.java:284)
>     at org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer.flush(InMemoryTimeOrderedKeyValueBuffer.java:266)
>     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:248)
>     ... 10 more
> Caused by: java.lang.IllegalArgumentException: Magic v1 does not support
> record headers
>     at org.apache.kafka.common.record.MemoryRecordsBuilder.appendWithOffset(MemoryRecordsBuilder.java:412)
>     at org.apache.kafka.common.record.MemoryRecordsBuilder.appendWithOffset(MemoryRecordsBuilder.java:451)
>     at org.apache.kafka.common.record.MemoryRecordsBuilder.append(MemoryRecordsBuilder.java:508)
>     at org.apache.kafka.common.record.MemoryRecordsBuilder.append(MemoryRecordsBuilder.java:531)
>     at org.apache.kafka.clients.producer.internals.ProducerBatch.tryAppend(ProducerBatch.java:106)
>     at org.apache.kafka.clients.producer.internals.RecordAccumulator.append(RecordAccumulator.java:224)
>     at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:907)
>     at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:846)
>     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:167)
>


-- 
-- Guozhang
