storm-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From agre...@apache.org
Subject [storm] branch master updated: STORM-3634 add missing numa ports to supervisor.slots.ports
Date Thu, 14 May 2020 18:08:20 GMT
This is an automated email from the ASF dual-hosted git repository.

agresch pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git


The following commit(s) were added to refs/heads/master by this push:
     new 2b36eae  STORM-3634 add missing numa ports to supervisor.slots.ports
     new d5256ce  Merge pull request #3267 from agresch/agresch_storm_3634
2b36eae is described below

commit 2b36eae9385616a26dd0f84c3071b8c4908d57d4
Author: Aaron Gresch <agresch@yahoo-inc.com>
AuthorDate: Tue May 12 14:42:08 2020 -0500

    STORM-3634 add missing numa ports to supervisor.slots.ports
---
 .../storm/daemon/supervisor/ReadClusterState.java  |  7 +++--
 .../storm/daemon/supervisor/SupervisorUtils.java   | 30 ++++++++++++++++++++++
 .../handler/LogviewerLogSearchHandler.java         |  4 +--
 3 files changed, 35 insertions(+), 6 deletions(-)

diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ReadClusterState.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ReadClusterState.java
index 35d0857..0186230 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ReadClusterState.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ReadClusterState.java
@@ -23,7 +23,6 @@ import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiConsumer;
-import org.apache.storm.DaemonConfig;
 import org.apache.storm.cluster.IStormClusterState;
 import org.apache.storm.daemon.supervisor.Slot.MachineState;
 import org.apache.storm.daemon.supervisor.Slot.TopoProfileAction;
@@ -98,9 +97,9 @@ public class ReadClusterState implements Runnable, AutoCloseable {
         }
 
         @SuppressWarnings("unchecked")
-        List<Number> ports = (List<Number>) superConf.get(DaemonConfig.SUPERVISOR_SLOTS_PORTS);
-        for (Number port : ports) {
-            slots.put(port.intValue(), mkSlot(port.intValue()));
+        List<Integer> ports = SupervisorUtils.getSlotsPorts(superConf);
+        for (Integer port : ports) {
+            slots.put(port, mkSlot(port));
         }
 
         try {
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/SupervisorUtils.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/SupervisorUtils.java
index 3460600..b8d2e5e 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/SupervisorUtils.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/SupervisorUtils.java
@@ -25,8 +25,11 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+
 import org.apache.storm.DaemonConfig;
 import org.apache.storm.generated.LSWorkerHeartbeat;
 import org.apache.storm.localizer.LocalResource;
@@ -72,6 +75,33 @@ public class SupervisorUtils {
         return null;
     }
 
+    /**
+     * gets the set of all configured numa ports for a specific supervisor.
+     * @param supervisorConf supervisorConf
+     * @return set of all numa ports
+     */
+    public static Set<Integer> getNumaPorts(Map<String, Object> supervisorConf) {
+        Set<Integer> numaPorts = new HashSet<>();
+        Map<String, Object> validatedNumaMap = getNumaMap(supervisorConf);
+        for (Map.Entry<String, Object> numaEntry : validatedNumaMap.entrySet()) {
+            Map<String, Object> numaMap  = (Map<String, Object>) numaEntry.getValue();
+            List<Integer> portList = (List<Integer>) numaMap.get(NUMA_PORTS);
+            numaPorts.addAll(portList);
+        }
+        return numaPorts;
+    }
+
+    public static List<Integer> getSlotsPorts(Map<String, Object> supervisorConf) {
+        List<Integer> slotsPorts = (List<Integer>) supervisorConf.getOrDefault(DaemonConfig.SUPERVISOR_SLOTS_PORTS,
+                new ArrayList<>());
+        // It's possible we have numaPorts specified that weren't configured in SUPERVISOR_SLOTS_PORTS.  Make
+        // sure we handle these ports as well.
+        Set<Integer> numaPorts = SupervisorUtils.getNumaPorts(supervisorConf);
+        numaPorts.removeAll(slotsPorts);
+        slotsPorts.addAll(numaPorts);
+        return slotsPorts;
+    }
+
     public static void rmrAsUser(Map<String, Object> conf, String id, String path) throws IOException {
         String user = ServerUtils.getFileOwner(path);
         String logPreFix = "rmr " + id;
diff --git a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/handler/LogviewerLogSearchHandler.java b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/handler/LogviewerLogSearchHandler.java
index 1feb6d5..bac5a7b 100644
--- a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/handler/LogviewerLogSearchHandler.java
+++ b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/handler/LogviewerLogSearchHandler.java
@@ -67,6 +67,7 @@ import org.apache.storm.daemon.logviewer.utils.ExceptionMeterNames;
 import org.apache.storm.daemon.logviewer.utils.LogviewerResponseBuilder;
 import org.apache.storm.daemon.logviewer.utils.ResourceAuthorizer;
 import org.apache.storm.daemon.logviewer.utils.WorkerLogs;
+import org.apache.storm.daemon.supervisor.SupervisorUtils;
 import org.apache.storm.daemon.ui.InvalidRequestException;
 import org.apache.storm.daemon.utils.StreamUtil;
 import org.apache.storm.daemon.utils.UrlBuilder;
@@ -259,8 +260,7 @@ public class LogviewerLogSearchHandler {
                 int port = Integer.parseInt(portStr);
                 // check just the one port
                 @SuppressWarnings("unchecked")
-                List<Integer> slotsPorts = (List<Integer>) stormConf.getOrDefault(DaemonConfig.SUPERVISOR_SLOTS_PORTS,
-                    new ArrayList<>());
+                List<Integer> slotsPorts = SupervisorUtils.getSlotsPorts(stormConf);
                 boolean containsPort = slotsPorts.stream()
                     .anyMatch(slotPort -> slotPort != null && (slotPort == port));
                 if (!containsPort) {


Mime
View raw message