flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GJL <...@git.apache.org>
Subject [GitHub] flink pull request #5746: [FLINK-8943] [ha] Fail Dispatcher if jobs cannot b...
Date Wed, 28 Mar 2018 12:00:00 GMT
Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5746#discussion_r177724449
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
---
    @@ -344,6 +351,72 @@ public void testJobRecovery() throws Exception {
     		assertThat(jobIds, contains(jobGraph.getJobID()));
     	}
     
    +	/**
    +	 * Tests that the {@link Dispatcher} terminates if it cannot recover jobs ids from
    +	 * the {@link SubmittedJobGraphStore}. See FLINK-8943.
    +	 */
    +	@Test
    +	public void testFatalErrorAfterJobIdRecoveryFailure() throws Exception {
    +		final FlinkException testException = new FlinkException("Test exception");
    +		submittedJobGraphStore.setJobIdsFunction(
    +			(Collection<JobID> jobIds) -> {
    +				throw testException;
    +			});
    +
    +		UUID expectedLeaderSessionId = UUID.randomUUID();
    +
    +		assertNull(dispatcherLeaderElectionService.getConfirmationFuture());
    +
    +		dispatcherLeaderElectionService.isLeader(expectedLeaderSessionId);
    +
    +		UUID actualLeaderSessionId = dispatcherLeaderElectionService.getConfirmationFuture()
    +			.get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);
    +
    +		assertEquals(expectedLeaderSessionId, actualLeaderSessionId);
    +
    +		// we expect that a fatal error occurred
    +		final Throwable error = fatalErrorHandler.getErrorFuture().get(TIMEOUT.toMilliseconds(),
TimeUnit.MILLISECONDS);
    +
    +		assertThat(ExceptionUtils.findThrowableWithMessage(error, testException.getMessage()).isPresent(),
is(true));
    +
    +		fatalErrorHandler.clearError();
    +	}
    +
    +	/**
    +	 * Tests that the {@link Dispatcher} terminates if it cannot recover jobs from
    +	 * the {@link SubmittedJobGraphStore}. See FLINK-8943.
    +	 */
    +	@Test
    +	public void testFatalErrorAfterJobRecoveryFailure() throws Exception {
    +		final FlinkException testException = new FlinkException("Test exception");
    +
    +		final SubmittedJobGraph submittedJobGraph = new SubmittedJobGraph(jobGraph, null);
    +		submittedJobGraphStore.putJobGraph(submittedJobGraph);
    +
    +		submittedJobGraphStore.setRecoverJobGraphFunction(
    +			(JobID jobId, Map<JobID, SubmittedJobGraph> submittedJobs) -> {
    +				throw testException;
    +			});
    +
    +		UUID expectedLeaderSessionId = UUID.randomUUID();
    --- End diff --
    
    Code looks duplicated from here on (`testFatalErrorAfterJobIdRecoveryFailure`)


---

Mime
View raw message