kafka-users mailing list archives

From "Matthias J. Sax" <matth...@confluent.io>
Subject Re: KafkaStreams pause specific topic partition consumption
Date Mon, 08 May 2017 23:07:54 GMT
Yes. That is something you would need to do externally, too.

There is a KIP for a tool
(https://cwiki.apache.org/confluence/display/KAFKA/KIP-122%3A+Add+Reset+Consumer+Group+Offsets+tooling)
-- but you can also do this using a single `KafkaConsumer` with
`group.id == application.id` that gets all partitions assigned and does
the corresponding seek plus commitSync.

Note that you need to make sure the Streams "consumer group" is completely
inactive to avoid conflicts. You could add a check similar to
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/tools/StreamsResetter.java#L94


-Matthias

On 5/8/17 4:01 PM, Timur Yusupov wrote:
> That means in order to process filtered out records in a next batch, we
> have to seek KafkaStreams back, right?
> 
> On Tue, May 9, 2017 at 1:19 AM, Matthias J. Sax <matthias@confluent.io>
> wrote:
> 
>> I see.
>>
>> If you do the step of storing the end offsets in your database before
>> starting up Streams this would work.
>>
>> What you could do as a workaround (even if it might not be a nice
>> solution) is to apply a `transform()` as your first operator. Within
>> `transform()` you get access to the current record offset via
>> `context.offset()` (the context object is provided via `init()`). Thus, you
>> can implement an "offset filter" and also track whether all partitions have
>> reached their end offset (you also get a record's partition via the context).
>>
>> Thus, if one record is after the partition end-offset, you just filter
>> the record out. If all partitions did reach end-offset, you can set a
>> flag to notify your "main" thread to close() the Kafka Streams instances.
>>
>> Does this make sense?
>>
>>
>> -Matthias
>>
>> On 5/8/17 12:49 PM, Timur Yusupov wrote:
>>> Matthias,
>>>
>>> Thanks for your answers.
>>>
>>>>> So we are considering to just pause specific
>>>>> topic partitions as soon as we arrive to stop offsets for them.
>>>> I am just wondering how you would do this in a fault-tolerant way (if you
>>>> would have pause API)?
>>> On start of batch cycle we have to store somewhere (for our use case
>>> database we already use will work) end offsets for topic partitions we are
>>> interested in. Then we just need to process all messages up to stored end
>>> offsets. In case application is restarted - it first checks database for
>>> stored end offsets.
>>>
>>>>> 2) Assume we process multiple topics in some parallel way and want to pause
>>>>> some topics while waiting for other topics to catch up.
>>>> Streams synchronizes topics on time automatically for you. So I am
>>>> wondering why this does not work for you?
>>> Right, this is probably a bad example, but use case 1) with batch
>>> processing is still relevant.
>>>
>>>
>>>>
>>>> -Matthias
>>>>
>>>>
>>>> On 4/27/17 8:52 AM, Timur Yusupov wrote:
>>>>> I see it is possible to pause specific topic partition consumption when
>>>>> using KafkaConsumer directly, but looks like it is not possible when using
>>>>> KafkaStreams.
>>>>>
>>>>> There are following use cases for that:
>>>>> 1) Doing batch processing using Kafka Streams (I found
>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-95%3A+Incremental+Batch+Processing+for+Kafka+Streams
>>>>> proposal for Kafka Streams, but according to
>>>>> https://issues.apache.org/jira/browse/KAFKA-4437 there is no active
>>>>> development on that side). So we are considering to just pause specific
>>>>> topic partitions as soon as we arrive to stop offsets for them.
>>>>>
>>>>> 2) Assume we process multiple topics in some parallel way and want to pause
>>>>> some topics while waiting for other topics to catch up.
>>>>>
>>>>> Actually, the first use case is more important for us, so would be good to
>>>>> know if there is a possibility or some improvements are already planned for
>>>>> allowing to pause specific topic partition consumption in KafkaStream.
>>>>>
>>>>
>>>>
>>>
>>>
>>
>>
> 
> 
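
The "offset filter" bookkeeping from the quoted `transform()` workaround can be
sketched roughly as follows. This is only a minimal sketch under several
assumptions: `EndOffsetFilter`, `accept()` and `allPartitionsDone()` are
hypothetical names that do not come from Kafka; a single input topic is
assumed, so partitions are keyed by number; and the stored end offsets are
taken to be exclusive (the offset of the next record to be written), which is
how `KafkaConsumer#endOffsets` reports them. The class uses no Kafka types and
only holds the state that `transform()` and the "main" thread would share.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical helper: tracks per-partition end offsets for one batch. */
final class EndOffsetFilter {
    // Partition number -> exclusive end offset captured before Streams starts.
    private final Map<Integer, Long> endOffsets;
    // Partitions for which the last in-batch record (or a later one) was seen.
    private final Set<Integer> finished = new HashSet<>();

    EndOffsetFilter(Map<Integer, Long> endOffsets) {
        this.endOffsets = new HashMap<>(endOffsets);
        // Assumption: an end offset of 0 means the partition holds no records,
        // so there is nothing to wait for.
        for (Map.Entry<Integer, Long> e : this.endOffsets.entrySet()) {
            if (e.getValue() <= 0L) {
                finished.add(e.getKey());
            }
        }
    }

    /** Returns true if the record at (partition, offset) belongs to the batch. */
    synchronized boolean accept(int partition, long offset) {
        Long end = endOffsets.get(partition);
        if (end == null) {
            return false; // partition is not part of this batch
        }
        if (offset >= end - 1) {
            finished.add(partition); // reached or passed the last in-batch offset
        }
        return offset < end; // records at or after the end offset are filtered out
    }

    /** Returns true once every tracked partition has reached its end offset. */
    synchronized boolean allPartitionsDone() {
        return finished.size() == endOffsets.size();
    }
}
```

Inside `transform()` one would presumably call
`filter.accept(context.partition(), context.offset())` and return `null` for
rejected records, while the "main" thread checks `allPartitionsDone()` before
calling `close()`. The methods are synchronized because several stream threads
may share one instance. Note that a partition is only marked done once a record
at or beyond its last in-batch offset actually arrives.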

