kafka-users mailing list archives

From Kristopher Kane <kkane.l...@gmail.com>
Subject Re: Kafka Streams Avro SerDe version/id caching
Date Fri, 05 Jan 2018 16:56:59 GMT
Just a follow-up caching example for the DSL, with emphasis on the consumer
(deserializer):

final KafkaAvroDeserializer kafkaAvroDeserializer = new KafkaAvroDeserializer(
        new CachedSchemaRegistryClient(
                config.getProperty(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG), 1024));

final KafkaAvroSerializer kafkaAvroSerializer = new KafkaAvroSerializer(
        new CachedSchemaRegistryClient(
                config.getProperty(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG), 1024));

final Serde genericAvroSerde = Serdes.serdeFrom(kafkaAvroSerializer, kafkaAvroDeserializer);

final KStream<String, GenericRecord> stream =
        builder.stream(Serdes.String(), genericAvroSerde, incomingTopic);
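
For anyone wondering what "caching by ID" buys you, here is a minimal
plain-Java sketch of the idea. It is not the Confluent client's actual
implementation: SchemaCacheSketch, getById and remoteCalls are hypothetical
names, and the lookup function only stands in for the HTTP call to the
registry. It just shows that a given schema ID is fetched once and then
served from memory.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntFunction;

// Hypothetical stand-in for a registry client that caches schemas by ID.
final class SchemaCacheSketch {
    // Pretend remote call: maps a schema ID to its schema text.
    // Assumed to never return null.
    private final IntFunction<String> remoteLookup;
    // In-memory cache keyed by schema ID.
    private final Map<Integer, String> schemasById = new ConcurrentHashMap<>();
    // Counts how many times the "registry" was actually contacted.
    private final AtomicInteger remoteCalls = new AtomicInteger();

    SchemaCacheSketch(IntFunction<String> remoteLookup) {
        this.remoteLookup = remoteLookup;
    }

    // Returns the cached schema, fetching it only on the first request for an ID.
    String getById(int id) {
        return schemasById.computeIfAbsent(id, key -> {
            remoteCalls.incrementAndGet();
            return remoteLookup.apply(key);
        });
    }

    int remoteCalls() {
        return remoteCalls.get();
    }
}
```

With something like new SchemaCacheSketch(id -> "schema-" + id), calling
getById(7) twice leaves remoteCalls() at 1; only a new ID triggers another
lookup.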


On Fri, Oct 20, 2017 at 12:50 AM, Kristopher Kane <kkane.list@gmail.com>
wrote:

> I was fixated on setting the key/value deserializer classes in the consumer
> properties.  Using the overloaded consumer constructor, which takes
> deserializer instances, is the way to enable schema caching:
>
> CachedSchemaRegistryClient cachedSchemaRegistryClient =
>         new CachedSchemaRegistryClient("registry_url", 1000);
> KafkaAvroDeserializer kafkaAvroDeserializer =
>         new KafkaAvroDeserializer(cachedSchemaRegistryClient);
> StringDeserializer stringDeserializer = new StringDeserializer();
>
> final KafkaConsumer consumer =
>         new KafkaConsumer(consumerProps, stringDeserializer, kafkaAvroDeserializer);
>
> In Streams, there is a similar overload for addSource:
>
> TopologyBuilder addSource(String name, Deserializer keyDeserializer,
>         Deserializer valDeserializer, String... topics)
>
> Kris
>
>
> On Tue, Oct 3, 2017 at 4:34 PM, Svante Karlsson <svante.karlsson@csi.se>
> wrote:
>
>> I've implemented the same logic for a C++ client - caching is the only way
>> to go, since the performance impact of not doing it would be too big. So bet
>> on caching in all clients.
>>
>> 2017-10-03 18:12 GMT+02:00 Damian Guy <damian.guy@gmail.com>:
>>
>> > If you are using the Confluent schema registry then they will be cached
>> > by the SchemaRegistryClient.
>> >
>> > Thanks,
>> > Damian
>> >
>> > On Tue, 3 Oct 2017 at 09:00 Ted Yu <yuzhihong@gmail.com> wrote:
>> >
>> > > I did a quick search in the code base - there doesn't seem to be
>> > > caching as you described.
>> > >
>> > > On Tue, Oct 3, 2017 at 6:36 AM, Kristopher Kane <kkane.list@gmail.com>
>> > > wrote:
>> > >
>> > > > If using a Byte SerDe and schema registry in the consumer configs of a
>> > > > Kafka Streams application, does it cache the Avro schemas by ID and
>> > > > version after fetching from the registry once?
>> > > >
>> > > > Thanks,
>> > > >
>> > > > Kris
>> > > >
>> > >
>> >
>>
>
>