storm-commits mailing list archives

From s...@apache.org
Subject [storm] 02/04: STORM-3379: Fix intermittent NPE during worker boot in local mode
Date Sun, 05 May 2019 14:14:09 GMT
This is an automated email from the ASF dual-hosted git repository.

srdo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git

commit c985695e0728eb3af171ca6d346c51cc4e0b083a
Author: Stig Rohde Døssing <srdo@apache.org>
AuthorDate: Sat Apr 13 20:33:29 2019 +0200

    STORM-3379: Fix intermittent NPE during worker boot in local mode
---
 .../jvm/org/apache/storm/daemon/worker/Worker.java |  26 +++--
 .../apache/storm/daemon/worker/WorkerState.java    |  45 +++++----
 .../auth/workertoken/WorkerTokenAuthorizer.java    |   2 +-
 .../org/apache/storm/utils/SupervisorClient.java   |   5 +-
 .../apache/storm/utils/SupervisorIfaceFactory.java |  30 ++++++
 .../apache/storm/daemon/supervisor/Container.java  |   4 +-
 .../storm/daemon/supervisor/ContainerLauncher.java |   7 +-
 .../storm/daemon/supervisor/LocalContainer.java    |  10 +-
 .../daemon/supervisor/LocalContainerLauncher.java  |   7 +-
 .../storm/daemon/supervisor/ReadClusterState.java  |   3 +-
 .../apache/storm/daemon/supervisor/Supervisor.java | 108 ++++++++++++---------
 .../nimbus/AssignmentDistributionService.java      |   2 +-
 12 files changed, 164 insertions(+), 85 deletions(-)
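
The gist of the change: instead of being handed a supervisor host/port and building its own
SupervisorClient, the worker now receives a Supplier<SupervisorIfaceFactory>, so a local-mode
worker talks to the in-process Supervisor.Iface while a distributed worker still opens a Thrift
SupervisorClient per call. The sketch below is illustrative only (names and wiring simplified,
not lifted verbatim from the commit) and shows how each mode supplies the factory and how
callers use it:

    // Local mode (see LocalContainer.launch): hand the in-process Supervisor.Iface to the worker.
    // `localSupervisor` stands for the Iface built by Supervisor.createSupervisorIface() below.
    Supplier<SupervisorIfaceFactory> localSupplier = () -> () -> localSupervisor;

    // Distributed mode (see Worker.main): open a Thrift client per call. SupervisorClient now
    // implements SupervisorIfaceFactory, so try-with-resources closes it as before.
    Supplier<SupervisorIfaceFactory> remoteSupplier = () -> {
        try {
            return SupervisorClient.getConfiguredClient(conf, Utils.hostname(), supervisorPort);
        } catch (UnknownHostException e) {
            throw Utils.wrapInRuntime(e);
        }
    };

    // Callers are mode-agnostic, e.g. the worker heartbeat:
    try (SupervisorIfaceFactory fac = supervisorIfaceSupplier.get()) {
        fac.getIface().sendSupervisorWorkerHeartbeat(workerHeartbeat);
    }

SupervisorIfaceFactory.close() is a no-op by default, so closing the local factory does not tear
down the shared supervisor, while SupervisorClient keeps the close behaviour it inherits from
ThriftClient.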

diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
index e950c051..175a91a 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
@@ -14,6 +14,7 @@ package org.apache.storm.daemon.worker;
 
 import java.io.File;
 import java.io.IOException;
+import java.net.UnknownHostException;
 import java.nio.charset.Charset;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
@@ -22,6 +23,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
@@ -45,6 +47,7 @@ import org.apache.storm.generated.ExecutorInfo;
 import org.apache.storm.generated.ExecutorStats;
 import org.apache.storm.generated.LSWorkerHeartbeat;
 import org.apache.storm.generated.LogConfig;
+import org.apache.storm.generated.Supervisor;
 import org.apache.storm.generated.SupervisorWorkerHeartbeat;
 import org.apache.storm.messaging.IConnection;
 import org.apache.storm.messaging.IContext;
@@ -61,6 +64,7 @@ import org.apache.storm.utils.LocalState;
 import org.apache.storm.utils.NimbusClient;
 import org.apache.storm.utils.ObjectReader;
 import org.apache.storm.utils.SupervisorClient;
+import org.apache.storm.utils.SupervisorIfaceFactory;
 import org.apache.storm.utils.Time;
 import org.apache.storm.utils.Utils;
 import org.slf4j.Logger;
@@ -87,6 +91,7 @@ public class Worker implements Shutdownable, DaemonCommon {
     private AtomicReference<Credentials> credentialsAtom;
     private Subject subject;
     private Collection<IAutoCredentials> autoCreds;
+    private final Supplier<SupervisorIfaceFactory> supervisorIfaceSupplier;
 
 
     /**
@@ -103,7 +108,7 @@ public class Worker implements Shutdownable, DaemonCommon {
      */
 
     public Worker(Map<String, Object> conf, IContext context, String topologyId, String assignmentId,
-                  int supervisorPort, int port, String workerId) {
+                  int supervisorPort, int port, String workerId, Supplier<SupervisorIfaceFactory> supervisorIfaceSupplier) {
         this.conf = conf;
         this.context = context;
         this.topologyId = topologyId;
@@ -113,6 +118,7 @@ public class Worker implements Shutdownable, DaemonCommon {
         this.workerId = workerId;
         this.logConfigManager = new LogConfigManager();
         this.metricRegistry = new StormMetricRegistry();
+        this.supervisorIfaceSupplier = supervisorIfaceSupplier;
     }
 
     public static void main(String[] args) throws Exception {
@@ -125,8 +131,16 @@ public class Worker implements Shutdownable, DaemonCommon {
         Map<String, Object> conf = ConfigUtils.readStormConfig();
         Utils.setupDefaultUncaughtExceptionHandler();
         StormCommon.validateDistributedMode(conf);
-        Worker worker = new Worker(conf, null, stormId, assignmentId, Integer.parseInt(supervisorPort),
-                                   Integer.parseInt(portStr), workerId);
+        int supervisorPortInt = Integer.parseInt(supervisorPort);
+        Supplier<SupervisorIfaceFactory> supervisorIfaceSupplier = () -> {
+            try {
+                return SupervisorClient.getConfiguredClient(conf, Utils.hostname(), supervisorPortInt);
+            } catch (UnknownHostException e) {
+                throw Utils.wrapInRuntime(e);
+            }
+        };
+        Worker worker = new Worker(conf, null, stormId, assignmentId, supervisorPortInt,
+                                   Integer.parseInt(portStr), workerId, supervisorIfaceSupplier);
         worker.start();
         int workerShutdownSleepSecs = ObjectReader.getInt(conf.get(Config.SUPERVISOR_WORKER_SHUTDOWN_SLEEP_SECS));
         LOG.info("Adding shutdown hook with kill in {} secs", workerShutdownSleepSecs);
@@ -172,7 +186,7 @@ public class Worker implements Shutdownable, DaemonCommon {
     private Object loadWorker(Map<String, Object> topologyConf, IStateStorage stateStorage, IStormClusterState stormClusterState,
                               Map<String, String> initCreds, Credentials initialCredentials)
         throws Exception {
-        workerState = new WorkerState(conf, context, topologyId, assignmentId, supervisorPort, port, workerId,
+        workerState = new WorkerState(conf, context, topologyId, assignmentId, supervisorIfaceSupplier, port, workerId,
                                       topologyConf, stateStorage, stormClusterState, autoCreds, metricRegistry);
 
         // Heartbeat here so that worker process dies if this fails
@@ -425,8 +439,8 @@ public class Worker implements Shutdownable, DaemonCommon {
         SupervisorWorkerHeartbeat workerHeartbeat = new SupervisorWorkerHeartbeat(lsWorkerHeartbeat.get_topology_id(),
                                                                                   lsWorkerHeartbeat.get_executors(),
                                                                                   lsWorkerHeartbeat.get_time_secs());
-        try (SupervisorClient client = SupervisorClient.getConfiguredClient(conf, Utils.hostname(), supervisorPort)) {
-            client.getClient().sendSupervisorWorkerHeartbeat(workerHeartbeat);
+        try (SupervisorIfaceFactory fac = supervisorIfaceSupplier.get()) {
+            fac.getIface().sendSupervisorWorkerHeartbeat(workerHeartbeat);
         } catch (Exception tr1) {
             //If any error/exception thrown, report directly to nimbus.
             LOG.warn("Exception when send heartbeat to local supervisor", tr1.getMessage());
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
index 2aa96a9..913261d 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
@@ -69,6 +69,7 @@ import org.apache.storm.serialization.ITupleSerializer;
 import org.apache.storm.serialization.KryoTupleSerializer;
 import org.apache.storm.shade.com.google.common.collect.ImmutableMap;
 import org.apache.storm.shade.com.google.common.collect.Sets;
+import org.apache.storm.shade.org.apache.commons.lang.Validate;
 import org.apache.storm.task.WorkerTopologyContext;
 import org.apache.storm.tuple.AddressedTuple;
 import org.apache.storm.tuple.Fields;
@@ -76,6 +77,7 @@ import org.apache.storm.utils.ConfigUtils;
 import org.apache.storm.utils.JCQueue;
 import org.apache.storm.utils.ObjectReader;
 import org.apache.storm.utils.SupervisorClient;
+import org.apache.storm.utils.SupervisorIfaceFactory;
 import org.apache.storm.utils.ThriftTopologyUtils;
 import org.apache.storm.utils.Utils;
 import org.apache.storm.utils.Utils.SmartThread;
@@ -92,7 +94,7 @@ public class WorkerState {
     final IConnection receiver;
     final String topologyId;
     final String assignmentId;
-    final int supervisorPort;
+    private final Supplier<SupervisorIfaceFactory> supervisorIfaceSupplier;
     final int port;
     final String workerId;
     final IStateStorage stateStorage;
@@ -151,18 +153,18 @@ public class WorkerState {
     private final StormMetricRegistry metricRegistry;
 
     public WorkerState(Map<String, Object> conf, IContext mqContext, String topologyId, String assignmentId,
-                       int supervisorPort, int port, String workerId, Map<String, Object> topologyConf, IStateStorage stateStorage,
+                       Supplier<SupervisorIfaceFactory> supervisorIfaceSupplier, int port, String workerId, Map<String, Object> topologyConf, IStateStorage stateStorage,
                        IStormClusterState stormClusterState, Collection<IAutoCredentials> autoCredentials,
                        StormMetricRegistry metricRegistry) throws IOException,
         InvalidTopologyException {
         this.metricRegistry = metricRegistry;
         this.autoCredentials = autoCredentials;
         this.conf = conf;
+        this.supervisorIfaceSupplier = supervisorIfaceSupplier;
         this.localExecutors = new HashSet<>(readWorkerExecutors(stormClusterState, topologyId, assignmentId, port));
         this.mqContext = (null != mqContext) ? mqContext : TransportFactory.makeContext(topologyConf);
         this.topologyId = topologyId;
         this.assignmentId = assignmentId;
-        this.supervisorPort = supervisorPort;
         this.port = port;
         this.workerId = workerId;
         this.stateStorage = stateStorage;
@@ -370,9 +372,14 @@ public class WorkerState {
     public SmartThread makeTransferThread() {
         return workerTransfer.makeTransferThread();
     }
-
+    
     public void refreshConnections() {
-        Assignment assignment = getLocalAssignment(conf, stormClusterState, topologyId);
+        Assignment assignment = null;
+        try {
+            assignment = getLocalAssignment(stormClusterState, topologyId);
+        } catch (Exception e) {
+            LOG.warn("Failed to read assignment. This should only happen when topology is
shutting down.", e);
+        }
 
         Set<NodeInfo> neededConnections = new HashSet<>();
         Map<Integer, NodeInfo> newTaskToNodePort = new HashMap<>();
@@ -393,6 +400,7 @@ public class WorkerState {
         Set<NodeInfo> newConnections = Sets.difference(neededConnections, currentConnections);
         Set<NodeInfo> removeConnections = Sets.difference(currentConnections, neededConnections);
 
+        Map<String, String> nodeHost = assignment != null ? assignment.get_node_host() : null;
         // Add new connections atomically
         cachedNodeToPortSocket.getAndUpdate(prev -> {
             Map<NodeInfo, IConnection> next = new HashMap<>(prev);
@@ -400,7 +408,8 @@ public class WorkerState {
                 next.put(nodeInfo,
                          mqContext.connect(
                              topologyId,
-                             assignment.get_node_host().get(nodeInfo.get_node()),    // Host
+                             //nodeHost is not null here, as newConnections is only non-empty if assignment was not null above.
+                             nodeHost.get(nodeInfo.get_node()),    // Host
                              nodeInfo.get_port().iterator().next().intValue(),       // Port
                              workerTransfer.getRemoteBackPressureStatus()));
             }
@@ -625,7 +634,8 @@ public class WorkerState {
         LOG.info("Reading assignments");
         List<List<Long>> executorsAssignedToThisWorker = new ArrayList<>();
         executorsAssignedToThisWorker.add(Constants.SYSTEM_EXECUTOR_ID);
-        Map<List<Long>, NodeInfo> executorToNodePort = getLocalAssignment(conf, stormClusterState, topologyId).get_executor_node_port();
+        Map<List<Long>, NodeInfo> executorToNodePort = 
+            getLocalAssignment(stormClusterState, topologyId).get_executor_node_port();
         for (Map.Entry<List<Long>, NodeInfo> entry : executorToNodePort.entrySet()) {
             NodeInfo nodeInfo = entry.getValue();
             if (nodeInfo.get_node().equals(assignmentId) && nodeInfo.get_port().iterator().next() == port) {
@@ -635,18 +645,17 @@ public class WorkerState {
         return executorsAssignedToThisWorker;
     }
 
-    private Assignment getLocalAssignment(Map<String, Object> conf, IStormClusterState stormClusterState, String topologyId) {
-        if (!ConfigUtils.isLocalMode(conf)) {
-            try (SupervisorClient supervisorClient = SupervisorClient.getConfiguredClient(conf, Utils.hostname(),
-                                                                                          supervisorPort)) {
-                Assignment assignment = supervisorClient.getClient().getLocalAssignmentForStorm(topologyId);
-                return assignment;
-            } catch (Throwable tr1) {
+    private Assignment getLocalAssignment(IStormClusterState stormClusterState, String topologyId) {
+        try (SupervisorIfaceFactory fac = supervisorIfaceSupplier.get()) {
+            return fac.getIface().getLocalAssignmentForStorm(topologyId);
+        } catch (Throwable e) {
                 //if any error/exception thrown, fetch it from zookeeper
-                return stormClusterState.remoteAssignmentInfo(topologyId, null);
+            Assignment assignment = stormClusterState.remoteAssignmentInfo(topologyId, null);
+            if (assignment == null) {
+                throw new RuntimeException("Failed to read worker assignment."
+                    + " Supervisor client threw exception, and assignment in Zookeeper was null", e);
             }
-        } else {
-            return stormClusterState.remoteAssignmentInfo(topologyId, null);
+            return assignment;
         }
     }
 
@@ -666,7 +675,7 @@ public class WorkerState {
         for (List<Long> executor : executors) {
             int port = this.getPort();
             receiveQueueMap.put(executor, new JCQueue("receive-queue" + executor.toString(),
-                recvQueueSize, overflowLimit, recvBatchSize, backPressureWaitStrategy,
+                                                      recvQueueSize, overflowLimit, recvBatchSize, backPressureWaitStrategy,
                 this.getTopologyId(), Constants.SYSTEM_COMPONENT_ID, -1, this.getPort(), metricRegistry));
 
         }
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/workertoken/WorkerTokenAuthorizer.java b/storm-client/src/jvm/org/apache/storm/security/auth/workertoken/WorkerTokenAuthorizer.java
index f321221..c225e27 100644
--- a/storm-client/src/jvm/org/apache/storm/security/auth/workertoken/WorkerTokenAuthorizer.java
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/workertoken/WorkerTokenAuthorizer.java
@@ -152,4 +152,4 @@ public class WorkerTokenAuthorizer implements PasswordProvider, Closeable {
             state.disconnect();
         }
     }
-}
\ No newline at end of file
+}
diff --git a/storm-client/src/jvm/org/apache/storm/utils/SupervisorClient.java b/storm-client/src/jvm/org/apache/storm/utils/SupervisorClient.java
index 2364777..64d5ace 100644
--- a/storm-client/src/jvm/org/apache/storm/utils/SupervisorClient.java
+++ b/storm-client/src/jvm/org/apache/storm/utils/SupervisorClient.java
@@ -28,7 +28,7 @@ import org.slf4j.LoggerFactory;
  * <li>nimbus -> supervisor: assign assignments for a node.</li>
  * </ul>
  */
-public class SupervisorClient extends ThriftClient {
+public class SupervisorClient extends ThriftClient implements SupervisorIfaceFactory {
     private static final Logger LOG = LoggerFactory.getLogger(SupervisorClient.class);
     private Supervisor.Client client;
 
@@ -76,7 +76,8 @@ public class SupervisorClient extends ThriftClient {
         }
     }
 
-    public Supervisor.Client getClient() {
+    @Override
+    public Supervisor.Client getIface() {
         return client;
     }
 }
diff --git a/storm-client/src/jvm/org/apache/storm/utils/SupervisorIfaceFactory.java b/storm-client/src/jvm/org/apache/storm/utils/SupervisorIfaceFactory.java
new file mode 100644
index 0000000..eddfa73
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/utils/SupervisorIfaceFactory.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.utils;
+
+import java.io.Closeable;
+
+public interface SupervisorIfaceFactory extends Closeable {
+
+    org.apache.storm.generated.Supervisor.Iface getIface();
+
+    @Override
+    default void close() {
+    }
+}
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Container.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Container.java
index 8ad5936..8b58483 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Container.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Container.java
@@ -309,7 +309,7 @@ public abstract class Container implements Killable {
         }
         return ret;
     }
-
+    
     @Override
     public boolean areAllProcessesDead() throws IOException {
         Set<Long> pids = getAllPids();
@@ -325,7 +325,7 @@ public abstract class Container implements Killable {
                 break;
             }
         }
-
+        
         if (allDead && shutdownTimer != null) {
             shutdownTimer.stop();
             shutdownTimer = null;
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ContainerLauncher.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ContainerLauncher.java
index 7df2036..b310018 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ContainerLauncher.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ContainerLauncher.java
@@ -46,14 +46,17 @@ public abstract class ContainerLauncher {
      * @param sharedContext Used in local mode to let workers talk together without netty
      * @param metricsRegistry The metrics registry.
      * @param containerMemoryTracker The shared memory tracker for the supervisor's containers
+     * @param localSupervisor The local supervisor Thrift interface. Only used for local clusters, distributed clusters use Thrift directly.
      * @return the proper container launcher
      * @throws IOException on any error
      */
     public static ContainerLauncher make(Map<String, Object> conf, String supervisorId, int supervisorPort,
                                          IContext sharedContext, StormMetricsRegistry metricsRegistry,
-                                         ContainerMemoryTracker containerMemoryTracker) throws IOException {
+                                         ContainerMemoryTracker containerMemoryTracker,
+                                         org.apache.storm.generated.Supervisor.Iface localSupervisor) throws IOException {
         if (ConfigUtils.isLocalMode(conf)) {
-            return new LocalContainerLauncher(conf, supervisorId, supervisorPort, sharedContext, metricsRegistry, containerMemoryTracker);
+            return new LocalContainerLauncher(conf, supervisorId, supervisorPort, sharedContext, metricsRegistry, containerMemoryTracker,
+                localSupervisor);
         }
 
         ResourceIsolationInterface resourceIsolationManager = null;
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/LocalContainer.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/LocalContainer.java
index 1a5fd82..228da84 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/LocalContainer.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/LocalContainer.java
@@ -27,15 +27,18 @@ import org.slf4j.LoggerFactory;
 public class LocalContainer extends Container {
     private static final Logger LOG = LoggerFactory.getLogger(LocalContainer.class);
     private final IContext _sharedContext;
+    private final org.apache.storm.generated.Supervisor.Iface localSupervisor;
     private volatile boolean _isAlive = false;
 
     public LocalContainer(Map<String, Object> conf, String supervisorId, int supervisorPort, int port,
                           LocalAssignment assignment, IContext sharedContext, StormMetricsRegistry metricsRegistry,
-                          ContainerMemoryTracker containerMemoryTracker) throws IOException {
+                          ContainerMemoryTracker containerMemoryTracker,
+                          org.apache.storm.generated.Supervisor.Iface localSupervisor) throws IOException {
         super(ContainerType.LAUNCH, conf, supervisorId, supervisorPort, port, assignment, null, null, null, null, metricsRegistry,
             containerMemoryTracker);
         _sharedContext = sharedContext;
         _workerId = Utils.uuid();
+        this.localSupervisor = localSupervisor;
     }
 
     @Override
@@ -50,7 +53,10 @@ public class LocalContainer extends Container {
 
     @Override
     public void launch() throws IOException {
-        Worker worker = new Worker(_conf, _sharedContext, _topologyId, _supervisorId, _supervisorPort, _port, _workerId);
+        Worker worker = new Worker(_conf, _sharedContext, _topologyId, _supervisorId, _supervisorPort, _port, _workerId,
+            () -> {
+                return () -> localSupervisor;
+            });
         try {
             worker.start();
         } catch (Exception e) {
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/LocalContainerLauncher.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/LocalContainerLauncher.java
index 77b04d4..d9f3f8d 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/LocalContainerLauncher.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/LocalContainerLauncher.java
@@ -29,22 +29,25 @@ public class LocalContainerLauncher extends ContainerLauncher {
     private final IContext _sharedContext;
     private final StormMetricsRegistry metricsRegistry;
     private final ContainerMemoryTracker containerMemoryTracker;
+    private final org.apache.storm.generated.Supervisor.Iface localSupervisor;
 
     public LocalContainerLauncher(Map<String, Object> conf, String supervisorId, int supervisorPort,
                                   IContext sharedContext, StormMetricsRegistry metricsRegistry,
-                                  ContainerMemoryTracker containerMemoryTracker) {
+                                  ContainerMemoryTracker containerMemoryTracker,
+                                  org.apache.storm.generated.Supervisor.Iface localSupervisor) {
         _conf = conf;
         _supervisorId = supervisorId;
         _supervisorPort = supervisorPort;
         _sharedContext = sharedContext;
         this.metricsRegistry = metricsRegistry;
         this.containerMemoryTracker = containerMemoryTracker;
+        this.localSupervisor = localSupervisor;
     }
 
     @Override
     public Container launchContainer(int port, LocalAssignment assignment, LocalState state) throws IOException {
         LocalContainer ret = new LocalContainer(_conf, _supervisorId, _supervisorPort,
-            port, assignment, _sharedContext, metricsRegistry, containerMemoryTracker);
+            port, assignment, _sharedContext, metricsRegistry, containerMemoryTracker, localSupervisor);
         ret.setup();
         ret.launch();
         return ret;
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ReadClusterState.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ReadClusterState.java
index 46b0e60..6b18bbe 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ReadClusterState.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ReadClusterState.java
@@ -86,7 +86,8 @@ public class ReadClusterState implements Runnable, AutoCloseable {
         this.slotMetrics = supervisor.getSlotMetrics();
 
         this.launcher = ContainerLauncher.make(superConf, assignmentId, supervisorPort,
-            supervisor.getSharedContext(), supervisor.getMetricsRegistry(), supervisor.getContainerMemoryTracker());
+            supervisor.getSharedContext(), supervisor.getMetricsRegistry(), supervisor.getContainerMemoryTracker(),
+            supervisor.getSupervisorThriftInterface());
 
         this.metricsProcessor = null;
         try {
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java
index ed79c35..9bc4f7f 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java
@@ -112,6 +112,8 @@ public class Supervisor implements DaemonCommon, AutoCloseable {
     private ThriftServer thriftServer;
     //used for local cluster heartbeating
     private Nimbus.Iface localNimbus;
+    //Passed to workers in local clusters, exposed by thrift server in distributed mode
+    private org.apache.storm.generated.Supervisor.Iface supervisorThriftInterface;
 
     private Supervisor(ISupervisor iSupervisor, StormMetricsRegistry metricsRegistry)
         throws IOException, IllegalAccessException, InstantiationException, ClassNotFoundException {
@@ -178,6 +180,8 @@ public class Supervisor implements DaemonCommon, AutoCloseable {
         this.workerHeartbeatTimer = new StormTimer(null, new DefaultUncaughtExceptionHandler());
 
         this.eventTimer = new StormTimer(null, new DefaultUncaughtExceptionHandler());
+        
+        this.supervisorThriftInterface = createSupervisorIface();
     }
 
     /**
@@ -393,6 +397,59 @@ public class Supervisor implements DaemonCommon, AutoCloseable {
         }
     }
 
+    private org.apache.storm.generated.Supervisor.Iface createSupervisorIface() {
+        return new org.apache.storm.generated.Supervisor.Iface() {
+            @Override
+            public void sendSupervisorAssignments(SupervisorAssignments assignments)
+                throws AuthorizationException, TException {
+                checkAuthorization("sendSupervisorAssignments");
+                LOG.info("Got an assignments from master, will start to sync with assignments: {}", assignments);
+                SynchronizeAssignments syn = new SynchronizeAssignments(getSupervisor(), assignments,
+                    getReadClusterState());
+                getEventManger().add(syn);
+            }
+
+            @Override
+            public Assignment getLocalAssignmentForStorm(String id)
+                throws NotAliveException, AuthorizationException, TException {
+                Map<String, Object> topoConf = null;
+                try {
+                    topoConf = ConfigUtils.readSupervisorStormConf(conf, id);
+                } catch (IOException e) {
+                    LOG.warn("Topology config is not localized yet...");
+                }
+                checkAuthorization(id, topoConf, "getLocalAssignmentForStorm");
+                Assignment assignment = getStormClusterState().assignmentInfo(id, null);
+                if (null == assignment) {
+                    throw new WrappedNotAliveException("No local assignment assigned for storm: "
+                        + id
+                        + " for node: "
+                        + getHostName());
+                }
+                return assignment;
+            }
+
+            @Override
+            public void sendSupervisorWorkerHeartbeat(SupervisorWorkerHeartbeat heartbeat)
+                throws AuthorizationException, NotAliveException, TException {
+                // do nothing except validate heartbeat for now.
+                String id = heartbeat.get_storm_id();
+                Map<String, Object> topoConf = null;
+                try {
+                    topoConf = ConfigUtils.readSupervisorStormConf(conf, id);
+                } catch (IOException e) {
+                    LOG.warn("Topology config is not localized yet...");
+                    throw new WrappedNotAliveException(id + " does not appear to be alive, you should probably exit");
+                }
+                checkAuthorization(id, topoConf, "sendSupervisorWorkerHeartbeat");
+            }
+        };
+    }
+
+    public org.apache.storm.generated.Supervisor.Iface getSupervisorThriftInterface() {
+        return supervisorThriftInterface;
+    }
+    
     private void launchSupervisorThriftServer(Map<String, Object> conf) throws IOException {
         // validate port
         int port = getThriftServerPort();
@@ -404,53 +461,7 @@ public class Supervisor implements DaemonCommon, AutoCloseable {
             throw new RuntimeException(e);
         }
 
-        TProcessor processor = new org.apache.storm.generated.Supervisor.Processor(
-            new org.apache.storm.generated.Supervisor.Iface() {
-                @Override
-                public void sendSupervisorAssignments(SupervisorAssignments assignments)
-                    throws AuthorizationException, TException {
-                    checkAuthorization("sendSupervisorAssignments");
-                    LOG.info("Got an assignments from master, will start to sync with assignments: {}", assignments);
-                    SynchronizeAssignments syn = new SynchronizeAssignments(getSupervisor(), assignments,
-                                                                            getReadClusterState());
-                    getEventManger().add(syn);
-                }
-
-                @Override
-                public Assignment getLocalAssignmentForStorm(String id)
-                    throws NotAliveException, AuthorizationException, TException {
-                    Map<String, Object> topoConf = null;
-                    try {
-                        topoConf = ConfigUtils.readSupervisorStormConf(conf, id);
-                    } catch (IOException e) {
-                        LOG.warn("Topology config is not localized yet...");
-                    }
-                    checkAuthorization(id, topoConf, "getLocalAssignmentForStorm");
-                    Assignment assignment = getStormClusterState().assignmentInfo(id, null);
-                    if (null == assignment) {
-                        throw new WrappedNotAliveException("No local assignment assigned for storm: "
-                                                    + id
-                                                    + " for node: "
-                                                    + getHostName());
-                    }
-                    return assignment;
-                }
-
-                @Override
-                public void sendSupervisorWorkerHeartbeat(SupervisorWorkerHeartbeat heartbeat)
-                    throws AuthorizationException, NotAliveException, TException {
-                    // do nothing except validate heartbeat for now.
-                    String id = heartbeat.get_storm_id();
-                    Map<String, Object> topoConf = null;
-                    try {
-                        topoConf = ConfigUtils.readSupervisorStormConf(conf, id);
-                    } catch (IOException e) {
-                        LOG.warn("Topology config is not localized yet...");
-                        throw new WrappedNotAliveException(id + " does not appear to be alive, you should probably exit");
-                    }
-                    checkAuthorization(id, topoConf, "sendSupervisorWorkerHeartbeat");
-                }
-            });
+        TProcessor processor = new org.apache.storm.generated.Supervisor.Processor<>(supervisorThriftInterface);
         this.thriftServer = new ThriftServer(conf, processor, ThriftConnectionType.SUPERVISOR);
         this.thriftServer.serve();
     }
@@ -536,7 +547,8 @@ public class Supervisor implements DaemonCommon, AutoCloseable {
         } else {
             try {
                 ContainerLauncher launcher = ContainerLauncher.make(getConf(), getId(), getThriftServerPort(),
-                                                                    getSharedContext(), getMetricsRegistry(), getContainerMemoryTracker());
+                                                                    getSharedContext(), getMetricsRegistry(), getContainerMemoryTracker(),
+                                                                    supervisorThriftInterface);
                 killWorkers(SupervisorUtils.supervisorWorkerIds(conf), launcher);
             } catch (Exception e) {
                 throw Utils.wrapInRuntime(e);
diff --git a/storm-server/src/main/java/org/apache/storm/nimbus/AssignmentDistributionService.java b/storm-server/src/main/java/org/apache/storm/nimbus/AssignmentDistributionService.java
index b912513..05cb1df 100644
--- a/storm-server/src/main/java/org/apache/storm/nimbus/AssignmentDistributionService.java
+++ b/storm-server/src/main/java/org/apache/storm/nimbus/AssignmentDistributionService.java
@@ -288,7 +288,7 @@ public class AssignmentDistributionService implements Closeable {
                 try (SupervisorClient client = SupervisorClient.getConfiguredClient(service.getConf(),
                                                                                     assignments.getHost(), assignments.getServerPort())) {
                     try {
-                        client.getClient().sendSupervisorAssignments(assignments.getAssignments());
+                        client.getIface().sendSupervisorAssignments(assignments.getAssignments());
                     } catch (Exception e) {
                         //just ignore the exception.
                         LOG.error("Exception when trying to send assignments to node {}:
{}", assignments.getNode(), e.getMessage());

