kafka-users mailing list archives

From Guozhang Wang <wangg...@gmail.com>
Subject Re: Kafka Streams: finding a solution to a particular use case
Date Wed, 20 Apr 2016 18:43:27 GMT
Henry,

Yes, for join windows the key is actually a combination of {join window, key,
sequenceID}, and hence all records are unique; we do not need log compaction
for their changelogs.
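Here is a minimal sketch of why compaction has nothing to remove here. It is written in plain Python rather than against the Kafka Streams API. It assumes a hypothetical changelog key built as (window start, record key, sequence number) and counts how many records a keep-latest-per-key compaction pass could drop. The key layout and helper names are illustrative assumptions, not the actual store encoding.

```python
from itertools import count

def window_changelog_keys(records, window_size):
    """Assign each (key, timestamp) record a changelog key of
    (window_start, key, seq); seq is a hypothetical per-store counter."""
    seq = count()
    return [(ts - ts % window_size, key, next(seq)) for key, ts in records]

def compactable(keys):
    """How many records a keep-latest-per-key compaction could remove."""
    return len(keys) - len(set(keys))

records = [("ad_1", 5), ("ad_1", 7), ("ad_2", 12), ("ad_1", 7)]
keys = window_changelog_keys(records, window_size=10)
print(compactable(keys))                            # 0: every key is unique
print(compactable([(ws, k) for ws, k, _ in keys]))  # 2 without the sequence number
```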

Guozhang


On Tue, Apr 19, 2016 at 11:28 PM, Henry Cai <hcai@pinterest.com.invalid>
wrote:

> In my case, the key space is unbounded.  The key would be something like
> 'ad_id', which is auto-incrementing all the time.  I understand the
> benefit of using a compacted Kafka topic for the aggregation store, but I
> don't see much benefit in using compaction to replicate records in a
> JoinWindow (there are not many duplicates in that window).  Can we specify
> not to use compaction for some state store replication?
>
> The window expiration policy based purely on event time sounds risky: one
> out-of-order record will drop still-active windows.  We probably need a
> policy that depends on both stream time and event time.
>
> I can file JIRAs for these two.  For the issue of controlling compaction
> time, I am not sure how to word the details, so I will leave this up to you.
>
>
> On Tue, Apr 19, 2016 at 6:19 PM, Guozhang Wang <wangguoz@gmail.com> wrote:
>
> > Hi Henry,
> >
> > 1) Yes, if your key space is unlimited. But in practice, for KTable
> > streams where the record key (i.e. the primary key of the "table") is
> > usually a client-id, service-id, etc., the key space is usually bounded,
> > for example by the population of the globe, in which case it should
> > still be OK to host it with parallel Kafka Streams instances :)
> >
> > 2) It is currently based on the record event time. More specifically,
> > say you have a new Window instance created at T0 with a maintenance
> > interval of 10; then the first time we receive a record with timestamp
> > T10, we will drop the window. I think these semantics can be improved to
> > use "stream time", which is less vulnerable to early out-of-order
> > records.
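The difference between the two policies can be shown with a toy replay. This is plain Python, not the Kafka Streams implementation. It assumes fixed windows of length 10 and uses a stand-in for "stream time": the minimum, across input partitions, of the highest timestamp seen on each partition. The real definition in Kafka Streams may differ, and `simulate` is a hypothetical helper.

In the replay, one far-future record on partition p1 expires window 0 under the event-time policy, so a later in-order record for that window is lost. Under the stream-time policy the window stays alive.

```python
WINDOW_SIZE = 10  # assumed fixed window length

def window_of(ts):
    return ts - ts % WINDOW_SIZE

def simulate(records, maintain, policy):
    """Replay (partition, timestamp) records and return (live windows,
    timestamps of records that arrived after their window was dropped)."""
    partition_max = {}
    live, dropped, late = set(), set(), []
    for partition, ts in records:
        w = window_of(ts)
        if w in dropped:
            late.append(ts)          # window already expired: record lost
        else:
            live.add(w)
        partition_max[partition] = max(ts, partition_max.get(partition, ts))
        if policy == "event":
            now = ts                 # current record's event time
        else:
            now = min(partition_max.values())  # assumed stream-time definition
        expired = {x for x in live if now >= x + maintain}
        live -= expired
        dropped |= expired
    return live, late

# Partition p1 delivers one far-future record (50) before p0 catches up.
records = [("p0", 3), ("p1", 4), ("p1", 50), ("p0", 5)]
print(simulate(records, maintain=10, policy="event"))   # ({50}, [5])
print(simulate(records, maintain=10, policy="stream"))  # ({0, 50}, [])
```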
> >
> >
> > Do you want to create JIRAs for those issues I mentioned in the previous
> > emails to keep track?
> >
> >
> > Guozhang
> >
> >
> > On Tue, Apr 19, 2016 at 2:29 PM, Henry Cai <hcai@pinterest.com.invalid>
> > wrote:
> >
> > > I have another follow-up question on the compacted Kafka topic for
> > > RocksDB replication.
> > >
> > > 1. From the Kafka compaction implementation, it looks like all keys
> > > from the past for that topic will be preserved (the compaction/cleaner
> > > will only delete records that have a later occurrence of the same key
> > > in the log).  If that's the case, will we run out of disk space on the
> > > Kafka broker side for those compacted topics if we keep the stream
> > > application running too long?
> > >
> > > 2. For the various windows stored in RocksDB, when do we trigger the
> > > removal/expiration of those windows and keys from RocksDB?
> > >
> > >
> > > On Tue, Apr 19, 2016 at 12:27 PM, Guozhang Wang <wangguoz@gmail.com>
> > > wrote:
> > >
> > > > 1) It sounds like you should be using KTable.outerJoin(KTable) in your
> > > > case, but keep in mind that we are still working on exactly-once
> > > > semantics, and hence the results may currently be ordering-dependent.
> > > >
> > > > We do not support windowing in KTable since it is itself an
> > > > ever-updating changelog already, and hence its join result would also
> > > > be an ever-updating changelog stream, as a KTable. Reading data from a
> > > > KTable in which values with the same key may not yet have been
> > > > compacted is fine, as long as the operation itself is key-preserving:
> > > >
> > > > F( {key: a, value: 1}, {key: a, value: 2} ) => {key: b, value: 3}, {key: b, value: 4}
> > > >
> > > > Here the resulting keys and values may differ from the input, but the
> > > > same input key will generate the same output key, so they are still
> > > > changelog records for the same key. All built-in KTable operators
> > > > preserve this property. On the other hand, if:
> > > >
> > > > F( {key: a, value: 1}, {key: a, value: 2} ) => {key: b, value: 3}, {key: c, value: 4}
> > > >
> > > > Then it is not key-preserving, and you may encounter some unexpected
> > > > behavior.
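The key-preserving property can be checked mechanically. The sketch below is plain Python, not a Kafka Streams API. `is_key_preserving` and `materialize` are hypothetical helpers, and F is modeled as a per-record function. It reproduces both examples above. Materializing the non-key-preserving output leaves a stale row for key b that no later record will ever update.

```python
def is_key_preserving(f, records):
    """True if each input key is always mapped to the same output key.
    `f` models F as a per-record function (key, value) -> (key, value)."""
    out_key_for = {}
    for key, value in records:
        out_key, _ = f(key, value)
        if out_key_for.setdefault(key, out_key) != out_key:
            return False
    return True

def materialize(f, records):
    """Apply f to a changelog and keep the latest value per output key."""
    table = {}
    for key, value in records:
        out_key, out_value = f(key, value)
        table[out_key] = out_value
    return table

def preserving(key, value):
    return "b", value + 2                         # input key a -> b, always

def not_preserving(key, value):
    return ("b" if value == 1 else "c"), value + 2  # a -> b, then a -> c

records = [("a", 1), ("a", 2)]
print(is_key_preserving(preserving, records), materialize(preserving, records))
# True {'b': 4}
print(is_key_preserving(not_preserving, records), materialize(not_preserving, records))
# False {'b': 3, 'c': 4}  (stale row for b remains)
```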
> > > >
> > > >
> > > > 2) Log compaction is a Kafka broker feature that Kafka Streams
> > > > leverages:
> > > >
> > > > https://cwiki.apache.org/confluence/display/KAFKA/Log+Compaction
> > > >
> > > > It is done on disk files that are no longer active (i.e. no longer
> > > > taking appends).
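Here is a toy model of the compaction pass described here, in plain Python rather than broker code. Pass one records the latest offset per key across the inactive segments. Pass two keeps only those records and leaves the active segment untouched. Tombstone removal and the cleaner's scheduling thresholds are not modeled.

Two points raised in this thread fall out of the model:

- A superseded value can survive while its newer copy sits in the active segment.
- A log whose keys never repeat, such as an auto-incrementing id, is not shrunk at all.

To our understanding, the broker's cleaner holds its key-to-offset map in memory, sized by the broker setting `log.cleaner.dedupe.buffer.size`. Check the documentation for your Kafka version.

```python
def compact(segments):
    """Toy compaction of a log given as segments of (offset, key, value).

    The last segment is treated as active: it is neither cleaned nor used
    to build the offset map (a simplification of the real cleaner).
    """
    *inactive, active = segments
    # Pass 1: latest offset per key across the inactive segments.
    latest = {}
    for segment in inactive:
        for offset, key, _ in segment:
            latest[key] = offset
    # Pass 2: keep a record only if it is the latest one for its key.
    cleaned = [[r for r in seg if latest[r[1]] == r[0]] for seg in inactive]
    return cleaned + [active]

log = [
    [(0, "a", 1), (1, "b", 1), (2, "a", 2)],
    [(3, "b", 2), (4, "c", 1)],
    [(5, "a", 3)],  # active segment
]
print(compact(log))
# [[(2, 'a', 2)], [(3, 'b', 2), (4, 'c', 1)], [(5, 'a', 3)]]

# Keys that never repeat (e.g. an auto-incrementing ad_id): nothing is removed.
unique = [[(i, f"ad_{i}", 1) for i in range(3)], []]
print(compact(unique) == unique)  # True
```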
> > > >
> > > > We are working on exposing the configs for log compaction, such as
> > > > compaction intervals and thresholds, in Kafka Streams so that users
> > > > can control its behavior. Actually, Henry, do you mind creating a JIRA
> > > > for this purpose and listing what you would like to control in log
> > > > compaction?
> > > >
> > > >
> > > > Guozhang
> > > >
> > > >
> > > >
> > > >
> > > >
> > > > On Tue, Apr 19, 2016 at 10:02 AM, Henry Cai <hcai@pinterest.com.invalid>
> > > > wrote:
> > > >
> > > > > Related to the log compaction question ("it will be log compacted on
> > > > > the key over time"): how do we control the time for log compaction?
> > > > > In the log compaction implementation, is the storage used to map a
> > > > > new value for a given key kept in memory or on disk?
> > > > >
> > > > > On Tue, Apr 19, 2016 at 8:58 AM, Guillermo Lammers Corral <
> > > > > guillermo.lammers.corral@tecsisa.com> wrote:
> > > > >
> > > > > > Hello,
> > > > > >
> > > > > > Thanks again for your reply :)
> > > > > >
> > > > > > 1) In my example, when I send a record from the outer table and
> > > > > > there is no matching record from the inner table, I receive data
> > > > > > in the output topic, and vice versa. I am trying it with empty
> > > > > > topics at the first execution. How is this possible?
> > > > > >
> > > > > > Why do KTable joins not support windowing strategies? I think I
> > > > > > need it for this use case; what do you think?
> > > > > >
> > > > > > 2) What does that mean? Although the log may not be compacted yet,
> > > > > > there should be no problem reading from it and executing a new
> > > > > > stream process, right? (like new joins, counts...).
> > > > > >
> > > > > > Thanks!!
> > > > > >
> > > > > > 2016-04-15 17:37 GMT+02:00 Guozhang Wang <wangguoz@gmail.com>:
> > > > > >
> > > > > > > 1) There are three types of joins for KTable-KTable join; they
> > > > > > > follow the same semantics as SQL joins:
> > > > > > >
> > > > > > > KTable.join(KTable): when a new record is received from the outer
> > > > > > > table and there is no matching record in the inner table, there
> > > > > > > is no output; and vice versa.
> > > > > > > KTable.leftJoin(KTable): when a new record is received from the
> > > > > > > outer table and there is no matching record in the inner table,
> > > > > > > output (a, null); in the other direction, no output.
> > > > > > > KTable.outerJoin(KTable): when a new record is received from the
> > > > > > > outer / inner table and there is no matching record in the inner /
> > > > > > > outer table, output (a, null) or (null, b).
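The three variants can be replayed on a toy pair of tables. This is a plain-Python model of the semantics as described in this message, not the Kafka Streams implementation. It assumes the "outer" table is the left side. The exact records a given Kafka version emits may differ; for example, Guillermo reports seeing a null result from an inner join. `ktable_join` is a hypothetical helper.

```python
def ktable_join(events, how="inner"):
    """Replay (side, key, value) updates, side in {"left", "right"},
    and return the join results emitted after each update."""
    left, right, out = {}, {}, []
    for side, key, value in events:
        (left if side == "left" else right)[key] = value
        lv, rv = left.get(key), right.get(key)
        if lv is not None and rv is not None:
            out.append((key, (lv, rv)))       # both sides present
        elif how == "left" and lv is not None:
            out.append((key, (lv, None)))     # left join: (a, null)
        elif how == "outer":
            out.append((key, (lv, rv)))       # (a, null) or (null, b)
        # inner join, or left join with only the right side: no output
    return out

events = [("right", "uniqueKey1", "hello"), ("left", "uniqueKey1", "world")]
for how in ("inner", "left", "outer"):
    print(how, ktable_join(events, how))
# inner [('uniqueKey1', ('world', 'hello'))]
# left [('uniqueKey1', ('world', 'hello'))]
# outer [('uniqueKey1', (None, 'hello')), ('uniqueKey1', ('world', 'hello'))]
```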
> > > > > > >
> > > > > > >
> > > > > > > 2) The result topic is also a changelog topic; although it will be
> > > > > > > log compacted on the key over time, if you consume it immediately,
> > > > > > > the log may not be compacted yet.
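A toy model shows why a consumer can see several values for one key. It is plain Python, not the Kafka consumer or Kafka Streams API. Reading the uncompacted topic record by record returns every update, while materializing it as a table keeps only the latest value per key. Treating a null value as a delete follows Kafka's tombstone convention. The topic contents are made up for illustration.

```python
def read_as_stream(log):
    """A plain consumer sees every record still present in the log."""
    return list(log)

def read_as_table(log):
    """Materialize a changelog: latest value per key; None deletes the key."""
    table = {}
    for key, value in log:
        if value is None:
            table.pop(key, None)
        else:
            table[key] = value
    return table

# Not-yet-compacted result topic (hypothetical contents).
log = [
    ("uniqueKey1", None),
    ("uniqueKey1", ("hello", "world")),
    ("uniqueKey2", "a"),
    ("uniqueKey2", None),
]
print(len(read_as_stream(log)))  # 4
print(read_as_table(log))        # {'uniqueKey1': ('hello', 'world')}
```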
> > > > > > >
> > > > > > >
> > > > > > > Guozhang
> > > > > > >
> > > > > > > On Fri, Apr 15, 2016 at 2:11 AM, Guillermo Lammers Corral <
> > > > > > > guillermo.lammers.corral@tecsisa.com> wrote:
> > > > > > >
> > > > > > > > Hi Guozhang,
> > > > > > > >
> > > > > > > > Thank you very much for your reply, and sorry for the generic
> > > > > > > > question. I'll try to explain with some pseudocode.
> > > > > > > >
> > > > > > > > I have two KTables with a join:
> > > > > > > >
> > > > > > > > ktable1: KTable[String, String] = builder.table("topic1")
> > > > > > > > ktable2: KTable[String, String] = builder.table("topic2")
> > > > > > > >
> > > > > > > > result: KTable[String, ResultUnion] =
> > > > > > > >   ktable1.join(ktable2, (data1, data2) => new ResultUnion(data1, data2))
> > > > > > > >
> > > > > > > > I send the result to a topic: result.to("resultTopic").
> > > > > > > >
> > > > > > > > My questions are related to the following scenario:
> > > > > > > >
> > > > > > > > - The streaming application is up & running without data in the
> > > > > > > >   topics
> > > > > > > >
> > > > > > > > - I send data to "topic2", for example a key/value like
> > > > > > > >   ("uniqueKey1", "hello")
> > > > > > > >
> > > > > > > > - I see null values in topic "resultTopic", i.e.
> > > > > > > >   ("uniqueKey1", null)
> > > > > > > >
> > > > > > > > - If I send data to "topic1", for example a key/value like
> > > > > > > >   ("uniqueKey1", "world"), then I see these values in topic
> > > > > > > >   "resultTopic": ("uniqueKey1", ResultUnion("hello", "world"))
> > > > > > > >
> > > > > > > > Q: If we send data to one of the KTables that does not have
> > > > > > > > corresponding data by key in the other one, is obtaining null
> > > > > > > > values in the final result topic the expected behavior?
> > > > > > > >
> > > > > > > > My next step would be to use Kafka Connect to persist the result
> > > > > > > > data in C* (I have not read the Connector docs yet...). Is this
> > > > > > > > the way to do it? (I mean, preparing the data in the topic.)
> > > > > > > >
> > > > > > > > Q: On the other hand, just to try it, I have a KTable that reads
> > > > > > > > messages from "resultTopic" and prints them. If the stream is a
> > > > > > > > KTable, I am wondering why it is getting all the values from the
> > > > > > > > topic, even those with the same key.
> > > > > > > >
> > > > > > > > Thanks in advance! Great job answering the community!
> > > > > > > >
> > > > > > > > 2016-04-14 20:00 GMT+02:00 Guozhang Wang <wangguoz@gmail.com>:
> > > > > > > >
> > > > > > > > > Hi Guillermo,
> > > > > > > > >
> > > > > > > > > 1) Yes, in your case the streams are really "changelog"
> > > > > > > > > streams, hence you should create them as KTables and do a
> > > > > > > > > KTable-KTable join.
> > > > > > > > >
> > > > > > > > > 2) Could you elaborate on "achieving this"? What behavior do
> > > > > > > > > you require in the application logic?
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > Guozhang
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > On Thu, Apr 14, 2016 at 1:30 AM, Guillermo Lammers Corral <
> > > > > > > > > guillermo.lammers.corral@tecsisa.com> wrote:
> > > > > > > > >
> > > > > > > > > > Hi,
> > > > > > > > > >
> > > > > > > > > > I am a newbie to Kafka Streams and I am trying to use it to
> > > > > > > > > > solve a particular use case. Let me explain.
> > > > > > > > > >
> > > > > > > > > > I have two sources of data, both like this:
> > > > > > > > > >
> > > > > > > > > > Key (string)
> > > > > > > > > > DateTime (hourly granularity)
> > > > > > > > > > Value
> > > > > > > > > >
> > > > > > > > > > I need to join the two sources by key and date (hour of day)
> > > > > > > > > > to obtain:
> > > > > > > > > >
> > > > > > > > > > Key (string)
> > > > > > > > > > DateTime (hourly granularity)
> > > > > > > > > > ValueSource1
> > > > > > > > > > ValueSource2
> > > > > > > > > >
> > > > > > > > > > I think that first I'd need to push the messages into Kafka
> > > > > > > > > > topics with the date as part of the key, because I'll group by
> > > > > > > > > > key taking the date into account. So maybe the key must be a
> > > > > > > > > > new string like key_timestamp. But, of course, this is not the
> > > > > > > > > > main problem, just an additional explanation.
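The key_timestamp idea can be sketched in plain Python rather than Kafka Streams:

- Both sources are re-keyed to a hypothetical `key_YYYYMMDDHH` string (UTC hour).
- Each side keeps only its latest value per composite key, as a KTable would (the last update in arrival order wins).
- An inner join is taken on the shared keys.

The key format and helper names are assumptions for illustration.

```python
from datetime import datetime, timezone

def hourly_key(key, ts):
    """Composite join key 'key_YYYYMMDDHH' (hypothetical format, UTC hour)."""
    return f"{key}_{ts.astimezone(timezone.utc).strftime('%Y%m%d%H')}"

def latest_by_hour(records):
    """Latest value per composite key; later arrivals overwrite earlier ones."""
    table = {}
    for key, ts, value in records:
        table[hourly_key(key, ts)] = value
    return table

def join_hourly(source1, source2):
    """Inner join of the two sources on (key, hour)."""
    t1, t2 = latest_by_hour(source1), latest_by_hour(source2)
    return {k: (t1[k], t2[k]) for k in t1.keys() & t2.keys()}

utc = timezone.utc
source1 = [
    ("key1", datetime(2016, 4, 14, 10, 5, tzinfo=utc), 1.0),
    ("key1", datetime(2016, 4, 14, 10, 40, tzinfo=utc), 2.0),  # supersedes 1.0
]
source2 = [
    ("key1", datetime(2016, 4, 14, 10, 55, tzinfo=utc), "x"),
    ("key1", datetime(2016, 4, 14, 11, 0, tzinfo=utc), "y"),  # no match in source1
]
print(join_hourly(source1, source2))  # {'key1_2016041410': (2.0, 'x')}
```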
> > > > > > > > > >
> > > > > > > > > > OK, so the data are in the topics, here we go!
> > > > > > > > > >
> > > > > > > > > > - Multiple records are allowed per key, but only the latest
> > > > > > > > > >   value for a record key will be considered. I should use two
> > > > > > > > > >   KTables with some join strategy, right?
> > > > > > > > > >
> > > > > > > > > > - Data from both sources could arrive at any time. What can I
> > > > > > > > > >   do to achieve this?
> > > > > > > > > >
> > > > > > > > > > Thanks in advance.
> > > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > --
> > > > > > > > > -- Guozhang
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > --
> > > > > > > -- Guozhang
> > > > > > >
> > > > > >
> > > > >
> > > >
> > > >
> > > >
> > > > --
> > > > -- Guozhang
> > > >
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>



-- 
-- Guozhang
