storm-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kabh...@apache.org
Subject [10/12] storm git commit: STORM-2018: Removed resource location support
Date Wed, 02 Nov 2016 23:48:44 GMT
STORM-2018: Removed resource location support


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/0065ed9b
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/0065ed9b
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/0065ed9b

Branch: refs/heads/1.0.x-branch
Commit: 0065ed9b72d58294defdada2ef9f5dd95c150b57
Parents: c8210b8
Author: Robert (Bobby) Evans <evans@yahoo-inc.com>
Authored: Thu Oct 27 13:36:00 2016 -0500
Committer: Robert (Bobby) Evans <evans@yahoo-inc.com>
Committed: Thu Oct 27 13:36:00 2016 -0500

----------------------------------------------------------------------
 .../storm/daemon/supervisor/BasicContainer.java | 83 +-------------------
 .../apache/storm/localizer/AsyncLocalizer.java  | 12 ---
 .../org/apache/storm/integration_test.clj       |  2 +-
 3 files changed, 3 insertions(+), 94 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/0065ed9b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/BasicContainer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/BasicContainer.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/BasicContainer.java
index daa1d00..7fe57bb 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/BasicContainer.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/BasicContainer.java
@@ -376,14 +376,12 @@ public class BasicContainer extends Container {
     /**
      * Compute the classpath for the worker process
      * @param stormJar the topology jar
-     * @param dependencyLocations any dependencies from the topology
      * @return the full classpath
      */
-    protected String getWorkerClassPath(String stormJar, List<String> dependencyLocations)
{
+    protected String getWorkerClassPath(String stormJar) {
         List<String> workercp = new ArrayList<>();
         workercp.addAll(frameworkClasspath());
         workercp.add(stormJar);
-        workercp.addAll(dependencyLocations);
         workercp.addAll(asStringList(_topoConf.get(Config.TOPOLOGY_CLASSPATH)));
         return CPJ.join(workercp);
     }
@@ -470,82 +468,6 @@ public class BasicContainer extends Container {
         return log4jConfigurationDir + Utils.FILE_PATH_SEPARATOR + "worker.xml";
     }
     
-    private static class DependencyLocations {
-        private List<String> _data = null;
-        private final Map<String, Object> _conf;
-        private final String _topologyId;
-        private final AdvancedFSOps _ops;
-        private final String _stormRoot;
-        
-        public DependencyLocations(final Map<String, Object> conf, final String topologyId,
final AdvancedFSOps ops, final String stormRoot) {
-            _conf = conf;
-            _topologyId = topologyId;
-            _ops = ops;
-            _stormRoot = stormRoot;
-        }
-        
-        public String toString() {
-            List<String> data;
-            synchronized(this) {
-                data = _data;
-            }
-            return "DEP_LOCS for " + _topologyId +" => " + data;
-        }
-        
-        public synchronized List<String> get() throws IOException {
-            if (_data != null) {
-                return _data;
-            }
-            final StormTopology stormTopology = ConfigUtils.readSupervisorTopology(_conf,
_topologyId, _ops);
-            final List<String> dependencyLocations = new ArrayList<>();
-            if (stormTopology.get_dependency_jars() != null) {
-                for (String dependency : stormTopology.get_dependency_jars()) {
-                    dependencyLocations.add(new File(_stormRoot, dependency).getAbsolutePath());
-                }
-            }
-
-            if (stormTopology.get_dependency_artifacts() != null) {
-                for (String dependency : stormTopology.get_dependency_artifacts()) {
-                    dependencyLocations.add(new File(_stormRoot, dependency).getAbsolutePath());
-                }
-            }
-            _data = dependencyLocations;
-            return _data;
-        }
-    }
-
-    static class DepLRUCache {
-        public final int _maxSize = 100; //We could make this configurable in the future...
-        
-        @SuppressWarnings("serial")
-        private LinkedHashMap<String, DependencyLocations> _cache = new LinkedHashMap<String,
DependencyLocations>() {
-            @Override
-            protected boolean removeEldestEntry(Map.Entry<String,DependencyLocations>
eldest) {
-                return (size() > _maxSize);
-            }
-        };
-        
-        public synchronized DependencyLocations get(final Map<String, Object> conf,
final String topologyId, final AdvancedFSOps ops, String stormRoot) {
-            //Only go off of the topology id for now.
-            DependencyLocations dl = _cache.get(topologyId);
-            if (dl == null) {
-                _cache.put(topologyId, new DependencyLocations(conf, topologyId, ops, stormRoot));
-                dl = _cache.get(topologyId);
-            }
-            return dl;
-        }
-        
-        public synchronized void clear() {
-            _cache.clear();
-        }
-    }
-    
-    static final DepLRUCache DEP_LOC_CACHE = new DepLRUCache();
-    
-    public static List<String> getDependencyLocationsFor(final Map<String, Object>
conf, final String topologyId, final AdvancedFSOps ops, String stormRoot) throws IOException
{
-        return DEP_LOC_CACHE.get(conf, topologyId, ops, stormRoot).get();
-    }
-    
     /**
      * Get parameters for the class path of the worker process.  Also used by the
      * log Writer
@@ -555,8 +477,7 @@ public class BasicContainer extends Container {
      */
     private List<String> getClassPathParams(final String stormRoot) throws IOException
{
         final String stormJar = ConfigUtils.supervisorStormJarPath(stormRoot);
-        final List<String> dependencyLocations = getDependencyLocationsFor(_conf, _topologyId,
_ops, stormRoot);
-        final String workerClassPath = getWorkerClassPath(stormJar, dependencyLocations);
+        final String workerClassPath = getWorkerClassPath(stormJar);
         
         List<String> classPathParams = new ArrayList<>();
         classPathParams.add("-cp");

http://git-wip-us.apache.org/repos/asf/storm/blob/0065ed9b/storm-core/src/jvm/org/apache/storm/localizer/AsyncLocalizer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/localizer/AsyncLocalizer.java b/storm-core/src/jvm/org/apache/storm/localizer/AsyncLocalizer.java
index d3e3925..9361107 100644
--- a/storm-core/src/jvm/org/apache/storm/localizer/AsyncLocalizer.java
+++ b/storm-core/src/jvm/org/apache/storm/localizer/AsyncLocalizer.java
@@ -231,18 +231,6 @@ public class AsyncLocalizer implements ILocalizer, Shutdownable {
                     }
                 }
 
-                StormTopology stormCode = ConfigUtils.readSupervisorTopology(_conf, _topologyId,
_fsOps);
-                List<String> dependencies = new ArrayList<>();
-                if (stormCode.is_set_dependency_jars()) {
-                    dependencies.addAll(stormCode.get_dependency_jars());
-                }
-                if (stormCode.is_set_dependency_artifacts()) {
-                    dependencies.addAll(stormCode.get_dependency_artifacts());
-                }
-                for (String dependency : dependencies) {
-                    localResourceList.add(new LocalResource(dependency, false));
-                }
-
                 if (!localResourceList.isEmpty()) {
                     File userDir = _localizer.getLocalUserFileCacheDir(user);
                     if (!_fsOps.fileExists(userDir)) {

http://git-wip-us.apache.org/repos/asf/storm/blob/0065ed9b/storm-core/test/clj/integration/org/apache/storm/integration_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/integration/org/apache/storm/integration_test.clj b/storm-core/test/clj/integration/org/apache/storm/integration_test.clj
index 775949e..87fe9b0 100644
--- a/storm-core/test/clj/integration/org/apache/storm/integration_test.clj
+++ b/storm-core/test/clj/integration/org/apache/storm/integration_test.clj
@@ -154,7 +154,7 @@
                      {"2" (thrift/mk-bolt-spec {"1" :global} extend-timeout-twice)})]
     (submit-local-topology (:nimbus cluster)
                            "timeout-tester"
-                           {TOPOLOGY-MESSAGE-TIMEOUT-SECS 10}
+                           {TOPOLOGY-MESSAGE-TIMEOUT-SECS 10 TOPOLOGY-DEBUG true}
                            topology)
     (advance-cluster-time cluster 11)
     (.feed feeder ["a"] 1)


Mime
View raw message