kafka-users mailing list archives

From Rajiv Kurian <ra...@signalfx.com>
Subject Re: Kafka protocol fetch request max wait.
Date Fri, 05 Feb 2016 04:40:14 GMT
Hey Jason,

Yes, I checked for error codes. There were none. The message was perfectly
legal as parsed by my hand-written parser. I also verified the size of the
response, which was exactly the size of a response with an empty message set
for every partition.
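The size and error-code checks described above can be made concrete. Below is a minimal sketch that assumes the version 0 fetch response layout from the protocol guide: a correlation id, then `[TopicName [Partition ErrorCode HighwaterMarkOffset MessageSetSize MessageSet]]`. There is no throttle-time field because the request uses API version 0. The class `FetchResponseV0Summary` and its methods are hypothetical names and are not part of any Kafka library. The sketch walks the response, counts non-zero error codes, sums the message set sizes, and computes the body size that an all-empty response should have.

```java
import java.nio.ByteBuffer;

/**
 * Hypothetical helper: summarizes a version 0 fetch response body.
 * Assumes the buffer is positioned just after the 4-byte size prefix.
 */
final class FetchResponseV0Summary {
    int correlationId;
    int partitions;
    int nonZeroErrorCodes;
    long totalMessageSetBytes;

    static FetchResponseV0Summary summarize(ByteBuffer buf) {
        FetchResponseV0Summary s = new FetchResponseV0Summary();
        s.correlationId = buf.getInt();
        int numTopics = buf.getInt();
        for (int t = 0; t < numTopics; t++) {
            short topicLen = buf.getShort();
            buf.position(buf.position() + topicLen); // skip the topic name bytes
            int numPartitions = buf.getInt();
            for (int p = 0; p < numPartitions; p++) {
                buf.getInt();                           // partition id (unused here)
                short errorCode = buf.getShort();
                buf.getLong();                          // high watermark offset (unused here)
                int messageSetSize = buf.getInt();
                buf.position(buf.position() + messageSetSize); // skip the message set
                s.partitions++;
                if (errorCode != 0) {
                    s.nonZeroErrorCodes++;
                }
                s.totalMessageSetBytes += messageSetSize;
            }
        }
        return s;
    }

    /** Body size (excluding the size prefix) of a one-topic response with all-empty message sets. */
    static int emptyResponseBodySize(int topicNameBytes, int numPartitions) {
        // correlation id + topic count + topic name string + partition count
        // + per partition: id (4) + error code (2) + high watermark (8) + message set size (4)
        return 4 + 4 + (2 + topicNameBytes) + 4 + numPartitions * 18;
    }
}
```

For a 5-byte topic name and 128 partitions, `emptyResponseBodySize(5, 128)` is 2323 bytes: 18 bytes per partition plus the topic header. An exact match against that number, with zero error codes, is the situation described above.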

The topic has 128 partitions, a retention of 10 minutes and a replication
factor of 3. The 128 partitions are divided amongst 3 brokers, but I managed
to reproduce the premature responses even when running my own code in a
debugger connected to a locally running Kafka instance.

I haven't made any changes to the topic configuration while running these
tests. All the changes I have made are to the settings of my fetch request,
i.e. min_bytes_per_fetch, max_wait_ms and max_bytes_per_partition. I didn't
keep an exact record of every change, but I can try to restore my original
configuration and see whether it reproduces the problem for both the consumer
I wrote myself and the stock 0.9 consumer.

I definitely saw empty responses being returned very quickly when running my
own client locally (under a debugger). It's only a theory that the same
behavior was behind all those EOFExceptions.

Rajiv

On Thu, Feb 4, 2016 at 6:42 PM, Jason Gustafson <jason@confluent.io> wrote:

> Hey Rajiv,
>
> Just to be clear, when you received the empty fetch response, did you check
> the error codes? It would help to also include some more information (such
> as broker and topic settings). If you can come up with a way to reproduce
> it, that will help immensely.
>
> Also, would you mind updating KAFKA-3159 with your findings about the high
> CPU issue? If the problem went away after a configuration change, does it
> come back when those changes are reverted?
>
> Thanks,
> Jason
>
> On Thu, Feb 4, 2016 at 5:27 PM, Rajiv Kurian <rajiv@signalfx.com> wrote:
>
> > Indeed this seems to be the case. I am now running the client mentioned in
> > https://issues.apache.org/jira/browse/KAFKA-3159 and it is no longer
> > taking up high CPU. The high number of EOF exceptions is also gone. It is
> > performing very well now. I can't tell whether the improvement is because
> > of my config changes (min_bytes, max_bytes_per_partition, max wait time,
> > etc.) or because of a bug in the 0.9 broker. Under a debugger, I have
> > definitely seen the locally running broker send back empty messages. It
> > might be worth filing a bug for this.
> >
> > Thanks,
> > Rajiv
> >
> > On Thu, Feb 4, 2016 at 4:56 PM, Rajiv Kurian <rajiv@signalfx.com> wrote:
> >
> > > And just like that it stopped happening even though I didn't change any of
> > > my code. I had filed https://issues.apache.org/jira/browse/KAFKA-3159
> > > where the stock 0.9 Kafka consumer was using very high CPU and seeing a lot
> > > of EOFExceptions on the same topic and partition. I wonder if it was
> > > hitting the same problem (lots of empty messages) even though we asked the
> > > broker to park the request until enough bytes came through.
> > >
> > > On Thu, Feb 4, 2016 at 3:21 PM, Rajiv Kurian <rajiv@signalfx.com> wrote:
> > >
> > >> I am writing a Kafka consumer client using the document at
> > >> https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
> > >>
> > >> One place where I am having problems is the fetch request itself. I am
> > >> able to send fetch requests and can get fetch responses that I can parse
> > >> properly, but it seems like the broker is not respecting my max wait time
> > >> and min fetch bytes parameters.
> > >>
> > >> To test this part I am sending a fetch request for 128 partitions of a
> > >> single topic that hasn't seen any messages for a while and is currently
> > >> empty. All 128 partitions are on the same broker (running 0.9). I would
> > >> expect the broker NOT to send me any reply until my max_wait_time_ms
> > >> elapses, but it is replying immediately. The reply is empty (as expected,
> > >> since the partitions have no data) and I can parse it just fine, but I
> > >> don't understand why the broker replies immediately instead of waiting.
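The behavior expected here can be written down as a simple rule. Below is a minimal sketch of the wait semantics as the protocol guide describes MaxWaitTime and MinBytes: block for up to MaxWaitTime if fewer than MinBytes are available, and respond immediately if MinBytes is 0. It is not the broker's actual code. Treating a partition-level error as a reason to answer immediately is an extra assumption here. `FetchWaitModel` and its methods are hypothetical.

```java
/**
 * Hypothetical, simplified model of fetch wait semantics (not broker source code).
 */
final class FetchWaitModel {
    /** True if the broker would be expected to answer without parking the request. */
    static boolean respondImmediately(int maxWaitMs, int minBytes,
                                      long bytesAvailable, boolean partitionError) {
        if (maxWaitMs <= 0) {
            return true;                  // the client asked not to wait at all
        }
        if (bytesAvailable >= minBytes) {
            return true;                  // enough data is already available (always true for minBytes 0)
        }
        return partitionError;            // assumed: an error is reported right away
    }

    /** True if an error-free response arrived before maxWaitMs with fewer than minBytes. */
    static boolean looksPremature(long elapsedMs, int maxWaitMs, int minBytes,
                                  long bytesReturned, boolean anyErrorCode) {
        return !anyErrorCode && bytesReturned < minBytes && elapsedMs < maxWaitMs;
    }
}
```

With the values in the quoted request (30000 ms and 1000000 bytes) against an empty topic, `respondImmediately(30000, 1000000, 0, false)` is false. An empty, error-free reply arriving after a few milliseconds is therefore exactly the kind of response `looksPremature` flags.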
> > >>
> > >> Here is how I make a request:
> > >>
> > >> private ByteBuffer createFetchRequestBuffer(int numPartitions) {
> > >>     // This does the math to get the size required.
> > >>     final int sizeRequired = numBytesRequiredForFetchRequest(numPartitions);
> > >>     final ByteBuffer buffer = ByteBuffer.allocateDirect(sizeRequired);
> > >>     // Size field (excludes the 4 bytes of the size field itself).
> > >>     int sizeField = sizeRequired - 4;
> > >>     buffer.putInt(sizeField);
> > >>     // API key.
> > >>     buffer.putShort(FETCH_REQUEST_API_KEY);  // 1.
> > >>     // API version.
> > >>     buffer.putShort((short) 0);
> > >>     // Correlation id.
> > >>     buffer.putInt(-3);  // Just a random correlation id.
> > >>     // Client id.
> > >>     buffer.putShort(numClientStringBytes); // Length of the client string as a short.
> > >>     buffer.put(clientStringBytes); // The client string bytes.
> > >>     // Replica id.
> > >>     buffer.putInt(-1);  // As per the recommendation.
> > >>     // Max wait time in ms.
> > >>     buffer.putInt(30 * 1000); // Should be 30 seconds.
> > >>     // Min bytes.
> > >>     buffer.putInt(1000000);  // A big number.
> > >>     // Num topics.
> > >>     buffer.putInt(1); // A single topic.
> > >>     // Topic string.
> > >>     buffer.putShort(numTopicBytes); // Length of the topic string as a short.
> > >>     buffer.put(topicBytes); // The topic string bytes.
> > >>     // Num partitions field.
> > >>     buffer.putInt(numPartitions); // 128 like I said.
> > >>     for (int i = 0; i < numPartitions; i++) {
> > >>       final int partitionId = i;
> > >>       // Partition number.
> > >>       buffer.putInt(partitionId);
> > >>       // Fetch offset.
> > >>       buffer.putLong(partitionToOffset[partitionId]); // Taken from an array of longs.
> > >>       // maxBytesPerPartition.
> > >>       buffer.putInt(maxBytesPerPartition);
> > >>     }
> > >>
> > >>     buffer.flip();
> > >>
> > >>     return buffer;
> > >> }
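The quoted code calls `numBytesRequiredForFetchRequest` but does not show it. Below is a self-contained sketch of what that helper and the builder might look like, assuming the version 0 request layout used in the snippet: size, API key, API version, correlation id, client id, replica id, max wait, min bytes, then one topic with its partitions. The class `FetchRequestV0`, the three-argument helper signature, and the `build` method are hypothetical. The field order and example values mirror the quoted code. The final `hasRemaining()` check catches a size calculation that disagrees with the bytes actually written.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical self-contained variant of createFetchRequestBuffer (fetch API version 0). */
final class FetchRequestV0 {
    static final short FETCH_REQUEST_API_KEY = 1;

    /** Total request size in bytes, including the 4-byte size prefix. */
    static int numBytesRequiredForFetchRequest(int clientIdBytes, int topicBytes, int numPartitions) {
        return 4                              // size prefix
             + 2 + 2 + 4                      // API key, API version, correlation id
             + 2 + clientIdBytes              // client id string
             + 4 + 4 + 4                      // replica id, max wait, min bytes
             + 4                              // topic array count
             + 2 + topicBytes                 // topic name string
             + 4                              // partition array count
             + numPartitions * (4 + 8 + 4);   // partition id, fetch offset, max bytes
    }

    static ByteBuffer build(String clientId, String topic, long[] offsets,
                            int maxWaitMs, int minBytes, int maxBytesPerPartition) {
        byte[] client = clientId.getBytes(StandardCharsets.UTF_8);
        byte[] topicName = topic.getBytes(StandardCharsets.UTF_8);
        int size = numBytesRequiredForFetchRequest(client.length, topicName.length, offsets.length);
        ByteBuffer buf = ByteBuffer.allocateDirect(size); // ByteBuffer defaults to big-endian
        buf.putInt(size - 4);                   // size excludes the size field itself
        buf.putShort(FETCH_REQUEST_API_KEY);
        buf.putShort((short) 0);                // API version 0
        buf.putInt(-3);                         // arbitrary correlation id, as in the original
        buf.putShort((short) client.length).put(client);
        buf.putInt(-1);                         // replica id for ordinary consumers
        buf.putInt(maxWaitMs);
        buf.putInt(minBytes);
        buf.putInt(1);                          // a single topic
        buf.putShort((short) topicName.length).put(topicName);
        buf.putInt(offsets.length);
        for (int p = 0; p < offsets.length; p++) {
            buf.putInt(p);                      // partition id
            buf.putLong(offsets[p]);            // fetch offset
            buf.putInt(maxBytesPerPartition);
        }
        if (buf.hasRemaining()) {
            throw new IllegalStateException("size calculation does not match bytes written");
        }
        buf.flip();
        return buf;
    }
}
```

For example, with client id `client`, topic `topic`, and 128 partitions, the buffer is 2095 bytes and its size field holds 2091.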
> > >>
> > >> I get a response pretty much immediately when I write this request to the
> > >> broker. The response parses just fine, but every message set in it is
> > >> empty.
> > >>
> > >> Thanks in advance.
> > >> Rajiv
> > >>
> > >>
> > >
> >
>
