flink-issues mailing list archives

From GitBox <...@apache.org>
Subject [GitHub] azagrebin commented on a change in pull request #6556: [FLINK-10042][state] Extract snapshot algorithms from inner classes of RocksDBKeyedStateBackend into full classes
Date Mon, 20 Aug 2018 17:52:13 GMT
azagrebin commented on a change in pull request #6556: [FLINK-10042][state] Extract snapshot
algorithms from inner classes of RocksDBKeyedStateBackend into full classes
URL: https://github.com/apache/flink/pull/6556#discussion_r211340145
 
 

 ##########
 File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ##########
 @@ -508,41 +486,87 @@ public void restore(Collection<KeyedStateHandle> restoreState) throws Exception
 		restoredKvStateMetaInfos.clear();
 
 		try {
+			RocksDBIncrementalRestoreOperation<K> incrementalRestoreOperation = null;
 			if (restoreState == null || restoreState.isEmpty()) {
 				createDB();
 			} else {
 				KeyedStateHandle firstStateHandle = restoreState.iterator().next();
 				if (firstStateHandle instanceof IncrementalKeyedStateHandle
 					|| firstStateHandle instanceof IncrementalLocalKeyedStateHandle) {
-					RocksDBIncrementalRestoreOperation<K> restoreOperation = new RocksDBIncrementalRestoreOperation<>(this);
-					restoreOperation.restore(restoreState);
+					incrementalRestoreOperation = new RocksDBIncrementalRestoreOperation<>(this);
+					incrementalRestoreOperation.restore(restoreState);
 				} else {
-					RocksDBFullRestoreOperation<K> restoreOperation = new RocksDBFullRestoreOperation<>(this);
-					restoreOperation.doRestore(restoreState);
+					RocksDBFullRestoreOperation<K> fullRestoreOperation = new RocksDBFullRestoreOperation<>(this);
+					fullRestoreOperation.doRestore(restoreState);
 				}
 			}
+
+			initializeSnapshotStrategy(incrementalRestoreOperation);
 		} catch (Exception ex) {
 			dispose();
 			throw ex;
 		}
 	}
 
-	@Override
-	public void notifyCheckpointComplete(long completedCheckpointId) {
-
-		if (!enableIncrementalCheckpointing) {
-			return;
+	@VisibleForTesting
+	void initializeSnapshotStrategy(
+		@Nullable RocksDBIncrementalRestoreOperation<K> incrementalRestoreOperation) {
+
+		this.savepointSnapshotStrategy =
+			new RocksFullSnapshotStrategy<>(
+				db,
+				rocksDBResourceGuard,
+				keySerializer,
+				kvStateInformation,
+				keyGroupRange,
+				keyGroupPrefixBytes,
+				localRecoveryConfig,
+				cancelStreamRegistry,
+				keyGroupCompressionDecorator);
+
+		if (enableIncrementalCheckpointing) {
+			final UUID backendUID;
+			final SortedMap<Long, Set<StateHandleID>> materializedSstFiles;
+			final long lastCompletedCheckpointId;
+
+			if (incrementalRestoreOperation == null) {
+				backendUID = UUID.randomUUID();
+				materializedSstFiles = new TreeMap<>();
+				lastCompletedCheckpointId = -1L;
+			} else {
+				backendUID = Preconditions.checkNotNull(incrementalRestoreOperation.getRestoredBackendUID());
 
 Review comment:
   I suggest putting (backendUID, materializedSstFiles, lastCompletedCheckpointId) into
some kind of `IncSnapContext`. The backend would keep it (initialized as empty/random in the
constructor) and overwrite it in `restore` with the values restored by `incrementalRestoreOperation`.
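
   A minimal sketch of what such a holder could look like. The class shape and the `fresh()`
method are hypothetical, and `String` stands in for Flink's `StateHandleID` so the example is
self-contained; the defaults mirror the `incrementalRestoreOperation == null` branch in the diff above.

```java
import java.util.Objects;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.UUID;

/** Hypothetical holder for the three values that incremental snapshots carry across a restore. */
final class IncSnapContext {
    final UUID backendUID;
    // Assumption: String replaces StateHandleID to keep the sketch free of Flink dependencies.
    final SortedMap<Long, Set<String>> materializedSstFiles;
    final long lastCompletedCheckpointId;

    IncSnapContext(
            UUID backendUID,
            SortedMap<Long, Set<String>> materializedSstFiles,
            long lastCompletedCheckpointId) {
        this.backendUID = Objects.requireNonNull(backendUID);
        this.materializedSstFiles = Objects.requireNonNull(materializedSstFiles);
        this.lastCompletedCheckpointId = lastCompletedCheckpointId;
    }

    /** Context for a backend that starts without restored state: random UID, no files, id -1. */
    static IncSnapContext fresh() {
        return new IncSnapContext(UUID.randomUUID(), new TreeMap<>(), -1L);
    }
}
```

   With this, the backend constructor would assign `IncSnapContext.fresh()`, and `restore` would
replace it with a context built from the restored values.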
   
   Also, `snapshot` could call some kind of factory method that creates the strategy depending
on the config right before performing the snapshot, unless I am missing some other tricky thing
between restore and the next snapshot. I think that would be clearer.
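
   Roughly, the factory idea could be sketched as follows. All names here are hypothetical
stand-ins rather than Flink's actual types; the only behavior taken from the diff is that
savepoints always use the full strategy and that the incremental one depends on
`enableIncrementalCheckpointing`.

```java
/** Hypothetical stand-in for a snapshot strategy; not Flink's actual interface. */
interface SnapshotStrategySketch {
    String name();
}

final class SnapshotStrategyFactorySketch {
    /**
     * Chooses the strategy right before a snapshot is taken.
     * Savepoints always get the full strategy; checkpoints get the
     * incremental one only when incremental checkpointing is enabled.
     */
    static SnapshotStrategySketch create(boolean incrementalEnabled, boolean isSavepoint) {
        if (incrementalEnabled && !isSavepoint) {
            return () -> "incremental";
        }
        return () -> "full";
    }
}
```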
   
   In general, maybe later, we could alternatively consider creating some overarching class
that manages the interaction between the snapshot strategy and the restore operation of a
certain type (sync/async), where this `IncSnapContext` could go. The backend could switch
between them depending on the config.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services
