kafka-users mailing list archives

From Guozhang Wang <wangg...@gmail.com>
Subject Re: [DISCUSS] KIP-328: Ability to suppress updates for KTables
Date Wed, 27 Jun 2018 22:10:19 GMT
Hello John, thanks for putting up the KIP. I have a meta comment:

We need to clarify the difference between late-event suppression semantics
and the window retention semantics of the aggregation that results in a
windowed KTable. More specifically, say you have a window of size 10 min and
`until` 20 min, and the resulting windowed KTable is suppressed for events
later than 5 min. My understanding is that:

a) for a specific window starting at t0 and ending at t10, if an upstream
record is received with timestamp falling into [t0, t10) but the current
stream time is already larger than t20, this record will be dropped on the
floor and not be used to update the windowed KTable. We also record it in
the "skipped-record" metric.
b) as above, if an upstream record is received with timestamp falling into
[t0, t10), and the current stream time is larger than t15 but smaller than
t20, this record will still be used to update the windowed table, but the
updated result will not be sent downstream, and it is recorded in the
"late-event-suppression" metric.
c) as a result, if we query the windowed KTable's result, we will see
updates up to stream time t20, but from its resulting changelog stream, we
will only see results up to stream time t15.
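To make (a) through (c) concrete, the following is a minimal Java model of
the three outcomes, using the numbers above (window size 10 min, `until`
20 min, suppression of events more than 5 min late). It assumes, as the
description does, that retention is measured from the window start
(t0 + 20 = t20) while lateness is measured from the window end
(t10 + 5 = t15), and that the record's timestamp already falls into
[t0, t10). The class and enum names are hypothetical; this is not Kafka
Streams code.

```java
// Hypothetical model of cases (a)-(c); not Kafka Streams code.
final class LateRecordOutcome {

    enum Outcome { APPLIED_AND_EMITTED, APPLIED_BUT_SUPPRESSED, DROPPED }

    private LateRecordOutcome() { }

    /**
     * Classifies a record whose timestamp lies in [windowStart, windowStart + windowSize).
     * All arguments use the same time unit (minutes in the example above).
     */
    static Outcome classify(long windowStart, long windowSize, long until,
                            long suppressAfter, long streamTime) {
        if (streamTime > windowStart + until) {
            // (a) window retention has elapsed: dropped, counted as "skipped-record"
            return Outcome.DROPPED;
        }
        if (streamTime > windowStart + windowSize + suppressAfter) {
            // (b) table is updated, but the update is not forwarded ("late-event-suppression")
            return Outcome.APPLIED_BUT_SUPPRESSED;
        }
        // not too late: table updated and the change forwarded downstream
        return Outcome.APPLIED_AND_EMITTED;
    }
}
```

For a record in [t0, t10), `classify(0, 10, 20, 5, 17)` falls into case (b):
the table is updated but nothing is forwarded, which is exactly the gap
between the queryable table and its changelog described in (c).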

If that is correct, I feel it is really complicated for users to
comprehend: why would I want my windowed aggregations to accept late
records up to 20 minutes while not seeing their resulting updates in the
changelog stream? And the semantic difference between "skipped record due
to window retention" and "late-event-suppression" is quite obscure (btw, I
am not sure it is true that "Skipped records are records that are for one
reason or another invalid.", since "skipped record due to window retention
time" is not really due to an invalid record, but to some window store
implementation details, right?)

Thinking about this further, although I understand the intention to propose
a unified API for all three motivating requests, I feel the "Final value
of a window" request may be better handled by a more restricted interface.


So just throwing out a bold / controversial idea for this proposal: instead
of using a unified suppress() for all three motivating scenarios, we have:

1) KTable#suppress() for "Request shaping" and "Easier config". It will
only be used for intermediate-event suppression, and in this case, for both
windowed and non-windowed KTables, the suppression semantics can depend on
each key's record timestamp plus the byte buffer size limit / buffer
strategy.
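A minimal sketch of what option 1) could look like for a single task,
assuming a per-key buffer that keeps only the latest value for each key and
forwards it once stream time (the maximum record timestamp seen so far) is at
least `emitAfter` past the timestamp at which the key entered the buffer, or
earlier when a byte limit is exceeded (evicting the oldest keys first).
`IntermediateSuppressionBuffer`, `emitAfter`, and `maxBytes` are hypothetical
names, and sizes are approximated by character counts rather than serialized
bytes.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of per-key intermediate-event suppression; not Kafka Streams code.
final class IntermediateSuppressionBuffer {

    private static final class Entry {
        final long firstTimestamp; // record timestamp when the key entered the buffer
        String latestValue;        // later updates overwrite earlier ones

        Entry(long firstTimestamp, String latestValue) {
            this.firstTimestamp = firstTimestamp;
            this.latestValue = latestValue;
        }
    }

    private final long emitAfter;
    private final long maxBytes;
    // Iteration order is the order in which keys entered the buffer (oldest first).
    private final LinkedHashMap<String, Entry> buffer = new LinkedHashMap<>();
    private long bufferedBytes = 0;
    private long streamTime = Long.MIN_VALUE;

    IntermediateSuppressionBuffer(long emitAfter, long maxBytes) {
        this.emitAfter = emitAfter;
        this.maxBytes = maxBytes;
    }

    /** Buffers an update and returns the "key=value" updates forwarded downstream. */
    List<String> put(String key, String value, long timestamp) {
        streamTime = Math.max(streamTime, timestamp);
        Entry existing = buffer.get(key);
        if (existing == null) {
            buffer.put(key, new Entry(timestamp, value));
            bufferedBytes += sizeOf(key, value);
        } else {
            bufferedBytes += value.length() - existing.latestValue.length();
            existing.latestValue = value; // the intermediate result is suppressed
        }
        List<String> forwarded = new ArrayList<>();
        Iterator<Map.Entry<String, Entry>> it = buffer.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Entry> e = it.next();
            boolean timeUp = e.getValue().firstTimestamp + emitAfter <= streamTime;
            boolean overLimit = bufferedBytes > maxBytes; // strategy: evict oldest first
            if (timeUp || overLimit) {
                forwarded.add(e.getKey() + "=" + e.getValue().latestValue);
                bufferedBytes -= sizeOf(e.getKey(), e.getValue().latestValue);
                it.remove();
            }
        }
        return forwarded;
    }

    // Approximates serialized size by character count.
    private static long sizeOf(String key, String value) {
        return key.length() + value.length();
    }
}
```

In this sketch, two updates to key "a" at timestamps 0 and 5 reach
downstream as a single "a=2" once stream time reaches 10, which is the
"Request shaping" behavior without any notion of window closing.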

2) In TimeWindowedKStream / SessionWindowedKStream#aggregate(), which result
in a windowed KTable, we add another config object (note that although
KGroupedTable#aggregate can also result in a windowed KTable, its window
semantics are not very well defined, so I'd suggest we defer that discussion
until later), e.g.:

TimeWindowedKStream#aggregate(final Initializer<VR> initializer,
                              final Aggregator<? super K, ? super V, VR> aggregator,
                              final Materialized<K, VR, WindowStore<Bytes, byte[]>> materialized,
                              final Emitted emitted);

public class Emitted {

    static Emitted onlyOnceAfterWindowClosed(final long latePeriodAllowed);

    // this may still be subject to caching effects, so not exactly every update
    static Emitted wheneverWindowUpdated();

}
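As a sketch of how small this config object could be, here is a runnable
version of `Emitted` as an immutable value with the two factory methods. The
`Mode` enum, the accessors, and the non-negative check on `latePeriodAllowed`
are assumptions added for illustration; they are not part of the proposal
above.

```java
import java.util.OptionalLong;

// Hypothetical, runnable version of the proposed Emitted config; not an existing Kafka Streams class.
final class Emitted {

    enum Mode { ONLY_ONCE_AFTER_WINDOW_CLOSED, WHENEVER_WINDOW_UPDATED }

    private final Mode mode;
    private final long latePeriodAllowed; // only meaningful for ONLY_ONCE_AFTER_WINDOW_CLOSED

    private Emitted(Mode mode, long latePeriodAllowed) {
        this.mode = mode;
        this.latePeriodAllowed = latePeriodAllowed;
    }

    static Emitted onlyOnceAfterWindowClosed(final long latePeriodAllowed) {
        if (latePeriodAllowed < 0) {
            throw new IllegalArgumentException("latePeriodAllowed must be >= 0");
        }
        return new Emitted(Mode.ONLY_ONCE_AFTER_WINDOW_CLOSED, latePeriodAllowed);
    }

    // May still be subject to caching effects, so not exactly every update.
    static Emitted wheneverWindowUpdated() {
        return new Emitted(Mode.WHENEVER_WINDOW_UPDATED, 0L);
    }

    Mode mode() {
        return mode;
    }

    // Empty for WHENEVER_WINDOW_UPDATED, which has no notion of a late period.
    OptionalLong latePeriodAllowed() {
        return mode == Mode.ONLY_ONCE_AFTER_WINDOW_CLOSED
                ? OptionalLong.of(latePeriodAllowed)
                : OptionalLong.empty();
    }
}
```

Keeping only two factories, with no free-form combination of settings, is
what makes this option deliberately less expressive than `Suppressed`.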

------------

The Emitted config option is intentionally much less expressive than
`Suppressed`, so that it only covers the "Final value of a window" case.
Note that the resulting windowed KTable can still be suppressed
programmatically, but if it has already been emitted only once, the
suppress function will have no effect.

In this case, the difference between "late-period-allowed" and
"Windows.until()" is that the former determines whether a record will be
applied to update the window. It is enforced in the
WindowedStreamAggregateProcessor, and whenever an event gets dropped
because of it, we record it in a new metric, say "too-late-records" (the
same as "late-event-suppression" actually, just under a different name).
The latter only controls how long, at minimum, each window will be retained
for queries, and should normally be larger than (window size +
late-period-allowed). From the implementation's point of view, if the
retention time of a window is less than (window size + late-period-allowed),
the Processor may not be able to find any matching window when first trying
to get it from the store, and it then needs to tell whether that is because
the key has never been updated for this window or because the window
retention has elapsed; hence it needs to be aware of the window retention
time. In the latter case, it will drop the record on the floor and also
record it in the "too-late-records" metric. Also, this emit policy would not
need any buffering, since the original store's cache already contains the
record context needed for flushing downstream.
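The processor logic described above could look roughly like this for a
tumbling-window count. This is a hypothetical model rather than the actual
WindowedStreamAggregateProcessor: it assumes a window closes once stream time
reaches (window end + late-period-allowed), that a record for a closed window
or for a window past its retention is counted as too late, and that each
window's result is forwarded exactly once, when it closes. No separate
suppression buffer is needed because the open windows themselves hold the
pending results.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of an "emit only once after window closed" tumbling-window count.
// Not the actual WindowedStreamAggregateProcessor; all time values share one unit.
final class EmitOnceWindowCounter {
    private final long windowSize;
    private final long latePeriodAllowed;
    private final long retention;
    private final TreeMap<Long, Long> openWindows = new TreeMap<>(); // windowStart -> count
    private final List<String> emitted = new ArrayList<>();
    private long streamTime = Long.MIN_VALUE;
    private long tooLateRecords = 0;

    EmitOnceWindowCounter(long windowSize, long latePeriodAllowed, long retention) {
        this.windowSize = windowSize;
        this.latePeriodAllowed = latePeriodAllowed;
        this.retention = retention;
    }

    void process(long timestamp) {
        streamTime = Math.max(streamTime, timestamp);
        long windowStart = timestamp - Math.floorMod(timestamp, windowSize);
        long windowEnd = windowStart + windowSize;
        boolean closed = windowEnd + latePeriodAllowed <= streamTime;
        // If retention < windowSize + latePeriodAllowed, the window may already be gone
        // from the store, so the processor must also check retention.
        boolean expired = windowStart + retention <= streamTime;
        if (closed || expired) {
            tooLateRecords++; // would feed a "too-late-records" metric
        } else {
            openWindows.merge(windowStart, 1L, Long::sum);
        }
        emitClosedWindows();
    }

    // Forwards each window exactly once, as soon as stream time reaches end + late period.
    private void emitClosedWindows() {
        while (!openWindows.isEmpty()
                && openWindows.firstKey() + windowSize + latePeriodAllowed <= streamTime) {
            Map.Entry<Long, Long> window = openWindows.pollFirstEntry();
            long start = window.getKey();
            emitted.add("[" + start + "," + (start + windowSize) + ")=" + window.getValue());
        }
    }

    List<String> emitted() {
        return Collections.unmodifiableList(emitted);
    }

    long tooLateRecords() {
        return tooLateRecords;
    }
}
```

For example, with a window size of 10, a late period of 5, and a retention
of 30, processing timestamps 1, 3, 12, 9, 16, 2 in that order emits
`[0,10)=3` once (when stream time reaches 16) and counts the final record at
timestamp 2 as too late.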

My primary motivation is that, from the user's perspective, this may be
easier to comprehend and to reason about from the metrics. But if people
think it actually does not make things better, I'm happy to rethink the
current proposal.




Guozhang


On Wed, Jun 27, 2018 at 12:04 PM, John Roesler <john@confluent.io> wrote:

> Thanks for the feedback, Matthias,
>
> It seems like in straightforward relational processing cases, it would not
> make sense to bound the lateness of KTables. In general, it seems better to
> have "guard rails" in place that make it easier to write sensible programs
> than insensible ones.
>
> But I'm still going to argue in favor of keeping it for all KTables ;)
>
> 1. I believe it is simpler to understand the operator if it has one uniform
> definition, regardless of context. It's well defined and intuitive what
> will happen when you use late-event suppression on a KTable, so I think
> nothing surprising or dangerous will happen in that case. From my
> perspective, having two sets of allowed operations is actually an increase
> in cognitive complexity.
>
> 2. To me, it's not crazy to use the operator this way. For example, in lieu
> of full-featured timestamp semantics, I can implement MVCC behavior when
> building a KTable by using "suppressLateEvents(Duration.ZERO)". I suspect
> that there are other, non-obvious applications of suppressing late events
> on KTables.
>
> 3. Not to get too much into implementation details in a KIP discussion, but
> if we did want to make late-event suppression available only on windowed
> KTables, we have two enforcement options:
>   a. check when we build the topology - this would be simple to implement,
> but would be a runtime check. Hopefully, people write tests for their
> topology before deploying them, so the feedback loop isn't instantaneous,
> but it's not too long either.
>   b. add a new WindowedKTable type - this would be a compile time check,
> but would also be a substantial increase in both interface and code
> complexity.
>
> We should definitely strive to have guard rails protecting against
> surprising or dangerous behavior. Protecting against programs that we don't
> currently predict is a lesser benefit, and I think we can put up guard
> rails on a case-by-case basis for that. The increase in cognitive (and
> potentially code and interface) complexity makes me think we should skip
> this case.
>
> What do you think?
>
> Thanks,
> -John
>
> On Wed, Jun 27, 2018 at 11:59 AM Matthias J. Sax <matthias@confluent.io> wrote:
>
> > Thanks for the KIP John.
> >
> > One initial comment about the last example "Bounded lateness": For a
> > non-windowed KTable bounding the lateness does not really make sense,
> > does it?
> >
> > Thus, I am wondering if we should allow `suppressLateEvents()` for this
> > case? It seems to be better to only allow it for windowed-KTables.
> >
> >
> > -Matthias
> >
> >
> > On 6/27/18 8:53 AM, Ted Yu wrote:
> > > I noticed this (lack of primary parameter) as well.
> > >
> > > What you gave as the new example is semantically the same as what I
> > > suggested.
> > > So it is good by me.
> > >
> > > Thanks
> > >
> > > On Wed, Jun 27, 2018 at 7:31 AM, John Roesler <john@confluent.io> wrote:
> > >
> > >> Thanks for taking a look, Ted,
> > >>
> > >> I agree this is a departure from the conventions of the Streams DSL.
> > >>
> > >> Most of our config objects have one or two "required" parameters, which
> > >> fit naturally with the static factory method approach. TimeWindows, for
> > >> example, requires a size parameter, so we can naturally say
> > >> TimeWindows.of(size).
> > >>
> > >> I think in the case of a suppression, there's really no "core"
> > >> parameter, and "Suppression.of()" seems sillier than "new Suppression()".
> > >> I think that Suppression.of(duration) would be ambiguous, since there
> > >> are many durations that we can configure.
> > >>
> > >> However, thinking about it again, I suppose that I can give each
> > >> configuration method a static version, which would let you replace "new
> > >> Suppression()." with "Suppression." in all the examples. Basically,
> > >> instead of "of()", we'd support any of the methods I listed.
> > >>
> > >> For example:
> > >>
> > >> windowCounts
> > >>     .suppress(
> > >>         Suppression
> > >>             .suppressLateEvents(Duration.ofMinutes(10))
> > >>             .suppressIntermediateEvents(
> > >>                 IntermediateSuppression.emitAfter(Duration.ofMinutes(10))
> > >>             )
> > >>     );
> > >>
> > >>
> > >> Does that seem better?
> > >>
> > >> Thanks,
> > >> -John
> > >>
> > >>
> > >> On Wed, Jun 27, 2018 at 12:44 AM Ted Yu <yuzhihong@gmail.com> wrote:
> > >>
> > >>> I started to read this KIP, which contains a lot of material.
> > >>>
> > >>> One suggestion:
> > >>>
> > >>>     .suppress(
> > >>>         new Suppression()
> > >>>
> > >>>
> > >>> Do you think it would be more consistent with the rest of the Streams
> > >>> data structures to support `of`?
> > >>>
> > >>> Suppression.of(Duration.ofMinutes(10))
> > >>>
> > >>>
> > >>> Cheers
> > >>>
> > >>>
> > >>>
> > >>> On Tue, Jun 26, 2018 at 1:11 PM, John Roesler <john@confluent.io> wrote:
> > >>>
> > >>>> Hello devs and users,
> > >>>>
> > >>>> Please take some time to consider this proposal for Kafka Streams:
> > >>>>
> > >>>> KIP-328: Ability to suppress updates for KTables
> > >>>>
> > >>>> link: https://cwiki.apache.org/confluence/x/sQU0BQ
> > >>>>
> > >>>> The basic idea is to provide:
> > >>>> * more usable control over update rate (vs the current state store
> > >>>> caches)
> > >>>> * the final-result-for-windowed-computations feature which several
> > >>>> people have requested
> > >>>>
> > >>>> I look forward to your feedback!
> > >>>>
> > >>>> Thanks,
> > >>>> -John
> > >>>>
> > >>>
> > >>
> > >
> >
> >
>
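On the static-factory question in the quoted exchange (Ted's `of`
suggestion and John's chained `Suppression.suppressLateEvents(...)
.suppressIntermediateEvents(...)` example), one Java constraint is worth
noting: a class cannot declare a static and an instance method with the same
signature, and a static method invoked on an instance expression compiles
but discards that instance. So giving "each configuration method a static
version" probably needs distinct names for the chained instance methods, a
pattern Kafka Streams' `Suppressed.BufferConfig` later used (`maxRecords` /
`withMaxRecords`). The sketch below is hypothetical: the `with...` method
names are invented for illustration, and it takes a plain `Duration` instead
of the `IntermediateSuppression` object from the example.

```java
import java.time.Duration;
import java.util.Optional;

// Hypothetical immutable builder for the proposed Suppression config; not a Kafka Streams class.
final class Suppression {
    private final Duration lateEventBound;        // null means "not configured"
    private final Duration intermediateEmitAfter; // null means "not configured"

    private Suppression(Duration lateEventBound, Duration intermediateEmitAfter) {
        this.lateEventBound = lateEventBound;
        this.intermediateEmitAfter = intermediateEmitAfter;
    }

    // Static entry points: either setting can start the chain.
    static Suppression suppressLateEvents(Duration bound) {
        return new Suppression(bound, null);
    }

    static Suppression suppressIntermediateEvents(Duration emitAfter) {
        return new Suppression(null, emitAfter);
    }

    // Instance methods continue the chain. They need different names because Java
    // rejects a static and an instance method with the same signature in one class.
    Suppression withLateEventBound(Duration bound) {
        return new Suppression(bound, intermediateEmitAfter);
    }

    Suppression withIntermediateEmitAfter(Duration emitAfter) {
        return new Suppression(lateEventBound, emitAfter);
    }

    Optional<Duration> lateEventBound() {
        return Optional.ofNullable(lateEventBound);
    }

    Optional<Duration> intermediateEmitAfter() {
        return Optional.ofNullable(intermediateEmitAfter);
    }
}
```

With this shape, the example would read
`Suppression.suppressLateEvents(Duration.ofMinutes(10)).withIntermediateEmitAfter(Duration.ofMinutes(10))`,
and each call returns a new immutable object, so a partially built config can
be shared safely.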



-- 
-- Guozhang
