kafka-users mailing list archives

From Dmitry Minkovsky <dminkov...@gmail.com>
Subject Re: How can I repartition/rebalance topics processed by a Kafka Streams topology?
Date Sat, 09 Dec 2017 19:48:14 GMT
Hi Matthias, yes that definitely helps. A few thoughts inline below.

Thank you!

On Fri, Dec 8, 2017 at 4:21 PM, Matthias J. Sax <matthias@confluent.io> wrote:

> Hard to give a generic answer.
> 1. We recommend to over-partition your input topics to start with (to
> avoid needing to add new partitions later on); problem avoidance
> is the best strategy. There will obviously be some overhead for this on
> the broker side, but it's not too big.

Yes, I will definitely be doing this.

> 2. Not sure why you would need a new cluster? You can just create a new
> topic in the same cluster and let Kafka Streams read from there.

This was motivated by fear of disturbing/manipulating a production cluster,
and by the relative ease of putting up a new one. Perhaps that fear is
irrational. I could alternatively just prefix the topic names.

> 3. Depending on your state requirements, you could also run two
> applications in parallel -- the new one reads from the new input topic
> with more partitions and you configure your producer to write to the new
> topic (or maybe even to dual writes to both). If your new application is
> ramped up, you can stop the old one.

Yes, this is my plan for migrations. If I could run it past you:

(i) Copy the input topics from the old prefix to the new prefix.
(ii) Start the new Kafka Streams application against the new prefix.
(iii) When the two applications are in sync, stop writing to the old topics.

Since I will be copying from an old prefix to new prefix, it seems
essential here to have timestamps embedded in the data records along with a
custom timestamp extractor.
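One way to preserve event time across the copy in step (i) is to prepend the original timestamp to each record value and strip it back out on the Streams side. Here is a minimal sketch of that embed/extract pair; the class name is mine, and the `TimestampExtractor` wrapper that would call `extract()` is assumed, not shown:

```java
import java.nio.ByteBuffer;

// Sketch: prepend the original event time (8 bytes, big-endian) to the
// payload when copying old-prefix -> new-prefix, and read it back out on
// the consuming side (e.g. from a custom TimestampExtractor).
public class EmbeddedTimestamp {

    // Called by the copier before producing to the new-prefix topic.
    public static byte[] embed(long timestampMs, byte[] value) {
        return ByteBuffer.allocate(8 + value.length)
                .putLong(timestampMs)
                .put(value)
                .array();
    }

    // Called when deserializing records from the new-prefix topic.
    public static long extract(byte[] payloadWithTimestamp) {
        return ByteBuffer.wrap(payloadWithTimestamp).getLong();
    }

    public static void main(String[] args) {
        byte[] wire = embed(1512762494000L, "hello".getBytes());
        System.out.println(extract(wire)); // prints 1512762494000
    }
}
```

Since Kafka 0.10 the record format also carries a native timestamp field, so an alternative is to set the timestamp explicitly on each `ProducerRecord` during the copy and rely on the default extractor, avoiding any payload change.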

I really wish I could get some more detail on "Flow Control with Timestamps"
in this regard. Assuming my timestamps are monotonically increasing within
each input topic, from my reading of that section it still appears that the
result of reprocessing input topics is non-deterministic beyond the
"records in its stream record buffer". Some seemingly crucial sentences:

> This flow control is best-effort because it is not always possible to
> strictly enforce execution order across streams by record timestamp; in
> fact, in order to enforce strict execution ordering, one must either wait
> until the system has received all the records from all streams (which may
> be quite infeasible in practice) or inject additional information about
> timestamp boundaries or heuristic estimates such as MillWheel's watermarks.

Practically, how am I to understand this? How large is the record buffer?
Is it configurable?
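For what it's worth, the per-partition record buffer does appear to be tunable via the `buffered.records.per.partition` setting in `StreamsConfig` (default 1000 records per topic-partition), e.g.:

```properties
# StreamsConfig: maximum number of records buffered per topic-partition;
# once a partition's buffer is full, the task pauses fetching from it.
# The default is 1000.
buffered.records.per.partition=10000
```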

For example, suppose I am re-processing an inner join on partitions P1
(left) and P2 (right). In the original processing, record K1V1T1 was
recorded onto P1, then some time later record K1V2T2 was recorded onto P2.
As a result, K1V2T2 was joined with K1V1T1. Now, during re-processing, P1
and P2 contain historical data and the Kafka Streams consumers can read P2
before P1. If the consumer reads P2 before P1, will the task still properly
align these two records by their timestamps for the correct inner join,
assuming both records are within the record buffer? I've experimented with
this, but unfortunately I didn't have time to set up good enough
experiments to satisfy myself.
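My reading of the docs is that, within the limits of the buffer, the task always dequeues the buffered record with the smallest timestamp next, which is what should line K1V1T1 up before K1V2T2 even when P2 is read first. A toy model of that choice, with no Streams API involved and all names mine:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

// Toy model: two partition buffers; the "task" always processes the
// buffered record with the smallest head timestamp next, regardless of
// the order in which the partitions were fetched.
public class TimestampSync {
    record Rec(String key, String value, long ts) {}

    static List<Rec> processInTimestampOrder(ArrayDeque<Rec> p1, ArrayDeque<Rec> p2) {
        List<Rec> processed = new ArrayList<>();
        while (!p1.isEmpty() || !p2.isEmpty()) {
            ArrayDeque<Rec> next;
            if (p1.isEmpty()) next = p2;
            else if (p2.isEmpty()) next = p1;
            else next = p1.peek().ts() <= p2.peek().ts() ? p1 : p2;
            processed.add(next.poll());
        }
        return processed;
    }

    public static void main(String[] args) {
        // P2 is "read" first, but K1V1@100 is still processed before K1V2@200.
        ArrayDeque<Rec> p2 = new ArrayDeque<>(List.of(new Rec("K1", "V2", 200L)));
        ArrayDeque<Rec> p1 = new ArrayDeque<>(List.of(new Rec("K1", "V1", 100L)));
        for (Rec r : processInTimestampOrder(p1, p2)) System.out.println(r);
    }
}
```

The catch, per the quoted passage, is exactly the buffer bound: if K1V1T1 has not yet been fetched into the buffer when K1V2T2 is at the head, "best-effort" means the task may process K1V2T2 first anyway.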

> 4. If you really need to add new partitions, you need to fix up all
> topics manually -- including all topics Kafka Streams created for you.
> Adding partitions messes up all your state sharding as key-based
> partitioning changes. This implies that your application must be stopped!
> Thus, if you have zero downtime requirements you can't do this at all.
> 5. If you have a stateless application all those issues go away though
> and you can even add new partitions during runtime.

Stateless in what sense? Kafka Streams seems to be all about aligning and
manipulating state to create more state. Are you referring to internal
state, specifically?

> Hope this helps.
> -Matthias
> On 12/8/17 11:02 AM, Dmitry Minkovsky wrote:
> > I am about to put a topology into production and I am concerned that I
> > don't know how to repartition/rebalance the topics in the event that I
> > need to add more partitions.
> >
> > My inclination is that I should spin up a new cluster and run some kind
> > of consumer/producer combination that takes data from the previous
> > cluster and writes it to the new cluster. A new instance of the Kafka
> > Streams application then works against this new cluster. But I'm not
> > sure how to best execute this, or whether this approach is sound at all.
> > I am imagining many things may go wrong. Without going into further
> > speculation, what is the best way to do this?
> >
> > Thank you,
> > Dmitry
> >
