flink-user mailing list archives

From Aljoscha Krettek <aljos...@apache.org>
Subject Re: WaterMark & Eventwindow not fired correctly
Date Wed, 09 Aug 2017 12:55:13 GMT
Hi,

So when the parallelism of the timestamp assigner is different from the parallelism of the
map(KeyMapFunc()) or the window then it works? But when the parallelism is the same it does
not work?

If this is true, then I would assume that some parallel instances of the timestamp assigner
don't receive any events and therefore don't advance their watermark. This, in turn, would mean
that the downstream watermark doesn't advance either, because an operator's watermark is the
minimum of the watermarks of all its inputs. Could you check in the web interface whether
all parallel instances of the assigner are processing elements when you use the same parallelism
for all operations?
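A simplified plain-Java model of that reasoning: it assumes, as Flink does, that an operator with several input channels keeps the last watermark received on each channel and uses the minimum across channels as its own event-time clock. `MinWatermarkModel` is a hypothetical name for illustration, not a Flink class.

```java
import java.util.Arrays;

// Simplified model, not Flink's implementation: the downstream operator
// tracks one watermark per input channel and forwards the minimum.
public class MinWatermarkModel {

    // Last watermark seen per input channel; a channel that has never
    // emitted a watermark stays at Long.MIN_VALUE.
    private final long[] channelWatermarks;

    public MinWatermarkModel(int numChannels) {
        channelWatermarks = new long[numChannels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    // Record a watermark from one upstream parallel instance (watermarks never
    // move backwards) and return the combined watermark.
    public long onWatermark(int channel, long watermark) {
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
        return combined();
    }

    // The combined event-time clock is the minimum over all channels.
    public long combined() {
        long min = Long.MAX_VALUE;
        for (long w : channelWatermarks) {
            min = Math.min(min, w);
        }
        return min;
    }

    public static void main(String[] args) {
        MinWatermarkModel window = new MinWatermarkModel(3);
        window.onWatermark(0, 10_000L);
        window.onWatermark(1, 12_000L);
        // Channel 2 models an assigner instance that never receives events:
        // it never advances, so the combined watermark is stuck.
        System.out.println(window.combined() == Long.MIN_VALUE); // true
        window.onWatermark(2, 11_000L);
        System.out.println(window.combined()); // 10000
    }
}
```

In this model, two busy channels at 10,000 and 12,000 ms are not enough: until the third channel reports, no event-time window could fire.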

Best,
Aljoscha

> On 9. Aug 2017, at 11:33, aitozi <gjying1314@gmail.com> wrote:
> 
> Hi, below is my code:
> 
> splitStream.select(duringTime + "")
>                .map(new KeyMapFunc())
>                .assignTimestampsAndWatermarks(new DelaySaltWatermarks())
>                .setParallelism(300)
>                .keyBy(_SQL, _KEY, _SALT)
>                .window(TumblingEventTimeWindows.of(Time.seconds(duringTime / 10)))
>                .apply(new WindowSaltFunc())
>                .keyBy(_SQL, _KEY)
>                .window(TumblingEventTimeWindows.of(Time.seconds(duringTime)))
>                .apply(new WindowFunc())
>                .addSink(new FlinkKafkaProducer010<>("topic", new SimpleSerializationSchema(), this.properties));
> 
> and 
> 
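> import javax.annotation.Nullable;
> 
> import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
> import org.apache.flink.streaming.api.watermark.Watermark;
> 
> public class DelaySaltWatermarks implements AssignerWithPeriodicWatermarks<ContentMessage> {
> 
>    // Highest event timestamp seen so far by this parallel instance.
>    private long currentMaxTimestamp;
> 
>    // Called periodically; the watermark lags the highest seen timestamp by MAX_OUT_OF_ORDER.
>    @Nullable
>    @Override
>    public Watermark getCurrentWatermark() {
>        return new Watermark(currentMaxTimestamp - MAX_OUT_OF_ORDER);
>    }
> 
>    // Called for every element: track the maximum and return the element's own timestamp.
>    @Override
>    public long extractTimestamp(ContentMessage contentMessage, long l) {
>        long timestamp = contentMessage.getTimestamp();
>        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
>        return timestamp;
>    }
> }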
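A plain-Java sketch of the same assigner logic, without Flink types, shows what a parallel instance that never receives an element reports. The value of `MAX_OUT_OF_ORDER` is not given in the thread; 5,000 ms is an assumed value, and `DelaySaltWatermarkModel` is a hypothetical name.

```java
// Plain-Java model of the DelaySaltWatermarks logic (no Flink types).
public class DelaySaltWatermarkModel {

    // Assumed value; the thread does not state MAX_OUT_OF_ORDER.
    static final long MAX_OUT_OF_ORDER = 5_000L;

    // Starts at 0, as the long field in the original class does.
    private long currentMaxTimestamp;

    // Mirrors extractTimestamp: remember the maximum, return the element's timestamp.
    long extractTimestamp(long eventTimestamp) {
        currentMaxTimestamp = Math.max(eventTimestamp, currentMaxTimestamp);
        return eventTimestamp;
    }

    // Mirrors getCurrentWatermark: lag the maximum by MAX_OUT_OF_ORDER.
    long getCurrentWatermark() {
        return currentMaxTimestamp - MAX_OUT_OF_ORDER;
    }

    public static void main(String[] args) {
        DelaySaltWatermarkModel busy = new DelaySaltWatermarkModel();
        DelaySaltWatermarkModel idle = new DelaySaltWatermarkModel();
        busy.extractTimestamp(1_502_270_000_000L);
        System.out.println(busy.getCurrentWatermark()); // 1502269995000
        // An instance that never sees an element stays at 0 - MAX_OUT_OF_ORDER.
        System.out.println(idle.getCurrentWatermark()); // -5000
    }
}
```

If the window operator combines these instances by taking the minimum, the idle instance pins event time at -5000 ms, so windows over real epoch timestamps would never fire.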
> 
> And when I changed the parallelism of assignTimestampsAndWatermarks to 300,
> the window fired.
> 
> thanks,
> aitozi
> 
> 
> Aljoscha Krettek wrote
>> Hi,
>> 
>> So I understood that you have roughly this pipeline:
>> 
>> Input 1 --\
>>           |- CoFlatMap - TimestampAndWatermarkAssigner - KeyBy - Window    
>> Input 2 --/
>> 
>> If the timestamp assigner is after the CoFlatMap, the processInput() method
>> of the extractor should still be called. Not by the StreamInputProcessor
>> but by ChainingOutput [1], which basically connects the two-input
>> CoFlatMap to the one-input operator that comes after it. There could still
>> be a bug in there somewhere, however.
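A simplified model of the chaining described above, with hypothetical class names (not Flink's `ChainingOutput` or `TimestampsAndPeriodicWatermarksOperator`): both inputs of a two-input operator emit into one shared output, and that output calls the chained operator's `processElement` directly, so timestamps are extracted no matter which input an element arrived on.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Simplified model with hypothetical names, not Flink's classes.
public class ChainingModel {

    // Stands in for the chained timestamp assigner: records every timestamp it sees.
    static class TimestampAssigner {
        final List<Long> extracted = new ArrayList<>();
        long maxTimestamp = Long.MIN_VALUE;

        void processElement(long timestamp) {
            extracted.add(timestamp);
            maxTimestamp = Math.max(maxTimestamp, timestamp);
        }
    }

    // Stands in for a CoFlatMap: both inputs emit into the same chained output.
    static class TwoInputOperator {
        private final Consumer<Long> chainedOutput;

        TwoInputOperator(Consumer<Long> chainedOutput) {
            this.chainedOutput = chainedOutput;
        }

        void processElement1(long timestamp) { chainedOutput.accept(timestamp); }

        void processElement2(long timestamp) { chainedOutput.accept(timestamp); }
    }

    public static void main(String[] args) {
        TimestampAssigner assigner = new TimestampAssigner();
        // The chained output is a direct call into the next operator.
        TwoInputOperator coFlatMap = new TwoInputOperator(assigner::processElement);
        coFlatMap.processElement1(1_000L);
        coFlatMap.processElement2(2_000L);
        System.out.println(assigner.extracted);    // [1000, 2000]
        System.out.println(assigner.maxTimestamp); // 2000
    }
}
```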
>> 
>> Could you maybe send me the relevant parts of your code so that I can
>> have a look, or provide a minimal example?
>> 
>> Best,
>> Aljoscha
>> 
>> [1]
>> https://github.com/apache/flink/blob/6f5fa7f741538207244368c275bee9958c43a25a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java#L394
>> 
>>> On 7. Aug 2017, at 19:21, aitozi <gjying1314@> wrote:
>>> 
>>> 
>>> Hi,
>>> 
>>> My Flink version is 1.2.
>>> 
>>> I have been working on this problem for the past few days. Below is what I found.
>>> 
>>> When I use "assignTimestampsAndWatermarks" with the same parallelism (240)
>>> as the preceding operator, which has two inputs (it is a "connected"
>>> CoFlatMap operator with parallelism 240), the watermark is not updated.
>>> 
>>> Then I looked into the source code: StreamTwoInputProcessor.java#processInput,
>>> called by TwoInputStreamTask, calls processElement1() and processElement2(),
>>> but neither of them runs the processElement path of StreamInputProcessor
>>> that calls extractTimestamp (shown in TimestampsAndPeriodicWatermarksOperator).
>>> 
>>> As a result, the timestamp is not updated. My watermark is updated the same
>>> way as in the class BoundedOutOfOrdernessTimestampExtractor.
>>> 
>>> So, is it a bug that the timestamp is not updated when dealing with a
>>> two-input stream?
>>> 
>>> PS: My English is not very good; I don't know if you can understand me :)
>>> 
>>> thanks,
>>> aitozi
>>> 
>>> 
>>> 
>>> --
>>> View this message in context:
>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/WaterMark-Eventwindow-not-fired-correctly-tp14668p14727.html
>>> Sent from the Apache Flink User Mailing List archive. mailing list
>>> archive at Nabble.com.
> 
> 
> 
> 
> 
> --
> View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/WaterMark-Eventwindow-not-fired-correctly-tp14668p14753.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.

