flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-9701) Activate TTL in state descriptors
Date Thu, 12 Jul 2018 13:38:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-9701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16541656#comment-16541656
] 

ASF GitHub Bot commented on FLINK-9701:
---------------------------------------

Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6313#discussion_r202036711
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java
---
    @@ -84,52 +86,89 @@ public boolean contains(UK key) throws Exception {
     
     	@Override
     	public Iterable<Map.Entry<UK, UV>> entries() throws Exception {
    -		return entriesStream()::iterator;
    +		return entries(e -> e);
     	}
     
    -	private Stream<Map.Entry<UK, UV>> entriesStream() throws Exception {
    +	private <R> Iterable<R> entries(
    +		Function<Map.Entry<UK, UV>, R> resultMapper) throws Exception {
     		Iterable<Map.Entry<UK, TtlValue<UV>>> withTs = original.entries();
    -		withTs = withTs == null ? Collections.emptyList() : withTs;
    -		return StreamSupport
    -			.stream(withTs.spliterator(), false)
    -			.filter(this::unexpiredAndUpdateOrCleanup)
    -			.map(TtlMapState::unwrapWithoutTs);
    -	}
    -
    -	private boolean unexpiredAndUpdateOrCleanup(Map.Entry<UK, TtlValue<UV>>
e) {
    -		UV unexpiredValue;
    -		try {
    -			unexpiredValue = getWithTtlCheckAndUpdate(
    -				e::getValue,
    -				v -> original.put(e.getKey(), v),
    -				() -> original.remove(e.getKey()));
    -		} catch (Exception ex) {
    -			throw new FlinkRuntimeException(ex);
    -		}
    -		return unexpiredValue != null;
    -	}
    -
    -	private static <UK, UV> Map.Entry<UK, UV> unwrapWithoutTs(Map.Entry<UK,
TtlValue<UV>> e) {
    -		return new AbstractMap.SimpleEntry<>(e.getKey(), e.getValue().getUserValue());
    +		return () -> new EntriesIterator<>(withTs == null ? Collections.emptyList()
: withTs, resultMapper);
     	}
     
     	@Override
     	public Iterable<UK> keys() throws Exception {
    -		return entriesStream().map(Map.Entry::getKey)::iterator;
    +		return entries(Map.Entry::getKey);
     	}
     
     	@Override
     	public Iterable<UV> values() throws Exception {
    -		return entriesStream().map(Map.Entry::getValue)::iterator;
    +		return entries(Map.Entry::getValue);
     	}
     
     	@Override
     	public Iterator<Map.Entry<UK, UV>> iterator() throws Exception {
    -		return entriesStream().iterator();
    +		return entries().iterator();
     	}
     
     	@Override
     	public void clear() {
     		original.clear();
     	}
    +
    +	private class EntriesIterator<R> implements Iterator<R> {
    +		private final Iterator<Map.Entry<UK, TtlValue<UV>>> originalIterator;
    +		private final Function<Map.Entry<UK, UV>, R> resultMapper;
    +		private Map.Entry<UK, UV> nextUnexpired = null;
    +		private boolean rightAfterNextIsCalled = false;
    +
    +		private EntriesIterator(
    +			@Nonnull Iterable<Map.Entry<UK, TtlValue<UV>>> withTs,
    +			@Nonnull Function<Map.Entry<UK, UV>, R> resultMapper) {
    +			this.originalIterator = withTs.iterator();
    +			this.resultMapper = resultMapper;
    +		}
    +
    +		@Override
    +		public boolean hasNext() {
    +			rightAfterNextIsCalled = false;
    +			while (nextUnexpired == null && originalIterator.hasNext()) {
    +				nextUnexpired = getUnexpiredAndUpdateOrCleanup(originalIterator.next());
    +			}
    +			return nextUnexpired != null;
    +		}
    +
    +		@Override
    +		public R next() {
    +			if (hasNext()) {
    +				rightAfterNextIsCalled = true;
    +				R result = resultMapper.apply(nextUnexpired);
    +				nextUnexpired = null;
    +				return result;
    +			}
    +			throw new NoSuchElementException();
    +		}
    +
    +		@Override
    +		public void remove() {
    +			if (rightAfterNextIsCalled) {
    --- End diff --
    
    I think this is problematic, for example in the sequence `hasNext()`, `next()`, `hasNext()`,
`remove()`, which is a valid interaction.


> Activate TTL in state descriptors
> ---------------------------------
>
>                 Key: FLINK-9701
>                 URL: https://issues.apache.org/jira/browse/FLINK-9701
>             Project: Flink
>          Issue Type: Sub-task
>          Components: State Backends, Checkpointing
>            Reporter: Andrey Zagrebin
>            Assignee: Andrey Zagrebin
>            Priority: Major
>              Labels: pull-request-available
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Mime
View raw message