kafka-users mailing list archives

From Dmitry Minkovsky <dminkov...@gmail.com>
Subject Re: How can I repartition/rebalance topics processed by a Kafka Streams topology?
Date Sat, 09 Dec 2017 19:59:29 GMT
> How large is the record buffer? Is it configurable?

I seem to have just discovered the answer to this:
buffered.records.per.partition
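
For anyone who lands on this later, a minimal sketch of setting it,
assuming Kafka Streams 1.0 (the application id and bootstrap servers are
placeholders):

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    // Maximum number of records buffered per partition before fetching is
    // paused; the default is 1000.
    props.put(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, 10000);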

On Sat, Dec 9, 2017 at 2:48 PM, Dmitry Minkovsky <dminkovsky@gmail.com>
wrote:

> Hi Matthias, yes that definitely helps. A few thoughts inline below.
>
> Thank you!
>
> On Fri, Dec 8, 2017 at 4:21 PM, Matthias J. Sax <matthias@confluent.io>
> wrote:
>
>> Hard to give a generic answer.
>>
>> 1. We recommend over-partitioning your input topics from the start (to
>> avoid needing to add new partitions later on); problem avoidance is the
>> best strategy. There is obviously some overhead for this on the broker
>> side, but it's not too big.
>>
>
> Yes, I will definitely be doing this.
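>
> For reference, a sketch of how I might create the over-partitioned input
> topics with the AdminClient (the topic name, partition count, and
> replication factor are placeholders):
>
>     import java.util.Collections;
>     import java.util.Properties;
>     import org.apache.kafka.clients.admin.AdminClient;
>     import org.apache.kafka.clients.admin.NewTopic;
>
>     // (inside a method that throws Exception)
>     Properties config = new Properties();
>     config.put("bootstrap.servers", "localhost:9092");
>     try (AdminClient admin = AdminClient.create(config)) {
>         // 50 partitions up front, even though far fewer are needed today.
>         admin.createTopics(Collections.singletonList(
>             new NewTopic("prod.events", 50, (short) 3))).all().get();
>     }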
>
>
>>
>> 2. Not sure why you would need a new cluster? You can just create a new
>> topic in the same cluster and let Kafka Streams read from there.
>>
>
> Motivated by fear of disturbing/manipulating a production cluster and the
> relative ease of putting up a new cluster. Perhaps that fear is irrational.
> I could alternatively just prefix topics.
>
>
>>
>> 3. Depending on your state requirements, you could also run two
>> applications in parallel -- the new one reads from the new input topic
>> with more partitions, and you configure your producer to write to the new
>> topic (or maybe even dual-write to both). Once your new application is
>> ramped up, you can stop the old one.
>>
>
> Yes, this is my plan for migrations. If I could run it past you:
>
> (i) Write input topics from the old prefix to the new prefix (a minimal
> copy loop is sketched below).
> (ii) Start the new Kafka Streams application against the new prefix.
> (iii) When the two applications are in sync, stop writing to the old
> topics.
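>
> Here is the minimal copy loop I have in mind for step (i), assuming
> byte-array serdes and topic names that differ only by prefix (all names
> and configs here are placeholders):
>
>     import java.util.Collections;
>     import org.apache.kafka.clients.consumer.ConsumerRecord;
>     import org.apache.kafka.clients.consumer.ConsumerRecords;
>     import org.apache.kafka.clients.consumer.KafkaConsumer;
>     import org.apache.kafka.clients.producer.KafkaProducer;
>     import org.apache.kafka.clients.producer.ProducerRecord;
>
>     // consumerProps/producerProps elided: bootstrap servers, group id,
>     // and byte-array (de)serializers.
>     KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProps);
>     KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps);
>     consumer.subscribe(Collections.singletonList("old.events"));
>     while (true) {
>         ConsumerRecords<byte[], byte[]> records = consumer.poll(1000);
>         for (ConsumerRecord<byte[], byte[]> rec : records) {
>             // Carry the original broker timestamp over to the new topic;
>             // the timestamps embedded in the payload travel with the
>             // value in any case.
>             producer.send(new ProducerRecord<>(
>                 "new.events", null, rec.timestamp(), rec.key(), rec.value()));
>         }
>     }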
>
> Since I will be copying from an old prefix to a new prefix, it seems
> essential here to have timestamps embedded in the data records, along with
> a custom timestamp extractor.
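>
> A minimal sketch of such an extractor, assuming the value type exposes
> the embedded write time (MyEvent and getCreatedAt are placeholders for
> whatever the real records look like):
>
>     import org.apache.kafka.clients.consumer.ConsumerRecord;
>     import org.apache.kafka.streams.processor.TimestampExtractor;
>
>     public class EmbeddedTimeExtractor implements TimestampExtractor {
>         @Override
>         public long extract(ConsumerRecord<Object, Object> record,
>                             long previousTimestamp) {
>             // Use the time embedded at write time, not the broker/copy time.
>             MyEvent event = (MyEvent) record.value();
>             return event.getCreatedAt(); // epoch millis
>         }
>     }
>
> wired in with:
>
>     props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
>               EmbeddedTimeExtractor.class);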
>
> I really wish I could get some more flavor on "Flow Control With
> Timestamps
> <https://docs.confluent.io/current/streams/architecture.html#flow-control-with-timestamps>"
> in this regard. Assuming my timestamps are monotonically increasing within
> each input topic, from my reading of that section it still appears that the
> result of reprocessing input topics is non-deterministic beyond the
> "records in its stream record buffer". Some seemingly crucial sentences:
>
> > *This flow control is best-effort because it is not always possible to
> strictly enforce execution order across streams by record timestamp; in
> fact, in order to enforce strict execution ordering, one must either wait
> until the system has received all the records from all streams (which may
> be quite infeasible in practice) or inject additional information about
> timestamp boundaries or heuristic estimates such as MillWheel’s watermarks.*
>
>
> Practically, how am I to understand this? How large is the record buffer?
> Is it configurable?
>
> For example, suppose I am re-processing an inner join on partitions P1
> (left) and P2 (right). In the original processing, record K1V1T1 was
> recorded onto P1, then some time later record K1V2T2 was recorded onto P2.
> As a result, K1V2T2 was joined with K1V1T1. Now, during re-processing, P1
> and P2 contain historical data and the Kafka Streams consumer may read P2
> before P1. If it does, will the task still properly align these two
> records by their timestamps to produce the correct inner join, assuming
> both records are within the record buffer? I've experimented with this,
> but unfortunately I didn't have time to set up good enough experiments to
> satisfy myself.
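>
> For concreteness, the join I'm describing is just a windowed
> KStream-KStream inner join, something like this (topics, window size, and
> serdes are placeholders):
>
>     import java.util.concurrent.TimeUnit;
>     import org.apache.kafka.common.serialization.Serdes;
>     import org.apache.kafka.streams.StreamsBuilder;
>     import org.apache.kafka.streams.kstream.JoinWindows;
>     import org.apache.kafka.streams.kstream.Joined;
>     import org.apache.kafka.streams.kstream.KStream;
>
>     StreamsBuilder builder = new StreamsBuilder();
>     KStream<String, String> left = builder.stream("p1-topic");
>     KStream<String, String> right = builder.stream("p2-topic");
>     left.join(right,
>               (v1, v2) -> v1 + "+" + v2,  // K1V1T1 joined with K1V2T2
>               JoinWindows.of(TimeUnit.MINUTES.toMillis(5)),
>               Joined.with(Serdes.String(), Serdes.String(), Serdes.String()))
>         .to("joined-topic");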
>
>
>> 4. If you really need to add new partitions, you need to fix up all
>> topics manually -- including all topics Kafka Streams created for you.
>> Adding partitions messes up all your state, as key-based partitioning
>> changes. This implies that your application must be stopped! Thus, if
>> you have zero-downtime requirements, you can't do this at all.
>>
>> 5. If you have a stateless application, though, all those issues go away
>> and you can even add new partitions at runtime.
>>
>>
> Stateless in what sense? Kafka Streams seems to be all about aligning and
> manipulating state to create more state. Are you referring to internal
> state, specifically?
>
>
>
>>
>> Hope this helps.
>>
>>
>> -Matthias
>>
>>
>>
>> On 12/8/17 11:02 AM, Dmitry Minkovsky wrote:
>> > I am about to put a topology into production and I am concerned that I
>> > don't know how to repartition/rebalance the topics in the event that I
>> > need to add more partitions.
>> >
>> > My inclination is that I should spin up a new cluster and run some kind
>> > of consumer/producer combination that takes data from the previous
>> > cluster and writes it to the new cluster. A new instance of the Kafka
>> > Streams application then works against this new cluster. But I'm not
>> > sure how to best execute this, or whether this approach is sound at
>> > all. I am imagining many things may go wrong. Without going into
>> > further speculation, what is the best way to do this?
>> >
>> > Thank you,
>> > Dmitry
>> >
>>
>>
>
