kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Asaf Mesika <asaf.mes...@gmail.com>
Subject Re: Apache Kafka integration using Apache Camel
Date Thu, 12 Jan 2017 05:02:25 GMT
Don't specify Kafka dependencies. Camel will transitively bring it.
Otherwise you are causing version conflict.
On Mon, 9 Jan 2017 at 14:20 Kamal C <kamaltarget@gmail.com> wrote:

> Can you enable DEBUG logs ? It'll be helpful to debug.
>
> -- Kamal
>
> On Mon, Jan 9, 2017 at 5:37 AM, Gupta, Swati <Swati.Gupta@anz.com> wrote:
>
> > Hello All,
> >
> > Any help on this would be appreciated.
> > There seems to be no error. Does it look like a version issue?
> >
> > I have updated my pom.xml with the below:
> >                 <dependency>
> >                         <groupId>org.springframework.kafka</groupId>
> >                         <artifactId>spring-kafka</artifactId>
> >                         <version>1.1.2.BUILD-SNAPSHOT</version>
> >                 </dependency>
> >
> >         <dependency>
> >             <groupId>org.apache.camel</groupId>
> >             <artifactId>camel-kafka</artifactId>
> >             <version>2.17.0</version>
> >         </dependency>
> >
> >         <dependency>
> >             <groupId>org.apache.kafka</groupId>
> >             <artifactId>kafka-clients</artifactId>
> >             <version>0.10.1.0</version>
> >         </dependency>
> >         <dependency>
> >             <groupId>org.apache.kafka</groupId>
> >             <artifactId>kafka_2.11</artifactId>
> >             <version>0.10.1.0</version>
> >         </dependency>
> >
> >         <dependency>
> >             <groupId>org.apache.camel</groupId>
> >             <artifactId>camel-core</artifactId>
> >             <version>2.17.0</version>
> >         </dependency>
> >
> > Thanks & Regards
> > Swati
> >
> > -----Original Message-----
> > From: Gupta, Swati [mailto:Swati.Gupta@anz.com]
> > Sent: Friday, 6 January 2017 4:01 PM
> > To: users@kafka.apache.org
> > Subject: RE: Apache Kafka integration using Apache Camel
> >
> > Yes, the kafka console consumer displays the message correctly.
> > I also tested the same with a Java application, it works fine. There
> seems
> > to be an issue with Camel route trying to consume.
> >
> > There is no error in the console. But, the logs show as below:
> > kafka.KafkaCamelTestConsumer
> > Connected to the target VM, address: '127.0.0.1:65007', transport:
> > 'socket'
> > PID_IS_UNDEFINED: INFO  DefaultCamelContext - Apache Camel 2.17.0
> > (CamelContext: camel-1) is starting
> > PID_IS_UNDEFINED: INFO  ManagedManagementStrategy - JMX is enabled
> > PID_IS_UNDEFINED: INFO  DefaultTypeConverter - Loaded 183 type converters
> > PID_IS_UNDEFINED: INFO  DefaultRuntimeEndpointRegistry - Runtime endpoint
> > registry is in extended mode gathering usage statistics of all incoming
> and
> > outgoing endpoints (cache limit: 1000)
> > PID_IS_UNDEFINED: INFO  DefaultCamelContext - AllowUseOriginalMessage is
> > enabled. If access to the original message is not needed, then its
> > recommended to turn this option off as it may improve performance.
> > PID_IS_UNDEFINED: INFO  DefaultCamelContext - StreamCaching is not in
> use.
> > If using streams then its recommended to enable stream caching. See more
> > details at http://camel.apache.org/stream-caching.html
> > PID_IS_UNDEFINED: INFO  KafkaConsumer - Starting Kafka consumer
> > PID_IS_UNDEFINED: INFO  ConsumerConfig - ConsumerConfig values:
> >         auto.commit.interval.ms = 5000
> >         auto.offset.reset = earliest
> >         bootstrap.servers = [localhost:9092]
> >         check.crcs = true
> >         client.id =
> >         connections.max.idle.ms = 540000
> >         enable.auto.commit = true
> >         exclude.internal.topics = true
> >         fetch.max.bytes = 52428800
> >         fetch.max.wait.ms = 500
> >         fetch.min.bytes = 1024
> >         group.id = testing
> >         heartbeat.interval.ms = 3000
> >         interceptor.classes = null
> >         key.deserializer = class org.apache.kafka.common.serialization.
> > StringDeserializer
> >         max.partition.fetch.bytes = 1048576
> >         max.poll.interval.ms = 300000
> >         max.poll.records = 500
> >         metadata.max.age.ms = 300000
> >         metric.reporters = []
> >         metrics.num.samples = 2
> >         metrics.sample.window.ms = 30000
> >         partition.assignment.strategy = [org.apache.kafka.clients.
> > consumer.RangeAssignor]
> >         receive.buffer.bytes = 32768
> >         reconnect.backoff.ms = 50
> >         request.timeout.ms = 40000
> >         retry.backoff.ms = 100
> >         sasl.kerberos.kinit.cmd = /usr/bin/kinit
> >         sasl.kerberos.min.time.before.relogin = 60000
> >         sasl.kerberos.service.name = null
> >         sasl.kerberos.ticket.renew.jitter = 0.05
> >         sasl.kerberos.ticket.renew.window.factor = 0.8
> >         sasl.mechanism = GSSAPI
> >         security.protocol = PLAINTEXT
> >         send.buffer.bytes = 131072
> >         session.timeout.ms = 30000
> >         ssl.cipher.suites = null
> >         ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
> >         ssl.endpoint.identification.algorithm = null
> >         ssl.key.password = null
> >         ssl.keymanager.algorithm = SunX509
> >         ssl.keystore.location = null
> >         ssl.keystore.password = null
> >         ssl.keystore.type = JKS
> >         ssl.protocol = TLS
> >         ssl.provider = null
> >         ssl.secure.random.implementation = null
> >         ssl.trustmanager.algorithm = PKIX
> >         ssl.truststore.location = null
> >         ssl.truststore.password = null
> >         ssl.truststore.type = JKS
> >         value.deserializer = class org.apache.kafka.common.serialization.
> > StringDeserializer
> >
> > PID_IS_UNDEFINED: INFO  ConsumerConfig - ConsumerConfig values:
> >         auto.commit.interval.ms = 5000
> >         auto.offset.reset = earliest
> >         bootstrap.servers = [localhost:9092]
> >         check.crcs = true
> >         client.id = consumer-1
> >         connections.max.idle.ms = 540000
> >         enable.auto.commit = true
> >         exclude.internal.topics = true
> >         fetch.max.bytes = 52428800
> >         fetch.max.wait.ms = 500
> >         fetch.min.bytes = 1024
> >         group.id = testing
> >         heartbeat.interval.ms = 3000
> >         interceptor.classes = null
> >         key.deserializer = class org.apache.kafka.common.serialization.
> > StringDeserializer
> >         max.partition.fetch.bytes = 1048576
> >         max.poll.interval.ms = 300000
> >         max.poll.records = 500
> >         metadata.max.age.ms = 300000
> >         metric.reporters = []
> >         metrics.num.samples = 2
> >         metrics.sample.window.ms = 30000
> >         partition.assignment.strategy = [org.apache.kafka.clients.
> > consumer.RangeAssignor]
> >         receive.buffer.bytes = 32768
> >         reconnect.backoff.ms = 50
> >         request.timeout.ms = 40000
> >         retry.backoff.ms = 100
> >         sasl.kerberos.kinit.cmd = /usr/bin/kinit
> >         sasl.kerberos.min.time.before.relogin = 60000
> >         sasl.kerberos.service.name = null
> >         sasl.kerberos.ticket.renew.jitter = 0.05
> >         sasl.kerberos.ticket.renew.window.factor = 0.8
> >         sasl.mechanism = GSSAPI
> >         security.protocol = PLAINTEXT
> >         send.buffer.bytes = 131072
> >         session.timeout.ms = 30000
> >         ssl.cipher.suites = null
> >         ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
> >         ssl.endpoint.identification.algorithm = null
> >         ssl.key.password = null
> >         ssl.keymanager.algorithm = SunX509
> >         ssl.keystore.location = null
> >         ssl.keystore.password = null
> >         ssl.keystore.type = JKS
> >         ssl.protocol = TLS
> >         ssl.provider = null
> >         ssl.secure.random.implementation = null
> >         ssl.trustmanager.algorithm = PKIX
> >         ssl.truststore.location = null
> >         ssl.truststore.password = null
> >         ssl.truststore.type = JKS
> >         value.deserializer = class org.apache.kafka.common.serialization.
> > StringDeserializer
> >
> > PID_IS_UNDEFINED: INFO  AppInfoParser - Kafka version : 0.10.1.0
> > PID_IS_UNDEFINED: INFO  AppInfoParser - Kafka commitId : 3402a74efb23d1d4
> > PID_IS_UNDEFINED: INFO  DefaultCamelContext - Route: route1 started and
> > consuming from: Endpoint[kafka://localhost:9092?autoOffsetReset=earliest&
> > consumersCount=1&groupId=testing&topic=test]
> > PID_IS_UNDEFINED: INFO  DefaultCamelContext - Total 1 routes, of which 1
> > are started.
> > PID_IS_UNDEFINED: INFO  DefaultCamelContext
> >
> > -----Original Message-----
> > From: Ewen Cheslack-Postava [mailto:ewen@confluent.io]
> > Sent: Friday, 6 January 2017 3:58 PM
> > To: users@kafka.apache.org
> > Subject: Re: Apache Kafka integration using Apache Camel
> >
> > More generally, do you have any log errors/messages or additional info?
> > It's tough to debug issues like this from 3rd party libraries if they
> > don't provide logs/exception info that indicates why processing a
> specific
> > message failed.
> >
> > -Ewen
> >
> > On Thu, Jan 5, 2017 at 8:29 PM, UMESH CHAUDHARY <umesh9794@gmail.com>
> > wrote:
> >
> > > Did you test that kafka console consumer is displaying the produced
> > > message?
> > >
> > > On Fri, Jan 6, 2017 at 9:18 AM, Gupta, Swati <Swati.Gupta@anz.com>
> > wrote:
> > >
> > > > Hello All,
> > > >
> > > >
> > > >
> > > > I am trying to create a Consumer using Apache Camel for a topic in
> > > > Apache Kafka.
> > > > I am using Camel 2.17.0 and Kafka 0.10  and JDK 1.8.
> > > > I have attached a file, KafkaCamelTestConsumer.java which is a
> > > > standalone application trying to read from a topic  “test1”created
> > > > in Apache Kafka I am producing messages from the console and also
> > > > was successful to produce messages using a Camel program in the
> > > > topic "test1", but not able to consume messages. Ideally, it should
> > > > get printed, but nothing seems to happen. The log says that the
> > > > route has started but does not process any message.
> > > >
> > > > Please help to confirm if there is anything wrong with the below
> > syntax:
> > > >
> > > > from(*"kafka:localhost:9092?topic=test1&groupId=testingGroupNew&
> > > autoOffsetReset=earliest"
> > > > *+
> > > >
> > > > *"&consumersCount=1&keyDeserializer=org.apache.
> > > kafka.common.serialization.StringDeserializer&"
> > > >                 *+
> > > > *"valueDeserializer=org.apache.kafka.common.serialization.
> > > StringDeserializer"
> > > >                 *+
> > > > *"&autoCommitIntervalMs=1000&sessionTimeoutMs=30000&
> > > autoCommitEnable=true"*
> > > > ).split()
> > > >                 .body()
> > > >                 .process(*new *Processor() {
> > > >                     @Override
> > > >                     *public void *process(Exchange exchange)
> > > >                             *throws *Exception {
> > > >                         String messageKey = *""*;
> > > >                         *if *(exchange.getIn() != *null*) {
> > > >                             Message message = exchange.getIn();
> > > >                             Integer partitionId = (Integer) message
> > > >                                     .getHeader(KafkaConstants.*
> > > PARTITION*
> > > > );
> > > >                             String topicName = (String) message
> > > >                                     .getHeader(KafkaConstants.*
> > TOPIC*);
> > > >                             *if *(message.getHeader(
> > > KafkaConstants.*KEY*)
> > > > != *null*)
> > > >                                 messageKey = (String) message
> > > >                                         .getHeader(KafkaConstants.*
> > > KEY*);
> > > >                             Object data = message.getBody();
> > > >
> > > >
> > > >                             System.*out*.println(
> > > > *"topicName :: "                                     *+ topicName +
> > > > *" partitionId :: "                                     *+
> partitionId
> > +
> > > > *" messageKey :: "                                     *+ messageKey
> +
> > > > *" message :: "                                     *+ data +
> > > *"**\n**"*);
> > > >                         }
> > > >                     }
> > > >                 }).to(
> > > > *"file://C:/swati/?fileName=MyOutputFile.txt&charset=utf-8"*);
> > > >     }
> > > > });
> > > >
> > > >
> > > >
> > > > I have also tried with the basic parameters as below and it still
> > > > fails
> > > to
> > > > read messages.
> > > >
> > > > from(
> > > > *"kafka:localhost:9092?topic=test1&groupId=testingGroupNew&
> > > autoOffsetReset=earliest")*
> > > >
> > > > Any help on this will be greatly appreciated.
> > > >
> > > > Thanks in advance
> > > >
> > > >
> > > >
> > > > Thanks & Regards
> > > >
> > > > Swati
> > > >
> > > > ------------------------------
> > > > This e-mail and any attachments to it (the "Communication") is,
> > > > unless otherwise stated, confidential, may contain copyright
> > > > material and is for the use only of the intended recipient. If you
> > > > receive the Communication
> > > in
> > > > error, please notify the sender immediately by return e-mail, delete
> > > > the Communication and the return e-mail, and do not read, copy,
> > > > retransmit or otherwise deal with it. Any views expressed in the
> > > > Communication are
> > > those
> > > > of the individual sender only, unless expressly stated to be those
> > > > of Australia and New Zealand Banking Group Limited ABN 11 005 357
> > > > 522, or
> > > any
> > > > of its related entities including ANZ Bank New Zealand Limited
> > > > (together "ANZ"). ANZ does not accept liability in connection with
> > > > the integrity of or errors in the Communication, computer virus,
> > > > data corruption, interference or delay arising from or in respect of
> > the Communication.
> > > >
> > > >
> > >
> >
> >
> > This e-mail and any attachments to it (the "Communication") is, unless
> > otherwise stated, confidential, may contain copyright material and is for
> > the use only of the intended recipient. If you receive the Communication
> in
> > error, please notify the sender immediately by return e-mail, delete the
> > Communication and the return e-mail, and do not read, copy, retransmit or
> > otherwise deal with it. Any views expressed in the Communication are
> those
> > of the individual sender only, unless expressly stated to be those of
> > Australia and New Zealand Banking Group Limited ABN 11 005 357 522, or
> any
> > of its related entities including ANZ Bank New Zealand Limited (together
> > "ANZ"). ANZ does not accept liability in connection with the integrity of
> > or errors in the Communication, computer virus, data corruption,
> > interference or delay arising from or in respect of the Communication.
> >
> >
> > This e-mail and any attachments to it (the "Communication") is, unless
> > otherwise stated, confidential, may contain copyright material and is for
> > the use only of the intended recipient. If you receive the Communication
> in
> > error, please notify the sender immediately by return e-mail, delete the
> > Communication and the return e-mail, and do not read, copy, retransmit or
> > otherwise deal with it. Any views expressed in the Communication are
> those
> > of the individual sender only, unless expressly stated to be those of
> > Australia and New Zealand Banking Group Limited ABN 11 005 357 522, or
> any
> > of its related entities including ANZ Bank New Zealand Limited (together
> > "ANZ"). ANZ does not accept liability in connection with the integrity of
> > or errors in the Communication, computer virus, data corruption,
> > interference or delay arising from or in respect of the Communication.
> >
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message