flink-issues mailing list archives

From GitBox <...@apache.org>
Subject [GitHub] [flink] GJL commented on a change in pull request #8318: [FLINK-12231][runtime] Introduce Scheduler interface and adapt ExecutionGraph to it
Date Sun, 05 May 2019 19:40:03 GMT
GJL commented on a change in pull request #8318: [FLINK-12231][runtime] Introduce Scheduler interface and adapt ExecutionGraph to it
URL: https://github.com/apache/flink/pull/8318#discussion_r281035837
 
 

 ##########
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
 ##########
 @@ -392,43 +379,10 @@ public JobMaster(
 			final JobVertexID vertexID,
 			final ExecutionAttemptID executionAttempt) {
 
-		final Execution execution = executionGraph.getRegisteredExecutions().get(executionAttempt);
-		if (execution == null) {
-			// can happen when JobManager had already unregistered this execution upon on task failure,
-			// but TaskManager get some delay to aware of that situation
-			if (log.isDebugEnabled()) {
-				log.debug("Can not find Execution for attempt {}.", executionAttempt);
-			}
-			// but we should TaskManager be aware of this
-			return FutureUtils.completedExceptionally(new Exception("Can not find Execution for attempt " + executionAttempt));
-		}
-
-		final ExecutionJobVertex vertex = executionGraph.getJobVertex(vertexID);
-		if (vertex == null) {
-			log.error("Cannot find execution vertex for vertex ID {}.", vertexID);
-			return FutureUtils.completedExceptionally(new Exception("Cannot find execution vertex for vertex ID " + vertexID));
-		}
-
-		if (vertex.getSplitAssigner() == null) {
-			log.error("No InputSplitAssigner for vertex ID {}.", vertexID);
-			return FutureUtils.completedExceptionally(new Exception("No InputSplitAssigner for vertex ID " + vertexID));
-		}
-
-		final InputSplit nextInputSplit = execution.getNextInputSplit();
-
-		if (log.isDebugEnabled()) {
-			log.debug("Send next input split {}.", nextInputSplit);
-		}
-
 		try {
-			final byte[] serializedInputSplit = InstantiationUtil.serializeObject(nextInputSplit);
-			return CompletableFuture.completedFuture(new SerializedInputSplit(serializedInputSplit));
-		} catch (Exception ex) {
-			log.error("Could not serialize the next input split of class {}.", nextInputSplit.getClass(), ex);
-			IOException reason = new IOException("Could not serialize the next input split of class " +
-					nextInputSplit.getClass() + ".", ex);
-			vertex.fail(reason);
-			return FutureUtils.completedExceptionally(reason);
+			return CompletableFuture.completedFuture(schedulerNG.requestNextInputSplit(vertexID, executionAttempt));
+		} catch (IOException e) {
 
 Review comment:
   Then I wonder what we should do about unchecked exceptions. See my other comment.
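
   To make the concern concrete: with only `catch (IOException e)`, an unchecked exception thrown by the scheduler call would propagate out of the RPC method instead of reaching the caller as a failed future. The sketch below shows one possible way to handle it, not necessarily what the other comment proposes. `SplitSource`, `failed`, and the `String`/`byte[]` types are hypothetical stand-ins for illustration, not Flink's actual interfaces.

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;

public class InputSplitRequestSketch {

    /** Hypothetical stand-in for the scheduler call in the diff; not Flink's real interface. */
    interface SplitSource {
        byte[] requestNextInputSplit(String vertexId, String attemptId) throws IOException;
    }

    /** Returns a future that is already completed with the given failure. */
    static <T> CompletableFuture<T> failed(Throwable cause) {
        CompletableFuture<T> future = new CompletableFuture<>();
        future.completeExceptionally(cause);
        return future;
    }

    static CompletableFuture<byte[]> requestNextInputSplit(
            SplitSource scheduler, String vertexId, String attemptId) {
        try {
            return CompletableFuture.completedFuture(
                    scheduler.requestNextInputSplit(vertexId, attemptId));
        } catch (IOException e) {
            // Checked failure: the only case the catch clause in the diff covers.
            return failed(e);
        } catch (RuntimeException e) {
            // Assumption: unchecked failures are also reported through the future,
            // so the caller sees them the same way as checked ones.
            return failed(e);
        }
    }
}
```

   Whether unchecked exceptions should be turned into a failed future like this, or left to propagate so that the surrounding RPC layer handles them, is exactly the open question here.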

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services
