kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Aarti Gupta <aartigup...@gmail.com>
Subject head() on Kafka stream gives NoSuchMethodError
Date Sat, 13 Sep 2014 08:01:29 GMT
The following error is thrown when I call KafkaStream.head(), as shown in
the code snippet below:

* WARN -  java.lang.NoSuchMethodError:

My use case is that I want to block on the receive() method and, when
anything is published on the topic, return the 'head' of the queue to the
calling method, which processes it.

I do not use partitioning and have a single stream.

import com.google.common.collect.Maps;
import x.x.x.Task;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.javaapi.consumer.ZookeeperConsumerConnector;
import kafka.message.MessageAndMetadata;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;

 * @author agupta
// ConsumerDelegate implementation that reads from a single Kafka topic via a
// ZooKeeper-based ConsumerConnector and hands each message back as a Task.
// NOTE(review): this snippet is truncated as archived -- closing braces and the
// tails of several statements are missing -- so it does not compile as shown.
public class KafkaConsumerDelegate implements ConsumerDelegate {

    // Connector created in initialize(); source of the message streams.
    private ConsumerConnector consumerConnector;
    // Topic to consume, read from the "topicName" key of the config in initialize().
    private String topicName;
    // NOTE(review): the initializer of LOG is cut off in the archived snippet.
    private static Logger LOG =
    // Topic name -> number of streams requested from the connector.
    private final Map<String, Integer> topicCount = Maps.newHashMap();
    // Streams returned by createMessageStreams(), keyed by topic name.
    private Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams;
    // Streams for the configured topic; assigned on each receive() call.
    private List<KafkaStream<byte[], byte[]>> kafkaStreams;

    // Looks up the streams for the configured topic and submits a Callable to a
    // newly created single-thread executor; the Callable takes the head of the
    // stream and wraps its payload in a Task.
    // Falls through to "return null" after logging if an exception is caught.
    // consumerConfirms is not referenced in the visible lines.
    public Task receive(final boolean consumerConfirms) {
        try {
            // NOTE(review): the rest of this log statement is cut off in the archive.
            LOG.info("Kafka consumer delegate listening on topic " +
            kafkaStreams = messageStreams.get(getTopicName());
            // NOTE(review): the right-hand side is missing here; presumably one
            // element of kafkaStreams is selected -- confirm against the original.
            final KafkaStream<byte[], byte[]> kafkaStream =
            // A new executor is created per call and no shutdown is visible here.
            // NOTE(review): submit(...) yields a Future while the method is
            // declared to return Task; the tail of this statement is not visible,
            // so how the Task is obtained cannot be confirmed from the snippet.
            return Executors.newSingleThreadExecutor().submit(new
Callable<Task>() {
                public Task call() throws Exception {
                    // This is the call the message reports as throwing
                    // java.lang.NoSuchMethodError. The surrounding '*' characters
                    // appear to be email emphasis markers, not Java syntax.
                    * final MessageAndMetadata<byte[], byte[]>
messageAndMetadata= kafkaStream.head();*

                        // Anonymous Task exposing the raw message bytes.
                        final Task message = new Task() {
                            public byte[] getBytes() {
                                return messageAndMetadata.message();

                        return message;

        } catch (Exception e) {
            // Only the exception message is logged; the stack trace is dropped.
            LOG.warn("Error in consumer " + e.getMessage());
        return null;

    // Reads the topic name from configData, builds the consumer configuration,
    // and creates the connector and its message streams.
    // publisherAckMode is not referenced in the visible lines.
    public void initialize(JSONObject configData, boolean publisherAckMode)
throws IOException {
        try {
            this.topicName = configData.getString("topicName");
            LOG.info("Topic name is " + topicName);
        } catch (JSONException e) {
            // The parse failure is logged; no rethrow is visible in the snippet,
            // so topicName may still be unset when the code below runs.
            LOG.error("Error parsing configuration", e);
        // ZooKeeper address and consumer group id are hard-coded.
        Properties properties = new Properties();
        properties.put("zookeeper.connect", "localhost:2181");
        properties.put("group.id", "testgroup");
        ConsumerConfig consumerConfig = new ConsumerConfig(properties);
        // Request exactly one stream for the configured topic.
        //only one stream, and one topic, (Since we are not supporting
        topicCount.put(getTopicName(), 1);
        consumerConnector = new ZookeeperConsumerConnector(consumerConfig);
        messageStreams = consumerConnector.createMessageStreams(topicCount);

    // Not implemented: always throws UnsupportedOperationException, so nothing in
    // the visible code shuts the connector down.
    public void stop() throws IOException {
        throw new UnsupportedOperationException("Method Not Implemented");

    // Returns the topic name set by initialize().
    public String getTopicName() {
        return this.topicName;

Lastly, I am using the following binary


and the following maven dependency


Any suggestions?


  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message