kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Dayong <will...@gmail.com>
Subject Re: Link read avro from Kafka Connect Issue
Date Thu, 03 Nov 2016 12:01:40 GMT
Not quite sure, will try to find out today.

Thanks,
Dayong

> On Nov 2, 2016, at 9:59 PM, "Tauzell, Dave" <Dave.Tauzell@surescripts.com> wrote:
> 
> Is Kafka connect adding some bytes to the beginning of the avro with the scheme registry
id?
> 
> Dave
> 
>> On Nov 2, 2016, at 18:43, Will Du <willddy@gmail.com> wrote:
>> 
>> By using the kafka-avro-console-consumer I am able to get rich message from kafka
connect with AvroConvert, but it got no output except schema from Flink
>> 
>> By using the producer with defaultEncoding, the kafka-avro-console-consumer throws
exceptions show how. But Flink consumer works. But my target is to get Flink costume avro
data produced by Kafka connect
>> 
>>> On Nov 2, 2016, at 7:36 PM, Will Du <willddy@gmail.com> wrote:
>>> 
>>> 
>>> On Nov 2, 2016, at 7:31 PM, Will Du <willddy@gmail.com <mailto:willddy@gmail.com>>
wrote:
>>> 
>>> Hi folks,
>>> I am trying to consume avro data from Kafka in Flink. The data is produced by
Kafka connect using AvroConverter. I have created a AvroDeserializationSchema.java <https://gist.github.com/datafibers/ae9d624b6db44865ae14defe8a838123>
used by Flink consumer. Then, I use following code to read it.
>>> 
>>> public static void main(String[] args) throws Exception {
>>>             StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>             Properties properties = new Properties();
>>>             properties.setProperty("bootstrap.servers", “localhost:9092");
>>>             properties.setProperty("zookeeper.connect", “localhost:2181”);
>>> Schema schema = new Parser().parse("{" + "\"name\": \"test\", "
>>>                                      + "\"type\": \"record\", "
>>>                                      + "\"fields\": "
>>>                                      +" [ "
>>>                                      + "  { \"name\": \"name\", \"type\": \"string\"
},"
>>>                                      + "  { \"name\": \"symbol\", \"type\": \"string\"
},"
>>>                                      + "  { \"name\": \"exchange\", \"type\":
\"string\"}"
>>>                                      + "] "
>>>                                      +"}");
>>> 
>>>             AvroDeserializationSchema avroSchema = new AvroDeserializationSchema<>(schema);
>>>             FlinkKafkaConsumer09<GenericRecord> kafkaConsumer =
>>>                 new FlinkKafkaConsumer09<>("myavrotopic",avroSchema, properties);
>>>             DataStream<GenericRecord> messageStream = env.addSource(kafkaConsumer);
>>>             messageStream.rebalance().print();
>>>             env.execute("Flink AVRO KAFKA Test");
>>> }
>>> 
>>> Once, I run the code, I am able to get the schema information only as follows.
>>> {"name":"", "symbol":"", "exchange":""}
>>> {"name":"", "symbol":"", "exchange":""}
>>> {"name":"", "symbol":"", "exchange":""}
>>> {"name":"", "symbol":"", "exchange":”"}
>>> 
>>> Could anyone help to find out the issues why I cannot decode it?
>>> 
>>> Further troubleshooting, I found out if I use a kafka producer here <https://gist.github.com/datafibers/d063b255b50fa34515c0ac9e24d4485c>
to send the avro data especially using kafka.serializer.DefaultEncoder. Above code can get
correct result. Does any body know how to either set DefaultEncoder in Kafka Connect or set
it when writing customized kafka connect? Or in the other way, how should I modify the AvroDeserializationSchema.java
for instead?
>>> 
>>> Thanks, I’ll post this to the Flink user group as well.
>>> Will
>> 
> This e-mail and any files transmitted with it are confidential, may contain sensitive
information, and are intended solely for the use of the individual or entity to whom they
are addressed. If you have received this e-mail in error, please notify the sender by reply
e-mail immediately and destroy all copies of the e-mail and any attachments.

Mime
View raw message