kafka-users mailing list archives

From Timur Yusupov <ttyusu...@gmail.com>
Subject Re: KafkaStreams pause specific topic partition consumption
Date Mon, 08 May 2017 23:01:26 GMT
That means that, in order to process the filtered-out records in the next
batch, we have to seek KafkaStreams back, right?
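
For reference, here is a minimal sketch of the "offset filter" bookkeeping described in the quoted reply below. It is plain Java with no Kafka dependency, so it is only an illustration and not a tested Streams integration. The class name `EndOffsetFilter` and its methods are hypothetical, and it assumes the stored end offset is exclusive (the offset of the next record to be written), so the last record of the batch has offset `end - 1`.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical helper: decides whether a record still belongs to the current batch. */
final class EndOffsetFilter {
    // Stop offsets keyed by "topic:partition", as loaded at batch start.
    // Assumption: each end offset is exclusive (last batch record is end - 1).
    private final Map<String, Long> endOffsets;
    private final Set<String> finished = new HashSet<>();

    EndOffsetFilter(Map<String, Long> endOffsets) {
        this.endOffsets = new HashMap<>(endOffsets);
        // A partition that was empty at batch start has nothing to wait for.
        for (Map.Entry<String, Long> e : this.endOffsets.entrySet()) {
            if (e.getValue() <= 0) {
                finished.add(e.getKey());
            }
        }
    }

    static String key(String topic, int partition) {
        return topic + ":" + partition;
    }

    /** Returns true if the record is inside the batch and should be processed. */
    synchronized boolean accept(String topic, int partition, long offset) {
        String k = key(topic, partition);
        Long end = endOffsets.get(k);
        if (end == null) {
            return false; // assumption: partitions outside the batch are dropped
        }
        if (offset >= end - 1) {
            finished.add(k); // last batch record seen, or already past the end
        }
        return offset < end;
    }

    /** The flag the "main" thread can poll before closing Kafka Streams. */
    synchronized boolean allPartitionsDone() {
        return finished.size() == endOffsets.size();
    }
}
```

Inside `transform()` this would presumably be called as `filter.accept(context.topic(), context.partition(), context.offset())`, returning `null` to drop a record that is past the end offset. If offsets have gaps (for example on a compacted topic), a partition is only marked finished once a record at or beyond `end - 1` actually arrives.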

On Tue, May 9, 2017 at 1:19 AM, Matthias J. Sax <matthias@confluent.io>
wrote:

> I see.
>
> If you store the end offsets in your database before
> starting up Streams, this would work.
>
> What you could do as a workaround (even if it might not be a nice
> solution) is to apply a `transform()` as your first operator. Within
> `transform()` you get access to the current record offset via
> `context.offset()` (the context object is provided via `init()`). Thus, you
> can implement an "offset filter" and also track whether all partitions have
> reached their end offset (you also get a record's partition via the context).
>
> Thus, if a record is after the partition's end offset, you just filter
> the record out. Once all partitions have reached their end offset, you can
> set a flag to notify your "main" thread to close() the Kafka Streams instances.
>
> Does this make sense?
>
>
> -Matthias
>
> On 5/8/17 12:49 PM, Timur Yusupov wrote:
> > Matthias,
> >
> > Thanks for your answers.
> >
> >>> So we are considering just pausing specific
> >>> topic partitions as soon as we reach the stop offsets for them.
> >> I am just wondering how you would do this in a fault-tolerant way (if you
> >> had a pause API)?
> > At the start of a batch cycle we have to store somewhere (for our use case,
> > the database we already use will work) the end offsets for the topic
> > partitions we are interested in. Then we just need to process all messages
> > up to the stored end offsets. If the application is restarted, it first
> > checks the database for stored end offsets.
> >
> >>> 2) Assume we process multiple topics in some parallel way and want to pause
> >>> some topics while waiting for other topics to catch up.
> >> Streams synchronizes topics on time automatically for you. So I am
> >> wondering why this does not work for you?
> > Right, this is probably a bad example, but use case 1) with batch
> > processing is still relevant.
> >
> >
> >>
> >> -Matthias
> >>
> >>
> >> On 4/27/17 8:52 AM, Timur Yusupov wrote:
> >>> I see it is possible to pause specific topic partition consumption when
> >>> using KafkaConsumer directly, but it looks like it is not possible when
> >>> using KafkaStreams.
> >>>
> >>> There are the following use cases for that:
> >>> 1) Doing batch processing using Kafka Streams (I found the
> >>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-95%3A+Incremental+Batch+Processing+for+Kafka+Streams
> >>> proposal for Kafka Streams, but according to
> >>> https://issues.apache.org/jira/browse/KAFKA-4437 there is no active
> >>> development on that side). So we are considering just pausing specific
> >>> topic partitions as soon as we reach the stop offsets for them.
> >>>
> >>> 2) Assume we process multiple topics in some parallel way and want to pause
> >>> some topics while waiting for other topics to catch up.
> >>>
> >>> Actually, the first use case is more important for us, so it would be good
> >>> to know whether there is a possibility, or whether improvements are already
> >>> planned, for pausing specific topic partition consumption in KafkaStreams.
> >>>
> >>
> >>
> >
> >
>
>


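The batch-start step from the quoted thread above (store the end offsets once, and reuse the stored values after a restart) could be sketched roughly as follows. `EndOffsetStore`, `InMemoryEndOffsetStore` and `BatchBoundaries` are hypothetical names; the in-memory store stands in for the database, and the `Supplier` stands in for whatever reads the current end offsets (for example a wrapper around `KafkaConsumer.endOffsets(...)`).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Supplier;

/** Hypothetical stand-in for the database table holding a batch's stop offsets. */
interface EndOffsetStore {
    Optional<Map<String, Long>> load();

    void save(Map<String, Long> endOffsets);
}

/** In-memory implementation, used here only so the sketch runs on its own. */
final class InMemoryEndOffsetStore implements EndOffsetStore {
    private Map<String, Long> saved;

    @Override
    public Optional<Map<String, Long>> load() {
        return Optional.ofNullable(saved);
    }

    @Override
    public void save(Map<String, Long> endOffsets) {
        saved = new HashMap<>(endOffsets);
    }
}

final class BatchBoundaries {
    /**
     * Returns the stop offsets for the current batch. After a restart the
     * stored values win, so the batch boundary does not move.
     */
    static Map<String, Long> resolve(EndOffsetStore store,
                                     Supplier<Map<String, Long>> currentEndOffsets) {
        Optional<Map<String, Long>> stored = store.load();
        if (stored.isPresent()) {
            return stored.get(); // restarted mid-batch: keep the old boundary
        }
        Map<String, Long> fresh = new HashMap<>(currentEndOffsets.get());
        store.save(fresh); // persist before Streams starts processing
        return fresh;
    }
}
```

The stored entry would have to be cleared once the batch completes, so that the next cycle captures new end offsets; that part is left out of the sketch.
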
-- 
Best regards, Timur.
