kafka-users mailing list archives

From Tomoyuki Saito <aocch...@gmail.com>
Subject Re: Batch processing with Kafka Streams with at-least-once semantics
Date Wed, 17 Oct 2018 03:08:08 GMT
Hi,

I've read the guide below and filed a PR:
https://github.com/apache/kafka/pull/5809
I started without creating a JIRA ticket.

https://cwiki.apache.org/confluence/display/KAFKA/Contributing+Code+Changes

Thank you,
Tomoyuki

On Wed, Oct 17, 2018 at 9:19 AM Tomoyuki Saito <aocchoda@gmail.com> wrote:

> Hi,
>
> > Would you like to contribute a PR?
>
> Yes! Sounds great.
>
> Should I file a JIRA ticket first?
>
> Tomoyuki
>
>
>
>
> On Wed, Oct 17, 2018 at 12:19 AM Guozhang Wang <wangguoz@gmail.com> wrote:
>
>> I think we should not allow negative values, and today it seems that this
>> is not checked.
>>
>> In fact, it should be a one-liner fix in the `config.define` function call
>> to constrain its possible value range. Would you like to contribute a PR?
>>
>>
>> Guozhang
>>
>>
>> On Fri, Oct 12, 2018 at 10:56 PM Tomoyuki Saito <aocchoda@gmail.com> wrote:
>>
>> > Hello Guozhang,
>> >
>> > Thank you for your reply.
>> >
>> > > setting to "0" will actually mean to commit every time.
>> >
>> > Hmm, I misunderstood the code. Now I understand that this is true.
>> >
>> > > You should actually set it to Long.MAX_VALUE to indicate "not commit
>> > regularly by intervals"
>> >
>> > I see. I'd consider taking that approach!
>> >
>> >
>> > Another question, just out of curiosity:
>> >
>> > From looking at the code, setting `commit.interval.ms` to a negative value
>> > can also indicate "not commit regularly by intervals." In other words, it
>> > has the same effect as setting it to Long.MAX_VALUE.
>> > Is this true?
>> >
>> > The implication of setting the property to a negative value is
>> > undocumented, so I'm thinking setting it to Long.MAX_VALUE is preferable.
>> >
>> > Thank you.
>> > Tomoyuki
>> >
>> > On Sat, Oct 13, 2018 at 1:48 AM Guozhang Wang <wangguoz@gmail.com> wrote:
>> >
>> > > Hello Tomoyuki,
>> > >
>> > > 1. Seems a good use case for Streams.
>> > > 2. You should actually set it to Long.MAX_VALUE to indicate "not commit
>> > > regularly by intervals"; setting it to "0" will actually mean to commit
>> > > every time. Then you can use ProcessorContext.commit() to manually
>> > > commit after the batch is done.
>> > >
>> > >
>> > > Guozhang
>> > >
>> > >
>> > >
>> > >
>> > > On Wed, Oct 10, 2018 at 11:15 PM, Tomoyuki Saito <aocchoda@gmail.com> wrote:
>> > >
>> > > > Hi,
>> > > >
>> > > > I'm exploring whether it is possible to use Kafka Streams for batch
>> > > > processing with at-least-once semantics.
>> > > >
>> > > > What I want to do is insert records into an external storage in bulk,
>> > > > and execute offset-commit after the bulk insertion to achieve
>> > > > at-least-once semantics.
>> > > > A processing topology can be as simple as:
>> > > > ```
>> > > > TopologyBuilder builder = new TopologyBuilder();
>> > > > builder.addSource("source", "topic");
>> > > > builder.addProcessor("processor", processorSupplier, "source");
>> > > > new KafkaStreams(builder, streamsConfig);
>> > > > ```
>> > > >
>> > > > My questions are:
>> > > >
>> > > > 1. Could you suggest how to achieve that? Would it be better to use
>> > > > KafkaConsumer instead of KafkaStreams?
>> > > >
>> > > > 2. From my understanding, when setting StreamsConfig
>> > > > `commit.interval.ms` to 0, we can turn off offset-commit by
>> > > > KafkaStreams internal logic (in StreamThread#maybeCommit), and control
>> > > > when to commit offsets with `ProcessorContext#commit`. Is my
>> > > > understanding right? Are there any expected issues with this approach?
>> > > >
>> > > > Thank you,
>> > > > Tomoyuki
>> > > >
>> > >
>> > >
>> > >
>> > > --
>> > > -- Guozhang
>> > >
>> >
>>
>>
>> --
>> -- Guozhang
>>
>
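The pattern discussed in this thread (set `commit.interval.ms` to Long.MAX_VALUE, buffer records, bulk-insert them, then call `ProcessorContext#commit`) can be sketched roughly as follows. This is a minimal, library-free illustration, not code from the thread or from Kafka: `BatchingSink`, `bulkWriter`, `commitRequest`, and `pending` are hypothetical names, and `commitRequest` merely stands in for `ProcessorContext#commit`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/**
 * Buffers records and requests an offset commit only after a bulk write
 * has succeeded. All names here are hypothetical, not Kafka Streams API.
 */
final class BatchingSink<T> {
    private final int batchSize;
    private final Consumer<List<T>> bulkWriter; // e.g. a bulk insert into external storage
    private final Runnable commitRequest;       // stand-in for ProcessorContext#commit
    private final List<T> buffer = new ArrayList<>();

    BatchingSink(int batchSize, Consumer<List<T>> bulkWriter, Runnable commitRequest) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.batchSize = batchSize;
        this.bulkWriter = bulkWriter;
        this.commitRequest = commitRequest;
    }

    /** Called once per record, as a processor's process method would be. */
    void accept(T record) {
        buffer.add(record);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    /** Writes the buffered batch, then requests a commit. */
    void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        // If the writer throws, the buffer is kept and no commit is requested,
        // so the same records can be retried or re-read after a restart.
        bulkWriter.accept(new ArrayList<>(buffer));
        buffer.clear();
        commitRequest.run();
    }

    /** Number of records buffered but not yet written. */
    int pending() {
        return buffer.size();
    }
}
```

In an actual processor, one would presumably call `accept` from the process method, pass `context::commit` as `commitRequest`, and call `flush` when the processor is closed so that no buffered records are left behind. Because the commit is requested only after the write returns, a failed write may lead to duplicates on retry but not to lost records, which is the at-least-once behavior the original question asks for.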
