kafka-users mailing list archives

From "Matthias J. Sax" <matth...@confluent.io>
Subject Re: sliding ktable?
Date Tue, 08 Nov 2016 20:04:21 GMT

Yes and no.

Kafka allows you to set a retention time for compacted topics, too.
Thus, if a key does not get an update for this retention time, it will
be deleted, too.

See here for details:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-71%3A+Enable+log+compaction+and+deletion+to+co-exist
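
As a rough sketch of what KIP-71 enables (not taken from this thread):
the topic-level settings involved are cleanup.policy and retention.ms.
The class and method names below are made up for illustration, and the
resulting map would still have to be handed to whatever tool or API
you use to create or alter the topic. Retention is applied per log
segment, so deletion is approximate rather than exact to the
millisecond.

```java
import java.time.Duration;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: builds topic-level settings for a topic that is
// both compacted and subject to time-based deletion.
class CompactAndDeleteConfig {

    static Map<String, String> topicConfig(Duration retention) {
        Map<String, String> config = new LinkedHashMap<>();
        // Keep the latest value per key AND drop data older than retention.ms.
        config.put("cleanup.policy", "compact,delete");
        config.put("retention.ms", Long.toString(retention.toMillis()));
        return config;
    }

    public static void main(String[] args) {
        // Two weeks, matching the business rule discussed in this thread.
        topicConfig(Duration.ofDays(14))
            .forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```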

-Matthias


On 11/7/16 11:44 PM, R Krishna wrote:
> There is a problem with tombstoning old entries based on a new
> entry: keys that receive no new entries will remain there
> forever.
> 
> On Mon, Nov 7, 2016 at 9:38 AM, Matthias J. Sax
> <matthias@confluent.io> wrote:
> 
> John,
> 
> your thinking is on the right track!
> 
> About the infinitely growing KTable: It seems you are extending each
> lane with a list of all txnIds, so your view needs infinite memory
> as you extend your values. A quick fix might be to delete older
> txnIds from this list each time you update it (as you mentioned,
> you only need data for the last two weeks; you might need to add
> a timestamp to each txnId in the list to do the pruning each time
> you append to or look up the list).
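
A minimal sketch of the pruning step described above, in plain Java
and without any Kafka Streams API. The Txn type, the appendAndPrune
name, and the choice to measure age against the incoming record's
timestamp are all assumptions for illustration; in a real topology
this logic would live inside the aggregation function.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

class TxnListPruning {

    // Hypothetical value type: one entry of the per-lane list.
    static final class Txn {
        final String txnId;
        final Instant txnTime;

        Txn(String txnId, Instant txnTime) {
            this.txnId = txnId;
            this.txnTime = txnTime;
        }
    }

    // Returns a new list holding the incoming entry plus every existing
    // entry that is not older than maxAge relative to the incoming one.
    static List<Txn> appendAndPrune(List<Txn> current, Txn incoming, Duration maxAge) {
        Instant cutoff = incoming.txnTime.minus(maxAge);
        List<Txn> updated = new ArrayList<>();
        for (Txn existing : current) {
            if (!existing.txnTime.isBefore(cutoff)) {
                updated.add(existing);
            }
        }
        updated.add(incoming);
        return updated;
    }

    public static void main(String[] args) {
        Duration twoWeeks = Duration.ofDays(14);
        List<Txn> lane = new ArrayList<>();
        lane = appendAndPrune(lane, new Txn("03", Instant.parse("2016-11-07T00:00:00Z")), twoWeeks);
        lane = appendAndPrune(lane, new Txn("09", Instant.parse("2016-11-07T00:00:00Z")), twoWeeks);
        // Made-up later record, more than two weeks after the first two.
        lane = appendAndPrune(lane, new Txn("42", Instant.parse("2016-11-30T00:00:00Z")), twoWeeks);
        System.out.println(lane.size());
    }
}
```

Note that a lane which never receives another record is never pruned
by this approach; the list only shrinks when it is touched.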
> 
>>>> Ideally if the topic is set to two weeks retention, then once
>>>> an item is 'popped off' I would like to do an aggregate
>>>> subtraction for its value.  But I don't think this is how
>>>> kafka works.  Is this possible?  Any other
>>>> feedback/suggestion?    Perhaps a better approach?
> 
> There is no Kafka support for this. You would need to go with the
> suggestion described above. The only "delete" mechanism Kafka offers
> is for compacted topics via a tombstone message (ie, a message with
> <key:null> format; value == null). However, a tombstone deletes
> the whole record with this key, thus I doubt it is useful for
> your case.
> 
> However, reading through your email, I am wondering why you need
> all the old txnIds. You mentioned that you want to get the
> previous txnId for each duplicate (and your example results confirm
> this). Thus, it would be sufficient to only store the latest txnId
> for each "lane" IMHO. Furthermore, for this deduplication it seems
> sufficient to only use a KTable without a join.
> 
> The idea would be as follows: You consume your stream as a
> changelog (ie, KTable). For each record, you check if there is an
> entry in the view. If not, just put the record itself as the result
> because there is no duplicate. If you do find an entry, the
> current record is a duplicate of the record found. The record
> found contains its txnId, so you can use this as the "previous
> txnId". As the result, you store the current record. Your data
> format would be like <lane:(txnId,txnDate)> (for input) and
> <lane:(txnId,txnDate,previousTxnId)> (for output).
> 
> Your stream and view would be like:
> 
> {'c',('03','11/07/2016')} plus state: EMPTY
> 
> => {'c',('03','11/07/2016','')}    // this is output and state
> update at the same time
> 
> 
> 
> {'c',('09','11/07/2016')} plus state:
> {'c',('03','11/07/2016','')}
> 
> => {'c',('09','11/07/2016','03')}     // this is output and state 
> update at the same time
> 
> 
> 
> {'c',('11','11/08/2016')} plus state:
> {'c',('09','11/07/2016','03')}
> 
> => {'c',('11','11/08/2016','09')}     // this is output and state 
> update at the same time
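
The walk-through above can be sketched in plain Java as follows. This
is only an illustration under assumptions: a HashMap stands in for the
KTable's materialized view, the class and method names are invented,
and the txnDate field is left out for brevity. The empty string marks
"no previous txnId", following the example.

```java
import java.util.HashMap;
import java.util.Map;

class LaneDeduplicator {

    // Stand-in for the view: lane -> txnId of the latest record seen.
    private final Map<String, String> latestTxnByLane = new HashMap<>();

    // Stores the current record as the new state for its lane and
    // returns the txnId it replaced, or "" if the lane was not seen before.
    String process(String lane, String txnId) {
        String previous = latestTxnByLane.put(lane, txnId);
        return previous == null ? "" : previous;
    }

    public static void main(String[] args) {
        LaneDeduplicator dedup = new LaneDeduplicator();
        String[][] input = { {"c", "03"}, {"xxwq", "04"}, {"c", "09"}, {"c", "11"} };
        for (String[] record : input) {
            String previousTxnId = dedup.process(record[0], record[1]);
            System.out.println(record[0] + " " + record[1] + " previous='" + previousTxnId + "'");
        }
    }
}
```

Because only one txnId is kept per lane, the state grows with the
number of distinct lanes and not with the number of transactions.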
> 
> 
> -Matthias
> 
> On 11/7/16 8:22 AM, John Hayles wrote:
>>>> Thanks for the reply.  I really appreciate the insight.
>>>> Again, newbie here.  I want to expand on what I am struggling
>>>> with.  It may be that I just need to get my mind thinking
>>>> more in a streaming mode.  Please let me know your thoughts.
>>>> I am just having a problem ‘getting it’ on my own.
>>>> 
>>>> 
>>>> 
>>>> Below is a simple topic. I want to identify where the 'lane'
>>>> duplicates and, when it does, get the 'txnId' of the
>>>> duplicate record.  The txnId is distinct and will never be
>>>> duplicated.  The lane will seldom have a duplicate.
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> Topic payload {txnId,lane,txnDate}  Notice lane 'c' is
>>>> duplicated 3 times.
>>>> 
>>>> 
>>>> 
>>>> {'01','wfasd','11/07/2016'}
>>>> {'02','bas','11/07/2016'}
>>>> {'03','c','11/07/2016'}
>>>> {'04','xxwq','11/07/2016'}
>>>> {'05','dasf','11/07/2016'}
>>>> {'06','drdd','11/07/2016'}
>>>> {'07','tasd','11/07/2016'}
>>>> {'08','ywq','11/07/2016'}
>>>> {'09','c','11/07/2016'}
>>>> {'10','jda','11/07/2016'}
>>>> {'11','c','11/08/2016'}
>>>> {'12','ozs','11/09/2016'}
>>>> . . .
>>>> 
>>>> Note txnId and lane keep getting more distinct values.
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> My thought is to join the data to itself,  one as kstream the
>>>> other as ktable for lookups.
>>>> 
>>>> 
>>>> 
>>>> kstream as
>>>> 
>>>> 
>>>> 
>>>> {lane:(txnId,txnDate)}
>>>> 
>>>> 
>>>> 
>>>> so I visualize like ...
>>>> 
>>>> 
>>>> 
>>>> ('wfasd':('01','11/07/2016')),
>>>> ('bas'  :('02','11/07/2016')),
>>>> ('c'    :('03','11/07/2016')), ...
>>>> 
>>>> 
>>>> 
>>>> The ktable (lookup table) is an aggregate view I built to
>>>> hold historic data by lane:
>>>> 
>>>> 
>>>> 
>>>> (lane:{(txnId1,txnDate1),
>>>>        (txnId2,txnDate2),
>>>>        . . .})
>>>> 
>>>> 
>>>> 
>>>> I visualize the materialized view as below, 'c' being the
>>>> important key/value for this example.  Also note this
>>>> materialized view will keep growing without bound; there will
>>>> always be new keys and txnIds.
>>>> 
>>>> 
>>>> 
>>>> ('wfasd':{('01','11/07/2016')}),
>>>> ('bas'  :{('02','11/07/2016')}),
>>>> ('c'    :{('03','11/07/2016'),
>>>>           ('09','11/07/2016'),
>>>>           ('11','11/08/2016')})
>>>> . . .
>>>> 
>>>> 
>>>> 
>>>> Now I can join the kstream to the ktable on lane, and duplicates
>>>> are easy to identify.  I can traverse the list from the value
>>>> found in the materialized view to get the previous txnId I need.
>>>> 
>>>> 
>>>> 
>>>> So I can build resulting stream / topic like…
>>>> 
>>>> 
>>>> 
>>>> {txnId,lane,txnDate,duplicateTxnId}
>>>> 
>>>> 
>>>> 
>>>> note where c duplicates there is a duplicate txnId...
>>>> 
>>>> 
>>>> 
>>>> {'01','wfasd','11/07/2016',''}
>>>> {'02','bas','11/07/2016',''}
>>>> {'03','c','11/07/2016',''}
>>>> {'04','xxwq','11/07/2016',''}
>>>> {'05','dasf','11/07/2016',''}
>>>> {'06','drdd','11/07/2016',''}
>>>> {'07','tasd','11/07/2016',''}
>>>> {'08','ywq','11/07/2016',''}
>>>> {'09','c','11/07/2016','03'}
>>>> {'10','jda','11/07/2016',''}
>>>> {'11','c','11/08/2016','09'}
>>>> {'12','ozs','11/09/2016',''}
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> The issue is that the materialized view of the ktable keeps
>>>> growing without bound. By business rule I only need the
>>>> past 2 weeks, so I think over time there is an unnecessary
>>>> performance impact from the materialized view: one, the size
>>>> of the materialized view keeps growing, and two, the value
>>>> lists to traverse keep getting larger.
>>>> 
>>>> 
>>>> 
>>>> Ideally if the topic is set to two weeks retention, then once
>>>> an item is 'popped off' I would like to do an aggregate
>>>> subtraction for its value.  But I don't think this is how
>>>> kafka works.  Is this possible?  Any other
>>>> feedback/suggestion?    Perhaps a better approach?
>>>> 
>>>> 
>>>> 
>>>> Thanks
>>>> 
>>>> John
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> -----Original Message-----
>>>> From: Matthias J. Sax [mailto:matthias@confluent.io]
>>>> Sent: Thursday, November 03, 2016 4:29 PM
>>>> To: users@kafka.apache.org
>>>> Subject: Re: sliding ktable?
>>>> 
>>>> 
>>>> 
>>>> Hi John,
>>>> 
>>>> 
>>>> 
>>>> first of all, a KTable is a (changelog) stream; thus, by
>>>> definition it is infinite.
>>>> 
>>>> 
>>>> 
>>>> However, I assume you are worried about the internal
>>>> materialized view of the changelog stream (ie, the table
>>>> state). This view only contains the latest value for each
>>>> key, ie, a single entry for each key. Thus, its size is
>>>> bounded by the number of keys and does not change as long as
>>>> your number of distinct keys does not change.
>>>> 
>>>> 
>>>> 
>>>>> At any given time I need at least 2 weeks data in my
>>>>> ktable
>>>> 
>>>> 
>>>> 
>>>> There is no such thing as "data of the last 2 weeks":
>>>> 
>>>> 
>>>> 
>>>> Using a KTable for a KStream-KTable join to do lookups, each
>>>> lookup will be done on the current state of the KTable and
>>>> thus only return a single value for each key. There is no old
>>>> data in the materialized view in this regard. Of course, if
>>>> a key does not get any update for a long time, you can
>>>> consider the corresponding value as old, but it is still the
>>>> latest (ie, current) value for the key.
>>>> 
>>>> 
>>>> 
>>>>> ktable.foreach
>>>> 
>>>> 
>>>> 
>>>> #foreach() is applied to the changelog stream and not the
>>>> internally materialized view. Thus, it does not scan over the
>>>> key space, nor is it applied to each currently stored key in
>>>> the view. Rather, it is called for each update record that is
>>>> in the changelog stream.
>>>> 
>>>> 
>>>> 
>>>>> not sure keys can be removed this way
>>>> 
>>>> 
>>>> 
>>>> The only way to delete a key-value entry in the materialized
>>>> view is to send a so-called tombstone record with format
>>>> <key:null> (ie, value is null). By "send" I mean that this
>>>> tombstone record must be in the input of the KTable.
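
To make the view semantics above concrete, here is a small plain-Java
model. It is an illustration only: a HashMap stands in for the
materialized view, the class and method names are invented, and no
Kafka Streams API is involved. Each changelog record overwrites the
entry for its key, and a record with a null value (a tombstone)
removes the entry.

```java
import java.util.HashMap;
import java.util.Map;

class MaterializedViewSketch {

    // Stand-in for the table state: exactly one entry per key.
    private final Map<String, String> view = new HashMap<>();

    // Applies one changelog record <key:value> to the view.
    void apply(String key, String value) {
        if (value == null) {
            view.remove(key);      // tombstone: delete the entry for this key
        } else {
            view.put(key, value);  // update: keep only the latest value
        }
    }

    String get(String key) {
        return view.get(key);
    }

    int size() {
        return view.size();
    }

    public static void main(String[] args) {
        MaterializedViewSketch table = new MaterializedViewSketch();
        table.apply("c", "03");
        table.apply("c", "09");   // overwrites "03"; the view still has one entry
        System.out.println(table.size() + " " + table.get("c"));
        table.apply("c", null);   // tombstone removes the key
        System.out.println(table.size());
    }
}
```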
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> -Matthias
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> On 11/3/16 12:39 PM, John Hayles wrote:
>>>> 
>>>>> Newbie here, I am working with Kafka Streams with java
>>>>> 1.8.
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>>> I want to use the ktable as a lookup table in a join to a
>>>>> kstream.  I had no issue implementing this.  However, I do not
>>>>> want the ktable to grow without bounds; I want to limit the
>>>>> ktable to the past 2 weeks of data, more of a 'sliding' window
>>>>> ktable.  At any given time I need at least 2 weeks of data in
>>>>> my ktable, so I don’t think a solution like a tumbling table
>>>>> will work, since it starts over every time it hops.
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>>> A little simplified example. . .
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>>> KStream<String, GenericRecord> txnStream =
>>>>>     builder.stream("TXN_DATA");
>>>>>
>>>>> KStream<String, GenericRecord> txnStreamFull = txnStream
>>>>>     .map((key, record) -> {
>>>>>         return new KeyValue<>(record.get("TXN").toString(), record);
>>>>>     })
>>>>>     .through("RekeyedIntermediateTopic1");
>>>>>
>>>>> // do not want this table to grow indefinitely.
>>>>> KTable<String,Long> countTableStream = txnStream
>>>>>     .map((key, record) -> {
>>>>>         return new KeyValue<>(record.get("TXN").toString(), record);
>>>>>     })
>>>>>     .through("RekeyedIntermediateTopic2")
>>>>>     .countByKey(stringSerdeKey, "DupCountKTable10");
>>>>>
>>>>> KStream<String, GenericRecord> duplicatesStream =
>>>>>     txnStreamFull.leftJoin(countTableStream,
>>>>>         (vTxnStream, vCountTableStream) -> {
>>>>>             vTxnStream.put("count",
>>>>>                 Long.toString(vCountTableStream.longValue()));
>>>>>             return vTxnStream;
>>>>>         });
>>>>>
>>>>> duplicatesStream.to("DUP_TXNS");
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>>> I thought perhaps I could schedule ktable.foreach to inspect
>>>>> and clean, but I am not sure keys can be removed this way.
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>>> I may be missing a basic concept here.  I have spent some
>>>>> time searching but have not found a good answer; thanks for
>>>>> any tips.
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>>> Thanks,
>>>>> John
