kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Rajiv Kurian <ra...@signalfx.com>
Subject Re: Kafka protocol fetch request max wait.
Date Fri, 05 Feb 2016 00:56:16 GMT
And just like that it stopped happening even though I didn't change any of
my code. I had filed https://issues.apache.org/jira/browse/KAFKA-3159 where
the stock 0.9 kafka consumer was using very high CPU and seeing a lot of
EOFExceptions on the same topic and partition. I wonder if it was hitting
the same problem (lots of empty messages) even though we asked the broker
to park the request till enough bytes came through.

On Thu, Feb 4, 2016 at 3:21 PM, Rajiv Kurian <rajiv@signalfx.com> wrote:

> I am writing a Kafka consumer client using the document at
> https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
>
> One place where I am having problems is the fetch request itself. I am
> able to send fetch requests and can get fetch responses that I can parse
> properly, but it seems like the broker is not respecting my max wait time
> and min fetch bytes parameters.
>
> To test this part I am sending in a fetch request for 128 partitions of a
> single topic  that hasn't seen any messages for a while and is currently
> empty. All 128 partitions are on the same broker (running 0.9). I would
> expect the broker to NOT send me any replies till my max_wait_time_ms
> elapses but it is sending me a reply immediately. This reply is empty (as
> expected) since the partitions have no data and I can parse the data just
> fine but I don't understand why the broker is sending me a reply
> immediately instead of waiting long enough.
>
> Here is how I make a request:
>
> private ByteBuffer createFetchRequestBuffer(int numPartitions) {
>     // This does the math to get the size required.
>     final int sizeRequired =
> numBytesRequiredForFetchRequest(numPartitions);
>     final ByteBuffer buffer = ByteBuffer.allocateDirect(sizeRequired);
>     // Size field
>     int sizeField = sizeRequired - 4;
>     buffer.putInt(sizeField);
>     // API key.
>     buffer.putShort(FECTH_REQUEST_API_KEY);  // 1.
>     // API version.
>     buffer.putShort((short) 0);
>     // Correlation id.
>     buffer.putInt(-3);  // Just a random correlation id.
>     // Client id.
>     buffer.putShort(numClientStringBytes); // The length of the client
> string as a short.
>     buffer.put(clientStringBytes); // The client string bytes.
>     // Replica id.
>     buffer.putInt(-1);  // As per the recommendation.
>     // Max wait time in ms.
>     buffer.putInt(30 * 1000); // Should be 30 seconds.
>     // Min bytes field size.
>     buffer.putInt(1000000);  // A big number.
>     // Num topics.
>     buffer.putInt(1); // A single topic.
>     // Topic string.
>     buffer.putShort(numTopicBytes); // The length of the topic string as a
> short.
>     buffer.put(topicBytes); // The topic string bytes.
>     // Num partitions field.
>     buffer.putInt(numPartitions); // 128 like I said.
>     for (int i = 0; i < numPartitions; i++) {
>       final int partitionId = i;
>       // partition number.
>       buffer.putInt(partitionId);
>       // offset.
>       buffer.putLong(partitionToOffset[partitionId]); // I have an array
> of longs to get this from.
>       // maxBytesPerPartition.
>       buffer.putInt(maxBytesPerPartition);
>     }
>
>     buffer.flip();
>
>     return buffer;
> }
>
> I get a response pretty much immediately when I write this request to the
> broker. The response parses just fine but has no actual non zero size
> message sets.
>
> Thanks in advance.
> Rajiv
>
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message