flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Aljoscha Krettek <aljos...@apache.org>
Subject Re: Timestamp from Kafka record and watermark generation
Date Fri, 23 Feb 2018 13:45:55 GMT

This is a know issue: https://issues.apache.org/jira/browse/FLINK-8500 <https://issues.apache.org/jira/browse/FLINK-8500>.
And yes, the workaround is to write an assigner from scratch but you can start by copying
the code of AscendingTimestampExtractor.

Sorry for the inconvenience.


> On 22. Feb 2018, at 12:05, Federico D'Ambrosio <fedexist@gmail.com> wrote:
> Hello everyone,
> I'm consuming from a Kafka topic, on which I'm writing with a FlinkKafkaProducer, with
the timestamp relative flag set to true.
> From what I gather from the documentation [1], Flink is aware of Kafka Record's timestamp
and only the watermark should be set with an appropriate TimestampExtractor, still I'm failing
to understand how to implement it in the right way.
> I thought that it would be possible to use the already existent AscendingTimestampExtractor,
overriding the extractTimestamp method, but it's marked final. 
> new FlinkKafkaConsumer010[Event](ingestion_topic, new JSONDeserializationSchema(), consumerConfig)
>   .setStartFromLatest()
>     .assignTimestampsAndWatermarks(new AscendingTimestampExtractor[Event]() {
>       def extractAscendingTimestamp(element: Event): Long = ???
>     })
> Should I need to implement my own TimestampExtractor (with the appropriate getCurrentWatermark
and extractTimestamp methods) ? 
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#using-kafka-timestamps-and-flink-event-time-in-kafka-010
> Thank you,
> Federico

View raw message