kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Gupta, Swati" <Swati.Gu...@anz.com>
Subject Apache Kafka integration using Apache Camel
Date Fri, 06 Jan 2017 03:48:10 GMT
Hello All,

I am trying to create a Consumer using Apache Camel for a topic in Apache Kafka.
I am using Camel 2.17.0 and Kafka 0.10  and JDK 1.8.
I have attached a file, KafkaCamelTestConsumer.java which is a standalone application trying
to read from a topic  "test1"created in Apache Kafka
I am producing messages from the console and also was successful to produce messages using
a Camel program in the topic "test1", but not able to consume messages. Ideally, it should
get printed, but nothing seems to happen. The log says that the route has started but does
not process any message.

Please help to confirm if there is anything wrong with the below syntax:
from("kafka:localhost:9092?topic=test1&groupId=testingGroupNew&autoOffsetReset=earliest"
+
                "&consumersCount=1&keyDeserializer=org.apache.kafka.common.serialization.StringDeserializer&"
                + "valueDeserializer=org.apache.kafka.common.serialization.StringDeserializer"
                + "&autoCommitIntervalMs=1000&sessionTimeoutMs=30000&autoCommitEnable=true").split()
                .body()
                .process(new Processor() {
                    @Override
                    public void process(Exchange exchange)
                            throws Exception {
                        String messageKey = "";
                        if (exchange.getIn() != null) {
                            Message message = exchange.getIn();
                            Integer partitionId = (Integer) message
                                    .getHeader(KafkaConstants.PARTITION);
                            String topicName = (String) message
                                    .getHeader(KafkaConstants.TOPIC);
                            if (message.getHeader(KafkaConstants.KEY) != null)
                                messageKey = (String) message
                                        .getHeader(KafkaConstants.KEY);
                            Object data = message.getBody();


                            System.out.println("topicName :: "
                                    + topicName + " partitionId :: "
                                    + partitionId + " messageKey :: "
                                    + messageKey + " message :: "
                                    + data + "\n");
                        }
                    }
                }).to("file://C:/swati/?fileName=MyOutputFile.txt&charset=utf-8");
    }
});

I have also tried with the basic parameters as below and it still fails to read messages.
from("kafka:localhost:9092?topic=test1&groupId=testingGroupNew&autoOffsetReset=earliest")

Any help on this will be greatly appreciated.

Thanks in advance

Thanks & Regards
Swati


This e-mail and any attachments to it (the "Communication") is, unless otherwise stated, confidential,
may contain copyright material and is for the use only of the intended recipient. If you receive
the Communication in error, please notify the sender immediately by return e-mail, delete
the Communication and the return e-mail, and do not read, copy, retransmit or otherwise deal
with it. Any views expressed in the Communication are those of the individual sender only,
unless expressly stated to be those of Australia and New Zealand Banking Group Limited ABN
11 005 357 522, or any of its related entities including ANZ Bank New Zealand Limited (together
"ANZ"). ANZ does not accept liability in connection with the integrity of or errors in the
Communication, computer virus, data corruption, interference or delay arising from or in respect
of the Communication.

Mime
  • Unnamed multipart/mixed (inline, None, 0 bytes)
View raw message