kafka-users mailing list archives

From Jason Gustafson <ja...@confluent.io>
Subject Re: Kafka protocol fetch request max wait.
Date Fri, 05 Feb 2016 02:42:11 GMT
Hey Rajiv,

Just to be clear, when you received the empty fetch response, did you check
the error codes? It would help to also include some more information (such
as broker and topic settings). If you can come up with a way to reproduce
it, that will help immensely.

Also, would you mind updating KAFKA-3159 with your findings about the high
CPU issue? If the problem went away after a configuration change, does it
come back when those changes are reverted?

Thanks,
Jason

On Thu, Feb 4, 2016 at 5:27 PM, Rajiv Kurian <rajiv@signalfx.com> wrote:

> Indeed this seems to be the case. I am now running the client mentioned in
> https://issues.apache.org/jira/browse/KAFKA-3159 and it is no longer
> taking up high CPU. The high number of EOF exceptions is also gone. It is
> performing very well now. I can't tell whether the improvement is because
> of my config changes (min_bytes, max_bytes_per_partition, max wait time,
> etc.) or because of a bug in the 0.9 broker. Under a debugger I have
> definitely seen a problem where I was getting back empty messages from the
> broker running locally. It might be worth creating a bug for this.
>
> Thanks,
> Rajiv
>
> On Thu, Feb 4, 2016 at 4:56 PM, Rajiv Kurian <rajiv@signalfx.com> wrote:
>
> > And just like that it stopped happening even though I didn't change any of
> > my code. I had filed https://issues.apache.org/jira/browse/KAFKA-3159
> > where the stock 0.9 kafka consumer was using very high CPU and seeing a lot
> > of EOFExceptions on the same topic and partition. I wonder if it was
> > hitting the same problem (lots of empty messages) even though we asked the
> > broker to park the request till enough bytes came through.
> >
> > On Thu, Feb 4, 2016 at 3:21 PM, Rajiv Kurian <rajiv@signalfx.com> wrote:
> >
> >> I am writing a Kafka consumer client using the document at
> >> https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
> >>
> >> One place where I am having problems is the fetch request itself. I am
> >> able to send fetch requests and can get fetch responses that I can parse
> >> properly, but it seems like the broker is not respecting my max wait time
> >> and min fetch bytes parameters.
> >>
> >> To test this part I am sending in a fetch request for 128 partitions of a
> >> single topic that hasn't seen any messages for a while and is currently
> >> empty. All 128 partitions are on the same broker (running 0.9). I would
> >> expect the broker to NOT send me any replies till my max_wait_time_ms
> >> elapses, but it is sending me a reply immediately. This reply is empty (as
> >> expected) since the partitions have no data, and I can parse the data just
> >> fine, but I don't understand why the broker is sending me a reply
> >> immediately instead of waiting long enough.
> >>
> >> Here is how I make a request:
> >>
> >> private ByteBuffer createFetchRequestBuffer(int numPartitions) {
> >>     // This does the math to get the size required.
> >>     final int sizeRequired = numBytesRequiredForFetchRequest(numPartitions);
> >>     final ByteBuffer buffer = ByteBuffer.allocateDirect(sizeRequired);
> >>     // Size field (does not count itself).
> >>     int sizeField = sizeRequired - 4;
> >>     buffer.putInt(sizeField);
> >>     // API key.
> >>     buffer.putShort(FETCH_REQUEST_API_KEY);  // 1.
> >>     // API version.
> >>     buffer.putShort((short) 0);
> >>     // Correlation id.
> >>     buffer.putInt(-3);  // Just a random correlation id.
> >>     // Client id.
> >>     buffer.putShort(numClientStringBytes); // The length of the client string as a short.
> >>     buffer.put(clientStringBytes); // The client string bytes.
> >>     // Replica id.
> >>     buffer.putInt(-1);  // As per the recommendation.
> >>     // Max wait time in ms.
> >>     buffer.putInt(30 * 1000); // Should be 30 seconds.
> >>     // Min bytes.
> >>     buffer.putInt(1000000);  // A big number.
> >>     // Num topics.
> >>     buffer.putInt(1); // A single topic.
> >>     // Topic string.
> >>     buffer.putShort(numTopicBytes); // The length of the topic string as a short.
> >>     buffer.put(topicBytes); // The topic string bytes.
> >>     // Num partitions field.
> >>     buffer.putInt(numPartitions); // 128 like I said.
> >>     for (int i = 0; i < numPartitions; i++) {
> >>       final int partitionId = i;
> >>       // Partition number.
> >>       buffer.putInt(partitionId);
> >>       // Offset.
> >>       buffer.putLong(partitionToOffset[partitionId]); // I have an array of longs to get this from.
> >>       // maxBytesPerPartition.
> >>       buffer.putInt(maxBytesPerPartition);
> >>     }
> >>
> >>     buffer.flip();
> >>
> >>     return buffer;
> >> }
> >>
> >> I get a response pretty much immediately when I write this request to the
> >> broker. The response parses just fine but has no actual non-zero size
> >> message sets.
> >>
> >> Thanks in advance.
> >> Rajiv
> >>
> >>
> >
>
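The error codes Jason asks about are carried per partition inside the fetch response itself (v0 layout per the protocol guide: correlation id, topic count, then for each topic its name, a partition count, and for each partition the id, error code, high watermark, and message set size). Below is a minimal sketch of decoding them; the class name is hypothetical and the response bytes are synthetic stand-ins for what a broker would send, not captured traffic:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class FetchResponseErrorCheck {

    /** Build a synthetic FetchResponse v0 body: one topic, one empty partition. */
    static ByteBuffer sampleResponse() {
        byte[] topic = "test-topic".getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(42 + topic.length);
        buf.putInt(-3);                      // correlation id echoed from the request
        buf.putInt(1);                       // one topic in the response
        buf.putShort((short) topic.length);  // topic name length
        buf.put(topic);                      // topic name bytes
        buf.putInt(1);                       // one partition entry
        buf.putInt(0);                       // partition id 0
        buf.putShort((short) 0);             // error code, 0 = NoError
        buf.putLong(42L);                    // high watermark offset
        buf.putInt(0);                       // empty message set (0 bytes)
        buf.flip();
        return buf;
    }

    /** Walk a v0 response body and return the first partition's error code. */
    static short firstErrorCode(ByteBuffer buf) {
        buf.getInt();                        // correlation id
        int numTopics = buf.getInt();
        for (int t = 0; t < numTopics; t++) {
            short nameLen = buf.getShort();  // topic string length
            buf.position(buf.position() + nameLen); // skip topic name
            int numPartitions = buf.getInt();
            for (int p = 0; p < numPartitions; p++) {
                buf.getInt();                // partition id
                short errorCode = buf.getShort();
                buf.getLong();               // high watermark
                int messageSetSize = buf.getInt();
                buf.position(buf.position() + messageSetSize); // skip messages
                return errorCode;            // first partition is enough here
            }
        }
        return -1;                           // no partition entries present
    }

    public static void main(String[] args) {
        System.out.println("error code = " + firstErrorCode(sampleResponse()));
    }
}
```

An immediate empty response that carries a non-zero per-partition code (for example 1 = OffsetOutOfRange or 6 = NotLeaderForPartition in the protocol guide) would explain the broker replying before max_wait_time_ms elapses, since the purgatory wait only applies to requests that can be satisfied normally.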
