kafka-users mailing list archives

From John Hayles <jhay...@etcc.com>
Subject sliding ktable?
Date Thu, 03 Nov 2016 19:39:15 GMT
Newbie here. I am working with Kafka Streams on Java 1.8.

I want to use a KTable as a lookup table in a join with a KStream, and I had no issue implementing
this. However, I do not want the KTable to grow without bounds. I want to limit it to the past
2 weeks of data, more like a 'sliding' window KTable. At any given time I need at least 2 weeks
of data in my KTable, so I don't think a solution like a tumbling window will work, since it
starts over every time it advances to the next window.
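One way to see the "starts over" problem concretely is a small plain-Java sketch of time-window assignment, modeled on how Kafka Streams' TimeWindows assigns a timestamp to windows of a given size and advance (window starts aligned to multiples of the advance). The class and method names here are hypothetical, not Kafka APIs. With a tumbling window (advance equal to size), the window containing a record may have only just started; with, say, 15-day windows advancing by 1 day, the oldest window containing the record reaches back at least 14 days once enough time has passed.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of hopping vs. tumbling window coverage (not a Kafka API).
public class WindowCoverage {
    static final long DAY_MS = 24L * 60 * 60 * 1000;

    // Start times of all windows [start, start + sizeMs) that contain timestampMs,
    // with window starts aligned to multiples of advanceMs.
    static List<Long> windowStartsFor(long timestampMs, long sizeMs, long advanceMs) {
        List<Long> starts = new ArrayList<>();
        long start = (Math.max(0, timestampMs - sizeMs + advanceMs) / advanceMs) * advanceMs;
        while (start <= timestampMs) {
            starts.add(start);
            start += advanceMs;
        }
        return starts;
    }

    // History (in ms) covered by the oldest window that contains timestampMs.
    static long oldestCoverageMs(long timestampMs, long sizeMs, long advanceMs) {
        long oldestStart = windowStartsFor(timestampMs, sizeMs, advanceMs).get(0);
        return timestampMs - oldestStart;
    }

    public static void main(String[] args) {
        long now = 100 * DAY_MS + DAY_MS / 2; // day 100.5
        // Tumbling: size == advance == 14 days.
        long tumbling = oldestCoverageMs(now, 14 * DAY_MS, 14 * DAY_MS);
        // Hopping: 15-day windows advancing by 1 day.
        long hopping = oldestCoverageMs(now, 15 * DAY_MS, DAY_MS);
        System.out.println("tumbling covers " + (double) tumbling / DAY_MS + " days");
        System.out.println("hopping covers " + (double) hopping / DAY_MS + " days");
    }
}
```

Running it prints a coverage of 2.5 days for the tumbling window and 14.5 days for the hopping one. The trade-off is that a hopping window keeps size/advance overlapping windows (15 here) per key.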

A simplified example:

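             import org.apache.avro.generic.GenericRecord;
             import org.apache.kafka.streams.KeyValue;
             import org.apache.kafka.streams.kstream.KStream;
             import org.apache.kafka.streams.kstream.KTable;

             // builder and stringSerdeKey (a Serde<String>) are defined elsewhere.
             KStream<String, GenericRecord> txnStream = builder.stream("TXN_DATA");

             // Re-key each transaction by its TXN field.
             KStream<String, GenericRecord> txnStreamFull = txnStream
                 .map((key, record) -> new KeyValue<>(record.get("TXN").toString(), record));

             // Count occurrences per TXN key. I do not want this table to grow indefinitely.
             KTable<String, Long> countTableStream = txnStream
                 .map((key, record) -> new KeyValue<>(record.get("TXN").toString(), record))
                 .countByKey(stringSerdeKey, "DupCountKTable10");

             // Add the current count for each transaction's TXN key to the record.
             KStream<String, GenericRecord> duplicatesStream =
                 txnStreamFull.leftJoin(countTableStream, (vTxnStream, vCountTableStream) -> {
                     // In a left join the table value is null when the key is not in the
                     // table yet; treat that as a count of 0 instead of throwing an NPE.
                     long count = (vCountTableStream == null) ? 0L : vCountTableStream;
                     vTxnStream.put("count", Long.toString(count));
                     return vTxnStream;
                 });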


I thought perhaps I could schedule a ktable.foreach to inspect and clean the table, but I'm not
sure keys can be removed this way.
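For context on the foreach idea: KTable#foreach only runs a side effect for each update and does not modify the table. A key disappears from a KTable when a record with a null value (a tombstone) arrives for it, so a periodic cleanup has to produce such deletions, for example from a low-level Processor that scans a state store on a schedule. The plain-Java sketch below illustrates only that idea; the class ExpiringLookupTable and its methods are hypothetical, not Kafka Streams APIs.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory analogue of a lookup table with scheduled expiry (not a Kafka API).
public class ExpiringLookupTable<K, V> {
    // A value plus the timestamp of the last update for its key.
    private static final class Timestamped<T> {
        final T value;
        final long updatedMs;
        Timestamped(T value, long updatedMs) { this.value = value; this.updatedMs = updatedMs; }
    }

    private final Map<K, Timestamped<V>> entries = new HashMap<>();
    private final long retentionMs;

    public ExpiringLookupTable(long retentionMs) { this.retentionMs = retentionMs; }

    // Changelog-style update: a null value acts as a tombstone and deletes the key.
    public void apply(K key, V value, long timestampMs) {
        if (value == null) {
            entries.remove(key);
        } else {
            entries.put(key, new Timestamped<>(value, timestampMs));
        }
    }

    public V get(K key) {
        Timestamped<V> e = entries.get(key);
        return e == null ? null : e.value;
    }

    // Periodic cleanup (the role a scheduled job would play): removes keys not updated
    // within the retention period and returns them, e.g. so tombstones can be emitted.
    public List<K> purge(long nowMs) {
        List<K> expired = new ArrayList<>();
        Iterator<Map.Entry<K, Timestamped<V>>> it = entries.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<K, Timestamped<V>> e = it.next();
            if (nowMs - e.getValue().updatedMs > retentionMs) {
                expired.add(e.getKey());
                it.remove();
            }
        }
        return expired;
    }

    public int size() { return entries.size(); }

    public static void main(String[] args) {
        long day = 24L * 60 * 60 * 1000;
        ExpiringLookupTable<String, Long> counts = new ExpiringLookupTable<>(14 * day);
        counts.apply("txn-1", 1L, 0);
        counts.apply("txn-2", 3L, 10 * day);
        List<String> expired = counts.purge(15 * day); // txn-1 was last updated > 14 days ago
        System.out.println("expired=" + expired + " remaining=" + counts.size());
    }
}
```

Running main prints expired=[txn-1] remaining=1. Expiring on last update rather than first insert is one design choice; it keeps frequently seen keys alive, which may or may not match a strict 2-week window.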

I may be missing a basic concept here. I have spent some time searching but haven't found a good
answer. Thanks for any tips.
