kafka-users mailing list archives

From Chen Wang <chen.apache.s...@gmail.com>
Subject Kafka SimpleConsumer not working
Date Thu, 20 Feb 2014 22:40:54 GMT
Hi,
I am using Kafka for the first time and was running the sample from
https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example

However, I cannot read any data from Kafka. The topic has 10 partitions, and
I get the same result whichever one I read from. The fetch succeeds, but the
message size returned is always 0, as printed by:

    System.out.println("the message size is" + messageSet
        .kafka$javaapi$message$ByteBufferMessageSet$$underlying()
        .size());

Is there something obvious that I am missing?


while (a_maxReads > 0) {
    if (consumer == null) {
        consumer = new SimpleConsumer(leadBroker, a_port, 100000,
            10240 * 1024, clientName);
    }

    System.out.println("start fetching");
    System.out.println("the readoffset is" + readOffset);

    FetchRequest req = new FetchRequestBuilder().clientId(clientName)
        .addFetch(a_topic, a_partition, readOffset, 1000000)
        .build();
    FetchResponse fetchResponse = consumer.fetch(req);
    System.out.println("finish fetching");
    if (fetchResponse.hasError()) {
        numErrors++;
        // Something went wrong!
        short code = fetchResponse.errorCode(a_topic, a_partition);
        System.out.println("Error fetching data from the Broker:"
            + leadBroker + " Reason: " + code);
        if (numErrors > 5)
            break;
        if (code == ErrorMapping.OffsetOutOfRangeCode()) {
            // We asked for an invalid offset. For simple case ask for
            // the last element to reset
            readOffset = getLastOffset(consumer, a_topic, a_partition,
                kafka.api.OffsetRequest.LatestTime(), clientName);
            continue;
        }
        consumer.close();
        consumer = null;
        leadBroker = findNewLeader(leadBroker, a_topic, a_partition,
            a_port);
        continue;
    }
    numErrors = 0;

    long numRead = 0;
    System.out.println("The topic is:" + a_topic + " partition is : "
        + a_partition);
    ByteBufferMessageSet messageSet = fetchResponse.messageSet(a_topic,
        a_partition);
    System.out
        .println("the message size is" + messageSet
            .kafka$javaapi$message$ByteBufferMessageSet$$underlying()
            .size());
    for (MessageAndOffset messageAndOffset: messageSet) {
        long currentOffset = messageAndOffset.offset();
        if (currentOffset < readOffset) {
            System.out.println("Found an old offset: " + currentOffset
                + " Expecting: " + readOffset);
            continue;
        }
        readOffset = messageAndOffset.nextOffset();
        ByteBuffer payload = messageAndOffset.message().payload();

        byte[] bytes = new byte[payload.limit()];
        payload.get(bytes);
        System.out.println(String.valueOf(messageAndOffset.offset()) + ": "
            + new String(bytes, "UTF-8"));
        numRead++;
        a_maxReads--;
    }

    if (numRead == 0) {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException ie) {}
    }
}

Thanks much!
Chen
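
Note on the decoding step: the loop above sizes its byte array with
payload.limit(), which equals the number of readable bytes only when the
buffer's position is 0. The following is a minimal, broker-free sketch of the
same step using remaining() instead. It relies only on java.nio; the class
PayloadDecodeSketch and the method decodeUtf8 are hypothetical names chosen
for illustration, not part of the Kafka API, and the sketch does not address
the empty fetch described in the message.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper for illustration; not part of the Kafka API.
final class PayloadDecodeSketch {

    // Decodes the readable bytes of the buffer (position up to limit) as UTF-8.
    static String decodeUtf8(ByteBuffer payload) {
        // Work on a duplicate so the caller's position and limit stay untouched.
        ByteBuffer view = payload.duplicate();
        // remaining() is limit - position, so this is safe for any position.
        byte[] bytes = new byte[view.remaining()];
        view.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Buffer whose position is 0: limit() and remaining() agree.
        ByteBuffer whole = ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8));
        System.out.println(decodeUtf8(whole));

        // Buffer whose position is 2: only the bytes after the position are read.
        ByteBuffer offset = ByteBuffer.wrap("xxhello".getBytes(StandardCharsets.UTF_8));
        offset.position(2);
        System.out.println(decodeUtf8(offset));
    }
}
```

In the consumer loop this would correspond to calling
decodeUtf8(messageAndOffset.message().payload()) in place of the manual
byte-array copy.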
