kafka-users mailing list archives

From David Garcia <dav...@spiceworks.com>
Subject Re: KafkaStream: punctuate() never called even when data is received by process()
Date Sat, 26 Nov 2016 21:39:29 GMT
I know that the Kafka team is working on a new way to reason about time. My team's solution
was to not use punctuate at all, but that only works if you can guarantee that all of the tasks
(if not all of the partitions) will receive messages. Another solution is to periodically send
canary messages to every partition your app is listening to. Either way, it's a band-aid. I know
the team is aware of this bug and is working on it. Hopefully it will be addressed
in 0.10.1.1.
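
The message above does not say what was used in place of punctuate. One pattern that fits the stated caveat is to check the wall clock at the top of process() and run the periodic work there when it is due. The sketch below is only an illustration under that assumption: WallClockTrigger, its Clock interface, and isDue() are hypothetical names, not part of the Kafka Streams API.

```java
// Hypothetical helper, not part of Kafka Streams: decides from wall-clock
// time whether periodic work is due. Intended to be called from process().
final class WallClockTrigger {

    /** Injectable time source so the logic can be tested without sleeping. */
    interface Clock {
        long nowMs();
    }

    private final Clock clock;
    private final long intervalMs;
    private long lastFiredMs;

    WallClockTrigger(Clock clock, long intervalMs) {
        this.clock = clock;
        this.intervalMs = intervalMs;
        this.lastFiredMs = clock.nowMs(); // first firing is one interval from now
    }

    /** Returns true at most once per interval; resets the timer when it does. */
    boolean isDue() {
        long now = clock.nowMs();
        if (now - lastFiredMs < intervalMs) {
            return false;
        }
        lastFiredMs = now;
        return true;
    }
}
```

In a processor this would be created as new WallClockTrigger(System::currentTimeMillis, 10000L) and consulted at the start of process(), for example if (trigger.isDue()) { /* periodic work */ }. Because the check runs only when a record arrives, a task that receives no messages never fires, which is the limitation described above.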

Sent from my iPhone

> On Nov 24, 2016, at 1:55 AM, shahab <shahab.mokari@gmail.com> wrote:
> 
> Thanks for the comments.
> @David: yes, I have a source that is reading data from two topics, and one
> of them was empty while the second one was loaded with plenty of data.
> So what do you suggest to solve this?
> Here is a snippet of my code:
> 
> StreamsConfig config = new StreamsConfig(configProperties);
> TopologyBuilder builder = new TopologyBuilder();
> AppSettingsFetcher appSettingsFetcher = initAppSettingsFetcher();
> 
> StateStoreSupplier company_bucket= Stores.create("CBS")
>        .withKeys(Serdes.String())
>        .withValues(Serdes.String())
>        .persistent()
>        .build();
> 
> StateStoreSupplier profiles= Stores.create("PR")
>        .withKeys(Serdes.String())
>        .withValues(Serdes.String())
>        .persistent()
>        .build();
> 
> 
> builder
>        .addSource("deltaSource", topicName, LoaderListener.LoadedDeltaTopic)
>        .addProcessor("deltaProcess1", () -> new DeltaProcessor(), "deltaSource")
>        .addProcessor("deltaProcess2", () -> new LoadProcessor(), "deltaProcess1")
>        .addStateStore(profiles, "deltaProcess2", "deltaProcess1")
>        .addStateStore(company_bucket, "deltaProcess2", "deltaProcess1");
> 
> KafkaStreams streams = new KafkaStreams(builder, config);
> 
> streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
>    @Override
>    public void uncaughtException(Thread t, Throwable e) {
>        e.printStackTrace();
>    }
> });
> 
> streams.start();
> 
> 
>> On Wed, Nov 23, 2016 at 8:30 PM, David Garcia <davidg@spiceworks.com> wrote:
>> 
>> If you are consuming from more than one topic/partition, punctuate is
>> triggered when the “smallest” time-value changes.  So, if there is a
>> partition that doesn’t have any more messages on it, it will always have
>> the smallest time-value and that time value won’t change…hence punctuate
>> never gets called.
>> 
>> -David
>> 
>> On 11/23/16, 1:01 PM, "Matthias J. Sax" <matthias@confluent.io> wrote:
>> 
>>    Your understanding is correct:
>> 
>>    Punctuate is not triggered based on wall-clock time, but based on the
>>    internally tracked "stream time" that is derived from the
>>    TimestampExtractor.
>>    Even if you use WallclockTimestampExtractor, "stream time" only
>>    advances if there are input records.
>> 
>>    Not sure why punctuate() is not triggered as you say that you do have
>>    arriving data.
>> 
>>    Can you share your code?
>> 
>> 
>> 
>>    -Matthias
>> 
>> 
>>>    On 11/23/16 4:48 AM, shahab wrote:
>>> Hello,
>>> 
>>> I am using the low-level Processor API and I call context.schedule(10000),
>>> assuming that the punctuate() method is invoked every 10 seconds.
>>> I have set
>>> configProperties.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
>>> WallclockTimestampExtractor.class.getCanonicalName());
>>> 
>>> Although data keeps coming into the topology (I have logged the incoming
>>> tuples in process()), punctuate() is never executed.
>>> 
>>> What am I missing?
>>> 
>>> best,
>>> Shahab
>> 
>> 
>> 
>> 
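
The explanation quoted in this thread (punctuate is driven by the smallest time value across the input partitions, so an empty partition holds it back) can be illustrated with a small model. This is a simplified sketch of the described behavior, not Kafka Streams' actual implementation: the StreamTimeModel class, the NO_DATA marker, and the single-fire rescheduling rule are all assumptions made for illustration.

```java
// Simplified model of the behavior described in this thread.
// NOT Kafka Streams code; every name here is hypothetical.
final class StreamTimeModel {
    private static final long NO_DATA = -1L; // partition has seen no records yet

    private final long[] partitionTime; // latest timestamp seen per partition
    private final long intervalMs;      // plays the role of context.schedule(interval)
    private long nextPunctuateAt;
    private int punctuateCount = 0;

    StreamTimeModel(int partitions, long intervalMs) {
        this.partitionTime = new long[partitions];
        for (int i = 0; i < partitions; i++) {
            partitionTime[i] = NO_DATA;
        }
        this.intervalMs = intervalMs;
        this.nextPunctuateAt = intervalMs;
    }

    /** "Stream time" in this model: the smallest time across all partitions. */
    long streamTime() {
        long min = Long.MAX_VALUE;
        for (long t : partitionTime) {
            min = Math.min(min, t);
        }
        return min;
    }

    /** Records a message and fires punctuate only if stream time has advanced far enough. */
    void onRecord(int partition, long timestampMs) {
        partitionTime[partition] = Math.max(partitionTime[partition], timestampMs);
        long now = streamTime();
        if (now >= nextPunctuateAt) {
            punctuateCount++;
            nextPunctuateAt = now + intervalMs;
        }
    }

    int punctuateCount() {
        return punctuateCount;
    }

    public static void main(String[] args) {
        StreamTimeModel model = new StreamTimeModel(2, 10000L);
        // Partition 0 is busy for a full minute; partition 1 stays empty.
        for (long ts = 0; ts <= 60000L; ts += 5000L) {
            model.onRecord(0, ts);
        }
        System.out.println("before canary: " + model.punctuateCount()); // prints "before canary: 0"
        // A single canary record on the idle partition lets stream time advance.
        model.onRecord(1, 60000L);
        System.out.println("after canary: " + model.punctuateCount());  // prints "after canary: 1"
    }
}
```

In this model the busy partition alone never triggers punctuate, because the empty partition pins the minimum. One canary record on the idle partition is enough to release it, which is why periodic canaries to every partition work as a stopgap.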
