kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Steven Wu <stevenz...@gmail.com>
Subject Re: [DISCUSSION] adding the serializer api back to the new java producer
Date Tue, 09 Dec 2014 23:29:58 GMT
> In practice the cases that actually mix serialization types in a single
stream are pretty rare I think just because the consumer then has the
problem of guessing how to deserialize, so most of these will end up with
at least some marker or schema id or whatever that tells you how to read
the data. Arguable this mixed serialization with marker is itself a
serializer type and should have a serializer of its own...

agree that it is unlikely to have mixed serialization format for one
topic/type. But we sometimes/often create one Producer object for one
cluster. and there can be many topics on this cluster. different topics may
have different serialization formats. So I agree with Guozhang's point
regarding "data type flexibility" of using simple byte[] (instead of
generic <K, V>).

On Fri, Dec 5, 2014 at 5:00 PM, Jay Kreps <jay@confluent.io> wrote:

> Hey Sriram,
>
> Thanks! I think this is a very helpful summary.
>
> Let me try to address your point about passing in the serde at send time.
>
> I think the first objection is really to the paired key/value serializer
> interfaces. This leads to kind of a weird combinatorial thing where you
> would have an avro/avro serializer a string/avro serializer, a pb/pb
> serializer, and a string/pb serializer, and so on. But your proposal would
> work as well with separate serializers for key and value.
>
> I think the downside is just the one you call out--that this is a corner
> case and you end up with two versions of all the apis to support it. This
> also makes the serializer api more annoying to implement. I think the
> alternative solution to this case and any other we can give people is just
> configuring ByteArraySerializer which gives you basically the api that you
> have now with byte arrays. If this is incredibly common then this would be
> a silly solution, but I guess the belief is that these cases are rare and a
> really well implemented avro or json serializer should be 100% of what most
> people need.
>
> In practice the cases that actually mix serialization types in a single
> stream are pretty rare I think just because the consumer then has the
> problem of guessing how to deserialize, so most of these will end up with
> at least some marker or schema id or whatever that tells you how to read
> the data. Arguable this mixed serialization with marker is itself a
> serializer type and should have a serializer of its own...
>
> -Jay
>
> On Fri, Dec 5, 2014 at 3:48 PM, Sriram Subramanian <
> srsubramanian@linkedin.com.invalid> wrote:
>
> > This thread has diverged multiple times now and it would be worth
> > summarizing them.
> >
> > There seems to be the following points of discussion -
> >
> > 1. Can we keep the serialization semantics outside the Producer interface
> > and have simple bytes in / bytes out for the interface (This is what we
> > have today).
> >
> > The points for this is to keep the interface simple and usage easy to
> > understand. The points against this is that it gets hard to share common
> > usage patterns around serialization/message validations for the future.
> >
> > 2. Can we create a wrapper producer that does the serialization and have
> > different variants of it for different data formats?
> >
> > The points for this is again to keep the main API clean. The points
> > against this is that it duplicates the API, increases the surface area
> and
> > creates redundancy for a minor addition.
> >
> > 3. Do we need to support different data types per record? The current
> > interface (bytes in/bytes out) lets you instantiate one producer and use
> > it to send multiple data formats. There seems to be some valid use cases
> > for this.
> >
> > I have still not seen a strong argument against not having this
> > functionality. Can someone provide their views on why we don't need this
> > support that is possible with the current API?
> >
> > One possible approach for the per record serialization would be to define
> >
> > public interface SerDe<K,V> {
> >   public byte[] serializeKey();
> >
> >   public K deserializeKey();
> >
> >   public byte[] serializeValue();
> >
> >   public V deserializeValue();
> > }
> >
> > This would be used by both the Producer and the Consumer.
> >
> > The send APIs can then be
> >
> > public Future<RecordMetadata> send(ProducerRecord<K,V> record);
> > public Future<RecordMetadata> send(ProducerRecord<K,V> record, Callback
> > callback);
> >
> >
> > public Future<RecordMetadata> send(ProducerRecord<K,V> record, SerDe<K,V>
> > serde);
> >
> > public Future<RecordMetadata> send(ProducerRecord<K,V> record, SerDe<K,V>
> > serde, Callback callback);
> >
> >
> > A default SerDe can be set in the config. The producer would use the
> > default from the config if the non-serde send APIs are used. The downside
> > to this approach is that we would need to have four variants of Send API
> > for the Producer.
> >
> >
> >
> >
> >
> >
> > On 12/5/14 3:16 PM, "Jun Rao" <jun@confluent.io> wrote:
> >
> > >Jiangjie,
> > >
> > >The issue with adding the serializer in ProducerRecord is that you need
> to
> > >implement all combinations of serializers for key and value. So, instead
> > >of
> > >just implementing int and string serializers, you will have to implement
> > >all 4 combinations.
> > >
> > >Adding a new producer constructor like Producer<K, V>(KeySerializer<K>,
> > >ValueSerializer<V>, Properties properties) can be useful.
> > >
> > >Thanks,
> > >
> > >Jun
> > >
> > >On Thu, Dec 4, 2014 at 10:33 AM, Jiangjie Qin <jqin@linkedin.com.invalid
> >
> > >wrote:
> > >
> > >>
> > >> I'm just thinking instead of binding serialization with producer,
> > >>another
> > >> option is to bind serializer/deserializer with
> > >> ProducerRecord/ConsumerRecord (please see the detail proposal below.)
> > >>            The arguments for this option is:
> > >>         A. A single producer could send different message types. There
> > >>are
> > >> several use cases in LinkedIn for per record serializer
> > >>         - In Samza, there are some in-stream order-sensitive control
> > >> messages
> > >> having different deserializer from other messages.
> > >>         - There are use cases which need support for sending both Avro
> > >> messages
> > >> and raw bytes.
> > >>         - Some use cases needs to deserialize some Avro messages into
> > >> generic
> > >> record and some other messages into specific record.
> > >>         B. In current proposal, the serializer/deserilizer is
> > >>instantiated
> > >> according to config. Compared with that, binding serializer with
> > >> ProducerRecord and ConsumerRecord is less error prone.
> > >>
> > >>
> > >>         This option includes the following changes:
> > >>         A. Add serializer and deserializer interfaces to replace
> > >>serializer
> > >> instance from config.
> > >>                 Public interface Serializer <K, V> {
> > >>                         public byte[] serializeKey(K key);
> > >>                         public byte[] serializeValue(V value);
> > >>                 }
> > >>                 Public interface deserializer <K, V> {
> > >>                         Public K deserializeKey(byte[] key);
> > >>                         public V deserializeValue(byte[] value);
> > >>                 }
> > >>
> > >>         B. Make ProducerRecord and ConsumerRecord abstract class
> > >> implementing
> > >> Serializer <K, V> and Deserializer <K, V> respectively.
> > >>                 Public abstract class ProducerRecord <K, V> implements
> > >> Serializer <K, V>
> > >> {...}
> > >>                 Public abstract class ConsumerRecord <K, V> implements
> > >> Deserializer <K,
> > >> V> {...}
> > >>
> > >>         C. Instead of instantiate the serializer/Deserializer from
> > >>config,
> > >> let
> > >> concrete ProducerRecord/ConsumerRecord extends the abstract class and
> > >> override the serialize/deserialize methods.
> > >>
> > >>                 Public class AvroProducerRecord extends ProducerRecord
> > >> <String,
> > >> GenericRecord> {
> > >>                         ...
> > >>                         @Override
> > >>                         Public byte[] serializeKey(String key) {Š}
> > >>                         @Override
> > >>                         public byte[] serializeValue(GenericRecord
> > >>value);
> > >>                 }
> > >>
> > >>                 Public class AvroConsumerRecord extends ConsumerRecord
> > >> <String,
> > >> GenericRecord> {
> > >>                         ...
> > >>                         @Override
> > >>                         Public K deserializeKey(byte[] key) {Š}
> > >>                         @Override
> > >>                         public V deserializeValue(byte[] value);
> > >>                 }
> > >>
> > >>         D. The producer API changes to
> > >>                 Public class KafkaProducer {
> > >>                         ...
> > >>
> > >>                         Future<RecordMetadata> send (ProducerRecord
> <K,
> > >>V>
> > >> record) {
> > >>                                 ...
> > >>                                 K key =
> record.serializeKey(record.key);
> > >>                                 V value =
> > >> record.serializedValue(record.value);
> > >>                                 BytesProducerRecord
> bytesProducerRecord
> > >>=
> > >> new
> > >> BytesProducerRecord(topic, partition, key, value);
> > >>                                 ...
> > >>                         }
> > >>                         ...
> > >>                 }
> > >>
> > >>
> > >>
> > >> We also had some brainstorm in LinkedIn and here are the feedbacks:
> > >>
> > >> If the community decide to add the serialization back to new producer,
> > >> besides current proposal which changes new producer API to be a
> > >>template,
> > >> there are some other options raised during our discussion:
> > >>         1) Rather than change current new producer API, we can
> provide a
> > >> wrapper
> > >> of current new producer (e.g. KafkaSerializedProducer) and make it
> > >> available to users. As there is value in the simplicity of current
> API.
> > >>
> > >>         2) If we decide to go with tempalated new producer API,
> > >>according
> > >> to
> > >> experience in LinkedIn, it might worth considering to instantiate the
> > >> serializer in code instead of from config so we can avoid runtime
> errors
> > >> due to dynamic instantiation from config, which is more error prone.
> If
> > >> that is the case, the producer API could be changed to something like:
> > >>                 producer = new Producer<K, V>(KeySerializer<K>,
> > >> ValueSerializer<V>)
> > >>
> > >> --Jiangjie (Becket) Qin
> > >>
> > >>
> > >> On 11/24/14, 5:58 PM, "Jun Rao" <junrao@gmail.com> wrote:
> > >>
> > >> >Hi, Everyone,
> > >> >
> > >> >I'd like to start a discussion on whether it makes sense to add the
> > >> >serializer api back to the new java producer. Currently, the new java
> > >> >producer takes a byte array for both the key and the value. While
> this
> > >>api
> > >> >is simple, it pushes the serialization logic into the application.
> This
> > >> >makes it hard to reason about what type of data is being sent to
> Kafka
> > >>and
> > >> >also makes it hard to share an implementation of the serializer. For
> > >> >example, to support Avro, the serialization logic could be quite
> > >>involved
> > >> >since it might need to register the Avro schema in some remote
> registry
> > >> >and
> > >> >maintain a schema cache locally, etc. Without a serialization api,
> it's
> > >> >impossible to share such an implementation so that people can easily
> > >> >reuse.
> > >> >We sort of overlooked this implication during the initial discussion
> of
> > >> >the
> > >> >producer api.
> > >> >
> > >> >So, I'd like to propose an api change to the new producer by adding
> > >>back
> > >> >the serializer api similar to what we had in the old producer.
> > >>Specially,
> > >> >the proposed api changes are the following.
> > >> >
> > >> >First, we change KafkaProducer to take generic types K and V for the
> > >>key
> > >> >and the value, respectively.
> > >> >
> > >> >public class KafkaProducer<K,V> implements Producer<K,V>
{
> > >> >
> > >> >    public Future<RecordMetadata> send(ProducerRecord<K,V>
record,
> > >> >Callback
> > >> >callback);
> > >> >
> > >> >    public Future<RecordMetadata> send(ProducerRecord<K,V>
record);
> > >> >}
> > >> >
> > >> >Second, we add two new configs, one for the key serializer and
> another
> > >>for
> > >> >the value serializer. Both serializers will default to the byte array
> > >> >implementation.
> > >> >
> > >> >public class ProducerConfig extends AbstractConfig {
> > >> >
> > >> >    .define(KEY_SERIALIZER_CLASS_CONFIG, Type.CLASS,
> > >> >"org.apache.kafka.clients.producer.ByteArraySerializer",
> > >>Importance.HIGH,
> > >> >KEY_SERIALIZER_CLASS_DOC)
> > >> >    .define(VALUE_SERIALIZER_CLASS_CONFIG, Type.CLASS,
> > >> >"org.apache.kafka.clients.producer.ByteArraySerializer",
> > >>Importance.HIGH,
> > >> >VALUE_SERIALIZER_CLASS_DOC);
> > >> >}
> > >> >
> > >> >Both serializers will implement the following interface.
> > >> >
> > >> >public interface Serializer<T> extends Configurable {
> > >> >    public byte[] serialize(String topic, T data, boolean isKey);
> > >> >
> > >> >    public void close();
> > >> >}
> > >> >
> > >> >This is more or less the same as what's in the old producer. The
> slight
> > >> >differences are (1) the serializer now only requires a parameter-less
> > >> >constructor; (2) the serializer has a configure() and a close()
> method
> > >>for
> > >> >initialization and cleanup, respectively; (3) the serialize() method
> > >> >additionally takes the topic and an isKey indicator, both of which
> are
> > >> >useful for things like schema registration.
> > >> >
> > >> >The detailed changes are included in KAFKA-1797. For completeness,
I
> > >>also
> > >> >made the corresponding changes for the new java consumer api as well.
> > >> >
> > >> >Note that the proposed api changes are incompatible with what's in
> the
> > >> >0.8.2 branch. However, if those api changes are beneficial, it's
> > >>probably
> > >> >better to include them now in the 0.8.2 release, rather than later.
> > >> >
> > >> >I'd like to discuss mainly two things in this thread.
> > >> >1. Do people feel that the proposed api changes are reasonable?
> > >> >2. Are there any concerns of including the api changes in the 0.8.2
> > >>final
> > >> >release?
> > >> >
> > >> >Thanks,
> > >> >
> > >> >Jun
> > >>
> > >>
> >
> >
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message