storm-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kabh...@apache.org
Subject [1/5] storm git commit: Provide Socket time out for nimbus thrift client
Date Wed, 04 Jan 2017 08:58:32 GMT
Repository: storm
Updated Branches:
  refs/heads/1.x-branch bd93b7f7b -> 3129bfa37


Provide Socket time out for nimbus thrift client


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/209e2cd5
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/209e2cd5
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/209e2cd5

Branch: refs/heads/1.x-branch
Commit: 209e2cd504f1e7078451b919ae7b765bcf2fe4a2
Parents: bd93b7f
Author: Sanket <schintap@untilservice-lm>
Authored: Wed Dec 21 15:08:12 2016 -0600
Committer: Jungtaek Lim <kabhwan@gmail.com>
Committed: Wed Jan 4 17:35:10 2017 +0900

----------------------------------------------------------------------
 conf/defaults.yaml                              |  1 +
 storm-core/src/jvm/org/apache/storm/Config.java |  7 +++++
 .../security/auth/SaslTransportPlugin.java      | 28 ++++++++++++--------
 .../security/auth/ThriftConnectionType.java     | 19 +++++++++----
 4 files changed, 39 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/209e2cd5/conf/defaults.yaml
----------------------------------------------------------------------
diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index 041a628..a8a9ce6 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -40,6 +40,7 @@ storm.exhibitor.poll.uripath: "/exhibitor/v1/cluster/list"
 storm.cluster.mode: "distributed" # can be distributed or local
 storm.local.mode.zmq: false
 storm.thrift.transport: "org.apache.storm.security.auth.SimpleTransportPlugin"
+storm.thrift.socket.timeout.ms: 600000
 storm.principal.tolocal: "org.apache.storm.security.auth.DefaultPrincipalToLocal"
 storm.group.mapping.service: "org.apache.storm.security.auth.ShellBasedGroupsMapping"
 storm.group.mapping.service.params: null

http://git-wip-us.apache.org/repos/asf/storm/blob/209e2cd5/storm-core/src/jvm/org/apache/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/Config.java b/storm-core/src/jvm/org/apache/storm/Config.java
index 41b3f67..1b9725e 100644
--- a/storm-core/src/jvm/org/apache/storm/Config.java
+++ b/storm-core/src/jvm/org/apache/storm/Config.java
@@ -484,6 +484,13 @@ public class Config extends HashMap<String, Object> {
     public static final String NIMBUS_THRIFT_TRANSPORT_PLUGIN = "nimbus.thrift.transport";
 
     /**
+     * How long before a Thrift Client socket hangs before timeout
+     * and restart the socket.
+     */
+    @isInteger
+    public static final String STORM_THRIFT_SOCKET_TIMEOUT_MS = "storm.thrift.socket.timeout.ms";
+
+    /**
      * The host that the master server is running on, added only for backward compatibility,
      * the usage deprecated in favor of nimbus.seeds config.
      */

http://git-wip-us.apache.org/repos/asf/storm/blob/209e2cd5/storm-core/src/jvm/org/apache/storm/security/auth/SaslTransportPlugin.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/security/auth/SaslTransportPlugin.java b/storm-core/src/jvm/org/apache/storm/security/auth/SaslTransportPlugin.java
index cc840dd..cad2b30 100644
--- a/storm-core/src/jvm/org/apache/storm/security/auth/SaslTransportPlugin.java
+++ b/storm-core/src/jvm/org/apache/storm/security/auth/SaslTransportPlugin.java
@@ -64,8 +64,14 @@ public abstract class SaslTransportPlugin implements ITransportPlugin {
     @Override
     public TServer getServer(TProcessor processor) throws IOException, TTransportException
{
         int port = type.getPort(storm_conf);
+        Integer socketTimeout = type.getSocketTimeOut(storm_conf);
         TTransportFactory serverTransportFactory = getServerTransportFactory();
-        TServerSocket serverTransport = new TServerSocket(port);
+        TServerSocket serverTransport = null;
+        if (socketTimeout != null) {
+            serverTransport = new TServerSocket(port, socketTimeout);
+        } else {
+            serverTransport = new TServerSocket(port);
+        }
         int numWorkerThreads = type.getNumThreads(storm_conf);
         Integer queueSize = type.getQueueSize(storm_conf);
 
@@ -96,12 +102,12 @@ public abstract class SaslTransportPlugin implements ITransportPlugin
{
     protected abstract TTransportFactory getServerTransportFactory() throws IOException;
 
 
-    /**                                                                                 
                                                                                         
 
-     * Processor that pulls the SaslServer object out of the transport, and             
                                                                                         
 
-     * assumes the remote user's UGI before calling through to the original             
                                                                                         
 
-     * processor.                                                                       
                                                                                         
 
-     *                                                                                  
                                                                                         
 
-     * This is used on the server side to set the UGI for each specific call.           
                                                                                         
 
+    /**
+     * Processor that pulls the SaslServer object out of the transport, and
+     * assumes the remote user's UGI before calling through to the original
+     * processor.
+     *
+     * This is used on the server side to set the UGI for each specific call.
      */
     private static class TUGIWrapProcessor implements TProcessor {
         final TProcessor wrapped;
@@ -111,7 +117,7 @@ public abstract class SaslTransportPlugin implements ITransportPlugin
{
         }
 
         public boolean process(final TProtocol inProt, final TProtocol outProt) throws TException
{
-            //populating request context 
+            //populating request context
             ReqContext req_context = ReqContext.context();
 
             TTransport trans = inProt.getTransport();
@@ -127,7 +133,7 @@ public abstract class SaslTransportPlugin implements ITransportPlugin
{
             Socket socket = tsocket.getSocket();
             req_context.setRemoteAddress(socket.getInetAddress());
 
-            //remote subject 
+            //remote subject
             SaslServer saslServer = saslTrans.getSaslServer();
             String authId = saslServer.getAuthorizationID();
             Subject remoteUser = new Subject();
@@ -146,8 +152,8 @@ public abstract class SaslTransportPlugin implements ITransportPlugin
{
             this.name =  name;
         }
 
-        /**                                                                             
                                                                                         
        
-         * Get the full name of the user.                                               
                                                                                         
        
+        /**
+         * Get the full name of the user.
          */
         public String getName() {
             return name;

http://git-wip-us.apache.org/repos/asf/storm/blob/209e2cd5/storm-core/src/jvm/org/apache/storm/security/auth/ThriftConnectionType.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/security/auth/ThriftConnectionType.java b/storm-core/src/jvm/org/apache/storm/security/auth/ThriftConnectionType.java
index f627956..bc7c966 100644
--- a/storm-core/src/jvm/org/apache/storm/security/auth/ThriftConnectionType.java
+++ b/storm-core/src/jvm/org/apache/storm/security/auth/ThriftConnectionType.java
@@ -27,25 +27,27 @@ import java.util.Map;
  */
 public enum ThriftConnectionType {
     NIMBUS(Config.NIMBUS_THRIFT_TRANSPORT_PLUGIN, Config.NIMBUS_THRIFT_PORT, Config.NIMBUS_QUEUE_SIZE,
-         Config.NIMBUS_THRIFT_THREADS, Config.NIMBUS_THRIFT_MAX_BUFFER_SIZE),
+         Config.NIMBUS_THRIFT_THREADS, Config.NIMBUS_THRIFT_MAX_BUFFER_SIZE, Config.STORM_THRIFT_SOCKET_TIMEOUT_MS),
     DRPC(Config.DRPC_THRIFT_TRANSPORT_PLUGIN, Config.DRPC_PORT, Config.DRPC_QUEUE_SIZE,
-         Config.DRPC_WORKER_THREADS, Config.DRPC_MAX_BUFFER_SIZE),
+         Config.DRPC_WORKER_THREADS, Config.DRPC_MAX_BUFFER_SIZE, null),
     DRPC_INVOCATIONS(Config.DRPC_INVOCATIONS_THRIFT_TRANSPORT_PLUGIN, Config.DRPC_INVOCATIONS_PORT,
null,
-         Config.DRPC_INVOCATIONS_THREADS, Config.DRPC_MAX_BUFFER_SIZE);
+         Config.DRPC_INVOCATIONS_THREADS, Config.DRPC_MAX_BUFFER_SIZE, null);
 
     private final String _transConf;
     private final String _portConf;
     private final String _qConf;
     private final String _threadsConf;
     private final String _buffConf;
+    private final String _socketTimeoutConf;
 
     ThriftConnectionType(String transConf, String portConf, String qConf,
-                         String threadsConf, String buffConf) {
+                         String threadsConf, String buffConf, String socketTimeoutConf) {
         _transConf = transConf;
         _portConf = portConf;
         _qConf = qConf;
         _threadsConf = threadsConf;
         _buffConf = buffConf;
+        _socketTimeoutConf = socketTimeoutConf;
     }
 
     public String getTransportPlugin(Map conf) {
@@ -67,7 +69,7 @@ public enum ThriftConnectionType {
         return (Integer)conf.get(_qConf);
     }
 
-    public int getNumThreads(Map conf) { 
+    public int getNumThreads(Map conf) {
         return Utils.getInt(conf.get(_threadsConf));
     }
 
@@ -80,4 +82,11 @@ public enum ThriftConnectionType {
     public int getMaxBufferSize(Map conf) {
         return Utils.getInt(conf.get(_buffConf));
     }
+
+    public Integer getSocketTimeOut(Map conf) {
+        if (_socketTimeoutConf == null) {
+            return null;
+        }
+        return Utils.getInt(conf.get(_socketTimeoutConf));
+    }
 }


Mime
View raw message