kafka-users mailing list archives

From Marc Labbe <mrla...@gmail.com>
Subject Re: Purgatory
Date Fri, 01 Nov 2013 19:33:35 GMT
Guozhang,

I have to agree with Priya that the doc isn't very clear. Although the
configuration is documented, the description simply rewords the name of the
config, which isn't particularly useful if you want to know more about what
the purgatory actually is. I searched the whole wiki and docs and could not
find anything as useful as reading the code. In this case,
kafka.server.KafkaApis and kafka.server.RequestPurgatory will be your
friends.

I'll try to add to Joe's answer here, mostly just reporting what's
available in the Scala docs from the project. I am doing this to understand
the mechanics myself, by the way.

As Joe said, messages are not dropped by the purgatory; requests are simply
removed from the purgatory once they are satisfied. The satisfaction
conditions differ between fetch and produce requests, and each is
implemented in its respective DelayedRequest subclass (DelayedFetch and
DelayedProduce).

The request purgatories are defined as follows in the code:
 - ProducerRequestPurgatory: a holding pen for produce requests waiting to
be satisfied.
 - FetchRequestPurgatory: a holding pen for fetch requests waiting to be
satisfied.
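To make the shape of this concrete, here is a rough sketch of the purgatory API as I understand it. The class and method names follow the 0.8 source, but the bodies are illustrative stand-ins I wrote for this email, not the actual implementation:

```scala
// Sketch of the RequestPurgatory shape (illustrative, not the real 0.8 code).
abstract class DelayedRequest(val keys: Seq[Any], val delayMs: Long) {
  @volatile var satisfied = false
}

abstract class RequestPurgatorySketch[T <: DelayedRequest] {
  // Each subclass decides when a watched request counts as satisfied...
  def checkSatisfied(request: T): Boolean

  // ...and what to do when its delay elapses (both cases send a response).
  def expire(request: T): Unit

  // Register a request to be checked later (body omitted in this sketch).
  def watch(request: T): Unit = ()
}
```

ProducerRequestPurgatory and FetchRequestPurgatory each supply their own checkSatisfied/expire, working on DelayedProduce and DelayedFetch respectively.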

Each request purgatory runs a thread (ExpiredRequestReaper). This thread
first tries to find an expired delayed request. When one is found, it runs
the purgatory's expire method to handle the delayed request's expiration.
In both the produce and fetch cases, this sends a response to the client;
an expired request thereby becomes a satisfied request. The next step of
the thread's loop is where the configuration parameter you asked about
initially (purgatory.purge.interval.requests) comes in: once the number of
delayed requests handed to the purgatory to watch reaches this value, the
thread goes through all previously queued requests and removes those marked
as satisfied. Because of that, it is really an interval rather than a
threshold, since it doesn't take the number of satisfied requests or the
size of the queue into account.
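One iteration of that loop could be sketched like this. Again, this is my simplified reconstruction: the Req class, field names, and structure are invented for illustration, not the real ExpiredRequestReaper:

```scala
import scala.collection.mutable

// Illustrative stand-in for a delayed request in the reaper sketch below.
case class Req(var satisfied: Boolean, expiresAtMs: Long)

class ReaperSketch(purgeIntervalRequests: Int) {
  val watching = mutable.ListBuffer[Req]()
  private var watchedSinceLastPurge = 0

  def watch(r: Req): Unit = { watching += r; watchedSinceLastPurge += 1 }

  // expire(): respond to the client and mark the request satisfied.
  private def expire(r: Req): Unit = r.satisfied = true

  def runOnce(nowMs: Long): Unit = {
    // Step 1: handle requests whose delay has elapsed.
    watching.filter(r => !r.satisfied && nowMs >= r.expiresAtMs).foreach(expire)

    // Step 2: purge only once enough requests have been watched -- this is
    // why purgatory.purge.interval.requests behaves as an interval, not a
    // threshold on queue size.
    if (watchedSinceLastPurge >= purgeIntervalRequests) {
      val remaining = watching.filterNot(_.satisfied)
      watching.clear()
      watching ++= remaining
      watchedSinceLastPurge = 0
    }
  }
}
```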

Producer request
- When is it added to the purgatory (delayed)?
  * when it uses ack=-1 (actually, the code checks for anything other than
0 or 1); producer config: request.required.acks
  * when the partitions have more than one replica (otherwise ack=-1 is no
different from ack=1, and a delayed request wouldn't make much sense)
  * when not all partitions are in error
- When does it expire? When it reaches the timeout defined in the produce
request (ackTimeoutMs). This translates from the producer config
request.timeout.ms.
- What happens (on the broker) when it expires? It sends a response to the
client. The response content depends on the request, of course.
- When is it satisfied? I didn't find the courage to dig into the details
of this one :(  ... but mainly when all the followers have also
acknowledged the produce request for their replica.
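The conditions above can be sketched roughly as follows. To be clear, this is not the actual DelayedProduce code; the PartitionAckState type and all field names here are invented for illustration:

```scala
// Per-partition state, as I understand the produce-side checks (invented
// names, not the real 0.8 fields).
case class PartitionAckState(requiredAcksPending: Boolean, inError: Boolean)

// A produce request is delayed only for acks outside {0, 1} (ack = -1 in
// practice), and only while some partition still awaits replica acks.
def shouldDelayProduce(requiredAcks: Int,
                       partitions: Seq[PartitionAckState]): Boolean =
  requiredAcks != 0 && requiredAcks != 1 &&
    partitions.exists(p => p.requiredAcksPending && !p.inError)

// Satisfied once every partition has either errored out or been fully acked
// by the followers.
def produceSatisfied(partitions: Seq[PartitionAckState]): Boolean =
  partitions.forall(p => p.inError || !p.requiredAcksPending)
```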

Fetch request
- When is it added to the purgatory (delayed)? Two parameters of the
request matter here: the max wait time and the minimum fetch size
  * if max wait is greater than 0; otherwise the broker answers immediately
with whatever data is available
  * if the requested minimum fetch size is greater than the amount of data
currently available to fulfil the request
- When does it expire?
  * after the wait time, i.e. the amount of time the consumer is willing to
wait for data; consumer config: fetch.wait.max.ms
- When is it satisfied? When the requested minimum fetch size is reached,
i.e. enough data has accumulated to reach the minimum amount the consumer
wishes to receive in one response (consumer config: fetch.min.bytes)
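Those two checks are simple enough to sketch. The names below (FetchParams, shouldDelay, fetchSatisfied) are mine, not the real API; the actual logic lives in KafkaApis/DelayedFetch:

```scala
// Illustrative fetch-request parameters: the consumer's max wait and the
// minimum number of bytes it wants back (invented names).
case class FetchParams(maxWaitMs: Long, minBytes: Int)

// The request is parked in the purgatory only when the consumer is willing
// to wait and not enough data has accumulated yet.
def shouldDelay(params: FetchParams, availableBytes: Int): Boolean =
  params.maxWaitMs > 0 && availableBytes < params.minBytes

// It becomes satisfied as soon as the accumulated bytes reach minBytes.
def fetchSatisfied(params: FetchParams, availableBytes: Int): Boolean =
  availableBytes >= params.minBytes
```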

******

It would be useful to add some information about the metrics associated
with this.

Of course, I am all for being corrected if I said anything wrong here. The
truth is always the code :-)

marc
- mrtheb -


On Fri, Nov 1, 2013 at 2:45 AM, Priya Matpadi
<priya.matpadi@ecofactor.com> wrote:

> Guozhang,
> The documentation is not very clear.
> Marc's response for producer purgatory makes sense.
> I am not entirely clear on fetch purgatory.
> How does broker use purgatory? Is it a temporary holding area? What happens
> to the messages if purge interval is exceeded in case of either/both
> producer and consumer? Are messages dropped in this case?
> Thanks,
> Priya
>
>
> On Thu, Oct 31, 2013 at 2:47 PM, Guozhang Wang <wangguoz@gmail.com> wrote:
>
> > Hello Priya,
> >
> > You can find the definitions of these two configs here:
> >
> > http://kafka.apache.org/documentation.html#brokerconfigs
> >
> > Guozhang
> >
> >
> > On Thu, Oct 31, 2013 at 11:20 AM, Marc Labbe <mrlabbe@gmail.com> wrote:
> >
> > > Hi Priya
> > >
> > > my understanding is producer requests will be delayed (and put in
> request
> > > purgatory) only if your producer uses ack=-1. It will be in the
> purgatory
> > > (delayed) until all brokers have acknowledged the messages to be
> > > replicated. The documentation suggests to monitor the
> > > ProducerRequestPurgatory size metrics, but it only applies if you're
> > using
> > > ack=-1, otherwise, this value will always be 0.
> > >
> > > For consumer requests, they'll be in purgatory (delayed) until the max
> > > allowed time to respond has been reached, unless it has enough messages
> > to
> > > fill the buffer before that. The request will not end up in the
> purgatory
> > > if you're making a blocking request (max wait <= 0).
> > >
> > > Not sure about the configuration interval though.
> > >
> > > marc
> > >
> > >
> > > On Thu, Oct 31, 2013 at 12:41 PM, Priya Matpadi <
> > > priya.matpadi@ecofactor.com
> > > > wrote:
> > >
> > > > Hello,
> > > > What is purgatory? I believe the following two properties relate to
> > > > consumer and producer respectively.
> > > > Could someone please explain the significance of these?
> > > > fetch.purgatory.purge.interval.requests=100
> > > > producer.purgatory.purge.interval.requests=100
> > > >
> > > > Thanks,
> > > > Priya
> > > >
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>
