flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-9489) Checkpoint timers as part of managed keyed state instead of raw keyed state
Date Sat, 14 Jul 2018 16:27:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-9489?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16544257#comment-16544257
] 

ASF GitHub Bot commented on FLINK-9489:
---------------------------------------

Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6333#discussion_r202518473
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBroadcastStateBackendMetaInfo.java
---
    @@ -63,54 +72,46 @@ public RegisteredBroadcastBackendStateMetaInfo(RegisteredBroadcastBackendStateMe
     	}
     
     	@SuppressWarnings("unchecked")
    -	public RegisteredBroadcastBackendStateMetaInfo(@Nonnull StateMetaInfoSnapshot snapshot)
{
    +	public RegisteredBroadcastStateBackendMetaInfo(@Nonnull StateMetaInfoSnapshot snapshot)
{
     		this(
     			snapshot.getName(),
     			OperatorStateHandle.Mode.valueOf(
     				snapshot.getOption(StateMetaInfoSnapshot.CommonOptionsKeys.OPERATOR_STATE_DISTRIBUTION_MODE)),
    -			(TypeSerializer<K>) snapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.KEY_SERIALIZER),
    -			(TypeSerializer<V>) snapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER));
    +			(TypeSerializer<K>) Preconditions.checkNotNull(
    +				snapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.KEY_SERIALIZER)),
    +			(TypeSerializer<V>) Preconditions.checkNotNull(
    +				snapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER)));
     		Preconditions.checkState(StateMetaInfoSnapshot.BackendStateType.BROADCAST == snapshot.getBackendStateType());
     	}
     
     	/**
     	 * Creates a deep copy of the itself.
     	 */
    -	public RegisteredBroadcastBackendStateMetaInfo<K, V> deepCopy() {
    -		return new RegisteredBroadcastBackendStateMetaInfo<>(this);
    +	@Nonnull
    +	public RegisteredBroadcastStateBackendMetaInfo<K, V> deepCopy() {
    +		return new RegisteredBroadcastStateBackendMetaInfo<>(this);
     	}
     
     	@Nonnull
     	@Override
     	public StateMetaInfoSnapshot snapshot() {
    -		Map<String, String> optionsMap = Collections.singletonMap(
    -			StateMetaInfoSnapshot.CommonOptionsKeys.OPERATOR_STATE_DISTRIBUTION_MODE.toString(),
    -			assignmentMode.toString());
    -		Map<String, TypeSerializer<?>> serializerMap = new HashMap<>(2);
    -		Map<String, TypeSerializerConfigSnapshot> serializerConfigSnapshotsMap = new
HashMap<>(2);
    -		String keySerializerKey = StateMetaInfoSnapshot.CommonSerializerKeys.KEY_SERIALIZER.toString();
    -		String valueSerializerKey = StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString();
    -		serializerMap.put(keySerializerKey, keySerializer.duplicate());
    -		serializerConfigSnapshotsMap.put(keySerializerKey, keySerializer.snapshotConfiguration());
    -		serializerMap.put(valueSerializerKey, valueSerializer.duplicate());
    -		serializerConfigSnapshotsMap.put(valueSerializerKey, valueSerializer.snapshotConfiguration());
    -
    -		return new StateMetaInfoSnapshot(
    -			name,
    -			StateMetaInfoSnapshot.BackendStateType.BROADCAST,
    -			optionsMap,
    -			serializerConfigSnapshotsMap,
    -			serializerMap);
    +		if (precomputedSnapshot == null) {
    +			precomputedSnapshot = precomputeSnapshot();
    +		}
    +		return precomputedSnapshot;
    --- End diff --
    
    What if the serializers are not all immutable? Should we need a `immutable` field for
it? Only when it is true we return the `precomputeSnapshot`.


> Checkpoint timers as part of managed keyed state instead of raw keyed state
> ---------------------------------------------------------------------------
>
>                 Key: FLINK-9489
>                 URL: https://issues.apache.org/jira/browse/FLINK-9489
>             Project: Flink
>          Issue Type: Sub-task
>          Components: State Backends, Checkpointing
>            Reporter: Stefan Richter
>            Assignee: Stefan Richter
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.6.0
>
>
> Timer state should now become part of the keyed state backend snapshot, i.e., stored
inside the managed keyed state. This means that we have to connect our preparation for asynchronous
checkpoints with the backend, so that the timers are written as part of the state for each
key-group. This means that we will also free up the raw keyed state an might expose it to
user functions in the future.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Mime
View raw message