kafka-users mailing list archives

From: "Gupta, Swati" <Swati.Gu...@anz.com>
Subject: RE: Apache Kafka integration using Apache Camel
Date: Fri, 06 Jan 2017 05:01:20 GMT
Yes, the Kafka console consumer displays the message correctly.
I also tested the same with a Java application, and it works fine. The issue seems to be
with the Camel route that is trying to consume.

There is no error in the console, but the logs show the following:
kafka.KafkaCamelTestConsumer
Connected to the target VM, address: '127.0.0.1:65007', transport: 'socket'
PID_IS_UNDEFINED: INFO  DefaultCamelContext - Apache Camel 2.17.0 (CamelContext: camel-1) is starting
PID_IS_UNDEFINED: INFO  ManagedManagementStrategy - JMX is enabled
PID_IS_UNDEFINED: INFO  DefaultTypeConverter - Loaded 183 type converters
PID_IS_UNDEFINED: INFO  DefaultRuntimeEndpointRegistry - Runtime endpoint registry is in extended mode gathering usage statistics of all incoming and outgoing endpoints (cache limit: 1000)
PID_IS_UNDEFINED: INFO  DefaultCamelContext - AllowUseOriginalMessage is enabled. If access to the original message is not needed, then its recommended to turn this option off as it may improve performance.
PID_IS_UNDEFINED: INFO  DefaultCamelContext - StreamCaching is not in use. If using streams then its recommended to enable stream caching. See more details at http://camel.apache.org/stream-caching.html
PID_IS_UNDEFINED: INFO  KafkaConsumer - Starting Kafka consumer
PID_IS_UNDEFINED: INFO  ConsumerConfig - ConsumerConfig values: 
        auto.commit.interval.ms = 5000
        auto.offset.reset = earliest
        bootstrap.servers = [localhost:9092]
        check.crcs = true
        client.id = 
        connections.max.idle.ms = 540000
        enable.auto.commit = true
        exclude.internal.topics = true
        fetch.max.bytes = 52428800
        fetch.max.wait.ms = 500
        fetch.min.bytes = 1024
        group.id = testing
        heartbeat.interval.ms = 3000
        interceptor.classes = null
        key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
        max.partition.fetch.bytes = 1048576
        max.poll.interval.ms = 300000
        max.poll.records = 500
        metadata.max.age.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.sample.window.ms = 30000
        partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
        receive.buffer.bytes = 32768
        reconnect.backoff.ms = 50
        request.timeout.ms = 40000
        retry.backoff.ms = 100
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.mechanism = GSSAPI
        security.protocol = PLAINTEXT
        send.buffer.bytes = 131072
        session.timeout.ms = 30000
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        ssl.endpoint.identification.algorithm = null
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLS
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS
        value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer

PID_IS_UNDEFINED: INFO  ConsumerConfig - ConsumerConfig values: 
        auto.commit.interval.ms = 5000
        auto.offset.reset = earliest
        bootstrap.servers = [localhost:9092]
        check.crcs = true
        client.id = consumer-1
        connections.max.idle.ms = 540000
        enable.auto.commit = true
        exclude.internal.topics = true
        fetch.max.bytes = 52428800
        fetch.max.wait.ms = 500
        fetch.min.bytes = 1024
        group.id = testing
        heartbeat.interval.ms = 3000
        interceptor.classes = null
        key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
        max.partition.fetch.bytes = 1048576
        max.poll.interval.ms = 300000
        max.poll.records = 500
        metadata.max.age.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.sample.window.ms = 30000
        partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
        receive.buffer.bytes = 32768
        reconnect.backoff.ms = 50
        request.timeout.ms = 40000
        retry.backoff.ms = 100
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.mechanism = GSSAPI
        security.protocol = PLAINTEXT
        send.buffer.bytes = 131072
        session.timeout.ms = 30000
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        ssl.endpoint.identification.algorithm = null
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLS
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS
        value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer

PID_IS_UNDEFINED: INFO  AppInfoParser - Kafka version : 0.10.1.0
PID_IS_UNDEFINED: INFO  AppInfoParser - Kafka commitId : 3402a74efb23d1d4
PID_IS_UNDEFINED: INFO  DefaultCamelContext - Route: route1 started and consuming from: Endpoint[kafka://localhost:9092?autoOffsetReset=earliest&consumersCount=1&groupId=testing&topic=test]
PID_IS_UNDEFINED: INFO  DefaultCamelContext - Total 1 routes, of which 1 are started.
PID_IS_UNDEFINED: INFO  DefaultCamelContext
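
Note that the endpoint in the "Route: route1 started" log line above uses topic=test and groupId=testing, while the route quoted further down uses topic=test1 and groupId=testingGroupNew. This may simply mean the log came from a different test run, but it is worth checking which endpoint is actually started. A minimal sketch for comparing the two, using only the JDK: the class name EndpointOptions and its parse method are hypothetical helpers for illustration, not part of Camel or Kafka, and the parsing is a plain string split rather than Camel's own URI handling.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper (not a Camel or Kafka API): splits the query part of an
// endpoint URI into name/value pairs so two endpoints can be compared.
final class EndpointOptions {

    static Map<String, String> parse(String uri) {
        Map<String, String> options = new LinkedHashMap<>();
        int q = uri.indexOf('?');
        if (q < 0 || q == uri.length() - 1) {
            return options; // no query string present
        }
        for (String pair : uri.substring(q + 1).split("&")) {
            if (pair.isEmpty()) {
                continue; // tolerate a stray "&"
            }
            int eq = pair.indexOf('=');
            String name = eq < 0 ? pair : pair.substring(0, eq);
            String value = eq < 0 ? "" : pair.substring(eq + 1);
            options.put(name, value);
        }
        return options;
    }

    public static void main(String[] args) {
        // Endpoint as reported in the log.
        String started = "kafka://localhost:9092?autoOffsetReset=earliest"
                + "&consumersCount=1&groupId=testing&topic=test";
        // Endpoint as written in the quoted route definition (basic variant).
        String defined = "kafka:localhost:9092?topic=test1"
                + "&groupId=testingGroupNew&autoOffsetReset=earliest";

        Map<String, String> a = parse(started);
        Map<String, String> b = parse(defined);
        for (String name : new String[] {"topic", "groupId"}) {
            System.out.println(name + ": started=" + a.get(name)
                    + ", defined=" + b.get(name));
        }
    }
}
```

If the two differ in the actual run, the consumer would be subscribed to a topic other than the one the messages are produced to, which would match the symptom of a started route that never processes anything.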

-----Original Message-----
From: Ewen Cheslack-Postava [mailto:ewen@confluent.io] 
Sent: Friday, 6 January 2017 3:58 PM
To: users@kafka.apache.org
Subject: Re: Apache Kafka integration using Apache Camel

More generally, do you have any log errors/messages or additional info?
It's tough to debug issues like this from 3rd party libraries if they don't provide logs/exception
info that indicates why processing a specific message failed.

-Ewen

On Thu, Jan 5, 2017 at 8:29 PM, UMESH CHAUDHARY <umesh9794@gmail.com> wrote:

> Did you test that kafka console consumer is displaying the produced 
> message?
>
> On Fri, Jan 6, 2017 at 9:18 AM, Gupta, Swati <Swati.Gupta@anz.com> wrote:
>
> > Hello All,
> >
> >
> >
> > I am trying to create a consumer using Apache Camel for a topic in
> > Apache Kafka.
> > I am using Camel 2.17.0, Kafka 0.10 and JDK 1.8.
> > I have attached a file, KafkaCamelTestConsumer.java, which is a
> > standalone application that tries to read from a topic "test1" created
> > in Apache Kafka. I am producing messages from the console, and I was
> > also able to produce messages to the topic "test1" using a Camel
> > program, but I am not able to consume them. Ideally, each message
> > should get printed, but nothing seems to happen. The log says that the
> > route has started, but it does not process any message.
> >
> > Please help to confirm whether there is anything wrong with the syntax below:
> >
> > from("kafka:localhost:9092?topic=test1&groupId=testingGroupNew&autoOffsetReset=earliest"
> >         + "&consumersCount=1&keyDeserializer=org.apache.kafka.common.serialization.StringDeserializer&"
> >         + "valueDeserializer=org.apache.kafka.common.serialization.StringDeserializer"
> >         + "&autoCommitIntervalMs=1000&sessionTimeoutMs=30000&autoCommitEnable=true")
> >         .split()
> >         .body()
> >         .process(new Processor() {
> >             @Override
> >             public void process(Exchange exchange) throws Exception {
> >                 String messageKey = "";
> >                 if (exchange.getIn() != null) {
> >                     Message message = exchange.getIn();
> >                     Integer partitionId = (Integer) message.getHeader(KafkaConstants.PARTITION);
> >                     String topicName = (String) message.getHeader(KafkaConstants.TOPIC);
> >                     if (message.getHeader(KafkaConstants.KEY) != null)
> >                         messageKey = (String) message.getHeader(KafkaConstants.KEY);
> >                     Object data = message.getBody();
> >
> >                     System.out.println("topicName :: " + topicName
> >                             + " partitionId :: " + partitionId
> >                             + " messageKey :: " + messageKey
> >                             + " message :: " + data + "\n");
> >                 }
> >             }
> >         }).to("file://C:/swati/?fileName=MyOutputFile.txt&charset=utf-8");
> >     }
> > });
> >
> >
> >
> > I have also tried with the basic parameters as below and it still
> > fails to read messages.
> >
> > from("kafka:localhost:9092?topic=test1&groupId=testingGroupNew&autoOffsetReset=earliest")
> >
> > Any help on this will be greatly appreciated.
> >
> > Thanks in advance
> >
> >
> >
> > Thanks & Regards
> >
> > Swati
> >
> > ------------------------------
> > This e-mail and any attachments to it (the "Communication") is,
> > unless otherwise stated, confidential, may contain copyright
> > material and is for the use only of the intended recipient. If you
> > receive the Communication in error, please notify the sender
> > immediately by return e-mail, delete the Communication and the
> > return e-mail, and do not read, copy, retransmit or otherwise deal
> > with it. Any views expressed in the Communication are those of the
> > individual sender only, unless expressly stated to be those of
> > Australia and New Zealand Banking Group Limited ABN 11 005 357 522,
> > or any of its related entities including ANZ Bank New Zealand
> > Limited (together "ANZ"). ANZ does not accept liability in
> > connection with the integrity of or errors in the Communication,
> > computer virus, data corruption, interference or delay arising from
> > or in respect of the Communication.
> >
> >
>

