Github user StefanRRichter commented on a diff in the pull request:
https://github.com/apache/flink/pull/6313#discussion_r202036711
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java
---
@@ -84,52 +86,89 @@ public boolean contains(UK key) throws Exception {
@Override
public Iterable<Map.Entry<UK, UV>> entries() throws Exception {
- return entriesStream()::iterator;
+ return entries(e -> e);
}
- private Stream<Map.Entry<UK, UV>> entriesStream() throws Exception {
+ private <R> Iterable<R> entries(
+ Function<Map.Entry<UK, UV>, R> resultMapper) throws Exception {
Iterable<Map.Entry<UK, TtlValue<UV>>> withTs = original.entries();
- withTs = withTs == null ? Collections.emptyList() : withTs;
- return StreamSupport
- .stream(withTs.spliterator(), false)
- .filter(this::unexpiredAndUpdateOrCleanup)
- .map(TtlMapState::unwrapWithoutTs);
- }
-
- private boolean unexpiredAndUpdateOrCleanup(Map.Entry<UK, TtlValue<UV>>
e) {
- UV unexpiredValue;
- try {
- unexpiredValue = getWithTtlCheckAndUpdate(
- e::getValue,
- v -> original.put(e.getKey(), v),
- () -> original.remove(e.getKey()));
- } catch (Exception ex) {
- throw new FlinkRuntimeException(ex);
- }
- return unexpiredValue != null;
- }
-
- private static <UK, UV> Map.Entry<UK, UV> unwrapWithoutTs(Map.Entry<UK,
TtlValue<UV>> e) {
- return new AbstractMap.SimpleEntry<>(e.getKey(), e.getValue().getUserValue());
+ return () -> new EntriesIterator<>(withTs == null ? Collections.emptyList()
: withTs, resultMapper);
}
@Override
public Iterable<UK> keys() throws Exception {
- return entriesStream().map(Map.Entry::getKey)::iterator;
+ return entries(Map.Entry::getKey);
}
@Override
public Iterable<UV> values() throws Exception {
- return entriesStream().map(Map.Entry::getValue)::iterator;
+ return entries(Map.Entry::getValue);
}
@Override
public Iterator<Map.Entry<UK, UV>> iterator() throws Exception {
- return entriesStream().iterator();
+ return entries().iterator();
}
@Override
public void clear() {
original.clear();
}
+
+ private class EntriesIterator<R> implements Iterator<R> {
+ private final Iterator<Map.Entry<UK, TtlValue<UV>>> originalIterator;
+ private final Function<Map.Entry<UK, UV>, R> resultMapper;
+ private Map.Entry<UK, UV> nextUnexpired = null;
+ private boolean rightAfterNextIsCalled = false;
+
+ private EntriesIterator(
+ @Nonnull Iterable<Map.Entry<UK, TtlValue<UV>>> withTs,
+ @Nonnull Function<Map.Entry<UK, UV>, R> resultMapper) {
+ this.originalIterator = withTs.iterator();
+ this.resultMapper = resultMapper;
+ }
+
+ @Override
+ public boolean hasNext() {
+ rightAfterNextIsCalled = false;
+ while (nextUnexpired == null && originalIterator.hasNext()) {
+ nextUnexpired = getUnexpiredAndUpdateOrCleanup(originalIterator.next());
+ }
+ return nextUnexpired != null;
+ }
+
+ @Override
+ public R next() {
+ if (hasNext()) {
+ rightAfterNextIsCalled = true;
+ R result = resultMapper.apply(nextUnexpired);
+ nextUnexpired = null;
+ return result;
+ }
+ throw new NoSuchElementException();
+ }
+
+ @Override
+ public void remove() {
+ if (rightAfterNextIsCalled) {
--- End diff --
I think this a problematic for example in the sequence `hasNext()`, ``next()`, `hasNext()`,
`remove()` which is a valid interaction.
---
|