drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vita...@apache.org
Subject [05/13] drill git commit: DRILL-6053: Avoid excessive locking in LocalPersistentStore
Date Mon, 26 Mar 2018 11:35:00 GMT
DRILL-6053: Avoid excessive locking in LocalPersistentStore

closes #1163


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/590a72bc
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/590a72bc
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/590a72bc

Branch: refs/heads/master
Commit: 590a72bc667f6bc373130bcae58c22c11f13edaf
Parents: 9327ca6
Author: Vlad Rozov <vrozov@apache.org>
Authored: Tue Mar 13 10:56:52 2018 -0700
Committer: Vitalii Diravka <vitalii.diravka@gmail.com>
Committed: Sat Mar 24 20:35:32 2018 +0200

----------------------------------------------------------------------
 .../org/apache/drill/common/AutoCloseables.java |   5 +
 .../common/concurrent/AutoCloseableLock.java    |   6 +-
 .../fn/registry/FunctionRegistryHolder.java     |  24 +--
 .../fn/registry/RemoteFunctionRegistry.java     |   6 +-
 .../exec/rpc/control/CustomHandlerRegistry.java |   5 +-
 .../exec/store/sys/BasePersistentStore.java     |  22 ---
 .../drill/exec/store/sys/PersistentStore.java   |  61 +-----
 .../exec/store/sys/PersistentStoreProvider.java |   5 +-
 .../org/apache/drill/exec/store/sys/Store.java  |  58 ++++++
 .../store/sys/VersionedPersistentStore.java     |  57 ++++++
 .../exec/store/sys/store/InMemoryStore.java     |  85 ++-------
 .../store/sys/store/LocalPersistentStore.java   | 187 +++++++------------
 .../sys/store/VersionedDelegatingStore.java     | 120 ++++++++++++
 .../sys/store/ZookeeperPersistentStore.java     |   3 +-
 .../ZookeeperPersistentStoreProvider.java       |  20 ++
 .../exec/testing/store/NoWriteLocalStore.java   |  74 ++------
 .../drill/exec/work/batch/IncomingBuffers.java  |   4 +-
 .../exec/store/sys/TestPStoreProviders.java     |   6 +-
 .../drill/exec/memory/AllocationManager.java    |  11 +-
 19 files changed, 401 insertions(+), 358 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/590a72bc/common/src/main/java/org/apache/drill/common/AutoCloseables.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/drill/common/AutoCloseables.java b/common/src/main/java/org/apache/drill/common/AutoCloseables.java
index fcdfe14..c12063c 100644
--- a/common/src/main/java/org/apache/drill/common/AutoCloseables.java
+++ b/common/src/main/java/org/apache/drill/common/AutoCloseables.java
@@ -25,6 +25,11 @@ import java.util.Collection;
  */
 public class AutoCloseables {
 
+  public interface Closeable extends AutoCloseable {
+    @Override
+    void close();
+  }
+
   public static AutoCloseable all(final Collection<? extends AutoCloseable> autoCloseables) {
     return new AutoCloseable() {
       @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/590a72bc/common/src/main/java/org/apache/drill/common/concurrent/AutoCloseableLock.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/drill/common/concurrent/AutoCloseableLock.java b/common/src/main/java/org/apache/drill/common/concurrent/AutoCloseableLock.java
index 91d50b4..3fe5c1e 100644
--- a/common/src/main/java/org/apache/drill/common/concurrent/AutoCloseableLock.java
+++ b/common/src/main/java/org/apache/drill/common/concurrent/AutoCloseableLock.java
@@ -19,10 +19,12 @@ package org.apache.drill.common.concurrent;
 
 import java.util.concurrent.locks.Lock;
 
+import org.apache.drill.common.AutoCloseables.Closeable;
+
 /**
  * Simple wrapper class that allows Locks to be released via an try-with-resources block.
  */
-public class AutoCloseableLock implements AutoCloseable {
+public class AutoCloseableLock implements Closeable {
 
   private final Lock lock;
 
@@ -30,7 +32,7 @@ public class AutoCloseableLock implements AutoCloseable {
     this.lock = lock;
   }
 
-  public AutoCloseableLock open() {
+  public Closeable open() {
     lock.lock();
     return this;
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/590a72bc/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/FunctionRegistryHolder.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/FunctionRegistryHolder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/FunctionRegistryHolder.java
index 3124539..1ab6e19 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/FunctionRegistryHolder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/FunctionRegistryHolder.java
@@ -22,6 +22,8 @@ import com.google.common.collect.ListMultimap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Queues;
+
+import org.apache.drill.common.AutoCloseables.Closeable;
 import org.apache.drill.common.concurrent.AutoCloseableLock;
 import org.apache.drill.exec.expr.fn.DrillFuncHolder;
 
@@ -104,7 +106,7 @@ public class FunctionRegistryHolder {
    * @return local function registry version number
    */
   public long getVersion() {
-    try (AutoCloseableLock lock = readLock.open()) {
+    try (@SuppressWarnings("unused") Closeable lock = readLock.open()) {
       return version;
     }
   }
@@ -121,7 +123,7 @@ public class FunctionRegistryHolder {
    * @param newJars jars and list of their function holders, each contains function name, signature and holder
    */
   public void addJars(Map<String, List<FunctionHolder>> newJars, long version) {
-    try (AutoCloseableLock lock = writeLock.open()) {
+    try (@SuppressWarnings("unused") Closeable lock = writeLock.open()) {
       for (Map.Entry<String, List<FunctionHolder>> newJar : newJars.entrySet()) {
         String jarName = newJar.getKey();
         removeAllByJar(jarName);
@@ -141,7 +143,7 @@ public class FunctionRegistryHolder {
    * @param jarName jar name to be removed
    */
   public void removeJar(String jarName) {
-    try (AutoCloseableLock lock = writeLock.open()) {
+    try (@SuppressWarnings("unused") Closeable lock = writeLock.open()) {
       removeAllByJar(jarName);
     }
   }
@@ -153,7 +155,7 @@ public class FunctionRegistryHolder {
    * @return list of all jar names
    */
   public List<String> getAllJarNames() {
-    try (AutoCloseableLock lock = readLock.open()) {
+    try (@SuppressWarnings("unused") Closeable lock = readLock.open()) {
       return Lists.newArrayList(jars.keySet());
     }
   }
@@ -167,7 +169,7 @@ public class FunctionRegistryHolder {
    * @return list of functions names associated from the jar
    */
   public List<String> getFunctionNamesByJar(String jarName) {
-    try  (AutoCloseableLock lock = readLock.open()){
+    try (@SuppressWarnings("unused") Closeable lock = readLock.open()){
       Map<String, Queue<String>> functions = jars.get(jarName);
       return functions == null ? Lists.<String>newArrayList() : Lists.newArrayList(functions.keySet());
     }
@@ -184,7 +186,7 @@ public class FunctionRegistryHolder {
    * @return all functions which their holders
    */
   public ListMultimap<String, DrillFuncHolder> getAllFunctionsWithHolders(AtomicLong version) {
-    try (AutoCloseableLock lock = readLock.open()) {
+    try (@SuppressWarnings("unused") Closeable lock = readLock.open()) {
       if (version != null) {
         version.set(this.version);
       }
@@ -215,7 +217,7 @@ public class FunctionRegistryHolder {
    * @return all functions which their signatures
    */
   public ListMultimap<String, String> getAllFunctionsWithSignatures() {
-    try (AutoCloseableLock lock = readLock.open()) {
+    try (@SuppressWarnings("unused") Closeable lock = readLock.open()) {
       ListMultimap<String, String> functionsWithSignatures = ArrayListMultimap.create();
       for (Map.Entry<String, Map<String, DrillFuncHolder>> function : functions.entrySet()) {
         functionsWithSignatures.putAll(function.getKey(), Lists.newArrayList(function.getValue().keySet()));
@@ -235,7 +237,7 @@ public class FunctionRegistryHolder {
    * @return list of function holders
    */
   public List<DrillFuncHolder> getHoldersByFunctionName(String functionName, AtomicLong version) {
-    try (AutoCloseableLock lock = readLock.open()) {
+    try (@SuppressWarnings("unused") Closeable lock = readLock.open()) {
       if (version != null) {
         version.set(this.version);
       }
@@ -263,7 +265,7 @@ public class FunctionRegistryHolder {
    * @return true if jar exists, else false
    */
   public boolean containsJar(String jarName) {
-    try (AutoCloseableLock lock = readLock.open()) {
+    try (@SuppressWarnings("unused") Closeable lock = readLock.open()) {
       return jars.containsKey(jarName);
     }
   }
@@ -275,7 +277,7 @@ public class FunctionRegistryHolder {
    * @return quantity of functions
    */
   public int functionsSize() {
-    try (AutoCloseableLock lock = readLock.open()) {
+    try (@SuppressWarnings("unused") Closeable lock = readLock.open()) {
       return functions.size();
     }
   }
@@ -291,7 +293,7 @@ public class FunctionRegistryHolder {
    * @return jar name
    */
   public String getJarNameByFunctionSignature(String functionName, String functionSignature) {
-    try (AutoCloseableLock lock = readLock.open()) {
+    try (@SuppressWarnings("unused") Closeable lock = readLock.open()) {
       for (Map.Entry<String, Map<String, Queue<String>>> jar : jars.entrySet()) {
         Queue<String> functionSignatures = jar.getValue().get(functionName);
         if (functionSignatures != null && functionSignatures.contains(functionSignature)) {

http://git-wip-us.apache.org/repos/asf/drill/blob/590a72bc/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/RemoteFunctionRegistry.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/RemoteFunctionRegistry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/RemoteFunctionRegistry.java
index 38d8fcc..df5e17f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/RemoteFunctionRegistry.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/RemoteFunctionRegistry.java
@@ -32,9 +32,9 @@ import org.apache.drill.exec.exception.StoreException;
 import org.apache.drill.exec.exception.VersionMismatchException;
 import org.apache.drill.exec.proto.SchemaUserBitShared;
 import org.apache.drill.exec.proto.UserBitShared.Registry;
-import org.apache.drill.exec.store.sys.PersistentStore;
 import org.apache.drill.exec.store.sys.PersistentStoreConfig;
 import org.apache.drill.exec.store.sys.PersistentStoreProvider;
+import org.apache.drill.exec.store.sys.VersionedPersistentStore;
 import org.apache.drill.exec.store.sys.store.DataChangeVersion;
 import org.apache.drill.exec.util.ImpersonationUtil;
 import org.apache.hadoop.conf.Configuration;
@@ -92,7 +92,7 @@ public class RemoteFunctionRegistry implements AutoCloseable {
   private Path stagingArea;
   private Path tmpArea;
 
-  private PersistentStore<Registry> registry;
+  private VersionedPersistentStore<Registry> registry;
   private TransientStore<String> unregistration;
   private TransientStore<String> jars;
 
@@ -192,7 +192,7 @@ public class RemoteFunctionRegistry implements AutoCloseable {
           .name("udf")
           .persist()
           .build();
-      registry = storeProvider.getOrCreateStore(registrationConfig);
+      registry = storeProvider.getOrCreateVersionedStore(registrationConfig);
       registry.putIfAbsent(registry_path, Registry.getDefaultInstance());
     } catch (StoreException e) {
       throw new DrillRuntimeException("Failure while loading remote registry.", e);

http://git-wip-us.apache.org/repos/asf/drill/blob/590a72bc/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/CustomHandlerRegistry.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/CustomHandlerRegistry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/CustomHandlerRegistry.java
index 7a2bd04..97e855c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/CustomHandlerRegistry.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/CustomHandlerRegistry.java
@@ -23,6 +23,7 @@ import io.netty.buffer.DrillBuf;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+import org.apache.drill.common.AutoCloseables.Closeable;
 import org.apache.drill.common.concurrent.AutoCloseableLock;
 import org.apache.drill.exec.proto.BitControl.CustomMessage;
 import org.apache.drill.exec.proto.BitControl.RpcType;
@@ -60,7 +61,7 @@ public class CustomHandlerRegistry {
     Preconditions.checkNotNull(handler);
     Preconditions.checkNotNull(requestSerde);
     Preconditions.checkNotNull(responseSerde);
-    try (AutoCloseableLock lock = write.open()) {
+    try (@SuppressWarnings("unused") Closeable lock = write.open()) {
       ParsingHandler<?, ?> parsingHandler = handlers.get(messageTypeId);
       if (parsingHandler != null) {
         throw new IllegalStateException(String.format(
@@ -76,7 +77,7 @@ public class CustomHandlerRegistry {
 
   public Response handle(CustomMessage message, DrillBuf dBody) throws RpcException {
     final ParsingHandler<?, ?> handler;
-    try (AutoCloseableLock lock = read.open()) {
+    try (@SuppressWarnings("unused") Closeable lock = read.open()) {
       handler = handlers.get(message.getType());
     }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/590a72bc/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/BasePersistentStore.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/BasePersistentStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/BasePersistentStore.java
index 0640407..38309bb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/BasePersistentStore.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/BasePersistentStore.java
@@ -17,8 +17,6 @@
  */
 package org.apache.drill.exec.store.sys;
 
-import org.apache.drill.exec.store.sys.store.DataChangeVersion;
-
 import java.util.Iterator;
 import java.util.Map;
 
@@ -29,24 +27,4 @@ public abstract class BasePersistentStore<V> implements PersistentStore<V> {
     return getRange(0, Integer.MAX_VALUE);
   }
 
-  /** By default contains with version will behave the same way as without version.
-   * Override this method to add version support. */
-  public boolean contains(String key, DataChangeVersion version) {
-    return contains(key);
-  }
-
-  /** By default get with version will behave the same way as without version.
-   * Override this method to add version support. */
-  @Override
-  public V get(String key, DataChangeVersion version) {
-    return get(key);
-  }
-
-  /** By default put with version will behave the same way as without version.
-   * Override this method to add version support. */
-  @Override
-  public void put(String key, V value, DataChangeVersion version) {
-    put(key, value);
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/590a72bc/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PersistentStore.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PersistentStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PersistentStore.java
index 206642a..02959aa 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PersistentStore.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PersistentStore.java
@@ -17,8 +17,6 @@
  */
 package org.apache.drill.exec.store.sys;
 
-import org.apache.drill.exec.store.sys.store.DataChangeVersion;
-
 import java.util.Iterator;
 import java.util.Map;
 
@@ -27,11 +25,7 @@ import java.util.Map;
  *
  * @param <V>  value type
  */
-public interface PersistentStore<V> extends AutoCloseable {
-  /**
-   * Returns storage {@link PersistentStoreMode mode} of this store.
-   */
-  PersistentStoreMode getMode();
+public interface PersistentStore<V> extends Store<V> {
 
   /**
    * Checks if lookup key is present in store.
@@ -42,30 +36,12 @@ public interface PersistentStore<V> extends AutoCloseable {
   boolean contains(String key);
 
   /**
-   * Checks if lookup key is present in store.
-   * Sets data change version number.
-   *
-   * @param key lookup key
-   * @param version version holder
-   * @return true if store contains lookup key, false otherwise
-   */
-  boolean contains(String key, DataChangeVersion version);
-
-  /**
    * Returns the value for the given key if exists, null otherwise.
    * @param key  lookup key
    */
   V get(String key);
 
   /**
-   * Returns the value for the given key if exists, null otherwise.
-   * Sets data change version number.
-   * @param key  lookup key
-   * @param version version holder
-   */
-  V get(String key, DataChangeVersion version);
-
-  /**
    * Stores the (key, value) tuple in the store. Lifetime of the tuple depends upon store {@link #getMode mode}.
    *
    * @param key  lookup key
@@ -74,41 +50,6 @@ public interface PersistentStore<V> extends AutoCloseable {
   void put(String key, V value);
 
   /**
-   * Stores the (key, value) tuple in the store.
-   * If tuple already exits, stores it only if versions match,
-   * otherwise throws {@link org.apache.drill.exec.exception.VersionMismatchException}
-   * Lifetime of the tuple depends upon store {@link #getMode mode}.
-   *
-   * @param key  lookup key
-   * @param value  value to store
-   * @param version version holder
-   */
-  void put(String key, V value, DataChangeVersion version);
-
-  /**
-   * Removes the value corresponding to the given key if exists, nothing happens otherwise.
-   * @param key  lookup key
-   */
-  void delete(String key);
-
-  /**
-   * Stores the (key, value) tuple in the store only if it does not exists.
-   *
-   * @param key  lookup key
-   * @param value  value to store
-   * @return  true if put takes place, false otherwise.
-   */
-  boolean putIfAbsent(String key, V value);
-
-  /**
-   * Returns an iterator of desired number of entries offsetting by the skip value.
-   *
-   * @param skip  number of records to skip from beginning
-   * @param take  max number of records to return
-   */
-  Iterator<Map.Entry<String, V>> getRange(int skip, int take);
-
-  /**
    * Returns an iterator of entries.
    */
   Iterator<Map.Entry<String, V>> getAll();

http://git-wip-us.apache.org/repos/asf/drill/blob/590a72bc/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PersistentStoreProvider.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PersistentStoreProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PersistentStoreProvider.java
index 75b89b4..c0f7030 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PersistentStoreProvider.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PersistentStoreProvider.java
@@ -18,6 +18,7 @@
 package org.apache.drill.exec.store.sys;
 
 import org.apache.drill.exec.exception.StoreException;
+import org.apache.drill.exec.store.sys.store.VersionedDelegatingStore;
 
 /**
  * A factory used to create {@link PersistentStore store} instances.
@@ -33,7 +34,9 @@ public interface PersistentStoreProvider extends AutoCloseable {
    * @param <V>  store value type
    */
   <V> PersistentStore<V> getOrCreateStore(PersistentStoreConfig<V> config) throws StoreException;
-
+  default <V> VersionedPersistentStore<V> getOrCreateVersionedStore(PersistentStoreConfig<V> config) throws StoreException {
+    return new VersionedDelegatingStore<>(getOrCreateStore(config));
+  }
 
   /**
    * Sets up the provider.

http://git-wip-us.apache.org/repos/asf/drill/blob/590a72bc/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/Store.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/Store.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/Store.java
new file mode 100644
index 0000000..c2b1999
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/Store.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.drill.exec.store.sys;
+
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * A Store interface used to store and retrieve instances of given value type.
+ *
+ * @param <V>  value type
+ */
+public interface Store<V> extends AutoCloseable {
+  /**
+   * Returns storage {@link PersistentStoreMode mode} of this store.
+   */
+  PersistentStoreMode getMode();
+
+  /**
+   * Removes the value corresponding to the given key if exists, nothing happens otherwise.
+   * @param key  lookup key
+   */
+  void delete(String key);
+
+  /**
+   * Stores the (key, value) tuple in the store only if it does not exist.
+   *
+   * @param key  lookup key
+   * @param value  value to store
+   * @return  true if put takes place, false otherwise.
+   */
+  boolean putIfAbsent(String key, V value);
+
+  /**
+   * Returns an iterator of desired number of entries offsetting by the skip value.
+   *
+   * @param skip  number of records to skip from beginning
+   * @param take  max number of records to return
+   */
+  Iterator<Map.Entry<String, V>> getRange(int skip, int take);
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/590a72bc/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/VersionedPersistentStore.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/VersionedPersistentStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/VersionedPersistentStore.java
new file mode 100644
index 0000000..24fa78e
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/VersionedPersistentStore.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.drill.exec.store.sys;
+
+import org.apache.drill.exec.store.sys.store.DataChangeVersion;
+
+/**
+ * Extension to the Store interface that supports versions.
+ * @param <V>  value type
+ */
+public interface VersionedPersistentStore<V> extends Store<V> {
+  /**
+   * Checks if lookup key is present in store.
+   * Sets data change version number.
+   *
+   * @param key lookup key
+   * @param version version holder
+   * @return true if store contains lookup key, false otherwise
+   */
+  boolean contains(String key, DataChangeVersion version);
+
+  /**
+   * Returns the value for the given key if exists, null otherwise.
+   * Sets data change version number.
+   * @param key  lookup key
+   * @param version version holder
+   */
+  V get(String key, DataChangeVersion version);
+
+  /**
+   * Stores the (key, value) tuple in the store.
+   * If tuple already exists, stores it only if versions match,
+   * otherwise throws {@link org.apache.drill.exec.exception.VersionMismatchException}
+   * Lifetime of the tuple depends upon store {@link #getMode mode}.
+   *
+   * @param key  lookup key
+   * @param value  value to store
+   * @param version version holder
+   */
+  void put(String key, V value, DataChangeVersion version);
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/590a72bc/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/InMemoryStore.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/InMemoryStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/InMemoryStore.java
index 10da92d..f63c4f7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/InMemoryStore.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/InMemoryStore.java
@@ -19,15 +19,11 @@ package org.apache.drill.exec.store.sys.store;
 
 import java.util.Iterator;
 import java.util.Map;
+import java.util.concurrent.ConcurrentNavigableMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
 
-import org.apache.drill.common.concurrent.AutoCloseableLock;
-import org.apache.drill.exec.exception.VersionMismatchException;
 import org.apache.drill.exec.store.sys.BasePersistentStore;
-import org.apache.drill.exec.store.sys.PersistentStoreConfig;
 import org.apache.drill.exec.store.sys.PersistentStoreMode;
 
 import com.google.common.collect.Iterables;
@@ -35,26 +31,19 @@ import com.google.common.collect.Iterables;
 public class InMemoryStore<V> extends BasePersistentStore<V> {
   // private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(InMemoryPersistentStore.class);
 
-  private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
-  private final AutoCloseableLock readLock = new AutoCloseableLock(readWriteLock.readLock());
-  private final AutoCloseableLock writeLock = new AutoCloseableLock(readWriteLock.writeLock());
-  private final ConcurrentSkipListMap<String, V> store;
-  private int version = -1;
+  private final ConcurrentNavigableMap<String, V> store;
   private final int capacity;
   private final AtomicInteger currentSize = new AtomicInteger();
 
   public InMemoryStore(int capacity) {
     this.capacity = capacity;
     //Allows us to trim out the oldest elements to maintain finite max size
-    this.store = new ConcurrentSkipListMap<String, V>();
+    this.store = new ConcurrentSkipListMap<>();
   }
 
   @Override
   public void delete(final String key) {
-    try (AutoCloseableLock lock = writeLock.open()) {
-      store.remove(key);
-      version++;
-    }
+    store.remove(key);
   }
 
   @Override
@@ -64,80 +53,36 @@ public class InMemoryStore<V> extends BasePersistentStore<V> {
 
   @Override
   public boolean contains(final String key) {
-    return contains(key, null);
-  }
-
-  @Override
-  public boolean contains(final String key, final DataChangeVersion dataChangeVersion) {
-    try (AutoCloseableLock lock = readLock.open()) {
-      if (dataChangeVersion != null) {
-        dataChangeVersion.setVersion(version);
-      }
-      return store.containsKey(key);
-    }
+    return store.containsKey(key);
   }
 
   @Override
   public V get(final String key) {
-    return get(key, null);
-  }
-
-  @Override
-  public V get(final String key, final DataChangeVersion dataChangeVersion) {
-    try (AutoCloseableLock lock = readLock.open()) {
-      if (dataChangeVersion != null) {
-        dataChangeVersion.setVersion(version);
-      }
-      return store.get(key);
-    }
+    return store.get(key);
   }
 
   @Override
   public void put(final String key, final V value) {
-    put(key, value, null);
-  }
-
-  @Override
-  public void put(final String key, final V value, final DataChangeVersion dataChangeVersion) {
-    try (AutoCloseableLock lock = writeLock.open()) {
-      if (dataChangeVersion != null && dataChangeVersion.getVersion() != version) {
-        throw new VersionMismatchException("Version mismatch detected", dataChangeVersion.getVersion());
-      }
-      store.put(key, value);
-      if (currentSize.incrementAndGet() > capacity) {
-        //Pop Out Oldest
-        store.pollLastEntry();
-        currentSize.decrementAndGet();
-      }
-
-      version++;
+    store.put(key, value);
+    if (currentSize.incrementAndGet() > capacity) {
+      //Pop Out Oldest
+      store.pollLastEntry();
+      currentSize.decrementAndGet();
     }
   }
 
   @Override
   public boolean putIfAbsent(final String key, final V value) {
-    try (AutoCloseableLock lock = writeLock.open()) {
-      final V old = store.putIfAbsent(key, value);
-      if (old == null) {
-        version++;
-        return true;
-      }
-      return false;
-    }
+    return (value != store.putIfAbsent(key, value));
   }
 
   @Override
   public Iterator<Map.Entry<String, V>> getRange(final int skip, final int take) {
-    try (AutoCloseableLock lock = readLock.open()) {
-      return Iterables.limit(Iterables.skip(store.entrySet(), skip), take).iterator();
-    }
+    return Iterables.limit(Iterables.skip(store.entrySet(), skip), take).iterator();
   }
 
   @Override
-  public void close() throws Exception {
-    try (AutoCloseableLock lock = writeLock.open()) {
-      store.clear();
-      version = -1;
-    }
+  public void close() {
+    store.clear();
   }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/590a72bc/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/LocalPersistentStore.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/LocalPersistentStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/LocalPersistentStore.java
index 313a9be..0905c0d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/LocalPersistentStore.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/LocalPersistentStore.java
@@ -35,9 +35,7 @@ import javax.annotation.Nullable;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.drill.common.collections.ImmutableEntry;
-import org.apache.drill.common.concurrent.AutoCloseableLock;
 import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.exec.exception.VersionMismatchException;
 import org.apache.drill.exec.store.dfs.DrillFileSystem;
 import org.apache.drill.exec.util.DrillFileSystemUtil;
 import org.apache.drill.exec.store.sys.BasePersistentStore;
@@ -59,35 +57,34 @@ import org.slf4j.LoggerFactory;
 public class LocalPersistentStore<V> extends BasePersistentStore<V> {
   private static final Logger logger = LoggerFactory.getLogger(LocalPersistentStore.class);
 
-  private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
-  private final AutoCloseableLock readLock = new AutoCloseableLock(readWriteLock.readLock());
-  private final AutoCloseableLock writeLock = new AutoCloseableLock(readWriteLock.writeLock());
-
   private final Path basePath;
   private final PersistentStoreConfig<V> config;
   private final DrillFileSystem fs;
-  private int version = -1;
 
   public LocalPersistentStore(DrillFileSystem fs, Path base, PersistentStoreConfig<V> config) {
-    super();
     this.basePath = new Path(base, config.getName());
     this.config = config;
     this.fs = fs;
-
     try {
-      if (!fs.mkdirs(basePath)) {
-        version++;
-      }
+      mkdirs(getBasePath());
     } catch (IOException e) {
       throw new RuntimeException("Failure setting pstore configuration path.");
     }
   }
 
+  protected Path getBasePath() {
+    return basePath;
+  }
+
   @Override
   public PersistentStoreMode getMode() {
     return PersistentStoreMode.PERSISTENT;
   }
 
+  private void mkdirs(Path path) throws IOException {
+    fs.mkdirs(path);
+  }
+
   public static Path getLogDir() {
     String drillLogDir = System.getenv("DRILL_LOG_DIR");
     if (drillLogDir == null) {
@@ -114,39 +111,37 @@ public class LocalPersistentStore<V> extends BasePersistentStore<V> {
 
   @Override
   public Iterator<Map.Entry<String, V>> getRange(int skip, int take) {
-    try (AutoCloseableLock lock = readLock.open()) {
-      try {
-        // list only files with sys file suffix
-        PathFilter sysFileSuffixFilter = new PathFilter() {
-          @Override
-          public boolean accept(Path path) {
-            return path.getName().endsWith(DRILL_SYS_FILE_SUFFIX);
-          }
-        };
-
-        List<FileStatus> fileStatuses = DrillFileSystemUtil.listFiles(fs, basePath, false, sysFileSuffixFilter);
-        if (fileStatuses.isEmpty()) {
-          return Collections.emptyIterator();
-        }
-
-        List<String> files = Lists.newArrayList();
-        for (FileStatus stat : fileStatuses) {
-          String s = stat.getPath().getName();
-          files.add(s.substring(0, s.length() - DRILL_SYS_FILE_SUFFIX.length()));
+    try {
+      // list only files with sys file suffix
+      PathFilter sysFileSuffixFilter = new PathFilter() {
+        @Override
+        public boolean accept(Path path) {
+          return path.getName().endsWith(DRILL_SYS_FILE_SUFFIX);
         }
+      };
 
-        Collections.sort(files);
+      List<FileStatus> fileStatuses = DrillFileSystemUtil.listFiles(fs, basePath, false, sysFileSuffixFilter);
+      if (fileStatuses.isEmpty()) {
+        return Collections.emptyIterator();
+      }
 
-        return Iterables.transform(Iterables.limit(Iterables.skip(files, skip), take), new Function<String, Entry<String, V>>() {
-          @Nullable
-          @Override
-          public Entry<String, V> apply(String key) {
-            return new ImmutableEntry<>(key, get(key));
-          }
-        }).iterator();
-      } catch (IOException e) {
-        throw new RuntimeException(e);
+      List<String> files = Lists.newArrayList();
+      for (FileStatus stat : fileStatuses) {
+        String s = stat.getPath().getName();
+        files.add(s.substring(0, s.length() - DRILL_SYS_FILE_SUFFIX.length()));
       }
+
+      Collections.sort(files);
+
+      return Iterables.transform(Iterables.limit(Iterables.skip(files, skip), take), new Function<String, Entry<String, V>>() {
+        @Nullable
+        @Override
+        public Entry<String, V> apply(String key) {
+          return new ImmutableEntry<>(key, get(key));
+        }
+      }).iterator();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
     }
   }
 
@@ -160,108 +155,68 @@ public class LocalPersistentStore<V> extends BasePersistentStore<V> {
 
   @Override
   public boolean contains(String key) {
-    return contains(key, null);
-  }
-
-  @Override
-  public boolean contains(String key, DataChangeVersion dataChangeVersion) {
-    try (AutoCloseableLock lock = readLock.open()) {
-      try {
-        Path path = makePath(key);
-        boolean exists = fs.exists(path);
-        if (exists && dataChangeVersion != null) {
-          dataChangeVersion.setVersion(version);
-        }
-        return exists;
-      } catch (IOException e) {
-        throw new RuntimeException(e);
-      }
+    try {
+      return fs.exists(makePath(key));
+    } catch (IOException e) {
+      throw new RuntimeException(e);
     }
   }
 
   @Override
   public V get(String key) {
-    return get(key, null);
-  }
-
-  @Override
-  public V get(String key, DataChangeVersion dataChangeVersion) {
-    try (AutoCloseableLock lock = readLock.open()) {
-      try {
-        if (dataChangeVersion != null) {
-          dataChangeVersion.setVersion(version);
-        }
-        Path path = makePath(key);
-        if (!fs.exists(path)) {
-          return null;
-        }
-      } catch (IOException e) {
-        throw new RuntimeException(e);
-      }
-      final Path path = makePath(key);
-      try (InputStream is = fs.open(path)) {
-        return config.getSerializer().deserialize(IOUtils.toByteArray(is));
-      } catch (IOException e) {
-        throw new RuntimeException("Unable to deserialize \"" + path + "\"", e);
+    try {
+      Path path = makePath(key);
+      if (!fs.exists(path)) {
+        return null;
       }
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    final Path path = makePath(key);
+    try (InputStream is = fs.open(path)) {
+      return config.getSerializer().deserialize(IOUtils.toByteArray(is));
+    } catch (IOException e) {
+      throw new RuntimeException("Unable to deserialize \"" + path + "\"", e);
     }
   }
 
   @Override
   public void put(String key, V value) {
-    put(key, value, null);
-  }
-
-  @Override
-  public void put(String key, V value, DataChangeVersion dataChangeVersion) {
-    try (AutoCloseableLock lock = writeLock.open()) {
-      if (dataChangeVersion != null && dataChangeVersion.getVersion() != version) {
-        throw new VersionMismatchException("Version mismatch detected", dataChangeVersion.getVersion());
-      }
-      try (OutputStream os = fs.create(makePath(key))) {
-        IOUtils.write(config.getSerializer().serialize(value), os);
-        version++;
-      } catch (IOException e) {
-        throw new RuntimeException(e);
-      }
+    try (OutputStream os = fs.create(makePath(key))) {
+      IOUtils.write(config.getSerializer().serialize(value), os);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
     }
   }
 
   @Override
   public boolean putIfAbsent(String key, V value) {
-    try (AutoCloseableLock lock = writeLock.open()) {
-      try {
-        Path p = makePath(key);
-        if (fs.exists(p)) {
-          return false;
-        } else {
-          try (OutputStream os = fs.create(makePath(key))) {
-            IOUtils.write(config.getSerializer().serialize(value), os);
-            version++;
-          }
-          return true;
+    try {
+      Path p = makePath(key);
+      if (fs.exists(p)) {
+        return false;
+      } else {
+        try (OutputStream os = fs.create(makePath(key))) {
+          IOUtils.write(config.getSerializer().serialize(value), os);
         }
-      } catch (IOException e) {
-        throw new RuntimeException(e);
+        return true;
       }
+    } catch (IOException e) {
+      throw new RuntimeException(e);
     }
   }
 
   @Override
   public void delete(String key) {
-    try (AutoCloseableLock lock = writeLock.open()) {
-      try {
-        fs.delete(makePath(key), false);
-        version++;
-      } catch (IOException e) {
-        logger.error("Unable to delete data from storage.", e);
-        throw new RuntimeException(e);
-      }
+    try {
+      fs.delete(makePath(key), false);
+    } catch (IOException e) {
+      logger.error("Unable to delete data from storage.", e);
+      throw new RuntimeException(e);
     }
   }
 
   @Override
   public void close() {
   }
-
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/590a72bc/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/VersionedDelegatingStore.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/VersionedDelegatingStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/VersionedDelegatingStore.java
new file mode 100644
index 0000000..23eedd9
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/VersionedDelegatingStore.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.drill.exec.store.sys.store;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.drill.common.AutoCloseables.Closeable;
+import org.apache.drill.common.concurrent.AutoCloseableLock;
+import org.apache.drill.exec.exception.VersionMismatchException;
+import org.apache.drill.exec.store.sys.PersistentStore;
+import org.apache.drill.exec.store.sys.PersistentStoreMode;
+import org.apache.drill.exec.store.sys.VersionedPersistentStore;
+
+/**
+ * Versioned Store that delegates operations to PersistentStore
+ * @param <V>
+ */
+public class VersionedDelegatingStore<V> implements VersionedPersistentStore<V> {
+  private final PersistentStore<V> store;
+  private final ReadWriteLock readWriteLock;
+  private final AutoCloseableLock readLock;
+  private final AutoCloseableLock writeLock;
+  private int version;
+
+  public VersionedDelegatingStore(PersistentStore<V> store) {
+    this.store = store;
+    readWriteLock = new ReentrantReadWriteLock();
+    readLock = new AutoCloseableLock(readWriteLock.readLock());
+    writeLock = new AutoCloseableLock(readWriteLock.writeLock());
+    version = -1;
+  }
+
+  @Override
+  public PersistentStoreMode getMode() {
+    return store.getMode();
+  }
+
+  @Override
+  public void delete(final String key) {
+    try (@SuppressWarnings("unused") Closeable lock = writeLock.open()) {
+      store.delete(key);
+      version++;
+    }
+  }
+
+  @Override
+  public boolean contains(final String key, final DataChangeVersion dataChangeVersion) {
+    try (@SuppressWarnings("unused") Closeable lock = readLock.open()) {
+      boolean contains = store.contains(key);
+      dataChangeVersion.setVersion(version);
+      return contains;
+    }
+  }
+
+  @Override
+  public V get(final String key, final DataChangeVersion dataChangeVersion) {
+    try (@SuppressWarnings("unused") Closeable lock = readLock.open()) {
+      V value = store.get(key);
+      dataChangeVersion.setVersion(version);
+      return value;
+    }
+  }
+
+  @Override
+  public void put(final String key, final V value, final DataChangeVersion dataChangeVersion) {
+    try (@SuppressWarnings("unused") Closeable lock = writeLock.open()) {
+      if (dataChangeVersion.getVersion() != version) {
+        throw new VersionMismatchException("Version mismatch detected", dataChangeVersion.getVersion());
+      }
+      store.put(key, value);
+      version++;
+    }
+  }
+
+  @Override
+  public boolean putIfAbsent(String key, V value) {
+    try (@SuppressWarnings("unused") Closeable lock = writeLock.open()) {
+      if (store.putIfAbsent(key, value)) {
+        version++;
+        return true;
+      }
+      return false;
+    }
+  }
+
+  @Override
+  public Iterator<Map.Entry<String, V>> getRange(final int skip, final int take) {
+    try (@SuppressWarnings("unused") Closeable lock = readLock.open()) {
+      return store.getRange(skip, take);
+    }
+  }
+
+  @Override
+  public void close() throws Exception
+  {
+    try (@SuppressWarnings("unused") Closeable lock = writeLock.open()) {
+      store.close();
+      version = -1;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/590a72bc/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/ZookeeperPersistentStore.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/ZookeeperPersistentStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/ZookeeperPersistentStore.java
index a3ee58e..1f20212 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/ZookeeperPersistentStore.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/ZookeeperPersistentStore.java
@@ -36,12 +36,13 @@ import org.apache.drill.exec.serialization.InstanceSerializer;
 import org.apache.drill.exec.store.sys.BasePersistentStore;
 import org.apache.drill.exec.store.sys.PersistentStoreConfig;
 import org.apache.drill.exec.store.sys.PersistentStoreMode;
+import org.apache.drill.exec.store.sys.VersionedPersistentStore;
 import org.apache.zookeeper.CreateMode;
 
 /**
  * Zookeeper based implementation of {@link org.apache.drill.exec.store.sys.PersistentStore}.
  */
-public class ZookeeperPersistentStore<V> extends BasePersistentStore<V> {
+public class ZookeeperPersistentStore<V> extends BasePersistentStore<V> implements VersionedPersistentStore<V> {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ZookeeperPersistentStore.class);
 
   private final PersistentStoreConfig<V> config;

http://git-wip-us.apache.org/repos/asf/drill/blob/590a72bc/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/provider/ZookeeperPersistentStoreProvider.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/provider/ZookeeperPersistentStoreProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/provider/ZookeeperPersistentStoreProvider.java
index a5502cb..a2e30f0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/provider/ZookeeperPersistentStoreProvider.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/provider/ZookeeperPersistentStoreProvider.java
@@ -28,7 +28,9 @@ import org.apache.drill.exec.store.dfs.DrillFileSystem;
 import org.apache.drill.exec.store.sys.PersistentStore;
 import org.apache.drill.exec.store.sys.PersistentStoreRegistry;
 import org.apache.drill.exec.store.sys.PersistentStoreConfig;
+import org.apache.drill.exec.store.sys.VersionedPersistentStore;
 import org.apache.drill.exec.store.sys.store.LocalPersistentStore;
+import org.apache.drill.exec.store.sys.store.VersionedDelegatingStore;
 import org.apache.drill.exec.store.sys.store.ZookeeperPersistentStore;
 import org.apache.hadoop.fs.Path;
 
@@ -81,6 +83,24 @@ public class ZookeeperPersistentStoreProvider extends BasePersistentStoreProvide
   }
 
   @Override
+  public <V> VersionedPersistentStore<V> getOrCreateVersionedStore(final PersistentStoreConfig<V> config) throws StoreException {
+    switch(config.getMode()){
+      case BLOB_PERSISTENT:
+        return new VersionedDelegatingStore<>(new LocalPersistentStore<>(fs, blobRoot, config));
+      case PERSISTENT:
+        final ZookeeperPersistentStore<V> store = new ZookeeperPersistentStore<>(curator, config);
+        try {
+          store.start();
+        } catch (Exception e) {
+          throw new StoreException("unable to start zookeeper store", e);
+        }
+        return store;
+      default:
+        throw new IllegalStateException();
+    }
+  }
+
+  @Override
   public void close() throws Exception {
     fs.close();
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/590a72bc/exec/java-exec/src/main/java/org/apache/drill/exec/testing/store/NoWriteLocalStore.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/store/NoWriteLocalStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/store/NoWriteLocalStore.java
index e36dc83..528705a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/store/NoWriteLocalStore.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/store/NoWriteLocalStore.java
@@ -20,30 +20,19 @@ package org.apache.drill.exec.testing.store;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Maps;
-import org.apache.drill.common.concurrent.AutoCloseableLock;
-import org.apache.drill.exec.exception.VersionMismatchException;
+
 import org.apache.drill.exec.store.sys.BasePersistentStore;
 import org.apache.drill.exec.store.sys.PersistentStoreMode;
-import org.apache.drill.exec.store.sys.store.DataChangeVersion;
 
 public class NoWriteLocalStore<V> extends BasePersistentStore<V> {
-  private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
-  private final AutoCloseableLock readLock = new AutoCloseableLock(readWriteLock.readLock());
-  private final AutoCloseableLock writeLock = new AutoCloseableLock(readWriteLock.writeLock());
   private final ConcurrentMap<String, V> store = Maps.newConcurrentMap();
-  private int version = -1;
 
   @Override
   public void delete(final String key) {
-    try (AutoCloseableLock lock = writeLock.open()) {
-      store.remove(key);
-      version++;
-    }
+    store.remove(key);
   }
 
   @Override
@@ -53,74 +42,35 @@ public class NoWriteLocalStore<V> extends BasePersistentStore<V> {
 
   @Override
   public boolean contains(final String key) {
-    return contains(key, null);
-  }
-
-  @Override
-  public boolean contains(final String key, final DataChangeVersion dataChangeVersion) {
-    try (AutoCloseableLock lock = readLock.open()) {
-      if (dataChangeVersion != null) {
-        dataChangeVersion.setVersion(version);
-      }
-      return store.containsKey(key);
-    }
+    return store.containsKey(key);
   }
 
   @Override
   public V get(final String key) {
-    return get(key, null);
-  }
-
-  @Override
-  public V get(final String key, final DataChangeVersion dataChangeVersion) {
-    try (AutoCloseableLock lock = readLock.open()) {
-      if (dataChangeVersion != null) {
-        dataChangeVersion.setVersion(version);
-      }
-      return store.get(key);
-    }
+    return store.get(key);
   }
 
   @Override
   public void put(final String key, final V value) {
-    put(key, value, null);
-  }
-
-  @Override
-  public void put(final String key, final V value, final DataChangeVersion dataChangeVersion) {
-    try (AutoCloseableLock lock = writeLock.open()) {
-      if (dataChangeVersion != null && dataChangeVersion.getVersion() != version) {
-        throw new VersionMismatchException("Version mismatch detected", dataChangeVersion.getVersion());
-      }
-      store.put(key, value);
-      version++;
-    }
+    store.put(key, value);
   }
 
   @Override
   public boolean putIfAbsent(final String key, final V value) {
-    try (AutoCloseableLock lock = writeLock.open()) {
-      final V old = store.putIfAbsent(key, value);
-      if (old == null) {
-        version++;
-        return true;
-      }
-      return false;
+    final V old = store.putIfAbsent(key, value);
+    if (old == null) {
+      return true;
     }
+    return false;
   }
 
   @Override
   public Iterator<Map.Entry<String, V>> getRange(final int skip, final int take) {
-    try (AutoCloseableLock lock = readLock.open()) {
-      return Iterables.limit(Iterables.skip(store.entrySet(), skip), take).iterator();
-    }
+    return Iterables.limit(Iterables.skip(store.entrySet(), skip), take).iterator();
   }
 
   @Override
-  public void close() throws Exception {
-    try (AutoCloseableLock lock = writeLock.open()) {
-      store.clear();
-      version = -1;
-    }
+  public void close() {
+    store.clear();
   }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/590a72bc/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java
index a516fad..5182093 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java
@@ -86,7 +86,7 @@ public class IncomingBuffers implements AutoCloseable {
 
     // we want to make sure that we only generate local record batch reference in the case that we're not closed.
     // Otherwise we would leak memory.
-    try (AutoCloseableLock lock = sharedIncomingBatchLock.open()) {
+    try (@SuppressWarnings("unused") AutoCloseables.Closeable lock = sharedIncomingBatchLock.open()) {
       if (closed) {
         return false;
       }
@@ -135,7 +135,7 @@ public class IncomingBuffers implements AutoCloseable {
 
   @Override
   public void close() throws Exception {
-    try (AutoCloseableLock lock = exclusiveCloseLock.open()) {
+    try (@SuppressWarnings("unused") AutoCloseables.Closeable lock = exclusiveCloseLock.open()) {
       closed = true;
       AutoCloseables.close(collectorMap.values());
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/590a72bc/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestPStoreProviders.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestPStoreProviders.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestPStoreProviders.java
index 8896fb0..73ddfe0 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestPStoreProviders.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestPStoreProviders.java
@@ -31,6 +31,7 @@ import org.apache.drill.exec.coord.zk.PathUtils;
 import org.apache.drill.exec.coord.zk.ZookeeperClient;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
 import org.apache.drill.exec.server.options.PersistedOptionValue;
+import org.apache.drill.exec.store.sys.store.ZookeeperPersistentStore;
 import org.apache.drill.exec.store.sys.store.provider.LocalPersistentStoreProvider;
 import org.apache.drill.exec.store.sys.store.provider.ZookeeperPersistentStoreProvider;
 import org.apache.drill.test.BaseDirTestWatcher;
@@ -45,6 +46,8 @@ import org.junit.experimental.categories.Category;
 
 import java.io.File;
 
+import static org.junit.Assert.assertTrue;
+
 @Category({SlowTest.class})
 public class TestPStoreProviders extends TestWithZookeeper {
   @Rule
@@ -133,8 +136,9 @@ public class TestPStoreProviders extends TestWithZookeeper {
       try (ZookeeperPersistentStoreProvider provider =
         new ZookeeperPersistentStoreProvider(zkHelper.getConfig(), curator)) {
         PersistentStore<PersistedOptionValue> store = provider.getOrCreateStore(storeConfig);
+        assertTrue(store instanceof ZookeeperPersistentStore);
 
-        PersistedOptionValue oldOptionValue = store.get(oldName, null);
+        PersistedOptionValue oldOptionValue = ((ZookeeperPersistentStore<PersistedOptionValue>)store).get(oldName, null);
         PersistedOptionValue expectedValue = new PersistedOptionValue("true");
 
         Assert.assertEquals(expectedValue, oldOptionValue);

http://git-wip-us.apache.org/repos/asf/drill/blob/590a72bc/exec/memory/base/src/main/java/org/apache/drill/exec/memory/AllocationManager.java
----------------------------------------------------------------------
diff --git a/exec/memory/base/src/main/java/org/apache/drill/exec/memory/AllocationManager.java b/exec/memory/base/src/main/java/org/apache/drill/exec/memory/AllocationManager.java
index 3b5967f..e9f35cc 100644
--- a/exec/memory/base/src/main/java/org/apache/drill/exec/memory/AllocationManager.java
+++ b/exec/memory/base/src/main/java/org/apache/drill/exec/memory/AllocationManager.java
@@ -28,6 +28,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+import org.apache.drill.common.AutoCloseables.Closeable;
 import org.apache.drill.common.HistoricalLog;
 import org.apache.drill.common.concurrent.AutoCloseableLock;
 import org.apache.drill.exec.memory.BaseAllocator.Verbosity;
@@ -109,7 +110,7 @@ public class AllocationManager {
           "A buffer can only be associated between two allocators that share the same root.");
     }
 
-    try (AutoCloseableLock read = readLock.open()) {
+    try (@SuppressWarnings("unused") Closeable read = readLock.open()) {
 
       final BufferLedger ledger = map.get(allocator);
       if (ledger != null) {
@@ -119,7 +120,7 @@ public class AllocationManager {
         return ledger;
       }
     }
-    try (AutoCloseableLock write = writeLock.open()) {
+    try (@SuppressWarnings("unused") Closeable write = writeLock.open()) {
       // we have to recheck existing ledger since a second reader => writer could be competing with us.
 
       final BufferLedger existingLedger = map.get(allocator);
@@ -242,7 +243,7 @@ public class AllocationManager {
 
       // since two balance transfers out from the allocator manager could cause incorrect accounting, we need to ensure
       // that this won't happen by synchronizing on the allocator manager instance.
-      try (AutoCloseableLock write = writeLock.open()) {
+      try (@SuppressWarnings("unused") Closeable write = writeLock.open()) {
         if (owningLedger != this) {
           return true;
         }
@@ -320,7 +321,7 @@ public class AllocationManager {
       allocator.assertOpen();
 
       final int outcome;
-      try (AutoCloseableLock write = writeLock.open()) {
+      try (@SuppressWarnings("unused") Closeable write = writeLock.open()) {
         outcome = bufRefCnt.addAndGet(-decrement);
         if (outcome == 0) {
           lDestructionTime = System.nanoTime();
@@ -424,7 +425,7 @@ public class AllocationManager {
      * @return Amount of accounted(owned) memory associated with this ledger.
      */
     public int getAccountedSize() {
-      try (AutoCloseableLock read = readLock.open()) {
+      try (@SuppressWarnings("unused") Closeable read = readLock.open()) {
         if (owningLedger == this) {
           return size;
         } else {


Mime
View raw message