kafka-users mailing list archives

From Chen Wang <chen.apache.s...@gmail.com>
Subject Re: Kafka SimpleConsumer not working
Date Thu, 20 Feb 2014 22:58:39 GMT
Never mind. It was actually working. I just needed to wait a bit longer for
data to come into the partition I was testing.
Chen
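
For reference, the empty fetches here are expected: the wiki sample initializes
readOffset through its getLastOffset helper with
kafka.api.OffsetRequest.LatestTime(), which positions the consumer at the tail
of the log, so fetches return empty message sets until new messages are
produced. Roughly, that helper looks like the sketch below (reconstructed from
the wiki page; to replay data already in the partition, pass
kafka.api.OffsetRequest.EarliestTime() as whichTime instead):

    import java.util.HashMap;
    import java.util.Map;
    import kafka.api.PartitionOffsetRequestInfo;
    import kafka.common.TopicAndPartition;
    import kafka.javaapi.OffsetResponse;
    import kafka.javaapi.consumer.SimpleConsumer;

    // Ask the broker for the last offset before the given time.
    // LatestTime() yields the log-end offset (new data only);
    // EarliestTime() yields the oldest offset still on disk.
    public static long getLastOffset(SimpleConsumer consumer, String topic,
            int partition, long whichTime, String clientName) {
        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo =
            new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
        kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
            requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
        OffsetResponse response = consumer.getOffsetsBefore(request);
        if (response.hasError()) {
            System.out.println("Error fetching offset data. Reason: "
                + response.errorCode(topic, partition));
            return 0;
        }
        return response.offsets(topic, partition)[0];
    }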


On Thu, Feb 20, 2014 at 2:41 PM, Chen Wang <chen.apache.solr@gmail.com> wrote:

> I am using 0.8.0. The high-level API works as expected.
>
> <dependency>
>   <groupId>org.apache.kafka</groupId>
>   <artifactId>kafka_2.10</artifactId>
>   <version>0.8.0</version>
> </dependency>
>
> On Thu, Feb 20, 2014 at 2:40 PM, Chen Wang <chen.apache.solr@gmail.com> wrote:
>
>> Hi,
>> I am using Kafka for the first time and was running the sample from
>>
>> https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example
>>
>> However, I cannot read any data from Kafka. The topic has 10
>> partitions, and I tried reading from each of them. The fetch succeeds,
>> but the message size printed in the code below is always 0. Is there
>> something obvious I am missing?
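
One thing worth noting about that size print: it reaches through a
Scala-mangled internal accessor. Since kafka.javaapi.message.ByteBufferMessageSet
implements Iterable<MessageAndOffset>, the count can be taken through the
public API alone; a minimal sketch, assuming the same messageSet variable as
in the code below:

    // Count the fetched messages via the public Iterable interface
    // rather than the Scala-mangled internal accessor.
    int messageCount = 0;
    for (MessageAndOffset ignored : messageSet) {
        messageCount++;
    }
    System.out.println("the message count is " + messageCount);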
>>
>>
>> while (a_maxReads > 0) {
>>     if (consumer == null) {
>>         consumer = new SimpleConsumer(leadBroker, a_port, 100000,
>>             10240 * 1024, clientName);
>>     }
>>
>>     System.out.println("start fetching");
>>     System.out.println("the readOffset is " + readOffset);
>>
>>     FetchRequest req = new FetchRequestBuilder().clientId(clientName)
>>         .addFetch(a_topic, a_partition, readOffset, 1000000)
>>         .build();
>>     FetchResponse fetchResponse = consumer.fetch(req);
>>     System.out.println("finish fetching");
>>     if (fetchResponse.hasError()) {
>>         numErrors++;
>>         // Something went wrong!
>>         short code = fetchResponse.errorCode(a_topic, a_partition);
>>         System.out.println("Error fetching data from the Broker: "
>>             + leadBroker + " Reason: " + code);
>>         if (numErrors > 5)
>>             break;
>>         if (code == ErrorMapping.OffsetOutOfRangeCode()) {
>>             // We asked for an invalid offset. For the simple case,
>>             // ask for the last element to reset.
>>             readOffset = getLastOffset(consumer, a_topic, a_partition,
>>                 kafka.api.OffsetRequest.LatestTime(), clientName);
>>             continue;
>>         }
>>         // Any other error: drop the connection and rediscover the leader.
>>         consumer.close();
>>         consumer = null;
>>         leadBroker = findNewLeader(leadBroker, a_topic, a_partition,
>>             a_port);
>>         continue;
>>     }
>>     numErrors = 0;
>>
>>     long numRead = 0;
>>     System.out.println("The topic is: " + a_topic + " partition is: "
>>         + a_partition);
>>     ByteBufferMessageSet messageSet = fetchResponse.messageSet(a_topic,
>>         a_partition);
>>     // Note: this reaches into the Scala internals of the javaapi
>>     // ByteBufferMessageSet to get the message count.
>>     System.out.println("the message size is " + messageSet
>>         .kafka$javaapi$message$ByteBufferMessageSet$$underlying()
>>         .size());
>>     for (MessageAndOffset messageAndOffset : messageSet) {
>>         long currentOffset = messageAndOffset.offset();
>>         if (currentOffset < readOffset) {
>>             // A fetch can return offsets older than the one requested
>>             // (e.g. with compressed message sets); skip them.
>>             System.out.println("Found an old offset: " + currentOffset
>>                 + " Expecting: " + readOffset);
>>             continue;
>>         }
>>         readOffset = messageAndOffset.nextOffset();
>>         ByteBuffer payload = messageAndOffset.message().payload();
>>
>>         byte[] bytes = new byte[payload.limit()];
>>         payload.get(bytes);
>>         System.out.println(messageAndOffset.offset() + ": "
>>             + new String(bytes, "UTF-8"));
>>         numRead++;
>>         a_maxReads--;
>>     }
>>
>>     if (numRead == 0) {
>>         // Nothing in this fetch; back off briefly before retrying.
>>         try {
>>             Thread.sleep(1000);
>>         } catch (InterruptedException ie) {}
>>     }
>> }
>>
>> Thanks much!
>>  Chen
>>
>
>
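
The quoted loop also depends on a findNewLeader helper from the same wiki
example, which rediscovers the partition leader after a fetch error. The
lookup it builds on is, roughly, the sketch below (reconstructed from the wiki
page, omitting the retry loop over seed brokers):

    import java.util.Collections;
    import kafka.javaapi.PartitionMetadata;
    import kafka.javaapi.TopicMetadata;
    import kafka.javaapi.TopicMetadataRequest;
    import kafka.javaapi.TopicMetadataResponse;
    import kafka.javaapi.consumer.SimpleConsumer;

    // Query one broker for topic metadata and return the metadata of
    // the partition being consumed; part.leader() then identifies the
    // broker to fetch from.
    public static PartitionMetadata findLeader(String broker, int port,
            String topic, int partition) {
        SimpleConsumer consumer = null;
        try {
            consumer = new SimpleConsumer(broker, port, 100000, 64 * 1024,
                "leaderLookup");
            TopicMetadataRequest req =
                new TopicMetadataRequest(Collections.singletonList(topic));
            TopicMetadataResponse resp = consumer.send(req);
            for (TopicMetadata item : resp.topicsMetadata()) {
                for (PartitionMetadata part : item.partitionsMetadata()) {
                    if (part.partitionId() == partition) {
                        return part;
                    }
                }
            }
        } finally {
            if (consumer != null) consumer.close();
        }
        return null;
    }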
