flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kl0u <...@git.apache.org>
Subject [GitHub] flink pull request #5230: [FLINK-8345] Add iterator of keyed state on broadc...
Date Thu, 25 Jan 2018 15:13:40 GMT
Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5230#discussion_r163870666
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
---
    @@ -513,17 +630,100 @@ public void addAll(List<S> values) throws Exception {
     		}
     	}
     
    +	private <K, V> BroadcastState<K, V> getBroadcastState(
    +			final MapStateDescriptor<K, V> stateDescriptor,
    +			final OperatorStateHandle.Mode mode) throws StateMigrationException {
    +
    +		Preconditions.checkNotNull(stateDescriptor);
    +		String name = Preconditions.checkNotNull(stateDescriptor.getName());
    +
    +		@SuppressWarnings("unchecked")
    +		BackendWritableBroadcastState<K, V> previous = (BackendWritableBroadcastState<K,
V>) accessedBroadcastStatesByName.get(name);
    +		if (previous != null) {
    +			checkStateNameAndMode(
    +					previous.getStateMetaInfo().getName(),
    +					name,
    +					previous.getStateMetaInfo().getAssignmentMode(),
    +					mode);
    +			return previous;
    +		}
    +
    +		stateDescriptor.initializeSerializerUnlessSet(getExecutionConfig());
    +		TypeSerializer<K> broadcastStateKeySerializer = Preconditions.checkNotNull(stateDescriptor.getKeySerializer());
    +		TypeSerializer<V> broadcastStateValueSerializer = Preconditions.checkNotNull(stateDescriptor.getValueSerializer());
    +
    +		BackendWritableBroadcastState<K, V> broadcastState = (BackendWritableBroadcastState<K,
V>) registeredBroadcastStates.get(name);
    +
    +		if (broadcastState == null) {
    +			broadcastState = new HeapBroadcastState<>(
    +					new RegisteredBroadcastBackendStateMetaInfo<>(
    +							name,
    +							mode,
    +							broadcastStateKeySerializer,
    +							broadcastStateValueSerializer));
    +			registeredBroadcastStates.put(name, broadcastState);
    +		} else {
    --- End diff --
    
    No, because we have the `accessedBroadcastStatesByName.get(name)` above (line 641). 
    
    As soon as we create or restore the broadcast state, we put it there (line 708). The next
time we will try to access it, we will hit the cache (`accessedBroadcastStatesByName`) so
we will not go through the creation/check phase.


---

Mime
View raw message