flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Konstantin Knauf <konstantin.kn...@tngtech.com>
Subject Custom TimestampExtractor and FlinkKafkaConsumer082
Date Sun, 15 Nov 2015 21:55:51 GMT
Hi everyone,

I have the following issue with Flink (0.10) and Kafka.

I am using a very simple TimestampExtractor like [1], which just
extracts a millis timestamp from a POJO. In my streaming job, I read in
these POJOs from Kafka using the FlinkKafkaConsumer082 like this:

stream = env.addSource(new FlinkKafkaConsumer082<
                new AvroPojoDeserializationSchema(),

I have timestampEnabled() and the TimeCharacteristics are EventTime,
AutoWatermarkIntervall is 500.

The problem is, when I do something like:

stream.assignTimestamps(new PojoTimestampExtractor(6000))
.timeWindowAll(Time.of(1, TimeUnit.SECONDS)


the windows never get triggered.

If I use ProcessingTime it works.
If I use env.fromCollection(...) instead of the KafkaSource it works
with EventTime, too.

Any ideas what I could be doing wrong are highly appreciated.




public class PojoTimestampExtractor implements TimestampExtractor<Pojo> {

    final private long maxDelay;

    public  PojoTimestampExtractor(long maxDelay) {
        this.maxDelay = maxDelay;

    public long extractTimestamp(Pojo fightEvent, long l) {
        return pojo.getTime();

    public long extractWatermark(Pojo pojo, long l) {
        return pojo.getTime() - maxDelay;

    public long getCurrentWatermark() {
        return Long.MIN_VALUE;

View raw message