kafka-users mailing list archives

From "Costache, Vlad" <vlad.costa...@metrosystems.net>
Subject New question / request on kafka consumer
Date Thu, 15 Dec 2016 09:41:12 GMT

Hello,

We are trying to build a Kafka consumer (both with the plain client code and integrated with
Camel), and we have hit a blocking issue.
Could you please give us some advice, or any other ideas?

Our problem:

- We create a Kafka consumer that connects to a wrong server (wrong IP/port), and the
consumer gets stuck in the "poll" method even though the connection is never established.

- We also tried with Camel, but we see the same problem (since the same Kafka client is called).

- There seems to be a bug in the Kafka Java client that makes the connection attempt end up
in a loop.

- For the producer, everything is fine.

Do you have any advice, or can you confirm that this is a bug? Do you plan to fix it? Our
production code will use Camel, so we need an exception to be thrown so that we can do our
data error handling.
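
To illustrate the kind of behaviour we are after: a blocking call can be bounded from the caller's side by running it on a worker thread and waiting for the result with a deadline, so that a hang turns into a `TimeoutException`. The sketch below is generic Java using only `java.util.concurrent`; `BoundedCall` and `callWithTimeout` are hypothetical names, not part of the Kafka or Camel APIs. `KafkaConsumer` is documented as not safe for multi-threaded access (apart from `wakeup()`), so a consumer wrapped this way would have to be created, polled, and closed entirely inside the submitted task. Whether this is enough to escape the hanging `poll` is an untested assumption.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical helper, not part of the Kafka or Camel APIs. */
final class BoundedCall {

    private BoundedCall() {}

    /**
     * Runs the given call on a daemon worker thread and waits at most
     * timeoutMs for its result. If the deadline passes, the worker is
     * interrupted and a TimeoutException is thrown to the caller.
     */
    static <T> T callWithTimeout(Callable<T> call, long timeoutMs)
            throws TimeoutException, ExecutionException, InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor(runnable -> {
            Thread worker = new Thread(runnable, "bounded-call");
            worker.setDaemon(true); // a stuck worker must not keep the JVM alive
            return worker;
        });
        Future<T> future = executor.submit(call);
        try {
            return future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            future.cancel(true); // interrupt the worker thread
            throw e;
        } finally {
            executor.shutdownNow();
        }
    }
}
```

A caller would then catch `TimeoutException` around `BoundedCall.callWithTimeout(...)` and route it into its normal error handling.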

How can we make the consumer throw an exception if the connection to the server is not
established successfully, or if it is lost at some point?
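
One possible workaround for the first case (connection never established), sketched under assumptions: check that the broker address accepts TCP connections before starting the consumer, and fail fast if it does not. `BrokerReachability` and `requireReachable` are hypothetical helper names; the check uses only `java.net.Socket`, says nothing about whether the endpoint is really a Kafka broker, and does not detect a connection that is lost later.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Hypothetical helper, not part of the Kafka client API. */
final class BrokerReachability {

    private BrokerReachability() {}

    /**
     * Opens a plain TCP connection to host:port and closes it again.
     * Throws IOException if the host cannot be resolved, the connection
     * is refused, or nothing answers within timeoutMs.
     */
    static void requireReachable(String host, int port, int timeoutMs) throws IOException {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
        } catch (IOException e) {
            throw new IOException("Broker " + host + ":" + port + " is not reachable", e);
        }
    }
}
```

For example, `BrokerReachability.requireReachable("10.97.210.222", 8093, 3000)` could be called before the consumer or the Camel route is started, so that a wrong address surfaces as an `IOException` instead of a silent hang.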


Our environment:
Standalone:
<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.11</artifactId>
        <version>0.10.0.0</version>
    </dependency>
</dependencies>

With Camel:
    <dependency>
        <groupId>org.apache.camel</groupId>
        <artifactId>camel-kafka</artifactId>
        <version>2.17.0.redhat-630187</version>
    </dependency>


The dummy code example is attached.

Or with Camel:

        // The server does not exist
        from("kafka:10.97.210.222:8093?topic=testTopic_mip133&groupId=testing&autoOffsetReset=earliest&consumersCount=1")
            .process(new Processor() {

                @Override
                public void process(Exchange exchange) throws Exception {
                    // Log the headers and the body of every message received
                    LOG.info(new MsbHeaderImpl(), "Received headers: " + exchange.getIn().getHeaders());
                    LOG.info(new MsbHeaderImpl(), "Received body: " + exchange.getIn().getBody());
                }
            });


Kafka server version: kafka_2.10-0.10.1.0


Thanks,
Vlad


