flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-6563) Expose time indicator attributes in the KafkaTableSource
Date Tue, 19 Sep 2017 08:42:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-6563?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16171319#comment-16171319 ]

ASF GitHub Bot commented on FLINK-6563:
---------------------------------------

Github user haohui commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4638#discussion_r139629938
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java ---
    @@ -106,8 +240,191 @@
     		return deserializationSchema;
     	}
     
    -	@Override
    -	public String explainSource() {
    -		return "";
    +	/**
    +	 * Assigns ingestion time timestamps and watermarks.
    +	 */
    +	public static class IngestionTimeWatermarkAssigner implements AssignerWithPeriodicWatermarks<Row> {
    +
    +		private long curTime = Long.MIN_VALUE;
    +
    +		@Override
    +		public long extractTimestamp(Row element, long previousElementTimestamp) {
    +			long t = System.currentTimeMillis();
    +			if (t > curTime) {
    +				curTime = t;
    +			}
    +			return curTime;
    +		}
    +
    +		@Nullable
    +		@Override
    +		public Watermark getCurrentWatermark() {
    +			return new Watermark(curTime - 1);
    +		}
    +	}
    +
    +	protected AssignerWithPeriodicWatermarks<Row> getAssigner() {
    +		return this.timestampAssigner;
    +	}
    +
    +	/**
    +	 * Checks that the provided row time attribute is valid, determines its position in the schema,
    +	 * and adjusts the return type.
    +	 *
    +	 * @param rowtime The attribute to check.
    +	 */
    +	private void configureRowTimeAttribute(String rowtime) {
    +		Preconditions.checkNotNull(rowtime, "Row time attribute must not be null.");
    +
    +		if (this.ingestionTimeAttribute != null) {
    +			throw new ValidationException(
    +				"You can only specify a row time attribute OR an ingestion time attribute.");
    +		}
    +
    +		if (this.rowTimeAttribute != null) {
    +			throw new ValidationException(
    +				"Row time attribute can only be specified once.");
    +		}
    +
    +		// get current fields
    +		String[] fieldNames = ((RowTypeInfo) this.getReturnType()).getFieldNames();
    +		TypeInformation[] fieldTypes = ((RowTypeInfo) this.getReturnType()).getFieldTypes();
    +
    +		// check if the rowtime field exists and remember position
    +		this.rowtimeFieldPos = -1;
    --- End diff --
    
    As of today the answer is no: we only need a single, top-level field. There might be use
    cases that keep the rowtime in a nested field, though, so `TimestampAssigner` does come in
    handy; that flexibility is quite important to us.
    
    Maybe it makes sense to provide a default implementation of `TimestampAssigner` that
    extracts a field from the row, to make it easier to use?
    



> Expose time indicator attributes in the KafkaTableSource
> --------------------------------------------------------
>
>                 Key: FLINK-6563
>                 URL: https://issues.apache.org/jira/browse/FLINK-6563
>             Project: Flink
>          Issue Type: Bug
>          Components: Table API & SQL
>            Reporter: Haohui Mai
>            Assignee: Haohui Mai
>            Priority: Critical
>             Fix For: 1.4.0
>
>
> This is a follow-up to FLINK-5884.
> FLINK-5884 requires the {{TableSource}} interfaces to expose the processing time and the
> event time of the data stream. This JIRA proposes exposing these two attributes in the
> Kafka table source.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
