kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Jay Kreps <jay.kr...@gmail.com>
Subject Re: Kafka questions
Date Tue, 19 Jul 2011 17:44:20 GMT
Ah, I think what you are describing in zeromq is essentially the equivalent
of group commit for the socket. Essentially you wait until the socket is no
longer writable and then begin to queue data. This is an interesting idea.
Of course it would only have a positive effect when you had already
overflowed the socket buffer and were sending a very high throughput of
small messages. That basically is a way to degrade an overloaded synchronous
send into a batched send. This is not really the same as what we have done,
which is to allow the ability to trade off latency for throughput in a
configurable manner. The reason the later is important is that we do not
have a handful of producers sending at a rate that saturates the network I/O
capacity of those servers (the case where the group commit would help) but
rather we have thousands of producers sending at a medium low volume, so we
would never hit that in our use case. The advantage of batching is fewer
requests that hit the server, and larger packets. Where the group commit
would help is for the synchronous producer benchmarks, where you could
potentially get much better throughput. This is something we should consider
adding.

To be clear, though, we have not added latency in our layer, just made a
configurable way to trade-off latency for throughput. This is unambiguously
a good thing, I think.

With respect to mmap, i think you are misunderstanding where the latency
comes from. We immediately write data to the filesystem with no delay
whatsoever. This incurs the overhead of a system call, as you point out,
which could be avoided by mmap, but that doesn't add much in the way of
latency. The latency comes from the fact that we do not make the written
data available to consumers until we fsync the file to "ensure" the
durability of consumed messages. The frequency of the fsync is configurable,
anything either immediate or with a time or # messages threshold. This again
trades latency for throughput.

-Jay

On Mon, Jul 18, 2011 at 10:26 PM, Paul Sutter <psutter@quantbench.com>wrote:

> *Producer latency* - I'm not familiar with zeromq internals but my
> understanding is that they send the first message(s) immediately and as TCP
> queues up the data, it will eventually block as the send buffer fills, and
> during this time messages can queue up, and thte net-net is that on average
> the number of system calls is << the number of messages. The key is having
> a
> separate thread for network operations with very efficient thread
> coordination.  Clearly Nagle is disabled (TCP_NODELAY) as Nagle is a blight
> against humanity.
>
> Having any sort of delay adds latency. If every developer thinks its OK to
> add a little latency in his layer, pretty soon you end up with 10 second
> end
> to end latency.
>
> Having an "accumulated message count" is also bad for WAN performance. If
> your "window size" is a set of delayed messages, the only way to deal with
> a
> large bandwidth*delay product is to delay a lot of messages, then send
> them.
> You can fit a lot of data into a fiber. Imagine a gigabit link with 100ms
> roundtrip time, you can store 100MB in the fiber. And you need a multiples
> of that for buffering if you need to do a retransmit.
>
> *Broker Latency *- With mmap the memcpy() of the message should make the
> data available to a thread even in another process, the pages that you have
> mapped are also in the buffer cache and available to a sendfile() call. or
> at least I think so. The flush to physical disk (or msync() in this case)
> would still be delayed but without impacting end to end latency.
>
> That said, in benchmarks I have done the fastest IO with the lowest CPU
> overhead is unbuffered (direct) IO (which is lower overhead than using the
> buffer cache with or without memory mapping), but then you'd have to manage
> your own buffer pool and run your broker in a single multithreaded process.
> But thats getting more extreme. Just getting rid of this buffer write delay
> by using memory mapping will remove a big chunk of latency.
>
>
>
>
>
>
>
>
>
> On Mon, Jul 18, 2011 at 9:30 PM, Jay Kreps <jay.kreps@gmail.com> wrote:
>
> > Hi Paul,
> >
> > We are definitely interested in lowering latency--lower is always
> > better--but that was not a major concern for us so far (we were replacing
> a
> > system with 1 hour latency), so we haven't focused on it yet. As you
> > describe latency in our setup at linkedin comes from batching on the
> > frontend and batching on the kafka servers do to very lenient flush
> > settings.
> >
> > I am interested in your comments on zeromq. Do they actually have a
> better
> > approach for this problem even when using TCP? If so I would be
> interested
> > to understand. The way I see things this is about trading throughput and
> > latency. On the producer side you have only a few options: immediately
> > write
> > the data to the socket buffer for sending or wait and see if the
> > application
> > writes more data. The OS will do this for you unless you set TCP_NODELAY,
> > but the OS is relatively inflexible, it doesn't understand your data so I
> > think it just waits 200ms or until the socket buffer is full.
> >
> > The current approach in the async producer captures the same tradeoff,
> but
> > a
> > little more flexibly, it allows you to specify a max delay and max
> > accumulated message count, data is written when either of those is hit.
> >
> > Is it possible to better capture this tradeoff? Basically I am not aware
> of
> > any other trick here if you are using TCP, so i would be interested in
> what
> > zeromq does if they are doing this better.
> >
> > We do indeed write each message set to the filesystem as it arrives but
> we
> > distribute messages to consumers only after the write has been flushed to
> > disk, delaying (batching) that flush is the cause of the latency but also
> > gives better use of IOPs by generating larger writes. Mmap would remove
> the
> > system call (which would be good), but not the flush I think. As you say,
> > adding replication allows giving stronger guarantees without actually
> > caring
> > about durability on a particular server which would make it possible to
> > distribute messages to consumers after ack from some number of other
> > servers
> > irrespective of flushing to disk.
> >
> > -Jay
> >
> > On Mon, Jul 18, 2011 at 8:27 PM, Paul Sutter <psutter@quantbench.com>
> > wrote:
> >
> > > Jun
> > >
> > > Thanks for your answers and the link to the paper - that helps a lot,
> > > especially the comment in the paper that 10 second end to end latency
> is
> > > good enough for your intended use case.
> > >
> > > We're looking for much lower latencies, and the basic design of Kafka
> > feels
> > > like it should support latencies in milliseconds with a few changes.
> > We're
> > > either going to build our own system, or help develop something that
> > > already
> > > exists, so please take my comments in the constructive way they're
> > intended
> > > (I realize the changes I'm suggesting are outside your intended use
> case,
> > > but if you're interested we may be able to provide a very capable
> > developer
> > > to help with the work, assuming we choose kafka over the other zillion
> > > streaming systems that are coming out of the woodwork).
> > >
> > > a. *Producer "queue.time"* - In my question 4 below, I was referring to
> > the
> > > producer queue time.  With a default value of 5 seconds, that accounts
> > for
> > > half your end to end latency. A system like zeromq is optimized to
> write
> > > data immediately without delay, but in such a way to minimizes the
> number
> > > of
> > > system calls required during high throughput messages. Zeromq is no
> > > nirvana,
> > > but it has a number of nice properties.
> > >
> > > b. *Broker "log.default.flush.interval.ms"* - The default value of 3
> > > seconds
> > > appears to be another significant source of latency in the system,
> > assuming
> > > that clients are unable to access data until it has been flushed. Since
> > you
> > > have wisely chosen to take advantage of the buffer cache as part of
> your
> > > system design, it seems that you could remove this latency completely
> by
> > > memory mapping the partitions and memcpying each message as it arrives.
> > > With
> > > the right IPC mechanism clients could have immediate access to new
> > > messages.
> > >
> > > c. *Batching, sync vs async, replication, and auditing*. Its
> > understandable
> > > that you've chosen a a forensic approach to producer reliability (after
> > the
> > > fact auditing), but when you implement replication it would be really
> > nice
> > > to revise the producer protocol mechanisms. If you used a streaming
> > > mechanism with producer offsets and ACKs, you could ensure reliable
> > > delivery
> > > of producer streams to multiple brokers without the need to choose a
> > "batch
> > > size" or "queue.time". This could also give you active/active failover
> of
> > > brokers. This may also help in the WAN case (my question 3 below)
> because
> > > you will be able to adaptively stuff more and more data through the
> fiber
> > > for high bandwidth*delay links without having to choose a large "batch
> > > size"
> > > nor have the additional latency that entails. Oh, and it will help you
> > deal
> > > with CRC errors once you start checking for them.
> > >
> > > c. *Performance measurements* - I'd like to make a suggestion for your
> > > performance measurements. Your benchmarks measure throughput, but a
> > > throughput number is meaningless without an associated "% cpu time".
> > > Ideally
> > > all measurements achieve wire speed (100MB/sec) at 0% CPU (since, after
> > > all,
> > > this is plumbing and we assume the cores in the system should have
> > capacity
> > > set aside for useful work too). Obviously nobody ever achieves this,
> but
> > by
> > > measuring it one can raise the bar in terms of optimization.
> > >
> > > Paul
> > >
> > > ps. Just for background, I am the cofounder at Quantcast where we
> process
> > > 3.5PB of data per day. These questions are related to my new startup
> > > Quantbench which will deal with financial market data where you dont
> want
> > > any latency at all. And WAN issues are a big deal too. Incidentally, I
> > was
> > > also founder of Orbital Data which was a WAN optimization company so
> I've
> > > done a lot of work with protocols over long distances.
> > >
> > > On Mon, Jul 18, 2011 at 7:14 PM, Jun Rao <junrao@gmail.com> wrote:
> > >
> > > > Paul,
> > > >
> > > > Excellent questions. See my answers below. Thanks,
> > > >
> > > > On Mon, Jul 18, 2011 at 6:41 PM, Paul Sutter <psutter@quantbench.com
> >
> > > > wrote:
> > > >
> > > > > Kafka looks like an exciting project, thanks for opening it up.
> > > > >
> > > > > I have a few questions:
> > > > >
> > > > > 1. Are checksums end to end (ie, created by the producer and
> checked
> > by
> > > > the
> > > > > consumer)? or are they only used to confirm buffercache behavior
on
> > > disk
> > > > as
> > > > > mentioned in the documentation? Bit errors occur vastly more often
> > than
> > > > > most
> > > > > people assume, often because of device driver bugs. TCP only
> detects
> > 1
> > > > > error
> > > > > in 65536, so errors can flow through (if you like I can send links
> to
> > > > > papers
> > > > > describing the need for checksums everywhere).
> > > > >
> > > >
> > > > Checksum is generated at the producer and propagated to the broker
> and
> > > > eventually the consumer. Currently, we only validate the checksum at
> > the
> > > > broker. We could further validate it at the consumer in the future.
> > > >
> > > > >
> > > > > 2. The consumer has a pretty solid mechanism to ensure it hasnt
> > missed
> > > > any
> > > > > messages (i like the design by the way), but how does the producer
> > know
> > > > > that
> > > > > all of its messages have been stored? (no apparent message id on
> that
> > > > side
> > > > > since the message id isnt known until the message is written to the
> > > > file).
> > > > > I'm especially curious how failover/replication could be
> implemented
> > > and
> > > > > I'm
> > > > > thinking that acks on the publisher side may help)
> > > > >
> > > >
> > > > The producer side auditing is not built-in. At LinkedIn, we do that
> by
> > > > generating an auditing event periodically in the eventhandler of the
> > > async
> > > > producer. The auditing event contains the number of events produced
> in
> > a
> > > > configured window (e.g., 10 minutes) and are sent to a separate
> topic.
> > > The
> > > > consumer can read the actual data and the auditing event and compare
> > the
> > > > counts. See our paper (
> > > >
> > > >
> > >
> >
> http://research.microsoft.com/en-us/um/people/srikanth/netdb11/netdb11papers/netdb11-final12.pdf
> > > > )
> > > > for some more details.
> > > >
> > > >
> > > > >
> > > > > 3. Has the consumer's flow control been tested over high
> > > bandwidth*delay
> > > > > links? (what bandwidth can you get from a London consumer of an SF
> > > > > cluster?)
> > > > >
> > > > > Yes, we actually replicate kafka data across data centers, using
an
> > > > embedded consumer in a broker. Again, there is a bit more info on
> this
> > in
> > > > our paper.
> > > >
> > > >
> > > > > 4. What kind of performance do you get if you set the producer's
> > > message
> > > > > delay to zero? (ie, is there a separate system call for each
> message?
> > > or
> > > > do
> > > > > you manage to aggregate messages into a smaller number of system
> > calls
> > > > even
> > > > > with a delay of 0?)
> > > > >
> > > > > I assume that you are referring to the flush interval. One can
> > > configure
> > > > to
> > > > flush every message to disk. This will slow down the throughput
> > > > significantly.
> > > >
> > > >
> > > > > 5. Have you considered using a library like zeromq for the
> messaging
> > > > layer
> > > > > instead of rolling your own? (zeromq will handle #4 cleanly at
> > millions
> > > > of
> > > > > messages per second and has clients in 20 languages)
> > > > >
> > > > > No. Our proprietary format allows us to support things like
> > compression
> > > > in
> > > > the future. However, we can definitely look into the zeromq format.
> Is
> > > > their
> > > > messaging layer easily extractable?
> > > >
> > > >
> > > > > 6. Do you have any plans to support intermediate processing
> elements
> > > the
> > > > > way
> > > > > Flume supports?
> > > > >
> > > > > For now, we are just focusing on getting the raw messaging layer
> > solid.
> > > > We
> > > > have worked a bit on streaming processing and will look into that
> again
> > > in
> > > > the future.
> > > >
> > > >
> > > > > 7. The docs mention that new versions will only be released after
> > they
> > > > are
> > > > > in production at LinkedIn? Does that mean that the latest version
> of
> > > the
> > > > > source code is hidden at LinkedIn and contributors would have to
> > throw
> > > > > patches over the wall and wait months to get the integrated
> product?
> > > > >
> > > > > What we ran at LinkedIn is the same version in open source and
> there
> > is
> > > > no
> > > > internal repository of Kafka at LinkedIn. We plan to maintain that in
> > the
> > > > future.
> > > >
> > > >
> > > > > Thanks!
> > > > >
> > > >
> > >
> >
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message