kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Kamal C <kamaltar...@gmail.com>
Subject Re: Apache Kafka integration using Apache Camel
Date Mon, 09 Jan 2017 12:20:12 GMT
Can you enable DEBUG logs ? It'll be helpful to debug.

-- Kamal

On Mon, Jan 9, 2017 at 5:37 AM, Gupta, Swati <Swati.Gupta@anz.com> wrote:

> Hello All,
>
> Any help on this would be appreciated.
> There seems to be no error. Does it look like a version issue?
>
> I have updated my pom.xml with the below:
>                 <dependency>
>                         <groupId>org.springframework.kafka</groupId>
>                         <artifactId>spring-kafka</artifactId>
>                         <version>1.1.2.BUILD-SNAPSHOT</version>
>                 </dependency>
>
>         <dependency>
>             <groupId>org.apache.camel</groupId>
>             <artifactId>camel-kafka</artifactId>
>             <version>2.17.0</version>
>         </dependency>
>
>         <dependency>
>             <groupId>org.apache.kafka</groupId>
>             <artifactId>kafka-clients</artifactId>
>             <version>0.10.1.0</version>
>         </dependency>
>         <dependency>
>             <groupId>org.apache.kafka</groupId>
>             <artifactId>kafka_2.11</artifactId>
>             <version>0.10.1.0</version>
>         </dependency>
>
>         <dependency>
>             <groupId>org.apache.camel</groupId>
>             <artifactId>camel-core</artifactId>
>             <version>2.17.0</version>
>         </dependency>
>
> Thanks & Regards
> Swati
>
> -----Original Message-----
> From: Gupta, Swati [mailto:Swati.Gupta@anz.com]
> Sent: Friday, 6 January 2017 4:01 PM
> To: users@kafka.apache.org
> Subject: RE: Apache Kafka integration using Apache Camel
>
> Yes, the kafka console consumer displays the message correctly.
> I also tested the same with a Java application, it works fine. There seems
> to be an issue with Camel route trying to consume.
>
> There is no error in the console. But, the logs show as below:
> kafka.KafkaCamelTestConsumer
> Connected to the target VM, address: '127.0.0.1:65007', transport:
> 'socket'
> PID_IS_UNDEFINED: INFO  DefaultCamelContext - Apache Camel 2.17.0
> (CamelContext: camel-1) is starting
> PID_IS_UNDEFINED: INFO  ManagedManagementStrategy - JMX is enabled
> PID_IS_UNDEFINED: INFO  DefaultTypeConverter - Loaded 183 type converters
> PID_IS_UNDEFINED: INFO  DefaultRuntimeEndpointRegistry - Runtime endpoint
> registry is in extended mode gathering usage statistics of all incoming and
> outgoing endpoints (cache limit: 1000)
> PID_IS_UNDEFINED: INFO  DefaultCamelContext - AllowUseOriginalMessage is
> enabled. If access to the original message is not needed, then its
> recommended to turn this option off as it may improve performance.
> PID_IS_UNDEFINED: INFO  DefaultCamelContext - StreamCaching is not in use.
> If using streams then its recommended to enable stream caching. See more
> details at http://camel.apache.org/stream-caching.html
> PID_IS_UNDEFINED: INFO  KafkaConsumer - Starting Kafka consumer
> PID_IS_UNDEFINED: INFO  ConsumerConfig - ConsumerConfig values:
>         auto.commit.interval.ms = 5000
>         auto.offset.reset = earliest
>         bootstrap.servers = [localhost:9092]
>         check.crcs = true
>         client.id =
>         connections.max.idle.ms = 540000
>         enable.auto.commit = true
>         exclude.internal.topics = true
>         fetch.max.bytes = 52428800
>         fetch.max.wait.ms = 500
>         fetch.min.bytes = 1024
>         group.id = testing
>         heartbeat.interval.ms = 3000
>         interceptor.classes = null
>         key.deserializer = class org.apache.kafka.common.serialization.
> StringDeserializer
>         max.partition.fetch.bytes = 1048576
>         max.poll.interval.ms = 300000
>         max.poll.records = 500
>         metadata.max.age.ms = 300000
>         metric.reporters = []
>         metrics.num.samples = 2
>         metrics.sample.window.ms = 30000
>         partition.assignment.strategy = [org.apache.kafka.clients.
> consumer.RangeAssignor]
>         receive.buffer.bytes = 32768
>         reconnect.backoff.ms = 50
>         request.timeout.ms = 40000
>         retry.backoff.ms = 100
>         sasl.kerberos.kinit.cmd = /usr/bin/kinit
>         sasl.kerberos.min.time.before.relogin = 60000
>         sasl.kerberos.service.name = null
>         sasl.kerberos.ticket.renew.jitter = 0.05
>         sasl.kerberos.ticket.renew.window.factor = 0.8
>         sasl.mechanism = GSSAPI
>         security.protocol = PLAINTEXT
>         send.buffer.bytes = 131072
>         session.timeout.ms = 30000
>         ssl.cipher.suites = null
>         ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>         ssl.endpoint.identification.algorithm = null
>         ssl.key.password = null
>         ssl.keymanager.algorithm = SunX509
>         ssl.keystore.location = null
>         ssl.keystore.password = null
>         ssl.keystore.type = JKS
>         ssl.protocol = TLS
>         ssl.provider = null
>         ssl.secure.random.implementation = null
>         ssl.trustmanager.algorithm = PKIX
>         ssl.truststore.location = null
>         ssl.truststore.password = null
>         ssl.truststore.type = JKS
>         value.deserializer = class org.apache.kafka.common.serialization.
> StringDeserializer
>
> PID_IS_UNDEFINED: INFO  ConsumerConfig - ConsumerConfig values:
>         auto.commit.interval.ms = 5000
>         auto.offset.reset = earliest
>         bootstrap.servers = [localhost:9092]
>         check.crcs = true
>         client.id = consumer-1
>         connections.max.idle.ms = 540000
>         enable.auto.commit = true
>         exclude.internal.topics = true
>         fetch.max.bytes = 52428800
>         fetch.max.wait.ms = 500
>         fetch.min.bytes = 1024
>         group.id = testing
>         heartbeat.interval.ms = 3000
>         interceptor.classes = null
>         key.deserializer = class org.apache.kafka.common.serialization.
> StringDeserializer
>         max.partition.fetch.bytes = 1048576
>         max.poll.interval.ms = 300000
>         max.poll.records = 500
>         metadata.max.age.ms = 300000
>         metric.reporters = []
>         metrics.num.samples = 2
>         metrics.sample.window.ms = 30000
>         partition.assignment.strategy = [org.apache.kafka.clients.
> consumer.RangeAssignor]
>         receive.buffer.bytes = 32768
>         reconnect.backoff.ms = 50
>         request.timeout.ms = 40000
>         retry.backoff.ms = 100
>         sasl.kerberos.kinit.cmd = /usr/bin/kinit
>         sasl.kerberos.min.time.before.relogin = 60000
>         sasl.kerberos.service.name = null
>         sasl.kerberos.ticket.renew.jitter = 0.05
>         sasl.kerberos.ticket.renew.window.factor = 0.8
>         sasl.mechanism = GSSAPI
>         security.protocol = PLAINTEXT
>         send.buffer.bytes = 131072
>         session.timeout.ms = 30000
>         ssl.cipher.suites = null
>         ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>         ssl.endpoint.identification.algorithm = null
>         ssl.key.password = null
>         ssl.keymanager.algorithm = SunX509
>         ssl.keystore.location = null
>         ssl.keystore.password = null
>         ssl.keystore.type = JKS
>         ssl.protocol = TLS
>         ssl.provider = null
>         ssl.secure.random.implementation = null
>         ssl.trustmanager.algorithm = PKIX
>         ssl.truststore.location = null
>         ssl.truststore.password = null
>         ssl.truststore.type = JKS
>         value.deserializer = class org.apache.kafka.common.serialization.
> StringDeserializer
>
> PID_IS_UNDEFINED: INFO  AppInfoParser - Kafka version : 0.10.1.0
> PID_IS_UNDEFINED: INFO  AppInfoParser - Kafka commitId : 3402a74efb23d1d4
> PID_IS_UNDEFINED: INFO  DefaultCamelContext - Route: route1 started and
> consuming from: Endpoint[kafka://localhost:9092?autoOffsetReset=earliest&
> consumersCount=1&groupId=testing&topic=test]
> PID_IS_UNDEFINED: INFO  DefaultCamelContext - Total 1 routes, of which 1
> are started.
> PID_IS_UNDEFINED: INFO  DefaultCamelContext
>
> -----Original Message-----
> From: Ewen Cheslack-Postava [mailto:ewen@confluent.io]
> Sent: Friday, 6 January 2017 3:58 PM
> To: users@kafka.apache.org
> Subject: Re: Apache Kafka integration using Apache Camel
>
> More generally, do you have any log errors/messages or additional info?
> It's tough to debug issues like this from 3rd party libraries if they
> don't provide logs/exception info that indicates why processing a specific
> message failed.
>
> -Ewen
>
> On Thu, Jan 5, 2017 at 8:29 PM, UMESH CHAUDHARY <umesh9794@gmail.com>
> wrote:
>
> > Did you test that kafka console consumer is displaying the produced
> > message?
> >
> > On Fri, Jan 6, 2017 at 9:18 AM, Gupta, Swati <Swati.Gupta@anz.com>
> wrote:
> >
> > > Hello All,
> > >
> > >
> > >
> > > I am trying to create a Consumer using Apache Camel for a topic in
> > > Apache Kafka.
> > > I am using Camel 2.17.0 and Kafka 0.10  and JDK 1.8.
> > > I have attached a file, KafkaCamelTestConsumer.java which is a
> > > standalone application trying to read from a topic  “test1”created
> > > in Apache Kafka I am producing messages from the console and also
> > > was successful to produce messages using a Camel program in the
> > > topic "test1", but not able to consume messages. Ideally, it should
> > > get printed, but nothing seems to happen. The log says that the
> > > route has started but does not process any message.
> > >
> > > Please help to confirm if there is anything wrong with the below
> syntax:
> > >
> > > from(*"kafka:localhost:9092?topic=test1&groupId=testingGroupNew&
> > autoOffsetReset=earliest"
> > > *+
> > >
> > > *"&consumersCount=1&keyDeserializer=org.apache.
> > kafka.common.serialization.StringDeserializer&"
> > >                 *+
> > > *"valueDeserializer=org.apache.kafka.common.serialization.
> > StringDeserializer"
> > >                 *+
> > > *"&autoCommitIntervalMs=1000&sessionTimeoutMs=30000&
> > autoCommitEnable=true"*
> > > ).split()
> > >                 .body()
> > >                 .process(*new *Processor() {
> > >                     @Override
> > >                     *public void *process(Exchange exchange)
> > >                             *throws *Exception {
> > >                         String messageKey = *""*;
> > >                         *if *(exchange.getIn() != *null*) {
> > >                             Message message = exchange.getIn();
> > >                             Integer partitionId = (Integer) message
> > >                                     .getHeader(KafkaConstants.*
> > PARTITION*
> > > );
> > >                             String topicName = (String) message
> > >                                     .getHeader(KafkaConstants.*
> TOPIC*);
> > >                             *if *(message.getHeader(
> > KafkaConstants.*KEY*)
> > > != *null*)
> > >                                 messageKey = (String) message
> > >                                         .getHeader(KafkaConstants.*
> > KEY*);
> > >                             Object data = message.getBody();
> > >
> > >
> > >                             System.*out*.println(
> > > *"topicName :: "                                     *+ topicName +
> > > *" partitionId :: "                                     *+ partitionId
> +
> > > *" messageKey :: "                                     *+ messageKey +
> > > *" message :: "                                     *+ data +
> > *"**\n**"*);
> > >                         }
> > >                     }
> > >                 }).to(
> > > *"file://C:/swati/?fileName=MyOutputFile.txt&charset=utf-8"*);
> > >     }
> > > });
> > >
> > >
> > >
> > > I have also tried with the basic parameters as below and it still
> > > fails
> > to
> > > read messages.
> > >
> > > from(
> > > *"kafka:localhost:9092?topic=test1&groupId=testingGroupNew&
> > autoOffsetReset=earliest")*
> > >
> > > Any help on this will be greatly appreciated.
> > >
> > > Thanks in advance
> > >
> > >
> > >
> > > Thanks & Regards
> > >
> > > Swati
> > >
> > > ------------------------------
> > > This e-mail and any attachments to it (the "Communication") is,
> > > unless otherwise stated, confidential, may contain copyright
> > > material and is for the use only of the intended recipient. If you
> > > receive the Communication
> > in
> > > error, please notify the sender immediately by return e-mail, delete
> > > the Communication and the return e-mail, and do not read, copy,
> > > retransmit or otherwise deal with it. Any views expressed in the
> > > Communication are
> > those
> > > of the individual sender only, unless expressly stated to be those
> > > of Australia and New Zealand Banking Group Limited ABN 11 005 357
> > > 522, or
> > any
> > > of its related entities including ANZ Bank New Zealand Limited
> > > (together "ANZ"). ANZ does not accept liability in connection with
> > > the integrity of or errors in the Communication, computer virus,
> > > data corruption, interference or delay arising from or in respect of
> the Communication.
> > >
> > >
> >
>
>
> This e-mail and any attachments to it (the "Communication") is, unless
> otherwise stated, confidential, may contain copyright material and is for
> the use only of the intended recipient. If you receive the Communication in
> error, please notify the sender immediately by return e-mail, delete the
> Communication and the return e-mail, and do not read, copy, retransmit or
> otherwise deal with it. Any views expressed in the Communication are those
> of the individual sender only, unless expressly stated to be those of
> Australia and New Zealand Banking Group Limited ABN 11 005 357 522, or any
> of its related entities including ANZ Bank New Zealand Limited (together
> "ANZ"). ANZ does not accept liability in connection with the integrity of
> or errors in the Communication, computer virus, data corruption,
> interference or delay arising from or in respect of the Communication.
>
>
> This e-mail and any attachments to it (the "Communication") is, unless
> otherwise stated, confidential, may contain copyright material and is for
> the use only of the intended recipient. If you receive the Communication in
> error, please notify the sender immediately by return e-mail, delete the
> Communication and the return e-mail, and do not read, copy, retransmit or
> otherwise deal with it. Any views expressed in the Communication are those
> of the individual sender only, unless expressly stated to be those of
> Australia and New Zealand Banking Group Limited ABN 11 005 357 522, or any
> of its related entities including ANZ Bank New Zealand Limited (together
> "ANZ"). ANZ does not accept liability in connection with the integrity of
> or errors in the Communication, computer virus, data corruption,
> interference or delay arising from or in respect of the Communication.
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message