flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-8176) Dispatcher does not start SubmittedJobGraphStore
Date Fri, 01 Dec 2017 11:08:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-8176?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16274256#comment-16274256 ]

ASF GitHub Bot commented on FLINK-8176:
---------------------------------------

Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5107#discussion_r154320903
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
---
    @@ -117,84 +135,78 @@ public void testJobSubmission() throws Exception {
     			heartbeatServices,
     			mock(MetricRegistryImpl.class),
     			fatalErrorHandler,
    -			jobManagerRunner,
    -			jobId);
    +			mockJobManagerRunner,
    +			TEST_JOB_ID);
     
    -		try {
    -			dispatcher.start();
    +		dispatcher.start();
    +	}
     
    -			CompletableFuture<UUID> leaderFuture = dispatcherLeaderElectionService.isLeader(UUID.randomUUID());
    +	@After
    +	public void tearDown() throws Exception {
    +		try {
    +			fatalErrorHandler.rethrowError();
    +		} finally {
    +			RpcUtils.terminateRpcEndpoint(dispatcher, timeout);
    +		}
    +	}
     
    -			// wait for the leader to be elected
    -			leaderFuture.get();
    +	/**
    +	 * Tests that we can submit a job to the Dispatcher which then spawns a
    +	 * new JobManagerRunner.
    +	 */
    +	@Test
    +	public void testJobSubmission() throws Exception {
    +		CompletableFuture<UUID> leaderFuture = dispatcherLeaderElectionService.isLeader(UUID.randomUUID());
     
    -			DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class);
    +		// wait for the leader to be elected
    +		leaderFuture.get();
     
    -			CompletableFuture<Acknowledge> acknowledgeFuture = dispatcherGateway.submitJob(jobGraph,
timeout);
    +		DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class);
     
    -			acknowledgeFuture.get();
    +		CompletableFuture<Acknowledge> acknowledgeFuture = dispatcherGateway.submitJob(jobGraph,
timeout);
     
    -			verify(jobManagerRunner, Mockito.timeout(timeout.toMilliseconds())).start();
    +		acknowledgeFuture.get();
     
    -			// check that no error has occurred
    -			fatalErrorHandler.rethrowError();
    -		} finally {
    -			RpcUtils.terminateRpcEndpoint(dispatcher, timeout);
    -		}
    +		verify(mockJobManagerRunner, Mockito.timeout(timeout.toMilliseconds())).start();
     	}
     
     	/**
     	 * Tests that the dispatcher takes part in the leader election.
     	 */
     	@Test
     	public void testLeaderElection() throws Exception {
    -		TestingFatalErrorHandler fatalErrorHandler = new TestingFatalErrorHandler();
    -		TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices();
    -
     		UUID expectedLeaderSessionId = UUID.randomUUID();
    -		CompletableFuture<UUID> leaderSessionIdFuture = new CompletableFuture<>();
    -		SubmittedJobGraphStore mockSubmittedJobGraphStore = mock(SubmittedJobGraphStore.class);
    -		TestingLeaderElectionService testingLeaderElectionService = new TestingLeaderElectionService()
{
    -			@Override
    -			public void confirmLeaderSessionID(UUID leaderSessionId) {
    -				super.confirmLeaderSessionID(leaderSessionId);
    -				leaderSessionIdFuture.complete(leaderSessionId);
    -			}
    -		};
    -
    -		haServices.setSubmittedJobGraphStore(mockSubmittedJobGraphStore);
    -		haServices.setDispatcherLeaderElectionService(testingLeaderElectionService);
    -		HeartbeatServices heartbeatServices = new HeartbeatServices(1000L, 1000L);
    -		final JobID jobId = new JobID();
    -
    -		final TestingDispatcher dispatcher = new TestingDispatcher(
    -			rpcService,
    -			Dispatcher.DISPATCHER_NAME + '_' + name.getMethodName(),
    -			new Configuration(),
    -			haServices,
    -			mock(ResourceManagerGateway.class),
    -			mock(BlobServer.class),
    -			heartbeatServices,
    -			mock(MetricRegistryImpl.class),
    -			fatalErrorHandler,
    -			mock(JobManagerRunner.class),
    -			jobId);
     
    -		try {
    -			dispatcher.start();
    +		assertNull(dispatcherLeaderElectionService.getConfirmationFuture());
     
    -			assertFalse(leaderSessionIdFuture.isDone());
    +		dispatcherLeaderElectionService.isLeader(expectedLeaderSessionId);
     
    -			testingLeaderElectionService.isLeader(expectedLeaderSessionId);
    +		UUID actualLeaderSessionId = dispatcherLeaderElectionService.getConfirmationFuture()
    +			.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
     
    -			UUID actualLeaderSessionId = leaderSessionIdFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
    +		assertEquals(expectedLeaderSessionId, actualLeaderSessionId);
     
    -			assertEquals(expectedLeaderSessionId, actualLeaderSessionId);
    +		verify(submittedJobGraphStore, Mockito.timeout(timeout.toMilliseconds()).atLeast(1)).getJobIds();
    +	}
     
    -			verify(mockSubmittedJobGraphStore, Mockito.timeout(timeout.toMilliseconds()).atLeast(1)).getJobIds();
    -		} finally {
    -			RpcUtils.terminateRpcEndpoint(dispatcher, timeout);
    -		}
    +	/**
    +	 * Test callbacks from
    +	 * {@link org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore.SubmittedJobGraphListener}.
    +	 */
    +	@Test
    +	public void testSubmittedJobGraphListener() throws Exception {
    +		CompletableFuture<UUID> leaderFuture = dispatcherLeaderElectionService.isLeader(UUID.randomUUID());
    +		leaderFuture.get();
    +
    +		dispatcher.submitJob(jobGraph, timeout);
    +
    +		// pretend that other Dispatcher has removed job from submittedJobGraphStore
    +		dispatcher.onRemovedJobGraph(TEST_JOB_ID);
    +		assertThat(dispatcher.listJobs(timeout).get(), hasSize(0));
    --- End diff --
    
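    Test is not thread-safe: it calls `dispatcher.submitJob(...)`, `dispatcher.onRemovedJobGraph(...)` and `dispatcher.listJobs(...)` directly on the endpoint object from the test thread, rather than through the RPC gateway (`dispatcher.getSelfGateway(DispatcherGateway.class)`) as `testJobSubmission` does.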


> Dispatcher does not start SubmittedJobGraphStore
> ------------------------------------------------
>
>                 Key: FLINK-8176
>                 URL: https://issues.apache.org/jira/browse/FLINK-8176
>             Project: Flink
>          Issue Type: Bug
>          Components: Distributed Coordination, Job-Submission, YARN
>    Affects Versions: 1.5.0
>            Reporter: Gary Yao
>            Assignee: Gary Yao
>              Labels: flip-6
>             Fix For: 1.5.0
>
>
> {{Dispatcher}} never calls {{start()}} on its {{SubmittedJobGraphStore}} instance. Hence, when a job is submitted (YARN session mode with HA enabled), an {{IllegalStateException}} is thrown:
> {noformat}
> java.lang.IllegalStateException: Not running. Forgot to call start()?
>         at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
>         at org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore.verifyIsRunning(ZooKeeperSubmittedJobGraphStore.java:411)
>         at org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore.putJobGraph(ZooKeeperSubmittedJobGraphStore.java:222)
>         at org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:202)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:498)
>         at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:207)
>         at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:151)
>         at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleMessage(FencedAkkaRpcActor.java:66)
>         at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$onReceive$0(AkkaRpcActor.java:129)
>         at akka.actor.ActorCell$$anonfun$become$1.applyOrElse(ActorCell.scala:544)
>         at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>         at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>         at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>         at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>         at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>         at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>         at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>         at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>         at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> {noformat}
> *Expected Behavior*
> In the {{start()}} method, the {{submittedJobGraphStore}} should be started as follows:
> {code}
> submittedJobGraphStore.start(this);
> {code}
> To enable this, the {{Dispatcher}} must implement the interface {{SubmittedJobGraphStore.SubmittedJobGraphListener}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Mime
View raw message