storm-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bo...@apache.org
Subject [07/12] storm git commit: adjustment a few functions
Date Wed, 09 Mar 2016 15:54:00 GMT
adjustment a few functions


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/d1a9b3d0
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/d1a9b3d0
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/d1a9b3d0

Branch: refs/heads/master
Commit: d1a9b3d0d622ab26a4243d9b77f4201fa88be657
Parents: 491ff98
Author: xiaojian.fxj <xiaojian.fxj@alibaba-inc.com>
Authored: Wed Mar 2 09:51:04 2016 +0800
Committer: xiaojian.fxj <xiaojian.fxj@alibaba-inc.com>
Committed: Wed Mar 2 10:15:32 2016 +0800

----------------------------------------------------------------------
 .../src/clj/org/apache/storm/daemon/drpc.clj    |  4 +-
 .../clj/org/apache/storm/daemon/supervisor.clj  |  2 +-
 .../src/jvm/org/apache/storm/LocalDRPC.java     | 14 ++----
 .../jvm/org/apache/storm/daemon/DrpcServer.java | 53 +++++++-------------
 .../test/clj/org/apache/storm/drpc_test.clj     |  6 +--
 .../apache/storm/security/auth/auth_test.clj    |  2 -
 .../storm/security/auth/drpc_auth_test.clj      |  2 +-
 7 files changed, 26 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/d1a9b3d0/storm-core/src/clj/org/apache/storm/daemon/drpc.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/drpc.clj b/storm-core/src/clj/org/apache/storm/daemon/drpc.clj
index a128972..96568e1 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/drpc.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/drpc.clj
@@ -68,13 +68,13 @@
   ([]
     (let [conf (clojurify-structure (ConfigUtils/readStormConfig))
           drpc-http-port (int (conf DRPC-HTTP-PORT))
-          drpc-server (DrpcServer.)
+          drpc-server (DrpcServer. conf)
           http-creds-handler (AuthUtils/GetDrpcHttpCredentialsPlugin conf)]
       (when (> drpc-http-port 0)
         (let [app (-> (webapp drpc-server http-creds-handler)
                     requests-middleware)]
           (.setHttpServlet drpc-server (ring.util.servlet/servlet app))))
-      (.launchServer drpc-server false conf)))
+      (.launchServer drpc-server)))
 )
 
 (defn -main []

http://git-wip-us.apache.org/repos/asf/storm/blob/d1a9b3d0/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj b/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
index 7295679..52d0ef6 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
@@ -1283,7 +1283,7 @@
      (.readBlobTo blob-store (ConfigUtils/masterStormConfKey storm-id) (FileOutputStream. (ConfigUtils/supervisorStormConfPath tmproot)) nil)
       (finally
         (.shutdown blob-store)))
-    (FileUtils/moveDirectory (File. tmproot) (File. stormroot))
+     (FileUtils/moveDirectory (File. tmproot) (File. stormroot))
 
     (setup-storm-code-dir conf (clojurify-structure (ConfigUtils/readSupervisorStormConf conf storm-id)) stormroot)
     (let [classloader (.getContextClassLoader (Thread/currentThread))

http://git-wip-us.apache.org/repos/asf/storm/blob/d1a9b3d0/storm-core/src/jvm/org/apache/storm/LocalDRPC.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/LocalDRPC.java b/storm-core/src/jvm/org/apache/storm/LocalDRPC.java
index c08c73e..ccdf634 100644
--- a/storm-core/src/jvm/org/apache/storm/LocalDRPC.java
+++ b/storm-core/src/jvm/org/apache/storm/LocalDRPC.java
@@ -17,7 +17,6 @@
  */
 package org.apache.storm;
 
-import org.apache.log4j.Logger;
 import org.apache.storm.daemon.DrpcServer;
 import org.apache.storm.generated.AuthorizationException;
 import org.apache.storm.generated.DRPCExecutionException;
@@ -30,20 +29,13 @@ import org.apache.thrift.TException;
 import java.util.Map;
 
 public class LocalDRPC implements ILocalDRPC {
-    private static final Logger LOG = Logger.getLogger(LocalDRPC.class);
 
-    private DrpcServer handler = new DrpcServer();
-    private Thread thread;
+    private final DrpcServer handler;
     private final String serviceId;
 
     public LocalDRPC() {
-        try {
-            Map conf = ConfigUtils.readStormConfig();
-            handler.launchServer(true, conf);
-        }catch (Exception e){
-            throw Utils.wrapInRuntime(e);
-        }
-
+        Map conf = ConfigUtils.readStormConfig();
+        handler = new DrpcServer(conf);
         serviceId = ServiceRegistry.registerService(handler);
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/d1a9b3d0/storm-core/src/jvm/org/apache/storm/daemon/DrpcServer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/DrpcServer.java b/storm-core/src/jvm/org/apache/storm/daemon/DrpcServer.java
index ae410d1..d8d33bd 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/DrpcServer.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/DrpcServer.java
@@ -61,7 +61,6 @@ public class DrpcServer implements DistributedRPC.Iface, DistributedRPCInvocatio
 
     private IAuthorizer authorizer;
 
-    //TODO: To be removed after porting drpc.clj
     private Servlet httpServlet;
 
     private AtomicInteger ctr = new AtomicInteger(0);
@@ -92,18 +91,17 @@ public class DrpcServer implements DistributedRPC.Iface, DistributedRPCInvocatio
     private final static Meter meterFetchRequestCalls = new MetricRegistry().meter("drpc:num-fetchRequest-calls");
     private final static Meter meterShutdownCalls = new MetricRegistry().meter("drpc:num-shutdown-calls");
     
-    public DrpcServer() {
-
+    public DrpcServer(Map conf) {
+        this.conf = conf;
+        this.authorizer = mkAuthorizationHandler((String) (this.conf.get(Config.DRPC_AUTHORIZER)));
+        initClearThread();
     }
 
-    //TODO: to be removed
     public void setHttpServlet(Servlet httpServlet) {
         this.httpServlet = httpServlet;
     }
 
-
-
-    private ThriftServer initHandlerServer(Map conf, final DrpcServer service) throws Exception {
+    private ThriftServer initHandlerServer(final DrpcServer service) throws Exception {
         int port = (int) conf.get(Config.DRPC_PORT);
         if (port > 0) {
             handlerServer = new ThriftServer(conf, new DistributedRPC.Processor<DistributedRPC.Iface>(service), ThriftConnectionType.DRPC);
@@ -111,7 +109,7 @@ public class DrpcServer implements DistributedRPC.Iface, DistributedRPCInvocatio
         return handlerServer;
     }
 
-    private ThriftServer initInvokeServer(Map conf, final DrpcServer service) throws Exception {
+    private ThriftServer initInvokeServer(final DrpcServer service) throws Exception {
         invokeServer = new ThriftServer(conf, new DistributedRPCInvocations.Processor<DistributedRPCInvocations.Iface>(service),
                 ThriftConnectionType.DRPC_INVOCATIONS);
         return invokeServer;
@@ -149,17 +147,15 @@ public class DrpcServer implements DistributedRPC.Iface, DistributedRPCInvocatio
     }
     private void initThrift() throws Exception {
 
-        handlerServer = initHandlerServer(conf, this);
-        invokeServer = initInvokeServer(conf, this);
+        handlerServer = initHandlerServer(this);
+        invokeServer = initInvokeServer(this);
         httpCredsHandler = AuthUtils.GetDrpcHttpCredentialsPlugin(conf);
         Utils.addShutdownHookWithForceKillIn1Sec(new Runnable() {
             @Override
             public void run() {
-                if (handlerServer != null) {
+                if (handlerServer != null)
                     handlerServer.stop();
-                } else {
-                    invokeServer.stop();
-                }
+                invokeServer.stop();
             }
         });
         LOG.info("Starting Distributed RPC servers...");
@@ -189,7 +185,7 @@ public class DrpcServer implements DistributedRPC.Iface, DistributedRPCInvocatio
             public Object call() throws Exception {
                 for (Map.Entry<String, InternalRequest> e : outstandingRequests.entrySet()) {
                     InternalRequest internalRequest = e.getValue();
-                    if (Time.deltaSecs(internalRequest.startTimeSecs) > Utils.getInt(conf.get(Config.DRPC_REQUEST_TIMEOUT_SECS), 0)) {
+                    if (Time.deltaSecs(internalRequest.startTimeSecs) > Utils.getInt(conf.get(Config.DRPC_REQUEST_TIMEOUT_SECS))) {
                         String id = e.getKey();
                         Semaphore sem = internalRequest.sem;
                         if (sem != null) {
@@ -199,7 +195,6 @@ public class DrpcServer implements DistributedRPC.Iface, DistributedRPCInvocatio
                             sem.release();
                         }
                         cleanup(id);
-                        LOG.info("Clear request " + id);
                     }
                 }
                 return getTimeoutCheckSecs();
@@ -211,18 +206,10 @@ public class DrpcServer implements DistributedRPC.Iface, DistributedRPCInvocatio
         return timeoutCheckSecs;
     }
 
-    public void launchServer(boolean isLocal, Map conf) throws Exception {
-
+    public void launchServer() throws Exception {
         LOG.info("Starting drpc server for storm version {}", VersionInfo.getVersion());
-        this.conf = conf;
-        authorizer = mkAuthorizationHandler((String) (conf.get(Config.DRPC_AUTHORIZER)), conf);
-
-        initClearThread();
-        if (!isLocal){
-            initThrift();
-            initHttp();
-        }
-
+        initThrift();
+        initHttp();
     }
 
     @Override
@@ -276,11 +263,7 @@ public class DrpcServer implements DistributedRPC.Iface, DistributedRPCInvocatio
         if (result == null) {
             throw new DRPCExecutionException("Request timed out");
         }
-        try {
-            return String.valueOf(result);
-        }catch (Exception e){
-            throw new DRPCExecutionException(e.getMessage());
-        }
+        return (String) result;
     }
 
     @Override
@@ -363,16 +346,14 @@ public class DrpcServer implements DistributedRPC.Iface, DistributedRPCInvocatio
     }
 
     // TO be replaced by Common.mkAuthorizationHandler
-    private IAuthorizer mkAuthorizationHandler(String klassname, Map conf) {
+    private IAuthorizer mkAuthorizationHandler(String klassname) {
         IAuthorizer authorizer = null;
         Class aznClass = null;
         if (StringUtils.isNotBlank(klassname)) {
             try {
                 aznClass = Class.forName(klassname);
                 authorizer = (IAuthorizer) aznClass.newInstance();
-                if (authorizer != null) {
-                    authorizer.prepare(conf);
-                }
+                authorizer.prepare(conf);
             } catch (Exception e) {
                 LOG.error("mkAuthorizationHandler failed!", e);
             }

http://git-wip-us.apache.org/repos/asf/storm/blob/d1a9b3d0/storm-core/test/clj/org/apache/storm/drpc_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/drpc_test.clj b/storm-core/test/clj/org/apache/storm/drpc_test.clj
index 4879d0d..a20872e 100644
--- a/storm-core/test/clj/org/apache/storm/drpc_test.clj
+++ b/storm-core/test/clj/org/apache/storm/drpc_test.clj
@@ -235,10 +235,9 @@
         conf {DRPC-REQUEST-TIMEOUT-SECS delay-seconds}
         mock-cu (proxy [ConfigUtils] []
                   (readStormConfigImpl [] conf))
-        drpc-handler (proxy [DrpcServer] []
+        drpc-handler (proxy [DrpcServer] [conf]
                        (acquireQueue [function] queue))]
     (with-open [_ (ConfigUtilsInstaller. mock-cu)]
-      (.launchServer drpc-handler true conf)
       (is (thrown? DRPCExecutionException
             (.execute drpc-handler "ArbitraryDRPCFunctionName" "")))
       (is (= 0 (.size queue))))))
@@ -249,11 +248,10 @@
         conf {DRPC-REQUEST-TIMEOUT-SECS delay-seconds}
         mock-cu (proxy [ConfigUtils] []
                   (readStormConfigImpl [] conf))
-        drpc-handler (proxy [DrpcServer] []
+        drpc-handler (proxy [DrpcServer] [conf]
           (acquireQueue [function] queue)
           (getTimeoutCheckSecs [] delay-seconds))]
     (with-open [_ (ConfigUtilsInstaller. mock-cu)]
-      (.launchServer drpc-handler true conf)
       (is (thrown? DRPCExecutionException
             (.execute drpc-handler "ArbitraryDRPCFunctionName" "no-args"))))))
 

http://git-wip-us.apache.org/repos/asf/storm/blob/d1a9b3d0/storm-core/test/clj/org/apache/storm/security/auth/auth_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/security/auth/auth_test.clj b/storm-core/test/clj/org/apache/storm/security/auth/auth_test.clj
index a366efa..27f5816 100644
--- a/storm-core/test/clj/org/apache/storm/security/auth/auth_test.clj
+++ b/storm-core/test/clj/org/apache/storm/security/auth/auth_test.clj
@@ -27,8 +27,6 @@
   (:import [javax.security.auth Subject])
   (:import [java.net InetAddress])
   (:import [org.apache.storm Config])
-  (:import [org.mockito Mockito])
-  (:import [org.mockito.exceptions.base MockitoAssertionError])
   (:import [org.apache.storm.generated AuthorizationException])
   (:import [org.apache.storm.utils NimbusClient ConfigUtils])
   (:import [org.apache.storm.security.auth.authorizer SimpleWhitelistAuthorizer SimpleACLAuthorizer])

http://git-wip-us.apache.org/repos/asf/storm/blob/d1a9b3d0/storm-core/test/clj/org/apache/storm/security/auth/drpc_auth_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/security/auth/drpc_auth_test.clj b/storm-core/test/clj/org/apache/storm/security/auth/drpc_auth_test.clj
index 3eef31b..6b1aaa4 100644
--- a/storm-core/test/clj/org/apache/storm/security/auth/drpc_auth_test.clj
+++ b/storm-core/test/clj/org/apache/storm/security/auth/drpc_auth_test.clj
@@ -38,7 +38,7 @@
         conf (if login-cfg (assoc conf "java.security.auth.login.config" login-cfg) conf)
         conf (assoc conf DRPC-PORT client-port)
         conf (assoc conf DRPC-INVOCATIONS-PORT invocations-port)
-        service-handler (let [drpc-service (DrpcServer.)] (.launchServer drpc-service true conf) drpc-service)
+        service-handler (DrpcServer. conf)
         handler-server (ThriftServer. conf
                                       (DistributedRPC$Processor. service-handler)
                                       ThriftConnectionType/DRPC)


Mime
View raw message