flink-user mailing list archives

From Chesnay Schepler <ches...@apache.org>
Subject Re: Record timestamp from kafka
Date Tue, 10 Apr 2018 10:32:07 GMT
You must use a ProcessFunction for this; the timestamps are not exposed 
in any way to map/flatMap functions.
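
A minimal sketch of that approach in Scala, assuming a DataStream[String] 
read from Kafka with event time enabled. The class name 
AttachKafkaTimestamp is made up for illustration. The function pairs each 
record with its timestamp so that later map/flatMap calls can read it as a 
regular field:

```scala
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector

// Hypothetical helper: emits (record, timestamp) pairs.
class AttachKafkaTimestamp extends ProcessFunction[String, (String, Long)] {
  override def processElement(
      value: String,
      ctx: ProcessFunction[String, (String, Long)]#Context,
      out: Collector[(String, Long)]): Unit = {
    // timestamp() returns a java.lang.Long that is null when the record
    // carries no timestamp (e.g. event time is not configured).
    val ts: java.lang.Long = ctx.timestamp()
    if (ts != null) {
      out.collect((value, ts.longValue()))
    }
    // Records without a timestamp are dropped in this sketch.
  }
}
```

With org.apache.flink.streaming.api.scala._ imported, this could be used 
as stream.process(new AttachKafkaTimestamp) followed by an ordinary map 
over the (value, timestamp) pairs.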

On 10.04.2018 12:29, Ben Yan wrote:
> Hi Fabian.
>
> If I use a ProcessFunction, I can get it! But I want to know how to 
> get the Kafka timestamp in methods like flatMap and map of a 
> DataStream using the Scala programming language.
> Thanks!
>
> Best
> Ben
>
>> On Apr 4, 2018, at 7:00 PM, Fabian Hueske <fhueske@gmail.com> wrote:
>>
>> Hi Navneeth,
>>
>> Flink's KafkaConsumer automatically attaches Kafka's ingestion 
>> timestamp if you configure EventTime for an application [1].
>> Since Flink treats record timestamps as metadata, they are not 
>> directly accessible by most functions. You can implement a 
>> ProcessFunction [2] to access the timestamp of a record via the 
>> ProcessFunction's Context object.
>>
>> Best, Fabian
>>
>> [1] 
>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#using-kafka-timestamps-and-flink-event-time-in-kafka-010
>> [2] 
>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/process_function.html#the-processfunction
>>
>> 2018-03-30 7:45 GMT+02:00 Ben Yan <yan.xiao.bin.mail@gmail.com>:
>>
>>     hi,
>>     Is that what you mean?
>>     See :
>>     https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel&focusedCommentId=16377145#comment-16377145
>>
>>
>>     Best
>>     Ben
>>
>>>     On 30 Mar 2018, at 12:23 PM, Navneeth Krishnan
>>>     <reachnavneeth2@gmail.com> wrote:
>>>
>>>     Hi,
>>>
>>>     Is there a way to get the Kafka timestamp in the deserialization
>>>     schema? All records are written to Kafka with a timestamp and I
>>>     would like to set that timestamp on every record that is
>>>     ingested. Thanks.
>>
>>
>

