flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-10042) Extract snapshot algorithms from inner classes into full classes
Date Mon, 20 Aug 2018 17:53:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-10042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16586301#comment-16586301
] 

ASF GitHub Bot commented on FLINK-10042:
----------------------------------------

azagrebin commented on a change in pull request #6556: FLINK-10042][state] Extract snapshot
algorithms from inner classes of RocksDBKeyedStateBackend into full classes
URL: https://github.com/apache/flink/pull/6556#discussion_r211340145
 
 

 ##########
 File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ##########
 @@ -508,41 +486,87 @@ public void restore(Collection<KeyedStateHandle> restoreState)
throws Exception
 		restoredKvStateMetaInfos.clear();
 
 		try {
+			RocksDBIncrementalRestoreOperation<K> incrementalRestoreOperation = null;
 			if (restoreState == null || restoreState.isEmpty()) {
 				createDB();
 			} else {
 				KeyedStateHandle firstStateHandle = restoreState.iterator().next();
 				if (firstStateHandle instanceof IncrementalKeyedStateHandle
 					|| firstStateHandle instanceof IncrementalLocalKeyedStateHandle) {
-					RocksDBIncrementalRestoreOperation<K> restoreOperation = new RocksDBIncrementalRestoreOperation<>(this);
-					restoreOperation.restore(restoreState);
+					incrementalRestoreOperation = new RocksDBIncrementalRestoreOperation<>(this);
+					incrementalRestoreOperation.restore(restoreState);
 				} else {
-					RocksDBFullRestoreOperation<K> restoreOperation = new RocksDBFullRestoreOperation<>(this);
-					restoreOperation.doRestore(restoreState);
+					RocksDBFullRestoreOperation<K> fullRestoreOperation = new RocksDBFullRestoreOperation<>(this);
+					fullRestoreOperation.doRestore(restoreState);
 				}
 			}
+
+			initializeSnapshotStrategy(incrementalRestoreOperation);
 		} catch (Exception ex) {
 			dispose();
 			throw ex;
 		}
 	}
 
-	@Override
-	public void notifyCheckpointComplete(long completedCheckpointId) {
-
-		if (!enableIncrementalCheckpointing) {
-			return;
+	@VisibleForTesting
+	void initializeSnapshotStrategy(
+		@Nullable RocksDBIncrementalRestoreOperation<K> incrementalRestoreOperation) {
+
+		this.savepointSnapshotStrategy =
+			new RocksFullSnapshotStrategy<>(
+				db,
+				rocksDBResourceGuard,
+				keySerializer,
+				kvStateInformation,
+				keyGroupRange,
+				keyGroupPrefixBytes,
+				localRecoveryConfig,
+				cancelStreamRegistry,
+				keyGroupCompressionDecorator);
+
+		if (enableIncrementalCheckpointing) {
+			final UUID backendUID;
+			final SortedMap<Long, Set<StateHandleID>> materializedSstFiles;
+			final long lastCompletedCheckpointId;
+
+			if (incrementalRestoreOperation == null) {
+				backendUID = UUID.randomUUID();
+				materializedSstFiles = new TreeMap<>();
+				lastCompletedCheckpointId = -1L;
+			} else {
+				backendUID = Preconditions.checkNotNull(incrementalRestoreOperation.getRestoredBackendUID());
 
 Review comment:
   I suggest to consider putting (backendUID, materializedSstFiles, lastCompletedCheckpointId)
into some kind of `IncSnapContext`, keep it in backend (empty/random in constructor), rewrite
it in `restore` from `incrementalRestoreOperation` where it is restored.
   
   Also `snapshot` could call some kind of factory method for creating strategy depending
on the config right before performing it, if I do not miss some other tricky thing between
restore and next snapshot. I think it will be more clear.
   
   In general, maybe later, it could be alternatively considered to create some overclass
managing interactions between snapshot strategy and restore operation of certain type: sync/async
where this `IncSnapContext` could go. Backend could switch between them depending on config.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Extract snapshot algorithms from inner classes into full classes
> ----------------------------------------------------------------
>
>                 Key: FLINK-10042
>                 URL: https://issues.apache.org/jira/browse/FLINK-10042
>             Project: Flink
>          Issue Type: Sub-task
>            Reporter: Stefan Richter
>            Assignee: Stefan Richter
>            Priority: Major
>              Labels: pull-request-available
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Mime
View raw message