storm-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bo...@apache.org
Subject [1/3] storm git commit: Let Topology query if blobs have changed
Date Tue, 06 Jun 2017 16:17:13 GMT
Repository: storm
Updated Branches:
  refs/heads/master 5befe2746 -> 7f3344747


Let Topology query if blobs have changed


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/5cba2319
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/5cba2319
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/5cba2319

Branch: refs/heads/master
Commit: 5cba2319d9b312f5698fac8eb96ad132b426028e
Parents: 5befe27
Author: Govind Menon <govindappumenon@gmail.com>
Authored: Fri Jun 2 16:55:19 2017 -0500
Committer: Govind Menon <govindappumenon@gmail.com>
Committed: Mon Jun 5 13:30:45 2017 -0500

----------------------------------------------------------------------
 .../bolt/format/TestSimpleFileNameFormat.java   |  2 +-
 .../apache/storm/hdfs/spout/TestHdfsSpout.java  |  2 +-
 .../src/jvm/org/apache/storm/Config.java        |  8 ++++
 .../src/jvm/org/apache/storm/daemon/Task.java   |  2 +
 .../org/apache/storm/daemon/worker/Worker.java  | 49 +++++++++++++++++++-
 .../apache/storm/daemon/worker/WorkerState.java |  7 +++
 .../org/apache/storm/task/TopologyContext.java  |  9 +++-
 .../jvm/org/apache/storm/utils/ConfigUtils.java | 30 ++++++++++++
 .../src/jvm/org/apache/storm/utils/Utils.java   | 25 ++++++++++
 .../src/main/java/org/apache/storm/Testing.java |  1 +
 .../org/apache/storm/daemon/nimbus/Nimbus.java  |  2 +-
 .../storm/daemon/supervisor/Container.java      |  2 +-
 .../daemon/supervisor/SupervisorUtils.java      |  4 +-
 .../org/apache/storm/localizer/Localizer.java   | 21 +++------
 .../org/apache/storm/utils/ServerUtils.java     | 43 ++---------------
 .../apache/storm/localizer/LocalizerTest.java   | 17 +++----
 16 files changed, 153 insertions(+), 71 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/5cba2319/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/format/TestSimpleFileNameFormat.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/format/TestSimpleFileNameFormat.java
b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/format/TestSimpleFileNameFormat.java
index 4c90295..051e4b7 100644
--- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/format/TestSimpleFileNameFormat.java
+++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/format/TestSimpleFileNameFormat.java
@@ -74,6 +74,6 @@ public class TestSimpleFileNameFormat {
     private TopologyContext createTopologyContext(){
     	Map<Integer, String> taskToComponent = new HashMap<Integer, String>();
         taskToComponent.put(7, "Xcom");
-    	return new TopologyContext(null, null, taskToComponent, null, null, null, null, null,
7, 6703, null, null, null, null, null, null);
+    	return new TopologyContext(null, null, taskToComponent, null, null, null, null, null,
null, 7, 6703, null, null, null, null, null, null);
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/5cba2319/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java
b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java
index f346aef..01970de 100644
--- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java
+++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java
@@ -746,7 +746,7 @@ public class TestHdfsSpout {
 
     public MockTopologyContext(int componentId) {
       // StormTopology topology, Map<String, Object> topoConf, Map<Integer, String>
taskToComponent, Map<String, List<Integer>> componentToSortedTasks, Map<String,
Map<String, Fields>> componentToStreamToFields, String stormId, String codeDir, String
pidDir, Integer taskId, Integer workerPort, List<Integer> workerTasks, Map<String,
Object> defaultResources, Map<String, Object> userResources, Map<String, Object>
executorData, Map<Integer, Map<Integer, Map<String, IMetric>>> registeredMetrics,
Atom openOrPrepareWasCalled
-      super(null, null, null, null, null, null, null, null, null, null, null, null, null,
null, null, null);
+      super(null, null, null, null, null, null, null, null, null, null, null, null, null,
null, null, null, null);
       this.componentId = componentId;
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/5cba2319/storm-client/src/jvm/org/apache/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/Config.java b/storm-client/src/jvm/org/apache/storm/Config.java
index b1f0381..9d659d5 100644
--- a/storm-client/src/jvm/org/apache/storm/Config.java
+++ b/storm-client/src/jvm/org/apache/storm/Config.java
@@ -1526,6 +1526,14 @@ public class Config extends HashMap<String, Object> {
     @isPositiveNumber
     public static final String NUM_STAT_BUCKETS = "num.stat.buckets";
 
+    /**
+     * Interval to check for the worker to check for updated blobs and refresh worker state
accordingly.
+     * The default is 10 seconds
+     */
+    @isInteger
+    @isPositiveNumber
+    public static final String WORKER_BLOB_UPDATE_POLL_INTERVAL_SECS = "worker.blob.update.poll.interval.secs";
+
     public static void setClasspath(Map<String, Object> conf, String cp) {
         conf.put(Config.TOPOLOGY_CLASSPATH, cp);
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/5cba2319/storm-client/src/jvm/org/apache/storm/daemon/Task.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/Task.java b/storm-client/src/jvm/org/apache/storm/daemon/Task.java
index 5b180c8..5c28085 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/Task.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/Task.java
@@ -185,6 +185,8 @@ public class Task {
             workerData.getTaskToComponent(),
             workerData.getComponentToSortedTasks(),
             workerData.getComponentToStreamToFields(),
+            // This is updated by the Worker and the topology has shared access to it
+            workerData.getBlobToLastKnownVersion(),
             workerData.getTopologyId(),
             ConfigUtils.supervisorStormResourcesPath(
                     ConfigUtils.supervisorStormDistRoot(conf, workerData.getTopologyId())),

http://git-wip-us.apache.org/repos/asf/storm/blob/5cba2319/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
index 5e49593..59e86c7 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
@@ -29,6 +29,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
 import javax.security.auth.Subject;
@@ -79,6 +81,7 @@ import uk.org.lidalia.sysoutslf4j.context.SysOutOverSLF4J;
 public class Worker implements Shutdownable, DaemonCommon {
 
     private static final Logger LOG = LoggerFactory.getLogger(Worker.class);
+    private static final Pattern BLOB_VERSION_EXTRACTION = Pattern.compile(".*\\.([0-9]+)$");
     private final Map<String, Object> conf;
     private final IContext context;
     private final String topologyId;
@@ -246,7 +249,20 @@ public class Worker implements Shutdownable, DaemonCommon {
                             }
                         }
                     });
-              
+
+                workerState.checkForUpdatedBlobsTimer.scheduleRecurring(0,
+                        (Integer) conf.getOrDefault(Config.WORKER_BLOB_UPDATE_POLL_INTERVAL_SECS,
10), new Runnable() {
+                            @Override public void run() {
+                                try {
+                                    LOG.debug("Checking if blobs have updated");
+                                    updateBlobUpdates();
+                                } catch (IOException e) {
+                                    // IOException from reading the version files to be ignored
+                                    LOG.error(e.getStackTrace().toString());
+                                }
+                            }
+                        });
+
                 // The jitter allows the clients to get the data at different times, and
avoids thundering herd
                 if (!(Boolean) topologyConf.get(Config.TOPOLOGY_DISABLE_LOADAWARE_MESSAGING))
{
                     workerState.refreshLoadTimer.scheduleRecurringWithJitter(0, 1, 500, workerState::refreshLoad);
@@ -299,6 +315,35 @@ public class Worker implements Shutdownable, DaemonCommon {
         }
     }
 
+    public Map<String, Long> getCurrentBlobVersions() throws IOException {
+        Map<String, Long> results = new HashMap<>();
+        Map<String, Map<String, Object>> blobstoreMap = (Map<String, Map<String,
Object>>) workerState.getTopologyConf().get(Config.TOPOLOGY_BLOBSTORE_MAP);
+        if (blobstoreMap != null) {
+            String stormRoot = ConfigUtils.supervisorStormDistRoot(workerState.getTopologyConf(),
workerState.getTopologyId());
+            for (Map.Entry<String, Map<String, Object>> entry : blobstoreMap.entrySet())
{
+                String localFileName = entry.getKey();
+                Map<String, Object> blobInfo = entry.getValue();
+                if (blobInfo != null && blobInfo.containsKey("localname")) {
+                    localFileName = (String) blobInfo.get("localname");
+                }
+
+                String blobWithVersion = new File(stormRoot, localFileName).getCanonicalFile().getName();
+                Matcher m = BLOB_VERSION_EXTRACTION.matcher(blobWithVersion);
+                if (m.matches()) {
+                    results.put(localFileName, Long.valueOf(m.group(1)));
+                }
+            }
+        }
+        return results;
+    }
+
+    public void updateBlobUpdates() throws IOException {
+        Map<String, Long> latestBlobVersions = getCurrentBlobVersions();
+        workerState.blobToLastKnownVersion.putAll(latestBlobVersions);
+        LOG.debug("Latest versions for blobs {}", latestBlobVersions);
+    }
+
+
     public void checkCredentialsChanged() {
         Credentials newCreds = workerState.stormClusterState.credentials(topologyId, null);
         if (! ObjectUtils.equals(newCreds, credentialsAtom.get())) {
@@ -409,6 +454,7 @@ public class Worker implements Shutdownable, DaemonCommon {
             workerState.heartbeatTimer.close();
             workerState.refreshConnectionsTimer.close();
             workerState.refreshCredentialsTimer.close();
+            workerState.checkForUpdatedBlobsTimer.close();
             workerState.refreshBackpressureTimer.close();
             workerState.refreshActiveTimer.close();
             workerState.executorHeartbeatTimer.close();
@@ -437,6 +483,7 @@ public class Worker implements Shutdownable, DaemonCommon {
             && workerState.refreshConnectionsTimer.isTimerWaiting()
             && workerState.refreshLoadTimer.isTimerWaiting()
             && workerState.refreshCredentialsTimer.isTimerWaiting()
+            && workerState.checkForUpdatedBlobsTimer.isTimerWaiting()
             && workerState.refreshBackpressureTimer.isTimerWaiting()
             && workerState.refreshActiveTimer.isTimerWaiting()
             && workerState.executorHeartbeatTimer.isTimerWaiting()

http://git-wip-us.apache.org/repos/asf/storm/blob/5cba2319/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
index 55def51..1ab1255 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
@@ -62,6 +62,8 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -142,6 +144,8 @@ public class WorkerState {
         return componentToSortedTasks;
     }
 
+    public Map<String, Long> getBlobToLastKnownVersion() {return blobToLastKnownVersion;}
+
     public AtomicReference<Map<NodeInfo, IConnection>> getCachedNodeToPortSocket()
{
         return cachedNodeToPortSocket;
     }
@@ -191,6 +195,7 @@ public class WorkerState {
     final Map<Integer, String> taskToComponent;
     final Map<String, Map<String, Fields>> componentToStreamToFields;
     final Map<String, List<Integer>> componentToSortedTasks;
+    final ConcurrentMap<String, Long> blobToLastKnownVersion;
     final ReentrantReadWriteLock endpointSocketLock;
     final AtomicReference<Map<Integer, NodeInfo>> cachedTaskToNodePort;
     final AtomicReference<Map<NodeInfo, IConnection>> cachedNodeToPortSocket;
@@ -245,6 +250,7 @@ public class WorkerState {
     final StormTimer refreshLoadTimer = mkHaltingTimer("refresh-load-timer");
     final StormTimer refreshConnectionsTimer = mkHaltingTimer("refresh-connections-timer");
     final StormTimer refreshCredentialsTimer = mkHaltingTimer("refresh-credentials-timer");
+    final StormTimer checkForUpdatedBlobsTimer = mkHaltingTimer("check-for-updated-blobs-timer");
     final StormTimer resetLogLevelsTimer = mkHaltingTimer("reset-log-levels-timer");
     final StormTimer refreshActiveTimer = mkHaltingTimer("refresh-active-timer");
     final StormTimer executorHeartbeatTimer = mkHaltingTimer("executor-heartbeat-timer");
@@ -284,6 +290,7 @@ public class WorkerState {
         this.executorReceiveQueueMap = mkReceiveQueueMap(topologyConf, executors);
         this.shortExecutorReceiveQueueMap = new HashMap<>();
         this.taskIds = new ArrayList<>();
+        this.blobToLastKnownVersion = new ConcurrentHashMap<>();
         for (Map.Entry<List<Long>, DisruptorQueue> entry : executorReceiveQueueMap.entrySet())
{
             this.shortExecutorReceiveQueueMap.put(entry.getKey().get(0).intValue(), entry.getValue());
             this.taskIds.addAll(StormCommon.executorIdToTasks(entry.getKey()));

http://git-wip-us.apache.org/repos/asf/storm/blob/5cba2319/storm-client/src/jvm/org/apache/storm/task/TopologyContext.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/task/TopologyContext.java b/storm-client/src/jvm/org/apache/storm/task/TopologyContext.java
index 4c226ef..601d70f 100644
--- a/storm-client/src/jvm/org/apache/storm/task/TopologyContext.java
+++ b/storm-client/src/jvm/org/apache/storm/task/TopologyContext.java
@@ -57,13 +57,15 @@ public class TopologyContext extends WorkerTopologyContext implements
IMetricsCo
     private Map<String, Object> _executorData;
     private Map<Integer,Map<Integer, Map<String, IMetric>>> _registeredMetrics;
     private AtomicBoolean _openOrPrepareWasCalled;
-
+    // This is updated by the Worker and the topology has shared access to it
+    private Map<String, Long> blobToLastKnownVersion;
 
     public TopologyContext(StormTopology topology,
                            Map<String, Object> topoConf,
                            Map<Integer, String> taskToComponent,
                            Map<String, List<Integer>> componentToSortedTasks,
                            Map<String, Map<String, Fields>> componentToStreamToFields,
+                           Map<String, Long> blobToLastKnownVersionShared,
                            String stormId,
                            String codeDir,
                            String pidDir,
@@ -82,6 +84,7 @@ public class TopologyContext extends WorkerTopologyContext implements IMetricsCo
         _executorData = executorData;
         _registeredMetrics = registeredMetrics;
         _openOrPrepareWasCalled = openOrPrepareWasCalled;
+        blobToLastKnownVersion = blobToLastKnownVersionShared;
     }
 
 
@@ -144,6 +147,10 @@ public class TopologyContext extends WorkerTopologyContext implements
IMetricsCo
         throw new NotImplementedException();
     }
 
+    public Map<String, Long> getBlobToLastKnownVersion() {
+        return blobToLastKnownVersion;
+    }
+
     /**
      * Gets the task id of this task.
      *

http://git-wip-us.apache.org/repos/asf/storm/blob/5cba2319/storm-client/src/jvm/org/apache/storm/utils/ConfigUtils.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/utils/ConfigUtils.java b/storm-client/src/jvm/org/apache/storm/utils/ConfigUtils.java
index 0101bbf..911d015 100644
--- a/storm-client/src/jvm/org/apache/storm/utils/ConfigUtils.java
+++ b/storm-client/src/jvm/org/apache/storm/utils/ConfigUtils.java
@@ -18,6 +18,7 @@
 
 package org.apache.storm.utils;
 
+import com.google.common.collect.Lists;
 import org.apache.commons.io.FileUtils;
 import org.apache.storm.Config;
 import org.apache.storm.daemon.supervisor.AdvancedFSOps;
@@ -30,7 +31,10 @@ import java.net.URLEncoder;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Random;
+import java.util.Collection;
+import java.util.HashSet;
 import java.util.concurrent.Callable;
+import java.util.stream.Collectors;
 
 public class ConfigUtils {
     public static final String FILE_SEPARATOR = File.separator;
@@ -67,6 +71,32 @@ public class ConfigUtils {
         return true;
     }
 
+    /**
+     * Returns a Collection of file names found under the given directory.
+     * @param dir a directory
+     * @return the Collection of file names
+     */
+    public static Collection<String> readDirContents(String dir) {
+        Collection<File> ret = readDirFiles(dir);
+        return ret.stream().map( car -> car.getName() ).collect( Collectors.toList() );
+    }
+
+    /**
+     * Returns a Collection of files found under the given directory.
+     * @param dir a directory
+     * @return the Collection of file names
+     */
+    public static Collection<File> readDirFiles(String dir) {
+        Collection<File> ret = new HashSet<>();
+        File[] files = new File(dir).listFiles();
+        if (files != null) {
+            for (File f: files) {
+                ret.add(f);
+            }
+        }
+        return ret;
+    }
+
     // we use this "weird" wrapper pattern temporarily for mocking in clojure test
     public static String workerArtifactsRoot(Map<String, Object> conf) {
         return _instance.workerArtifactsRootImpl(conf);

http://git-wip-us.apache.org/repos/asf/storm/blob/5cba2319/storm-client/src/jvm/org/apache/storm/utils/Utils.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/utils/Utils.java b/storm-client/src/jvm/org/apache/storm/utils/Utils.java
index bc1426f..26043d1 100644
--- a/storm-client/src/jvm/org/apache/storm/utils/Utils.java
+++ b/storm-client/src/jvm/org/apache/storm/utils/Utils.java
@@ -1046,6 +1046,31 @@ public class Utils {
         return dump.toString();
     }
 
+    public static long getVersionFromBlobVersionFile(File versionFile) {
+        long currentVersion = 0;
+        if (versionFile.exists() && !(versionFile.isDirectory())) {
+            BufferedReader br = null;
+            try {
+                br = new BufferedReader(new FileReader(versionFile));
+                String line = br.readLine();
+                currentVersion = Long.parseLong(line);
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            } finally {
+                try {
+                    if (br != null) {
+                        br.close();
+                    }
+                } catch (Exception ignore) {
+                    LOG.error("Exception trying to cleanup", ignore);
+                }
+            }
+            return currentVersion;
+        } else {
+            return -1;
+        }
+    }
+
     public static boolean checkDirExists(String dir) {
         File file = new File(dir);
         return file.isDirectory();

http://git-wip-us.apache.org/repos/asf/storm/blob/5cba2319/storm-server/src/main/java/org/apache/storm/Testing.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/Testing.java b/storm-server/src/main/java/org/apache/storm/Testing.java
index 4acfc12..d50f792 100644
--- a/storm-server/src/main/java/org/apache/storm/Testing.java
+++ b/storm-server/src/main/java/org/apache/storm/Testing.java
@@ -700,6 +700,7 @@ public class Testing {
                 taskToComp,
                 null,
                 compToStreamToFields,
+                null,
                 "test-storm-id",
                 null,
                 null,

http://git-wip-us.apache.org/repos/asf/storm/blob/5cba2319/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
index de29eb1..5b78bd2 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
@@ -2369,7 +2369,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
             for (ClusterMetricsConsumerExecutor exec: clusterConsumerExceutors) {
                 exec.prepare();
             }
-            
+
             if (isLeader()) {
                 for (String topoId: state.activeStorms()) {
                     transition(topoId, TopologyActions.STARTUP, null);

http://git-wip-us.apache.org/repos/asf/storm/blob/5cba2319/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Container.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Container.java
b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Container.java
index 1bf1f58..f878507 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Container.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Container.java
@@ -449,7 +449,7 @@ public abstract class Container implements Killable {
      */
     protected Set<Long> getAllPids() throws IOException {
         Set<Long> ret = new HashSet<>();
-        for (String listing: ServerUtils.readDirContents(ConfigUtils.workerPidsRoot(_conf,
_workerId))) {
+        for (String listing: ConfigUtils.readDirContents(ConfigUtils.workerPidsRoot(_conf,
_workerId))) {
             ret.add(Long.valueOf(listing));
         }
         

http://git-wip-us.apache.org/repos/asf/storm/blob/5cba2319/storm-server/src/main/java/org/apache/storm/daemon/supervisor/SupervisorUtils.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/SupervisorUtils.java
b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/SupervisorUtils.java
index da8591a..8582fb4 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/SupervisorUtils.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/SupervisorUtils.java
@@ -115,7 +115,7 @@ public class SupervisorUtils {
     public static Set<String> readDownloadedTopologyIds(Map<String, Object> conf)
throws IOException {
         Set<String> stormIds = new HashSet<>();
         String path = ConfigUtils.supervisorStormDistRoot(conf);
-        Collection<String> rets = ServerUtils.readDirContents(path);
+        Collection<String> rets = ConfigUtils.readDirContents(path);
         for (String ret : rets) {
             stormIds.add(URLDecoder.decode(ret));
         }
@@ -124,7 +124,7 @@ public class SupervisorUtils {
 
     public static Collection<String> supervisorWorkerIds(Map<String, Object>
conf) {
         String workerRoot = ConfigUtils.workerRoot(conf);
-        return ServerUtils.readDirContents(workerRoot);
+        return ConfigUtils.readDirContents(workerRoot);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/storm/blob/5cba2319/storm-server/src/main/java/org/apache/storm/localizer/Localizer.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/localizer/Localizer.java b/storm-server/src/main/java/org/apache/storm/localizer/Localizer.java
index be794d5..73db899 100644
--- a/storm-server/src/main/java/org/apache/storm/localizer/Localizer.java
+++ b/storm-server/src/main/java/org/apache/storm/localizer/Localizer.java
@@ -23,6 +23,7 @@ import org.apache.storm.blobstore.ClientBlobStore;
 import org.apache.storm.blobstore.InputStreamWithMeta;
 import org.apache.storm.generated.AuthorizationException;
 import org.apache.storm.generated.KeyNotFoundException;
+import org.apache.storm.utils.ConfigUtils;
 import org.apache.storm.utils.ServerUtils;
 import org.apache.storm.utils.ObjectReader;
 import org.apache.storm.utils.ShellUtils.ExitCodeException;
@@ -41,10 +42,11 @@ import java.io.IOException;
 import java.io.PrintWriter;
 import java.nio.file.Files;
 import java.nio.file.Paths;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+import java.util.Collection;
+import java.util.ArrayList;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -65,9 +67,8 @@ import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;
  */
 public class Localizer {
   public static final Logger LOG = LoggerFactory.getLogger(Localizer.class);
-  public static final String USERCACHE = "usercache";
   public static final String FILECACHE = "filecache";
-
+  public static final String USERCACHE = "usercache";
   // sub directories to store either files or uncompressed archives respectively
   public static final String FILESDIR = "files";
   public static final String ARCHIVESDIR = "archives";
@@ -198,15 +199,6 @@ public class Localizer {
     }
   }
 
-  protected File[] readDirContents(String location) {
-    File dir = new File(location);
-    File[] files = null;
-    if (dir.exists()) {
-      files = dir.listFiles();
-    }
-    return files;
-  }
-
   // Looks for files in the directory with .current suffix
   protected File[] readCurrentBlobs(String location) {
     File dir = new File(location);
@@ -226,9 +218,8 @@ public class Localizer {
   protected void reconstructLocalizedResources() {
     try {
       LOG.info("Reconstruct localized resource: " + getUserCacheDir().getPath());
-      File[] users = readDirContents(getUserCacheDir().getPath());
-
-      if (users != null) {
+      Collection<File> users = ConfigUtils.readDirFiles(getUserCacheDir().getPath());
+      if (!(users == null || users.isEmpty())) {
         for (File userDir : users) {
           String user = userDir.getName();
           LOG.debug("looking in: {} for user: {}", userDir.getPath(), user);

http://git-wip-us.apache.org/repos/asf/storm/blob/5cba2319/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java b/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java
index 0690dfe..16b841b 100644
--- a/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java
+++ b/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java
@@ -41,6 +41,7 @@ import org.apache.storm.generated.SettableBlobMeta;
 import org.apache.storm.localizer.Localizer;
 import org.apache.storm.nimbus.NimbusInfo;
 import org.apache.thrift.TException;
+import org.apache.storm.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -83,10 +84,10 @@ public class ServerUtils {
     public static final String FILE_PATH_SEPARATOR = System.getProperty("file.separator");
     public static final String CLASS_PATH_SEPARATOR = System.getProperty("path.separator");
     public static final boolean IS_ON_WINDOWS = "Windows_NT".equals(System.getenv("OS"));
-    public static final String DEFAULT_BLOB_VERSION_SUFFIX = ".version";
     public static final String CURRENT_BLOB_SUFFIX_ID = "current";
 
     public static final String DEFAULT_CURRENT_BLOB_SUFFIX = "." + CURRENT_BLOB_SUFFIX_ID;
+    public static final String DEFAULT_BLOB_VERSION_SUFFIX = ".version";
     public static final int SIGKILL = 9;
     public static final int SIGTERM = 15;
     /**
@@ -214,29 +215,7 @@ public class ServerUtils {
     }
 
     public static long localVersionOfBlob(String localFile) {
-        File f = new File(localFile + DEFAULT_BLOB_VERSION_SUFFIX);
-        long currentVersion = 0;
-        if (f.exists() && !(f.isDirectory())) {
-            BufferedReader br = null;
-            try {
-                br = new BufferedReader(new FileReader(f));
-                String line = br.readLine();
-                currentVersion = Long.parseLong(line);
-            } catch (IOException e) {
-                throw new RuntimeException(e);
-            } finally {
-                try {
-                    if (br != null) {
-                        br.close();
-                    }
-                } catch (Exception ignore) {
-                    LOG.error("Exception trying to cleanup", ignore);
-                }
-            }
-            return currentVersion;
-        } else {
-            return -1;
-        }
+        return Utils.getVersionFromBlobVersionFile(new File(localFile + DEFAULT_BLOB_VERSION_SUFFIX));
     }
 
     public static String constructBlobWithVersionFileName(String fileName, long version)
{
@@ -309,22 +288,6 @@ public class ServerUtils {
         return Files.getOwner(FileSystems.getDefault().getPath(path)).getName();
     }
 
-    /**
-     * Returns a Collection of file names found under the given directory.
-     * @param dir a directory
-     * @return the Collection of file names
-     */
-    public static Collection<String> readDirContents(String dir) {
-        Collection<String> ret = new HashSet<>();
-        File[] files = new File(dir).listFiles();
-        if (files != null) {
-            for (File f: files) {
-                ret.add(f.getName());
-            }
-        }
-        return ret;
-    }
-
     public static Localizer createLocalizer(Map<String, Object> conf, String baseDir)
{
         return new Localizer(conf, baseDir);
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/5cba2319/storm-server/src/test/java/org/apache/storm/localizer/LocalizerTest.java
----------------------------------------------------------------------
diff --git a/storm-server/src/test/java/org/apache/storm/localizer/LocalizerTest.java b/storm-server/src/test/java/org/apache/storm/localizer/LocalizerTest.java
index 5814c43..dd64069 100644
--- a/storm-server/src/test/java/org/apache/storm/localizer/LocalizerTest.java
+++ b/storm-server/src/test/java/org/apache/storm/localizer/LocalizerTest.java
@@ -46,6 +46,7 @@ import java.io.InputStream;
 import java.nio.file.Files;
 import java.util.*;
 
+import static org.apache.storm.localizer.Localizer.USERCACHE;
 import static org.junit.Assert.*;
 import static org.mockito.Mockito.*;
 
@@ -130,7 +131,7 @@ public class LocalizerTest {
   }
 
   public String constructUserCacheDir(String base, String user) {
-    return joinPath(base, Localizer.USERCACHE, user);
+    return joinPath(base, USERCACHE, user);
   }
 
   public String constructExpectedFilesDir(String base, String user) {
@@ -295,7 +296,7 @@ public class LocalizerTest {
         user1Dir);
     long timeAfter = System.nanoTime();
 
-    String expectedUserDir = joinPath(baseDir.toString(), Localizer.USERCACHE, user1);
+    String expectedUserDir = joinPath(baseDir.toString(), USERCACHE, user1);
     String expectedFileDir = joinPath(expectedUserDir, Localizer.FILECACHE, Localizer.ARCHIVESDIR);
     assertTrue("user filecache dir not created", new File(expectedFileDir).exists());
     File keyFile = new File(expectedFileDir, key1 + ".0");
@@ -371,7 +372,7 @@ public class LocalizerTest {
         user1Dir);
     long timeAfter = System.nanoTime();
 
-    String expectedUserDir = joinPath(baseDir.toString(), Localizer.USERCACHE, user1);
+    String expectedUserDir = joinPath(baseDir.toString(), USERCACHE, user1);
     String expectedFileDir = joinPath(expectedUserDir, Localizer.FILECACHE, Localizer.FILESDIR);
     assertTrue("user filecache dir not created", new File(expectedFileDir).exists());
     File keyFile = new File(expectedFileDir, key1);
@@ -444,7 +445,7 @@ public class LocalizerTest {
     LocalizedResource lrsrc2 = lrsrcs.get(1);
     LocalizedResource lrsrc3 = lrsrcs.get(2);
 
-    String expectedFileDir = joinPath(baseDir.toString(), Localizer.USERCACHE, user1,
+    String expectedFileDir = joinPath(baseDir.toString(), USERCACHE, user1,
         Localizer.FILECACHE, Localizer.FILESDIR);
     assertTrue("user filecache dir not created", new File(expectedFileDir).exists());
     File keyFile = new File(expectedFileDir, key1 + ServerUtils.DEFAULT_CURRENT_BLOB_SUFFIX);
@@ -579,11 +580,11 @@ public class LocalizerTest {
     LocalizedResource lrsrc1_user3 = localizer.getBlob(new LocalResource(key1, false), user3,
         topo3, user3Dir);
 
-    String expectedUserDir1 = joinPath(baseDir.toString(), Localizer.USERCACHE, user1);
+    String expectedUserDir1 = joinPath(baseDir.toString(), USERCACHE, user1);
     String expectedFileDirUser1 = joinPath(expectedUserDir1, Localizer.FILECACHE, Localizer.FILESDIR);
-    String expectedFileDirUser2 = joinPath(baseDir.toString(), Localizer.USERCACHE, user2,
+    String expectedFileDirUser2 = joinPath(baseDir.toString(), USERCACHE, user2,
         Localizer.FILECACHE, Localizer.FILESDIR);
-    String expectedFileDirUser3 = joinPath(baseDir.toString(), Localizer.USERCACHE, user3,
+    String expectedFileDirUser3 = joinPath(baseDir.toString(), USERCACHE, user3,
         Localizer.FILECACHE, Localizer.FILESDIR);
     assertTrue("user filecache dir user1 not created", new File(expectedFileDirUser1).exists());
     assertTrue("user filecache dir user2 not created", new File(expectedFileDirUser2).exists());
@@ -645,7 +646,7 @@ public class LocalizerTest {
     LocalizedResource lrsrc = localizer.getBlob(new LocalResource(key1, false), user1, topo1,
         user1Dir);
 
-    String expectedUserDir = joinPath(baseDir.toString(), Localizer.USERCACHE, user1);
+    String expectedUserDir = joinPath(baseDir.toString(), USERCACHE, user1);
     String expectedFileDir = joinPath(expectedUserDir, Localizer.FILECACHE, Localizer.FILESDIR);
     assertTrue("user filecache dir not created", new File(expectedFileDir).exists());
     File keyFile = new File(expectedFileDir, key1);


Mime
View raw message