beam-commits mailing list archives

From "Raghu Angadi (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (BEAM-1573) KafkaIO does not allow using Kafka serializers and deserializers
Date Tue, 14 Mar 2017 05:24:41 GMT

    [ https://issues.apache.org/jira/browse/BEAM-1573?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15923595#comment-15923595 ]

Raghu Angadi commented on BEAM-1573:
------------------------------------

Thanks [~peay]. Please go ahead and send a GitHub PR; it does not need to be ready, a partial
implementation is also fine. Just mention that it is not ready.

A related issue is how we want to handle deserialization errors. It looks like KafkaConsumer.poll()
throws KafkaException in case of deserialization errors, so we don't know which record caused
the error either. What if a runner wants to skip the record (or a bundle) that causes such
errors (not sure if any of the runners does this)? This is not a deal breaker; it is fine as
long as we properly propagate the exception to the user.

One option is to invoke the deserializer ourselves inside advance(). This gives us full control
over how we want to handle deserialization errors.
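
A rough sketch of what that could look like (illustrative only, not the actual KafkaIO code;
it assumes the consumer is configured with ByteArrayDeserializer so Kafka hands us raw bytes,
and the user's deserializers are invoked per record):

{code:java}
import java.util.Collections;
import java.util.Iterator;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.serialization.Deserializer;

class AdvanceSketch<K, V> {
  private final KafkaConsumer<byte[], byte[]> consumer;  // raw-bytes consumer
  private final Deserializer<K> keyDeserializer;         // user-supplied
  private final Deserializer<V> valueDeserializer;       // user-supplied
  private Iterator<ConsumerRecord<byte[], byte[]>> batch = Collections.emptyIterator();
  private K currentKey;
  private V currentValue;

  AdvanceSketch(KafkaConsumer<byte[], byte[]> consumer,
                Deserializer<K> keyDeserializer,
                Deserializer<V> valueDeserializer) {
    this.consumer = consumer;
    this.keyDeserializer = keyDeserializer;
    this.valueDeserializer = valueDeserializer;
  }

  // Returns true if a record was decoded and set as current.
  boolean advance() {
    if (!batch.hasNext()) {
      batch = consumer.poll(10).iterator();  // fetch the next batch of raw records
      if (!batch.hasNext()) {
        return false;  // nothing available yet
      }
    }
    ConsumerRecord<byte[], byte[]> rec = batch.next();
    try {
      // Deserialization happens here, per record, so a failure can be
      // attributed to exactly this record and either rethrown or skipped,
      // depending on the policy we choose.
      currentKey = keyDeserializer.deserialize(rec.topic(), rec.key());
      currentValue = valueDeserializer.deserialize(rec.topic(), rec.value());
    } catch (KafkaException e) {
      throw new RuntimeException("Failed to deserialize record at " + rec.topic()
          + "-" + rec.partition() + " offset " + rec.offset(), e);
    }
    return true;
  }
}
{code}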
Another option, which also keeps track of the serialized size: we could have a wrapper deserializer
class that stores the byte size and invokes the user's deserializer. This too gives us control
over how we want to handle the exceptions.
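
A sketch of that wrapper (again illustrative; the class and method names here are made up,
not from KafkaIO):

{code:java}
import java.util.Map;
import org.apache.kafka.common.serialization.Deserializer;

class SizeTrackingDeserializer<T> implements Deserializer<T> {
  private final Deserializer<T> delegate;  // the user's deserializer
  private long lastSize;                   // serialized size of the last record

  SizeTrackingDeserializer(Deserializer<T> delegate) {
    this.delegate = delegate;
  }

  @Override
  public void configure(Map<String, ?> configs, boolean isKey) {
    delegate.configure(configs, isKey);
  }

  @Override
  public T deserialize(String topic, byte[] data) {
    lastSize = (data == null) ? 0 : data.length;  // record the size before decoding
    // Exceptions from the user's deserializer surface here, where the
    // wrapper could also catch and translate them if we want finer control.
    return delegate.deserialize(topic, data);
  }

  @Override
  public void close() {
    delegate.close();
  }

  long lastRecordSize() {
    return lastSize;
  }
}
{code}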


> KafkaIO does not allow using Kafka serializers and deserializers
> ----------------------------------------------------------------
>
>                 Key: BEAM-1573
>                 URL: https://issues.apache.org/jira/browse/BEAM-1573
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-java-extensions
>    Affects Versions: 0.4.0, 0.5.0
>            Reporter: peay
>            Assignee: Raghu Angadi
>            Priority: Minor
>             Fix For: Not applicable
>
>
> KafkaIO does not allow overriding the serializer and deserializer settings of the Kafka
consumer and producers it uses internally. Instead, it allows setting a `Coder`, and has a
simple Kafka serializer/deserializer wrapper class that calls the coder.
> I appreciate that allowing the use of Beam coders is good and consistent with the rest of
the system. However, is there a reason to completely disallow using custom Kafka serializers
instead?
> This is a limitation when working with an Avro schema registry, for instance, which requires
custom serializers. One can write a `Coder` that wraps a custom Kafka serializer, but that
means two levels of unnecessary wrapping.
> In addition, the `Coder` abstraction is not equivalent to Kafka's `Serializer`, which
gets the topic name as input. Using a `Coder` wrapper would require duplicating the output
topic setting in the argument to `KafkaIO` and when building the wrapper, which is inelegant
and error-prone.
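
To illustrate the signature mismatch described above (a side-by-side sketch only;
`BeamStyleCoder` is a trimmed stand-in for Beam's `Coder`, not its real interface):

{code:java}
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

interface KafkaStyleSerializer<T> {
  byte[] serialize(String topic, T data);  // the topic is available on every call
}

interface BeamStyleCoder<T> {
  void encode(T value, OutputStream outStream) throws IOException;  // no topic here
  T decode(InputStream inStream) throws IOException;
}
{code}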



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
