Hi Brenden,
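serializer.class has to name a Kafka encoder (a kafka.serializer.Encoder
implementation), not an Avro one. I would try using Producer<Integer, byte[]>
rather than Producer<String, Message>, sending the Avro bytes
(out.toByteArray()) directly, and setting: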
props.put("serializer.class", "kafka.serializer.DefaultEncoder");
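In case it helps, here is a rough sketch of the settings meant above, using only java.util.Properties so it runs without the Kafka jar. The class and method names (ProducerConfigSketch, producerProps) are made up for illustration, and the commented-out producer calls assume the Kafka 0.8 Java client is on the classpath.

```java
import java.util.Properties;

// Hypothetical helper class; the name is illustrative only.
final class ProducerConfigSketch {

    // Builds producer settings; brokerList is a comma-separated host:port list.
    static Properties producerProps(String brokerList) {
        Properties props = new Properties();
        props.put("metadata.broker.list", brokerList);
        // DefaultEncoder hands byte[] payloads through unchanged, so the
        // Avro encoding stays in your own code.
        props.put("serializer.class", "kafka.serializer.DefaultEncoder");
        return props;
    }

    public static void main(String[] args) {
        Properties props = producerProps("localhost:9092");
        System.out.println(props.getProperty("serializer.class"));
        // Assumption: with the Kafka 0.8 client available, these settings
        // would be used roughly like this:
        //   Producer<Integer, byte[]> producer =
        //       new Producer<Integer, byte[]>(new ProducerConfig(props));
        //   producer.send(new KeyedMessage<Integer, byte[]>(topic, out.toByteArray()));
    }
}
```

The Avro part of your code (DatumWriter, Encoder, ByteArrayOutputStream) can stay as it is; only the payload type and the serializer setting change.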
Cheers
Rob.
On 2 December 2013 22:01, Brenden Cobb <Brenden.Cobb@humedica.com> wrote:
> Hello - I am trying to understand my trouble passing an Avro message
> through Kafka (0.8).
> From what I see, the class tries to create an instance of the encoder but
> fails as it cannot find the constructor, although it is there.
>
> Here's the code and subsequent error. Appreciate any help!
>
> Thank you,
> Brenden
> ----
>
> public class AvroProducer {
>
>     // public final String zkConnection = "tlvwhale1:2181,tlvwhale2:2181,tlvwhale3:2181";
>     public final String zkConnection = "localhost:2181";
>     public final String brokerList = "localhost:9092, localhost:9093, localhost:9094";
>     public final String topic = "cdrTopic";
>
>     public static void main(String args[]) {
>         AvroProducer avroProducer = new AvroProducer();
>         try {
>             // avroProducer.testGenericRecord();
>             avroProducer.sendCDRAvroMessage();
>         } catch (Exception e) {
>             e.printStackTrace(); // To change body of catch statement use File | Settings | File Templates.
>         }
>     }
>
>     private void sendCDRAvroMessage() throws IOException {
>         User user1 = new User();
>         user1.setName("Brenden");
>         user1.setFavoriteNumber(256);
>         Properties props = new Properties();
>         props.put("zk.connect", zkConnection);
>         props.put("metadata.broker.list", brokerList);
>         // props.put("serializer.class", "kafka.serializer.DefaultEncoder");
>         props.put("serializer.class", "org.apache.avro.io.BufferedBinaryEncoder");
>         // props.put("serializer.class", "org.springframework.integration.kafka.serializer.avro.AvroSpecificDatumBackedKafkaEncoder");
>         // props.put("serializer.class", "org.springframework.integration.kafka.serializer.avro.AvroReflectDatumBackedKafkaEncoder");
>
>         // Producer<String, Message> producer = new Producer<String, Message>(new ProducerConfig(props));
>         Producer<String, Message> producer = new Producer<String, Message>(new ProducerConfig(props));
>
>         ByteArrayOutputStream out = new ByteArrayOutputStream();
>         DatumWriter<User> userDatumWriter = new SpecificDatumWriter<User>(User.class);
>         // Encoder encoder = EncoderFactory.get().binaryEncoder(out, null);
>         // Encoder encoder = EncoderFactory.get().directBinaryEncoder(out, null);
>         EncoderFactory factory = new EncoderFactory().configureBufferSize(4096);
>         Encoder encoder = factory.directBinaryEncoder(out, null);
>
>         userDatumWriter.write(user1, encoder);
>         encoder.flush();
>         out.close();
>         Message message = new Message(out.toByteArray());
>
>         producer.send(new KeyedMessage<String, Message>(topic, message));
>         // producer.send(new KeyedMessage<String, Message>(topic, null, message));
>     }
>
> }
>
> ---
> The Error stack:
> ...
> Message(magic = 0, attributes = 0, crc = 2755187525, key = null, payload = java.nio.HeapByteBuffer[pos=0 lim=12 cap=12])
> Exception in thread "main" java.lang.NoSuchMethodException: org.apache.avro.io.DirectBinaryEncoder.<init>(kafka.utils.VerifiableProperties)
>     at java.lang.Class.getConstructor0(Class.java:2810)
>     at java.lang.Class.getConstructor(Class.java:1718)
>     at kafka.utils.Utils$.createObject(Utils.scala:458)
>     at kafka.producer.Producer.<init>(Producer.scala:60)
>     at kafka.javaapi.producer.Producer.<init>(Producer.scala:25)
>     at com.humca.swizzle.kafka.producer.KafkaProducer.sendAvroMessage(KafkaProducer.java:74)
>     at com.humca.swizzle.kafka.producer.KafkaProducer.main(KafkaProducer.java:88)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:606)
>     at com.intellij.rt.execution.application.AppMain.main(AppMain.java:120)
>
>
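On the "cannot find the constructor, although it is there" point: the trace shows Kafka's Utils.createObject calling Class.getConstructor with a single kafka.utils.VerifiableProperties parameter. The Avro encoder has constructors, but none with that exact signature, so the lookup fails. Below is a minimal sketch of that behaviour in plain Java reflection; FakeProps, GoodEncoder and OtherEncoder are hypothetical stand-ins, not Kafka or Avro classes.

```java
import java.io.OutputStream;

final class ReflectiveLookupDemo {

    // Mimics the lookup seen in the stack trace: ask for a public
    // constructor with exactly one parameter of a fixed type.
    static boolean hasPropsConstructor(Class<?> cls) {
        try {
            cls.getConstructor(FakeProps.class);
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(hasPropsConstructor(GoodEncoder.class));  // prints true
        System.out.println(hasPropsConstructor(OtherEncoder.class)); // prints false
    }
}

// Hypothetical stand-in for kafka.utils.VerifiableProperties.
final class FakeProps {}

// Hypothetical encoder that has the expected one-argument constructor.
class GoodEncoder {
    public GoodEncoder(FakeProps props) {}
}

// Hypothetical stand-in for a class with a constructor of a different
// signature, so the fixed-signature lookup does not match it.
class OtherEncoder {
    public OtherEncoder(OutputStream out) {}
}
```

So a class named in serializer.class needs that constructor (and, in Kafka 0.8, needs to implement kafka.serializer.Encoder), which kafka.serializer.DefaultEncoder does.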