kafka-users mailing list archives

From "Parthasarathy, Mohan" <mpart...@hpe.com>
Subject Re: Can kafka internal state be purged ?
Date Tue, 25 Jun 2019 00:25:30 GMT
John,

Thanks for the nice explanation. When the repartitioning happens, does the window get associated
with the new partition, i.e., does a message with a new timestamp now have to appear on the
repartition topic for the window to expire? It is possible that a new stream of messages is
coming in, but after the map operation, the partitions of the repartition topic do not all see
the same thing.

Thanks
Mohan

On 6/24/19, 7:49 AM, "John Roesler" <john@confluent.io> wrote:

    Hey, this is a very apt question.
    
    GroupByKey isn't a great example because it doesn't actually change
    the key, so all the aggregation results are actually on records from
    the same partition. But let's say you do a groupBy or a map (or any
    operation that can change the key), followed by an aggregation. Now
    it's possible that the aggregation would need to process records from
    two different partitions. In such a case (key-changing operation
    followed by a stateful operation), Streams actually round-trips the
    data through an intermediate topic, called a repartition topic, before
    the aggregation. This has an effect similar to the "shuffle" phase
    of map-reduce: it puts all the data into its new, correct partition,
    so then the aggregation can still process each of its partitions
    independently.
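
    For illustration, here is a minimal sketch of such a topology (the topic name, window
    size, and key-extraction logic are all made up for this example, not taken from the
    thread): a key-changing selectKey followed by a windowed count, which causes Streams
    to insert a repartition topic between them.

    import java.time.Duration;

    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.Topology;
    import org.apache.kafka.streams.kstream.TimeWindows;

    public class RepartitionSketch {
        // Key-changing operation followed by a stateful (windowed) aggregation.
        public static Topology build() {
            StreamsBuilder builder = new StreamsBuilder();
            builder.<String, String>stream("events")                // hypothetical input topic
                .selectKey((oldKey, value) -> value.split(",")[0])  // key-changing operation
                .groupByKey()
                // Because the key may have changed above, Streams round-trips the records
                // through an internal "<application.id>-...-repartition" topic here, so each
                // record reaches the partition that owns its new key before the stateful
                // windowed count below runs.
                .windowedBy(TimeWindows.of(Duration.ofMinutes(5)))
                .count();
            return builder.build();
        }
    }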
    
    Regarding the latter statement, even though you only have one
    instance, Streams _still_ processes each partition independently. The
    "unit of work" responsible for processing a partition is called a
    "task". So if you have 4 partitions, then your one instance actually
    has 4 state stores, one for each task, where each task only gets
    records from a single partition. The tasks can't see anything about
    each other, not their state nor other metadata like their current
    stream time. Otherwise, the results would depend on which tasks happen
    to be co-located with which other tasks. So, having to send your
    "purge" event to all partitions is a pain, but in the end, it buys you
    a lot, as you can add another instance to your cluster at any time,
    and Streams will scale up, and you'll know that the program is
    executing exactly the same way the whole time.
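
    If you do end up sending such a "purge" record yourself, one way to reach every
    partition (a rough sketch; the broker address, topic name, and payload are
    placeholders) is to enumerate the topic's partitions with a plain producer and
    send one copy to each:

    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.PartitionInfo;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class PurgeBroadcast {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");          // placeholder broker
            props.put("key.serializer", StringSerializer.class.getName());
            props.put("value.serializer", StringSerializer.class.getName());

            String topic = "events";                                   // placeholder topic
            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                // One copy per partition, so every task (and its per-partition
                // stream time) observes the purge event.
                for (PartitionInfo partition : producer.partitionsFor(topic)) {
                    producer.send(new ProducerRecord<>(topic, partition.partition(), "purge", "purge"));
                }
                producer.flush();
            }
        }
    }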
    
    -John
    
    On Sat, Jun 22, 2019 at 4:37 PM Parthasarathy, Mohan <mparthas@hpe.com> wrote:
    >
    > I can see the issue. But it raised other questions. Pardon my ignorance. Even though
    > partitions are processed independently, windows can be aggregating state from records
    > read from many partitions. Let us say there is a groupByKey followed by aggregate. In
    > this case, how is the state reconciled across all the application instances? Is there
    > a designated instance for a particular key?
    >
    > In my case, there was only one instance processing records from all partitions and
    > it is kind of odd that windows did not expire even though I understand why now.
    >
    > Thanks
    > Mohan
    >
    >
    > On 6/21/19, 2:25 PM, "John Roesler" <john@confluent.io> wrote:
    >
    >     No problem. It's definitely a subtlety. It occurs because each
    >     partition is processed completely independently of the others, so
    >     "stream time" is tracked per partition, and there's no way to look
    >     across at the other partitions to find out what stream time they have.
    >
    >     In general, it's not a problem because you'd expect all partitions to
    >     receive updates over time, but if you're specifically trying to send
    >     events that cause stuff to get flushed from the buffers, it can mess
    >     with you. It's especially notable in tests. So, for most tests, I just
    >     configure the topics to have one partition.
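
    For example, a test input topic could be created with a single partition (a sketch
    using the admin client; the broker address and topic name are placeholders), so that
    every record lands on the same task and one new record is enough to advance stream
    time for all the test data:

    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.NewTopic;

    public class CreateSinglePartitionTestTopic {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");  // placeholder broker
            try (AdminClient admin = AdminClient.create(props)) {
                // One partition, replication factor 1: all test records share a single
                // task, so a single late record advances stream time for every window.
                admin.createTopics(Collections.singleton(new NewTopic("test-input", 1, (short) 1)))
                     .all()
                     .get();
            }
        }
    }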
    >
    >     -John
    >
    >     On Fri, Jun 21, 2019 at 3:56 PM Parthasarathy, Mohan <mparthas@hpe.com> wrote:
    >     >
    >     > That change, "in the same partition", must explain what we are seeing. Unless
    >     > each partition sees a message, not all windows will expire. That is an
    >     > interesting twist. Thanks for the correction; I will go back and confirm this.
    >     >
    >     > -mohan
    >     >
    >     >
    >     > On 6/21/19, 12:40 PM, "John Roesler" <john@confluent.io> wrote:
    >     >
    >     >     Sure, the record cache attempts to save downstream operators from
    >     >     unnecessary updates by also buffering for a short amount of time
    >     >     before forwarding. It forwards results whenever the cache fills up or
    >     >     whenever there is a commit. If you're happy to wait at least "commit
    >     >     interval" amount of time for updates, then you don't need to do
    >     >     anything, but if you're on the edge of your seat, waiting for these
    >     >     results, you can set cache.max.bytes.buffering to 0 to disable the
    >     >     record cache entirely. Note that this would hurt throughput in
    >     >     general, though.
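
    In configuration terms, that would look roughly like the following sketch (only the
    cache setting matters here; the application id and bootstrap servers are placeholders):

    import java.util.Properties;

    import org.apache.kafka.streams.StreamsConfig;

    public class NoCacheConfig {
        public static Properties streamsProps() {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-suppress-app");    // placeholder
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder
            // Disable the record cache: every update is forwarded downstream as soon
            // as it is processed, instead of being held until the cache fills up or
            // the next commit. This generally costs throughput.
            props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0L);
            return props;
        }
    }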
    >     >
    >     >     Just a slight modification:
    >     >     * a new record with new timestamp > (all the previous timestamps +
    >     >     grace period) will cause all the old windows *in the same partition*
    >     >     to close
    >     >     * yes, expiry of the window depends only on the event time
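
    As a concrete sketch of that behavior (window size, topic names, and serdes are made
    up; this is not the exact topology from the thread): a windowed count with a zero
    grace period plus suppress(untilWindowCloses) emits one final result per window and
    key, once a later record in the same partition moves stream time past the window end.

    import java.time.Duration;

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.Topology;
    import org.apache.kafka.streams.kstream.Produced;
    import org.apache.kafka.streams.kstream.Suppressed;
    import org.apache.kafka.streams.kstream.Suppressed.BufferConfig;
    import org.apache.kafka.streams.kstream.TimeWindows;

    public class SuppressedWindowSketch {
        public static Topology build() {
            StreamsBuilder builder = new StreamsBuilder();
            builder.<String, String>stream("events")                  // placeholder input topic
                .groupByKey()
                // 5-minute windows, grace 0: a window is eligible to close as soon as
                // stream time in that partition passes the window end.
                .windowedBy(TimeWindows.of(Duration.ofMinutes(5)).grace(Duration.ZERO))
                .count()
                // Hold results until the window closes, then emit exactly one final
                // count per window and key.
                .suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded()))
                .toStream((windowedKey, count) -> windowedKey.key())  // drop the window for the output key
                .to("final-counts", Produced.with(Serdes.String(), Serdes.Long()));
            return builder.build();
        }
    }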
    >     >
    >     >     Hope this helps!
    >     >     -John
    >     >
    >     >     On Thu, Jun 20, 2019 at 11:42 AM Parthasarathy, Mohan <mparthas@hpe.com> wrote:
    >     >     >
    >     >     > Could you tell me a little more about the delays caused by the record
    >     >     > caches and how I can disable them?
    >     >     >
    >     >     >  If I could summarize my problem:
    >     >     >
    >     >     > - A new record with a new timestamp > all records sent before: I expect
    >     >     >   *all* of the old windows to close
    >     >     > - Expiry of the windows depends only on the event time and not on the key
    >     >     >
    >     >     > Are these two statements correct?
    >     >     >
    >     >     > Thanks
    >     >     > Mohan
    >     >     >
    >     >     > On 6/20/19, 9:17 AM, "John Roesler" <john@confluent.io> wrote:
    >     >     >
    >     >     >     Hi!
    >     >     >
    >     >     >     In addition to setting the grace period to zero (or some small
    >     >     >     number), you should also consider the delays introduced by record
    >     >     >     caches upstream of the suppression. If you're closely watching the
    >     >     >     timing of records going into and coming out of the topology, this
    >     >     >     might also spoil your expectations. You could always disable the
    >     >     >     record cache to make the system more predictable (although this
    >     >     >     would hurt throughput in production).
    >     >     >
    >     >     >     Thanks,
    >     >     >     -John
    >     >     >
    >     >     >     On Wed, Jun 19, 2019 at 3:01 PM Parthasarathy, Mohan <mparthas@hpe.com> wrote:
    >     >     >     >
    >     >     >     > We do explicitly set the grace period to zero. I am going to try the
    >     >     >     > new version
    >     >     >     >
    >     >     >     > -mohan
    >     >     >     >
    >     >     >     >
    >     >     >     > On 6/19/19, 12:50 PM, "Parthasarathy, Mohan" <mparthas@hpe.com> wrote:
    >     >     >     >
    >     >     >     >     Thanks. We will give it a shot.
    >     >     >     >
    >     >     >     >     On 6/19/19, 12:42 PM, "Bruno Cadonna" <bruno@confluent.io> wrote:
    >     >     >     >
    >     >     >     >         Hi Mohan,
    >     >     >     >
    >     >     >     >         I realized that my previous statement was not clear. With a
    >     >     >     >         grace period of 12 hours, suppress would wait for late events
    >     >     >     >         until stream time has advanced 12 hours before a result would
    >     >     >     >         be emitted.
    >     >     >     >
    >     >     >     >         Best,
    >     >     >     >         Bruno
    >     >     >     >
    >     >     >     >         On Wed, Jun 19, 2019 at 9:21 PM Bruno Cadonna <bruno@confluent.io> wrote:
    >     >     >     >         >
    >     >     >     >         > Hi Mohan,
    >     >     >     >         >
    >     >     >     >         > if you do not set a grace period, the grace period defaults
    >     >     >     >         > to 12 hours. Hence, suppress would wait for an event that
    >     >     >     >         > occurs 12 hours later before it outputs a result. Try to
    >     >     >     >         > explicitly set the grace period to 0 and let us know if it
    >     >     >     >         > worked.
    >     >     >     >         >
    >     >     >     >         > If it still does not work, upgrade to version 2.2.1 if it
    >     >     >     >         > is possible for you. We had a couple of bugs in suppress
    >     >     >     >         > recently that are fixed in that version.
    >     >     >     >         >
    >     >     >     >         > Best,
    >     >     >     >         > Bruno
    >     >     >     >         >
    >     >     >     >         > On Wed, Jun 19, 2019 at 8:37 PM Parthasarathy, Mohan <mparthas@hpe.com> wrote:
    >     >     >     >         > >
    >     >     >     >         > > No, I have not set any grace period. Is that mandatory?
    >     >     >     >         > > Have you seen problems with suppress and windows expiring?
    >     >     >     >         > >
    >     >     >     >         > > Thanks
    >     >     >     >         > > Mohan
    >     >     >     >         > >
    >     >     >     >         > > On 6/19/19, 12:41 AM, "Bruno Cadonna" <bruno@confluent.io> wrote:
    >     >     >     >         > >
    >     >     >     >         > >     Hi Mohan,
    >     >     >     >         > >
    >     >     >     >         > >     Did you set a grace period on the window?
    >     >     >     >         > >
    >     >     >     >         > >     Best,
    >     >     >     >         > >     Bruno
    >     >     >     >         > >
    >     >     >     >         > >     On Tue, Jun 18, 2019 at 2:04 AM Parthasarathy, Mohan <mparthas@hpe.com> wrote:
    >     >     >     >         > >     >
    >     >     >     >         > >     > On further debugging, what we are seeing is that windows
    >     >     >     >         > >     > are expiring rather randomly as new messages are being
    >     >     >     >         > >     > processed. We tested with a new key for every new message.
    >     >     >     >         > >     > We waited for the window time before replaying new messages.
    >     >     >     >         > >     > Sometimes a new message would come in and create state. It
    >     >     >     >         > >     > takes several messages to make some of the old windows close
    >     >     >     >         > >     > (go past suppress to the next stage). We have also seen cases
    >     >     >     >         > >     > where one of them never closed even though several other
    >     >     >     >         > >     > older ones expired. Then we explicitly sent a message with
    >     >     >     >         > >     > the same old key and then it showed up. Also, for every new
    >     >     >     >         > >     > message, only one of the previous windows expires even though
    >     >     >     >         > >     > there are several pending.
    >     >     >     >         > >     >
    >     >     >     >         > >     > If we don't use suppress, then there is never an issue. With
    >     >     >     >         > >     > suppress, the behavior we are seeing is weird. We are using
    >     >     >     >         > >     > version 2.1.0 in DSL mode. Any clues on what we could be
    >     >     >     >         > >     > missing? Why isn't there an order in the way windows are
    >     >     >     >         > >     > closed? As event time progresses with new messages arriving,
    >     >     >     >         > >     > the older ones should expire. Is that the right understanding
    >     >     >     >         > >     > or not?
    >     >     >     >         > >     >
    >     >     >     >         > >     > Thanks
    >     >     >     >         > >     > Mohan
    >     >     >     >         > >     >
    >     >     >     >         > >     > On 6/17/19, 3:43 PM, "Parthasarathy, Mohan" <mparthas@hpe.com> wrote:
    >     >     >     >         > >     >
    >     >     >     >         > >     >     Hi,
    >     >     >     >         > >     >
    >     >     >     >         > >     >     We are using suppress in the application. We see some
    >     >     >     >         > >     >     state being created at some point in time. Now there is
    >     >     >     >         > >     >     no new data for a day or two. We send new data, but the
    >     >     >     >         > >     >     old window of data (where we saw the state being created)
    >     >     >     >         > >     >     is not closing, i.e., we do not see it go through suppress
    >     >     >     >         > >     >     and on to the next stage. It is as though the state
    >     >     >     >         > >     >     created earlier was purged. Is this possible?
    >     >     >     >         > >     >
    >     >     >     >         > >     >     Thanks
    >     >     >     >         > >     >     Mohan
    >     >     >     >         > >     >
    >     >     >     >         > >     >
    >     >     >     >         > >     >
    >     >     >     >         > >
    >     >     >     >         > >
    >     >     >     >
    >     >     >     >
    >     >     >     >
    >     >     >     >
    >     >     >
    >     >     >
    >     >
    >     >
    >
    >
    
