Github user zhijiangW commented on the issue: https://github.com/apache/flink/pull/6272 Thanks for fixing this potential bug. It makes sense to handle exception during `notifyBufferAvailable` on listener side. Just some thoughts on my side above. :) ---