flink-user mailing list archives

From Kostas Kloudas <k.klou...@data-artisans.com>
Subject Re: assignTimestampsAndWatermarks not working as expected
Date Thu, 04 May 2017 16:49:47 GMT
Hi Jayesh,

Glad that it finally worked! 

From a first look, I cannot spot anything wrong with the code itself.
The only thing I would note is that the logs and the printouts you put in your code
end up in different locations, and normally neither is printed to the console.
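
As a side note on the approach in the quoted message: the watermark that a
bounded-out-of-orderness extractor produces is essentially the highest event time
seen so far minus the allowed out-of-orderness (5 seconds in the quoted code). A
minimal plain-Java sketch of that bookkeeping, with no Flink dependency, might look
like the following. The class and method names (BoundedWatermarkSketch, onEvent,
currentWatermark) are made up for illustration and are not Flink API; this is a
simplification under those assumptions, not Flink's actual implementation.

```java
/**
 * Hypothetical illustration (not Flink code): tracks the highest event time
 * seen so far and derives a watermark that trails it by a fixed bound.
 */
public class BoundedWatermarkSketch {
    private final long maxOutOfOrdernessMs;
    private long currentMaxTimestamp;

    public BoundedWatermarkSketch(long maxOutOfOrdernessMs) {
        if (maxOutOfOrdernessMs < 0) {
            throw new IllegalArgumentException("bound must be non-negative");
        }
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
        // Start low enough that the subtraction in currentWatermark() cannot underflow.
        this.currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrdernessMs;
    }

    /** Records one event's timestamp (epoch millis); out-of-order events never lower the max. */
    public void onEvent(long eventTimeMs) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimeMs);
    }

    /** Watermark = highest event time seen so far minus the allowed out-of-orderness. */
    public long currentWatermark() {
        return currentMaxTimestamp - maxOutOfOrdernessMs;
    }

    public static void main(String[] args) {
        BoundedWatermarkSketch wm = new BoundedWatermarkSketch(5000L);
        long[] eventTimes = {10_000L, 8_000L, 12_000L}; // second event arrives out of order
        for (long t : eventTimes) {
            wm.onEvent(t);
            System.out.println("event " + t + " -> watermark " + wm.currentWatermark());
        }
    }
}
```

With a 5 second bound, the event stamped 8000 that arrives after the one stamped
10000 does not move the watermark (it stays at 5000), so that event is still ahead
of the watermark and is not treated as late.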


> On May 4, 2017, at 6:45 PM, Jayesh Patel <jpatel@keywcorp.com> wrote:
> I figured out what’s wrong – there was a silly mistake on my side. There is nothing
> wrong with the code here, but please do let me know if you see anything wrong with my approach.
> Thank you.
> From: Jayesh Patel 
> Sent: Thursday, May 04, 2017 10:00 AM
> To: 'user@flink.apache.org' <user@flink.apache.org>
> Subject: assignTimestampsAndWatermarks not working as expected
> Can anybody see what’s wrong with the following code? I am using Flink 1.2 and have
> tried running it in Eclipse (local mode) as well as on a 3 node cluster and it’s not behaving
> as expected.
> The idea is to have a custom source collect messages from a JMS topic (I have a fake
> source for now that generates some out of order messages with event time that is not delayed
> more than 5 seconds). The source doesn’t collectWithTimestamp() or emitWatermark().
> The messages (events) include the event time. In order to allow for late or out of order
> messages I use assignTimestampsAndWatermarks with BoundedOutOfOrdernessTimestampExtractor
> and the extractTimestamp() method retrieves the event time from the event.
> When I run this job, I don’t get the printout from the extractTimestamp() method, nor
> do I get the logTuples.print() or stampedLogs.print() output. When running on the local environment (Eclipse)
> I do see the printouts from the fake source (MockSource – not shown here). But I don’t
> even get those when run from my 3 node cluster with parallelism of 3.
> public static void main(String[] args) throws Exception {
>        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>        // just for debugging, didn’t affect the behavior
>        env.getConfig().setAutoWatermarkInterval(2000);
>        DataStream<Message> logs = env.addSource(new MockSource());
>        DataStream<Tuple2<String, CEFEvent>> logTuples = logs.map(new ParseEvent());
>        logTuples.print();
>        DataStream<Tuple2<String, CEFEvent>> stampedLogs = logTuples.assignTimestampsAndWatermarks(
>               new BoundedOutOfOrdernessTimestampExtractor<Tuple2<String, CEFEvent>>(Time.seconds(5)) {
>                      private static final long serialVersionUID = 1L;
>                      @Override
>                      public long extractTimestamp(Tuple2<String, CEFEvent> element) {
>                             // This is how to extract timestamp from the event
>                             long eventTime = element.f1.getEventStartTime().toInstant().toEpochMilli();
>                             System.out.println("returning event time " + eventTime);
>                             return eventTime;
>                      }
>               });
>        stampedLogs.print();
>        env.execute("simulation");
> }
> Thank you,
> Jayesh
