kafka-users mailing list archives

From Robert Turner <rob...@bigfoot.com>
Subject Re: Kafka trouble with Avro encoding
Date Mon, 02 Dec 2013 22:28:26 GMT
Hi Brenden,

I would try using Producer<Integer, byte[]> rather than
Producer<String, Message>, sending the Avro-encoded bytes as the message
itself, and setting:

    props.put("serializer.class", "kafka.serializer.DefaultEncoder");
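
The reason, as far as the stack trace shows: the producer instantiates the
class named in serializer.class by reflection, looking for a constructor that
takes kafka.utils.VerifiableProperties. An Avro encoder is not a Kafka
serializer and has no such constructor, hence the NoSuchMethodException. Here
is a minimal, self-contained sketch of that lookup. Every class name in it
(StandInProperties, PassThroughEncoder, AvroLikeEncoder, SerializerLoading) is
a made-up stand-in for illustration, not a Kafka or Avro class:

```java
import java.io.OutputStream;
import java.util.Properties;

// Hypothetical stand-in for kafka.utils.VerifiableProperties.
final class StandInProperties {
    final Properties props;
    StandInProperties(Properties props) { this.props = props; }
}

// Stand-in for a Kafka-style serializer: it exposes a public constructor
// taking the properties wrapper, which is what the reflective lookup needs.
class PassThroughEncoder {
    public PassThroughEncoder(StandInProperties props) { }
    public byte[] toBytes(byte[] value) { return value; }
}

// Stand-in for an Avro-style encoder: its only constructor takes an
// OutputStream, so the lookup below cannot find a match.
class AvroLikeEncoder {
    public AvroLikeEncoder(OutputStream out) { }
}

final class SerializerLoading {
    // Looks for a public constructor taking the properties wrapper, the same
    // kind of lookup the getConstructor frame in the stack trace performs.
    static boolean hasKafkaStyleConstructor(Class<?> clazz) {
        try {
            clazz.getConstructor(StandInProperties.class);
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(hasKafkaStyleConstructor(PassThroughEncoder.class)); // true
        System.out.println(hasKafkaStyleConstructor(AvroLikeEncoder.class));    // false
    }
}
```

So the Avro encoding stays in your own code (as you already do with
out.toByteArray()), and Kafka only needs a serializer that accepts byte[].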

Cheers
   Rob.


On 2 December 2013 22:01, Brenden Cobb <Brenden.Cobb@humedica.com> wrote:

> Hello - I am trying to understand my trouble passing an Avro message
> through Kafka (0.8).
> From what I see, the class tries to create an instance of the encoder but
> fails because it cannot find the constructor, although it is there.
>
> Here's the code and subsequent error. Appreciate any help!
>
> Thank you,
> Brenden
> ----
>
> public class AvroProducer {
>
>     //    public final String zkConnection = "tlvwhale1:2181,tlvwhale2:2181,tlvwhale3:2181";
>     public final String zkConnection = "localhost:2181";
>     public final String brokerList = "localhost:9092, localhost:9093, localhost:9094";
>     public final String topic = "cdrTopic";
>
>     public static void main(String args[]){
>         AvroProducer avroProducer = new AvroProducer();
>         try {
> //            avroProducer.testGenericRecord();
>             avroProducer.sendCDRAvroMessage();
>         } catch (Exception e) {
>             e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
>         }
>     }
>
>     private void sendCDRAvroMessage() throws IOException {
>         User user1 = new User();
>         user1.setName("Brenden");
>         user1.setFavoriteNumber(256);
>         Properties props = new Properties();
>         props.put("zk.connect", zkConnection);
>         props.put("metadata.broker.list", brokerList);
> //        props.put("serializer.class", "kafka.serializer.DefaultEncoder");
>         props.put("serializer.class", "org.apache.avro.io.BufferedBinaryEncoder");
> //        props.put("serializer.class", "org.springframework.integration.kafka.serializer.avro.AvroSpecificDatumBackedKafkaEncoder");
> //        props.put("serializer.class", "org.springframework.integration.kafka.serializer.avro.AvroReflectDatumBackedKafkaEncoder");
>
> //        Producer<String, Message> producer = new Producer<String, Message>(new ProducerConfig(props));
>         Producer<String, Message> producer = new Producer<String, Message>(new ProducerConfig(props));
>
>
>         ByteArrayOutputStream out = new ByteArrayOutputStream();
>         DatumWriter<User> userDatumWriter = new SpecificDatumWriter<User>(User.class);
> //        Encoder encoder = EncoderFactory.get().binaryEncoder(out, null);
> //        Encoder encoder = EncoderFactory.get().directBinaryEncoder(out, null);
>         EncoderFactory factory = new EncoderFactory().configureBufferSize(4096);
>         Encoder encoder = factory.directBinaryEncoder(out, null);
>
>         userDatumWriter.write(user1, encoder);
>         encoder.flush();
>         out.close();
>         Message message = new Message(out.toByteArray());
>
>         producer.send(new KeyedMessage<String, Message>(topic, message));
> //        producer.send(new KeyedMessage<String, Message>(topic, null, message));
>     }
>
> }
>
> ---
> The Error stack:
> ...
> Message(magic = 0, attributes = 0, crc = 2755187525, key = null, payload = java.nio.HeapByteBuffer[pos=0 lim=12 cap=12])
> Exception in thread "main" java.lang.NoSuchMethodException: org.apache.avro.io.DirectBinaryEncoder.<init>(kafka.utils.VerifiableProperties)
>     at java.lang.Class.getConstructor0(Class.java:2810)
>     at java.lang.Class.getConstructor(Class.java:1718)
>     at kafka.utils.Utils$.createObject(Utils.scala:458)
>     at kafka.producer.Producer.<init>(Producer.scala:60)
>     at kafka.javaapi.producer.Producer.<init>(Producer.scala:25)
>     at com.humca.swizzle.kafka.producer.KafkaProducer.sendAvroMessage(KafkaProducer.java:74)
>     at com.humca.swizzle.kafka.producer.KafkaProducer.main(KafkaProducer.java:88)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:606)
>     at com.intellij.rt.execution.application.AppMain.main(AppMain.java:120)
>
>


