storm-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bo...@apache.org
Subject [09/12] storm git commit: Merge branch 'master' into DRPC, and update some metric
Date Wed, 09 Mar 2016 15:54:02 GMT
Merge branch 'master' into DRPC, and update some metric


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/bc25c51e
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/bc25c51e
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/bc25c51e

Branch: refs/heads/master
Commit: bc25c51ea079782cf9fe8890520efdec4812a7b5
Parents: b0c9574 b477939
Author: xiaojian.fxj <xiaojian.fxj@alibaba-inc.com>
Authored: Tue Mar 8 19:11:19 2016 +0800
Committer: xiaojian.fxj <xiaojian.fxj@alibaba-inc.com>
Committed: Tue Mar 8 19:12:13 2016 +0800

----------------------------------------------------------------------
 .gitignore                                      |   1 +
 CHANGELOG.md                                    |  13 ++
 .../travis/print-errors-from-test-reports.py    |   4 +
 pom.xml                                         |  23 +++
 .../src/clj/org/apache/storm/MockAutoCred.clj   |  58 -------
 .../src/clj/org/apache/storm/daemon/common.clj  |  13 +-
 .../src/clj/org/apache/storm/daemon/drpc.clj    |   8 +-
 .../clj/org/apache/storm/daemon/logviewer.clj   |  27 ++--
 .../src/clj/org/apache/storm/daemon/nimbus.clj  | 155 +++++++++---------
 .../clj/org/apache/storm/daemon/supervisor.clj  |  27 +++-
 storm-core/src/clj/org/apache/storm/ui/core.clj |  81 +++++-----
 .../src/clj/org/apache/storm/ui/helpers.clj     |  10 +-
 .../storm/cluster/StormClusterStateImpl.java    |   7 +-
 .../jvm/org/apache/storm/daemon/DrpcServer.java |  36 ++---
 .../storm/daemon/metrics/MetricsUtils.java      |   2 +-
 .../jvm/org/apache/storm/drpc/DRPCSpout.java    |   2 +
 .../storm/metric/StormMetricsRegistry.java      |  84 ++++++++++
 .../auth/AbstractSaslClientCallbackHandler.java |  76 +++++++++
 .../auth/AbstractSaslServerCallbackHandler.java |  94 +++++++++++
 .../apache/storm/security/auth/AuthUtils.java   |  40 +++++
 .../auth/digest/ClientCallbackHandler.java      |  60 +------
 .../auth/digest/ServerCallbackHandler.java      |  61 +------
 .../storm/security/auth/kerberos/AutoTGT.java   |  64 ++++----
 .../auth/kerberos/AutoTGTKrb5LoginModule.java   |   8 +-
 .../auth/plain/PlainClientCallbackHandler.java  |  31 ++++
 .../auth/plain/PlainSaslTransportPlugin.java    |  71 +++++++++
 .../auth/plain/PlainServerCallbackHandler.java  |  55 +++++++
 .../security/auth/plain/SaslPlainServer.java    | 158 +++++++++++++++++++
 .../jvm/org/apache/storm/utils/ConfigUtils.java |  16 +-
 .../src/jvm/org/apache/storm/utils/Utils.java   |   9 +-
 .../test/clj/org/apache/storm/nimbus_test.clj   |  10 +-
 .../security/auth/auto_login_module_test.clj    |  24 ++-
 .../clj/org/apache/storm/supervisor_test.clj    |  17 +-
 .../test/jvm/org/apache/storm/MockAutoCred.java |  75 +++++++++
 34 files changed, 1012 insertions(+), 408 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/bc25c51e/storm-core/src/clj/org/apache/storm/daemon/drpc.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/org/apache/storm/daemon/drpc.clj
index 96568e1,0fe19e9..872407c
--- a/storm-core/src/clj/org/apache/storm/daemon/drpc.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/drpc.clj
@@@ -15,19 -15,166 +15,19 @@@
  ;; limitations under the License.
  
  (ns org.apache.storm.daemon.drpc
 -  (:import [org.apache.storm.security.auth AuthUtils ThriftServer ThriftConnectionType ReqContext]
 -           [org.apache.storm.ui UIHelpers IConfigurator FilterConfiguration]
 +  (:import [org.apache.storm.security.auth AuthUtils ReqContext]
-            [org.apache.storm.daemon DrpcServer])
++           [org.apache.storm.daemon DrpcServer]
+            [org.apache.storm.metric StormMetricsRegistry])
 -  (:import [org.apache.storm.security.auth.authorizer DRPCAuthorizerBase])
    (:import [org.apache.storm.utils Utils])
 -  (:import [org.apache.storm.generated DistributedRPC DistributedRPC$Iface DistributedRPC$Processor
 -            DRPCRequest DRPCExecutionException DistributedRPCInvocations DistributedRPCInvocations$Iface
 -            DistributedRPCInvocations$Processor])
 -  (:import [java.util.concurrent Semaphore ConcurrentLinkedQueue
 -            ThreadPoolExecutor ArrayBlockingQueue TimeUnit])
 -  (:import [org.apache.storm.daemon Shutdownable]
 -           [org.apache.storm.utils Time])
 -  (:import [java.net InetAddress])
 -  (:import [org.apache.storm.generated AuthorizationException]
 -           [org.apache.storm.utils VersionInfo ConfigUtils]
 -           [org.apache.storm.logging ThriftAccessLogger])
 +  (:import [org.apache.storm.utils ConfigUtils])
    (:use [org.apache.storm config log util])
 -  (:use [org.apache.storm.daemon common])
    (:use [org.apache.storm.ui helpers])
    (:use compojure.core)
    (:use ring.middleware.reload)
    (:require [compojure.handler :as handler])
-   (:require [metrics.meters :refer [defmeter mark!]])
    (:gen-class))
  
- (defmeter drpc:num-execute-http-requests)
+ (def drpc:num-execute-http-requests (StormMetricsRegistry/registerMeter "drpc:num-execute-http-requests"))
 -(def drpc:num-execute-calls (StormMetricsRegistry/registerMeter "drpc:num-execute-calls"))
 -(def drpc:num-result-calls (StormMetricsRegistry/registerMeter "drpc:num-result-calls"))
 -(def drpc:num-failRequest-calls (StormMetricsRegistry/registerMeter "drpc:num-failRequest-calls"))
 -(def drpc:num-fetchRequest-calls (StormMetricsRegistry/registerMeter "drpc:num-fetchRequest-calls"))
 -(def drpc:num-shutdown-calls (StormMetricsRegistry/registerMeter "drpc:num-shutdown-calls"))
 -
 -(def STORM-VERSION (VersionInfo/getVersion))
 -
 -(defn timeout-check-secs [] 5)
 -
 -(defn acquire-queue [queues-atom function]
 -  (swap! queues-atom
 -    (fn [amap]
 -      (if-not (amap function)
 -        (assoc amap function (ConcurrentLinkedQueue.))
 -        amap)))
 -  (@queues-atom function))
 -
 -(defn check-authorization
 -  ([aclHandler mapping operation context]
 -    (if (not-nil? context)
 -      (ThriftAccessLogger/logAccess (.requestID context) (.remoteAddress context) (.principal context) operation))
 -    (if aclHandler
 -      (let [context (or context (ReqContext/context))]
 -        (if-not (.permit aclHandler context operation mapping)
 -          (let [principal (.principal context)
 -                user (if principal (.getName principal) "unknown")]
 -              (throw (AuthorizationException.
 -                       (str "DRPC request '" operation "' for '"
 -                            user "' user is not authorized"))))))))
 -  ([aclHandler mapping operation]
 -    (check-authorization aclHandler mapping operation (ReqContext/context))))
 -
 -;; TODO: change this to use TimeCacheMap
 -(defn service-handler [conf]
 -  (let [drpc-acl-handler (mk-authorization-handler (conf DRPC-AUTHORIZER) conf)
 -        ctr (atom 0)
 -        id->sem (atom {})
 -        id->result (atom {})
 -        id->start (atom {})
 -        id->function (atom {})
 -        id->request (atom {})
 -        request-queues (atom {})
 -        cleanup (fn [id] (swap! id->sem dissoc id)
 -                  (swap! id->result dissoc id)
 -                  (swap! id->function dissoc id)
 -                  (swap! id->request dissoc id)
 -                  (swap! id->start dissoc id))
 -        my-ip (.getHostAddress (InetAddress/getLocalHost))
 -        clear-thread (Utils/asyncLoop
 -                       (fn []
 -                         (doseq [[id start] @id->start]
 -                           (when (> (Time/deltaSecs start) (conf DRPC-REQUEST-TIMEOUT-SECS))
 -                             (when-let [sem (@id->sem id)]
 -                               (.remove (acquire-queue request-queues (@id->function id)) (@id->request id))
 -                               (log-warn "Timeout DRPC request id: " id " start at " start)
 -                               (.release sem))
 -                             (cleanup id)))
 -                         (timeout-check-secs)))]
 -    (reify DistributedRPC$Iface
 -      (^String execute
 -        [this ^String function ^String args]
 -        (.mark drpc:num-execute-calls)
 -        (log-debug "Received DRPC request for " function " (" args ") at " (System/currentTimeMillis))
 -        (check-authorization drpc-acl-handler
 -                             {DRPCAuthorizerBase/FUNCTION_NAME function}
 -                             "execute")
 -        (let [id (str (swap! ctr (fn [v] (mod (inc v) 1000000000))))
 -              ^Semaphore sem (Semaphore. 0)
 -              req (DRPCRequest. args id)
 -              ^ConcurrentLinkedQueue queue (acquire-queue request-queues function)]
 -          (swap! id->start assoc id (Time/currentTimeSecs))
 -          (swap! id->sem assoc id sem)
 -          (swap! id->function assoc id function)
 -          (swap! id->request assoc id req)
 -          (.add queue req)
 -          (log-debug "Waiting for DRPC result for " function " " args " at " (System/currentTimeMillis))
 -          (.acquire sem)
 -          (log-debug "Acquired DRPC result for " function " " args " at " (System/currentTimeMillis))
 -          (let [result (@id->result id)]
 -            (cleanup id)
 -            (log-debug "Returning DRPC result for " function " " args " at " (System/currentTimeMillis))
 -            (if (instance? DRPCExecutionException result)
 -              (throw result)
 -              (if (nil? result)
 -                (throw (DRPCExecutionException. "Request timed out"))
 -                result)))))
 -
 -      DistributedRPCInvocations$Iface
 -
 -      (^void result
 -        [this ^String id ^String result]
 -        (.mark drpc:num-result-calls)
 -        (when-let [func (@id->function id)]
 -          (check-authorization drpc-acl-handler
 -                               {DRPCAuthorizerBase/FUNCTION_NAME func}
 -                               "result")
 -          (let [^Semaphore sem (@id->sem id)]
 -            (log-debug "Received result " result " for " id " at " (System/currentTimeMillis))
 -            (when sem
 -              (swap! id->result assoc id result)
 -              (.release sem)
 -              ))))
 -
 -      (^void failRequest
 -        [this ^String id]
 -        (.mark drpc:num-failRequest-calls)
 -        (when-let [func (@id->function id)]
 -          (check-authorization drpc-acl-handler
 -                               {DRPCAuthorizerBase/FUNCTION_NAME func}
 -                               "failRequest")
 -          (let [^Semaphore sem (@id->sem id)]
 -            (when sem
 -              (swap! id->result assoc id (DRPCExecutionException. "Request failed"))
 -              (.release sem)))))
 -
 -      (^DRPCRequest fetchRequest
 -        [this ^String func]
 -        (.mark drpc:num-fetchRequest-calls)
 -        (check-authorization drpc-acl-handler
 -                             {DRPCAuthorizerBase/FUNCTION_NAME func}
 -                             "fetchRequest")
 -        (let [^ConcurrentLinkedQueue queue (acquire-queue request-queues func)
 -              ret (.poll queue)]
 -          (if ret
 -            (do (log-debug "Fetched request for " func " at " (System/currentTimeMillis))
 -              ret)
 -            (DRPCRequest. "" ""))))
 -
 -      Shutdownable
 -
 -      (shutdown
 -        [this]
 -        (.mark drpc:num-shutdown-calls)
 -        (.interrupt clear-thread)))))
  
  (defn handle-request [handler]
    (fn [request]

http://git-wip-us.apache.org/repos/asf/storm/blob/bc25c51e/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/bc25c51e/storm-core/src/jvm/org/apache/storm/daemon/DrpcServer.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/org/apache/storm/daemon/DrpcServer.java
index d8d33bd,0000000..c4792d0
mode 100644,000000..100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/DrpcServer.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/DrpcServer.java
@@@ -1,364 -1,0 +1,356 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + * http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.storm.daemon;
 +
 +import com.codahale.metrics.Meter;
- import com.codahale.metrics.MetricRegistry;
 +import com.google.common.collect.ImmutableMap;
 +import org.apache.commons.lang.StringUtils;
 +import org.apache.storm.Config;
- import org.apache.storm.daemon.metrics.MetricsUtils;
- import org.apache.storm.daemon.metrics.reporters.PreparableReporter;
 +import org.apache.storm.generated.*;
 +import org.apache.storm.logging.ThriftAccessLogger;
++import org.apache.storm.metric.StormMetricsRegistry;
 +import org.apache.storm.security.auth.*;
 +import org.apache.storm.security.auth.authorizer.DRPCAuthorizerBase;
 +import org.apache.storm.ui.FilterConfiguration;
 +import org.apache.storm.ui.IConfigurator;
 +import org.apache.storm.ui.UIHelpers;
 +import org.apache.storm.utils.Time;
 +import org.apache.storm.utils.Utils;
 +import org.apache.storm.utils.VersionInfo;
 +import org.apache.thrift.TException;
 +import org.eclipse.jetty.server.Server;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import javax.servlet.Servlet;
 +import java.security.Principal;
 +import java.util.*;
 +import java.util.concurrent.*;
 +import java.util.concurrent.atomic.AtomicInteger;
 +
- 
 +public class DrpcServer implements DistributedRPC.Iface, DistributedRPCInvocations.Iface, AutoCloseable {
 +
 +    private static final Logger LOG = LoggerFactory.getLogger(DrpcServer.class);
 +    private final Long timeoutCheckSecs = 5L;
 +
 +    private Map conf;
 +
 +    private ThriftServer handlerServer;
 +    private ThriftServer invokeServer;
 +    private IHttpCredentialsPlugin httpCredsHandler;
 +
 +    private Thread clearThread;
 +
 +    private IAuthorizer authorizer;
 +
 +    private Servlet httpServlet;
 +
 +    private AtomicInteger ctr = new AtomicInteger(0);
 +    private ConcurrentHashMap<String, ConcurrentLinkedQueue<DRPCRequest>> requestQueues = new ConcurrentHashMap<String, ConcurrentLinkedQueue<DRPCRequest>>();
 +
 +    private static class InternalRequest {
 +        public final Semaphore sem;
 +        public final int startTimeSecs;
 +        public final String function;
 +        public final DRPCRequest request;
 +        public volatile Object result;
 +
 +        public InternalRequest(String function, DRPCRequest request) {
 +            sem = new Semaphore(0);
 +            startTimeSecs = Time.currentTimeSecs();
 +            this.function = function;
 +            this.request = request;
 +        }
 +    }
++
 +    private ConcurrentHashMap<String, InternalRequest> outstandingRequests = new ConcurrentHashMap<>();
 +
++    private final static Meter meterHttpRequests = StormMetricsRegistry.registerMeter("drpc:num-execute-http-requests");
++    private final static Meter meterExecuteCalls = StormMetricsRegistry.registerMeter("drpc:num-execute-calls");
++    private final static Meter meterResultCalls = StormMetricsRegistry.registerMeter("drpc:num-result-calls");
++    private final static Meter meterFailRequestCalls = StormMetricsRegistry.registerMeter("drpc:num-failRequest-calls");
++    private final static Meter meterFetchRequestCalls = StormMetricsRegistry.registerMeter("drpc:num-fetchRequest-calls");
++    private final static Meter meterShutdownCalls = StormMetricsRegistry.registerMeter("drpc:num-shutdown-calls");
 +
-     //TODO: to be replaced by a common registry
-     private final static Meter meterHttpRequests = new MetricRegistry().meter("drpc:num-execute-http-requests");
-     private final static Meter meterExecuteCalls = new MetricRegistry().meter("drpc:num-execute-calls");
-     private final static Meter meterResultCalls = new MetricRegistry().meter("drpc:num-result-calls");
-     private final static Meter meterFailRequestCalls = new MetricRegistry().meter("drpc:num-failRequest-calls");
-     private final static Meter meterFetchRequestCalls = new MetricRegistry().meter("drpc:num-fetchRequest-calls");
-     private final static Meter meterShutdownCalls = new MetricRegistry().meter("drpc:num-shutdown-calls");
-     
 +    public DrpcServer(Map conf) {
 +        this.conf = conf;
 +        this.authorizer = mkAuthorizationHandler((String) (this.conf.get(Config.DRPC_AUTHORIZER)));
 +        initClearThread();
 +    }
 +
 +    public void setHttpServlet(Servlet httpServlet) {
 +        this.httpServlet = httpServlet;
 +    }
 +
 +    private ThriftServer initHandlerServer(final DrpcServer service) throws Exception {
 +        int port = (int) conf.get(Config.DRPC_PORT);
 +        if (port > 0) {
 +            handlerServer = new ThriftServer(conf, new DistributedRPC.Processor<DistributedRPC.Iface>(service), ThriftConnectionType.DRPC);
 +        }
 +        return handlerServer;
 +    }
 +
 +    private ThriftServer initInvokeServer(final DrpcServer service) throws Exception {
 +        invokeServer = new ThriftServer(conf, new DistributedRPCInvocations.Processor<DistributedRPCInvocations.Iface>(service),
 +                ThriftConnectionType.DRPC_INVOCATIONS);
 +        return invokeServer;
 +    }
 +
-     private void initHttp() throws Exception{
++    private void initHttp() throws Exception {
 +        LOG.info("Starting  RPC Http servers...");
 +        Integer drpcHttpPort = (Integer) conf.get(Config.DRPC_HTTP_PORT);
 +        if (drpcHttpPort != null && drpcHttpPort > 0) {
 +            String filterClass = (String) (conf.get(Config.DRPC_HTTP_FILTER));
 +            Map<String, String> filterParams = (Map<String, String>) (conf.get(Config.DRPC_HTTP_FILTER_PARAMS));
 +            FilterConfiguration filterConfiguration = new FilterConfiguration(filterParams, filterClass);
 +            final List<FilterConfiguration> filterConfigurations = Arrays.asList(filterConfiguration);
 +            final Integer httpsPort = Utils.getInt(conf.get(Config.DRPC_HTTPS_PORT), 0);
 +            final String httpsKsPath = (String) (conf.get(Config.DRPC_HTTPS_KEYSTORE_PATH));
 +            final String httpsKsPassword = (String) (conf.get(Config.DRPC_HTTPS_KEYSTORE_PASSWORD));
 +            final String httpsKsType = (String) (conf.get(Config.DRPC_HTTPS_KEYSTORE_TYPE));
 +            final String httpsKeyPassword = (String) (conf.get(Config.DRPC_HTTPS_KEY_PASSWORD));
 +            final String httpsTsPath = (String) (conf.get(Config.DRPC_HTTPS_TRUSTSTORE_PATH));
 +            final String httpsTsPassword = (String) (conf.get(Config.DRPC_HTTPS_TRUSTSTORE_PASSWORD));
 +            final String httpsTsType = (String) (conf.get(Config.DRPC_HTTPS_TRUSTSTORE_TYPE));
 +            final Boolean httpsWantClientAuth = (Boolean) (conf.get(Config.DRPC_HTTPS_WANT_CLIENT_AUTH));
 +            final Boolean httpsNeedClientAuth = (Boolean) (conf.get(Config.DRPC_HTTPS_NEED_CLIENT_AUTH));
 +
 +            UIHelpers.stormRunJetty(drpcHttpPort, new IConfigurator() {
 +                @Override
 +                public void execute(Server s) {
 +                    UIHelpers.configSsl(s, httpsPort, httpsKsPath, httpsKsPassword, httpsKsType, httpsKeyPassword, httpsTsPath, httpsTsPassword, httpsTsType,
 +                            httpsNeedClientAuth, httpsWantClientAuth);
 +                    UIHelpers.configFilter(s, httpServlet, filterConfigurations);
 +                }
 +            });
 +        }
 +
 +    }
++
 +    private void initThrift() throws Exception {
 +
 +        handlerServer = initHandlerServer(this);
 +        invokeServer = initInvokeServer(this);
 +        httpCredsHandler = AuthUtils.GetDrpcHttpCredentialsPlugin(conf);
 +        Utils.addShutdownHookWithForceKillIn1Sec(new Runnable() {
 +            @Override
 +            public void run() {
 +                if (handlerServer != null)
 +                    handlerServer.stop();
 +                invokeServer.stop();
 +            }
 +        });
 +        LOG.info("Starting Distributed RPC servers...");
 +        new Thread(new Runnable() {
 +
 +            @Override
 +            public void run() {
 +                invokeServer.serve();
 +            }
 +        }).start();
 +
-         //TODO: To be replaced by Common.StartMetricsReporters
-         List<PreparableReporter> reporters = MetricsUtils.getPreparableReporters(conf);
-         for (PreparableReporter reporter : reporters) {
-             reporter.prepare(new MetricRegistry(), conf);
-             reporter.start();
-             LOG.info("Started statistics report plugin: {}", reporter);
-         }
++        StormMetricsRegistry.startMetricsReporters(conf);
++
 +        if (handlerServer != null)
 +            handlerServer.serve();
 +    }
 +
 +    private void initClearThread() {
 +        clearThread = Utils.asyncLoop(new Callable() {
 +
 +            @Override
 +            public Object call() throws Exception {
 +                for (Map.Entry<String, InternalRequest> e : outstandingRequests.entrySet()) {
 +                    InternalRequest internalRequest = e.getValue();
 +                    if (Time.deltaSecs(internalRequest.startTimeSecs) > Utils.getInt(conf.get(Config.DRPC_REQUEST_TIMEOUT_SECS))) {
 +                        String id = e.getKey();
 +                        Semaphore sem = internalRequest.sem;
 +                        if (sem != null) {
 +                            String func = internalRequest.function;
 +                            acquireQueue(func).remove(internalRequest.request);
 +                            LOG.warn("Timeout DRPC request id: {} start at {}", id, e.getValue());
 +                            sem.release();
 +                        }
 +                        cleanup(id);
 +                    }
 +                }
 +                return getTimeoutCheckSecs();
 +            }
 +        });
 +    }
 +
 +    public Long getTimeoutCheckSecs() {
 +        return timeoutCheckSecs;
 +    }
 +
 +    public void launchServer() throws Exception {
 +        LOG.info("Starting drpc server for storm version {}", VersionInfo.getVersion());
 +        initThrift();
 +        initHttp();
 +    }
 +
 +    @Override
 +    public void close() {
 +        meterShutdownCalls.mark();
 +        clearThread.interrupt();
 +    }
 +
 +    public void cleanup(String id) {
 +        outstandingRequests.remove(id);
 +    }
 +
 +    @Override
 +    public String execute(String functionName, String funcArgs) throws DRPCExecutionException, AuthorizationException, org.apache.thrift.TException {
 +        meterExecuteCalls.mark();
 +        LOG.debug("Received DRPC request for {} ({}) at {} ", functionName, funcArgs, System.currentTimeMillis());
 +        Map<String, String> map = new HashMap<>();
 +        map.put(DRPCAuthorizerBase.FUNCTION_NAME, functionName);
 +        checkAuthorization(authorizer, map, "execute");
 +
 +        int newid = 0;
 +        int orig = 0;
 +        do {
 +            orig = ctr.get();
 +            newid = (orig + 1) % 1000000000;
 +        } while (!ctr.compareAndSet(orig, newid));
 +        String strid = String.valueOf(newid);
 +
 +        DRPCRequest req = new DRPCRequest(funcArgs, strid);
 +        InternalRequest internalRequest = new InternalRequest(functionName, req);
 +        this.outstandingRequests.put(strid, internalRequest);
 +        ConcurrentLinkedQueue<DRPCRequest> queue = acquireQueue(functionName);
 +        queue.add(req);
 +        LOG.debug("Waiting for DRPC request for {} {} at {}", functionName, funcArgs, System.currentTimeMillis());
 +        try {
 +            internalRequest.sem.acquire();
 +        } catch (InterruptedException e) {
 +            LOG.error("acquire fail ", e);
 +        }
 +        LOG.debug("Acquired for DRPC request for {} {} at {}", functionName, funcArgs, System.currentTimeMillis());
 +
 +        Object result = internalRequest.result;
 +
 +        LOG.debug("Returning for DRPC request for " + functionName + " " + funcArgs + " at " + (System.currentTimeMillis()));
 +
 +        this.cleanup(strid);
 +
-         if (result instanceof DRPCExecutionException ) {
-             throw (DRPCExecutionException)result;
++        if (result instanceof DRPCExecutionException) {
++            throw (DRPCExecutionException) result;
 +        }
 +        if (result == null) {
 +            throw new DRPCExecutionException("Request timed out");
 +        }
 +        return (String) result;
 +    }
 +
 +    @Override
 +    public void result(String id, String result) throws AuthorizationException, TException {
 +        meterResultCalls.mark();
 +        InternalRequest internalRequest = this.outstandingRequests.get(id);
 +        if (internalRequest != null) {
 +            Map<String, String> map = ImmutableMap.of(DRPCAuthorizerBase.FUNCTION_NAME, internalRequest.function);
 +            checkAuthorization(authorizer, map, "result");
 +            Semaphore sem = internalRequest.sem;
 +            LOG.debug("Received result {} for {} at {}", result, id, System.currentTimeMillis());
 +            if (sem != null) {
 +                internalRequest.result = result;
 +                sem.release();
 +            }
 +        }
 +    }
 +
 +    @Override
 +    public DRPCRequest fetchRequest(String functionName) throws AuthorizationException, TException {
 +        meterFetchRequestCalls.mark();
 +        Map<String, String> map = new HashMap<>();
 +        map.put(DRPCAuthorizerBase.FUNCTION_NAME, functionName);
 +        checkAuthorization(authorizer, map, "fetchRequest");
 +        ConcurrentLinkedQueue<DRPCRequest> queue = acquireQueue(functionName);
 +        DRPCRequest req = queue.poll();
 +        if (req != null) {
 +            LOG.debug("Fetched request for {} at {}", functionName, System.currentTimeMillis());
 +            return req;
 +        } else {
 +            return new DRPCRequest("", "");
 +        }
 +    }
 +
 +    @Override
 +    public void failRequest(String id) throws AuthorizationException, TException {
 +        meterFailRequestCalls.mark();
 +        InternalRequest internalRequest = this.outstandingRequests.get(id);
 +        if (internalRequest != null) {
 +            Map<String, String> map = new HashMap<>();
 +            map.put(DRPCAuthorizerBase.FUNCTION_NAME, internalRequest.function);
 +            checkAuthorization(authorizer, map, "failRequest");
 +            Semaphore sem = internalRequest.sem;
 +            if (sem != null) {
 +                internalRequest.result = new DRPCExecutionException("Request failed");
 +                sem.release();
 +            }
 +        }
 +    }
 +
 +    protected ConcurrentLinkedQueue<DRPCRequest> acquireQueue(String function) {
 +        ConcurrentLinkedQueue<DRPCRequest> reqQueue = requestQueues.get(function);
 +        if (reqQueue == null) {
 +            reqQueue = new ConcurrentLinkedQueue<>();
 +            ConcurrentLinkedQueue<DRPCRequest> old = requestQueues.putIfAbsent(function, reqQueue);
 +            if (old != null) {
 +                reqQueue = old;
 +            }
 +        }
 +        return reqQueue;
 +    }
 +
 +    private void checkAuthorization(IAuthorizer aclHandler, Map mapping, String operation, ReqContext reqContext) throws AuthorizationException {
 +        if (reqContext != null) {
 +            ThriftAccessLogger.logAccess(reqContext.requestID(), reqContext.remoteAddress(), reqContext.principal(), operation);
 +        }
 +        if (aclHandler != null) {
 +            if (reqContext == null)
 +                reqContext = ReqContext.context();
 +            if (!aclHandler.permit(reqContext, operation, mapping)) {
 +                Principal principal = reqContext.principal();
 +                String user = (principal != null) ? principal.getName() : "unknown";
 +                throw new AuthorizationException("DRPC request '" + operation + "' for '" + user + "' user is not authorized");
 +            }
 +        }
 +    }
 +
 +    private void checkAuthorization(IAuthorizer aclHandler, Map mapping, String operation) throws AuthorizationException {
 +        checkAuthorization(aclHandler, mapping, operation, ReqContext.context());
 +    }
 +
 +    // TO be replaced by Common.mkAuthorizationHandler
 +    private IAuthorizer mkAuthorizationHandler(String klassname) {
 +        IAuthorizer authorizer = null;
 +        Class aznClass = null;
 +        if (StringUtils.isNotBlank(klassname)) {
 +            try {
 +                aznClass = Class.forName(klassname);
 +                authorizer = (IAuthorizer) aznClass.newInstance();
 +                authorizer.prepare(conf);
 +            } catch (Exception e) {
 +                LOG.error("mkAuthorizationHandler failed!", e);
 +            }
 +        }
 +        LOG.debug("authorization class name: {} class: {} handler: {}", klassname, aznClass, authorizer);
 +        return authorizer;
 +    }
 +}


Mime
View raw message