flink-issues mailing list archives

From aljoscha <...@git.apache.org>
Subject [GitHub] flink pull request #2736: [FLINK-4174] Enhance evictor functionality
Date Mon, 14 Nov 2016 13:24:06 GMT
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2736#discussion_r87798796
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/TimestampedValue.java ---
    @@ -18,28 +18,77 @@
     package org.apache.flink.streaming.runtime.operators.windowing;
     
     import org.apache.flink.annotation.PublicEvolving;
    +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
     
     /**
      * Stores the value and the timestamp of the record.
    + * 
      * @param <T> The type encapsulated value
      */
     @PublicEvolving
     public class TimestampedValue<T> {
     
    +	/** The actual value held by this record */
     	private T value;
    +
    +	/** The timestamp of the record */
     	private long timestamp;
     
    +	/** Flag whether the timestamp is actually set */
    +	private boolean hasTimestamp;
    +
    +	/**
    +	 * Creates a new TimestampedValue. The record does not have a timestamp.
    +	 */
    +	public TimestampedValue(T value) {
    +		this.value = value;
    +	}
    +
    +	/**
    +	 * Creates a new TimestampedValue wrapping the given value. The timestamp is set to the
    +	 * given timestamp.
    +	 *
    +	 * @param value The value to wrap in this {@link TimestampedValue}
    +	 * @param timestamp The timestamp in milliseconds
    +	 */
     	public TimestampedValue(T value, long timestamp) {
     		this.value = value;
     		this.timestamp = timestamp;
    +		this.hasTimestamp = true;
     	}
     
    +	/**
    +	 * @return The value wrapped in this {@link TimestampedValue}.
    +	 */
     	public T getValue() {
     		return value;
     	}
     
    +	/**
    +	 * @return The timestamp associated with this stream value in milliseconds.
    +     */
     	public long getTimestamp() {
    --- End diff --
    
    We should throw this exception here when `hasTimestamp` is false:
    ```
    throw new IllegalStateException(
    		"Record has no timestamp. Is the time characteristic set to 'ProcessingTime', or " +
    				"did you forget to call 'DataStream.assignTimestampsAndWatermarks(...)'?");
    ```
    
    and in the `TimeEvictor` check whether the elements have timestamps using `hasTimestamp()`.
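
A minimal, self-contained sketch of the suggestion above, not the actual Flink code. `TimestampedValueSketch` and `TimeEvictorSketch` are hypothetical stand-ins with no Flink dependencies, and the eviction rule (drop elements at or before `maxTimestamp - windowSizeMs`) is an assumption made for illustration. The point is only the pairing: `getTimestamp()` fails loudly when no timestamp is set, and the evictor asks `hasTimestamp()` before reading one.

```java
import java.util.Iterator;
import java.util.List;

/** Simplified stand-in for TimestampedValue; not the actual Flink class. */
class TimestampedValueSketch<T> {
    private final T value;
    private final long timestamp;
    private final boolean hasTimestamp;

    /** Creates a value without a timestamp. */
    TimestampedValueSketch(T value) {
        this.value = value;
        this.timestamp = 0L;
        this.hasTimestamp = false;
    }

    /** Creates a value with the given timestamp in milliseconds. */
    TimestampedValueSketch(T value, long timestamp) {
        this.value = value;
        this.timestamp = timestamp;
        this.hasTimestamp = true;
    }

    T getValue() {
        return value;
    }

    boolean hasTimestamp() {
        return hasTimestamp;
    }

    /** Returns the timestamp, or throws if none was ever set. */
    long getTimestamp() {
        if (!hasTimestamp) {
            throw new IllegalStateException(
                "Record has no timestamp. Is the time characteristic set to 'ProcessingTime', or "
                    + "did you forget to call 'DataStream.assignTimestampsAndWatermarks(...)'?");
        }
        return timestamp;
    }
}

/** Hypothetical time-based eviction helper; the real TimeEvictor may differ. */
class TimeEvictorSketch {

    /** Removes timestamped elements older than the window; returns how many were removed. */
    static <T> int evict(List<TimestampedValueSketch<T>> elements, long windowSizeMs) {
        boolean anyTimestamp = false;
        long maxTimestamp = Long.MIN_VALUE;
        for (TimestampedValueSketch<T> element : elements) {
            // Guard every read with hasTimestamp() so getTimestamp() never throws here.
            if (element.hasTimestamp()) {
                anyTimestamp = true;
                maxTimestamp = Math.max(maxTimestamp, element.getTimestamp());
            }
        }
        if (!anyTimestamp) {
            return 0; // nothing to compare against, so evict nothing
        }

        long cutoff = maxTimestamp - windowSizeMs;
        int evicted = 0;
        Iterator<TimestampedValueSketch<T>> it = elements.iterator();
        while (it.hasNext()) {
            TimestampedValueSketch<T> element = it.next();
            if (element.hasTimestamp() && element.getTimestamp() <= cutoff) {
                it.remove();
                evicted++;
            }
        }
        return evicted;
    }
}
```

In this sketch, elements without a timestamp are simply left in place; whether the real evictor should keep, drop, or reject them is a separate design decision for the PR.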

