kafka-users mailing list archives

From Brenden Cobb <Brenden.C...@humedica.com>
Subject Re: Kafka trouble with Avro encoding
Date Mon, 02 Dec 2013 23:28:03 GMT
Rob- Thanks so much. I've made progress with your suggestions. Hopefully
things will go more smoothly now :)

-Brenden

On 12/2/13 5:28 PM, "Robert Turner" <robair@bigfoot.com> wrote:

>Hi Brenden,
>
>I would try using <Integer, byte[]> rather than <String, Message> and
>setting:
>
>    props.put("serializer.class", "kafka.serializer.DefaultEncoder");
>
>Cheers
>   Rob.
>
>
>On 2 December 2013 22:01, Brenden Cobb <Brenden.Cobb@humedica.com> wrote:
>
>> Hello - I am trying to understand my trouble passing an Avro message
>> through Kafka (0.8).
>> From what I see, the class tries to create an instance of the encoder but
>> fails because it cannot find the constructor, although it is there.
>>
>> Here's the code and subsequent error. Appreciate any help!
>>
>> Thank you,
>> Brenden
>> ----
>>
>> public class AvroProducer {
>>
>>     //    public final String zkConnection = "tlvwhale1:2181,tlvwhale2:2181,tlvwhale3:2181";
>>     public final String zkConnection = "localhost:2181";
>>     public final String brokerList = "localhost:9092, localhost:9093, localhost:9094";
>>     public final String topic = "cdrTopic";
>>
>>     public static void main(String args[]){
>>         AvroProducer avroProducer = new AvroProducer();
>>         try {
>> //            avroProducer.testGenericRecord();
>>             avroProducer.sendCDRAvroMessage();
>>         } catch (Exception e) {
>>             e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
>>         }
>>     }
>>
>>     private void sendCDRAvroMessage() throws IOException {
>>         User user1 = new User();
>>         user1.setName("Brenden");
>>         user1.setFavoriteNumber(256);
>>         Properties props = new Properties();
>>         props.put("zk.connect", zkConnection);
>>         props.put("metadata.broker.list", brokerList);
>> //        props.put("serializer.class", "kafka.serializer.DefaultEncoder");
>>         props.put("serializer.class", "org.apache.avro.io.BufferedBinaryEncoder");
>> //        props.put("serializer.class", "org.springframework.integration.kafka.serializer.avro.AvroSpecificDatumBackedKafkaEncoder");
>> //        props.put("serializer.class", "org.springframework.integration.kafka.serializer.avro.AvroReflectDatumBackedKafkaEncoder");
>>
>> //        Producer<String, Message> producer = new Producer<String, Message>(new ProducerConfig(props));
>>         Producer<String, Message> producer = new Producer<String, Message>(new ProducerConfig(props));
>>
>>
>>         ByteArrayOutputStream out = new ByteArrayOutputStream();
>>         DatumWriter<User> userDatumWriter = new SpecificDatumWriter<User>(User.class);
>> //        Encoder encoder = EncoderFactory.get().binaryEncoder(out, null);
>> //        Encoder encoder = EncoderFactory.get().directBinaryEncoder(out, null);
>>         EncoderFactory factory = new EncoderFactory().configureBufferSize(4096);
>>         Encoder encoder = factory.directBinaryEncoder(out, null);
>>
>>         userDatumWriter.write(user1, encoder);
>>         encoder.flush();
>>         out.close();
>>         Message message = new Message(out.toByteArray());
>>
>>         producer.send(new KeyedMessage<String, Message>(topic, message));
>> //        producer.send(new KeyedMessage<String, Message>(topic, null, message));
>>     }
>>
>> }
>>
>> ---
>> The Error stack:
>> ...
>> Message(magic = 0, attributes = 0, crc = 2755187525, key = null, payload = java.nio.HeapByteBuffer[pos=0 lim=12 cap=12])
>> Exception in thread "main" java.lang.NoSuchMethodException: org.apache.avro.io.DirectBinaryEncoder.<init>(kafka.utils.VerifiableProperties)
>>     at java.lang.Class.getConstructor0(Class.java:2810)
>>     at java.lang.Class.getConstructor(Class.java:1718)
>>     at kafka.utils.Utils$.createObject(Utils.scala:458)
>>     at kafka.producer.Producer.<init>(Producer.scala:60)
>>     at kafka.javaapi.producer.Producer.<init>(Producer.scala:25)
>>     at com.humca.swizzle.kafka.producer.KafkaProducer.sendAvroMessage(KafkaProducer.java:74)
>>     at com.humca.swizzle.kafka.producer.KafkaProducer.main(KafkaProducer.java:88)
>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>     at java.lang.reflect.Method.invoke(Method.java:606)
>>     at com.intellij.rt.execution.application.AppMain.main(AppMain.java:120)
>>
>>
>
>
>-- 
>Cheers
>   Rob.
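
Note on the error in this thread: the stack trace shows
kafka.utils.Utils$.createObject calling Class.getConstructor for a
constructor that takes kafka.utils.VerifiableProperties. The class named in
serializer.class therefore has to be a Kafka encoder exposing that
constructor. Avro's binary encoders write Avro data to a stream and offer no
such constructor, which would explain the NoSuchMethodException. The sketch
below reproduces the same kind of lookup using only the JDK. It is an
illustration under assumptions, not Kafka's actual code: PluggableEncoder,
ForeignEncoder, createObject and canLoad are hypothetical names, and
java.util.Properties stands in for VerifiableProperties.

```java
import java.lang.reflect.Constructor;
import java.util.Properties;

public class SerializerLoadingSketch {

    /** Hypothetical stand-in for a Kafka-style encoder: it has a (Properties) constructor. */
    public static class PluggableEncoder {
        public PluggableEncoder(Properties props) { }
    }

    /** Hypothetical stand-in for an unrelated encoder class: no (Properties) constructor. */
    public static class ForeignEncoder {
        public ForeignEncoder() { }
    }

    /**
     * Loads a class by name and instantiates it through a one-argument
     * (Properties) constructor, similar in spirit to the reflective lookup
     * visible in the stack trace. Assumption: Properties replaces
     * kafka.utils.VerifiableProperties here.
     */
    static Object createObject(String className, Properties props)
            throws ReflectiveOperationException {
        Class<?> cls = Class.forName(className);
        // Throws NoSuchMethodException when no public constructor matches.
        Constructor<?> ctor = cls.getConstructor(Properties.class);
        return ctor.newInstance(props);
    }

    /** Returns true if the class can be instantiated this way, false if the constructor is missing. */
    static boolean canLoad(String className) {
        try {
            createObject(className, new Properties());
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(canLoad(PluggableEncoder.class.getName())); // prints true
        System.out.println(canLoad(ForeignEncoder.class.getName()));   // prints false
    }
}
```

This is consistent with Rob's suggestion: serialize the Avro record to a
byte[] in application code, and let kafka.serializer.DefaultEncoder pass
those bytes through unchanged as the message payload.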

