kafka-users mailing list archives

From Guozhang Wang <wangg...@gmail.com>
Subject Re: kafka-streams punctuate with WALL_CLOCK_TIME triggered immediately
Date Thu, 07 Dec 2017 02:19:19 GMT
Hello Fred,

Thanks for reporting the issue.

1) Nice find about the punctuation start time with the WALL_CLOCK_TIME type. I
agree with you that it should be initialized to current time + interval.
Do you mind creating a JIRA for Kafka? And if you'd like to submit a patch
for it, that would be best :)
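To make the difference concrete, here is a minimal sketch of the "is this schedule due?" comparison with the two possible initial timestamps. It is only a model of the behavior described in this thread, not the Streams code: the class FirstPunctuationSketch and its methods are hypothetical names, and the only assumption taken from the report is that a schedule fires once its timestamp is <= the current time.

```java
// Hypothetical model of the first-punctuation check; not the Kafka Streams implementation.
final class FirstPunctuationSketch {

    // Assumption from the thread: a schedule is due once its timestamp <= current time.
    static boolean isDue(long scheduleTimestampMs, long nowMs) {
        return scheduleTimestampMs <= nowMs;
    }

    // Behavior reported for 1.0.0: the schedule's timestamp starts at 0.
    static long initialTimestampReported(long nowMs, long intervalMs) {
        return 0L;
    }

    // Proposed behavior: start at current time + interval.
    static long initialTimestampProposed(long nowMs, long intervalMs) {
        return nowMs + intervalMs;
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        long interval = 60_000L; // 1 minute, as in the report

        // Starting at 0 is always in the past, so the punctuator fires right away.
        System.out.println(isDue(initialTimestampReported(now, interval), now)); // true
        // Starting at now + interval is not due yet ...
        System.out.println(isDue(initialTimestampProposed(now, interval), now)); // false
        // ... and becomes due one interval later.
        System.out.println(isDue(initialTimestampProposed(now, interval), now + interval)); // true
    }
}
```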

2) About the OOM error: if it is consistently reproducible, could you take a
thread dump when the exception occurs? That would help me investigate further
and see whether anything in the Streams library is behind it. I took a pass
over the code but could not find any obvious leaks. Note that in
PunctuationQueue#mayPunctuate:

PunctuationSchedule sched = top;

if (!sched.isCancelled()) {
    processorNodePunctuator.punctuate(sched.node(), timestamp, type,
    punctuated = true;

I.e. the cancelled punctuation is still popped from the queue before
checking its "cancelled" flag.
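For illustration, the pop-before-check pattern can be modeled roughly as below. This is a simplified sketch and not the Streams source: PunctuationQueueSketch, Schedule, the fired counter and the "now + interval" rescheduling rule are all assumptions made for the example. It only shows that a cancelled entry leaves the queue once it comes due, because it is polled first and simply not re-added.

```java
import java.util.PriorityQueue;

// Hypothetical, simplified model of a punctuation queue; not the Kafka Streams implementation.
final class PunctuationQueueSketch {

    static final class Schedule implements Comparable<Schedule> {
        final long timestamp;
        final long interval;
        boolean cancelled;

        Schedule(long timestamp, long interval) {
            this.timestamp = timestamp;
            this.interval = interval;
        }

        // Assumption for this sketch: the next firing time is "now + interval".
        Schedule next(long now) {
            return new Schedule(now + interval, interval);
        }

        @Override
        public int compareTo(Schedule other) {
            return Long.compare(timestamp, other.timestamp);
        }
    }

    final PriorityQueue<Schedule> pq = new PriorityQueue<>();
    int fired = 0; // counts punctuations that actually ran

    Schedule schedule(long firstTimestamp, long interval) {
        Schedule s = new Schedule(firstTimestamp, interval);
        pq.add(s);
        return s;
    }

    boolean mayPunctuate(long now) {
        boolean punctuated = false;
        Schedule top = pq.peek();
        while (top != null && top.timestamp <= now) {
            Schedule sched = top;
            pq.poll(); // popped before the cancelled flag is checked

            if (!sched.cancelled) {
                fired++;                 // stands in for calling the punctuator
                pq.add(sched.next(now)); // only live schedules are re-added
                punctuated = true;
            }
            top = pq.peek();
        }
        return punctuated;
    }

    public static void main(String[] args) {
        PunctuationQueueSketch queue = new PunctuationQueueSketch();
        Schedule a = queue.schedule(100L, 100L);
        a.cancelled = true;           // cancelled, but still sitting in the queue
        queue.schedule(100L, 100L);   // a live schedule

        queue.mayPunctuate(100L);
        // The cancelled entry has been dropped; only the re-added live schedule remains.
        System.out.println(queue.pq.size()); // 1
        System.out.println(queue.fired);     // 1
    }
}
```

Note that in this model a cancelled schedule stays in the queue until its timestamp is reached, so it is only removed lazily.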


On Tue, Dec 5, 2017 at 7:42 PM, frederic arno <fredericarno@gmail.com> wrote:

> Hello all,
> I am using kafka and kafka-streams 1.0.0
> When working on a custom Processor from which I am scheduling a
> punctuation using WALL_CLOCK_TIME, I've noticed that whatever the
> punctuation interval I set, a call to my Punctuator is always
> triggered immediately. Is that a bug?
> Having a quick look at kafka-streams' code, I could find that all
> PunctuationSchedule's timestamps are matched against the current time
> in order to decide whether or not to trigger the punctuator
> (org.apache.kafka.streams.processor.internals.PunctuationQueue#mayPunctuate).
> However, I've only seen code that initializes PunctuationSchedule's
> timestamp to 0, which I guess is what is causing an immediate
> punctuation. At least when using WALL_CLOCK_TIME, shouldn't the
> PunctuationSchedule's timestamp be initialized to current time +
> interval?
> I am also hitting an OutOfMemoryError when running integration tests:
> java.lang.OutOfMemoryError: Java heap space
>         at java.util.Arrays.copyOf(Arrays.java:3181)
>         at java.util.PriorityQueue.grow(PriorityQueue.java:300)
>         at java.util.PriorityQueue.offer(PriorityQueue.java:339)
>         at java.util.PriorityQueue.add(PriorityQueue.java:321)
>         at org.apache.kafka.streams.processor.internals.PunctuationQueue.mayPunctuate(PunctuationQueue.java:55)
>         at org.apache.kafka.streams.processor.internals.StreamTask.maybePunctuateSystemTime(StreamTask.java:619)
>         at org.apache.kafka.streams.processor.internals.AssignedTasks.punctuate(AssignedTasks.java:430)
>         at org.apache.kafka.streams.processor.internals.TaskManager.punctuate(TaskManager.java:324)
>         at org.apache.kafka.streams.processor.internals.StreamThread.punctuate(StreamThread.java:969)
>         at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:834)
>         at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
>         at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)
> I am only using WALL_CLOCK_TIME punctuation type, from a single
> processor (4 instances are running as I have 4 partitions on the
> processed topic). The punctuation interval is set to 1 minute, and I
> am canceling the scheduler at each punctuation, re-scheduling a new
> one (dealing with variable intervals).
> Although I run my tests in a JVM with little heap space, I am
> wondering if there could be a memory leak around there as I've not
> seen where canceled PunctuationSchedules are removed from the
> PunctuationQueue...
> Thank you, Fred

-- Guozhang
