kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Paul Sutter <psut...@quantbench.com>
Subject Re: Kafka questions
Date Wed, 20 Jul 2011 16:16:13 GMT
If the unflushed messages havent been acked to the publisher, they havent
been lost in the system.

On Wed, Jul 20, 2011 at 9:09 AM, Jun Rao <junrao@gmail.com> wrote:

> Paul,
>
> The only concern is that if we expose unflushed messages, those messages
> could disappear after a broker machine restart.
>
> Jun
>
> On Tue, Jul 19, 2011 at 2:02 PM, Paul Sutter <psutter@quantbench.com>
> wrote:
>
> > One more suggestion:
> >
> > Even before you have replication, it seems that you could delay producer
> > side acks until after the data is recorded to disk, and still pass the
> data
> > forward to consumers immediately.
> >
> >
> >
> > On Jul 19, 2011, at 12:23 PM, Jay Kreps <jay.kreps@gmail.com> wrote:
> >
> > > Agreed, no reason the policy to hand out messages should not be
> > > configurable. We were hoping to make the whole question irrelevant with
> > the
> > > replication since then the producer can choose the replication level it
> > > wants and fsync durability should be less of a concern.
> > >
> > > I agree with your comment that a good implementation of streaming with
> > acks
> > > being potentially superior.
> > >
> > > -Jay
> > >
> > > On Tue, Jul 19, 2011 at 11:15 AM, Paul Sutter <psutter@quantbench.com
> > >wrote:
> > >
> > >> Jay,
> > >>
> > >> Ah - thanks for the clarification on the delay in the broker. It would
> > be
> > >> nice to if that were a configuration option, so that the end user can
> > >> choose
> > >> only to forward messages that have been written to disk, or choose to
> > have
> > >> the data forwarded immediately. When you implement replication data
> > hitting
> > >> the disk will matter less.
> > >>
> > >> On the delay in the producer, I think it could best be resolved
> through
> > >> measurement. In your paper you compare two different approaches, and
> I'm
> > >> proposing a third:
> > >>
> > >> 1. Send and wait (single message, JMS style)
> > >> 2. Batch, send, and wait (Kafka today)
> > >> 3. Stream with ACKs
> > >>
> > >> Removing any wait for a reply should increase throughput, not decrease
> > it,
> > >> so you're likely trading latency against potential CPU efficiency. And
> > the
> > >> CPU savings is a question best resolved by measurement.
> > >>
> > >> I'd also encourage you to think about the WAN case. When you
> > send-and-wait,
> > >> you need to send a buffer that is >> the bandwidth delay product
to
> > >> approach
> > >> full line utilization, and the line will go idle for one RTT while you
> > stop
> > >> to wait for a reply. The bandwidth*delay product can get large (10s of
> > >> megabytes), and end users will rarely understand the need to tune the
> > batch
> > >> size to increase throughput. They'll just say it's slow over long
> > >> distances.
> > >>
> > >> All that said - your use case doesn't require minimizing latency or
> WAN
> > >> use,
> > >> so I can really understand if this isn't a priority for you.
> > >>
> > >> It's a well designed product that has had some real thought put into
> it.
> > >> It's a really promising system, thanks for taking the time to respond
> to
> > my
> > >> comments.
> > >>
> > >> Paul
> > >>
> > >> On Tue, Jul 19, 2011 at 10:44 AM, Jay Kreps <jay.kreps@gmail.com>
> > wrote:
> > >>
> > >>> Ah, I think what you are describing in zeromq is essentially the
> > >> equivalent
> > >>> of group commit for the socket. Essentially you wait until the socket
> > is
> > >> no
> > >>> longer writable and then begin to queue data. This is an interesting
> > >> idea.
> > >>> Of course it would only have a positive effect when you had already
> > >>> overflowed the socket buffer and were sending a very high throughput
> of
> > >>> small messages. That basically is a way to degrade an overloaded
> > >>> synchronous
> > >>> send into a batched send. This is not really the same as what we have
> > >> done,
> > >>> which is to allow the ability to trade off latency for throughput in
> a
> > >>> configurable manner. The reason the later is important is that we do
> > not
> > >>> have a handful of producers sending at a rate that saturates the
> > network
> > >>> I/O
> > >>> capacity of those servers (the case where the group commit would
> help)
> > >> but
> > >>> rather we have thousands of producers sending at a medium low volume,
> > so
> > >> we
> > >>> would never hit that in our use case. The advantage of batching is
> > fewer
> > >>> requests that hit the server, and larger packets. Where the group
> > commit
> > >>> would help is for the synchronous producer benchmarks, where you
> could
> > >>> potentially get much better throughput. This is something we should
> > >>> consider
> > >>> adding.
> > >>>
> > >>> To be clear, though, we have not added latency in our layer, just
> made
> > a
> > >>> configurable way to trade-off latency for throughput. This is
> > >> unambiguously
> > >>> a good thing, I think.
> > >>>
> > >>> With respect to mmap, i think you are misunderstanding where the
> > latency
> > >>> comes from. We immediately write data to the filesystem with no delay
> > >>> whatsoever. This incurs the overhead of a system call, as you point
> > out,
> > >>> which could be avoided by mmap, but that doesn't add much in the way
> of
> > >>> latency. The latency comes from the fact that we do not make the
> > written
> > >>> data available to consumers until we fsync the file to "ensure" the
> > >>> durability of consumed messages. The frequency of the fsync is
> > >>> configurable,
> > >>> anything either immediate or with a time or # messages threshold.
> This
> > >>> again
> > >>> trades latency for throughput.
> > >>>
> > >>> -Jay
> > >>>
> > >>> On Mon, Jul 18, 2011 at 10:26 PM, Paul Sutter <
> psutter@quantbench.com
> > >>>> wrote:
> > >>>
> > >>>> *Producer latency* - I'm not familiar with zeromq internals but
my
> > >>>> understanding is that they send the first message(s) immediately
and
> > as
> > >>> TCP
> > >>>> queues up the data, it will eventually block as the send buffer
> fills,
> > >>> and
> > >>>> during this time messages can queue up, and thte net-net is that
on
> > >>> average
> > >>>> the number of system calls is << the number of messages.
The key is
> > >>> having
> > >>>> a
> > >>>> separate thread for network operations with very efficient thread
> > >>>> coordination.  Clearly Nagle is disabled (TCP_NODELAY) as Nagle
is a
> > >>> blight
> > >>>> against humanity.
> > >>>>
> > >>>> Having any sort of delay adds latency. If every developer thinks
its
> > OK
> > >>> to
> > >>>> add a little latency in his layer, pretty soon you end up with
10
> > >> second
> > >>>> end
> > >>>> to end latency.
> > >>>>
> > >>>> Having an "accumulated message count" is also bad for WAN
> performance.
> > >> If
> > >>>> your "window size" is a set of delayed messages, the only way to
> deal
> > >>> with
> > >>>> a
> > >>>> large bandwidth*delay product is to delay a lot of messages, then
> send
> > >>>> them.
> > >>>> You can fit a lot of data into a fiber. Imagine a gigabit link
with
> > >> 100ms
> > >>>> roundtrip time, you can store 100MB in the fiber. And you need
a
> > >>> multiples
> > >>>> of that for buffering if you need to do a retransmit.
> > >>>>
> > >>>> *Broker Latency *- With mmap the memcpy() of the message should
make
> > >> the
> > >>>> data available to a thread even in another process, the pages that
> you
> > >>> have
> > >>>> mapped are also in the buffer cache and available to a sendfile()
> > call.
> > >>> or
> > >>>> at least I think so. The flush to physical disk (or msync() in
this
> > >> case)
> > >>>> would still be delayed but without impacting end to end latency.
> > >>>>
> > >>>> That said, in benchmarks I have done the fastest IO with the lowest
> > CPU
> > >>>> overhead is unbuffered (direct) IO (which is lower overhead than
> using
> > >>> the
> > >>>> buffer cache with or without memory mapping), but then you'd have
to
> > >>> manage
> > >>>> your own buffer pool and run your broker in a single multithreaded
> > >>> process.
> > >>>> But thats getting more extreme. Just getting rid of this buffer
> write
> > >>> delay
> > >>>> by using memory mapping will remove a big chunk of latency.
> > >>>>
> > >>>>
> > >>>>
> > >>>>
> > >>>>
> > >>>>
> > >>>>
> > >>>>
> > >>>>
> > >>>> On Mon, Jul 18, 2011 at 9:30 PM, Jay Kreps <jay.kreps@gmail.com>
> > >> wrote:
> > >>>>
> > >>>>> Hi Paul,
> > >>>>>
> > >>>>> We are definitely interested in lowering latency--lower is
always
> > >>>>> better--but that was not a major concern for us so far (we
were
> > >>> replacing
> > >>>> a
> > >>>>> system with 1 hour latency), so we haven't focused on it yet.
As
> you
> > >>>>> describe latency in our setup at linkedin comes from batching
on
> the
> > >>>>> frontend and batching on the kafka servers do to very lenient
flush
> > >>>>> settings.
> > >>>>>
> > >>>>> I am interested in your comments on zeromq. Do they actually
have a
> > >>>> better
> > >>>>> approach for this problem even when using TCP? If so I would
be
> > >>>> interested
> > >>>>> to understand. The way I see things this is about trading
> throughput
> > >>> and
> > >>>>> latency. On the producer side you have only a few options:
> > >> immediately
> > >>>>> write
> > >>>>> the data to the socket buffer for sending or wait and see if
the
> > >>>>> application
> > >>>>> writes more data. The OS will do this for you unless you set
> > >>> TCP_NODELAY,
> > >>>>> but the OS is relatively inflexible, it doesn't understand
your
> data
> > >> so
> > >>> I
> > >>>>> think it just waits 200ms or until the socket buffer is full.
> > >>>>>
> > >>>>> The current approach in the async producer captures the same
> > >> tradeoff,
> > >>>> but
> > >>>>> a
> > >>>>> little more flexibly, it allows you to specify a max delay
and max
> > >>>>> accumulated message count, data is written when either of those
is
> > >> hit.
> > >>>>>
> > >>>>> Is it possible to better capture this tradeoff? Basically I
am not
> > >>> aware
> > >>>> of
> > >>>>> any other trick here if you are using TCP, so i would be interested
> > >> in
> > >>>> what
> > >>>>> zeromq does if they are doing this better.
> > >>>>>
> > >>>>> We do indeed write each message set to the filesystem as it
arrives
> > >> but
> > >>>> we
> > >>>>> distribute messages to consumers only after the write has been
> > >> flushed
> > >>> to
> > >>>>> disk, delaying (batching) that flush is the cause of the latency
> but
> > >>> also
> > >>>>> gives better use of IOPs by generating larger writes. Mmap
would
> > >> remove
> > >>>> the
> > >>>>> system call (which would be good), but not the flush I think.
As
> you
> > >>> say,
> > >>>>> adding replication allows giving stronger guarantees without
> actually
> > >>>>> caring
> > >>>>> about durability on a particular server which would make it
> possible
> > >> to
> > >>>>> distribute messages to consumers after ack from some number
of
> other
> > >>>>> servers
> > >>>>> irrespective of flushing to disk.
> > >>>>>
> > >>>>> -Jay
> > >>>>>
> > >>>>> On Mon, Jul 18, 2011 at 8:27 PM, Paul Sutter <
> psutter@quantbench.com
> > >>>
> > >>>>> wrote:
> > >>>>>
> > >>>>>> Jun
> > >>>>>>
> > >>>>>> Thanks for your answers and the link to the paper - that
helps a
> > >> lot,
> > >>>>>> especially the comment in the paper that 10 second end
to end
> > >> latency
> > >>>> is
> > >>>>>> good enough for your intended use case.
> > >>>>>>
> > >>>>>> We're looking for much lower latencies, and the basic design
of
> > >> Kafka
> > >>>>> feels
> > >>>>>> like it should support latencies in milliseconds with a
few
> > >> changes.
> > >>>>> We're
> > >>>>>> either going to build our own system, or help develop something
> > >> that
> > >>>>>> already
> > >>>>>> exists, so please take my comments in the constructive
way they're
> > >>>>> intended
> > >>>>>> (I realize the changes I'm suggesting are outside your
intended
> use
> > >>>> case,
> > >>>>>> but if you're interested we may be able to provide a very
capable
> > >>>>> developer
> > >>>>>> to help with the work, assuming we choose kafka over the
other
> > >>> zillion
> > >>>>>> streaming systems that are coming out of the woodwork).
> > >>>>>>
> > >>>>>> a. *Producer "queue.time"* - In my question 4 below, I
was
> > >> referring
> > >>> to
> > >>>>> the
> > >>>>>> producer queue time.  With a default value of 5 seconds,
that
> > >>> accounts
> > >>>>> for
> > >>>>>> half your end to end latency. A system like zeromq is optimized
to
> > >>>> write
> > >>>>>> data immediately without delay, but in such a way to minimizes
the
> > >>>> number
> > >>>>>> of
> > >>>>>> system calls required during high throughput messages.
Zeromq is
> no
> > >>>>>> nirvana,
> > >>>>>> but it has a number of nice properties.
> > >>>>>>
> > >>>>>> b. *Broker "log.default.flush.interval.ms"* - The default
value
> of
> > >> 3
> > >>>>>> seconds
> > >>>>>> appears to be another significant source of latency in
the system,
> > >>>>> assuming
> > >>>>>> that clients are unable to access data until it has been
flushed.
> > >>> Since
> > >>>>> you
> > >>>>>> have wisely chosen to take advantage of the buffer cache
as part
> of
> > >>>> your
> > >>>>>> system design, it seems that you could remove this latency
> > >> completely
> > >>>> by
> > >>>>>> memory mapping the partitions and memcpying each message
as it
> > >>> arrives.
> > >>>>>> With
> > >>>>>> the right IPC mechanism clients could have immediate access
to new
> > >>>>>> messages.
> > >>>>>>
> > >>>>>> c. *Batching, sync vs async, replication, and auditing*.
Its
> > >>>>> understandable
> > >>>>>> that you've chosen a a forensic approach to producer reliability
> > >>> (after
> > >>>>> the
> > >>>>>> fact auditing), but when you implement replication it would
be
> > >> really
> > >>>>> nice
> > >>>>>> to revise the producer protocol mechanisms. If you used
a
> streaming
> > >>>>>> mechanism with producer offsets and ACKs, you could ensure
> reliable
> > >>>>>> delivery
> > >>>>>> of producer streams to multiple brokers without the need
to choose
> > >> a
> > >>>>> "batch
> > >>>>>> size" or "queue.time". This could also give you active/active
> > >>> failover
> > >>>> of
> > >>>>>> brokers. This may also help in the WAN case (my question
3 below)
> > >>>> because
> > >>>>>> you will be able to adaptively stuff more and more data
through
> the
> > >>>> fiber
> > >>>>>> for high bandwidth*delay links without having to choose
a large
> > >>> "batch
> > >>>>>> size"
> > >>>>>> nor have the additional latency that entails. Oh, and it
will help
> > >>> you
> > >>>>> deal
> > >>>>>> with CRC errors once you start checking for them.
> > >>>>>>
> > >>>>>> c. *Performance measurements* - I'd like to make a suggestion
for
> > >>> your
> > >>>>>> performance measurements. Your benchmarks measure throughput,
but
> a
> > >>>>>> throughput number is meaningless without an associated
"% cpu
> > >> time".
> > >>>>>> Ideally
> > >>>>>> all measurements achieve wire speed (100MB/sec) at 0% CPU
(since,
> > >>> after
> > >>>>>> all,
> > >>>>>> this is plumbing and we assume the cores in the system
should have
> > >>>>> capacity
> > >>>>>> set aside for useful work too). Obviously nobody ever achieves
> > >> this,
> > >>>> but
> > >>>>> by
> > >>>>>> measuring it one can raise the bar in terms of optimization.
> > >>>>>>
> > >>>>>> Paul
> > >>>>>>
> > >>>>>> ps. Just for background, I am the cofounder at Quantcast
where we
> > >>>> process
> > >>>>>> 3.5PB of data per day. These questions are related to my
new
> > >> startup
> > >>>>>> Quantbench which will deal with financial market data where
you
> > >> dont
> > >>>> want
> > >>>>>> any latency at all. And WAN issues are a big deal too.
> > >> Incidentally,
> > >>> I
> > >>>>> was
> > >>>>>> also founder of Orbital Data which was a WAN optimization
company
> > >> so
> > >>>> I've
> > >>>>>> done a lot of work with protocols over long distances.
> > >>>>>>
> > >>>>>> On Mon, Jul 18, 2011 at 7:14 PM, Jun Rao <junrao@gmail.com>
> wrote:
> > >>>>>>
> > >>>>>>> Paul,
> > >>>>>>>
> > >>>>>>> Excellent questions. See my answers below. Thanks,
> > >>>>>>>
> > >>>>>>> On Mon, Jul 18, 2011 at 6:41 PM, Paul Sutter <
> > >>> psutter@quantbench.com
> > >>>>>
> > >>>>>>> wrote:
> > >>>>>>>
> > >>>>>>>> Kafka looks like an exciting project, thanks for
opening it up.
> > >>>>>>>>
> > >>>>>>>> I have a few questions:
> > >>>>>>>>
> > >>>>>>>> 1. Are checksums end to end (ie, created by the
producer and
> > >>>> checked
> > >>>>> by
> > >>>>>>> the
> > >>>>>>>> consumer)? or are they only used to confirm buffercache
> > >> behavior
> > >>> on
> > >>>>>> disk
> > >>>>>>> as
> > >>>>>>>> mentioned in the documentation? Bit errors occur
vastly more
> > >>> often
> > >>>>> than
> > >>>>>>>> most
> > >>>>>>>> people assume, often because of device driver bugs.
TCP only
> > >>>> detects
> > >>>>> 1
> > >>>>>>>> error
> > >>>>>>>> in 65536, so errors can flow through (if you like
I can send
> > >>> links
> > >>>> to
> > >>>>>>>> papers
> > >>>>>>>> describing the need for checksums everywhere).
> > >>>>>>>>
> > >>>>>>>
> > >>>>>>> Checksum is generated at the producer and propagated
to the
> > >> broker
> > >>>> and
> > >>>>>>> eventually the consumer. Currently, we only validate
the checksum
> > >>> at
> > >>>>> the
> > >>>>>>> broker. We could further validate it at the consumer
in the
> > >> future.
> > >>>>>>>
> > >>>>>>>>
> > >>>>>>>> 2. The consumer has a pretty solid mechanism to
ensure it hasnt
> > >>>>> missed
> > >>>>>>> any
> > >>>>>>>> messages (i like the design by the way), but how
does the
> > >>> producer
> > >>>>> know
> > >>>>>>>> that
> > >>>>>>>> all of its messages have been stored? (no apparent
message id
> > >> on
> > >>>> that
> > >>>>>>> side
> > >>>>>>>> since the message id isnt known until the message
is written to
> > >>> the
> > >>>>>>> file).
> > >>>>>>>> I'm especially curious how failover/replication
could be
> > >>>> implemented
> > >>>>>> and
> > >>>>>>>> I'm
> > >>>>>>>> thinking that acks on the publisher side may help)
> > >>>>>>>>
> > >>>>>>>
> > >>>>>>> The producer side auditing is not built-in. At LinkedIn,
we do
> > >> that
> > >>>> by
> > >>>>>>> generating an auditing event periodically in the eventhandler
of
> > >>> the
> > >>>>>> async
> > >>>>>>> producer. The auditing event contains the number of
events
> > >> produced
> > >>>> in
> > >>>>> a
> > >>>>>>> configured window (e.g., 10 minutes) and are sent to
a separate
> > >>>> topic.
> > >>>>>> The
> > >>>>>>> consumer can read the actual data and the auditing
event and
> > >>> compare
> > >>>>> the
> > >>>>>>> counts. See our paper (
> > >>>>>>>
> > >>>>>>>
> > >>>>>>
> > >>>>>
> > >>>>
> > >>>
> > >>
> >
> http://research.microsoft.com/en-us/um/people/srikanth/netdb11/netdb11papers/netdb11-final12.pdf
> > >>>>>>> )
> > >>>>>>> for some more details.
> > >>>>>>>
> > >>>>>>>
> > >>>>>>>>
> > >>>>>>>> 3. Has the consumer's flow control been tested
over high
> > >>>>>> bandwidth*delay
> > >>>>>>>> links? (what bandwidth can you get from a London
consumer of an
> > >>> SF
> > >>>>>>>> cluster?)
> > >>>>>>>>
> > >>>>>>>> Yes, we actually replicate kafka data across data
centers,
> > >> using
> > >>> an
> > >>>>>>> embedded consumer in a broker. Again, there is a bit
more info on
> > >>>> this
> > >>>>> in
> > >>>>>>> our paper.
> > >>>>>>>
> > >>>>>>>
> > >>>>>>>> 4. What kind of performance do you get if you set
the
> > >> producer's
> > >>>>>> message
> > >>>>>>>> delay to zero? (ie, is there a separate system
call for each
> > >>>> message?
> > >>>>>> or
> > >>>>>>> do
> > >>>>>>>> you manage to aggregate messages into a smaller
number of
> > >> system
> > >>>>> calls
> > >>>>>>> even
> > >>>>>>>> with a delay of 0?)
> > >>>>>>>>
> > >>>>>>>> I assume that you are referring to the flush interval.
One can
> > >>>>>> configure
> > >>>>>>> to
> > >>>>>>> flush every message to disk. This will slow down the
throughput
> > >>>>>>> significantly.
> > >>>>>>>
> > >>>>>>>
> > >>>>>>>> 5. Have you considered using a library like zeromq
for the
> > >>>> messaging
> > >>>>>>> layer
> > >>>>>>>> instead of rolling your own? (zeromq will handle
#4 cleanly at
> > >>>>> millions
> > >>>>>>> of
> > >>>>>>>> messages per second and has clients in 20 languages)
> > >>>>>>>>
> > >>>>>>>> No. Our proprietary format allows us to support
things like
> > >>>>> compression
> > >>>>>>> in
> > >>>>>>> the future. However, we can definitely look into the
zeromq
> > >> format.
> > >>>> Is
> > >>>>>>> their
> > >>>>>>> messaging layer easily extractable?
> > >>>>>>>
> > >>>>>>>
> > >>>>>>>> 6. Do you have any plans to support intermediate
processing
> > >>>> elements
> > >>>>>> the
> > >>>>>>>> way
> > >>>>>>>> Flume supports?
> > >>>>>>>>
> > >>>>>>>> For now, we are just focusing on getting the raw
messaging
> > >> layer
> > >>>>> solid.
> > >>>>>>> We
> > >>>>>>> have worked a bit on streaming processing and will
look into that
> > >>>> again
> > >>>>>> in
> > >>>>>>> the future.
> > >>>>>>>
> > >>>>>>>
> > >>>>>>>> 7. The docs mention that new versions will only
be released
> > >> after
> > >>>>> they
> > >>>>>>> are
> > >>>>>>>> in production at LinkedIn? Does that mean that
the latest
> > >> version
> > >>>> of
> > >>>>>> the
> > >>>>>>>> source code is hidden at LinkedIn and contributors
would have
> > >> to
> > >>>>> throw
> > >>>>>>>> patches over the wall and wait months to get the
integrated
> > >>>> product?
> > >>>>>>>>
> > >>>>>>>> What we ran at LinkedIn is the same version in
open source and
> > >>>> there
> > >>>>> is
> > >>>>>>> no
> > >>>>>>> internal repository of Kafka at LinkedIn. We plan to
maintain
> > >> that
> > >>> in
> > >>>>> the
> > >>>>>>> future.
> > >>>>>>>
> > >>>>>>>
> > >>>>>>>> Thanks!
> > >>>>>>>>
> > >>>>>>>
> > >>>>>>
> > >>>>>
> > >>>>
> > >>>
> > >>
> >
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message