kafka-users mailing list archives

From "Matthias J. Sax" <matth...@confluent.io>
Subject Re: sliding ktable?
Date Thu, 03 Nov 2016 21:29:06 GMT

Hi John,

First of all, a KTable is a (changelog) stream; thus, by definition, it
is infinite.

However, I assume you are worried about the internal materialized
view of the changelog stream (ie, the table state). This view only
contains the latest value for each key, ie, a single entry per key.
Thus, its size is bounded by the number of keys and does not change
as long as the number of distinct keys does not change.
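
To illustrate the point about the view size, here is a minimal sketch in
plain Java. It is only a model, not the Kafka Streams API: the class
MaterializedViewSketch and the helpers update() and materialize() are
made up for this example, and a HashMap stands in for the state store.

```java
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MaterializedViewSketch {

    // Hypothetical helper: builds one changelog record (key, new value).
    static Map.Entry<String, Long> update(String key, Long value) {
        return new AbstractMap.SimpleEntry<>(key, value);
    }

    // Replays the changelog in order; a later record for a key
    // overwrites the earlier value, so the view keeps one entry per key.
    static Map<String, Long> materialize(List<Map.Entry<String, Long>> changelog) {
        Map<String, Long> view = new HashMap<>();
        for (Map.Entry<String, Long> rec : changelog) {
            view.put(rec.getKey(), rec.getValue());
        }
        return view;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Long>> changelog = Arrays.asList(
            update("txn-1", 1L), update("txn-2", 1L),
            update("txn-1", 2L), update("txn-1", 3L));
        Map<String, Long> view = materialize(changelog);
        // Four update records in the stream, but only two entries in the view.
        System.out.println(changelog.size() + " records, " + view.size() + " keys");
        System.out.println("txn-1 -> " + view.get("txn-1"));
    }
}
```

The changelog keeps growing with every update, while the view only grows
when a key is seen for the first time.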

> At any given time I need at least 2 weeks data in my ktable

There is no such thing as "data of the last 2 weeks":

When using a KTable in a KStream-KTable join to do lookups, each lookup
is done on the current state of the KTable and thus returns only a
single value for each key. There is no old data in the materialized
view in this regard. Of course, if a key does not get any update for
a long time, you can consider the corresponding value as old, but it
is still the latest (ie, current) value for the key.
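
A rough model of this lookup behavior, again in plain Java and not the
Kafka Streams API. The class LookupJoinSketch and the event encoding
({"table", key, value} and {"stream", key, payload}) are assumptions made
only for this sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class LookupJoinSketch {

    // Processes events in arrival order.
    // {"table", key, value}    updates the view for that key.
    // {"stream", key, payload} looks up the view as it is right now.
    static List<String> run(List<String[]> events) {
        Map<String, String> view = new HashMap<>();
        List<String> joined = new ArrayList<>();
        for (String[] e : events) {
            if ("table".equals(e[0])) {
                view.put(e[1], e[2]);
            } else {
                // A missing key yields null, as in a left join.
                joined.add(e[1] + ":" + e[2] + "+" + view.get(e[1]));
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        List<String> joined = run(Arrays.asList(
            new String[] {"table", "k", "v1"},
            new String[] {"stream", "k", "a"},
            new String[] {"table", "k", "v2"},
            new String[] {"stream", "k", "b"}));
        System.out.println(joined); // prints [k:a+v1, k:b+v2]
    }
}
```

Each stream record is joined with exactly one table value, namely the
one that is current at the time of the lookup; the overwritten value
"v1" is not available to the second lookup.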

> ktable.foreach

#foreach() is applied to the changelog stream and not to the internally
materialized view. Thus, it neither scans over the key space nor is it
applied to each key currently stored in the view. Rather, it is called
for each update record in the changelog stream.
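
The difference can be sketched as follows. This is plain Java that only
models the behavior described above; ForeachSketch, update() and
process() are hypothetical names and not part of Kafka Streams.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

public class ForeachSketch {

    // Hypothetical helper: builds one changelog record (key, new value).
    static Map.Entry<String, Long> update(String key, Long value) {
        return new AbstractMap.SimpleEntry<>(key, value);
    }

    // The action runs once per changelog record as it arrives.
    // It never iterates over the entries stored in the view.
    static Map<String, Long> process(List<Map.Entry<String, Long>> changelog,
                                     BiConsumer<String, Long> action) {
        Map<String, Long> view = new HashMap<>();
        for (Map.Entry<String, Long> rec : changelog) {
            view.put(rec.getKey(), rec.getValue());
            action.accept(rec.getKey(), rec.getValue());
        }
        return view;
    }

    public static void main(String[] args) {
        List<String> calls = new ArrayList<>();
        Map<String, Long> view = process(
            Arrays.asList(update("txn-1", 1L), update("txn-2", 1L), update("txn-1", 2L)),
            (key, value) -> calls.add(key + "=" + value));
        System.out.println(calls);       // prints [txn-1=1, txn-2=1, txn-1=2]
        System.out.println(view.size()); // prints 2
    }
}
```

The action is called three times although the view holds only two keys,
so it cannot be used as a periodic scan over the stored entries.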

> not sure keys can be removed this way

The only way to delete a key-value entry in the materialized view is
to send a so-called tombstone record with format <key:null> (ie, value
is null). By "send" I mean that this tombstone record must be in the
input of the KTable.
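
As a minimal sketch of the tombstone semantics, in plain Java: the class
TombstoneSketch and the method apply() are made up for illustration, and
the HashMap only models the materialized view.

```java
import java.util.HashMap;
import java.util.Map;

public class TombstoneSketch {

    // Applies one input record to the view.
    // A null value is a tombstone and removes the key; anything else upserts.
    static void apply(Map<String, Long> view, String key, Long value) {
        if (value == null) {
            view.remove(key);
        } else {
            view.put(key, value);
        }
    }

    public static void main(String[] args) {
        Map<String, Long> view = new HashMap<>();
        apply(view, "txn-1", 1L);
        apply(view, "txn-2", 1L);
        apply(view, "txn-1", null); // tombstone <txn-1:null>
        System.out.println(view.containsKey("txn-1")); // prints false
        System.out.println(view.size());               // prints 1
    }
}
```

In this model the view shrinks only when a record with a null value
arrives for a key; nothing is removed merely because an entry has not
been updated for a long time.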



-Matthias


On 11/3/16 12:39 PM, John Hayles wrote:
> Newbie here, I am working with Kafka Streams with java 1.8.
>
> I want to use the ktable as a lookup table in a join to a kstream.
> I had no issue implementing this.  However, I do not want the
> ktable to grow without bounds, I want to limit the ktable to the
> past 2 weeks data, more of a 'sliding' window ktable.  At any given
> time I need at least 2 weeks data in my ktable, so I don’t think
> solution like tumbling table will work since it starts over every
> time it hops.
>
> A little simplified example. . .
>
> KStream<String, GenericRecord> txnStream = builder.stream("TXN_DATA");
>
> KStream<String, GenericRecord> txnStreamFull = txnStream
>     .map((key, record) -> {
>         return new KeyValue<>(record.get("TXN").toString(), record);
>     })
>     .through("RekeyedIntermediateTopic1");
>
> // do not want this table to grow indefinitely.
> KTable<String, Long> countTableStream = txnStream
>     .map((key, record) -> {
>         return new KeyValue<>(record.get("TXN").toString(), record);
>     })
>     .through("RekeyedIntermediateTopic2")
>     .countByKey(stringSerdeKey, "DupCountKTable10");
>
> KStream<String, GenericRecord> duplicatesStream =
>     txnStreamFull.leftJoin(countTableStream,
>         (vTxnStream, vCountTableStream) -> {
>             vTxnStream.put("count",
>                 Long.toString(vCountTableStream.longValue()));
>             return vTxnStream;
>         });
>
> duplicatesStream.to("DUP_TXNS");
>
> I thought perhaps can schedule ktable.foreach to inspect and clean,
> but not sure keys can be removed this way.
>
> I may be missing a basic concept here.  I have spent some time
> searching but not finding good answer, thanks for any tips.
>
> Thanks,
>
> John
