flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] [flink] zentol commented on a change in pull request #8430: [FLINK-12068] [runtime] Backtrack failover regions if intermediate results are unavailable
Date Wed, 15 May 2019 11:32:04 GMT
zentol commented on a change in pull request #8430: [FLINK-12068] [runtime] Backtrack failover
regions if intermediate results are unavailable
URL: https://github.com/apache/flink/pull/8430#discussion_r284183023
 
 

 ##########
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionStrategy.java
 ##########
 @@ -162,40 +201,90 @@ private void buildOneRegionForAllVertices() {
 	 * In this strategy, all task vertices in 'involved' regions are proposed to be restarted.
 	 * The 'involved' regions are calculated with rules below:
 	 * 1. The region containing the failed task is always involved
-	 * 2. TODO: If an input result partition of an involved region is not available, i.e. Missing or Corrupted,
+	 * 2. If an input result partition of an involved region is not available, i.e. Missing or Corrupted,
 	 *    the region containing the partition producer task is involved
-	 * 3. TODO: If a region is involved, all of its consumer regions are involved
+	 * 3. If a region is involved, all of its consumer regions are involved
 	 *
 	 * @param executionVertexId ID of the failed task
 	 * @param cause cause of the failure
 	 * @return set of IDs of vertices to restart
 	 */
 	@Override
 	public Set<ExecutionVertexID> getTasksNeedingRestart(ExecutionVertexID executionVertexId, Throwable cause) {
-		final FailoverRegion failedRegion = regions.get(executionVertexId);
+		LOG.info("Calculating tasks to restart to recover the failed task {}.", executionVertexId);
+
+		final FailoverRegion failedRegion = vertexToRegionMap.get(executionVertexId);
 		if (failedRegion == null) {
 			// TODO: show the task name in the log
 			throw new IllegalStateException("Can not find the failover region for task " + executionVertexId, cause);
 		}
 
-		// TODO: if the failure cause is data consumption error, mark the corresponding data partition to be unavailable
+		// if the failure cause is data consumption error, mark the corresponding data partition to be failed,
+		// so that the failover process will try to recover it
+		if (cause instanceof DataConsumptionException) {
+			resultPartitionAvailabilityChecker.markResultPartitionFailed(
+				((DataConsumptionException) cause).getPartitionId().getPartitionId());
+		}
 
-		return getRegionsToRestart(failedRegion).stream().flatMap(
+		// calculate the tasks to restart based on the result of regions to restart
+		Set<FailoverRegion> regionsToRestart = getRegionsToRestart(failedRegion);
+		Set<ExecutionVertexID> tasksToRestart = regionsToRestart.stream().flatMap(
 			r -> r.getAllExecutionVertexIDs().stream()).collect(Collectors.toSet());
+
+		// the previous failed partition will be recovered. remove its failed state from the checker
+		if (cause instanceof DataConsumptionException) {
+			resultPartitionAvailabilityChecker.removeResultPartitionFromFailedState(
+				((DataConsumptionException) cause).getPartitionId().getPartitionId());
+		}
+
+		LOG.info("{} tasks should be restarted to recover the failed task {}. ", tasksToRestart.size(), executionVertexId);
+		return tasksToRestart;
 	}
 
 	/**
 	 * All 'involved' regions are proposed to be restarted.
 	 * The 'involved' regions are calculated with rules below:
 	 * 1. The region containing the failed task is always involved
-	 * 2. TODO: If an input result partition of an involved region is not available, i.e. Missing or Corrupted,
+	 * 2. If an input result partition of an involved region is not available, i.e. Missing or Corrupted,
 	 *    the region containing the partition producer task is involved
-	 * 3. TODO: If a region is involved, all of its consumer regions are involved
+	 * 3. If a region is involved, all of its consumer regions are involved
 	 */
-	private Set<FailoverRegion> getRegionsToRestart(FailoverRegion regionToRestart) {
-		return Collections.singleton(regionToRestart);
+	private Set<FailoverRegion> getRegionsToRestart(FailoverRegion failedRegion) {
+		IdentityHashMap<FailoverRegion, Object> regionsToRestart = new IdentityHashMap<>();
+		IdentityHashMap<FailoverRegion, Object> visitedRegions = new IdentityHashMap<>();
+
+		// start from the failed region to visit all involved regions
+		Queue<FailoverRegion> regionsToVisit = new ArrayDeque<>();
+		regionsToVisit.add(failedRegion);
+		while (!regionsToVisit.isEmpty()) {
+			FailoverRegion regionToRestart = regionsToVisit.poll();
+
+			if (!visitedRegions.containsKey(regionToRestart)) {
+				visitedRegions.put(regionToRestart, null);
+
+				// an involved region should be restarted
+				regionsToRestart.put(regionToRestart, null);
 
-		// TODO: implement backtracking logic
+				// if a needed input result partition is not available, its producer region is involved
+				for (FailoverVertex vertex : regionToRestart.getAllExecutionVertices()) {
+					for (FailoverEdge inEdge : vertex.getInputEdges()) {
+						if (!resultPartitionAvailabilityChecker.isAvailable(inEdge.getResultPartitionID())) {
+							FailoverRegion producerRegion = vertexToRegionMap.get(inEdge.getSourceVertex().getExecutionVertexID());
+							if (!visitedRegions.containsKey(producerRegion)) {
+								regionsToVisit.add(producerRegion);
+							}
+						}
+					}
+				}
+
+				// all consumer regions of an involved region should be involved
+				for (FailoverRegion consumerRegion : regionConsumers.get(regionToRestart)) {
 
 Review comment:
   `regionsToVisit.addAll(regionConsumers.get(regionToRestart))`

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message