storm-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bo...@apache.org
Subject [06/12] storm git commit: update DrpcServer code based on revans2 and abhishekagarwal87
Date Wed, 09 Mar 2016 15:53:59 GMT
update DrpcServer code based on revans2  and abhishekagarwal87


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/491ff985
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/491ff985
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/491ff985

Branch: refs/heads/master
Commit: 491ff9856eaaa8bc0bab6dbc073016b920500216
Parents: 8e350d1
Author: xiaojian.fxj <xiaojian.fxj@alibaba-inc.com>
Authored: Sat Feb 27 11:59:05 2016 +0800
Committer: xiaojian.fxj <xiaojian.fxj@alibaba-inc.com>
Committed: Sat Feb 27 21:10:41 2016 +0800

----------------------------------------------------------------------
 .../org/apache/storm/starter/ManualDRPC.java    |   1 -
 .../src/jvm/org/apache/storm/LocalDRPC.java     |   2 +-
 .../jvm/org/apache/storm/daemon/DrpcServer.java | 215 +++++++++----------
 3 files changed, 105 insertions(+), 113 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/491ff985/examples/storm-starter/src/jvm/org/apache/storm/starter/ManualDRPC.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/ManualDRPC.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/ManualDRPC.java
index f986c88..f1e052e 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/ManualDRPC.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/ManualDRPC.java
@@ -59,7 +59,6 @@ public class ManualDRPC {
         LocalCluster cluster = new LocalCluster();
         Config conf = new Config();
         cluster.submitTopology("exclaim", conf, builder.createTopology());
-
         System.out.println(drpc.execute("exclamation", "aaa"));
         System.out.println(drpc.execute("exclamation", "bbb"));
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/491ff985/storm-core/src/jvm/org/apache/storm/LocalDRPC.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/LocalDRPC.java b/storm-core/src/jvm/org/apache/storm/LocalDRPC.java
index 0cc8e43..c08c73e 100644
--- a/storm-core/src/jvm/org/apache/storm/LocalDRPC.java
+++ b/storm-core/src/jvm/org/apache/storm/LocalDRPC.java
@@ -70,7 +70,7 @@ public class LocalDRPC implements ILocalDRPC {
     @Override
     public void shutdown() {
         ServiceRegistry.unregisterService(this.serviceId);
-        this.handler.shutdown();
+        this.handler.close();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/storm/blob/491ff985/storm-core/src/jvm/org/apache/storm/daemon/DrpcServer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/DrpcServer.java b/storm-core/src/jvm/org/apache/storm/daemon/DrpcServer.java
index 7cee915..ae410d1 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/DrpcServer.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/DrpcServer.java
@@ -19,8 +19,7 @@ package org.apache.storm.daemon;
 
 import com.codahale.metrics.Meter;
 import com.codahale.metrics.MetricRegistry;
-import com.sun.net.httpserver.HttpsServer;
-import com.sun.org.apache.bcel.internal.generic.ARRAYLENGTH;
+import com.google.common.collect.ImmutableMap;
 import org.apache.commons.lang.StringUtils;
 import org.apache.storm.Config;
 import org.apache.storm.daemon.metrics.MetricsUtils;
@@ -32,7 +31,6 @@ import org.apache.storm.security.auth.authorizer.DRPCAuthorizerBase;
 import org.apache.storm.ui.FilterConfiguration;
 import org.apache.storm.ui.IConfigurator;
 import org.apache.storm.ui.UIHelpers;
-import org.apache.storm.utils.ConfigUtils;
 import org.apache.storm.utils.Time;
 import org.apache.storm.utils.Utils;
 import org.apache.storm.utils.VersionInfo;
@@ -47,7 +45,8 @@ import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicInteger;
 
-public class DrpcServer implements DistributedRPC.Iface, DistributedRPCInvocations.Iface,
Shutdownable {
+
+public class DrpcServer implements DistributedRPC.Iface, DistributedRPCInvocations.Iface,
AutoCloseable {
 
     private static final Logger LOG = LoggerFactory.getLogger(DrpcServer.class);
     private final Long timeoutCheckSecs = 5L;
@@ -62,40 +61,42 @@ public class DrpcServer implements DistributedRPC.Iface, DistributedRPCInvocatio
 
     private IAuthorizer authorizer;
 
-    // To be removed after porting drpc.clj
+    //TODO: To be removed after porting drpc.clj
     private Servlet httpServlet;
 
     private AtomicInteger ctr = new AtomicInteger(0);
-    private ConcurrentHashMap<String, Semaphore> idtoSem = new ConcurrentHashMap<String,
Semaphore>();
-    private ConcurrentHashMap<String, Object> idtoResult = new ConcurrentHashMap<String,
Object>();
-    private ConcurrentHashMap<String, Integer> idtoStart = new ConcurrentHashMap<String,
Integer>();
-    private ConcurrentHashMap<String, String> idtoFunction = new ConcurrentHashMap<String,
String>();
-    private ConcurrentHashMap<String, DRPCRequest> idtoRequest = new ConcurrentHashMap<String,
DRPCRequest>();
     private ConcurrentHashMap<String, ConcurrentLinkedQueue<DRPCRequest>> requestQueues
= new ConcurrentHashMap<String, ConcurrentLinkedQueue<DRPCRequest>>();
 
-    private final Meter meterHttpRequests = new MetricRegistry().meter("drpc:num-execute-http-requests");
-    private final Meter meterExecuteCalls = new MetricRegistry().meter("drpc:num-execute-calls");
-    private final Meter meterResultCalls = new MetricRegistry().meter("drpc:num-result-calls");
-    private final Meter meterFailRequestCalls = new MetricRegistry().meter("drpc:num-failRequest-calls");
-    private final Meter meterFetchRequestCalls = new MetricRegistry().meter("drpc:num-fetchRequest-calls");
-    private final Meter meterShutdownCalls = new MetricRegistry().meter("drpc:num-shutdown-calls");
-    
-    public DrpcServer() {
-
+    private static class InternalRequest {
+        public final Semaphore sem;
+        public final int startTimeSecs;
+        public final String function;
+        public final DRPCRequest request;
+        public volatile Object result;
+
+        public InternalRequest(String function, DRPCRequest request) {
+            sem = new Semaphore(0);
+            startTimeSecs = Time.currentTimeSecs();
+            this.function = function;
+            this.request = request;
+        }
     }
+    private ConcurrentHashMap<String, InternalRequest> outstandingRequests = new ConcurrentHashMap<>();
 
-    public IHttpCredentialsPlugin getHttpCredsHandler() {
-        return httpCredsHandler;
-    }
 
-    public void setHttpCredsHandler(IHttpCredentialsPlugin httpCredsHandler) {
-        this.httpCredsHandler = httpCredsHandler;
-    }
+    //TODO: to be replaced by a common registry
+    private final static Meter meterHttpRequests = new MetricRegistry().meter("drpc:num-execute-http-requests");
+    private final static Meter meterExecuteCalls = new MetricRegistry().meter("drpc:num-execute-calls");
+    private final static Meter meterResultCalls = new MetricRegistry().meter("drpc:num-result-calls");
+    private final static Meter meterFailRequestCalls = new MetricRegistry().meter("drpc:num-failRequest-calls");
+    private final static Meter meterFetchRequestCalls = new MetricRegistry().meter("drpc:num-fetchRequest-calls");
+    private final static Meter meterShutdownCalls = new MetricRegistry().meter("drpc:num-shutdown-calls");
+    
+    public DrpcServer() {
 
-    public Servlet getHttpServlet() {
-        return httpServlet;
     }
 
+    //TODO: to be removed
     public void setHttpServlet(Servlet httpServlet) {
         this.httpServlet = httpServlet;
     }
@@ -116,31 +117,9 @@ public class DrpcServer implements DistributedRPC.Iface, DistributedRPCInvocatio
         return invokeServer;
     }
 
-    private void initServer() throws Exception {
+    private void initHttp() throws Exception{
+        LOG.info("Starting  RPC Http servers...");
         Integer drpcHttpPort = (Integer) conf.get(Config.DRPC_HTTP_PORT);
-        handlerServer = initHandlerServer(conf, this);
-        invokeServer = initInvokeServer(conf, this);
-        httpCredsHandler = AuthUtils.GetDrpcHttpCredentialsPlugin(conf);
-        Utils.addShutdownHookWithForceKillIn1Sec(new Runnable() {
-            @Override
-            public void run() {
-                if (handlerServer != null) {
-                    handlerServer.stop();
-                } else {
-                    invokeServer.stop();
-                }
-            }
-        });
-        LOG.info("Starting Distributed RPC servers...");
-
-        LOG.info("Starting Distributed RPC servers...");
-        new Thread(new Runnable() {
-
-            @Override
-            public void run() {
-                invokeServer.serve();
-            }
-        }).start();
         if (drpcHttpPort != null && drpcHttpPort > 0) {
             String filterClass = (String) (conf.get(Config.DRPC_HTTP_FILTER));
             Map<String, String> filterParams = (Map<String, String>) (conf.get(Config.DRPC_HTTP_FILTER_PARAMS));
@@ -167,12 +146,37 @@ public class DrpcServer implements DistributedRPC.Iface, DistributedRPCInvocatio
             });
         }
 
-        // To be replaced by Common.StartMetricsReporters
+    }
+    private void initThrift() throws Exception {
+
+        handlerServer = initHandlerServer(conf, this);
+        invokeServer = initInvokeServer(conf, this);
+        httpCredsHandler = AuthUtils.GetDrpcHttpCredentialsPlugin(conf);
+        Utils.addShutdownHookWithForceKillIn1Sec(new Runnable() {
+            @Override
+            public void run() {
+                if (handlerServer != null) {
+                    handlerServer.stop();
+                } else {
+                    invokeServer.stop();
+                }
+            }
+        });
+        LOG.info("Starting Distributed RPC servers...");
+        new Thread(new Runnable() {
+
+            @Override
+            public void run() {
+                invokeServer.serve();
+            }
+        }).start();
+
+        //TODO: To be replaced by Common.StartMetricsReporters
         List<PreparableReporter> reporters = MetricsUtils.getPreparableReporters(conf);
         for (PreparableReporter reporter : reporters) {
             reporter.prepare(new MetricRegistry(), conf);
             reporter.start();
-            LOG.info("Started statistics report plugin...");
+            LOG.info("Started statistics report plugin: {}", reporter);
         }
         if (handlerServer != null)
             handlerServer.serve();
@@ -183,14 +187,14 @@ public class DrpcServer implements DistributedRPC.Iface, DistributedRPCInvocatio
 
             @Override
             public Object call() throws Exception {
-                for (Map.Entry<String, Integer> e : idtoStart.entrySet()) {
-
-                    if (Time.deltaSecs(e.getValue()) > Utils.getInt(conf.get(Config.DRPC_REQUEST_TIMEOUT_SECS),
0)) {
+                for (Map.Entry<String, InternalRequest> e : outstandingRequests.entrySet())
{
+                    InternalRequest internalRequest = e.getValue();
+                    if (Time.deltaSecs(internalRequest.startTimeSecs) > Utils.getInt(conf.get(Config.DRPC_REQUEST_TIMEOUT_SECS),
0)) {
                         String id = e.getKey();
-                        Semaphore sem = idtoSem.get(id);
+                        Semaphore sem = internalRequest.sem;
                         if (sem != null) {
-                            String func = idtoFunction.get(id);
-                            acquireQueue(func).remove(idtoRequest.get(id));
+                            String func = internalRequest.function;
+                            acquireQueue(func).remove(internalRequest.request);
                             LOG.warn("Timeout DRPC request id: {} start at {}", id, e.getValue());
                             sem.release();
                         }
@@ -214,84 +218,82 @@ public class DrpcServer implements DistributedRPC.Iface, DistributedRPCInvocatio
         authorizer = mkAuthorizationHandler((String) (conf.get(Config.DRPC_AUTHORIZER)),
conf);
 
         initClearThread();
-        if (!isLocal)
-            initServer();
+        if (!isLocal){
+            initThrift();
+            initHttp();
+        }
+
     }
 
     @Override
-    public void shutdown() {
+    public void close() {
         meterShutdownCalls.mark();
         clearThread.interrupt();
     }
 
     public void cleanup(String id) {
-        idtoSem.remove(id);
-        idtoResult.remove(id);
-        idtoStart.remove(id);
-        idtoFunction.remove(id);
-        idtoRequest.remove(id);
+        outstandingRequests.remove(id);
     }
 
     @Override
     public String execute(String functionName, String funcArgs) throws DRPCExecutionException,
AuthorizationException, org.apache.thrift.TException {
         meterExecuteCalls.mark();
-        LOG.debug("Received DRPC request for {} {} at {} ", functionName, funcArgs, System.currentTimeMillis());
+        LOG.debug("Received DRPC request for {} ({}) at {} ", functionName, funcArgs, System.currentTimeMillis());
         Map<String, String> map = new HashMap<>();
         map.put(DRPCAuthorizerBase.FUNCTION_NAME, functionName);
         checkAuthorization(authorizer, map, "execute");
 
-        int idinc = this.ctr.incrementAndGet();
-        int maxvalue = 1000000000;
-        int newid = idinc % maxvalue;
-        if (idinc != newid) {
-            this.ctr.compareAndSet(idinc, newid);
-        }
-
+        int newid = 0;
+        int orig = 0;
+        do {
+            orig = ctr.get();
+            newid = (orig + 1) % 1000000000;
+        } while (!ctr.compareAndSet(orig, newid));
         String strid = String.valueOf(newid);
-        Semaphore sem = new Semaphore(0);
 
         DRPCRequest req = new DRPCRequest(funcArgs, strid);
-        this.idtoStart.put(strid, Time.currentTimeSecs());
-        this.idtoSem.put(strid, sem);
-        this.idtoFunction.put(strid, functionName);
-        this.idtoRequest.put(strid, req);
+        InternalRequest internalRequest = new InternalRequest(functionName, req);
+        this.outstandingRequests.put(strid, internalRequest);
         ConcurrentLinkedQueue<DRPCRequest> queue = acquireQueue(functionName);
         queue.add(req);
         LOG.debug("Waiting for DRPC request for {} {} at {}", functionName, funcArgs, System.currentTimeMillis());
         try {
-            sem.acquire();
+            internalRequest.sem.acquire();
         } catch (InterruptedException e) {
             LOG.error("acquire fail ", e);
         }
         LOG.debug("Acquired for DRPC request for {} {} at {}", functionName, funcArgs, System.currentTimeMillis());
 
-        Object result = this.idtoResult.get(strid);
+        Object result = internalRequest.result;
 
-        LOG.info("Returning for DRPC request for " + functionName + " " + funcArgs + " at
" + (System.currentTimeMillis()));
+        LOG.debug("Returning for DRPC request for " + functionName + " " + funcArgs + " at
" + (System.currentTimeMillis()));
 
         this.cleanup(strid);
 
-        if (result instanceof DRPCExecutionException) {
-            throw (DRPCExecutionException) result;
+        if (result instanceof DRPCExecutionException ) {
+            throw (DRPCExecutionException)result;
         }
         if (result == null) {
             throw new DRPCExecutionException("Request timed out");
         }
-        return String.valueOf(result);
+        try {
+            return String.valueOf(result);
+        }catch (Exception e){
+            throw new DRPCExecutionException(e.getMessage());
+        }
     }
 
     @Override
     public void result(String id, String result) throws AuthorizationException, TException
{
         meterResultCalls.mark();
-        String func = this.idtoFunction.get(id);
-        if (func != null) {
-            Map<String, String> map = new HashMap<>();
-            map.put(DRPCAuthorizerBase.FUNCTION_NAME, func);
+        InternalRequest internalRequest = this.outstandingRequests.get(id);
+        if (internalRequest != null) {
+            Map<String, String> map = ImmutableMap.of(DRPCAuthorizerBase.FUNCTION_NAME,
internalRequest.function);
             checkAuthorization(authorizer, map, "result");
-            Semaphore sem = this.idtoSem.get(id);
+            Semaphore sem = internalRequest.sem;
             LOG.debug("Received result {} for {} at {}", result, id, System.currentTimeMillis());
             if (sem != null) {
-                this.idtoResult.put(id, result);
+                internalRequest.result = result;
                 sem.release();
             }
         }
@@ -316,14 +318,14 @@ public class DrpcServer implements DistributedRPC.Iface, DistributedRPCInvocatio
     @Override
     public void failRequest(String id) throws AuthorizationException, TException {
         meterFailRequestCalls.mark();
-        String func = this.idtoFunction.get(id);
-        if (func != null) {
+        InternalRequest internalRequest = this.outstandingRequests.get(id);
+        if (internalRequest != null) {
             Map<String, String> map = new HashMap<>();
-            map.put(DRPCAuthorizerBase.FUNCTION_NAME, func);
+            map.put(DRPCAuthorizerBase.FUNCTION_NAME, internalRequest.function);
             checkAuthorization(authorizer, map, "failRequest");
-            Semaphore sem = this.idtoSem.get(id);
+            Semaphore sem = internalRequest.sem;
             if (sem != null) {
-                this.idtoResult.put(id, new DRPCExecutionException("Request failed"));
+                internalRequest.result = new DRPCExecutionException("Request failed");
                 sem.release();
             }
         }
@@ -332,8 +334,11 @@ public class DrpcServer implements DistributedRPC.Iface, DistributedRPCInvocatio
     protected ConcurrentLinkedQueue<DRPCRequest> acquireQueue(String function) {
         ConcurrentLinkedQueue<DRPCRequest> reqQueue = requestQueues.get(function);
         if (reqQueue == null) {
-            reqQueue = new ConcurrentLinkedQueue<DRPCRequest>();
-            requestQueues.put(function, reqQueue);
+            reqQueue = new ConcurrentLinkedQueue<>();
+            ConcurrentLinkedQueue<DRPCRequest> old = requestQueues.putIfAbsent(function,
reqQueue);
+            if (old != null) {
+                reqQueue = old;
+            }
         }
         return reqQueue;
     }
@@ -375,16 +380,4 @@ public class DrpcServer implements DistributedRPC.Iface, DistributedRPCInvocatio
         LOG.debug("authorization class name: {} class: {} handler: {}", klassname, aznClass,
authorizer);
         return authorizer;
     }
-
-    public Map getConf() {
-        return conf;
-    }
-
-    public static void main(String[] args) throws Exception {
-
-        Utils.setupDefaultUncaughtExceptionHandler();
-        final DrpcServer service = new DrpcServer();
-        service.launchServer(false, ConfigUtils.readStormConfig());
-    }
-
 }
\ No newline at end of file


Mime
View raw message