kafka-users mailing list archives

From Marc Labbe <mrla...@gmail.com>
Subject Re: Purgatory
Date Sat, 02 Nov 2013 01:08:53 GMT
Hi Joel,

I used to have edit access to the wiki; I made a few additions to it a
while ago, but it seems I don't have it anymore. It might have been lost
in the Confluence update. I would be glad to add what I have written if I
get it back. Otherwise, feel free to paste my words into one of the pages;
I don't intend to claim copyright on this :).

marc


On Fri, Nov 1, 2013 at 4:32 PM, Joel Koshy <jjkoshy.w@gmail.com> wrote:

> Marc, thanks for writing that up. I think it is worth adding some
> details on the request-purgatory on a wiki (Jay had started a wiki
> page for kafka internals [1] a while ago, but we have not had time to
> add much to it since.) Your write-up could be reviewed and added
> there. Do you have edit permissions on the wiki?
>
> As for the purge interval config - yes the documentation can be
> improved a bit. It's one of those "internal" configs that generally
> don't need to be modified by users. The reason we added that was as
> follows:
> - We found that for low-volume topics, replica fetch requests were
> getting expired but sitting around in purgatory
> - This was because we were expiring them from the delay queue (used to
> track when requests should expire), but they were still sitting in the
> watcherFor map - i.e., they would get purged when the next producer
> request to that topic/partition arrived, but for low volume topics
> this could be a long time (or never in the worst case) and we would
> eventually run into an OOME.
> - So we needed to periodically go through the entire watcherFor map
> and explicitly remove those requests that had expired.
> - More details on this are in KAFKA-664.
>
> Thanks,
>
> Joel
>
> [1] https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Internals
>
> On Fri, Nov 1, 2013 at 12:33 PM, Marc Labbe <mrlabbe@gmail.com> wrote:
> > Guozhang,
> >
> > I have to agree with Priya that the doc isn't very clear. Although the
> > configuration is documented, it simply rewords the name of the config,
> > which isn't particularly useful if you want more information about what
> > the purgatory is. I searched the whole wiki and doc and could not find
> > anything very useful, as opposed to looking at the code. In this case,
> > kafka.server.KafkaApis and kafka.server.RequestPurgatory will be your
> > friends.
> >
> > I'll try to add to Joe's answer here, mostly just reporting what's
> > available in the Scala doc from the project. I am doing this to
> > understand the mechanics myself, btw.
> >
> > As Joe said, messages are not dropped by the purgatory; delayed requests
> > are simply removed from it when they are satisfied. Satisfaction
> > conditions differ between fetch and produce requests and are implemented
> > in their respective DelayedRequest implementations (DelayedFetch and
> > DelayedProduce).
> >
> > Request purgatories are defined as follows in the code:
> >  - ProducerRequestPurgatory: A holding pen for produce requests waiting
> >    to be satisfied.
> >  - FetchRequestPurgatory: A holding pen for fetch requests waiting to be
> >    satisfied.
> >
> > Each request purgatory runs a thread (ExpiredRequestReaper). This thread
> > first tries to find an expired delayed request. When one is found, it
> > runs the purgatory's expire method to handle the delayed request's
> > expiration. In both the produce and fetch cases, it sends a response to
> > the client. An expired request is treated as a satisfied request. The
> > next step of the thread's loop checks the configuration parameter you
> > asked about initially (purgatory.purge.interval.requests). When the
> > number of delayed requests given to the purgatory to watch reaches this
> > value, it goes through all previously queued requests and removes those
> > which are marked as satisfied. Because of that, it is really an interval
> > more than a threshold, since it doesn't really care about the number of
> > satisfied requests or the size of the queue.
> >
> > Producer request
> > - When is it added to purgatory (delayed)?
> >   * when it uses ack=-1 (actually, the code tells me anything but 0 or
> >     1); producer config: request.required.acks
> >   * partitions have more than one replica (with a single replica,
> >     ack=-1 is no different from ack=1, so a delayed request doesn't
> >     make much sense)
> >   * not all partitions are in error
> > - When does it expire? When it reaches the timeout defined in the
> >   produce request (ackTimeoutMs), which comes from the producer config
> >   request.timeout.ms.
> > - What happens (on the broker) when it expires? It sends a response to
> >   the client. Response content depends on the request, of course.
> > - When is it satisfied? I didn't find the courage to dig into the
> >   details of this one :( ... but mainly when all the followers have
> >   also acknowledged the produce request for their replica.
> >
> > Fetch request
> > - When is it added to purgatory (delayed)? Two parameters of the request
> >   are mainly useful here: max wait time and fetch size
> >   * if max wait is greater than 0; otherwise, it is a blocking call by
> >     the consumer
> >   * if fetch size is greater than the current size of data available to
> >     fulfil the request
> > - When does it expire?
> >   * wait time: the amount of time the consumer is willing to wait for
> >     data; consumer config: fetch.wait.max.ms
> > - When is it satisfied? When the fetch size requested is reached, i.e.
> >   the amount of data the consumer wishes to receive in one response
> >   (from consumer config: fetch.message.max.bytes)
> >
> > ******
> >
> > It would be useful to add some information about the metrics associated
> > with this.
> >
> > Of course, I am all for being corrected if I said anything wrong here.
> > The truth is always the code :-)
> >
> > marc
> > - mrtheb -
> >
> >
> > On Fri, Nov 1, 2013 at 2:45 AM, Priya Matpadi
> > <priya.matpadi@ecofactor.com> wrote:
> >
> >> Guozhang,
> >> The documentation is not very clear.
> >> Marc's response for producer purgatory makes sense.
> >> I am not entirely clear on fetch purgatory.
> >> How does the broker use purgatory? Is it a temporary holding area? What
> >> happens to the messages if the purge interval is exceeded in the case
> >> of either/both producer and consumer? Are messages dropped in this case?
> >> Thanks,
> >> Priya
> >>
> >>
> >> On Thu, Oct 31, 2013 at 2:47 PM, Guozhang Wang <wangguoz@gmail.com>
> >> wrote:
> >>
> >> > Hello Priya,
> >> >
> >> > You can find the definitions of these two configs here:
> >> >
> >> > http://kafka.apache.org/documentation.html#brokerconfigs
> >> >
> >> > Guozhang
> >> >
> >> >
> >> > On Thu, Oct 31, 2013 at 11:20 AM, Marc Labbe <mrlabbe@gmail.com>
> >> > wrote:
> >> >
> >> > > Hi Priya
> >> > >
> >> > > My understanding is that producer requests will be delayed (and put
> >> > > in request purgatory) only if your producer uses ack=-1. A request
> >> > > stays in the purgatory (delayed) until all brokers have
> >> > > acknowledged the messages to be replicated. The documentation
> >> > > suggests monitoring the ProducerRequestPurgatory size metric, but
> >> > > it only applies if you're using ack=-1; otherwise, this value will
> >> > > always be 0.
> >> > >
> >> > > Consumer requests will be in purgatory (delayed) until the max
> >> > > allowed time to respond has been reached, unless there are enough
> >> > > messages to fill the buffer before that. The request will not end
> >> > > up in the purgatory if you're making a blocking request
> >> > > (max wait <= 0).
> >> > >
> >> > > Not sure about the configuration interval though.
> >> > >
> >> > > marc
> >> > >
> >> > >
> >> > > On Thu, Oct 31, 2013 at 12:41 PM, Priya Matpadi
> >> > > <priya.matpadi@ecofactor.com> wrote:
> >> > >
> >> > > > Hello,
> >> > > > What is purgatory? I believe the following two properties relate
> >> > > > to consumer and producer respectively.
> >> > > > Could someone please explain the significance of these?
> >> > > > fetch.purgatory.purge.interval.requests=100
> >> > > > producer.purgatory.purge.interval.requests=100
> >> > > >
> >> > > > Thanks,
> >> > > > Priya
> >> > > >
> >> > >
> >> >
> >> >
> >> >
> >> > --
> >> > -- Guozhang
> >> >
> >>
>
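
The purge behaviour discussed in this thread can be illustrated with a
small toy model. This is only a sketch under stated assumptions, not
Kafka's code: ToyPurgatory, DelayedRequest, watch, expire, maybe_purge and
watched_count are hypothetical names made up for the illustration, and time
is passed in explicitly instead of running a reaper thread. It shows why
expired requests can linger in the per-key watcher lists until the number
of watched requests reaches the purge interval.

```python
import heapq
from collections import defaultdict
from dataclasses import dataclass


@dataclass
class DelayedRequest:
    key: str              # hypothetical: the topic/partition being waited on
    expires_at: float     # absolute deadline, in arbitrary time units
    satisfied: bool = False


class ToyPurgatory:
    """Toy model only; names and structure are assumptions, not Kafka's API."""

    def __init__(self, purge_interval):
        self.purge_interval = purge_interval
        self.watchers = defaultdict(list)  # key -> delayed requests watching it
        self.delay_queue = []              # min-heap of (deadline, seq, request)
        self.watched_since_purge = 0
        self._seq = 0                      # tie-breaker so requests are never compared

    def watch(self, request):
        """Register a delayed request in both the watcher map and the delay queue."""
        self.watchers[request.key].append(request)
        heapq.heappush(self.delay_queue, (request.expires_at, self._seq, request))
        self._seq += 1
        self.watched_since_purge += 1

    def expire(self, now):
        """Pop requests whose deadline has passed and mark them satisfied.

        Note that this does NOT remove them from the watcher map.
        """
        expired = []
        while self.delay_queue and self.delay_queue[0][0] <= now:
            _, _, request = heapq.heappop(self.delay_queue)
            if not request.satisfied:
                request.satisfied = True
                expired.append(request)
        return expired

    def maybe_purge(self):
        """Sweep the whole watcher map once enough requests have been watched."""
        if self.watched_since_purge < self.purge_interval:
            return 0
        removed = 0
        for key in list(self.watchers):
            live = [r for r in self.watchers[key] if not r.satisfied]
            removed += len(self.watchers[key]) - len(live)
            if live:
                self.watchers[key] = live
            else:
                del self.watchers[key]
        self.watched_since_purge = 0
        return removed

    def watched_count(self):
        return sum(len(requests) for requests in self.watchers.values())


if __name__ == "__main__":
    purgatory = ToyPurgatory(purge_interval=3)
    purgatory.watch(DelayedRequest("quiet-topic-0", expires_at=1.0))
    purgatory.watch(DelayedRequest("quiet-topic-0", expires_at=2.0))
    purgatory.expire(now=5.0)            # both requests expire...
    purgatory.maybe_purge()              # ...but only 2 were watched: no sweep yet
    print(purgatory.watched_count())     # expired requests are still held
    purgatory.watch(DelayedRequest("busy-topic-0", expires_at=100.0))
    purgatory.maybe_purge()              # third watched request triggers the sweep
    print(purgatory.watched_count())
```

In this model the trigger depends only on how many requests have been
watched since the last sweep, not on how many are satisfied, which matches
the "interval rather than threshold" reading given above.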
