kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Jun Rao <jun...@gmail.com>
Subject Re: Kafka questions
Date Wed, 20 Jul 2011 16:09:01 GMT
Paul,

The only concern is that if we expose unflushed messages, those messages
could disappear after a broker machine restart.

Jun

On Tue, Jul 19, 2011 at 2:02 PM, Paul Sutter <psutter@quantbench.com> wrote:

> One more suggestion:
>
> Even before you have replication, it seems that you could delay producer
> side acks until after the data is recorded to disk, and still pass the data
> forward to consumers immediately.
>
>
>
> On Jul 19, 2011, at 12:23 PM, Jay Kreps <jay.kreps@gmail.com> wrote:
>
> > Agreed, no reason the policy to hand out messages should not be
> > configurable. We were hoping to make the whole question irrelevant with
> the
> > replication since then the producer can choose the replication level it
> > wants and fsync durability should be less of a concern.
> >
> > I agree with your comment that a good implementation of streaming with
> acks
> > being potentially superior.
> >
> > -Jay
> >
> > On Tue, Jul 19, 2011 at 11:15 AM, Paul Sutter <psutter@quantbench.com
> >wrote:
> >
> >> Jay,
> >>
> >> Ah - thanks for the clarification on the delay in the broker. It would
> be
> >> nice to if that were a configuration option, so that the end user can
> >> choose
> >> only to forward messages that have been written to disk, or choose to
> have
> >> the data forwarded immediately. When you implement replication data
> hitting
> >> the disk will matter less.
> >>
> >> On the delay in the producer, I think it could best be resolved through
> >> measurement. In your paper you compare two different approaches, and I'm
> >> proposing a third:
> >>
> >> 1. Send and wait (single message, JMS style)
> >> 2. Batch, send, and wait (Kafka today)
> >> 3. Stream with ACKs
> >>
> >> Removing any wait for a reply should increase throughput, not decrease
> it,
> >> so you're likely trading latency against potential CPU efficiency. And
> the
> >> CPU savings is a question best resolved by measurement.
> >>
> >> I'd also encourage you to think about the WAN case. When you
> send-and-wait,
> >> you need to send a buffer that is >> the bandwidth delay product to
> >> approach
> >> full line utilization, and the line will go idle for one RTT while you
> stop
> >> to wait for a reply. The bandwidth*delay product can get large (10s of
> >> megabytes), and end users will rarely understand the need to tune the
> batch
> >> size to increase throughput. They'll just say it's slow over long
> >> distances.
> >>
> >> All that said - your use case doesn't require minimizing latency or WAN
> >> use,
> >> so I can really understand if this isn't a priority for you.
> >>
> >> It's a well designed product that has had some real thought put into it.
> >> It's a really promising system, thanks for taking the time to respond to
> my
> >> comments.
> >>
> >> Paul
> >>
> >> On Tue, Jul 19, 2011 at 10:44 AM, Jay Kreps <jay.kreps@gmail.com>
> wrote:
> >>
> >>> Ah, I think what you are describing in zeromq is essentially the
> >> equivalent
> >>> of group commit for the socket. Essentially you wait until the socket
> is
> >> no
> >>> longer writable and then begin to queue data. This is an interesting
> >> idea.
> >>> Of course it would only have a positive effect when you had already
> >>> overflowed the socket buffer and were sending a very high throughput of
> >>> small messages. That basically is a way to degrade an overloaded
> >>> synchronous
> >>> send into a batched send. This is not really the same as what we have
> >> done,
> >>> which is to allow the ability to trade off latency for throughput in a
> >>> configurable manner. The reason the later is important is that we do
> not
> >>> have a handful of producers sending at a rate that saturates the
> network
> >>> I/O
> >>> capacity of those servers (the case where the group commit would help)
> >> but
> >>> rather we have thousands of producers sending at a medium low volume,
> so
> >> we
> >>> would never hit that in our use case. The advantage of batching is
> fewer
> >>> requests that hit the server, and larger packets. Where the group
> commit
> >>> would help is for the synchronous producer benchmarks, where you could
> >>> potentially get much better throughput. This is something we should
> >>> consider
> >>> adding.
> >>>
> >>> To be clear, though, we have not added latency in our layer, just made
> a
> >>> configurable way to trade-off latency for throughput. This is
> >> unambiguously
> >>> a good thing, I think.
> >>>
> >>> With respect to mmap, i think you are misunderstanding where the
> latency
> >>> comes from. We immediately write data to the filesystem with no delay
> >>> whatsoever. This incurs the overhead of a system call, as you point
> out,
> >>> which could be avoided by mmap, but that doesn't add much in the way of
> >>> latency. The latency comes from the fact that we do not make the
> written
> >>> data available to consumers until we fsync the file to "ensure" the
> >>> durability of consumed messages. The frequency of the fsync is
> >>> configurable,
> >>> anything either immediate or with a time or # messages threshold. This
> >>> again
> >>> trades latency for throughput.
> >>>
> >>> -Jay
> >>>
> >>> On Mon, Jul 18, 2011 at 10:26 PM, Paul Sutter <psutter@quantbench.com
> >>>> wrote:
> >>>
> >>>> *Producer latency* - I'm not familiar with zeromq internals but my
> >>>> understanding is that they send the first message(s) immediately and
> as
> >>> TCP
> >>>> queues up the data, it will eventually block as the send buffer fills,
> >>> and
> >>>> during this time messages can queue up, and thte net-net is that on
> >>> average
> >>>> the number of system calls is << the number of messages. The key
is
> >>> having
> >>>> a
> >>>> separate thread for network operations with very efficient thread
> >>>> coordination.  Clearly Nagle is disabled (TCP_NODELAY) as Nagle is a
> >>> blight
> >>>> against humanity.
> >>>>
> >>>> Having any sort of delay adds latency. If every developer thinks its
> OK
> >>> to
> >>>> add a little latency in his layer, pretty soon you end up with 10
> >> second
> >>>> end
> >>>> to end latency.
> >>>>
> >>>> Having an "accumulated message count" is also bad for WAN performance.
> >> If
> >>>> your "window size" is a set of delayed messages, the only way to deal
> >>> with
> >>>> a
> >>>> large bandwidth*delay product is to delay a lot of messages, then send
> >>>> them.
> >>>> You can fit a lot of data into a fiber. Imagine a gigabit link with
> >> 100ms
> >>>> roundtrip time, you can store 100MB in the fiber. And you need a
> >>> multiples
> >>>> of that for buffering if you need to do a retransmit.
> >>>>
> >>>> *Broker Latency *- With mmap the memcpy() of the message should make
> >> the
> >>>> data available to a thread even in another process, the pages that you
> >>> have
> >>>> mapped are also in the buffer cache and available to a sendfile()
> call.
> >>> or
> >>>> at least I think so. The flush to physical disk (or msync() in this
> >> case)
> >>>> would still be delayed but without impacting end to end latency.
> >>>>
> >>>> That said, in benchmarks I have done the fastest IO with the lowest
> CPU
> >>>> overhead is unbuffered (direct) IO (which is lower overhead than using
> >>> the
> >>>> buffer cache with or without memory mapping), but then you'd have to
> >>> manage
> >>>> your own buffer pool and run your broker in a single multithreaded
> >>> process.
> >>>> But thats getting more extreme. Just getting rid of this buffer write
> >>> delay
> >>>> by using memory mapping will remove a big chunk of latency.
> >>>>
> >>>>
> >>>>
> >>>>
> >>>>
> >>>>
> >>>>
> >>>>
> >>>>
> >>>> On Mon, Jul 18, 2011 at 9:30 PM, Jay Kreps <jay.kreps@gmail.com>
> >> wrote:
> >>>>
> >>>>> Hi Paul,
> >>>>>
> >>>>> We are definitely interested in lowering latency--lower is always
> >>>>> better--but that was not a major concern for us so far (we were
> >>> replacing
> >>>> a
> >>>>> system with 1 hour latency), so we haven't focused on it yet. As
you
> >>>>> describe latency in our setup at linkedin comes from batching on
the
> >>>>> frontend and batching on the kafka servers do to very lenient flush
> >>>>> settings.
> >>>>>
> >>>>> I am interested in your comments on zeromq. Do they actually have
a
> >>>> better
> >>>>> approach for this problem even when using TCP? If so I would be
> >>>> interested
> >>>>> to understand. The way I see things this is about trading throughput
> >>> and
> >>>>> latency. On the producer side you have only a few options:
> >> immediately
> >>>>> write
> >>>>> the data to the socket buffer for sending or wait and see if the
> >>>>> application
> >>>>> writes more data. The OS will do this for you unless you set
> >>> TCP_NODELAY,
> >>>>> but the OS is relatively inflexible, it doesn't understand your
data
> >> so
> >>> I
> >>>>> think it just waits 200ms or until the socket buffer is full.
> >>>>>
> >>>>> The current approach in the async producer captures the same
> >> tradeoff,
> >>>> but
> >>>>> a
> >>>>> little more flexibly, it allows you to specify a max delay and max
> >>>>> accumulated message count, data is written when either of those
is
> >> hit.
> >>>>>
> >>>>> Is it possible to better capture this tradeoff? Basically I am not
> >>> aware
> >>>> of
> >>>>> any other trick here if you are using TCP, so i would be interested
> >> in
> >>>> what
> >>>>> zeromq does if they are doing this better.
> >>>>>
> >>>>> We do indeed write each message set to the filesystem as it arrives
> >> but
> >>>> we
> >>>>> distribute messages to consumers only after the write has been
> >> flushed
> >>> to
> >>>>> disk, delaying (batching) that flush is the cause of the latency
but
> >>> also
> >>>>> gives better use of IOPs by generating larger writes. Mmap would
> >> remove
> >>>> the
> >>>>> system call (which would be good), but not the flush I think. As
you
> >>> say,
> >>>>> adding replication allows giving stronger guarantees without actually
> >>>>> caring
> >>>>> about durability on a particular server which would make it possible
> >> to
> >>>>> distribute messages to consumers after ack from some number of other
> >>>>> servers
> >>>>> irrespective of flushing to disk.
> >>>>>
> >>>>> -Jay
> >>>>>
> >>>>> On Mon, Jul 18, 2011 at 8:27 PM, Paul Sutter <psutter@quantbench.com
> >>>
> >>>>> wrote:
> >>>>>
> >>>>>> Jun
> >>>>>>
> >>>>>> Thanks for your answers and the link to the paper - that helps
a
> >> lot,
> >>>>>> especially the comment in the paper that 10 second end to end
> >> latency
> >>>> is
> >>>>>> good enough for your intended use case.
> >>>>>>
> >>>>>> We're looking for much lower latencies, and the basic design
of
> >> Kafka
> >>>>> feels
> >>>>>> like it should support latencies in milliseconds with a few
> >> changes.
> >>>>> We're
> >>>>>> either going to build our own system, or help develop something
> >> that
> >>>>>> already
> >>>>>> exists, so please take my comments in the constructive way they're
> >>>>> intended
> >>>>>> (I realize the changes I'm suggesting are outside your intended
use
> >>>> case,
> >>>>>> but if you're interested we may be able to provide a very capable
> >>>>> developer
> >>>>>> to help with the work, assuming we choose kafka over the other
> >>> zillion
> >>>>>> streaming systems that are coming out of the woodwork).
> >>>>>>
> >>>>>> a. *Producer "queue.time"* - In my question 4 below, I was
> >> referring
> >>> to
> >>>>> the
> >>>>>> producer queue time.  With a default value of 5 seconds, that
> >>> accounts
> >>>>> for
> >>>>>> half your end to end latency. A system like zeromq is optimized
to
> >>>> write
> >>>>>> data immediately without delay, but in such a way to minimizes
the
> >>>> number
> >>>>>> of
> >>>>>> system calls required during high throughput messages. Zeromq
is no
> >>>>>> nirvana,
> >>>>>> but it has a number of nice properties.
> >>>>>>
> >>>>>> b. *Broker "log.default.flush.interval.ms"* - The default value
of
> >> 3
> >>>>>> seconds
> >>>>>> appears to be another significant source of latency in the system,
> >>>>> assuming
> >>>>>> that clients are unable to access data until it has been flushed.
> >>> Since
> >>>>> you
> >>>>>> have wisely chosen to take advantage of the buffer cache as
part of
> >>>> your
> >>>>>> system design, it seems that you could remove this latency
> >> completely
> >>>> by
> >>>>>> memory mapping the partitions and memcpying each message as
it
> >>> arrives.
> >>>>>> With
> >>>>>> the right IPC mechanism clients could have immediate access
to new
> >>>>>> messages.
> >>>>>>
> >>>>>> c. *Batching, sync vs async, replication, and auditing*. Its
> >>>>> understandable
> >>>>>> that you've chosen a a forensic approach to producer reliability
> >>> (after
> >>>>> the
> >>>>>> fact auditing), but when you implement replication it would
be
> >> really
> >>>>> nice
> >>>>>> to revise the producer protocol mechanisms. If you used a streaming
> >>>>>> mechanism with producer offsets and ACKs, you could ensure reliable
> >>>>>> delivery
> >>>>>> of producer streams to multiple brokers without the need to
choose
> >> a
> >>>>> "batch
> >>>>>> size" or "queue.time". This could also give you active/active
> >>> failover
> >>>> of
> >>>>>> brokers. This may also help in the WAN case (my question 3 below)
> >>>> because
> >>>>>> you will be able to adaptively stuff more and more data through
the
> >>>> fiber
> >>>>>> for high bandwidth*delay links without having to choose a large
> >>> "batch
> >>>>>> size"
> >>>>>> nor have the additional latency that entails. Oh, and it will
help
> >>> you
> >>>>> deal
> >>>>>> with CRC errors once you start checking for them.
> >>>>>>
> >>>>>> c. *Performance measurements* - I'd like to make a suggestion
for
> >>> your
> >>>>>> performance measurements. Your benchmarks measure throughput,
but a
> >>>>>> throughput number is meaningless without an associated "% cpu
> >> time".
> >>>>>> Ideally
> >>>>>> all measurements achieve wire speed (100MB/sec) at 0% CPU (since,
> >>> after
> >>>>>> all,
> >>>>>> this is plumbing and we assume the cores in the system should
have
> >>>>> capacity
> >>>>>> set aside for useful work too). Obviously nobody ever achieves
> >> this,
> >>>> but
> >>>>> by
> >>>>>> measuring it one can raise the bar in terms of optimization.
> >>>>>>
> >>>>>> Paul
> >>>>>>
> >>>>>> ps. Just for background, I am the cofounder at Quantcast where
we
> >>>> process
> >>>>>> 3.5PB of data per day. These questions are related to my new
> >> startup
> >>>>>> Quantbench which will deal with financial market data where
you
> >> dont
> >>>> want
> >>>>>> any latency at all. And WAN issues are a big deal too.
> >> Incidentally,
> >>> I
> >>>>> was
> >>>>>> also founder of Orbital Data which was a WAN optimization company
> >> so
> >>>> I've
> >>>>>> done a lot of work with protocols over long distances.
> >>>>>>
> >>>>>> On Mon, Jul 18, 2011 at 7:14 PM, Jun Rao <junrao@gmail.com>
wrote:
> >>>>>>
> >>>>>>> Paul,
> >>>>>>>
> >>>>>>> Excellent questions. See my answers below. Thanks,
> >>>>>>>
> >>>>>>> On Mon, Jul 18, 2011 at 6:41 PM, Paul Sutter <
> >>> psutter@quantbench.com
> >>>>>
> >>>>>>> wrote:
> >>>>>>>
> >>>>>>>> Kafka looks like an exciting project, thanks for opening
it up.
> >>>>>>>>
> >>>>>>>> I have a few questions:
> >>>>>>>>
> >>>>>>>> 1. Are checksums end to end (ie, created by the producer
and
> >>>> checked
> >>>>> by
> >>>>>>> the
> >>>>>>>> consumer)? or are they only used to confirm buffercache
> >> behavior
> >>> on
> >>>>>> disk
> >>>>>>> as
> >>>>>>>> mentioned in the documentation? Bit errors occur vastly
more
> >>> often
> >>>>> than
> >>>>>>>> most
> >>>>>>>> people assume, often because of device driver bugs.
TCP only
> >>>> detects
> >>>>> 1
> >>>>>>>> error
> >>>>>>>> in 65536, so errors can flow through (if you like I
can send
> >>> links
> >>>> to
> >>>>>>>> papers
> >>>>>>>> describing the need for checksums everywhere).
> >>>>>>>>
> >>>>>>>
> >>>>>>> Checksum is generated at the producer and propagated to
the
> >> broker
> >>>> and
> >>>>>>> eventually the consumer. Currently, we only validate the
checksum
> >>> at
> >>>>> the
> >>>>>>> broker. We could further validate it at the consumer in
the
> >> future.
> >>>>>>>
> >>>>>>>>
> >>>>>>>> 2. The consumer has a pretty solid mechanism to ensure
it hasnt
> >>>>> missed
> >>>>>>> any
> >>>>>>>> messages (i like the design by the way), but how does
the
> >>> producer
> >>>>> know
> >>>>>>>> that
> >>>>>>>> all of its messages have been stored? (no apparent message
id
> >> on
> >>>> that
> >>>>>>> side
> >>>>>>>> since the message id isnt known until the message is
written to
> >>> the
> >>>>>>> file).
> >>>>>>>> I'm especially curious how failover/replication could
be
> >>>> implemented
> >>>>>> and
> >>>>>>>> I'm
> >>>>>>>> thinking that acks on the publisher side may help)
> >>>>>>>>
> >>>>>>>
> >>>>>>> The producer side auditing is not built-in. At LinkedIn,
we do
> >> that
> >>>> by
> >>>>>>> generating an auditing event periodically in the eventhandler
of
> >>> the
> >>>>>> async
> >>>>>>> producer. The auditing event contains the number of events
> >> produced
> >>>> in
> >>>>> a
> >>>>>>> configured window (e.g., 10 minutes) and are sent to a separate
> >>>> topic.
> >>>>>> The
> >>>>>>> consumer can read the actual data and the auditing event
and
> >>> compare
> >>>>> the
> >>>>>>> counts. See our paper (
> >>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>
> >>
> http://research.microsoft.com/en-us/um/people/srikanth/netdb11/netdb11papers/netdb11-final12.pdf
> >>>>>>> )
> >>>>>>> for some more details.
> >>>>>>>
> >>>>>>>
> >>>>>>>>
> >>>>>>>> 3. Has the consumer's flow control been tested over
high
> >>>>>> bandwidth*delay
> >>>>>>>> links? (what bandwidth can you get from a London consumer
of an
> >>> SF
> >>>>>>>> cluster?)
> >>>>>>>>
> >>>>>>>> Yes, we actually replicate kafka data across data centers,
> >> using
> >>> an
> >>>>>>> embedded consumer in a broker. Again, there is a bit more
info on
> >>>> this
> >>>>> in
> >>>>>>> our paper.
> >>>>>>>
> >>>>>>>
> >>>>>>>> 4. What kind of performance do you get if you set the
> >> producer's
> >>>>>> message
> >>>>>>>> delay to zero? (ie, is there a separate system call
for each
> >>>> message?
> >>>>>> or
> >>>>>>> do
> >>>>>>>> you manage to aggregate messages into a smaller number
of
> >> system
> >>>>> calls
> >>>>>>> even
> >>>>>>>> with a delay of 0?)
> >>>>>>>>
> >>>>>>>> I assume that you are referring to the flush interval.
One can
> >>>>>> configure
> >>>>>>> to
> >>>>>>> flush every message to disk. This will slow down the throughput
> >>>>>>> significantly.
> >>>>>>>
> >>>>>>>
> >>>>>>>> 5. Have you considered using a library like zeromq for
the
> >>>> messaging
> >>>>>>> layer
> >>>>>>>> instead of rolling your own? (zeromq will handle #4
cleanly at
> >>>>> millions
> >>>>>>> of
> >>>>>>>> messages per second and has clients in 20 languages)
> >>>>>>>>
> >>>>>>>> No. Our proprietary format allows us to support things
like
> >>>>> compression
> >>>>>>> in
> >>>>>>> the future. However, we can definitely look into the zeromq
> >> format.
> >>>> Is
> >>>>>>> their
> >>>>>>> messaging layer easily extractable?
> >>>>>>>
> >>>>>>>
> >>>>>>>> 6. Do you have any plans to support intermediate processing
> >>>> elements
> >>>>>> the
> >>>>>>>> way
> >>>>>>>> Flume supports?
> >>>>>>>>
> >>>>>>>> For now, we are just focusing on getting the raw messaging
> >> layer
> >>>>> solid.
> >>>>>>> We
> >>>>>>> have worked a bit on streaming processing and will look
into that
> >>>> again
> >>>>>> in
> >>>>>>> the future.
> >>>>>>>
> >>>>>>>
> >>>>>>>> 7. The docs mention that new versions will only be released
> >> after
> >>>>> they
> >>>>>>> are
> >>>>>>>> in production at LinkedIn? Does that mean that the latest
> >> version
> >>>> of
> >>>>>> the
> >>>>>>>> source code is hidden at LinkedIn and contributors would
have
> >> to
> >>>>> throw
> >>>>>>>> patches over the wall and wait months to get the integrated
> >>>> product?
> >>>>>>>>
> >>>>>>>> What we ran at LinkedIn is the same version in open
source and
> >>>> there
> >>>>> is
> >>>>>>> no
> >>>>>>> internal repository of Kafka at LinkedIn. We plan to maintain
> >> that
> >>> in
> >>>>> the
> >>>>>>> future.
> >>>>>>>
> >>>>>>>
> >>>>>>>> Thanks!
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>
> >>
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message