kafka-users mailing list archives

From "Matthias J. Sax" <matth...@confluent.io>
Subject Re: KafkaStreams pause specific topic partition consumption
Date Mon, 08 May 2017 22:19:01 GMT
I see.

If you store the end offsets in your database before starting up
Streams, this would work.

What you could do as a workaround (even if it might not be a nice
solution) is to apply a `transform()` as your first operator. Within
`transform()` you get access to the current record's offset via
`context.offset()` (the context object is provided via `init()`). Thus, you
can implement an "offset filter" and also track whether all partitions have
reached their end offset (you also get a record's partition via
`context.partition()`).

Thus, if a record is after its partition's end offset, you just filter
the record out. Once all partitions have reached their end offset, you can
set a flag to notify your "main" thread to close() the Kafka Streams instance.
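
A rough sketch of the bookkeeping behind such an "offset filter" could look
like the following. It is plain Java without any Kafka dependency, so it only
shows the logic that `transform()` would delegate to. The class and method
names (`EndOffsetFilter`, `accept`, `allPartitionsDone`, `doneSignal`) are
made up for illustration. It assumes a single input topic (partitions are
keyed by number only) and that the stored end offsets are exclusive, i.e. the
offset of the next record to be written, as `KafkaConsumer#endOffsets()`
reports them.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;

/** Hypothetical helper: decides whether a record belongs to the current batch. */
class EndOffsetFilter {
    // Assumption: end offsets are exclusive (offset of the next record to be written).
    private final Map<Integer, Long> endOffsets;
    private final Set<Integer> finished = new HashSet<>();
    private final CountDownLatch allDone = new CountDownLatch(1);

    EndOffsetFilter(Map<Integer, Long> storedEndOffsets) {
        this.endOffsets = new HashMap<>(storedEndOffsets);
        // A partition that was empty when the end offsets were stored has nothing to process.
        for (Map.Entry<Integer, Long> e : endOffsets.entrySet()) {
            if (e.getValue() <= 0) {
                finished.add(e.getKey());
            }
        }
        signalIfDone();
    }

    /** Returns true if the record should be processed, false if it should be filtered out. */
    synchronized boolean accept(int partition, long offset) {
        Long end = endOffsets.get(partition);
        if (end == null) {
            return false; // partition is not part of this batch
        }
        if (offset >= end - 1) {
            // Last record of the batch, or a record past it: this partition is done.
            finished.add(partition);
            signalIfDone();
        }
        return offset < end;
    }

    synchronized boolean allPartitionsDone() {
        return finished.size() == endOffsets.size();
    }

    /** The "flag" the main thread can wait on before closing Kafka Streams. */
    CountDownLatch doneSignal() {
        return allDone;
    }

    private void signalIfDone() {
        if (finished.size() == endOffsets.size()) {
            allDone.countDown();
        }
    }
}
```

Inside `transform()` you would call something like
`filter.accept(context.partition(), context.offset())` and return `null` for
rejected records, while the main thread waits on `doneSignal().await()` and
then calls `close()`. Note that a partition whose batch was already fully
processed before a restart is only marked as done once a newer record
arrives, so you might need to handle that case separately.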

Does this make sense?


-Matthias

On 5/8/17 12:49 PM, Timur Yusupov wrote:
> Matthias,
> 
> Thanks for your answers.
> 
>>> So we are considering to just pause specific
>>> topic partitions as soon as we arrive to stop offsets for them.
>> I am just wondering how you would do this in a fault-tolerant way (if you
>> would have pause API)?
> On start of batch cycle we have to store somewhere (for our use case
> database we already use will work) end offsets for topic partitions we are
> interested in. Then we just need to process all messages up to stored end
> offsets. In case application is restarted - it first checks database for
> stored end offsets.
> 
>>> 2) Assume we process multiple topics in some parallel way and want to
>>> pause some topics while waiting for other topics to catch up.
>> Streams synchronizes topics on time automatically for you. So I am
>> wondering why this does not work for you?
> Right, this is probably a bad example, but use case 1) with batch
> processing is still relevant.
> 
> 
>>
>> -Matthias
>>
>>
>> On 4/27/17 8:52 AM, Timur Yusupov wrote:
>>> I see it is possible to pause specific topic partition consumption when
>>> using KafkaConsumer directly, but looks like it is not possible when
>>> using KafkaStreams.
>>>
>>> There are following use cases for that:
>>> 1) Doing batch processing using Kafka Streams (I found
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-95%3A+Incremental+Batch+Processing+for+Kafka+Streams
>>> proposal for Kafka Streams, but according to
>>> https://issues.apache.org/jira/browse/KAFKA-4437 there is no active
>>> development on that side). So we are considering to just pause specific
>>> topic partitions as soon as we arrive to stop offsets for them.
>>>
>>> 2) Assume we process multiple topics in some parallel way and want to
>>> pause some topics while waiting for other topics to catch up.
>>>
>>> Actually, the first use case is more important for us, so would be good
>>> to know if there is a possibility or some improvements are already planned
>>> for allowing to pause specific topic partition consumption in KafkaStream.
>>>
>>
>>
> 
> 

