kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Shalom Sagges <shalomsag...@gmail.com>
Subject Re: Magic v1 Does not Support Record Headers
Date Tue, 26 Nov 2019 06:26:48 GMT
Luckily, I was able to find that needle in a haystack. :-D
Thanks a lot for your guidance Matthias, it helped me a lot to understand
the issue.

On Mon, Nov 25, 2019 at 9:01 PM Matthias J. Sax <matthias@confluent.io>
wrote:

> Fankly, I am not entirely sure...
>
> I would _assume_ that you could still change the message format but I
> would highly recommend to try it out first in a non-production
> environment first.
>
> -Matthias
>
> On 11/25/19 4:51 AM, Shalom Sagges wrote:
> > Thanks a lot Matthias!
> >
> > This problematic topic is actually a topic that's been mirrored from an
> > older 0.8 version (using kafka-mirror).
> > I guess it's not possible to upgrade the message format in this case?
> >
> > Thanks again for your help!
> >
> > On Fri, Nov 22, 2019 at 7:32 AM Matthias J. Sax <matthias@confluent.io>
> > wrote:
> >
> >> It's going to be hard to find out which client it is. This is a known
> >> issue in general and there is a KIP that address is:
> >>
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-511%3A+Collect+and+Expose+Client%27s+Name+and+Version+in+the+Brokers
> >>
> >> The root cause for the error you see seems to be, that the client tries
> >> to write messages including record headers. Record headers where added
> >> in 0.11.0.0, thus, your brokers basically support them.
> >>
> >> However, it seems that the topic in question is still on message format
> >> 0.10 that does not support record headers. Note that broker version and
> >> message format are independent of each other. You can see from the stack
> >> trace, that the broker tries to down convert the message format (I
> >> assuem from 0.11 to 0.10 -- this down convertion would succeed if record
> >> headers would not be used).
> >>
> >>>
> >>
> org.apache.kafka.common.record.FileRecords.downConvert(FileRecords.java:245)
> >>
> >> Thus, the client must either stop using records headers, or you need to
> >> upgrade the message format to 0.11. See the docs for details about
> >> upgrading the message format.
> >>
> >>
> >> Hope that helps.
> >>
> >>
> >> -Matthias
> >>
> >>
> >> On 11/21/19 12:38 AM, Shalom Sagges wrote:
> >>> Hi Experts,
> >>>
> >>> I use Kafka 0.11.2
> >>>
> >>> I have an issue where the Kafka logs are bombarded with the following
> >> error:
> >>> ERROR [KafkaApi-14733] Error when handling request
> >>>
> >>
> {replica_id=-1,max_wait_time=0,min_bytes=0,max_bytes=2147483647,topics=[{topic=my_topic,partitions=[{partition=22,fetch_offset=1297798,max_bytes=1048576}]}]}
> >>> (kafka.server.KafkaApis)
> >>> java.lang.IllegalArgumentException: *Magic v1 does not support record
> >>> headers*
> >>>         at
> >>>
> >>
> org.apache.kafka.common.record.MemoryRecordsBuilder.appendWithOffset(MemoryRecordsBuilder.java:385)
> >>>         at
> >>>
> >>
> org.apache.kafka.common.record.MemoryRecordsBuilder.append(MemoryRecordsBuilder.java:568)
> >>>         at
> >>>
> >>
> org.apache.kafka.common.record.AbstractRecords.convertRecordBatch(AbstractRecords.java:117)
> >>>         at
> >>>
> >>
> org.apache.kafka.common.record.AbstractRecords.downConvert(AbstractRecords.java:98)
> >>>         at
> >>>
> >>
> org.apache.kafka.common.record.FileRecords.downConvert(FileRecords.java:245)
> >>>         at
> >>>
> >>
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1$$anonfun$apply$5.apply(KafkaApis.scala:523)
> >>>         at
> >>>
> >>
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1$$anonfun$apply$5.apply(KafkaApis.scala:521)
> >>>         at scala.Option.map(Option.scala:146)
> >>>         at
> >>>
> >>
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1.apply(KafkaApis.scala:521)
> >>>         at
> >>>
> >>
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1.apply(KafkaApis.scala:511)
> >>>         at scala.Option.flatMap(Option.scala:171)
> >>>         at
> >>>
> >>
> kafka.server.KafkaApis.kafka$server$KafkaApis$$convertedPartitionData$1(KafkaApis.scala:511)
> >>>         at
> >>>
> >>
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$createResponse$2$1.apply(KafkaApis.scala:559)
> >>>         at
> >>>
> >>
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$createResponse$2$1.apply(KafkaApis.scala:558)
> >>>         at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> >>>         at
> scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> >>>         at
> >>> scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> >>>         at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> >>>         at
> >>>
> >>
> kafka.server.KafkaApis.kafka$server$KafkaApis$$createResponse$2(KafkaApis.scala:558)
> >>>         at
> >>>
> >>
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$fetchResponseCallback$1$1.apply$mcVI$sp(KafkaApis.scala:579)
> >>>         at
> >>>
> >>
> kafka.server.ClientQuotaManager.recordAndThrottleOnQuotaViolation(ClientQuotaManager.scala:196)
> >>>         at
> >>> kafka.server.KafkaApis.sendResponseMaybeThrottle(KafkaApis.scala:2014)
> >>>         at
> >>>
> >>
> kafka.server.KafkaApis.kafka$server$KafkaApis$$fetchResponseCallback$1(KafkaApis.scala:578)
> >>>         at
> >>>
> >>
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$processResponseCallback$1$1.apply$mcVI$sp(KafkaApis.scala:598)
> >>>         at
> >>>
> >>
> kafka.server.ClientQuotaManager.recordAndThrottleOnQuotaViolation(ClientQuotaManager.scala:196)
> >>>         at
> >>>
> >>
> kafka.server.ClientQuotaManager.recordAndMaybeThrottle(ClientQuotaManager.scala:188)
> >>>         at
> >>>
> >>
> kafka.server.KafkaApis.kafka$server$KafkaApis$$processResponseCallback$1(KafkaApis.scala:597)
> >>>         at
> >>>
> >>
> kafka.server.KafkaApis$$anonfun$handleFetchRequest$1.apply(KafkaApis.scala:614)
> >>>         at
> >>>
> >>
> kafka.server.KafkaApis$$anonfun$handleFetchRequest$1.apply(KafkaApis.scala:614)
> >>>         at
> >>> kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:640)
> >>>         at
> kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:606)
> >>>         at kafka.server.KafkaApis.handle(KafkaApis.scala:98)
> >>>         at
> >>> kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:66)
> >>>         at java.lang.Thread.run(Thread.java:745)
> >>>
> >>>
> >>> I understand this is probably related to a client that uses a client
> >>> version that isn't compatible with 0.11, but I don't know how to
> pinpoint
> >>> the client since the topic is used by multiple consumers.
> >>> Any idea what this error actually means and how I can find the culprit?
> >>> I can't read anything in the logs besides this error  :-S
> >>>
> >>> Thanks a lot!
> >>>
> >>
> >>
> >
>
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message