kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Rajiv Kurian <ra...@signalfx.com>
Subject Re: Kafka protocol fetch request max wait.
Date Fri, 05 Feb 2016 01:27:15 GMT
Indeed this seems to be the case. I am now running the client mentioned in
https://issues.apache.org/jira/browse/KAFKA-3159  and it is no longer
taking up high CPU. The high number of EOF exceptions are also gone. It is
performing very well now. I can't understand if the improvement is because
of my config  changes (min_bytes, max_bytes_per_partition, max wait time)
etc or because of a bug in the 0.9 broker. I have definitely under a
debugger seen a problem where I was getting back empty messages from the
broker running locally. It might be worth creating a bug for this.

Thanks,
Rajiv

On Thu, Feb 4, 2016 at 4:56 PM, Rajiv Kurian <rajiv@signalfx.com> wrote:

> And just like that it stopped happening even though I didn't change any of
> my code. I had filed https://issues.apache.org/jira/browse/KAFKA-3159
> where the stock 0.9 kafka consumer was using very high CPU and seeing a lot
> of EOFExceptions on the same topic and partition. I wonder if it was
> hitting the same problem (lots of empty messages) even though we asked the
> broker to park the request till enough bytes came through.
>
> On Thu, Feb 4, 2016 at 3:21 PM, Rajiv Kurian <rajiv@signalfx.com> wrote:
>
>> I am writing a Kafka consumer client using the document at
>> https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
>>
>> One place where I am having problems is the fetch request itself. I am
>> able to send fetch requests and can get fetch responses that I can parse
>> properly, but it seems like the broker is not respecting my max wait time
>> and min fetch bytes parameters.
>>
>> To test this part I am sending in a fetch request for 128 partitions of a
>> single topic  that hasn't seen any messages for a while and is currently
>> empty. All 128 partitions are on the same broker (running 0.9). I would
>> expect the broker to NOT send me any replies till my max_wait_time_ms
>> elapses but it is sending me a reply immediately. This reply is empty (as
>> expected) since the partitions have no data and I can parse the data just
>> fine but I don't understand why the broker is sending me a reply
>> immediately instead of waiting long enough.
>>
>> Here is how I make a request:
>>
>> private ByteBuffer createFetchRequestBuffer(int numPartitions) {
>>     // This does the math to get the size required.
>>     final int sizeRequired =
>> numBytesRequiredForFetchRequest(numPartitions);
>>     final ByteBuffer buffer = ByteBuffer.allocateDirect(sizeRequired);
>>     // Size field
>>     int sizeField = sizeRequired - 4;
>>     buffer.putInt(sizeField);
>>     // API key.
>>     buffer.putShort(FECTH_REQUEST_API_KEY);  // 1.
>>     // API version.
>>     buffer.putShort((short) 0);
>>     // Correlation id.
>>     buffer.putInt(-3);  // Just a random correlation id.
>>     // Client id.
>>     buffer.putShort(numClientStringBytes); // The length of the client
>> string as a short.
>>     buffer.put(clientStringBytes); // The client string bytes.
>>     // Replica id.
>>     buffer.putInt(-1);  // As per the recommendation.
>>     // Max wait time in ms.
>>     buffer.putInt(30 * 1000); // Should be 30 seconds.
>>     // Min bytes field size.
>>     buffer.putInt(1000000);  // A big number.
>>     // Num topics.
>>     buffer.putInt(1); // A single topic.
>>     // Topic string.
>>     buffer.putShort(numTopicBytes); // The length of the topic string as
>> a short.
>>     buffer.put(topicBytes); // The topic string bytes.
>>     // Num partitions field.
>>     buffer.putInt(numPartitions); // 128 like I said.
>>     for (int i = 0; i < numPartitions; i++) {
>>       final int partitionId = i;
>>       // partition number.
>>       buffer.putInt(partitionId);
>>       // offset.
>>       buffer.putLong(partitionToOffset[partitionId]); // I have an array
>> of longs to get this from.
>>       // maxBytesPerPartition.
>>       buffer.putInt(maxBytesPerPartition);
>>     }
>>
>>     buffer.flip();
>>
>>     return buffer;
>> }
>>
>> I get a response pretty much immediately when I write this request to the
>> broker. The response parses just fine but has no actual non zero size
>> message sets.
>>
>> Thanks in advance.
>> Rajiv
>>
>>
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message