kafka-users mailing list archives

From frederic arno <frederica...@gmail.com>
Subject Re: kafka-streams punctuate with WALL_CLOCK_TIME triggered immediately
Date Thu, 07 Dec 2017 06:24:40 GMT
Hello Guozhang,

Thanks for your reply.

I created the following issue: https://issues.apache.org/jira/browse/KAFKA-6323

I also analyzed the memory problem further and found that it is a
non-issue. It was only a consequence of the issue above: at each
punctuation I immediately canceled the scheduler and created a new
one, which led to huge GC pressure (a few calls to context.schedule()
every millisecond) and the apparent memory problem.
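
To give a rough idea of the churn, here is a minimal sketch, not the
actual Kafka Streams code. The Schedule class and the schedulesCreated
method are hypothetical stand-ins, and the model fires at most one
schedule per millisecond, so it probably understates what I observed.
It compares a schedule whose start timestamp is 0 with one that starts
at current time + interval, when the punctuator cancels and re-creates
the schedule every time it fires:

```java
import java.util.PriorityQueue;

class RescheduleChurnSketch {
    // Hypothetical stand-in for a punctuation schedule: only the due time matters here.
    static final class Schedule implements Comparable<Schedule> {
        final long timestamp;

        Schedule(long timestamp) {
            this.timestamp = timestamp;
        }

        @Override
        public int compareTo(Schedule other) {
            return Long.compare(timestamp, other.timestamp);
        }
    }

    // Simulates `ticks` polls, one millisecond apart. Whenever the head of the queue
    // is due, it is dropped (the "cancel") and a replacement is scheduled.
    // Returns the total number of Schedule objects created.
    static long schedulesCreated(boolean startAtZero, long intervalMs, int ticks) {
        PriorityQueue<Schedule> pq = new PriorityQueue<>();
        long now = 1_000_000L; // arbitrary wall-clock start, in ms
        long created = 0;
        pq.add(new Schedule(startAtZero ? 0L : now + intervalMs));
        created++;
        for (int i = 0; i < ticks; i++, now++) {
            Schedule top = pq.peek();
            // Fire at most one schedule per poll so the simulation always terminates.
            if (top != null && top.timestamp <= now) {
                pq.poll();
                pq.add(new Schedule(startAtZero ? 0L : now + intervalMs));
                created++;
            }
        }
        return created;
    }

    public static void main(String[] args) {
        // Two simulated minutes with a one-minute interval.
        System.out.println(schedulesCreated(true, 60_000L, 120_000));  // prints 120001
        System.out.println(schedulesCreated(false, 60_000L, 120_000)); // prints 2
    }
}
```

With a start timestamp of 0, every replacement is already due on the
next poll, so a new schedule is allocated on every tick. With current
time + interval, only one replacement is created over the same period.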

You are absolutely right about PunctuationSchedule always being popped
from the queue.
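
For the record, here is how I understand it, as a simplified model of
the snippet you quoted. Schedule and drain are hypothetical names, not
the Kafka Streams classes, and the model assumes a positive interval:

```java
import java.util.PriorityQueue;

class PopBeforeCheckSketch {
    // Hypothetical stand-in for PunctuationSchedule; not the Kafka Streams class.
    static final class Schedule implements Comparable<Schedule> {
        final long timestamp;
        final long interval;
        boolean cancelled;

        Schedule(long timestamp, long interval) {
            this.timestamp = timestamp;
            this.interval = interval;
        }

        @Override
        public int compareTo(Schedule other) {
            return Long.compare(timestamp, other.timestamp);
        }
    }

    // Pops every entry that is due at `now`. Cancelled entries are simply dropped;
    // live ones count as a punctuation and are re-added at now + interval.
    // Assumes interval > 0, otherwise a re-added entry would be due again at once.
    static int drain(PriorityQueue<Schedule> pq, long now) {
        int punctuated = 0;
        Schedule top = pq.peek();
        while (top != null && top.timestamp <= now) {
            Schedule sched = pq.poll();
            if (!sched.cancelled) {
                punctuated++;
                pq.add(new Schedule(now + sched.interval, sched.interval));
            }
            top = pq.peek();
        }
        return punctuated;
    }

    public static void main(String[] args) {
        PriorityQueue<Schedule> pq = new PriorityQueue<>();
        for (int i = 0; i < 3; i++) {
            Schedule s = new Schedule(0L, 1_000L);
            s.cancelled = i < 2; // cancel the first two entries
            pq.add(s);
        }
        int punctuated = drain(pq, 5_000L);
        // prints "1 punctuation(s), queue size 1"
        System.out.println(punctuated + " punctuation(s), queue size " + pq.size());
    }
}
```

Since the entry is polled before its cancelled flag is checked, a
canceled entry leaves the queue as soon as it becomes due.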

Thanks, Fred

On Thu, Dec 7, 2017 at 10:19 AM, Guozhang Wang <wangguoz@gmail.com> wrote:
> Hello Fred,
>
> Thanks for reporting the issue.
>
> 1) Nice find about the punctuation start time with the WALL_CLOCK_TIME type.
> I agree with you that it should be initialized to current time
> + interval.
> Do you mind creating a JIRA for Kafka? And if you'd like to submit a patch
> for it that would be best :)
>
> 2) About the OOM error, if it is consistently reproducible could you take a
> thread dump upon the exception? That would help me investigate further
> to see if there is anything from the Streams library behind the scenes. I
> took a pass over the code but I cannot find any obvious leaks. Note that in
> PunctuationQueue#mayPunctuate:
>
>
> PunctuationSchedule sched = top;
> pq.poll();
>
>
> if (!sched.isCancelled()) {
>     processorNodePunctuator.punctuate(sched.node(), timestamp, type, sched.punctuator());
>     pq.add(sched.next(timestamp));
>     punctuated = true;
> }
>
>
> I.e. the cancelled punctuation is still popped from the queue before
> checking its "cancelled" flag.
>
>
> Guozhang
>
>
>
> On Tue, Dec 5, 2017 at 7:42 PM, frederic arno <fredericarno@gmail.com>
> wrote:
>
>> Hello all,
>>
>> I am using kafka and kafka-streams 1.0.0
>>
>> While working on a custom Processor from which I am scheduling a
>> punctuation using WALL_CLOCK_TIME, I've noticed that whatever
>> punctuation interval I set, a call to my Punctuator is always
>> triggered immediately. Is that a bug?
>>
>> Having a quick look at kafka-streams' code, I could find that all
>> PunctuationSchedule's timestamps are matched against the current time
>> in order to decide whether or not to trigger the punctuator
>> (org.apache.kafka.streams.processor.internals.PunctuationQueue#mayPunctuate).
>> However, I've only seen code that initializes PunctuationSchedule's
>> timestamp to 0, which I guess is what is causing an immediate
>> punctuation. At least when using WALL_CLOCK_TIME, shouldn't the
>> PunctuationSchedule's timestamp be initialized to current time +
>> interval?
>>
>> I am also hitting an OutOfMemoryError when running integration tests:
>> java.lang.OutOfMemoryError: Java heap space
>>         at java.util.Arrays.copyOf(Arrays.java:3181)
>>         at java.util.PriorityQueue.grow(PriorityQueue.java:300)
>>         at java.util.PriorityQueue.offer(PriorityQueue.java:339)
>>         at java.util.PriorityQueue.add(PriorityQueue.java:321)
>>         at org.apache.kafka.streams.processor.internals.PunctuationQueue.mayPunctuate(PunctuationQueue.java:55)
>>         at org.apache.kafka.streams.processor.internals.StreamTask.maybePunctuateSystemTime(StreamTask.java:619)
>>         at org.apache.kafka.streams.processor.internals.AssignedTasks.punctuate(AssignedTasks.java:430)
>>         at org.apache.kafka.streams.processor.internals.TaskManager.punctuate(TaskManager.java:324)
>>         at org.apache.kafka.streams.processor.internals.StreamThread.punctuate(StreamThread.java:969)
>>         at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:834)
>>         at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
>>         at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)
>>
>> I am only using WALL_CLOCK_TIME punctuation type, from a single
>> processor (4 instances are running as I have 4 partitions on the
>> processed topic). The punctuation interval is set to 1 minute, and I
>> am canceling the scheduler at each punctuation, re-scheduling a new
>> one (dealing with variable intervals).
>> Although I run my tests in a JVM with little heap space, I am
>> wondering if there could be a memory leak there, as I have not
>> seen where canceled PunctuationSchedules are removed from the
>> PunctuationQueue...
>>
>> Thank you, Fred
>>
>
>
>
> --
> -- Guozhang
