kafka-users mailing list archives

From: Jay Kreps <...@confluent.io>
Subject: Re: Kafka Streams Usage Patterns
Date: Sat, 27 May 2017 20:16:13 GMT
This is great!

-Jay

On Sat, May 27, 2017 at 12:47 PM, Michal Borowiecki <michal.borowiecki@openbet.com> wrote:

> Hi all,
>
> I've updated the wiki page with a draft pattern for consecutively growing
> time-windowed aggregations which was discussed some time ago on this
> mailing list.
>
> I've yet to add the part that cleans up the stores using punctuation. Stay tuned.
>
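> A minimal sketch of what such punctuate-based cleanup might look like (this is not
> the wiki version; the store name, key/value types and constants below are placeholders),
> using the current, pre-KIP-138 Processor API:
>
> public class StoreCleanupProcessor implements Processor<Windowed<String>, Long> {
>
>   private KeyValueStore<Windowed<String>, Long> store;
>
>   @Override
>   @SuppressWarnings("unchecked")
>   public void init(ProcessorContext context) {
>     store = (KeyValueStore<Windowed<String>, Long>) context.getStateStore("agg-store");
>     context.schedule(CLEANUP_INTERVAL_MS); // punctuate() fires on stream time
>   }
>
>   @Override
>   public void process(Windowed<String> key, Long value) {
>     // nothing to do per record; cleanup happens in punctuate()
>   }
>
>   @Override
>   public void punctuate(long streamTime) {
>     List<Windowed<String>> expired = new ArrayList<>();
>     try (KeyValueIterator<Windowed<String>, Long> all = store.all()) {
>       while (all.hasNext()) {
>         Windowed<String> key = all.next().key;
>         if (key.window().end() < streamTime - RETENTION_MS) {
>           expired.add(key); // collect first, delete after closing the iterator
>         }
>       }
>     }
>     expired.forEach(store::delete);
>   }
>
>   @Override
>   public void close() {}
> }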
>
> On a somewhat similar subject, I've been working to implement the
> following requirements:
>
> * transaction sums per customer session (simple, just extract non-expired
> session-windowed aggregates from a SessionStore using interactive queries -
> see the sketch after this list)
>
> * global transaction sums for all *currently active* customer sessions
>
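> For the first bit, a minimal sketch of that interactive query might look like this
> (assuming txPerCustomerSumStore() registers a store named "tx-per-customer";
> CustomerId and TxAggregate are placeholders for the key/aggregate types, streams is
> the running KafkaStreams instance, and customerId is the key being queried):
>
> ReadOnlySessionStore<CustomerId, TxAggregate> store =
>   streams.store("tx-per-customer", QueryableStoreTypes.sessionStore());
>
> long now = System.currentTimeMillis();
> try (KeyValueIterator<Windowed<CustomerId>, TxAggregate> sessions = store.fetch(customerId)) {
>   while (sessions.hasNext()) {
>     KeyValue<Windowed<CustomerId>, TxAggregate> entry = sessions.next();
>     if (entry.key.window().end() + SESSION_TIMEOUT_MS > now) {
>       // this is a currently active (not-yet-expired) session for this customer
>       TxAggregate sessionSums = entry.value;
>     }
>   }
> }
>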
> The second bit proved non-trivial, because session-windowed KTables (or
> any windowed KTables for that matter) don't notify downstream when a window
> expires. And I can't use punctuate until KIP-138 is implemented, because
> stream-time punctuation is no good in this case (records can stop coming);
> reliable system-time punctuation would be needed.
>
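> For reference, the kind of punctuation API KIP-138 proposes would make this
> straightforward - roughly something like the below (not available yet, names may
> still change):
>
> context.schedule(CHECK_INTERVAL_MS, PunctuationType.WALL_CLOCK_TIME, timestamp -> {
>   // scan the session store and expire/emit sessions even when no records are coming in
> });
>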
> Below is how I implemented this; I've yet to test it thoroughly.
>
> I wonder if anyone knows of an easier way of achieving the same.
>
> If so, I'm looking forward to suggestions. If not, I'll add that to the
> patterns wiki page too, in case someone else finds it useful.
>
>
> builder
>   .stream(/*key serde*/, /*transaction serde*/, "transaction-topic")
>
>   .groupByKey(/*key serde*/, /*transaction serde*/)
>
>   .aggregate(
>     () -> /*empty aggregate*/,
>     aggregator(),
>     merger(),
>     SessionWindows.with(SESSION_TIMEOUT_MS).until(SESSION_TIMEOUT_MS * 2),
>     /* aggregate serde */,
>     txPerCustomerSumStore() // this store can be queried for per-customer session data
>   )
>
>   .toStream()
>
>   // tombstones only come when a session is merged into a bigger session, so ignore them
>   .filter((key, value) -> value != null)
>
>   // the map/groupByKey/reduce operations below only propagate updates to the *latest*
>   // session per customer downstream
>   .map((windowedCustomerId, agg) ->
>     // this moves the timestamp from the windowed key into the value,
>     // so that we can group by customerId only and reduce to the later value
>     new KeyValue<>(
>       windowedCustomerId.key(), // just customerId
>       new WindowedAggsImpl( // just like a Tuple2 but with nicely named accessors: timestamp() and aggs()
>         windowedCustomerId.window().end(),
>         agg
>       )
>     )
>   )
>
>   .groupByKey(/*key serde*/, /*windowed aggs serde*/) // key is just customerId
>
>   .reduce(
>     // take the later session value and ignore any older ones - downstream only cares
>     // about *current* sessions
>     (val, agg) -> val.timestamp() > agg.timestamp() ? val : agg,
>     TimeWindows.of(SESSION_TIMEOUT_MS).advanceBy(SESSION_TIMEOUT_DELAY_TOLERANCE_MS),
>     "latest-session-windowed"
>   )
>
>   .groupBy((windowedCustomerId, timeAndAggs) ->
>     // calculate totals with maximum granularity, which is per partition
>     new KeyValue<>(
>       new Windowed<>(
>         windowedCustomerId.key().hashCode() % PARTITION_COUNT_FOR_TOTALS, // KIP-159 would come in handy here, to access the partition number instead
>         windowedCustomerId.window() // used in the interactive queries to pick the oldest not-yet-expired window
>       ),
>       timeAndAggs.aggs()
>     ),
>     new SessionKeySerde<>(Serdes.Integer()),
>     /* aggregate serde */
>   )
>
>   .reduce(
>     (val, agg) -> agg.add(val),
>     (val, agg) -> agg.subtract(val),
>     txTotalsStore() // this store can be queried to get totals per partition for all active sessions
>   );
>
> // this global table puts per-partition totals on every node, so that they can easily be
> // summed for global totals, picking the oldest not-yet-expired window
> builder.globalTable(
>   new SessionKeySerde<>(Serdes.Integer()),
>   /* aggregate serde */,
>   changelogTopicForStore(TRANSACTION_TOTALS),
>   "totals");
>
> TODO: put in StreamPartitioners (with KTable.through variants added in
> KAFKA-5045) to avoid re-partitioning where I know it's unnecessary.
>
> The idea behind the % PARTITION_COUNT_FOR_TOTALS bit is that I want to
> first do summation with max parallelism and minimize the work needed
> downstream. So I calculate a per-partition sum first to limit the updates
> that the totals topic will receive and the summing work done by the
> interactive queries on the global store. Is this a good way of going about
> it?
>
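> For completeness, the interactive query over the global "totals" store might look
> roughly like this (TxAggregate is again a placeholder for the aggregate type, and
> streams is the running KafkaStreams instance):
>
> ReadOnlyKeyValueStore<Windowed<Integer>, TxAggregate> totals =
>   streams.store("totals", QueryableStoreTypes.keyValueStore());
>
> long now = System.currentTimeMillis();
> // per partition, pick the oldest window that has not yet expired
> Map<Integer, KeyValue<Windowed<Integer>, TxAggregate>> oldestActive = new HashMap<>();
> try (KeyValueIterator<Windowed<Integer>, TxAggregate> it = totals.all()) {
>   while (it.hasNext()) {
>     KeyValue<Windowed<Integer>, TxAggregate> entry = it.next();
>     if (entry.key.window().end() + SESSION_TIMEOUT_MS <= now) {
>       continue; // expired window
>     }
>     oldestActive.merge(entry.key.key(), entry,
>       (a, b) -> a.key.window().end() <= b.key.window().end() ? a : b);
>   }
> }
>
> // sum the chosen per-partition aggregates into the global total
> TxAggregate globalTotal = /* empty aggregate */;
> for (KeyValue<Windowed<Integer>, TxAggregate> entry : oldestActive.values()) {
>   globalTotal = globalTotal.add(entry.value);
> }
>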
> Thanks,
>
> Michał
>
> On 09/05/17 18:31, Matthias J. Sax wrote:
>
> Hi,
>
> I started a new Wiki page to collect some common usage patterns for
> Kafka Streams.
>
> Right now, it contains a quick example on "how to compute average". Hope
> we can collect more examples like this!
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Stream+Usage+Patterns
>
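> A minimal sketch of one way to do it (the wiki version may differ; CountAndSum is a
> hypothetical helper holding a running count and sum):
>
> KTable<String, Double> averages = builder
>   .stream(Serdes.String(), Serdes.Long(), "values-topic")
>   .groupByKey(Serdes.String(), Serdes.Long())
>   .aggregate(
>     CountAndSum::new,
>     (key, value, agg) -> agg.add(value), // bump the count and add to the sum
>     /* CountAndSum serde */,
>     "count-and-sum-store")
>   .mapValues(agg -> agg.sum() / (double) agg.count());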
>
> -Matthias
>
>
>
