kafka-users mailing list archives

From Danuka Aluthge <danu...@hsenidmobile.com>
Subject Kafka consumer hung with tomcat
Date Fri, 08 Jun 2012 08:21:03 GMT
Hi,

I'm using the kafka-0.6 library with Spring in a web application that
reads from and writes to a queue. The Java class is shown below:


=============================================================================
package xxx.xxxx.handlers;

import xxx.xxxx.kafka.builders.Builder;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaMessageStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.javaapi.producer.Producer;
import kafka.javaapi.producer.ProducerData;
import kafka.message.Message;
import org.slf4j.Logger;

import java.io.*;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

/**
 * QueueHandler backed by a Kafka producer and a single consumer stream
 * for one topic.
 */
public class DefaultQueueHandler implements QueueHandler {

    private Producer<String, Serializable> producer;
    private ConsumerConnector consumer;
    private ConsumerIterator it;
    private Properties consumerProperties;
    private Properties producerProperties;
    private String messageKey;
    private String topic = "topic";
    private static final Logger LOGGER =
            org.slf4j.LoggerFactory.getLogger(DefaultQueueHandler.class);

    public void init() {
        LOGGER.debug("INIT" + this);
        producer = Builder.buildProducer(producerProperties);
        consumer = Builder.buildConsumerConnector(consumerProperties);
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, new Integer(1));
        Map<String, List<KafkaMessageStream>> consumerMap =
                consumer.createMessageStreams(topicCountMap);
        KafkaMessageStream stream = consumerMap.get(topic).get(0);
        it = stream.iterator();
    }

    public void enqueue(Map<String, Serializable> message) {
        producer.send(new ProducerData<String, Serializable>(topic,
                message.get(messageKey)));
    }

    public Map<String, Serializable> dequeue()
            throws ClassNotFoundException, IOException {
        if (it.hasNext()) {
            return getMapFromMessage(it.next());
        }
        return null;
    }

    private Map<String, Serializable> getMapFromMessage(Message message)
            throws IOException, ClassNotFoundException {
        ByteBuffer buffer = message.payload();
        byte[] dataBytes = new byte[buffer.remaining()];
        buffer.get(dataBytes);
        ByteArrayInputStream bais = new ByteArrayInputStream(dataBytes);
        ObjectInput oi = new ObjectInputStream(bais);
        return (Map<String, Serializable>) oi.readObject();
    }

    public void setConsumerProperties(Properties consumerProperties) {
        this.consumerProperties = consumerProperties;
    }

    public void setProducerProperties(Properties producerProperties) {
        this.producerProperties = producerProperties;
    }

    public void setMessageKey(String messageKey) {
        this.messageKey = messageKey;
    }

    public void setTopic(String topic) {
        this.topic = topic;
    }
}

=============================================================================

The init() method is called on system startup (soon after the WAR file is
deployed on Tomcat). Execution blocks at "it.hasNext()" in the dequeue()
method and no errors are thrown. However, the same code runs fine in a
stand-alone application (without Tomcat).
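
To make the symptom concrete: a call that suspends without throwing is what
any blocking read does while its queue is empty. Below is a minimal,
self-contained sketch of that behaviour. It uses a plain
java.util.concurrent.LinkedBlockingQueue as a stand-in for the consumer
stream; the class BlockingDequeueSketch and its methods are hypothetical
and are not part of the Kafka API. It only illustrates the hang, not why no
messages arrive under Tomcat.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a blocking consumer stream; NOT the Kafka API.
public class BlockingDequeueSketch {

    private final BlockingQueue<String> queue = new LinkedBlockingQueue<String>();

    public void enqueue(String message) {
        queue.add(message);
    }

    // Blocks the calling thread until a message arrives. Called on a
    // request thread with an empty queue, this looks like a silent hang.
    public String dequeueBlocking() throws InterruptedException {
        return queue.take();
    }

    // Waits at most timeoutMs and returns null if nothing arrived,
    // so the caller is never suspended indefinitely.
    public String dequeueWithTimeout(long timeoutMs) throws InterruptedException {
        return queue.poll(timeoutMs, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        final BlockingDequeueSketch sketch = new BlockingDequeueSketch();

        // Empty queue: the bounded call gives up instead of hanging.
        System.out.println("empty: " + sketch.dequeueWithTimeout(100));

        // Run the blocking call on a daemon thread so main cannot get stuck.
        Thread worker = new Thread(new Runnable() {
            public void run() {
                try {
                    System.out.println("received: " + sketch.dequeueBlocking());
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        worker.setDaemon(true);
        worker.start();

        sketch.enqueue("hello");
        worker.join(1000);
    }
}
```

In this sketch the blocking call returns only once something has been
enqueued, which matches what I see: dequeue() never returns and nothing is
logged.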

What could be the issue? Please help.


Thanks,
Shyarmal.
