kafka-users mailing list archives

From "Matthias J. Sax" <matth...@confluent.io>
Subject Re: How can I repartition/rebalance topics processed by a Kafka Streams topology?
Date Sun, 10 Dec 2017 00:52:44 GMT
About timestamps: embedding timestamps in the payload itself is not
really necessary IMHO. Each record has a metadata timestamp that
provides the exact same semantics. If you just copy data from one topic
to another, the timestamp can be preserved (using a plain
consumer/producer and setting the timestamp of the input record
explicitly as the timestamp for the output record). For Streams, it
could be that "some" timestamps get altered, as we apply slightly
different timestamp inference logic -- but there are plans to improve
this inference so that Streams preserves the timestamp exactly, too.
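A minimal sketch of such a copy loop with a plain consumer/producer, assuming String-serialized records and hypothetical topic names "input" and "output":

```java
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class TopicCopier {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "topic-copier");
        props.put("key.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("key.serializer",
            "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
            "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
             KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            consumer.subscribe(List.of("input"));
            while (true) {
                for (ConsumerRecord<String, String> rec :
                        consumer.poll(Duration.ofMillis(500))) {
                    // Pass the input record's timestamp through explicitly
                    // (topic, partition, timestamp, key, value), so the copy
                    // preserves record-time semantics.
                    producer.send(new ProducerRecord<>(
                        "output", null, rec.timestamp(), rec.key(), rec.value()));
                }
            }
        }
    }
}
```

This relies on the ProducerRecord constructor that takes an explicit timestamp; if you omit it, the producer stamps the records with the current wall-clock time instead.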

With regard to flow control: it depends on the operators you use. Some
are fully deterministic, others have some runtime dependencies. Fully
deterministic are all aggregations (non-windowed and windowed), as well
as the inner KStream-KStream join and all variants (inner/left/outer) of
the KTable-KTable join.

> If the consumer reads P2 before P1, will the task still
> properly align these two records given their timestamps for the correct
> inner join, assuming both records within the record buffer?

This will always be computed correctly, even if both records are not in
the buffer at the same time :)

Thus, only left/outer KStream-KStream and KStream-KTable join have some
runtime dependencies. For more details about join, check out this blog
post: https://www.confluent.io/blog/crossing-streams-joins-apache-kafka/

Btw: we are aware of some weaknesses in the current implementation, and
it's on our road map to strengthen our guarantees -- also with regard to
the internally used record buffer, time management in general, and
operator semantics.

Note though: Kafka guarantees offset-based ordering, not
timestamp-based ordering. Thus, Kafka Streams also processes records in
offset order. This implies that records might be out-of-order with
regard to their timestamps, but our operators are implemented to handle
this case correctly (minus some known issues as mentioned above that we
are going to fix in future releases).

Stateless: I mean, if you write a program that only uses stateless
operators like filter/map, but no aggregations or joins.
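For example, a topology in this style (topic names are illustrative) is stateless in that sense:

```java
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;

// Stateless topology sketch: only filter/mapValues, no aggregations or joins,
// so Kafka Streams creates no state stores or changelog topics, and adding
// partitions to the input topic does not corrupt any keyed state.
public class StatelessTopology {
    public static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> input = builder.stream("orders"); // hypothetical topic
        input.filter((key, value) -> value != null)      // drop empty records
             .mapValues(value -> value.toUpperCase())    // pure per-record transform
             .to("orders-normalized");                   // hypothetical output topic
        return builder.build();
    }
}
```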


On 12/9/17 11:59 AM, Dmitry Minkovsky wrote:
>> How large is the record buffer? Is it configurable?
> I seem to have just discovered the answer to this:
> buffered.records.per.partition
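That setting defaults to 1000 records per partition and can be set like any other Streams config property, e.g.:

```java
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
// buffered.records.per.partition -- maximum number of records buffered per
// partition before pausing consumption from it (default: 1000)
props.put(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, 2000);
```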
> On Sat, Dec 9, 2017 at 2:48 PM, Dmitry Minkovsky <dminkovsky@gmail.com>
> wrote:
>> Hi Matthias, yes that definitely helps. A few thoughts inline below.
>> Thank you!
>> On Fri, Dec 8, 2017 at 4:21 PM, Matthias J. Sax <matthias@confluent.io>
>> wrote:
>>> Hard to give a generic answer.
>>> 1. We recommend over-partitioning your input topics to start with (to
>>> avoid needing to add new partitions later on); problem avoidance is
>>> the best strategy. There will obviously be some overhead for this on
>>> the broker side, but it's not too big.
>> Yes, I will definitely be doing this.
>>> 2. Not sure why you would need a new cluster? You can just create a new
>>> topic in the same cluster and let Kafka Streams read from there.
>> Motivated by fear of disturbing/manipulating a production cluster and the
>> relative ease of putting up a new cluster. Perhaps that fear is irrational.
>> I could alternatively just prefix topics.
>>> 3. Depending on your state requirements, you could also run two
>>> applications in parallel -- the new one reads from the new input topic
>>> with more partitions, and you configure your producer to write to the
>>> new topic (or maybe even dual-write to both). Once your new application
>>> is ramped up, you can stop the old one.
>> Yes, this is my plan for migrations. If I could run it past you:
>> (i) Write input topics from the old prefix to the new prefix.
>> (ii) Start the new Kafka Streams application against the new prefix.
>> (iii) When the two applications are in sync, stop writing to the old
>> topics
>> Since I will be copying from an old prefix to new prefix, it seems
>> essential here to have timestamps embedded in the data records along with a
>> custom timestamp extractor.
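A custom extractor for that could look roughly like this; "MyEvent" and "getEventTimeMs()" are hypothetical stand-ins for your own record type and embedded-timestamp field:

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

// Sketch of a payload-based extractor: read the event time embedded in the
// record value, falling back to the record's metadata timestamp otherwise.
public class PayloadTimestampExtractor implements TimestampExtractor {

    // Hypothetical record type carrying an embedded event time.
    interface MyEvent {
        long getEventTimeMs();
    }

    @Override
    public long extract(ConsumerRecord<Object, Object> record, long previousTimestamp) {
        Object value = record.value();
        if (value instanceof MyEvent) {
            return ((MyEvent) value).getEventTimeMs();
        }
        return record.timestamp(); // fall back to the metadata timestamp
    }
}
```

The extractor is plugged in via the `default.timestamp.extractor` config property.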
>> I really wish I could get some more flavor on "Flow Control With
>> Timestamps
>> <https://docs.confluent.io/current/streams/architecture.html#flow-control-with-timestamps>"
>> in this regard. Assuming my timestamps are monotonically increasing within
>> each input topic, from my reading of that section it still appears that the
>> result of reprocessing input topics is non-deterministic beyond the
>> "records in its stream record buffer". Some seemingly crucial sentences:
>>> *This flow control is best-effort because it is not always possible to
>> strictly enforce execution order across streams by record timestamp; in
>> fact, in order to enforce strict execution ordering, one must either wait
>> until the system has received all the records from all streams (which may
>> be quite infeasible in practice) or inject additional information about
>> timestamp boundaries or heuristic estimates such as MillWheel’s watermarks.*
>> Practically, how am I to understand this? How large is the record buffer?
>> Is it configurable?
>> For example, suppose I am re-processing an inner join on partitions P1
>> (left) and P2 (right). In the original processing, record K1V1T1 was
>> recorded onto P1, then some time later record K1V2T2 was recorded onto
>> P2. As a result, K1V2T2 was joined with K1V1T1. Now, during
>> re-processing, P1 and P2 contain historical data and the Kafka Streams
>> consumers can read P2 before P1. If the consumer reads P2 before P1,
>> will the task still properly align these two records given their
>> timestamps for the correct inner join, assuming both records are within
>> the record buffer? I've experimented with this, but unfortunately I
>> didn't have time to really set up good experiments to satisfy myself.
>>> 4. If you really need to add new partitions, you need to fix up all
>>> topics manually -- including all topics Kafka Streams created for you.
>>> Adding partitions messes up all your state, as the key-based
>>> partitioning changes. This implies that your application must be
>>> stopped! Thus, if you have zero-downtime requirements, you can't do
>>> this at all.
>>> 5. If you have a stateless application all those issues go away though
>>> and you can even add new partitions during runtime.
>> Stateless in what sense? Kafka Streams seems to be all about aligning and
>> manipulating state to create more state. Are you referring to internal
>> state, specifically?
>>> Hope this helps.
>>> -Matthias
>>> On 12/8/17 11:02 AM, Dmitry Minkovsky wrote:
>>>> I am about to put a topology into production and I am concerned that I
>>>> don't know how to repartition/rebalance the topics in the event that I
>>>> need to add more partitions.
>>>> My inclination is that I should spin up a new cluster and run some
>>>> kind of consumer/producer combination that takes data from the
>>>> previous cluster and writes it to the new cluster. A new instance of
>>>> the Kafka Streams application then works against this new cluster. But
>>>> I'm not sure how to best execute this, or whether this approach is
>>>> sound at all. I am imagining many things may go wrong. Without going
>>>> into further speculation, what is the best way to do this?
>>>> Thank you,
>>>> Dmitry
