samza-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] vjagadish1989 commented on a change in pull request #903: SEP-19: Allocator changes for standby-aware container allocation, and active container failover
Date Tue, 12 Feb 2019 18:41:33 GMT
vjagadish1989 commented on a change in pull request #903: SEP-19: Allocator changes for standby-aware
container allocation, and active container failover
URL: https://github.com/apache/samza/pull/903#discussion_r256083123
 
 

 ##########
 File path: samza-core/src/main/java/org/apache/samza/clustermanager/HostAwareContainerAllocator.java
 ##########
 @@ -119,4 +153,219 @@ private void updateExpiryMetrics(SamzaResourceRequest request) {
       state.expiredPreferredHostRequests.incrementAndGet();
     }
   }
-}
+
+  /**
+   * Runs the container described by {@code request} if placing it on {@code samzaResource} satisfies the
+   * standby constraints. If it does not, the resource is released, the request is cancelled, and the container
+   * is re-requested via {@code requestResourceDueToLaunchFailOrExpiredRequest} (the same path used for a
+   * launch failure or an expired request).
+   *
+   * @param request the resource request for the container to start
+   * @param preferredHost the host the request was made for
+   * @param samzaResource the resource allocated for this request
+   * @param state application state; its standby allocation counters are incremented when standby tasks are enabled
+   * @return true if {@code runStreamProcessor} was invoked, false if the resource was rejected and re-requested
+   */
+  private boolean checkStandbyTaskConstraintsAndRunStreamProcessor(SamzaResourceRequest request,
+      String preferredHost, SamzaResource samzaResource, SamzaApplicationState state) {
+
+    // Without standby tasks there are no placement constraints to check.
+    if (!new JobConfig(config).getStandbyTasksEnabled()) {
+      runStreamProcessor(request, preferredHost);
+      return true;
+    }
+
+    String containerID = request.getContainerID();
+
+    if (checkStandbyConstraints(request, samzaResource, state)) {
+      // This resource can be used to launch this container.
+      log.info("Running container {} on preferred host {} meets standby constraints, launching on {}",
+          containerID, preferredHost, samzaResource.getHost());
+      runStreamProcessor(request, preferredHost);
+      state.successfulStandbyAllocations.incrementAndGet();
+      return true;
+    } else {
+      // This resource cannot be used to launch this container, so treat it like a launch failure
+      // and issue an ANY_HOST request.
+      log.info("Running container {} on host {} does not meet standby constraints, cancelling resource request, "
+          + "releasing resource, and making a new ANY_HOST request", containerID, samzaResource.getHost());
+      resourceRequestState.releaseUnstartableContainer(samzaResource, preferredHost);
+      resourceRequestState.cancelResourceRequest(request);
+      requestResourceDueToLaunchFailOrExpiredRequest(containerID);
+      state.failedStandbyAllocations.incrementAndGet();
+      return false;
+    }
+  }
+
+  /**
+   * Checks whether the container requested by {@code request} can be placed on {@code samzaResource}'s host
+   * without violating standby constraints. The check fails if any container listed in
+   * {@code standbyContainerConstraints} for this container is pending launch or running on that same host.
+   *
+   * @param request the resource request for the container to start
+   * @param samzaResource the candidate resource (its host is the one checked)
+   * @param samzaApplicationState source of the pending and running container maps
+   * @return true if no conflicting container is pending or running on the host, false otherwise
+   */
+  private boolean checkStandbyConstraints(SamzaResourceRequest request, SamzaResource samzaResource,
+      SamzaApplicationState samzaApplicationState) {
+    String containerIDToStart = request.getContainerID();
+    String host = samzaResource.getHost();
+    List<String> containerIDsForStandbyConstraints = this.standbyContainerConstraints.get(containerIDToStart);
+
+    // Guard against a missing entry; iterating a null list would throw an NPE inside the allocator.
+    if (containerIDsForStandbyConstraints == null) {
+      log.warn("No standby constraints found for container {}, allowing it on host {}", containerIDToStart, host);
+      return true;
+    }
+
+    // Check if any of these conflicting containers are running/launching on host
+    for (String containerID : containerIDsForStandbyConstraints) {
+      SamzaResource resource = samzaApplicationState.pendingContainers.get(containerID);
+
+      // return false if a conflicting container is pending for launch on the host
+      if (resource != null && resource.getHost().equals(host)) {
+        log.info("Container {} cannot be started on host {} because container {} is already scheduled on this host",
+            containerIDToStart, host, containerID);
+        return false;
+      }
+
+      // return false if a conflicting container is running on the host
+      resource = samzaApplicationState.runningContainers.get(containerID);
+      if (resource != null && resource.getHost().equals(host)) {
+        log.info("Container {} cannot be started on host {} because container {} is already running on this host",
+            containerIDToStart, host, containerID);
+        return false;
+      }
+    }
+
+    return true;
+  }
+
+  /**
+   * Intercepts resource requests issued because of a launch failure or an expired resource request.
+   * <ol>
+   *   <li>For a standby container, a plain ANY_HOST request is made via the superclass.</li>
+   *   <li>For an active container, the request is routed through {@link #requestResource(String, String)},
+   *   which (when standby tasks are enabled) tries to fail the container over to a standby.</li>
+   * </ol>
+   *
+   * @param containerID Identifier of the container that will be run when a resource is allocated
+   */
+  @Override
+  public void requestResourceDueToLaunchFailOrExpiredRequest(String containerID) {
+    if (StandbyTaskUtil.isStandbyContainer(containerID)) {
+      log.info("Handling rerequesting for container {} using an any host request", containerID);
+      // Bypass the local override and proceed with a plain ANY_HOST request.
+      super.requestResource(containerID, ResourceRequestState.ANY_HOST);
+    } else {
+      // Invoke the local override, which selects a new standby for failover if possible.
+      requestResource(containerID, ResourceRequestState.ANY_HOST);
+    }
+  }
+
+  // Intercept issuing of resource requests from the CPM
+  // 1. for ActiveContainers and instead choose a StandbyContainer to stop
+  // 2. for a StandbyContainer (after it has been chosen for failover), to put the active
on the standby's host
+  // and request another resource for the standby
+  // 3. for a standbyContainer (if not for a failover)
+  @Override
+  public void requestResource(String containerID, String preferredHost) {
+
+    // If StandbyTasks are not enabled, we simply forward the resource requests
+    if (!new JobConfig(config).getStandbyTasksEnabled()) {
+      super.requestResource(containerID, preferredHost);
+      return;
+    }
+
+    // If its an anyhost request for an active container, then we select a standby container
to stop and place this activeContainer on that standby's host
+    // we may have already chosen a standby (which didnt work for a failover)
+    if (!StandbyTaskUtil.isStandbyContainer(containerID) && preferredHost.equals(ResourceRequestState.ANY_HOST))
{
+      initiateActiveContainerFailover(containerID);
+    } else if (StandbyTaskUtil.isStandbyContainer(containerID)) {
 
 Review comment:
  Move the failover-handling logic upstream, perhaps into `ContainerProcessManager`.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message