flink-user mailing list archives

From Ben Yan <yan.xiao.bin.m...@gmail.com>
Subject Re: Record timestamp from kafka
Date Tue, 10 Apr 2018 11:40:03 GMT


> On Apr 10, 2018, at 7:32 PM, Ben Yan <yan.xiao.bin.mail@gmail.com> wrote:
> 
> Hi Chesnay:
> 
>         I think it would be better without such a limitation. I would also like to ask about another
> problem: when I use BucketingSink (on AWS S3), the names of a few files are still not changed after the
> checkpoint, so a small number of the finally generated files keep the underscore prefix. After some
> analysis, this turns out to be caused by S3's eventual consistency. Are there any better solutions
> available? Thanks.
> See: https://issues.apache.org/jira/browse/FLINK-8794?jql=text%20~%20%22BucketingSink%22
 
> Best
> Ben
> 
>> On Apr 10, 2018, at 6:29 PM, Ben Yan <yan.xiao.bin.mail@gmail.com> wrote:
>> 
>> Hi Fabian.
>> 
>> 	If I use a ProcessFunction, I can get it. But I would like to know how to get the Kafka
>> timestamp in methods like flatMap and map on a DataStream, using the Scala programming language.
>> Thanks!
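
To make the Kafka-attached timestamp visible to plain map/flatMap operators, the workaround that
follows from the answer further down this thread is to first copy the timestamp into the element
with a ProcessFunction. A minimal Scala sketch, assuming a String payload and the Flink 1.4-era
DataStream API; the names TimestampedStream, withKafkaTimestamp, and kafkaStream are illustrative
and not from this thread:

import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

object TimestampedStream {

  // Copy the Kafka-attached record timestamp into the element itself, so that
  // plain map/flatMap operators downstream can read it as ordinary data.
  def withKafkaTimestamp(stream: DataStream[String]): DataStream[(Long, String)] =
    stream.process(new ProcessFunction[String, (Long, String)] {
      override def processElement(value: String,
                                  ctx: ProcessFunction[String, (Long, String)]#Context,
                                  out: Collector[(Long, String)]): Unit = {
        // ctx.timestamp() is the timestamp the Kafka consumer attached; it can be
        // null if the job does not run with event time.
        val ts: Long = ctx.timestamp()
        out.collect((ts, value))
      }
    })

  // Downstream, an ordinary map now sees the timestamp as part of the payload:
  // withKafkaTimestamp(kafkaStream).map { case (ts, payload) => s"$ts -> $payload" }
}
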
>> 
>> Best
>> Ben
>> 
>>> On Apr 4, 2018, at 7:00 PM, Fabian Hueske <fhueske@gmail.com> wrote:
>>> 
>>> Hi Navneeth,
>>> 
>>> Flink's KafkaConsumer automatically attaches Kafka's ingestion timestamp if you
>>> configure EventTime for an application [1].
>>> Since Flink treats record timestamps as metadata, they are not directly accessible
>>> by most functions. You can implement a ProcessFunction [2] to access the timestamp of a
>>> record via the ProcessFunction's Context object.
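
A minimal sketch of the configuration side, assuming the Flink 1.4 Scala API and the Kafka 0.10
connector; the topic name, bootstrap servers, and group id are placeholders, not taken from this
thread:

import java.util.Properties

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
import org.apache.flink.streaming.util.serialization.SimpleStringSchema

object KafkaEventTimeJob {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // With event time enabled, the Kafka 0.10 consumer attaches each record's
    // Kafka timestamp to the emitted element as its record timestamp [1].
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val props = new Properties()
    props.setProperty("bootstrap.servers", "localhost:9092") // placeholder
    props.setProperty("group.id", "timestamp-demo")          // placeholder

    val stream: DataStream[String] = env.addSource(
      new FlinkKafkaConsumer010[String]("my-topic", new SimpleStringSchema(), props))

    // The timestamp is metadata rather than payload; it can be read through a
    // ProcessFunction's Context [2], e.g. via the withKafkaTimestamp sketch
    // earlier on this page.
    stream.print()

    env.execute("kafka-timestamp-demo")
  }
}
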
>>> 
>>> Best, Fabian
>>> 
>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#using-kafka-timestamps-and-flink-event-time-in-kafka-010
>>> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/process_function.html#the-processfunction
>>> 
>>> 2018-03-30 7:45 GMT+02:00 Ben Yan <yan.xiao.bin.mail@gmail.com>:
>>> hi,
>>> Is that what you mean?
>>> See: https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel&focusedCommentId=16377145#comment-16377145

>>> 
>>> Best
>>> Ben
>>> 
>>>> On 30 Mar 2018, at 12:23 PM, Navneeth Krishnan <reachnavneeth2@gmail.com> wrote:
>>>> 
>>>> Hi,
>>>> 
>>>> Is there a way to get the Kafka timestamp in the deserialization schema? All records
>>>> are written to Kafka with a timestamp, and I would like to set that timestamp on every
>>>> record that is ingested. Thanks.
>>> 
>>> 
>> 
> 

