flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aljoscha <...@git.apache.org>
Subject [GitHub] flink pull request #5230: [FLINK-8345] Add iterator of keyed state on broadc...
Date Thu, 25 Jan 2018 14:12:35 GMT
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5230#discussion_r163840020
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
---
    @@ -513,17 +630,100 @@ public void addAll(List<S> values) throws Exception {
     		}
     	}
     
    +	private <K, V> BroadcastState<K, V> getBroadcastState(
    +			final MapStateDescriptor<K, V> stateDescriptor,
    +			final OperatorStateHandle.Mode mode) throws StateMigrationException {
    +
    +		Preconditions.checkNotNull(stateDescriptor);
    +		String name = Preconditions.checkNotNull(stateDescriptor.getName());
    +
    +		@SuppressWarnings("unchecked")
    +		BackendWritableBroadcastState<K, V> previous = (BackendWritableBroadcastState<K,
V>) accessedBroadcastStatesByName.get(name);
    +		if (previous != null) {
    +			checkStateNameAndMode(
    +					previous.getStateMetaInfo().getName(),
    +					name,
    +					previous.getStateMetaInfo().getAssignmentMode(),
    +					mode);
    +			return previous;
    +		}
    +
    +		stateDescriptor.initializeSerializerUnlessSet(getExecutionConfig());
    +		TypeSerializer<K> broadcastStateKeySerializer = Preconditions.checkNotNull(stateDescriptor.getKeySerializer());
    +		TypeSerializer<V> broadcastStateValueSerializer = Preconditions.checkNotNull(stateDescriptor.getValueSerializer());
    +
    +		BackendWritableBroadcastState<K, V> broadcastState = (BackendWritableBroadcastState<K,
V>) registeredBroadcastStates.get(name);
    +
    +		if (broadcastState == null) {
    +			broadcastState = new HeapBroadcastState<>(
    +					new RegisteredBroadcastBackendStateMetaInfo<>(
    +							name,
    +							mode,
    +							broadcastStateKeySerializer,
    +							broadcastStateValueSerializer));
    +			registeredBroadcastStates.put(name, broadcastState);
    +		} else {
    --- End diff --
    
    Does this to the compatibility-check dance every time the state is accessed? Might be
a bit to much and we could do it only the first time a state is accessed after restore.


---

Mime
View raw message