flink-issues mailing list archives

From GitBox <...@apache.org>
Subject [GitHub] [flink] tillrohrmann commented on a change in pull request #15812: remove slotpoolImpl
Date Tue, 18 May 2021 08:02:53 GMT

tillrohrmann commented on a change in pull request #15812:
URL: https://github.com/apache/flink/pull/15812#discussion_r634118669



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
##########
@@ -144,9 +144,9 @@ void start(
     Collection<SlotInfo> getAllocatedSlotsInformation();
 
     /**
-     * Allocates the available slot with the given allocation id under the given request id for the
-     * given requirement profile. The slot must be able to fulfill the requirement profile,
-     * otherwise an {@link IllegalStateException} will be thrown.
+     * QueryableStateClient.java Allocates the available slot with the given allocation id under the

Review comment:
       Why did you insert the `QueryableStateClient.java` here?

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclareResourceRequirementServiceConnectionManager.java
##########
@@ -87,7 +87,9 @@ private void triggerResourceRequirementsSubmission(
                 () -> sendResourceRequirements(resourceRequirementsToSend),
                 new ExponentialBackoffRetryStrategy(
                         Integer.MAX_VALUE, sleepOnError, maxSleepOnError),
-                throwable -> !(throwable instanceof CancellationException),
+                throwable ->
+                        !(throwable instanceof CancellationException)
+                                && !(throwable instanceof UnsupportedOperationException),

Review comment:
       Why is this change necessary? This does not look right to me.
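
For context, a minimal self-contained sketch of what the changed predicate does. It assumes the lambda is the retry predicate, i.e. that returning `true` means the failed attempt is retried, as its position next to the retry strategy suggests. The class and constant names are hypothetical and not part of the PR.

```java
import java.util.concurrent.CancellationException;
import java.util.function.Predicate;

/** Hypothetical sketch; the names here are not part of Flink or of the PR. */
public class RetryPredicateSketch {

    // Predicate before the change: retry on everything except cancellation.
    static final Predicate<Throwable> BEFORE =
            throwable -> !(throwable instanceof CancellationException);

    // Predicate after the change: additionally stop retrying on
    // UnsupportedOperationException.
    static final Predicate<Throwable> AFTER =
            throwable ->
                    !(throwable instanceof CancellationException)
                            && !(throwable instanceof UnsupportedOperationException);

    public static void main(String[] args) {
        Throwable unsupported = new UnsupportedOperationException("not supported");
        System.out.println(BEFORE.test(unsupported)); // true: would be retried
        System.out.println(AFTER.test(unsupported)); // false: retries stop
    }
}
```

Under that assumption, the change turns an `UnsupportedOperationException` from a retried failure into a terminal one, so the requirements submission would no longer be re-sent after it.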

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderImplWithSpreadOutStrategyTest.java
##########
@@ -48,19 +48,21 @@
     @Test
     public void testSlotAllocationFulfilledWithWorkloadSpreadOut()
             throws InterruptedException, ExecutionException {
+        PhysicalSlotRequest request0 = physicalSlotProviderResource.createSimpleRequest();
+        PhysicalSlotRequest request1 = physicalSlotProviderResource.createSimpleRequest();
+
+        CompletableFuture<PhysicalSlotRequest.Result> resultCompletableFuture0 =
+                physicalSlotProviderResource.allocateSlot(request0);
+        CompletableFuture<PhysicalSlotRequest.Result> resultCompletableFuture1 =
+                physicalSlotProviderResource.allocateSlot(request1);
+
         physicalSlotProviderResource.registerSlotOffersFromNewTaskExecutor(
                 ResourceProfile.ANY, ResourceProfile.ANY, ResourceProfile.ANY, ResourceProfile.ANY);
         physicalSlotProviderResource.registerSlotOffersFromNewTaskExecutor(
                 ResourceProfile.ANY, ResourceProfile.ANY, ResourceProfile.ANY, ResourceProfile.ANY);
 
-        PhysicalSlotRequest request0 = physicalSlotProviderResource.createSimpleRequest();
-        PhysicalSlotRequest request1 = physicalSlotProviderResource.createSimpleRequest();
-
-        PhysicalSlotRequest.Result result0 =
-                physicalSlotProviderResource.allocateSlot(request0).get();
-        PhysicalSlotRequest.Result result1 =
-                physicalSlotProviderResource.allocateSlot(request1).get();
-
+        PhysicalSlotRequest.Result result0 = resultCompletableFuture0.get();
+        PhysicalSlotRequest.Result result1 = resultCompletableFuture1.get();

Review comment:
       Why are these changes necessary?

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolInteractionsTest.java
##########
@@ -178,34 +159,34 @@ public void testExtraSlotsAreKept() throws Exception {
                 assertTrue(ExceptionUtils.stripExecutionException(e) instanceof TimeoutException);
             }
 
-            // wait until we have timed out the slot request
-            slotRequestTimeoutFuture.get();
-
-            assertEquals(0L, pool.getNumberOfPendingRequests());
+            testMainThreadExecutor.execute(
+                    () -> assertTrue(pool.getAllocatedSlotsInformation().isEmpty()));

Review comment:
       Same here.

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolRequestCompletionTest.java
##########
@@ -50,7 +48,7 @@
 import static org.hamcrest.Matchers.not;
 import static org.hamcrest.Matchers.nullValue;
 
-/** Tests how the {@link SlotPoolImpl} completes slot requests. */
+/** Tests how the {@link SlotPool} completes slot requests. */
 public class SlotPoolRequestCompletionTest extends TestLogger {

Review comment:
       ```suggestion
   public class DeclarativeSlotPoolBridgeRequestCompletionTest extends TestLogger {
   ```

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
##########
@@ -272,8 +272,8 @@ public void testFailingExecutionAfterRestart() throws Exception {
 
     /**
      * Tests that a graph is not restarted after cancellation via a call to {@link
-     * ExecutionGraph#failGlobal(Throwable)}. This can happen when a slot is released concurrently
-     * with cancellation.
+     * ExecutionGraph#failJob(Throwable, long)} . This can happen when a slot is released
+     * concurrently with cancellation.

Review comment:
       I think this test calls neither `ExecutionGraph.failGlobal` nor `ExecutionGraph.failJob`.
Hence, I would suggest saying "Tests that a graph is not restarted after cancellation via
a call to {@link Execution#fail(Throwable)}".

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolUtils.java
##########
@@ -54,9 +54,8 @@ private SlotPoolUtils() {
         throw new UnsupportedOperationException("Cannot instantiate this class.");
     }
 
-    static TestingSlotPoolImpl createAndSetUpSlotPool(
+    static SlotPool createAndSetUpSlotPool(

Review comment:
       ```suggestion
       static DeclarativeSlotPoolBridge createAndSetUpDeclarativeSlotPoolBridge(
   ```

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolInteractionsTest.java
##########
@@ -178,34 +159,34 @@ public void testExtraSlotsAreKept() throws Exception {
                 assertTrue(ExceptionUtils.stripExecutionException(e) instanceof TimeoutException);
             }
 
-            // wait until we have timed out the slot request
-            slotRequestTimeoutFuture.get();
-
-            assertEquals(0L, pool.getNumberOfPendingRequests());
+            testMainThreadExecutor.execute(
+                    () -> assertTrue(pool.getAllocatedSlotsInformation().isEmpty()));
 
-            AllocationID allocationId = allocationIdFuture.get();
-            final SlotOffer slotOffer = new SlotOffer(allocationId, 0, ResourceProfile.ANY);
+            final SlotOffer slotOffer = new SlotOffer(new AllocationID(), 0, ResourceProfile.ANY);
             final TaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
             final TaskManagerGateway taskManagerGateway = new SimpleAckingTaskManagerGateway();
 
             testMainThreadExecutor.execute(
                     () -> pool.registerTaskManager(taskManagerLocation.getResourceID()));
-
             assertTrue(
                     testMainThreadExecutor.execute(
                             () ->
-                                    pool.offerSlot(
-                                            taskManagerLocation, taskManagerGateway, slotOffer)));
+                                    pool.offerSlots(
+                                                    taskManagerLocation,
+                                                    taskManagerGateway,
+                                                    Lists.newArrayList(slotOffer))
+                                            != null));
 
-            assertTrue(pool.containsAvailableSlot(allocationId));
+            testMainThreadExecutor.execute(
+                    () -> assertTrue(pool.getAvailableSlotsInformation().isEmpty()));
         }
     }
 
-    private TestingSlotPoolImpl createAndSetUpSlotPool() throws Exception {
+    private SlotPool createAndSetUpSlotPool() throws Exception {
         return new SlotPoolBuilder(testMainThreadExecutor.getMainThreadExecutor()).build();
     }
 
-    private TestingSlotPoolImpl createAndSetUpSlotPoolWithoutResourceManager() throws Exception {
+    private SlotPool createAndSetUpSlotPoolWithoutResourceManager() throws Exception {

Review comment:
       I think the test class should be renamed to `DeclarativeSlotPoolBridgeInteractionsTest`.
Also, please rename this method to `createAndSetUpDeclarativeSlotPoolBridgeWithoutResourceManager`.

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderImplWithSpreadOutStrategyTest.java
##########
@@ -105,16 +107,4 @@ public void testSlotAllocationFulfilledWithNewSlots()
         physicalSlotProviderResource.registerSlotOffersFromNewTaskExecutor(ResourceProfile.ANY);
         slotFuture.get();
     }
-
-    @Test
-    public void testIndividualBatchSlotRequestTimeoutCheckIsDisabledOnAllocatingNewSlots()
-            throws Exception {
-        TestingSlotPoolImpl slotPool =
-                new SlotPoolBuilder(physicalSlotProviderResource.getMainThreadExecutor()).build();
-        assertThat(slotPool.isBatchSlotRequestTimeoutCheckEnabled(), is(true));
-
-        new PhysicalSlotProviderImpl(
-                LocationPreferenceSlotSelectionStrategy.createEvenlySpreadOut(), slotPool);
-        assertThat(slotPool.isBatchSlotRequestTimeoutCheckEnabled(), is(false));
-    }

Review comment:
       Why can this test be removed?

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderImplWithSpreadOutStrategyTest.java
##########
@@ -90,6 +87,11 @@ public void testSlotAllocationFulfilledWithPreferredInputOverwrittingSpreadOut()
         PhysicalSlotRequest.Result result1 =
                 physicalSlotProviderResource.allocateSlot(request1).get();
 
+        physicalSlotProviderResource.registerSlotOffersFromNewTaskExecutor(
+                ResourceProfile.ANY, ResourceProfile.ANY);
+        physicalSlotProviderResource.registerSlotOffersFromNewTaskExecutor(
+                ResourceProfile.ANY, ResourceProfile.ANY);
+

Review comment:
       Same here, why did you move this code block around?

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolBuilder.java
##########
@@ -33,7 +33,7 @@
 
 import java.util.concurrent.CompletableFuture;
 
-/** Builder for a {@link TestingSlotPoolImpl}. */
+/** Builder for a {@link DeclarativeSlotPool}. */
 public class SlotPoolBuilder {

Review comment:
       ```suggestion
   public class DeclarativeSlotPoolBridgeBuilder {
   ```

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolInteractionsTest.java
##########
@@ -105,22 +102,16 @@ public void testCancelSlotAllocationWithoutResourceManager() throws Exception {
                 assertTrue(ExceptionUtils.stripExecutionException(e) instanceof TimeoutException);
             }
 
-            // wait for the timeout of the pending slot request
-            timeoutFuture.get();
-
-            assertEquals(0L, pool.getNumberOfWaitingForResourceRequests());
+            testMainThreadExecutor.execute(
+                    () -> assertTrue(pool.getAllocatedSlotsInformation().isEmpty()));

Review comment:
       Why is this the same test as the previous version? The old test checked that the
pending slot requests are cleared. The new one checks that there is no allocated slot. This
is not the same.
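
A self-contained sketch of why the two assertions are not interchangeable. `ToySlotPool` and its methods are hypothetical stand-ins, not Flink's `SlotPool` API; the only assumption is that a request stays pending until a slot is offered or the request times out.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical toy pool used only for illustration; not Flink's SlotPool. */
public class ToySlotPool {
    private final List<String> pendingRequests = new ArrayList<>();
    private final List<String> allocatedSlots = new ArrayList<>();

    /** Registers a request that stays pending until it is fulfilled or timed out. */
    public void requestSlot(String requestId) {
        pendingRequests.add(requestId);
    }

    /** Drops a pending request, as a timeout handler would. */
    public void timeoutRequest(String requestId) {
        pendingRequests.remove(requestId);
    }

    public int getNumberOfPendingRequests() {
        return pendingRequests.size();
    }

    public boolean hasNoAllocatedSlots() {
        return allocatedSlots.isEmpty();
    }

    public static void main(String[] args) {
        ToySlotPool pool = new ToySlotPool();
        pool.requestSlot("request-0");

        // No slot was ever offered, so this already holds before the timeout.
        System.out.println(pool.hasNoAllocatedSlots()); // true
        System.out.println(pool.getNumberOfPendingRequests()); // 1

        pool.timeoutRequest("request-0");

        // Only the pending-request count changes when the timeout is handled.
        System.out.println(pool.hasNoAllocatedSlots()); // true
        System.out.println(pool.getNumberOfPendingRequests()); // 0
    }
}
```

In this toy model the "no allocated slots" check passes whether or not the timeout cleanup ran, so it cannot detect a pending request that was never cleared.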

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolInteractionsTest.java
##########
@@ -137,32 +128,22 @@ public void testSlotAllocationTimeout() throws Exception {
                 assertTrue(ExceptionUtils.stripExecutionException(e) instanceof TimeoutException);
             }
 
-            // wait until we have timed out the slot request
-            slotRequestTimeoutFuture.get();
-
-            assertEquals(0L, pool.getNumberOfPendingRequests());
+            testMainThreadExecutor.execute(
+                    () -> assertTrue(pool.getAllocatedSlotsInformation().isEmpty()));
         }
     }
 
-    /** Tests that extra slots are kept by the {@link SlotPoolImpl}. */
+    /** Tests that extra slots are kept by the {@link SlotPool}. */
     @Test
     public void testExtraSlotsAreKept() throws Exception {

Review comment:
       I think this test is no longer valid for the `DeclarativeSlotPoolBridge`. Instead, the
test should make sure that extra slots are rejected.

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderImplWithDefaultSlotSelectionStrategyTest.java
##########
@@ -61,16 +61,4 @@ public void testSlotAllocationFulfilledWithNewSlots()
         physicalSlotProviderResource.registerSlotOffersFromNewTaskExecutor(ResourceProfile.ANY);
         slotFuture.get();
     }
-
-    @Test
-    public void testIndividualBatchSlotRequestTimeoutCheckIsDisabledOnAllocatingNewSlots()
-            throws Exception {
-        TestingSlotPoolImpl slotPool =
-                new SlotPoolBuilder(physicalSlotProviderResource.getMainThreadExecutor()).build();
-        assertThat(slotPool.isBatchSlotRequestTimeoutCheckEnabled(), is(true));
-
-        new PhysicalSlotProviderImpl(
-                LocationPreferenceSlotSelectionStrategy.createDefault(), slotPool);
-        assertThat(slotPool.isBatchSlotRequestTimeoutCheckEnabled(), is(false));
-    }

Review comment:
       Why is it ok to delete this test instead of using the `DeclarativeSlotPoolBridge` here?

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolInteractionsTest.java
##########
@@ -137,32 +128,22 @@ public void testSlotAllocationTimeout() throws Exception {
                 assertTrue(ExceptionUtils.stripExecutionException(e) instanceof TimeoutException);
             }
 
-            // wait until we have timed out the slot request
-            slotRequestTimeoutFuture.get();
-
-            assertEquals(0L, pool.getNumberOfPendingRequests());
+            testMainThreadExecutor.execute(
+                    () -> assertTrue(pool.getAllocatedSlotsInformation().isEmpty()));

Review comment:
       Same here. Why is this the same test?

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolPendingRequestFailureTest.java
##########
@@ -68,114 +52,6 @@ public void setup() {
         resourceManagerGateway = new TestingResourceManagerGateway();
     }
 

Review comment:
       This test class also needs to be renamed to `DeclarativeSlotPoolBridgePendingRequestFailureTest`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


