beam-commits mailing list archives

From "Raghu Angadi (JIRA)" <>
Subject [jira] [Commented] (BEAM-1573) KafkaIO does not allow using Kafka serializers and deserializers
Date Mon, 06 Mar 2017 18:54:32 GMT


Raghu Angadi commented on BEAM-1573:

The new API ({{withKeySerializer() & withValueSerializer()}}) sounds good. We can mark the
old API deprecated, and also provide a Coder-based Kafka Serializer and Deserializer in case
users still want to use coders (say, during a transition).
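The coder-based adapter mentioned above could look roughly like the sketch below. This is illustrative only: `CoderDeserializer` is a hypothetical name, and the `Deserializer` and `Coder` interfaces here are simplified stand-ins for Kafka's and Beam's real interfaces, included so the sketch is self-contained.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

// Simplified stand-in for Kafka's Deserializer interface.
interface Deserializer<T> {
    T deserialize(String topic, byte[] data);
}

// Simplified stand-in for Beam's Coder abstraction.
abstract class Coder<T> {
    public abstract T decode(InputStream in) throws IOException;
}

// Hypothetical adapter: lets users keep an existing Coder while KafkaIO
// switches to Kafka-native (de)serializers.
class CoderDeserializer<T> implements Deserializer<T> {
    private final Coder<T> coder;

    CoderDeserializer(Coder<T> coder) {
        this.coder = coder;
    }

    @Override
    public T deserialize(String topic, byte[] data) {
        try {
            return coder.decode(new ByteArrayInputStream(data));
        } catch (IOException e) {
            // Surface coder failures as unchecked exceptions, as Kafka
            // deserializers conventionally do.
            throw new RuntimeException("Failed to decode record from " + topic, e);
        }
    }
}
```

A matching `CoderSerializer` for the write path would wrap `Coder.encode` the same way.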

Implementation-wise, note that the Kafka deserializer would run on the Kafka consumer thread,
which is outside the normal 'advance()' call (invoked by the runner on the reader). That implies
we need to capture deserialization errors appropriately and rethrow them in advance().
Alternatively, we could invoke the deserializer explicitly inside advance() rather than on the
'consumer poll thread'; not sure if there are any drawbacks to that.

[~peay] A PR would be useful.

> KafkaIO does not allow using Kafka serializers and deserializers
> ----------------------------------------------------------------
>                 Key: BEAM-1573
>                 URL:
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-java-extensions
>    Affects Versions: 0.4.0, 0.5.0
>            Reporter: peay
>            Assignee: Raghu Angadi
>            Priority: Minor
> KafkaIO does not allow overriding the serializer and deserializer settings of the Kafka
consumers and producers it uses internally. Instead, it allows setting a `Coder`, and has a
simple Kafka serializer/deserializer wrapper class that calls the coder.
> I appreciate that allowing the use of Beam coders is good and consistent with the rest of
the system. However, is there a reason to completely disallow custom Kafka serializers?
> This is a limitation when working with an Avro schema registry, for instance, which requires
custom serializers. One can write a `Coder` that wraps a custom Kafka serializer, but that
means two levels of unnecessary wrapping.
> In addition, the `Coder` abstraction is not equivalent to Kafka's `Serializer`, which
gets the topic name as input. Using a `Coder` wrapper would require duplicating the output
topic setting in the argument to `KafkaIO` and when building the wrapper, which is inelegant
and error-prone.
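The signature mismatch described in the last paragraph can be made concrete with the sketch below. The interfaces are simplified stand-ins for Kafka's `Serializer` and Beam's `Coder` (not the real classes), and `WrappingCoder` is a hypothetical name, included only to show where the topic duplication creeps in.

```java
// Simplified stand-in: Kafka passes the destination topic to every call...
interface KafkaSerializer<T> {
    byte[] serialize(String topic, T value);
}

// ...while a simplified stand-in for a Beam Coder only ever sees the value.
abstract class BeamCoder<T> {
    public abstract void encode(T value, java.io.OutputStream out)
        throws java.io.IOException;
}

// A Coder wrapping a topic-aware serializer must therefore have the topic
// baked in at construction time, duplicating the topic already given to KafkaIO.
class WrappingCoder<T> extends BeamCoder<T> {
    private final KafkaSerializer<T> serializer;
    private final String topic; // duplicated configuration

    WrappingCoder(KafkaSerializer<T> serializer, String topic) {
        this.serializer = serializer;
        this.topic = topic;
    }

    @Override
    public void encode(T value, java.io.OutputStream out)
            throws java.io.IOException {
        out.write(serializer.serialize(topic, value));
    }
}
```

If KafkaIO accepted Kafka serializers directly, the topic would only be configured in one place.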

This message was sent by Atlassian JIRA
