kafka-users mailing list archives

From Tomoyuki Saito <aocch...@gmail.com>
Subject Re: Batch processing with Kafka Streams with at-least-once semantics
Date Sat, 13 Oct 2018 05:55:54 GMT
Hello Guozhang,

Thank you for your reply.

> setting to "0" will actually mean to commit every time.

Hmm, I somehow misread the code. Now I see that's true.

> You should actually set it to Long.MAX_VALUE to indicate "not commit
> regularly by intervals"

I see. I'd consider taking that approach!
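For reference, disabling interval-based commits might look like the following sketch. It uses the literal config key string with plain `java.util.Properties`; in real Streams code you would pass these properties to `new KafkaStreams(...)`, and could use the `StreamsConfig` constant instead of the literal key:

```java
import java.util.Properties;

public class CommitIntervalConfig {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Effectively disable interval-based commits; offsets are then
        // committed only when ProcessorContext.commit() is called.
        props.put("commit.interval.ms", Long.toString(Long.MAX_VALUE));
        System.out.println(props.getProperty("commit.interval.ms"));
    }
}
```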


Another question just from curiosity:

From looking at the code, setting `commit.interval.ms` to a negative value can
also indicate "not commit regularly by intervals." In other words, it has
the same effect as setting it to Long.MAX_VALUE.
Is this true?

The behavior of a negative value is undocumented, though, so I'm thinking
setting it to Long.MAX_VALUE is preferable.

Thank you.
Tomoyuki

On Sat, Oct 13, 2018 at 1:48 AM Guozhang Wang <wangguoz@gmail.com> wrote:

> Hello Tomoyuki,
>
> 1. Seems a good use case for Streams.
> 2. You should actually set it to Long.MAX_VALUE to indicate "not commit
> regularly by intervals", setting to "0" will actually mean to commit every
> time. Then you can leverage on ProcessorContext.commit() to manually commit
> after the batch is done.
>
>
> Guozhang
>
>
>
>
> On Wed, Oct 10, 2018 at 11:15 PM, Tomoyuki Saito <aocchoda@gmail.com>
> wrote:
>
> > Hi,
> >
> > I'm exploring whether it is possible to use Kafka Streams for batch
> > processing with at-least-once semantics.
> >
> > What I want to do is to insert records in an external storage in bulk,
> > and execute offset-commit after the bulk insertion to achieve
> > at-least-once semantics.
> > A processing topology can be very simple like:
> > ```
> > TopologyBuilder builder = new TopologyBuilder();
> > builder.addSource("source", "topic");
> > builder.addProcessor("processor", processorSupplier, "source");
> > new KafkaStreams(builder, streamsConfig);
> > ```
> >
> > My questions are:
> >
> > 1. Could you suggest how to achieve that? Can it be better to use
> > KafkaConsumer instead of KafkaStreams?
> >
> > 2. From my understanding, when setting StreamsConfig `commit.interval.ms`
> > to 0, we can turn off offset-commit by KafkaStreams internal logic (in
> > StreamThread#maybeCommit), and control when to commit offsets with
> > `ProcessorContext#commit`. Is my understanding right? Any expected issues
> > for this approach?
> >
> > Thank you,
> > Tomoyuki
> >
>
>
>
> --
> -- Guozhang
>
