Thanks for helping debugging this. You can reproduce the issue using below deserializer, and
invoking kafka-console-consumer with --value-deserializer=my.BasicDeserializer . As you will
see, when the consumer starts receiving messages only "SERDE WITHOUT HEADERS" is printed to
the console.
Thanks,
Jorg
public class BasicDeserializer implements Deserializer<String> {
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
System.out.println("CONFIGURE");
}
@Override
public String deserialize(String topic, byte[] data) {
System.out.println("SERDE WITHOUT HEADERS");
return new String(data);
}
@Override
public String deserialize(String topic, Headers headers, byte[] data) {
System.out.println("SERDE WITH HEADERS");
return new String(data);
}
@Override
public void close() {
System.out.println("CLOSE");
}
}
On 2019/11/12 12:57:21, "M. Manna" <manmedia@gmail.com> wrote:
> HI again,
>
> On Tue, 12 Nov 2019 at 12:31, Jorg Heymans <jorg.heymans@gmail.com> wrote:
>
> > Hi,
> >
> > The issue is not that i cannot get a custom deserializer working, it's
> > that the custom deserializer i provide implements the default method from
> > the Deserializer interface
> > https://github.com/apache/kafka/blob/6f0008643db6e7299658442784f1bcc6c96395ed/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java#L59
> > that gives access to record Headers.
> >
> > The kafka console consumer never calls this method, it will only call the
> > variant without Headers
> > https://github.com/apache/kafka/blob/6f0008643db6e7299658442784f1bcc6c96395ed/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java#L50
> >
> > I'm using kafka 2.3.0 btw.
> >
> > Jorg
> >
>
> Recrord feching (deserialization call) happens using Fetcher. And Fetcher
> is calling default implementation of Deserializer.deserialize() with
> header. The default implementation returns the implementation of
> deserialize() with header. If you provide overridden version of
> deserializer (for both header/non-header) it will be called.
>
> https://github.com/apache/kafka/blob/4e5b86419982050217d06b3c30ba6236e1fd9090/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L1265
>
> https://github.com/apache/kafka/blob/4e5b86419982050217d06b3c30ba6236e1fd9090/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L1268
>
> Console consumer simply puts a consumer wrapper around KafkaConsumer. There
> is no change in behaviour otherwise. I take it that you've debugged and
> confirmed that it's not calling your overridden deserialize() with headers?
> If so, can you link it here for everyone's benefit?
>
> Thanks,
>
>
>
>
>
> > On 2019/11/12 11:58:26, "M. Manna" <manmedia@gmail.com> wrote:
> > >
> > > I think you can try the following to get your implementation working
> > >
> > > 1) Provide the SerDe classes into classpath
> > > 2) Provide your consumer config file
> > > 3) Provide key/value Deserializer props via --consumer-property arg.
> > >
> >
> >
>
|