flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-4174) Enhance Window Evictor
Date Mon, 14 Nov 2016 13:24:58 GMT

    [ https://issues.apache.org/jira/browse/FLINK-4174?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15663902#comment-15663902 ]

ASF GitHub Bot commented on FLINK-4174:
---------------------------------------

Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2736#discussion_r87798796
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/TimestampedValue.java
---
    @@ -18,28 +18,77 @@
     package org.apache.flink.streaming.runtime.operators.windowing;
     
     import org.apache.flink.annotation.PublicEvolving;
    +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
     
     /**
      * Stores the value and the timestamp of the record.
    + * 
      * @param <T> The type encapsulated value
      */
     @PublicEvolving
     public class TimestampedValue<T> {
     
    +	/** The actual value held by this record */
     	private T value;
    +
    +	/** The timestamp of the record */
     	private long timestamp;
     
    +	/** Flag whether the timestamp is actually set */
    +	private boolean hasTimestamp;
    +
    +	/**
    +	 * Creates a new TimestampedValue. The record does not have a timestamp.
    +	 */
    +	public TimestampedValue(T value) {
    +		this.value = value;
    +	}
    +
    +	/**
    +	 * Creates a new TimestampedValue wrapping the given value. The timestamp is set to the
    +	 * given timestamp.
    +	 *
    +	 * @param value The value to wrap in this {@link TimestampedValue}
    +	 * @param timestamp The timestamp in milliseconds
    +	 */
     	public TimestampedValue(T value, long timestamp) {
     		this.value = value;
     		this.timestamp = timestamp;
    +		this.hasTimestamp = true;
     	}
     
    +	/**
    +	 * @return The value wrapped in this {@link TimestampedValue}.
    +	 */
     	public T getValue() {
     		return value;
     	}
     
    +	/**
    +	 * @return The timestamp associated with this stream value in milliseconds.
    +     */
     	public long getTimestamp() {
    --- End diff --
    
    We should throw this exception here:
    ```
    throw new IllegalStateException(
    					"Record has no timestamp. Is the time characteristic set to 'ProcessingTime', or " +
    							"did you forget to call 'DataStream.assignTimestampsAndWatermarks(...)'?");
    ```
    
    and in the `TimeEvictor` check whether the elements have timestamps using `hasTimestamp()`.
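
A minimal sketch of what this suggestion could look like, assuming a simplified stand-in class. `TimestampedValueSketch` and `TimeEvictionSketch` are hypothetical names used only for illustration and are not the classes in the pull request; checking only the first element for a timestamp is also an assumption.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Simplified stand-in for the TimestampedValue class in the diff (hypothetical). */
class TimestampedValueSketch<T> {
    private final T value;
    private final long timestamp;
    private final boolean hasTimestamp;

    /** Creates a record without a timestamp. */
    TimestampedValueSketch(T value) {
        this.value = value;
        this.timestamp = 0L;
        this.hasTimestamp = false;
    }

    /** Creates a record with the given timestamp in milliseconds. */
    TimestampedValueSketch(T value, long timestamp) {
        this.value = value;
        this.timestamp = timestamp;
        this.hasTimestamp = true;
    }

    T getValue() {
        return value;
    }

    boolean hasTimestamp() {
        return hasTimestamp;
    }

    /** Throws instead of silently returning a default when no timestamp was set. */
    long getTimestamp() {
        if (hasTimestamp) {
            return timestamp;
        }
        throw new IllegalStateException(
                "Record has no timestamp. Is the time characteristic set to 'ProcessingTime', or " +
                        "did you forget to call 'DataStream.assignTimestampsAndWatermarks(...)'?");
    }
}

/** Hypothetical time-based eviction helper; not Flink's TimeEvictor. */
final class TimeEvictionSketch {
    private TimeEvictionSketch() {}

    /**
     * Removes elements whose timestamp is at or before (max timestamp - windowSizeMs).
     * Assumption: if the first element has no timestamp, none do, so nothing is evicted.
     * Returns the number of evicted elements.
     */
    static <T> int evict(List<TimestampedValueSketch<T>> elements, long windowSizeMs) {
        if (elements.isEmpty() || !elements.get(0).hasTimestamp()) {
            return 0;
        }
        long maxTimestamp = Long.MIN_VALUE;
        for (TimestampedValueSketch<T> element : elements) {
            maxTimestamp = Math.max(maxTimestamp, element.getTimestamp());
        }
        long cutoff = maxTimestamp - windowSizeMs;
        int evicted = 0;
        for (Iterator<TimestampedValueSketch<T>> it = elements.iterator(); it.hasNext();) {
            if (it.next().getTimestamp() <= cutoff) {
                it.remove();
                evicted++;
            }
        }
        return evicted;
    }
}
```

With the `hasTimestamp()` guard in the evictor, the exception in `getTimestamp()` is only reached by callers that read a timestamp without checking first.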


> Enhance Window Evictor
> ----------------------
>
>                 Key: FLINK-4174
>                 URL: https://issues.apache.org/jira/browse/FLINK-4174
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Streaming
>            Reporter: vishnu viswanath
>            Assignee: vishnu viswanath
>
> Enhance the current functionality of Evictor as per this [design document|https://docs.google.com/document/d/1Rr7xzlItYqvFXLyyy-Yv0vvw8f29QYAjm5i9E4A_JlU/edit].
> This includes:
> - Allow eviction of elements from the window in any order (not only from the beginning). To do this, the Evictor must go through the list of elements and remove those that have to be evicted, instead of the current approach of returning the count of elements to be removed from the beginning.
> - Allow eviction to be done before/after applying the window function.
> FLIP page for this enhancement: [FLIP-4|https://cwiki.apache.org/confluence/display/FLINK/FLIP-4+%3A+Enhance+Window+Evictor]
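
The two points in the description above can be illustrated with a rough sketch. The interface and class names here (`EvictorSketch`, `NegativeValueEvictor`, `WindowSketch`) are hypothetical and heavily simplified; they are not the Flink Evictor API, which this sketch does not attempt to reproduce.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

/** Hypothetical, simplified evictor with hooks before and after the window function. */
interface EvictorSketch<T> {
    void evictBefore(Iterable<T> elements);

    void evictAfter(Iterable<T> elements);
}

/** Evicts negative values from any position, not just from the beginning of the window. */
final class NegativeValueEvictor implements EvictorSketch<Integer> {
    @Override
    public void evictBefore(Iterable<Integer> elements) {
        // Walk the elements and remove the ones to evict via the iterator.
        for (Iterator<Integer> it = elements.iterator(); it.hasNext();) {
            if (it.next() < 0) {
                it.remove();
            }
        }
    }

    @Override
    public void evictAfter(Iterable<Integer> elements) {
        // Nothing to evict after the window function in this example.
    }
}

/** Hypothetical driver showing the call order: evictBefore, window function, evictAfter. */
final class WindowSketch {
    private WindowSketch() {}

    static <T, R> R fire(List<T> buffer, EvictorSketch<T> evictor, Function<List<T>, R> windowFunction) {
        evictor.evictBefore(buffer);
        R result = windowFunction.apply(buffer);
        evictor.evictAfter(buffer);
        return result;
    }
}
```

Passing an iterable rather than returning a count is what lets the evictor drop elements from the middle of the buffer.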



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
