flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-8345) Iterate over keyed state on broadcast side of connect with broadcast.
Date Thu, 25 Jan 2018 14:13:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-8345?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16339257#comment-16339257 ]

ASF GitHub Bot commented on FLINK-8345:
---------------------------------------

Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5230#discussion_r163840020
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java ---
    @@ -513,17 +630,100 @@ public void addAll(List<S> values) throws Exception {
     		}
     	}
     
    +	private <K, V> BroadcastState<K, V> getBroadcastState(
    +			final MapStateDescriptor<K, V> stateDescriptor,
    +			final OperatorStateHandle.Mode mode) throws StateMigrationException {
    +
    +		Preconditions.checkNotNull(stateDescriptor);
    +		String name = Preconditions.checkNotNull(stateDescriptor.getName());
    +
    +		@SuppressWarnings("unchecked")
    +		BackendWritableBroadcastState<K, V> previous = (BackendWritableBroadcastState<K, V>) accessedBroadcastStatesByName.get(name);
    +		if (previous != null) {
    +			checkStateNameAndMode(
    +					previous.getStateMetaInfo().getName(),
    +					name,
    +					previous.getStateMetaInfo().getAssignmentMode(),
    +					mode);
    +			return previous;
    +		}
    +
    +		stateDescriptor.initializeSerializerUnlessSet(getExecutionConfig());
    +		TypeSerializer<K> broadcastStateKeySerializer = Preconditions.checkNotNull(stateDescriptor.getKeySerializer());
    +		TypeSerializer<V> broadcastStateValueSerializer = Preconditions.checkNotNull(stateDescriptor.getValueSerializer());
    +
    +		BackendWritableBroadcastState<K, V> broadcastState = (BackendWritableBroadcastState<K, V>) registeredBroadcastStates.get(name);
    +
    +		if (broadcastState == null) {
    +			broadcastState = new HeapBroadcastState<>(
    +					new RegisteredBroadcastBackendStateMetaInfo<>(
    +							name,
    +							mode,
    +							broadcastStateKeySerializer,
    +							broadcastStateValueSerializer));
    +			registeredBroadcastStates.put(name, broadcastState);
    +		} else {
    --- End diff --
    
    Does this do the compatibility-check dance every time the state is accessed? That might
    be a bit too much; we could do it only the first time a state is accessed after restore.
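
    To illustrate the suggestion, here is a minimal, self-contained sketch of running
    the check only on the first access after a restore. It is not the code from this
    pull request: the class LazyCompatibilityCheck, its methods, and the use of a plain
    String as a stand-in for a serializer are all hypothetical, chosen only to show the
    caching pattern.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch; none of these names are Flink classes. */
final class LazyCompatibilityCheck {

    // States known to the backend, by name. The String value stands in for
    // the serializer the state was written with (an assumption for brevity).
    private final Map<String, String> registeredSerializerByName = new HashMap<>();

    // Names that have already been accessed (and checked) since the last restore.
    private final Set<String> accessedNames = new HashSet<>();

    // Counts how often the (potentially expensive) check actually ran.
    private int checksPerformed = 0;

    /** Simulates restoring a state from a checkpoint. */
    void restore(String name, String serializerId) {
        registeredSerializerByName.put(name, serializerId);
        // A restored state must be re-checked on its next access.
        accessedNames.remove(name);
    }

    /** Returns the serializer id of the state, checking compatibility at most once per restore. */
    String getState(String name, String requestedSerializerId) {
        if (accessedNames.contains(name)) {
            // Fast path: already checked since restore, skip the check.
            return registeredSerializerByName.get(name);
        }

        String registered = registeredSerializerByName.get(name);
        if (registered == null) {
            // Brand-new state: nothing to be compatible with.
            registeredSerializerByName.put(name, requestedSerializerId);
        } else {
            // First access after restore: run the check exactly once.
            checksPerformed++;
            if (!registered.equals(requestedSerializerId)) {
                throw new IllegalStateException(
                        "Incompatible serializer for state '" + name + "'");
            }
        }

        accessedNames.add(name);
        return registeredSerializerByName.get(name);
    }

    int getChecksPerformed() {
        return checksPerformed;
    }
}
```

    In this sketch, repeated calls to getState for the same name hit the fast path, so
    the check counter increases only once per restore of that state.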


> Iterate over keyed state on broadcast side of connect with broadcast.
> ---------------------------------------------------------------------
>
>                 Key: FLINK-8345
>                 URL: https://issues.apache.org/jira/browse/FLINK-8345
>             Project: Flink
>          Issue Type: New Feature
>          Components: Streaming
>    Affects Versions: 1.5.0
>            Reporter: Kostas Kloudas
>            Assignee: Kostas Kloudas
>            Priority: Major
>             Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
