I think it would be better without such a limitation. I'd also like to ask about another problem. When I use BucketingSink (with AWS S3), a few files are still not renamed after a checkpoint, so a small number of the final files keep the underscore prefix. After some analysis, I found that this is caused by S3's eventual consistency. Are there any better solutions available? Thanks.
If I use a ProcessFunction, I can get it! But I want to know how to get the Kafka timestamp in methods like flatMap and map of a DataStream, using the Scala programming language.
Flink's KafkaConsumer automatically attaches Kafka's ingestion timestamp to each record if you configure EventTime for the application.
Since Flink treats record timestamps as metadata, they are not directly accessible by most functions, including map and flatMap. You can implement a ProcessFunction to access the timestamp of a record via the ProcessFunction's Context object.
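One way to make the timestamp available to later map or flatMap calls is to use a ProcessFunction that copies it into the record itself. Below is a minimal sketch in Scala, assuming the job runs with EventTime and the Flink Scala DataStream API is on the classpath. The names AttachTimestamp and withTimestamps are made up for illustration and are not part of Flink.

```scala
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

// Hypothetical helper (not part of Flink): pairs each record with the
// timestamp that Flink attached to it.
class AttachTimestamp[T] extends ProcessFunction[T, (T, Long)] {
  override def processElement(
      value: T,
      ctx: ProcessFunction[T, (T, Long)]#Context,
      out: Collector[(T, Long)]): Unit = {
    // Context.timestamp() returns a java.lang.Long, which is null when the
    // record carries no timestamp (e.g. the job is not using EventTime).
    val ts: java.lang.Long = ctx.timestamp()
    if (ts != null) {
      out.collect((value, ts.longValue()))
    }
    // Records without a timestamp are dropped in this sketch; emit a
    // default value instead if that suits your job better.
  }
}

// Hypothetical usage: the resulting stream carries the timestamp as a
// regular field, so ordinary map/flatMap functions can read it.
def withTimestamps(stream: DataStream[String]): DataStream[(String, Long)] =
  stream.process(new AttachTimestamp[String])
```

After this step, something like withTimestamps(kafkaStream).map { case (value, ts) => ... } can use the timestamp like any other field.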