kafka-users mailing list archives

From Jun Rao <...@confluent.io>
Subject Re: [DISCUSSION] adding the serializer api back to the new java producer
Date Mon, 08 Dec 2014 22:09:36 GMT
Ok, based on all the feedback we have heard, I plan to do the following.

1. Keep the generic api in KAFKA-1797.
2. Add a new constructor to Producer/Consumer that takes the key and value
serializer instances.
3. Have KAFKA-1797 reviewed and checked into 0.8.2 and trunk.

This will make it easy for people to reuse common serializers while at the
same time allowing people to use the byte array api if they choose to do so.

I plan to make those changes in the next couple of days unless someone
strongly objects.
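A minimal sketch of what constructor (2) above could look like. All class
names here (SketchProducer, the Serializer shape) are illustrative stand-ins,
not the actual Kafka API:

```java
import java.nio.charset.StandardCharsets;

// Illustrative serializer interface, modeled on the one proposed in this thread.
interface Serializer<T> {
    byte[] serialize(String topic, T data, boolean isKey);
}

class StringSerializer implements Serializer<String> {
    public byte[] serialize(String topic, String data, boolean isKey) {
        return data.getBytes(StandardCharsets.UTF_8);
    }
}

class SketchProducer<K, V> {
    private final Serializer<K> keySerializer;
    private final Serializer<V> valueSerializer;

    // The proposed constructor: serializers passed in code rather than via
    // config, so type mismatches surface at compile time, not at runtime.
    SketchProducer(Serializer<K> keySerializer, Serializer<V> valueSerializer) {
        this.keySerializer = keySerializer;
        this.valueSerializer = valueSerializer;
    }

    // What send() would do internally: turn the typed key/value into bytes.
    byte[][] toBytes(String topic, K key, V value) {
        return new byte[][] {
            keySerializer.serialize(topic, key, true),
            valueSerializer.serialize(topic, value, false)
        };
    }
}
```

The config-based path (serializer class names in Properties) would remain the
default; this constructor is just the additional, type-safe entry point.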

Thanks,

Jun


On Fri, Dec 5, 2014 at 5:46 PM, Jiangjie Qin <jqin@linkedin.com.invalid>
wrote:

> Hi Jun,
>
> Thanks for pointing this out. Yes, putting serialization/deserialization
> code into the record does lose some flexibility. On further thought, no
> matter how we bind the producer and serializer/deserializer, we can always
> do the same thing on the record, i.e. we could also have a constructor like
> ProducerRecord<Serializer<K, V>, Deserializer<K, V>>. The downside of this
> is that we could potentially have a serializer/deserializer instance for
> each record (that's actually the very reason I proposed putting the code in
> the record). This could be addressed by using a singleton class or a
> factory for the serializer/deserializer library, but that might be a little
> complicated and we cannot enforce it on external libraries either. So it
> only seems to make sense if we really want to:
> 1. Have a single simple producer interface.
> AND
> 2. Use a single producer to send all types of messages.
>
> I'm not sure these requirements are strong enough to justify the complexity
> of a singleton/factory serializer/deserializer library.
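For context, the singleton approach mentioned above could look like the
sketch below: one shared, stateless serializer instance that every record
reuses, avoiding a fresh serializer allocation per record. This is an
illustrative sketch, not code from the proposal:

```java
import java.nio.charset.StandardCharsets;

final class Utf8Serializer {
    // Eagerly-initialized singleton; safe to share because it holds no state.
    private static final Utf8Serializer INSTANCE = new Utf8Serializer();

    private Utf8Serializer() {}

    static Utf8Serializer instance() {
        return INSTANCE;
    }

    byte[] serialize(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }
}
```

The catch Becket raises is exactly this: nothing forces a third-party
serializer library to follow the singleton pattern, so the per-record
allocation cost can't be ruled out in general.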
>
> Thanks.
>
> Jiangjie (Becket) Qin
>
> On 12/5/14, 3:16 PM, "Jun Rao" <jun@confluent.io> wrote:
>
> >Jiangjie,
> >
> >The issue with adding the serializer in ProducerRecord is that you need to
> >implement all combinations of serializers for the key and the value. So,
> >instead of just implementing int and string serializers, you will have to
> >implement all 4 combinations.
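A sketch of the contrast Jun is drawing. With per-type serializers, two
implementations (int and String) compose into all four key/value
combinations; binding serialization to the record would instead need one
record class per combination. Names here are illustrative, not the Kafka API:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

interface TypeSerializer<T> {
    byte[] serialize(T data);
}

class IntSerializer implements TypeSerializer<Integer> {
    public byte[] serialize(Integer data) {
        return ByteBuffer.allocate(4).putInt(data).array();
    }
}

class Utf8StringSerializer implements TypeSerializer<String> {
    public byte[] serialize(String data) {
        return data.getBytes(StandardCharsets.UTF_8);
    }
}

// Two implementations cover (int,int), (int,String), (String,int) and
// (String,String) - no combination-specific class is needed, whereas a
// record-bound design needs a separate record subclass for each pairing.
class SerializerPair<K, V> {
    final TypeSerializer<K> key;
    final TypeSerializer<V> val;
    SerializerPair(TypeSerializer<K> k, TypeSerializer<V> v) { key = k; val = v; }
}
```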
> >
> >Adding a new producer constructor like Producer<K, V>(KeySerializer<K>,
> >ValueSerializer<V>, Properties properties) can be useful.
> >
> >Thanks,
> >
> >Jun
> >
> >On Thu, Dec 4, 2014 at 10:33 AM, Jiangjie Qin <jqin@linkedin.com.invalid>
> >wrote:
> >
> >>
> >> I'm just thinking that instead of binding serialization to the producer,
> >> another option is to bind the serializer/deserializer to
> >> ProducerRecord/ConsumerRecord (please see the detailed proposal below).
> >>            The arguments for this option are:
> >>         A. A single producer could send different message types. There
> >> are several use cases at LinkedIn for a per-record serializer:
> >>         - In Samza, there are some in-stream, order-sensitive control
> >> messages with a different deserializer from other messages.
> >>         - There are use cases which need support for sending both Avro
> >> messages and raw bytes.
> >>         - Some use cases need to deserialize some Avro messages into
> >> generic records and other messages into specific records.
> >>         B. In the current proposal, the serializer/deserializer is
> >> instantiated according to config. Compared with that, binding the
> >> serializer to ProducerRecord and ConsumerRecord is less error prone.
> >>
> >>
> >>         This option includes the following changes:
> >>         A. Add serializer and deserializer interfaces to replace the
> >> serializer instance from config:
> >>                 public interface Serializer<K, V> {
> >>                         public byte[] serializeKey(K key);
> >>                         public byte[] serializeValue(V value);
> >>                 }
> >>                 public interface Deserializer<K, V> {
> >>                         public K deserializeKey(byte[] key);
> >>                         public V deserializeValue(byte[] value);
> >>                 }
> >>
> >>         B. Make ProducerRecord and ConsumerRecord abstract classes
> >> implementing Serializer<K, V> and Deserializer<K, V> respectively:
> >>                 public abstract class ProducerRecord<K, V> implements
> >> Serializer<K, V> {...}
> >>                 public abstract class ConsumerRecord<K, V> implements
> >> Deserializer<K, V> {...}
> >>
> >>         C. Instead of instantiating the serializer/deserializer from
> >> config, let concrete ProducerRecord/ConsumerRecord classes extend the
> >> abstract class and override the serialize/deserialize methods.
> >>
> >>                 public class AvroProducerRecord extends
> >> ProducerRecord<String, GenericRecord> {
> >>                         ...
> >>                         @Override
> >>                         public byte[] serializeKey(String key) {...}
> >>                         @Override
> >>                         public byte[] serializeValue(GenericRecord value) {...}
> >>                 }
> >>
> >>                 public class AvroConsumerRecord extends
> >> ConsumerRecord<String, GenericRecord> {
> >>                         ...
> >>                         @Override
> >>                         public String deserializeKey(byte[] key) {...}
> >>                         @Override
> >>                         public GenericRecord deserializeValue(byte[] value) {...}
> >>                 }
> >>
> >>         D. The producer API changes to:
> >>                 public class KafkaProducer {
> >>                         ...
> >>                         Future<RecordMetadata> send(ProducerRecord<K, V> record) {
> >>                                 ...
> >>                                 byte[] key = record.serializeKey(record.key);
> >>                                 byte[] value = record.serializeValue(record.value);
> >>                                 BytesProducerRecord bytesProducerRecord =
> >> new BytesProducerRecord(topic, partition, key, value);
> >>                                 ...
> >>                         }
> >>                         ...
> >>                 }
> >>
> >>
> >>
> >> We also brainstormed at LinkedIn and here is the feedback:
> >>
> >> If the community decides to add serialization back to the new producer,
> >> besides the current proposal, which changes the new producer API to be
> >> templated, there are some other options raised during our discussion:
> >>         1) Rather than changing the current new producer API, we could
> >> provide a wrapper around the current new producer (e.g.
> >> KafkaSerializedProducer) and make it available to users, as there is
> >> value in the simplicity of the current API.
> >>
> >>         2) If we decide to go with a templated new producer API, then
> >> according to our experience at LinkedIn it might be worth instantiating
> >> the serializer in code instead of from config, so we can avoid runtime
> >> errors due to dynamic instantiation from config, which is more error
> >> prone. In that case, the producer API could be changed to something like:
> >>                 producer = new Producer<K, V>(KeySerializer<K>,
> >> ValueSerializer<V>)
> >>
> >> --Jiangjie (Becket) Qin
> >>
> >>
> >> On 11/24/14, 5:58 PM, "Jun Rao" <junrao@gmail.com> wrote:
> >>
> >> >Hi, Everyone,
> >> >
> >> >I'd like to start a discussion on whether it makes sense to add the
> >> >serializer api back to the new java producer. Currently, the new java
> >> >producer takes a byte array for both the key and the value. While this
> >>api
> >> >is simple, it pushes the serialization logic into the application. This
> >> >makes it hard to reason about what type of data is being sent to Kafka
> >>and
> >> >also makes it hard to share an implementation of the serializer. For
> >> >example, to support Avro, the serialization logic could be quite
> >>involved
> >> >since it might need to register the Avro schema in some remote registry
> >> >and
> >> >maintain a schema cache locally, etc. Without a serialization api, it's
> >> >impossible to share such an implementation so that people can easily
> >> >reuse it.
> >> >We sort of overlooked this implication during the initial discussion of
> >> >the
> >> >producer api.
> >> >
> >> >So, I'd like to propose an api change to the new producer by adding
> >>back
> >> >the serializer api similar to what we had in the old producer.
> >>Specifically,
> >> >the proposed api changes are the following.
> >> >
> >> >First, we change KafkaProducer to take generic types K and V for the
> >>key
> >> >and the value, respectively.
> >> >
> >> >public class KafkaProducer<K,V> implements Producer<K,V> {
> >> >
> >> >    public Future<RecordMetadata> send(ProducerRecord<K,V> record,
> >> >                                       Callback callback);
> >> >
> >> >    public Future<RecordMetadata> send(ProducerRecord<K,V> record);
> >> >}
> >> >
> >> >Second, we add two new configs, one for the key serializer and another
> >>for
> >> >the value serializer. Both serializers will default to the byte array
> >> >implementation.
> >> >
> >> >public class ProducerConfig extends AbstractConfig {
> >> >
> >> >    .define(KEY_SERIALIZER_CLASS_CONFIG, Type.CLASS,
> >> >            "org.apache.kafka.clients.producer.ByteArraySerializer",
> >> >            Importance.HIGH, KEY_SERIALIZER_CLASS_DOC)
> >> >    .define(VALUE_SERIALIZER_CLASS_CONFIG, Type.CLASS,
> >> >            "org.apache.kafka.clients.producer.ByteArraySerializer",
> >> >            Importance.HIGH, VALUE_SERIALIZER_CLASS_DOC);
> >> >}
> >> >
> >> >Both serializers will implement the following interface.
> >> >
> >> >public interface Serializer<T> extends Configurable {
> >> >    public byte[] serialize(String topic, T data, boolean isKey);
> >> >
> >> >    public void close();
> >> >}
> >> >
> >> >This is more or less the same as what's in the old producer. The slight
> >> >differences are (1) the serializer now only requires a parameter-less
> >> >constructor; (2) the serializer has a configure() and a close() method
> >>for
> >> >initialization and cleanup, respectively; (3) the serialize() method
> >> >additionally takes the topic and an isKey indicator, both of which are
> >> >useful for things like schema registration.
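A minimal implementation of the proposed interface, restated here so the
sketch is self-contained. The Configurable shape (a configure(Map) method)
and the in-memory "registry" are assumptions for illustration; a real Avro
serializer would talk to a remote schema registry:

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

interface Configurable {
    void configure(Map<String, ?> configs);
}

// Restatement of the proposed interface for a self-contained example.
interface ProposedSerializer<T> extends Configurable {
    byte[] serialize(String topic, T data, boolean isKey);
    void close();
}

class RegisteringStringSerializer implements ProposedSerializer<String> {
    // Stand-in for a local schema cache backed by a remote registry.
    private final Map<String, String> schemaCache = new HashMap<>();

    public void configure(Map<String, ?> configs) {
        // Would read e.g. the registry URL from configs; no-op in this sketch.
    }

    public byte[] serialize(String topic, String data, boolean isKey) {
        // The topic and isKey arguments let us register one schema per
        // (topic, key-or-value) pair, as the text above suggests.
        schemaCache.putIfAbsent(topic + (isKey ? "-key" : "-value"), "string");
        return data.getBytes(StandardCharsets.UTF_8);
    }

    public void close() {
        schemaCache.clear();
    }
}
```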
> >> >
> >> >The detailed changes are included in KAFKA-1797. For completeness, I
> >>also
> >> >made the corresponding changes for the new java consumer api as well.
> >> >
> >> >Note that the proposed api changes are incompatible with what's in the
> >> >0.8.2 branch. However, if those api changes are beneficial, it's
> >>probably
> >> >better to include them now in the 0.8.2 release, rather than later.
> >> >
> >> >I'd like to discuss mainly two things in this thread.
> >> >1. Do people feel that the proposed api changes are reasonable?
> >> >2. Are there any concerns of including the api changes in the 0.8.2
> >>final
> >> >release?
> >> >
> >> >Thanks,
> >> >
> >> >Jun
> >>
> >>
>
>
