kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Philippe Derome <phder...@gmail.com>
Subject Re: documenting Kafka Streams PageView examples
Date Thu, 07 Jul 2016 03:12:07 GMT
I think I should simply follow Kafka The Definitive Guide Chapter 3 for a
good Avro producer example instead. It does not introduce some Jackson JSON
layer and still provides the type safety using POJO generated classes from
Avro.

On Wed, Jul 6, 2016 at 9:20 PM, Philippe Derome <phderome@gmail.com> wrote:

> The typed version of the example (PageViewTypedDemo) is what represents
> some difficulty for someone new to Kafka (or CP3). *I think it would be
> easier/quicker to complete the documentation of that example than to answer
> my questions below....*
>
> I am reusing the same JsonPOJOSerializer and JsonPOJODeserializer classes
> as in the Kafka example for PageViewTypedDemo and that leads me into
> exceptions in Jackson JSON serialization. Could I be missing some
> configuration? In debugger, I see that Jackson does not seem to like the
> end of my data as it goes through my 11 fields apparently ok.
>
> On a processor similar to PageViewTypedDemo (mine with data more relevant
> to what I have in mind and I mask my AVRO generated POJO class with
> XXXXXXXX below), I write a producer in a separate app to target the
> processor. I get following stack trace for a producer that would match the
> producer (meaning using JsonPOJOSerializer to match the example that uses a
> JsonPOJODeserializer).
>
> Here's what I do:
> I set up normally Properties
> I set up normally on a different object Map<String, Object> with
> BootstrapServer and REGISTRY_URL
>
> final Serializer<XXXXXXXX> xxxXXXXXXXXSerializer = new JsonPOJOSerializer<>();
>
> propMap.put("JsonPOJOClass", XXXXXXXX.class);
>
> xxxXXXXXXXXSerializer.configure(propMap, false);   // *presumably* configures my POJO
class with value of message?
>
> final StringSerializer strSerializer = new StringSerializer();
>
> final KafkaProducer<String, XXXXXXXX > producer1 =
>
>     new KafkaProducer<>(propMap, strSerializer, xxxXXXXXXXXSerializer);
>
> for (String[] user : users) {
>
>     XXXXXXXX x = new XXXXXXXX();
>
>     x.setPropertyA("something); // and many other properties/fields like this
>
>     producer1.send(new ProducerRecord<>("input-topic", user[1], x));
>
> }
>
> producer1.flush();
>
>
> Exception in thread "main"
> org.apache.kafka.common.errors.SerializationException: Error serializing
> JSON message
>
> Caused by: com.fasterxml.jackson.databind.JsonMappingException: Not an
> enum:
> {"type":"record","name":"XXXXXXXXX","namespace":"io.confluent.examples.streams.avro","fields":[{"name":"firstName","type":{"type":"string","avro.java.string":"String"}},{"name":"lastName","type":{"type":"string","avro.java.string":"String"}},{"name":"companyName","type":{"type":"string","avro.java.string":"String"}},{"name":"address","type":{"type":"string","avro.java.string":"String"}},{"name":"city","type":{"type":"string","avro.java.string":"String"}},{"name":"province","type":{"type":"string","avro.java.string":"String"}},{"name":"postal","type":{"type":"string","avro.java.string":"String"}},{"name":"phone1","type":{"type":"string","avro.java.string":"String"}},{"name":"phone2","type":{"type":"string","avro.java.string":"String"}},{"name":"email","type":{"type":"string","avro.java.string":"String"}},{"name":"web","type":{"type":"string","avro.java.string":"String"}}]}
> (through reference chain:
> io.confluent.examples.streams.avro.XXXXXXXXX["schema"]->org.apache.avro.RecordSchema["enumSymbols"])
>
> at
> com.fasterxml.jackson.databind.JsonMappingException.wrapWithPath(JsonMappingException.java:210)
>
> at
> com.fasterxml.jackson.databind.JsonMappingException.wrapWithPath(JsonMappingException.java:177)
>
> at
> com.fasterxml.jackson.databind.ser.std.StdSerializer.wrapAndThrow(StdSerializer.java:190)
>
> at
> com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:674)
>
> at
> com.fasterxml.jackson.databind.ser.BeanSerializer.serialize(BeanSerializer.java:156)
>
> at
> com.fasterxml.jackson.databind.ser.BeanPropertyWriter.serializeAsField(BeanPropertyWriter.java:575)
>
> at
> com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:666)
>
> at
> com.fasterxml.jackson.databind.ser.BeanSerializer.serialize(BeanSerializer.java:156)
>
> at
> com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue(DefaultSerializerProvider.java:129)
>
> at
> com.fasterxml.jackson.databind.ObjectMapper._configAndWriteValue(ObjectMapper.java:3387)
>
> * at
> com.fasterxml.jackson.databind.ObjectMapper.writeValueAsBytes(ObjectMapper.java:2805)*
>
> * at
> io.confluent.examples.streams.JsonPOJOSerializer.serialize(JsonPOJOSerializer.java:50)*
>
> at
> org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:453)
>
> at
> org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:430)
>
> at
> org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:353)
>
> at
> io.confluent.examples.streams.XXXExampleTypedProducer.produceInputs(XXXExampleTypedProducer.java:122)
>
> at
> io.confluent.examples.streams.XXXExampleTypedProducer.main(XXXExampleTypedProducer.java:51)
>
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>
> at java.lang.reflect.Method.invoke(Method.java:498)
>
> at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
>
> Caused by: org.apache.avro.AvroRuntimeException: Not an enum:
> {"type":"record","name":"XXXXXXXX","namespace":"io.confluent.examples.streams.avro","fields":[{"name":"firstName","type":{"type":"string","avro.java.string":"String"}},{"name":"lastName","type":{"type":"string","avro.java.string":"String"}},{"name":"companyName","type":{"type":"string","avro.java.string":"String"}},{"name":"address","type":{"type":"string","avro.java.string":"String"}},{"name":"city","type":{"type":"string","avro.java.string":"String"}},{"name":"province","type":{"type":"string","avro.java.string":"String"}},{"name":"postal","type":{"type":"string","avro.java.string":"String"}},{"name":"phone1","type":{"type":"string","avro.java.string":"String"}},{"name":"phone2","type":{"type":"string","avro.java.string":"String"}},{"name":"email","type":{"type":"string","avro.java.string":"String"}},{"name":"web","type":{"type":"string","avro.java.string":"String"}}]}
>
> at org.apache.avro.Schema.getEnumSymbols(Schema.java:206)
>
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>
> at java.lang.reflect.Method.invoke(Method.java:498)
>
> at
> com.fasterxml.jackson.databind.ser.BeanPropertyWriter.serializeAsField(BeanPropertyWriter.java:536)
>
> at
> com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:666)
>
> ... 18 more
>
>
> On Wed, Jul 6, 2016 at 8:18 AM, Philippe Derome <phderome@gmail.com>
> wrote:
>
>> yes, it's a very similar example and I am interested in the Kafka one for
>> the serialization aspect of it, which is a bit richer than on Confluent's...
>>
>> On Wed, Jul 6, 2016 at 5:35 AM, Michael Noll <michael@confluent.io>
>> wrote:
>>
>>> Correction:  Just realized that I misread your message.  You are indeed
>>> referring to the code examples in Apache Kafka. ;-)
>>>
>>> On Wed, Jul 6, 2016 at 11:35 AM, Michael Noll <michael@confluent.io>
>>> wrote:
>>>
>>> > Phil,
>>> >
>>> > I suggest to ask this question in the Confluent Platform mailing list
>>> > because you're referring to code under
>>> > https://github.com/confluentinc/examples (i.e. code that is not part
>>> of
>>> > the Apache Kafka project).
>>> >
>>> > Best,
>>> > Michael
>>> >
>>> >
>>> > On Tue, Jul 5, 2016 at 5:34 PM, Philippe Derome <phderome@gmail.com>
>>> > wrote:
>>> >
>>> >> Would anyone with a good understanding of serialization be available
>>> to
>>> >> enhance documentation of the Apache Streams examples? I mean
>>> specifically:
>>> >> PageViewTypedDemo, PageViewUntypedDemo in package org
>>> >> .apache.kafka.streams.examples.pageview
>>> >>
>>> >> I'd be happy to run them with Confluent Platform 3 producer/consumer
>>> shell
>>> >> scripts but would need guidance as to how to invoke them and how to
>>> >> specify
>>> >> some input file (or stdin format).
>>> >>
>>> >> That would help me better understand how to get serialization and
>>> streams
>>> >> to work together.
>>> >>
>>> >> Thanks,
>>> >>
>>> >> Phil Derome
>>> >>
>>> >
>>> >
>>> >
>>> > --
>>> > Best regards,
>>> > Michael Noll
>>> >
>>> >
>>> >
>>> > *Michael G. Noll | Product Manager | Confluent | +1 650.453.5860
>>> > <%2B1%20650.453.5860>Download Apache Kafka and Confluent Platform:
>>> > www.confluent.io/download <http://www.confluent.io/download>*
>>> >
>>>
>>>
>>>
>>> --
>>> Best regards,
>>> Michael Noll
>>>
>>>
>>>
>>> *Michael G. Noll | Product Manager | Confluent | +1 650.453.5860Download
>>> Apache Kafka and Confluent Platform: www.confluent.io/download
>>> <http://www.confluent.io/download>*
>>>
>>
>>
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message