kafka-users mailing list archives

From John Roesler <j...@confluent.io>
Subject Re: Timing state changes?
Date Thu, 06 Sep 2018 18:37:59 GMT
Hi Tim,

From your spec, I think that Kafka Streams has several ways to support
either scenario. Caveat: this is off the cuff, so I might have missed
something. For your context, I'll give you some thoughts on several ways
you could do it, with various tradeoffs.

Scenario 1 (every widget reports its current (connected/disconnected) state
every 30s):

Windowed with final results:
The easiest implementation would depend on a feature we have planned for
version 2.1:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-328%3A+Ability+to+suppress+updates+for+KTables
For this, you can do a sliding 5-minute window (advance by 30s). Since all
widgets "phone in" during the window, you can just count the number of
"connected" messages and alert at the end of the window for any widget
whose count is 0. The "default" mode for stream processing is to
continuously emit updates to the windowed aggregation, so a widget that is
disconnected at the start of the window would be reported as 0 first, then
transition to 1 when it connects. To avoid false positives, you'll want to
"suppress" the intermediate results, which is not available currently but
I'm working on it for 2.1.
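To make the window bookkeeping concrete, here is a plain-Python sketch of that count-and-alert logic (this is not the Streams API; the window size, advance, and "count connected messages, alert on 0" rule come from the paragraph above, and every name here is made up for illustration):

```python
from collections import defaultdict

WINDOW_MS = 5 * 60 * 1000   # 5-minute windows
ADVANCE_MS = 30 * 1000      # advanced by 30s

def window_starts(ts):
    """All window start times whose window [start, start + WINDOW_MS) contains ts."""
    first = (ts // ADVANCE_MS) * ADVANCE_MS - WINDOW_MS + ADVANCE_MS
    return [s for s in range(max(0, first), ts + 1, ADVANCE_MS)]

def final_counts(events):
    """events: (widget, ts, status) triples. Returns {(widget, window_start): connected_count}.
    A "disconnected" report creates the window entry but contributes 0 to the count."""
    counts = defaultdict(int)
    for widget, ts, status in events:
        for start in window_starts(ts):
            counts[(widget, start)] += 1 if status == "connected" else 0
    return counts

def widgets_to_alert(counts):
    """After a window closes ("final result"), alert on any widget whose count is 0."""
    return sorted({w for (w, start), c in counts.items() if c == 0})
```

Since every widget phones in every 30s in Scenario 1, each widget has an entry in every window it overlaps, so a count of 0 really does mean "never connected during the window".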

As a stop-gap solution until final results are available, you can still do
sliding windows, but with a longer window size, say a 6-minute window,
advanced by 30s. Instead of a count, you can do a reduce that sets the
value to the time of the last connection. Transforming this to the time
*since* the last connection, you'll have an opportunity to send an alert if
a widget was last connected more than 5 minutes ago. This won't cause false
positives, but it might result in double-reporting. This can be a little
more compact than the count-based one, since it only needs to store alert
candidates (widgets that are currently connected don't need to be stored).
So once the "final results" feature is released, you can transition this
approach to fix the double-reporting problem by using a 5-minute window and
enabling "final result" mode.
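The reduce-to-a-timestamp idea can be sketched the same way (again plain Python rather than the Streams API; the 5-minute timeout comes from the requirement, and the function names are invented):

```python
def last_connected(events):
    """The "reduce": per widget, keep the timestamp of the most recent "connected" report."""
    latest = {}
    for widget, ts, status in events:
        if status == "connected":
            latest[widget] = max(ts, latest.get(widget, ts))
    return latest

def overdue(latest, now, timeout_ms=5 * 60 * 1000):
    """Transform to time *since* last connection: alert if it exceeds the timeout."""
    return sorted(w for w, ts in latest.items() if now - ts > timeout_ms)
```

Because overlapping windows are each evaluated, the same overdue widget can show up in more than one evaluation, which is the double-reporting mentioned above.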

Note that any of the windowed solutions would generate an alert for *every*
window in which the widget hasn't been connected. Assuming some widget
stays disconnected, the sliding window would generate a fresh alert every
30s. Instead you could use tumbling windows, which would give you a fresh
alert every 5 minutes. This doesn't sound like what you want.

Scenario 2 (widgets only report when they change state):
As you noticed, a windowed computation won't work here, because you want
to alert on things that are absent from the window.

Instead, you can use a custom Processor with a Key/Value store and schedule
punctuations to send the alerts. For example, you can store the state and
the time of the transition to that state, and finally whether an alert has
been sent for that widget. You can schedule a punctuation to scan over the
whole store. For any record that's been disconnected for more than 5m and
*not* sent an alert, you send the alert and set the "sent" flag. Since you
only need to consider widgets whose last transition was a disconnect and
that have *not* had an alert sent, you can keep the store pretty compact by
dropping entries when you send the alert or when they transition from
"disconnected" to "connected". So the store doesn't need to contain any
widget whose state is currently "connected", or that is disconnected and
has already been alerted.
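Here's a plain-Python stand-in for that processor (a dict plays the role of the Key/Value store, and punctuate is the function you'd schedule as the punctuation; all names are invented for illustration). Note that if you drop an entry as soon as its alert is sent, the "sent" flag never actually needs to be stored:

```python
TIMEOUT_MS = 5 * 60 * 1000  # "disconnected for more than 5m"

def on_message(store, widget, ts, status):
    """Process one state-change message; `store` stands in for the Key/Value store."""
    if status == "connected":
        store.pop(widget, None)       # back to connected: no longer an alert candidate
    else:
        store.setdefault(widget, ts)  # remember when the widget went disconnected

def punctuate(store, now, send_alert):
    """The scheduled punctuation: scan the whole store and alert on overdue widgets."""
    for widget, ts in list(store.items()):
        if now - ts > TIMEOUT_MS:
            send_alert(widget)
            del store[widget]         # already alerted: drop it to keep the store compact
```

After the scan, the store holds only widgets that are disconnected but not yet overdue, which is exactly the compact candidate set described above.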

Just considering these two implementations, the custom processor + state
store can be much more compact, since it only needs to store alert
candidates, whereas the windowed computation needs to store the candidates
for each active window (for tumbling windows, this is still just one, but
for sliding windows, it's 5 minutes / 30 seconds = 10 windows).
Also, the custom processor lets you send the alert just once for each
widget that stays disconnected longer than 5m, so this is sounding better
to me.

I hope this helps!
-John

On Thu, Sep 6, 2018 at 11:00 AM Tim Ward <tim.ward@origamienergy.com> wrote:

> Hi, looking for some beginner's help with architectural choices.
>
> Scenario 1:
>
> A stream of messages arrives on a Kafka topic. Each has the key being a
> widget ID, and the value being an indication of whether the widget is
> connected or disconnected. For each widget a message arrives every 30
> seconds with the current connected state.
>
> Requirement: emit a message when a widget has been reported as
> disconnected for at least five minutes.
>
> Constraints: there can be large numbers of widgets, so horizontal
> scalability is required. The solution must be robust against the
> application, or instances of it, crashing, so persistence or regeneration
> of any local/internal state is required, and committing the input messages
> has to be got right so that they're reprocessed on a restart if and as
> necessary.
>
> Observation: without the constraints we'd just read each message, keep an
> in-memory list of disconnected widgets with the timestamp at which they
> went disconnected, then when another "disconnected" status comes in emit
> the output message if this widget has now been disconnected for more than
> five minutes.
>
> It looks like there's stuff in Kafka Streams that can help here, but I
> can't see quite where to start?
>
> Scenario 2:
>
> A stream of messages arrives on a Kafka topic. Each has the key being a
> widget ID, and the value being an indication of whether the widget is
> connected or disconnected. For each widget a message arrives only when the
> connected state changes.
>
> Requirement and constraints: as above
>
> Observation: without the constraints we'd just read each message, keep an
> in-memory list of widgets whose most recent message indicated a
> disconnection, and for each of these start a five minute timer, and if the
> timer went off before a "reconnected" message was received emit the output
> message.
>
> I'm less clear that there's anything out of the box in Kafka Streams that
> can do this sort of timeout of non-existent messages? - but I don't
> understand from the documentation how the various windowing features work.
>
> Note: the numbers are for the sake of illustration. I can imagine
> different solutions being appropriate depending on whether (a) the timeout
> (five minutes as quoted) is, whilst configurable, guaranteed to be the same
> for every widget, or (b) there is a requirement to configure the timeout
> separately for each widget.
>
> (Solutions involving updating a database table and then polling it on a
> cron job are really the sort of thing we're trying to get away from here.)
>
> Tim Ward
>
> The contents of this email and any attachment are confidential to the
> intended recipient(s). If you are not an intended recipient: (i) do not
> use, disclose, distribute, copy or publish this email or its contents; (ii)
> please contact the sender immediately; and (iii) delete this email. Our
> privacy policy is available here:
> https://origamienergy.com/privacy-policy/. Origami Energy Limited
> (company number 8619644); Origami Storage Limited (company number 10436515)
> and OSSPV001 Limited (company number 10933403), each registered in England
> and each with a registered office at: Ashcombe Court, Woolsack Way,
> Godalming, GU7 1LQ.
>
