flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] [flink] zentol commented on a change in pull request #8446: [FLINK-12414] [runtime] Implement ExecutionGraph to SchedulingTopology
Date Thu, 16 May 2019 13:12:41 GMT
zentol commented on a change in pull request #8446: [FLINK-12414] [runtime] Implement ExecutionGraph to SchedulingTopology
URL: https://github.com/apache/flink/pull/8446#discussion_r284697765
 
 

 ##########
 File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultResultPartitionTest.java
 ##########
 @@ -62,121 +49,73 @@
 /**
  * Unit tests for {@link DefaultResultPartition}.
  */
-public class DefaultResultPartitionTest {
+public class DefaultResultPartitionTest extends TestLogger {
 
-	private final SimpleAckingTaskManagerGateway taskManagerGateway = new SimpleAckingTaskManagerGateway();
+	private final DefaultExecutionVertexTest.ExecutionStateProviderTest stateProvider = new DefaultExecutionVertexTest.ExecutionStateProviderTest();
 
-	private final TestRestartStrategy triggeredRestartStrategy = TestRestartStrategy.manuallyTriggered();
+	private List<SchedulingExecutionVertex> schedulingExecutionVertices;
 
-	private ExecutionGraph executionGraph;
-
-	private ExecutionGraphToSchedulingTopologyAdapter adapter;
-
-	private List<IntermediateResultPartition> intermediateResultPartitions;
-
-	private List<SchedulingResultPartition> schedulingResultPartitions;
+	private DefaultResultPartition resultPartition;
 
 	@Before
-	public void setUp() throws Exception {
-		final int parallelism = 3;
-		JobVertex[] jobVertices = new JobVertex[2];
-		jobVertices[0] = createNoOpVertex(parallelism);
-		jobVertices[1] = createNoOpVertex(parallelism);
-		jobVertices[1].connectNewDataSetAsInput(jobVertices[0], ALL_TO_ALL, BLOCKING);
-		jobVertices[0].setInputDependencyConstraint(ALL);
-		jobVertices[1].setInputDependencyConstraint(ANY);
-		executionGraph = createSimpleTestGraph(
-			new JobID(),
-			taskManagerGateway,
-			triggeredRestartStrategy,
-			jobVertices);
-		adapter = new ExecutionGraphToSchedulingTopologyAdapter(executionGraph);
-
-		intermediateResultPartitions = new ArrayList<>();
-		schedulingResultPartitions = new ArrayList<>();
-
-		for (ExecutionVertex vertex : executionGraph.getAllExecutionVertices()) {
-			for (Map.Entry<IntermediateResultPartitionID, IntermediateResultPartition> entry
-				: vertex.getProducedPartitions().entrySet()) {
-				intermediateResultPartitions.add(entry.getValue());
-				schedulingResultPartitions.add(adapter.getResultPartition(entry.getKey())
-					.orElseThrow(() -> new IllegalArgumentException("can not find partition" + entry.getKey())));
-			}
-		}
-		assertEquals(parallelism, intermediateResultPartitions.size());
-	}
-
-	@Test
-	public void testBasicInfo() {
-		for (int idx = 0; idx < intermediateResultPartitions.size(); idx++) {
-			final IntermediateResultPartition partition = intermediateResultPartitions.get(idx);
-			final SchedulingResultPartition schedulingResultPartition = schedulingResultPartitions.get(idx);
-			assertEquals(partition.getPartitionId(), schedulingResultPartition.getId());
-			assertEquals(partition.getIntermediateResult().getId(), schedulingResultPartition.getResultId());
-			assertEquals(partition.getResultType(), schedulingResultPartition.getPartitionType());
-		}
+	public void setUp() {
+		schedulingExecutionVertices = new ArrayList<>(2);
+		resultPartition = new DefaultResultPartition(
+			new IntermediateResultPartitionID(),
+			new IntermediateDataSetID(),
+			BLOCKING);
+
+		DefaultExecutionVertex vertex1 = new DefaultExecutionVertex(
+			new ExecutionVertexID(new JobVertexID(), 0),
+			Collections.singletonList(resultPartition),
+			ALL,
+			stateProvider);
+		resultPartition.setProducer(vertex1);
+		DefaultExecutionVertex vertex2 = new DefaultExecutionVertex(
+			new ExecutionVertexID(new JobVertexID(), 0),
+			java.util.Collections.emptyList(),
+			ALL,
+			stateProvider);
+		resultPartition.addConsumer(vertex2);
+		schedulingExecutionVertices.add(vertex1);
+		schedulingExecutionVertices.add(vertex2);
 	}
 
 	@Test
 	public void testGetConsumers() {
-		for (int idx = 0; idx < intermediateResultPartitions.size(); idx++) {
-			Collection<ExecutionVertexID> schedulingConsumers = schedulingResultPartitions.get(idx).getConsumers()
-				.stream().map(SchedulingExecutionVertex::getId).collect(Collectors.toList());
+		Collection<ExecutionVertexID> schedulingConsumers = resultPartition.getConsumers()
+			.stream().map(SchedulingExecutionVertex::getId).collect(Collectors.toList());
 
-			Set<ExecutionVertexID> executionConsumers = new HashSet<>();
-			for (List<ExecutionEdge> list :intermediateResultPartitions.get(idx).getConsumers()) {
-				for (ExecutionEdge edge : list) {
-					final ExecutionVertex vertex = edge.getTarget();
-					executionConsumers.add(new ExecutionVertexID(vertex.getJobvertexId(), vertex.getParallelSubtaskIndex()));
-				}
-			}
-			assertThat(schedulingConsumers, containsInAnyOrder(executionConsumers.toArray()));
-		}
+		List<ExecutionVertexID> executionConsumers = Collections.singletonList(schedulingExecutionVertices.get(1).getId());
+		assertThat(schedulingConsumers, containsInAnyOrder(executionConsumers.toArray()));
 	}
 
 	@Test
 	public void testGetProducer() {
-		for (int idx = 0; idx < intermediateResultPartitions.size(); idx++) {
-			final ExecutionVertex vertex = intermediateResultPartitions.get(idx).getProducer();
-			assertEquals(schedulingResultPartitions.get(idx).getProducer().getId(),
-				new ExecutionVertexID(vertex.getJobvertexId(), vertex.getParallelSubtaskIndex()));
-		}
+		assertEquals(resultPartition.getProducer().getId(), schedulingExecutionVertices.get(0).getId());
 	}
 
 	@Test
 	public void testGetPartitionState() {
-		List<SchedulingExecutionVertex> schedulingExecutionVertices = new ArrayList<>();
-		executionGraph.getAllExecutionVertices().forEach(
-			vertex -> schedulingExecutionVertices.add(new DefaultExecutionVertex(vertex)));
-
 		final ExecutionState[] states = ExecutionState.values();
 
 Review comment:
   in-line, as above

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message