kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Jorg Heymans <jorg.heym...@gmail.com>
Subject Re: kafka-console-consumer --value-deserializer with access to headers
Date Thu, 14 Nov 2019 07:24:40 GMT
Hi,

Sorry for the late reply, here is the command:

kafka-console-consumer.cmd --consumer.config my-config.properties --bootstrap-server kafka-host:9443
--topic my-topic --value-deserializer my.BasicDeserializer --group my-console-group --property
print.key=true

my-config.properties only contains SSL related config.

Jorg

On 2019/11/12 15:06:02, "M. Manna" <manmedia@gmail.com> wrote: 
> HI,
> 
> On Tue, 12 Nov 2019 at 14:37, Jorg Heymans <jorg.heymans@gmail.com> wrote:
> 
> > Thanks for helping debugging this. You can reproduce the issue using below
> > deserializer, and invoking kafka-console-consumer with
> > --value-deserializer=my.BasicDeserializer . As you will see, when the
> > consumer starts receiving messages only "SERDE WITHOUT HEADERS" is printed
> > to the console.
> >
> > Thanks,
> > Jorg
> >
> In the above, what command have you put exactly from command prompt ? can
> you share this with us?
> 
> Thanks,
> 
> >
> > public class BasicDeserializer implements Deserializer<String> {
> >
> >     @Override
> >     public void configure(Map<String, ?> configs, boolean isKey) {
> >         System.out.println("CONFIGURE");
> >     }
> >
> >     @Override
> >     public String deserialize(String topic, byte[] data) {
> >         System.out.println("SERDE WITHOUT HEADERS");
> >         return new String(data);
> >     }
> >
> >     @Override
> >     public String deserialize(String topic, Headers headers, byte[] data) {
> >         System.out.println("SERDE WITH HEADERS");
> >         return new String(data);
> >     }
> >
> >     @Override
> >     public void close() {
> >         System.out.println("CLOSE");
> >     }
> > }
> >
> >
> >
> >
> > On 2019/11/12 12:57:21, "M. Manna" <manmedia@gmail.com> wrote:
> > > HI again,
> > >
> > > On Tue, 12 Nov 2019 at 12:31, Jorg Heymans <jorg.heymans@gmail.com>
> > wrote:
> > >
> > > > Hi,
> > > >
> > > > The issue is not that i cannot get a custom deserializer working, it's
> > > > that the custom deserializer i provide implements the default method
> > from
> > > > the Deserializer interface
> > > >
> > https://github.com/apache/kafka/blob/6f0008643db6e7299658442784f1bcc6c96395ed/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java#L59
> > > > that gives access to record Headers.
> > > >
> > > > The kafka console consumer never calls this method, it will only call
> > the
> > > > variant without Headers
> > > >
> > https://github.com/apache/kafka/blob/6f0008643db6e7299658442784f1bcc6c96395ed/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java#L50
> > > >
> > > > I'm using kafka 2.3.0 btw.
> > > >
> > > > Jorg
> > > >
> > >
> > > Recrord feching (deserialization call) happens using Fetcher. And Fetcher
> > > is calling default implementation of Deserializer.deserialize() with
> > > header. The default implementation returns the implementation of
> > > deserialize() with header. If you provide overridden version of
> > > deserializer (for both header/non-header) it will be called.
> > >
> > >
> > https://github.com/apache/kafka/blob/4e5b86419982050217d06b3c30ba6236e1fd9090/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L1265
> > >
> > >
> > https://github.com/apache/kafka/blob/4e5b86419982050217d06b3c30ba6236e1fd9090/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L1268
> > >
> > > Console consumer simply puts a consumer wrapper around KafkaConsumer.
> > There
> > > is no change in behaviour otherwise. I take it that you've debugged and
> > > confirmed that it's not calling your overridden deserialize() with
> > headers?
> > > If so, can you link it here for everyone's benefit?
> > >
> > > Thanks,
> > >
> > >
> > >
> > >
> > >
> > > > On 2019/11/12 11:58:26, "M. Manna" <manmedia@gmail.com> wrote:
> > > > >
> > > > > I think you can try the following to get your implementation working
> > > > >
> > > > > 1) Provide the SerDe classes into classpath
> > > > > 2) Provide your consumer config file
> > > > > 3) Provide key/value Deserializer props via --consumer-property arg.
> > > > >
> > > >
> > > >
> > >
> >
> 

Mime
View raw message