flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tillrohrmann <...@git.apache.org>
Subject [GitHub] flink pull request #6194: [FLINK-9633][checkpoint] Use savepoint path's file...
Date Mon, 02 Jul 2018 09:49:48 GMT
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6194#discussion_r199441371
  
    --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
---
    @@ -842,6 +852,100 @@ public void testSetsUserCodeClassLoaderForTimerThreadFactory() throws
Throwable
     		}
     	}
     
    +	@Test
    +	public void testTriggerSavepointWhenTheFileSystemIsDifferentWithCheckpoint() throws
Exception {
    +
    +		final long checkpointId = 42L;
    +		final long timestamp = 1L;
    +
    +		Environment mockEnvironment = spy(new MockEnvironmentBuilder().build());
    +		StreamTask<?, ?> streamTask = new EmptyStreamTask(mockEnvironment);
    +
    +		// mock the operators
    +		StreamOperator<?> statelessOperator =
    +			mock(StreamOperator.class);
    +
    +		final OperatorID operatorID = new OperatorID();
    +		when(statelessOperator.getOperatorID()).thenReturn(operatorID);
    +
    +		// mock the returned empty snapshot result (all state handles are null)
    +		OperatorSnapshotFutures statelessOperatorSnapshotResult = new OperatorSnapshotFutures();
    +		when(statelessOperator.snapshotState(anyLong(), anyLong(), any(CheckpointOptions.class),
any(CheckpointStreamFactory.class)))
    +			.thenReturn(statelessOperatorSnapshotResult);
    +
    +		// set up the task
    +		StreamOperator<?>[] streamOperators = {statelessOperator};
    +		OperatorChain<Void, AbstractStreamOperator<Void>> operatorChain = mock(OperatorChain.class);
    +		when(operatorChain.getAllOperators()).thenReturn(streamOperators);
    +
    +		FileSystem checkpointFileSystem = mock(FileSystem.class);
    +		FileSystem savepointFileSystem = mock(FileSystem.class);
    +
    +		FileSystem.setFsFactories(new HashMap<String, FileSystemFactory>() {{
    +			this.put("file", new FileSystemFactory() {
    +
    +				@Override
    +				public String getScheme() {
    +					return "file";
    +				}
    +
    +				@Override
    +				public void configure(Configuration config) {
    +
    +				}
    +
    +				@Override
    +				public FileSystem create(URI fsUri) throws IOException {
    +					return savepointFileSystem;
    +				}
    +			});
    +			this.put("hdfs", new FileSystemFactory() {
    +				@Override
    +				public String getScheme() {
    +					return "hdfs";
    +				}
    +
    +				@Override
    +				public void configure(Configuration config) {
    +
    +				}
    +
    +				@Override
    +				public FileSystem create(URI fsUri) throws IOException {
    +					return checkpointFileSystem;
    +				}
    +			});
    +		}});
    +
    +		CheckpointStorage checkpointStorage = spy(new FsCheckpointStorage(new Path("hdfs://test1/"),
new Path("file:///test2/"), new JobID(), 1024));
    +
    +		CheckpointStorageLocationReference locationReference = AbstractFsCheckpointStorage.encodePathAsReference(new
Path("file:///test2/"));
    +
    +		when(checkpointStorage.resolveCheckpointStorageLocation(checkpointId, locationReference)).then(new
Answer<Object>() {
    +			@Override
    +			public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
    +				// valid
    +				FsCheckpointStorageLocation checkpointStorageLocation = (FsCheckpointStorageLocation)
invocationOnMock.callRealMethod();
    +				assertEquals(savepointFileSystem, checkpointStorageLocation.getFileSystem());
    +				return checkpointStorageLocation;
    +			}
    +		});
    +
    +		Whitebox.setInternalState(streamTask, "isRunning", true);
    +		Whitebox.setInternalState(streamTask, "lock", new Object());
    +		Whitebox.setInternalState(streamTask, "operatorChain", operatorChain);
    +		Whitebox.setInternalState(streamTask, "cancelables", new CloseableRegistry());
    +		Whitebox.setInternalState(streamTask, "configuration", new StreamConfig(new Configuration()));
    +		Whitebox.setInternalState(streamTask, "checkpointStorage", checkpointStorage);
    +		Whitebox.setInternalState(streamTask, "asyncOperationsThreadPool", Executors.newCachedThreadPool());
    +
    +		CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, timestamp);
    +
    +		streamTask.triggerCheckpoint(
    +			checkpointMetaData,
    +			new CheckpointOptions(CheckpointType.SAVEPOINT, locationReference));
    +	}
    --- End diff --
    
    This tests includes a lot of mocking, spying and whitebox testing. Usually these things
are really hard to maintain. I would, therefore, suggest to create a unit test for the `FsCheckpointStorage#resolveCheckpointStorageLocation`
 method instead. 


---

Mime
View raw message