kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Danuka Aluthge <danu...@hsenidmobile.com>
Subject Re: Kafka consumer hung with tomcat
Date Fri, 08 Jun 2012 08:23:25 GMT
hi,

I'm using tomcat-6.0.35 and java 1.7.0_04.

thanks.

On Fri, Jun 8, 2012 at 1:51 PM, Danuka Aluthge <danukas@hsenidmobile.com> wrote:

> hi,
>
> I'm using the kafka-0.6 library with Spring for a web application that is
> intended to read from and write to a queue. The Java class is shown below:
>
>
>
> =============================================================================
> package xxx.xxxx.handlers;
>
> import xxx.xxxx.kafka.builders.Builder;
> import kafka.consumer.ConsumerIterator;
> import kafka.consumer.KafkaMessageStream;
> import kafka.javaapi.consumer.ConsumerConnector;
> import kafka.javaapi.producer.Producer;
> import kafka.javaapi.producer.ProducerData;
> import kafka.message.Message;
> import org.slf4j.Logger;
>
> import java.io.*;
> import java.nio.ByteBuffer;
> import java.util.HashMap;
> import java.util.List;
> import java.util.Map;
> import java.util.Properties;
>
> /**
>  *
>  */
> public class DefaultQueueHandler implements QueueHandler {
>
>     private Producer<String, Serializable> producer;
>     private ConsumerConnector consumer;
>     private ConsumerIterator it;
>     private Properties consumerProperties;
>     private Properties producerProperties;
>     private String messageKey;
>     private String topic = "topic";
>     private static final Logger LOGGER =
> org.slf4j.LoggerFactory.getLogger(DefaultQueueHandler.class);
>
>     public void init() {
>         LOGGER.debug("INIT" + this);
>         producer = Builder.buildProducer(producerProperties);
>         consumer = Builder.buildConsumerConnector(consumerProperties);
>         Map<String, Integer> topicCountMap = new HashMap<String,
> Integer>();
>         topicCountMap.put(topic, new Integer(1));
>         Map<String, List<KafkaMessageStream>> consumerMap =
> consumer.createMessageStreams(topicCountMap);
>         KafkaMessageStream stream = consumerMap.get(topic).get(0);
>         it = stream.iterator();
>     }
>
>     public void enqueue(Map<String, Serializable> message) {
>         producer.send(new ProducerData<String, Serializable>(topic,
> message.get(messageKey)));
>     }
>
>     public Map<String, Serializable> dequeue() throws
> ClassNotFoundException, IOException {
>         if (it.hasNext()) {
>             return getMapFromMessage(it.next());
>         }
>         return null;
>     }
>
>     private Map<String, Serializable> getMapFromMessage(Message message)
> throws IOException, ClassNotFoundException {
>         ByteBuffer buffer = message.payload();
>         byte[] dataBytes = new byte[buffer.remaining()];
>         buffer.get(dataBytes);
>         ByteArrayInputStream bais = new ByteArrayInputStream(dataBytes);
>         ObjectInput oi = new ObjectInputStream(bais);
>         return (Map<String, Serializable>) oi.readObject();
>     }
>
>     public void setConsumerProperties(Properties consumerProperties) {
>         this.consumerProperties = consumerProperties;
>     }
>
>     public void setProducerProperties(Properties producerProperties) {
>         this.producerProperties = producerProperties;
>     }
>
>     public void setMessageKey(String messageKey) {
>         this.messageKey = messageKey;
>     }
>
>     public void setTopic(String topic) {
>         this.topic = topic;
>     }
> }
>
>
> =============================================================================
>
> The init() method is called on system startup (soon after the WAR file is
> deployed on Tomcat). The flow suspends at "it.hasNext()" in the dequeue()
> method and no errors are thrown. However, the code runs fine in a
> stand-alone application (without Tomcat).
>
> What could be the issue? Please help.
>
>
> thanks,
> Shyarmal.
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message