kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Jun Rao <jun...@gmail.com>
Subject Re: Kafka consumer hung with tomcat
Date Fri, 08 Jun 2012 14:22:48 GMT
Shyarmal,

Not sure why the consumer got stuck in tomcat. I would add try/catch clause
in dequeue() and log all throwables. BTW, the first release of Kafka in
Apache is 0.7 and 0.6 is not long officially  supported. Could you upgrade?

Thanks,

Jun

On Fri, Jun 8, 2012 at 1:21 AM, Danuka Aluthge <danukas@hsenidmobile.com>wrote:

> hi,
>
> I'm using kafka-0.6 library with spring for a web application which is
> intended to read and write from a queue. The java class is as below;
>
>
>
> =============================================================================
> package xxx.xxxx.handlers;
>
> import xxx.xxxx.kafka.builders.Builder;
> import kafka.consumer.ConsumerIterator;
> import kafka.consumer.KafkaMessageStream;
> import kafka.javaapi.consumer.ConsumerConnector;
> import kafka.javaapi.producer.Producer;
> import kafka.javaapi.producer.ProducerData;
> import kafka.message.Message;
> import org.slf4j.Logger;
>
> import java.io.*;
> import java.nio.ByteBuffer;
> import java.util.HashMap;
> import java.util.List;
> import java.util.Map;
> import java.util.Properties;
>
> /**
>  *
>  */
> public class DefaultQueueHandler implements QueueHandler {
>
>    private Producer<String, Serializable> producer;
>    private ConsumerConnector consumer;
>    private ConsumerIterator it;
>    private Properties consumerProperties;
>    private Properties producerProperties;
>    private String messageKey;
>    private String topic = "topic";
>    private static final Logger LOGGER =
> org.slf4j.LoggerFactory.getLogger(DefaultQueueHandler.class);
>
>    public void init() {
>        LOGGER.debug("INIT" + this);
>        producer = Builder.buildProducer(producerProperties);
>        consumer = Builder.buildConsumerConnector(consumerProperties);
>        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
>        topicCountMap.put(topic, new Integer(1));
>        Map<String, List<KafkaMessageStream>> consumerMap =
> consumer.createMessageStreams(topicCountMap);
>        KafkaMessageStream stream = consumerMap.get(topic).get(0);
>        it = stream.iterator();
>    }
>
>    public void enqueue(Map<String, Serializable> message) {
>        producer.send(new ProducerData<String, Serializable>(topic,
> message.get(messageKey)));
>    }
>
>    public Map<String, Serializable> dequeue() throws
> ClassNotFoundException, IOException {
>        if (it.hasNext()) {
>            return getMapFromMessage(it.next());
>        }
>        return null;
>    }
>
>    private Map<String, Serializable> getMapFromMessage(Message message)
> throws IOException, ClassNotFoundException {
>        ByteBuffer buffer = message.payload();
>        byte[] dataBytes = new byte[buffer.remaining()];
>        buffer.get(dataBytes);
>        ByteArrayInputStream bais = new ByteArrayInputStream(dataBytes);
>        ObjectInput oi = new ObjectInputStream(bais);
>        return (Map<String, Serializable>) oi.readObject();
>    }
>
>    public void setConsumerProperties(Properties consumerProperties) {
>        this.consumerProperties = consumerProperties;
>    }
>
>    public void setProducerProperties(Properties producerProperties) {
>        this.producerProperties = producerProperties;
>    }
>
>    public void setMessageKey(String messageKey) {
>        this.messageKey = messageKey;
>    }
>
>    public void setTopic(String topic) {
>        this.topic = topic;
>    }
> }
>
>
> =============================================================================
>
> The init() method is called on system start up (soon after the war file is
> deployed on tomcat). The flow suspends at "it.hasNext()" of the dequeue()
> method and no errors are thrown. However the code runs fine in a
> stand-alone application (without tomcat).
>
> What should be the issue? Please help.
>
>
> thanks,
> Shyarmal.
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message