storm-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From srihar...@apache.org
Subject [1/3] storm git commit: STORM-2321 Handle blobstore zk key deletion in KeySequenceNumber
Date Thu, 02 Feb 2017 17:20:18 GMT
Repository: storm
Updated Branches:
  refs/heads/master 1445a955a -> 4e1572e29


STORM-2321 Handle blobstore zk key deletion in KeySequenceNumber

* If NoNodeException is thrown in getKeySequenceNumber, treat it as KeyNotFoundException
* Change callers to handle KeyNotFoundException accordingly


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/ff720438
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/ff720438
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/ff720438

Branch: refs/heads/master
Commit: ff7204385976e2de8099b5fffa858ae29db8c5a8
Parents: 4c5e34e
Author: Jungtaek Lim <kabhwan@gmail.com>
Authored: Tue Jan 31 17:00:31 2017 +0900
Committer: Jungtaek Lim <kabhwan@gmail.com>
Committed: Tue Jan 31 17:00:31 2017 +0900

----------------------------------------------------------------------
 .../apache/storm/blobstore/BlobStoreUtils.java  |  4 ++--
 .../storm/blobstore/BlobSynchronizer.java       | 13 +++++++++---
 .../storm/blobstore/KeySequenceNumber.java      | 21 ++++++++++++++------
 .../storm/blobstore/LocalFsBlobStore.java       |  4 +++-
 .../org/apache/storm/daemon/nimbus/Nimbus.java  | 11 +++++++---
 5 files changed, 38 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/ff720438/storm-core/src/jvm/org/apache/storm/blobstore/BlobStoreUtils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/blobstore/BlobStoreUtils.java b/storm-core/src/jvm/org/apache/storm/blobstore/BlobStoreUtils.java
index ba3ccdb..6486ba2 100644
--- a/storm-core/src/jvm/org/apache/storm/blobstore/BlobStoreUtils.java
+++ b/storm-core/src/jvm/org/apache/storm/blobstore/BlobStoreUtils.java
@@ -240,7 +240,7 @@ public class BlobStoreUtils {
             LOG.debug("StateInfo for update {}", stateInfo);
             Set<NimbusInfo> nimbusInfoList = getNimbodesWithLatestSequenceNumberOfBlob(zkClient, key);
 
-            for (NimbusInfo nimbusInfo:nimbusInfoList) {
+            for (NimbusInfo nimbusInfo : nimbusInfoList) {
                 if (nimbusInfo.getHost().equals(nimbusDetails.getHost())) {
                     isListContainsCurrentNimbusInfo = true;
                     break;
@@ -251,7 +251,7 @@ public class BlobStoreUtils {
                 LOG.debug("Updating state inside zookeeper for an update");
                 createStateInZookeeper(conf, key, nimbusDetails);
             }
-        } catch (NoNodeException e) {
+        } catch (NoNodeException | KeyNotFoundException e) {
             //race condition with a delete
             return;
         } catch (Exception exp) {

http://git-wip-us.apache.org/repos/asf/storm/blob/ff720438/storm-core/src/jvm/org/apache/storm/blobstore/BlobSynchronizer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/blobstore/BlobSynchronizer.java b/storm-core/src/jvm/org/apache/storm/blobstore/BlobSynchronizer.java
index 3d26a97..b6a500a 100644
--- a/storm-core/src/jvm/org/apache/storm/blobstore/BlobSynchronizer.java
+++ b/storm-core/src/jvm/org/apache/storm/blobstore/BlobSynchronizer.java
@@ -17,6 +17,7 @@
  */
 package org.apache.storm.blobstore;
 
+import org.apache.storm.generated.KeyNotFoundException;
 import org.apache.storm.nimbus.NimbusInfo;
 import org.apache.curator.framework.CuratorFramework;
 import org.slf4j.Logger;
@@ -79,9 +80,15 @@ public class BlobSynchronizer {
             LOG.debug("Key set Blobstore-> Zookeeper-> DownloadSet {}-> {}-> {}", getBlobStoreKeySet(), getZookeeperKeySet(), keySetToDownload);
 
             for (String key : keySetToDownload) {
-                Set<NimbusInfo> nimbusInfoSet = BlobStoreUtils.getNimbodesWithLatestSequenceNumberOfBlob(zkClient, key);
-                if(BlobStoreUtils.downloadMissingBlob(conf, blobStore, key, nimbusInfoSet)) {
-                    BlobStoreUtils.createStateInZookeeper(conf, key, nimbusInfo);
+                try {
+                    Set<NimbusInfo> nimbusInfoSet = BlobStoreUtils.getNimbodesWithLatestSequenceNumberOfBlob(zkClient, key);
+                    if (BlobStoreUtils.downloadMissingBlob(conf, blobStore, key, nimbusInfoSet)) {
+                        BlobStoreUtils.createStateInZookeeper(conf, key, nimbusInfo);
+                    }
+                } catch (KeyNotFoundException e) {
+                    LOG.debug("Detected deletion for the key {} - deleting the blob instead", key);
+                    // race condition with a delete, delete the blob in key instead
+                    blobStore.deleteBlob(key, BlobStoreUtils.getNimbusSubject());
                 }
             }
             if (zkClient !=null) {

http://git-wip-us.apache.org/repos/asf/storm/blob/ff720438/storm-core/src/jvm/org/apache/storm/blobstore/KeySequenceNumber.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/blobstore/KeySequenceNumber.java b/storm-core/src/jvm/org/apache/storm/blobstore/KeySequenceNumber.java
index 2425761..adbd4c4 100644
--- a/storm-core/src/jvm/org/apache/storm/blobstore/KeySequenceNumber.java
+++ b/storm-core/src/jvm/org/apache/storm/blobstore/KeySequenceNumber.java
@@ -18,10 +18,12 @@
 
 package org.apache.storm.blobstore;
 
+import org.apache.storm.generated.KeyNotFoundException;
 import org.apache.storm.nimbus.NimbusInfo;
 import org.apache.storm.utils.Utils;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.ZooDefs;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -130,12 +132,12 @@ public class KeySequenceNumber {
         this.nimbusInfo = nimbusInfo;
     }
 
-    public synchronized int getKeySequenceNumber(Map conf) {
+    public synchronized int getKeySequenceNumber(Map conf) throws KeyNotFoundException {
         TreeSet<Integer> sequenceNumbers = new TreeSet<Integer>();
         CuratorFramework zkClient = BlobStoreUtils.createZKClient(conf);
         try {
             // Key has not been created yet and it is the first time it is being created
-            if(zkClient.checkExists().forPath(BLOBSTORE_SUBTREE + "/" + key) == null) {
+            if (zkClient.checkExists().forPath(BLOBSTORE_SUBTREE + "/" + key) == null) {
                 zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)
                         .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(BLOBSTORE_MAX_KEY_SEQUENCE_SUBTREE + "/" + key);
                 zkClient.setData().forPath(BLOBSTORE_MAX_KEY_SEQUENCE_SUBTREE + "/" + key,
@@ -148,7 +150,7 @@ public class KeySequenceNumber {
             // if all go down which is unlikely. Hence there might be a need to update the blob if all go down.
             List<String> stateInfoList = zkClient.getChildren().forPath(BLOBSTORE_SUBTREE + "/" + key);
             LOG.debug("stateInfoList-size {} stateInfoList-data {}", stateInfoList.size(), stateInfoList);
-            if(stateInfoList.isEmpty()) {
+            if (stateInfoList.isEmpty()) {
                 return getMaxSequenceNumber(zkClient);
             }
 
@@ -156,7 +158,7 @@ public class KeySequenceNumber {
             // In all other cases check for the latest update sequence of the blob on the nimbus
             // and assign the appropriate number. Check if all are have same sequence number,
             // if not assign the highest sequence number.
-            for (String stateInfo:stateInfoList) {
+            for (String stateInfo : stateInfoList) {
                 sequenceNumbers.add(Integer.parseInt(BlobStoreUtils.normalizeNimbusHostPortSequenceNumberInfo(stateInfo)
                         .getSequenceNumber()));
             }
@@ -195,15 +197,22 @@ public class KeySequenceNumber {
                     return sequenceNumbers.first() + 1;
                 }
             }
+
+            // Normal create update sync scenario returns the greatest sequence number in the set
+            return sequenceNumbers.last();
+        } catch (KeeperException.NoNodeException e) {
+            // there's a race condition with a delete: either blobstore or blobstoremaxsequence
+            // this should be thrown to the caller to indicate that the key is invalid now
+            throw new KeyNotFoundException(key);
         } catch(Exception e) {
+            // in other case, just set this to 0 to trigger re-sync later
             LOG.error("Exception {}", e);
+            return INITIAL_SEQUENCE_NUMBER - 1;
         } finally {
             if (zkClient != null) {
                 zkClient.close();
             }
         }
-        // Normal create update sync scenario returns the greatest sequence number in the set
-        return sequenceNumbers.last();
     }
 
     private boolean checkIfStateContainsCurrentNimbusHost(List<String> stateInfoList, NimbusInfo nimbusInfo) {

http://git-wip-us.apache.org/repos/asf/storm/blob/ff720438/storm-core/src/jvm/org/apache/storm/blobstore/LocalFsBlobStore.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/blobstore/LocalFsBlobStore.java b/storm-core/src/jvm/org/apache/storm/blobstore/LocalFsBlobStore.java
index 8df981b..266d4b7 100644
--- a/storm-core/src/jvm/org/apache/storm/blobstore/LocalFsBlobStore.java
+++ b/storm-core/src/jvm/org/apache/storm/blobstore/LocalFsBlobStore.java
@@ -287,7 +287,7 @@ public class LocalFsBlobStore extends BlobStore {
     }
 
     //This additional check and download is for nimbus high availability in case you have more than one nimbus
-    public synchronized boolean checkForBlobOrDownload(String key) {
+    public synchronized boolean checkForBlobOrDownload(String key) throws KeyNotFoundException {
         boolean checkBlobDownload = false;
         try {
             List<String> keyList = BlobStoreUtils.getKeyListFromBlobStore(this);
@@ -301,6 +301,8 @@ public class LocalFsBlobStore extends BlobStore {
                     }
                 }
             }
+        } catch (KeyNotFoundException e) {
+            throw e;
         } catch (Exception e) {
             throw new RuntimeException(e);
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/ff720438/storm-core/src/jvm/org/apache/storm/daemon/nimbus/Nimbus.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/nimbus/Nimbus.java b/storm-core/src/jvm/org/apache/storm/daemon/nimbus/Nimbus.java
index a76a912..81b52e7 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/nimbus/Nimbus.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/nimbus/Nimbus.java
@@ -553,7 +553,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
         return ret;
     }
     
-    private static int getVersionForKey(String key, NimbusInfo nimbusInfo, Map<String, Object> conf) {
+    private static int getVersionForKey(String key, NimbusInfo nimbusInfo, Map<String, Object> conf) throws KeyNotFoundException {
         KeySequenceNumber kseq = new KeySequenceNumber(key, nimbusInfo);
         return kseq.getKeySequenceNumber(conf);
     }
@@ -2085,7 +2085,12 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
         }
         LOG.debug("Creating list of key entries for blobstore inside zookeeper {} local {}", activeKeys, activeLocalKeys);
         for (String key: activeLocalKeys) {
-            state.setupBlobstore(key, nimbusInfo, getVersionForKey(key, nimbusInfo, conf));
+            try {
+                state.setupBlobstore(key, nimbusInfo, getVersionForKey(key, nimbusInfo, conf));
+            } catch (KeyNotFoundException e) {
+                // invalid key, remove it from blobstore
+                store.deleteBlob(key, NIMBUS_SUBJECT);
+            }
         }
     }
 
@@ -3137,7 +3142,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
             }
             LOG.debug("Created state in zookeeper {} {} {}", state, store, ni);
         } catch (Exception e) {
-            LOG.warn("Begin file upload exception", e);
+            LOG.warn("Exception while creating state in zookeeper - key: " + key, e);
             if (e instanceof TException) {
                 throw (TException)e;
             }


Mime
View raw message