kafka-users mailing list archives

From Jun Rao <jun...@gmail.com>
Subject Re: Kafka questions
Date Wed, 20 Jul 2011 16:07:07 GMT
Olivier,

We have a design for replication (see the design doc and subtasks at
https://issues.apache.org/jira/browse/KAFKA-50). We are currently wrapping
up the compression support and will start working on replication soon.

Jun

On Tue, Jul 19, 2011 at 12:59 PM, Olivier Pomel <oli@datadoghq.com> wrote:

> Thanks, guys, this was a great thread. May be worth pointing to it in the
> online docs as it asks and answers a lot of interesting questions about the
> performance characteristics and tradeoffs made in Kafka.
>
> How far out do you think built-in replication is?
> Best,
> O.
>
>
>
> On Tue, Jul 19, 2011 at 3:23 PM, Jay Kreps <jay.kreps@gmail.com> wrote:
>
> > Agreed, no reason the policy to hand out messages should not be
> > configurable. We were hoping to make the whole question irrelevant with
> > replication, since then the producer can choose the replication level it
> > wants and fsync durability should be less of a concern.
> >
> > I agree with your comment that a good implementation of streaming with
> > acks could potentially be superior.
> >
> > -Jay
> >
> > On Tue, Jul 19, 2011 at 11:15 AM, Paul Sutter <psutter@quantbench.com> wrote:
> >
> > > Jay,
> > >
> > > > Ah - thanks for the clarification on the delay in the broker. It would be
> > > > nice if that were a configuration option, so that the end user can choose
> > > > to forward only messages that have been written to disk, or choose to have
> > > > the data forwarded immediately. When you implement replication, data
> > > > hitting the disk will matter less.
> > >
> > > > On the delay in the producer, I think it could best be resolved through
> > > > measurement. In your paper you compare two different approaches, and I'm
> > > > proposing a third:
> > > >
> > > > 1. Send and wait (single message, JMS style)
> > > > 2. Batch, send, and wait (Kafka today)
> > > > 3. Stream with ACKs
> > > >
> > > > Removing any wait for a reply should increase throughput, not decrease it,
> > > > so you're likely trading latency against potential CPU efficiency. And the
> > > > CPU savings is a question best resolved by measurement.
> > > >
> > > > I'd also encourage you to think about the WAN case. When you send-and-wait,
> > > > you need to send a buffer that is >> the bandwidth*delay product to
> > > > approach full line utilization, and the line will go idle for one RTT while
> > > > you stop to wait for a reply. The bandwidth*delay product can get large
> > > > (10s of megabytes), and end users will rarely understand the need to tune
> > > > the batch size to increase throughput. They'll just say it's slow over long
> > > > distances.
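
The send-and-wait argument above can be put into numbers with a small sketch. It assumes a deliberately simple model that is not taken from the thread: the sender transmits one batch at line rate, then the line sits idle for one full RTT until the reply arrives. The function name, link speed, and RTT are illustrative choices.

```python
def send_and_wait_utilization(batch_bytes, bandwidth_bps, rtt_s):
    """Fraction of line rate achieved when every batch waits one RTT for a reply.

    Simplified model (an assumption): the line is busy while the batch is
    being transmitted and idle for one round trip afterwards.
    """
    send_time_s = batch_bytes * 8 / bandwidth_bps
    return send_time_s / (send_time_s + rtt_s)


GIGABIT_BPS = 1_000_000_000  # assumed link speed, bits per second
RTT_S = 0.100                # assumed round trip time, seconds

for batch_bytes in (64 * 1024, 1_000_000, 12_500_000, 125_000_000):
    u = send_and_wait_utilization(batch_bytes, GIGABIT_BPS, RTT_S)
    print(f"{batch_bytes:>12,d} bytes per batch -> {u:6.1%} of line rate")
```

Under this model a batch equal to the bandwidth*delay product reaches only half of line rate, and the batch has to be roughly ten times larger to get above 90%, which is why the buffer needs to be much larger than the product.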
> > >
> > > > All that said - your use case doesn't require minimizing latency or WAN
> > > > use, so I can really understand if this isn't a priority for you.
> > > >
> > > > It's a well designed product that has had some real thought put into it.
> > > > It's a really promising system, thanks for taking the time to respond to my
> > > > comments.
> > >
> > > Paul
> > >
> > > > On Tue, Jul 19, 2011 at 10:44 AM, Jay Kreps <jay.kreps@gmail.com> wrote:
> > >
> > > > > Ah, I think what you are describing in zeromq is essentially the
> > > > > equivalent of group commit for the socket. Essentially you wait until
> > > > > the socket is no longer writable and then begin to queue data. This is
> > > > > an interesting idea. Of course it would only have a positive effect
> > > > > when you had already overflowed the socket buffer and were sending a
> > > > > very high throughput of small messages. That basically is a way to
> > > > > degrade an overloaded synchronous send into a batched send. This is not
> > > > > really the same as what we have done, which is to allow the ability to
> > > > > trade off latency for throughput in a configurable manner. The reason
> > > > > the latter is important is that we do not have a handful of producers
> > > > > sending at a rate that saturates the network I/O capacity of those
> > > > > servers (the case where the group commit would help) but rather we have
> > > > > thousands of producers sending at a medium-low volume, so we would
> > > > > never hit that in our use case. The advantage of batching is fewer
> > > > > requests that hit the server, and larger packets. Where the group
> > > > > commit would help is for the synchronous producer benchmarks, where you
> > > > > could potentially get much better throughput. This is something we
> > > > > should consider adding.
> > > > >
> > > > > To be clear, though, we have not added latency in our layer, just made
> > > > > a configurable way to trade off latency for throughput. This is
> > > > > unambiguously a good thing, I think.
> > > > >
> > > > > With respect to mmap, I think you are misunderstanding where the
> > > > > latency comes from. We immediately write data to the filesystem with no
> > > > > delay whatsoever. This incurs the overhead of a system call, as you
> > > > > point out, which could be avoided by mmap, but that doesn't add much in
> > > > > the way of latency. The latency comes from the fact that we do not make
> > > > > the written data available to consumers until we fsync the file to
> > > > > "ensure" the durability of consumed messages. The frequency of the
> > > > > fsync is configurable: either immediate, or with a time or
> > > > > number-of-messages threshold. This again trades latency for throughput.
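
A minimal sketch of the behaviour described above, where data is written to the filesystem immediately but becomes visible to consumers only after an fsync. This is not Kafka's code: the class `FlushGatedLog`, its fields, and the message-count threshold `flush_every_n` are hypothetical names chosen for illustration, and only the count-based threshold is modelled.

```python
import os
import tempfile


class FlushGatedLog:
    """Hypothetical append-only log: consumers may read only fsynced bytes."""

    def __init__(self, path, flush_every_n):
        # buffering=0 so each append is an immediate write() system call.
        self.f = open(path, "ab", buffering=0)
        self.flush_every_n = flush_every_n
        self.unflushed = 0  # messages written since the last fsync
        self.written = 0    # bytes handed to the filesystem
        self.visible = 0    # bytes consumers are allowed to read

    def append(self, payload):
        self.f.write(payload)
        self.written += len(payload)
        self.unflushed += 1
        if self.unflushed >= self.flush_every_n:
            self.flush()

    def flush(self):
        os.fsync(self.f.fileno())    # force the data to disk
        self.visible = self.written  # only now expose it to consumers
        self.unflushed = 0

    def close(self):
        self.f.close()


with tempfile.TemporaryDirectory() as d:
    log = FlushGatedLog(os.path.join(d, "segment.log"), flush_every_n=3)
    log.append(b"m1")
    log.append(b"m2")
    before = (log.written, log.visible)  # written, but not yet visible
    log.append(b"m3")                    # third message triggers the fsync
    after = (log.written, log.visible)
    log.close()

print(before, after)
```

The gap between `written` and `visible` is the consumer-facing latency; a larger `flush_every_n` means fewer, larger flushes at the cost of a longer wait.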
> > > >
> > > > -Jay
> > > >
> > > > > On Mon, Jul 18, 2011 at 10:26 PM, Paul Sutter <psutter@quantbench.com> wrote:
> > > >
> > > > > > *Producer latency* - I'm not familiar with zeromq internals, but my
> > > > > > understanding is that they send the first message(s) immediately, and
> > > > > > as TCP queues up the data it will eventually block as the send buffer
> > > > > > fills. During this time messages can queue up, and the net-net is
> > > > > > that on average the number of system calls is << the number of
> > > > > > messages. The key is having a separate thread for network operations
> > > > > > with very efficient thread coordination. Clearly Nagle is disabled
> > > > > > (TCP_NODELAY) as Nagle is a blight against humanity.
> > > > > >
> > > > > > Having any sort of delay adds latency. If every developer thinks it's
> > > > > > OK to add a little latency in his layer, pretty soon you end up with
> > > > > > 10 second end to end latency.
> > > > > >
> > > > > > Having an "accumulated message count" is also bad for WAN
> > > > > > performance. If your "window size" is a set of delayed messages, the
> > > > > > only way to deal with a large bandwidth*delay product is to delay a
> > > > > > lot of messages, then send them. You can fit a lot of data into a
> > > > > > fiber. Imagine a gigabit link with 100ms roundtrip time: you can
> > > > > > store about 12.5MB (100 megabits) in the fiber. And you need a
> > > > > > multiple of that for buffering if you need to do a retransmit.
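
The bandwidth*delay arithmetic can be checked directly. Link speeds are quoted in bits per second, so a gigabit link with a 100ms round trip holds 100 megabits in flight, which is 12.5 megabytes. The helper name below is an illustrative choice.

```python
def bandwidth_delay_product_bytes(bandwidth_bps, rtt_s):
    """Bytes in flight on a link of the given speed (bits/s) and round trip (s)."""
    return bandwidth_bps * rtt_s / 8


bdp = bandwidth_delay_product_bytes(1_000_000_000, 0.100)
print(f"{bdp / 1e6:.1f} MB in flight")
```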
> > > > >
> > > > > > *Broker latency* - With mmap, the memcpy() of the message should make
> > > > > > the data available to a thread even in another process; the pages
> > > > > > that you have mapped are also in the buffer cache and available to a
> > > > > > sendfile() call, or at least I think so. The flush to physical disk
> > > > > > (or msync() in this case) would still be delayed, but without
> > > > > > impacting end to end latency.
> > > > > >
> > > > > > That said, in benchmarks I have done, the fastest IO with the lowest
> > > > > > CPU overhead is unbuffered (direct) IO (which is lower overhead than
> > > > > > using the buffer cache with or without memory mapping), but then
> > > > > > you'd have to manage your own buffer pool and run your broker in a
> > > > > > single multithreaded process. But that's getting more extreme. Just
> > > > > > getting rid of this buffer write delay by using memory mapping will
> > > > > > remove a big chunk of latency.
> > > > > >
> > > > > > On Mon, Jul 18, 2011 at 9:30 PM, Jay Kreps <jay.kreps@gmail.com> wrote:
> > > > >
> > > > > > Hi Paul,
> > > > > >
> > > > > > > We are definitely interested in lowering latency--lower is always
> > > > > > > better--but that was not a major concern for us so far (we were
> > > > > > > replacing a system with 1 hour latency), so we haven't focused on
> > > > > > > it yet. As you describe, latency in our setup at LinkedIn comes
> > > > > > > from batching on the frontend and batching on the Kafka servers
> > > > > > > due to very lenient flush settings.
> > > > > > >
> > > > > > > I am interested in your comments on zeromq. Do they actually have
> > > > > > > a better approach for this problem even when using TCP? If so I
> > > > > > > would be interested to understand. The way I see things, this is
> > > > > > > about trading throughput and latency. On the producer side you
> > > > > > > have only a few options: immediately write the data to the socket
> > > > > > > buffer for sending, or wait and see if the application writes more
> > > > > > > data. The OS will do this for you unless you set TCP_NODELAY, but
> > > > > > > the OS is relatively inflexible; it doesn't understand your data,
> > > > > > > so I think it just waits 200ms or until the socket buffer is full.
> > > > > > >
> > > > > > > The current approach in the async producer captures the same
> > > > > > > tradeoff, but a little more flexibly: it allows you to specify a
> > > > > > > max delay and max accumulated message count, and data is written
> > > > > > > when either of those is hit.
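
The two-threshold policy described above (flush on max delay or max accumulated count, whichever comes first) can be sketched as follows. This is an illustration only, not the Kafka async producer: `BatchAccumulator`, its parameters, and the injectable `clock` are hypothetical, and a real producer would also call `maybe_flush()` from a background timer.

```python
import time


class BatchAccumulator:
    """Hypothetical batcher: flush when either the age or count limit is hit."""

    def __init__(self, send, max_delay_s, max_count, clock=time.monotonic):
        self.send = send            # callable that receives a list of messages
        self.max_delay_s = max_delay_s
        self.max_count = max_count
        self.clock = clock          # injectable so the example is deterministic
        self.pending = []
        self.first_at = None        # arrival time of the oldest pending message

    def append(self, message):
        if not self.pending:
            self.first_at = self.clock()
        self.pending.append(message)
        self.maybe_flush()

    def maybe_flush(self):
        if not self.pending:
            return
        too_many = len(self.pending) >= self.max_count
        too_old = self.clock() - self.first_at >= self.max_delay_s
        if too_many or too_old:
            batch, self.pending = self.pending, []
            self.send(batch)


# Usage with a fake clock, so no real waiting is needed.
sent = []
now = [0.0]
acc = BatchAccumulator(sent.append, max_delay_s=5.0, max_count=3,
                       clock=lambda: now[0])
acc.append("a")
acc.append("b")
now[0] = 6.0
acc.maybe_flush()      # delay threshold hit: ["a", "b"] is sent
for m in "cde":
    acc.append(m)      # count threshold hit on the third message
print(sent)
```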
> > > > > >
> > > > > > > Is it possible to better capture this tradeoff? Basically I am not
> > > > > > > aware of any other trick here if you are using TCP, so I would be
> > > > > > > interested in what zeromq does if they are doing this better.
> > > > > > >
> > > > > > > We do indeed write each message set to the filesystem as it
> > > > > > > arrives, but we distribute messages to consumers only after the
> > > > > > > write has been flushed to disk. Delaying (batching) that flush is
> > > > > > > the cause of the latency, but it also gives better use of IOPs by
> > > > > > > generating larger writes. Mmap would remove the system call (which
> > > > > > > would be good), but not the flush, I think. As you say, adding
> > > > > > > replication allows giving stronger guarantees without actually
> > > > > > > caring about durability on a particular server, which would make
> > > > > > > it possible to distribute messages to consumers after ack from
> > > > > > > some number of other servers, irrespective of flushing to disk.
> > > > > >
> > > > > > -Jay
> > > > > >
> > > > > > > On Mon, Jul 18, 2011 at 8:27 PM, Paul Sutter <psutter@quantbench.com> wrote:
> > > > > >
> > > > > > > Jun
> > > > > > >
> > > > > > > > Thanks for your answers and the link to the paper - that helps a
> > > > > > > > lot, especially the comment in the paper that 10 second end to
> > > > > > > > end latency is good enough for your intended use case.
> > > > > > > >
> > > > > > > > We're looking for much lower latencies, and the basic design of
> > > > > > > > Kafka feels like it should support latencies in milliseconds
> > > > > > > > with a few changes. We're either going to build our own system,
> > > > > > > > or help develop something that already exists, so please take my
> > > > > > > > comments in the constructive way they're intended (I realize the
> > > > > > > > changes I'm suggesting are outside your intended use case, but
> > > > > > > > if you're interested we may be able to provide a very capable
> > > > > > > > developer to help with the work, assuming we choose kafka over
> > > > > > > > the other zillion streaming systems that are coming out of the
> > > > > > > > woodwork).
> > > > > > > >
> > > > > > > > a. *Producer "queue.time"* - In my question 4 below, I was
> > > > > > > > referring to the producer queue time. With a default value of 5
> > > > > > > > seconds, that accounts for half your end to end latency. A
> > > > > > > > system like zeromq is optimized to write data immediately
> > > > > > > > without delay, but in a way that minimizes the number of system
> > > > > > > > calls required during high throughput messaging. Zeromq is no
> > > > > > > > nirvana, but it has a number of nice properties.
> > > > > > > >
> > > > > > > > b. *Broker "log.default.flush.interval.ms"* - The default value
> > > > > > > > of 3 seconds appears to be another significant source of latency
> > > > > > > > in the system, assuming that clients are unable to access data
> > > > > > > > until it has been flushed. Since you have wisely chosen to take
> > > > > > > > advantage of the buffer cache as part of your system design, it
> > > > > > > > seems that you could remove this latency completely by memory
> > > > > > > > mapping the partitions and memcpying each message as it arrives.
> > > > > > > > With the right IPC mechanism clients could have immediate access
> > > > > > > > to new messages.
> > > > > > > >
> > > > > > > > c. *Batching, sync vs async, replication, and auditing* - It's
> > > > > > > > understandable that you've chosen a forensic approach to
> > > > > > > > producer reliability (after-the-fact auditing), but when you
> > > > > > > > implement replication it would be really nice to revise the
> > > > > > > > producer protocol mechanisms. If you used a streaming mechanism
> > > > > > > > with producer offsets and ACKs, you could ensure reliable
> > > > > > > > delivery of producer streams to multiple brokers without the
> > > > > > > > need to choose a "batch size" or "queue.time". This could also
> > > > > > > > give you active/active failover of brokers. This may also help
> > > > > > > > in the WAN case (my question 3 below) because you will be able
> > > > > > > > to adaptively stuff more and more data through the fiber for
> > > > > > > > high bandwidth*delay links without having to choose a large
> > > > > > > > "batch size" or incur the additional latency that entails. Oh,
> > > > > > > > and it will help you deal with CRC errors once you start
> > > > > > > > checking for them.
> > > > > > > >
> > > > > > > > d. *Performance measurements* - I'd like to make a suggestion
> > > > > > > > for your performance measurements. Your benchmarks measure
> > > > > > > > throughput, but a throughput number is meaningless without an
> > > > > > > > associated "% cpu time". Ideally all measurements achieve wire
> > > > > > > > speed (100MB/sec) at 0% CPU (since, after all, this is plumbing
> > > > > > > > and we assume the cores in the system should have capacity set
> > > > > > > > aside for useful work too). Obviously nobody ever achieves this,
> > > > > > > > but by measuring it one can raise the bar in terms of
> > > > > > > > optimization.
> > > > > > > >
> > > > > > > > Paul
> > > > > > > >
> > > > > > > > ps. Just for background, I am the cofounder at Quantcast where
> > > > > > > > we process 3.5PB of data per day. These questions are related to
> > > > > > > > my new startup Quantbench, which will deal with financial market
> > > > > > > > data where you don't want any latency at all. And WAN issues are
> > > > > > > > a big deal too. Incidentally, I was also founder of Orbital
> > > > > > > > Data, which was a WAN optimization company, so I've done a lot
> > > > > > > > of work with protocols over long distances.
> > > > > > >
> > > > > > > > On Mon, Jul 18, 2011 at 7:14 PM, Jun Rao <junrao@gmail.com> wrote:
> > > > > > >
> > > > > > > > Paul,
> > > > > > > >
> > > > > > > > Excellent questions. See my answers below. Thanks,
> > > > > > > >
> > > > > > > > > On Mon, Jul 18, 2011 at 6:41 PM, Paul Sutter <psutter@quantbench.com> wrote:
> > > > > > > >
> > > > > > > > > > Kafka looks like an exciting project, thanks for opening it up.
> > > > > > > > > >
> > > > > > > > > > I have a few questions:
> > > > > > > > > >
> > > > > > > > > > 1. Are checksums end to end (ie, created by the producer and
> > > > > > > > > > checked by the consumer)? Or are they only used to confirm
> > > > > > > > > > buffer cache behavior on disk as mentioned in the
> > > > > > > > > > documentation? Bit errors occur vastly more often than most
> > > > > > > > > > people assume, often because of device driver bugs. TCP's
> > > > > > > > > > 16-bit checksum fails to detect roughly 1 error in 65536, so
> > > > > > > > > > errors can flow through (if you like I can send links to
> > > > > > > > > > papers describing the need for checksums everywhere).
> > > > > > > > > >
> > > > > > > > >
> > > > > > > > > Checksum is generated at the producer and propagated to the
> > > > > > > > > broker and eventually the consumer. Currently, we only validate
> > > > > > > > > the checksum at the broker. We could further validate it at the
> > > > > > > > > consumer in the future.
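
A minimal sketch of an end-to-end checksum, where the producer attaches a CRC32 and any later hop (broker or consumer) can re-validate it. The framing here (a 4-byte big-endian CRC32 followed by the payload) and the `encode`/`decode` names are assumptions made for illustration; this is not Kafka's actual message format.

```python
import struct
import zlib


def encode(payload):
    """Producer side: prefix the payload with its CRC32 (hypothetical framing)."""
    return struct.pack(">I", zlib.crc32(payload)) + payload


def decode(frame):
    """Broker or consumer side: recompute the CRC32 and reject on mismatch."""
    (expected,) = struct.unpack(">I", frame[:4])
    payload = frame[4:]
    if zlib.crc32(payload) != expected:
        raise ValueError("checksum mismatch")
    return payload


frame = encode(b"hello kafka")
print(decode(frame))

# Flip one bit in transit; the receiving side should refuse the message.
corrupted = frame[:-1] + bytes([frame[-1] ^ 0x01])
try:
    decode(corrupted)
except ValueError as err:
    print("rejected:", err)
```

Because the checksum travels with the payload, validating it at the consumer as well as the broker costs one extra `crc32` call per message and catches corruption introduced anywhere after the producer.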
> > > > > > > >
> > > > > > > > >
> > > > > > > > > > 2. The consumer has a pretty solid mechanism to ensure it
> > > > > > > > > > hasn't missed any messages (I like the design by the way), but
> > > > > > > > > > how does the producer know that all of its messages have been
> > > > > > > > > > stored? (No apparent message id on that side, since the
> > > > > > > > > > message id isn't known until the message is written to the
> > > > > > > > > > file.) I'm especially curious how failover/replication could
> > > > > > > > > > be implemented, and I'm thinking that acks on the publisher
> > > > > > > > > > side may help.
> > > > > > > > > >
> > > > > > > > >
> > > > > > > > > The producer side auditing is not built-in. At LinkedIn, we do
> > > > > > > > > that by generating an auditing event periodically in the
> > > > > > > > > eventhandler of the async producer. The auditing event contains
> > > > > > > > > the number of events produced in a configured window (e.g., 10
> > > > > > > > > minutes) and is sent to a separate topic. The consumer can read
> > > > > > > > > the actual data and the auditing event and compare the counts.
> > > > > > > > > See our paper (
> > > > > > > > > http://research.microsoft.com/en-us/um/people/srikanth/netdb11/netdb11papers/netdb11-final12.pdf
> > > > > > > > > ) for some more details.
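
The count-based audit described above can be sketched like this. It is an illustration under assumptions, not LinkedIn's implementation: events are represented only by their timestamps in seconds, the window is fixed at 10 minutes, and the function names are hypothetical.

```python
from collections import Counter

WINDOW_S = 600  # 10-minute audit window, matching the example in the text


def window_of(timestamp_s):
    """Start of the audit window that a timestamp falls into."""
    return int(timestamp_s // WINDOW_S) * WINDOW_S


def producer_audit_counts(produced_timestamps):
    """Per-window counts the producer would publish to a separate audit topic."""
    return Counter(window_of(t) for t in produced_timestamps)


def missing_per_window(audit_counts, consumed_timestamps):
    """Consumer side: compare audited counts against what actually arrived."""
    consumed = Counter(window_of(t) for t in consumed_timestamps)
    return {w: n - consumed[w] for w, n in audit_counts.items()
            if n != consumed[w]}


produced = [0, 10, 599, 600, 700]
consumed = [0, 10, 600, 700]  # the event at t=599 was lost
audit = producer_audit_counts(produced)
missing = missing_per_window(audit, consumed)
print(missing)
```

This detects loss after the fact and per window, which is the "forensic" property discussed earlier in the thread; it does not tell the producer which individual message to resend.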
> > > > > > > >
> > > > > > > >
> > > > > > > > >
> > > > > > > > > > 3. Has the consumer's flow control been tested over high
> > > > > > > > > > bandwidth*delay links? (What bandwidth can you get from a
> > > > > > > > > > London consumer of an SF cluster?)
> > > > > > > > > >
> > > > > > > > >
> > > > > > > > > Yes, we actually replicate kafka data across data centers, using
> > > > > > > > > an embedded consumer in a broker. Again, there is a bit more info
> > > > > > > > > on this in our paper.
> > > > > > > >
> > > > > > > >
> > > > > > > > > > 4. What kind of performance do you get if you set the
> > > > > > > > > > producer's message delay to zero? (ie, is there a separate
> > > > > > > > > > system call for each message? or do you manage to aggregate
> > > > > > > > > > messages into a smaller number of system calls even with a
> > > > > > > > > > delay of 0?)
> > > > > > > > > >
> > > > > > > > >
> > > > > > > > > I assume that you are referring to the flush interval. One can
> > > > > > > > > configure the broker to flush every message to disk. This will
> > > > > > > > > slow down the throughput significantly.
> > > > > > > >
> > > > > > > >
> > > > > > > > > > 5. Have you considered using a library like zeromq for the
> > > > > > > > > > messaging layer instead of rolling your own? (zeromq will
> > > > > > > > > > handle #4 cleanly at millions of messages per second and has
> > > > > > > > > > clients in 20 languages)
> > > > > > > > > >
> > > > > > > > >
> > > > > > > > > No. Our proprietary format allows us to support things like
> > > > > > > > > compression in the future. However, we can definitely look into
> > > > > > > > > the zeromq format. Is their messaging layer easily extractable?
> > > > > > > >
> > > > > > > >
> > > > > > > > > > 6. Do you have any plans to support intermediate processing
> > > > > > > > > > elements the way Flume does?
> > > > > > > > > >
> > > > > > > > >
> > > > > > > > > For now, we are just focusing on getting the raw messaging layer
> > > > > > > > > solid. We have worked a bit on streaming processing and will look
> > > > > > > > > into that again in the future.
> > > > > > > >
> > > > > > > >
> > > > > > > > > > 7. The docs mention that new versions will only be released
> > > > > > > > > > after they are in production at LinkedIn. Does that mean that
> > > > > > > > > > the latest version of the source code is hidden at LinkedIn
> > > > > > > > > > and contributors would have to throw patches over the wall and
> > > > > > > > > > wait months to get the integrated product?
> > > > > > > > > >
> > > > > > > > >
> > > > > > > > > What we run at LinkedIn is the same version as in open source,
> > > > > > > > > and there is no internal repository of Kafka at LinkedIn. We plan
> > > > > > > > > to maintain that in the future.
> > > > > > > >
> > > > > > > >
> > > > > > > > > Thanks!
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
