kafka-users mailing list archives

From Timur Yusupov <ttyusu...@gmail.com>
Subject Re: KafkaStreams pause specific topic partition consumption
Date Mon, 08 May 2017 23:14:14 GMT
Got it, thanks, Matthias!

On Tue, May 9, 2017 at 2:07 AM, Matthias J. Sax <matthias@confluent.io>
wrote:

> Yes. That is something you would need to do externally, too.
>
> There is a KIP for a tool
> (https://cwiki.apache.org/confluence/display/KAFKA/KIP-122%3A+Add+Reset+Consumer+Group+Offsets+tooling)
> -- but you can also do this using a single `KafkaConsumer` with
> `group.id == application.id` that gets all partitions assigned and does
> the corresponding seek plus commitSync.
>
> Note, you need to make sure that the Streams "consumer group" is completely
> inactive to avoid conflicts. You could add a check similar to
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/tools/StreamsResetter.java#L94
>
>
> -Matthias
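A minimal sketch of the external offset reset described above, assuming the
Streams application is completely stopped; the application.id
("my-streams-app"), topic, partition, and offset values are placeholders:

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;

    public class OffsetReset {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            // group.id must equal the Streams application.id so the commit
            // lands in the application's consumer group
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-streams-app");
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

            // hypothetical target: rewind partition 0 of "input-topic" to offset 42
            Map<TopicPartition, Long> targetOffsets = new HashMap<>();
            targetOffsets.put(new TopicPartition("input-topic", 0), 42L);

            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
                // manual assignment of all relevant partitions, no rebalance involved
                consumer.assign(targetOffsets.keySet());
                Map<TopicPartition, OffsetAndMetadata> commits = new HashMap<>();
                for (Map.Entry<TopicPartition, Long> e : targetOffsets.entrySet()) {
                    consumer.seek(e.getKey(), e.getValue());
                    commits.put(e.getKey(), new OffsetAndMetadata(e.getValue()));
                }
                // persist the new offsets for the Streams "consumer group"
                consumer.commitSync(commits);
            }
        }
    }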
>
> On 5/8/17 4:01 PM, Timur Yusupov wrote:
> > That means that in order to process the filtered-out records in the next
> > batch, we have to seek KafkaStreams back, right?
> >
> > On Tue, May 9, 2017 at 1:19 AM, Matthias J. Sax <matthias@confluent.io>
> > wrote:
> >
> >> I see.
> >>
> >> If you do the step of storing the end offsets in your database before
> >> starting up Streams, this would work.
> >>
> >> What you could do as a workaround (even if it might not be a nice
> >> solution) is to apply a `transform()` as your first operator. Within
> >> `transform()` you get access to the current record offset via
> >> `context.offset()` (the context object is provided via `init()`). Thus, you
> >> can implement an "offset filter" and also track whether all partitions have
> >> reached their end offset (you also get the record's partition via the context).
> >>
> >> Thus, if a record is past the partition's end offset, you just filter the
> >> record out. If all partitions have reached their end offset, you can set a
> >> flag to notify your "main" thread to close() the Kafka Streams instance.
> >>
> >> Does this make sense?
> >>
> >>
> >> -Matthias
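A minimal sketch of the offset-filter transformer described above, assuming
the 0.10.x Transformer interface (which still declares punctuate()); the
endOffsets map, the finished set, and the allDone flag are illustrative names
wired up by the application, and it is assumed that returning null from
transform() drops the record:

    import java.util.Map;
    import java.util.Set;
    import java.util.concurrent.atomic.AtomicBoolean;

    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.kstream.Transformer;
    import org.apache.kafka.streams.processor.ProcessorContext;

    // Drops records past the stored end offsets and flags when all partitions are done.
    public class OffsetFilter<K, V> implements Transformer<K, V, KeyValue<K, V>> {

        private final Map<TopicPartition, Long> endOffsets; // loaded from the database beforehand
        private final Set<TopicPartition> finished;         // thread-safe set shared with the main thread
        private final AtomicBoolean allDone;                // signal for the "main" thread

        private ProcessorContext context;

        public OffsetFilter(Map<TopicPartition, Long> endOffsets,
                            Set<TopicPartition> finished,
                            AtomicBoolean allDone) {
            this.endOffsets = endOffsets;
            this.finished = finished;
            this.allDone = allDone;
        }

        @Override
        public void init(ProcessorContext context) {
            this.context = context;
        }

        @Override
        public KeyValue<K, V> transform(K key, V value) {
            TopicPartition tp = new TopicPartition(context.topic(), context.partition());
            Long end = endOffsets.get(tp);
            // "end" is assumed to be the offset of the next record to be written
            // (endOffsets() semantics), so anything at or beyond it belongs to the next batch
            if (end != null && context.offset() >= end) {
                finished.add(tp);
                if (finished.containsAll(endOffsets.keySet())) {
                    allDone.set(true); // main thread can now call KafkaStreams#close()
                }
                return null; // assumed to drop the record from downstream processing
            }
            return KeyValue.pair(key, value);
        }

        @Override
        public KeyValue<K, V> punctuate(long timestamp) {
            return null; // no periodic work
        }

        @Override
        public void close() {}
    }

The transformer would be attached as the first operator via stream.transform(),
and the "main" thread would watch the allDone flag and call KafkaStreams#close()
once it is set.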
> >>
> >> On 5/8/17 12:49 PM, Timur Yusupov wrote:
> >>> Matthias,
> >>>
> >>> Thanks for your answers.
> >>>
> >>>>> So we are considering to just pause specific topic partitions as soon
> >>>>> as we arrive at the stop offsets for them.
> >>>> I am just wondering how you would do this in a fault-tolerant way (if
> >>>> you would have a pause API)?
> >>> At the start of a batch cycle we have to store somewhere (for our use
> >>> case, the database we already use will work) the end offsets for the
> >>> topic partitions we are interested in. Then we just need to process all
> >>> messages up to the stored end offsets. In case the application is
> >>> restarted, it first checks the database for the stored end offsets.
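A minimal sketch of capturing the end offsets at the start of a batch cycle,
using a plain KafkaConsumer just to read the partition metadata; the class and
method names are illustrative, and persisting the map to the database is left
out:

    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    import java.util.stream.Collectors;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;

    public class EndOffsetSnapshot {
        // Returns the current end offsets of all partitions of the given topic.
        static Map<TopicPartition, Long> snapshot(String bootstrapServers, String topic) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
                List<TopicPartition> partitions = consumer.partitionsFor(topic).stream()
                        .map(pi -> new TopicPartition(pi.topic(), pi.partition()))
                        .collect(Collectors.toList());
                // store this map in the database before starting the Streams instance
                return consumer.endOffsets(partitions);
            }
        }
    }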
> >>>
> >>>>> 2) Assume we process multiple topics in some parallel way and want to
> >>>>> pause some topics while waiting for other topics to catch up.
> >>>> Streams synchronizes topics on time automatically for you. So I am
> >>>> wondering why this does not work for you?
> >>> Right, this is probably a bad example, but use case 1) with batch
> >>> processing is still relevant.
> >>>
> >>>
> >>>>
> >>>> -Matthias
> >>>>
> >>>>
> >>>> On 4/27/17 8:52 AM, Timur Yusupov wrote:
> >>>>> I see it is possible to pause specific topic partition consumption when
> >>>>> using KafkaConsumer directly, but it looks like this is not possible
> >>>>> when using KafkaStreams.
> >>>>>
> >>>>> There are the following use cases for that:
> >>>>> 1) Doing batch processing using Kafka Streams (I found the
> >>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-95%3A+Incremental+Batch+Processing+for+Kafka+Streams
> >>>>> proposal for Kafka Streams, but according to
> >>>>> https://issues.apache.org/jira/browse/KAFKA-4437 there is no active
> >>>>> development on that side). So we are considering to just pause specific
> >>>>> topic partitions as soon as we arrive at the stop offsets for them.
> >>>>>
> >>>>> 2) Assume we process multiple topics in some parallel way and want to
> >>>>> pause some topics while waiting for other topics to catch up.
> >>>>>
> >>>>> Actually, the first use case is more important for us, so it would be
> >>>>> good to know if this is possible or if some improvements are already
> >>>>> planned to allow pausing specific topic partition consumption in
> >>>>> KafkaStreams.
> >>>>>
> >>>>
> >>>>
> >>>
> >>>
> >>
> >>
> >
> >
>
>
