kafka-users mailing list archives

From "Baris Akgun (Garanti Teknoloji)" <BarisA...@garanti.com.tr>
Subject Kafka Consumer Problem
Date Wed, 06 Apr 2016 11:30:07 GMT
Hi,

I am trying to consume messages from Kafka through a web service. For example, when a user
calls the web service, the consumers should read all messages from the partitions and the web
service should return those messages to the caller.

I used the code below for that purpose, but ConsumerIterator's next() and hasNext() calls never
signal the end of the stream (hasNext() never returns false) unless an error occurs, so the
web service method never finishes.

I also tried setting props.put("consumer.timeout.ms", "1000"); so that an exception would be
thrown, but then the threads were blocked in

/**
     * Dispatch an uncaught exception to the handler. This method is
     * intended to be called only by the JVM.
     */
    private void dispatchUncaughtException(Throwable e) {
        getUncaughtExceptionHandler().uncaughtException(this, e);
    }

That method.

Has anybody managed to consume messages on demand, by invoking a function such as a web
service call?

Thanks.

Code:

package org.gt.kafka.lastvers;

import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;

public class gtKafkaConsumerTask implements Callable {
    private KafkaStream stream;
    private int threadNumber;

    public gtKafkaConsumerTask(KafkaStream stream, int threadNumber) {
        this.threadNumber = threadNumber;
        this.stream = stream;
    }

    @Override
    public Object call() throws Exception {
        String currMessage = "";
        String retMessages = "";
        int sizes;

        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        currMessage = new String(it.next().message(), "UTF-8");
        retMessages += currMessage + "\n";

        while (currMessage != null) {
            try {
                currMessage = new String(it.next().message(), "UTF-8");
                /*System.out.println("Thread " + threadNumber + ": "
                                  + currMessage);*/
                retMessages += currMessage + "\n";
            } catch (UnsupportedEncodingException e) {
                e.printStackTrace();
            }
        }
        System.out.println("Shutting down Thread: " + threadNumber);

        return retMessages.substring(0, retMessages.length() - 1);
    }
}
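
For comparison, the loop I am after would look roughly like the sketch below: read until no
message arrives within a timeout, then return what was collected. This is only an illustration
under assumptions, not Kafka code. A java.util.concurrent.BlockingQueue stands in for the
KafkaStream so that it runs without a broker, and the class name TimeoutDrainTask and its
parameters are hypothetical. With the high-level consumer, the analogous exit would presumably
be catching kafka.consumer.ConsumerTimeoutException, which hasNext() throws when
consumer.timeout.ms is set and no message arrives in time.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the consumer task: the queue plays the role of
// the KafkaStream, and the poll timeout plays the role of consumer.timeout.ms.
class TimeoutDrainTask implements Callable<String> {
    private final BlockingQueue<byte[]> source;
    private final long timeoutMs;

    TimeoutDrainTask(BlockingQueue<byte[]> source, long timeoutMs) {
        this.source = source;
        this.timeoutMs = timeoutMs;
    }

    @Override
    public String call() {
        StringBuilder messages = new StringBuilder();
        try {
            byte[] payload;
            // poll() returns null when nothing arrives within the timeout,
            // which is the signal to stop reading and return to the caller.
            while ((payload = source.poll(timeoutMs, TimeUnit.MILLISECONDS)) != null) {
                if (messages.length() > 0) {
                    messages.append('\n');
                }
                messages.append(new String(payload, StandardCharsets.UTF_8));
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag and return what was read so far.
            Thread.currentThread().interrupt();
        }
        return messages.toString();
    }
}
```

The point of the design is that the "no more messages for now" condition ends the loop
normally instead of escaping the thread as an uncaught exception.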



Barış Akgün
Analytical Data Warehouse and Big Data Management
Specialist




