beam-commits mailing list archives

From "Eugene Kirpichov (JIRA)" <>
Subject [jira] [Commented] (BEAM-1573) KafkaIO does not allow using Kafka serializers and deserializers
Date Mon, 06 Mar 2017 09:45:33 GMT


Eugene Kirpichov commented on BEAM-1573:

At a conference right now, but quick comment: yes, as Raghu said, we're getting rid of Coders
as a general parsing mechanism. Using coders for the purpose for which KafkaIO currently
uses them is explicitly forbidden by the Beam PTransform Style Guide.

We should replace that by having KafkaIO return byte[] and providing convenience utilities
for deserializing these byte arrays with Kafka deserializers, e.g. by wrapping the code Raghu
posted as a utility in the kafka module (packaged, say, as a SerializableFunction).
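A utility along those lines, packaged as a `SerializableFunction<byte[], T>`, could look roughly like the sketch below. It is not the code Raghu posted and not an existing Beam API. To stay self-contained it declares local stand-ins shaped like Kafka's `Deserializer<T>` (`configure`, `deserialize(String topic, byte[] data)`) and Beam's `SerializableFunction<InputT, OutputT>`; `KafkaDeserializerFn` and `Utf8Deserializer` are hypothetical names, and the sketch assumes the topic is known when the function is built.

```java
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Local stand-in shaped like Kafka's org.apache.kafka.common.serialization.Deserializer<T>.
interface Deserializer<T> {
  void configure(Map<String, ?> configs, boolean isKey);

  T deserialize(String topic, byte[] data);
}

// Local stand-in shaped like Beam's SerializableFunction<InputT, OutputT>.
interface SerializableFunction<InputT, OutputT> extends Serializable {
  OutputT apply(InputT input);
}

// Hypothetical utility: turns the byte[] emitted by KafkaIO into T using a Kafka-style deserializer.
final class KafkaDeserializerFn<T> implements SerializableFunction<byte[], T> {
  private final Class<? extends Deserializer<T>> deserializerClass;
  private final HashMap<String, Object> configs; // values must themselves be Serializable
  private final boolean isKey;
  private final String topic; // assumption: a single, fixed topic

  // Kafka deserializers are usually not Serializable, so the instance is created lazily.
  private transient Deserializer<T> deserializer;

  KafkaDeserializerFn(
      Class<? extends Deserializer<T>> deserializerClass,
      Map<String, ?> configs,
      boolean isKey,
      String topic) {
    this.deserializerClass = deserializerClass;
    this.configs = new HashMap<>(configs);
    this.isKey = isKey;
    this.topic = topic;
  }

  @Override
  public T apply(byte[] bytes) {
    if (deserializer == null) {
      try {
        deserializer = deserializerClass.getDeclaredConstructor().newInstance();
      } catch (ReflectiveOperationException e) {
        throw new IllegalStateException("Cannot instantiate " + deserializerClass, e);
      }
      deserializer.configure(configs, isKey);
    }
    return deserializer.deserialize(topic, bytes);
  }
}

// Toy deserializer for illustration; a schema-registry Avro deserializer would slot in the same way.
final class Utf8Deserializer implements Deserializer<String> {
  @Override
  public void configure(Map<String, ?> configs, boolean isKey) {}

  @Override
  public String deserialize(String topic, byte[] data) {
    return data == null ? null : new String(data, StandardCharsets.UTF_8);
  }
}
```

Keeping the deserializer's class and configuration, rather than an instance, is the design choice that makes the function serializable: only the class, the config map, the flag and the topic are shipped, and each worker builds its own deserializer on first use.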

Raghu or @peay, perhaps consider sending a PR to fix this? It seems like it should be rather
easy, though it would merit a short discussion on first.

> KafkaIO does not allow using Kafka serializers and deserializers
> ----------------------------------------------------------------
>                 Key: BEAM-1573
>                 URL:
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-java-extensions
>    Affects Versions: 0.4.0, 0.5.0
>            Reporter: peay
>            Assignee: Raghu Angadi
>            Priority: Minor
> KafkaIO does not allow overriding the serializer and deserializer settings of the Kafka
> consumers and producers it uses internally. Instead, it allows setting a `Coder`, and has a
> simple Kafka serializer/deserializer wrapper class that calls the coder.
> I appreciate that allowing the use of Beam coders is good and consistent with the rest of
> the system. However, is there a reason to completely disallow the use of custom Kafka serializers?
> This is a limitation when working with an Avro schema registry, for instance, which requires
> custom serializers. One can write a `Coder` that wraps a custom Kafka serializer, but that
> means two levels of unnecessary wrapping.
> In addition, the `Coder` abstraction is not equivalent to Kafka's `Serializer`, which
> gets the topic name as input. Using a `Coder` wrapper would require duplicating the output
> topic setting, once in the argument to `KafkaIO` and once when building the wrapper, which is
> not elegant and is error-prone.
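The workaround described in the issue, a `Coder` that wraps a Kafka serializer/deserializer pair, might look like the sketch below. It uses simplified local stand-ins rather than the real Beam or Kafka types: `SimpleCoder` only mimics the shape of a Beam-style `Coder` (`encode(T, OutputStream)` / `decode(InputStream)`), and `Serializer`/`Deserializer` mimic Kafka's topic-aware methods. `KafkaSerdeCoder`, `toBytes` and `fromBytes` are hypothetical names. The `topic` field shows the duplication the issue complains about: a coder never sees the topic, so it has to be configured a second time.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;

// Local stand-ins shaped like Kafka's topic-aware Serializer/Deserializer methods.
interface Serializer<T> {
  byte[] serialize(String topic, T data);
}

interface Deserializer<T> {
  T deserialize(String topic, byte[] data);
}

// Simplified stand-in for a Beam-style Coder: no topic is ever passed in.
abstract class SimpleCoder<T> {
  abstract void encode(T value, OutputStream out) throws IOException;

  abstract T decode(InputStream in) throws IOException;

  // Convenience helpers for this sketch only (not part of Beam's Coder API).
  final byte[] toBytes(T value) {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    try {
      encode(value, out);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return out.toByteArray();
  }

  final T fromBytes(byte[] bytes) {
    try {
      return decode(new ByteArrayInputStream(bytes));
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}

// Hypothetical first level of wrapping: a Coder around a Kafka-style serde pair.
// KafkaIO would then wrap this Coder in its own Kafka serializer, the second level.
final class KafkaSerdeCoder<T> extends SimpleCoder<T> {
  private final Serializer<T> serializer;
  private final Deserializer<T> deserializer;
  // Must be kept in sync by hand with the topic configured on the Kafka source/sink.
  private final String topic;

  KafkaSerdeCoder(Serializer<T> serializer, Deserializer<T> deserializer, String topic) {
    this.serializer = serializer;
    this.deserializer = deserializer;
    this.topic = topic;
  }

  @Override
  void encode(T value, OutputStream out) throws IOException {
    // Assumes non-null values, so the serializer returns a non-null array.
    byte[] bytes = serializer.serialize(topic, value);
    DataOutputStream data = new DataOutputStream(out);
    data.writeInt(bytes.length); // length prefix keeps the encoding self-delimiting
    data.write(bytes);
    data.flush();
  }

  @Override
  T decode(InputStream in) throws IOException {
    DataInputStream data = new DataInputStream(in);
    byte[] bytes = new byte[data.readInt()];
    data.readFully(bytes);
    return deserializer.deserialize(topic, bytes);
  }
}
```

Whatever topic the wrapped serializer receives comes from the coder's own copy, not from the record's actual destination, so a schema-registry serializer that derives its subject from the topic would silently use the wrong subject if the two settings drift apart.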

This message was sent by Atlassian JIRA
