kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Li Tao <ahumbleco...@gmail.com>
Subject Re: invoking kafka consumer as soon as message arrives in topic
Date Wed, 04 Nov 2015 14:47:34 GMT
Hi, though i don't fully understand your question, i'd like to comment on
your code design.
1. it's better for you to run you consumer in a separate thread instead of
main thread, because it blocks the execution main thread.

2. you created a thread for each message, this is very costly. if it takes
less time to process the message, you can process it in the consumer
thread, otherwise you'd better to process it in a thread pool.


On Mon, Oct 26, 2015 at 4:59 PM, Kudumula, Surender <
surender.kudumula@hpe.com> wrote:

> Hi all
>
> Iam trying to write a web application which is invoked when the message
> arrives in topic. The code is waiting for a message in kafka consumer in
> while loop and sometimes it picks up the message and sometimes its waiting
> forever even when the message is produced in the topic. I am invoking the
> kafka consumer in servlet init method as a result its waiting in the while
> and not fully deploying the web app. I would appreciate any suggestions as
> iam looking to invoke my consumer as soon any message arrives in topic.
>
> Map<String, Integer> topicMap = *new* HashMap<String, Integer>();
>
>                             // 1 represents the single thread
>
>                             topicMap.put(topic, *new* Integer(1));
>
>                             Map<String, List<KafkaStream<*byte*[], *byte*[]>>>
> consumerStreamsMap = consumerConnector
>
>
> .createMessageStreams(topicMap);
>
>                             // Get the list of message streams for each
> topic, using the default
>
>                             // decoder.
>
>                             KafkaStream<*byte*[], *byte*[]> stream =
> consumerStreamsMap.get(topic).get(0);
>
>                             ConsumerIterator<*byte*[], *byte*[]> it =
> stream.iterator();
>
>                             *try* {
>
>                                          // for (final
> KafkaStream<byte[], byte[]> stream : streamList) {
>
>                                          // ConsumerIterator<byte[],
> byte[]> consumerIte = stream.iterator();
>
>                                          // reads from *Kafka* until you
> stop it.
>
>                                          *while* (it.hasNext()) {
>
>                                                        //
> System.out.println("Message from Single Topic :: " +
>
>                                                        // new
>
>                                                        *byte*[] msg =
> ((MessageAndMetadata<*byte*[], *byte*[]>) it.next()).message();
>
>                                                        // byte[] rawMsg =
> msg.message();
>
>                                                        // byte[]
> ObjInBytes = it.next().message();
>
>                                                        *final*
> RequestMessage msgAsObject = convertFromBytes(msg);
>
>                                                        *if* (msgAsObject
> != *null*) {
>
>
> Thread thread = *new* Thread() {
>
>
> *public* *void* run() {
>
>
> *logger*.info("Starting new Thread for request Object" + msgAsObject
> .getRequestId());
>
>
> StateMachine sm = *new* StateMachine(msgAsObject);
>
>
> sm.processPdfaRequest();
>
>
> System.*out*.println("Request Fully completed");
>
>
> *logger*.info("Request Fully completed");
>
>
> Thread.*currentThread*().*stop**()*;
>
>
> sm = *null*;
>
>
> }
>
>                                                                      };
>
>
> thread.run();
>
>                                                        }
>
>                                          }
>
>                             } *catch* (Exception e) {
>
>                                          e.printStackTrace();
>
>                             }
>
>
>
> Regards
>
>
>
>
> *Surender Kudumula *Big Data Consultant - EMEA
> Analytics & Data Management
>
>
>
> Surender.kudumula@hpe.com
> M +44 7795970923
>
>
> Hewlett-Packard Enterprise
> Cain Rd,
>
> Bracknell
>
> RG12 1HN
> UK
>
> [image:
> http://graphics8.nytimes.com/images/2015/06/03/technology/03bits-hp/03bits-hp-master315.png]
>
>
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message