kafka-users mailing list archives

From Ewen Cheslack-Postava <e...@confluent.io>
Subject Re: Apache Kafka integration using Apache Camel
Date Fri, 06 Jan 2017 04:57:31 GMT
More generally, do you have any log errors/messages or additional info?
It's tough to debug issues like this from 3rd party libraries if they don't
provide logs/exception info that indicates why processing a specific
message failed.

-Ewen

On Thu, Jan 5, 2017 at 8:29 PM, UMESH CHAUDHARY <umesh9794@gmail.com> wrote:

> Did you verify that the Kafka console consumer displays the produced
> message?
>
> On Fri, Jan 6, 2017 at 9:18 AM, Gupta, Swati <Swati.Gupta@anz.com> wrote:
>
> > Hello All,
> >
> >
> >
> > I am trying to create a consumer using Apache Camel for a topic in Apache
> > Kafka.
> > I am using Camel 2.17.0, Kafka 0.10, and JDK 1.8.
> > I have attached a file, KafkaCamelTestConsumer.java, which is a standalone
> > application that tries to read from a topic "test1" created in Apache Kafka.
> > I am producing messages from the console, and I was also able to produce
> > messages to the topic "test1" using a Camel program, but I am not able
> > to consume messages. Ideally, each message should get printed, but nothing
> > seems to happen. The log says that the route has started, but it does not
> > process any message.
> >
> > Please help to confirm if there is anything wrong with the below syntax:
> >
> > from("kafka:localhost:9092?topic=test1&groupId=testingGroupNew&autoOffsetReset=earliest"
> >         + "&consumersCount=1&keyDeserializer=org.apache.kafka.common.serialization.StringDeserializer&"
> >         + "valueDeserializer=org.apache.kafka.common.serialization.StringDeserializer"
> >         + "&autoCommitIntervalMs=1000&sessionTimeoutMs=30000&autoCommitEnable=true")
> >         .split()
> >         .body()
> >         .process(new Processor() {
> >             @Override
> >             public void process(Exchange exchange) throws Exception {
> >                 String messageKey = "";
> >                 if (exchange.getIn() != null) {
> >                     Message message = exchange.getIn();
> >                     Integer partitionId = (Integer) message.getHeader(KafkaConstants.PARTITION);
> >                     String topicName = (String) message.getHeader(KafkaConstants.TOPIC);
> >                     if (message.getHeader(KafkaConstants.KEY) != null)
> >                         messageKey = (String) message.getHeader(KafkaConstants.KEY);
> >                     Object data = message.getBody();
> >
> >                     System.out.println("topicName :: " + topicName
> >                             + " partitionId :: " + partitionId
> >                             + " messageKey :: " + messageKey
> >                             + " message :: " + data + "\n");
> >                 }
> >             }
> >         }).to("file://C:/swati/?fileName=MyOutputFile.txt&charset=utf-8");
> >     }
> > });
> >
> >
> >
> > I have also tried with the basic parameters as below and it still fails
> > to read messages.
> >
> > from("kafka:localhost:9092?topic=test1&groupId=testingGroupNew&autoOffsetReset=earliest")
> >
> > Any help on this will be greatly appreciated.
> >
> > Thanks in advance
> >
> >
> >
> > Thanks & Regards
> >
> > Swati
> >
> >
> >
>
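
Since the question asks whether the endpoint syntax is wrong, one way to rule out a dropped or doubled `&` between the concatenated string fragments is to assemble the URI from a map of options. The following is only a sketch: `KafkaEndpointUri`, `build`, and `exampleUri` are hypothetical names, not part of Camel. It uses the option names and values from the quoted route, performs no URL encoding, and only checks how the string is assembled, not whether Camel 2.17.0 accepts each option.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

public class KafkaEndpointUri {

    // Hypothetical helper (not a Camel API): joins options with '&' so that
    // no separator is missing or duplicated between fragments.
    // Values are used as-is; no URL encoding is applied.
    public static String build(String brokers, Map<String, String> options) {
        String base = "kafka:" + brokers;
        if (options.isEmpty()) {
            return base;
        }
        StringJoiner query = new StringJoiner("&");
        for (Map.Entry<String, String> option : options.entrySet()) {
            query.add(option.getKey() + "=" + option.getValue());
        }
        return base + "?" + query;
    }

    // Rebuilds the URI from the quoted route, keeping the same option order.
    public static String exampleUri() {
        String stringDeserializer =
                "org.apache.kafka.common.serialization.StringDeserializer";
        Map<String, String> options = new LinkedHashMap<>();
        options.put("topic", "test1");
        options.put("groupId", "testingGroupNew");
        options.put("autoOffsetReset", "earliest");
        options.put("consumersCount", "1");
        options.put("keyDeserializer", stringDeserializer);
        options.put("valueDeserializer", stringDeserializer);
        options.put("autoCommitIntervalMs", "1000");
        options.put("sessionTimeoutMs", "30000");
        options.put("autoCommitEnable", "true");
        return build("localhost:9092", options);
    }

    public static void main(String[] args) {
        // Print the assembled URI so it can be compared with the one in the route.
        System.out.println(exampleUri());
    }
}
```

The resulting string could then be passed to `from(...)` in place of the hand-concatenated literal. If it matches the original URI character for character, the separators are not the problem and the logs are the next place to look.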
