drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From h.@apache.org
Subject [1/4] drill git commit: DRILL-4275: create TransientStore for short-lived objects; refactor PersistentStore to introduce pagination mechanism
Date Fri, 19 Feb 2016 05:29:03 GMT
Repository: drill
Updated Branches:
  refs/heads/master 9a3a5c4ff -> 8126927fd


http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkAbstractStore.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkAbstractStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkAbstractStore.java
deleted file mode 100644
index 0d2fb38..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkAbstractStore.java
+++ /dev/null
@@ -1,291 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.sys.zk;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map.Entry;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.api.ACLBackgroundPathAndBytesable;
-import org.apache.curator.framework.recipes.cache.ChildData;
-import org.apache.curator.framework.recipes.cache.PathChildrenCache;
-import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode;
-import org.apache.drill.exec.store.sys.PStoreConfig;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-
-/**
- * This is the abstract class that is shared by ZkPStore (Persistent store) and ZkEStore (Ephemeral Store)
- * @param <V>
- */
-public abstract class ZkAbstractStore<V> implements AutoCloseable {
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ZkAbstractStore.class);
-
-  protected final CuratorFramework framework;
-  protected final PStoreConfig<V> config;
-  private final PathChildrenCache childrenCache;
-  private final String prefix;
-  private final String parent;
-
-  public ZkAbstractStore(CuratorFramework framework, PStoreConfig<V> config)
-      throws IOException {
-    this.parent = "/" + config.getName();
-    this.prefix = parent + "/";
-    this.framework = framework;
-    this.config = config;
-    this.childrenCache = new PathChildrenCache(framework, parent, true);
-
-    // make sure the parent node exists.
-    createOrUpdate(parent, null, CreateMode.PERSISTENT);
-    try {
-      childrenCache.start(StartMode.BUILD_INITIAL_CACHE);
-    } catch (Exception e) {
-      throw new RuntimeException("Failure while initializing Zookeeper for PStore", e);
-    }
-  }
-
-  public Iterator<Entry<String, V>> iterator() {
-    try {
-      return new Iter(childrenCache.getCurrentData());
-    } catch (Exception e) {
-      throw new RuntimeException("Failure while accessing Zookeeper.", e);
-    }
-  }
-
-  protected String withPrefix(String key) {
-    Preconditions.checkArgument(!key.contains("/"),
-        "You cannot use keys that have slashes in them when using the Zookeeper SystemTable storage interface.");
-    return prefix + key;
-  }
-
-  public V get(String key) {
-    try {
-      ChildData d = childrenCache.getCurrentData(withPrefix(key));
-      if(d == null || d.getData() == null){
-        return null;
-      }
-      byte[] bytes = d.getData();
-      return config.getSerializer().deserialize(bytes);
-
-    } catch (Exception e) {
-      throw new RuntimeException("Failure while accessing Zookeeper. " + e.getMessage(), e);
-    }
-  }
-
-  public void put(String key, V value) {
-    try {
-      if (childrenCache.getCurrentData(withPrefix(key)) != null) {
-        framework.setData().forPath(withPrefix(key), config.getSerializer().serialize(value));
-      } else {
-        createWithPrefix(key, value);
-      }
-      childrenCache.rebuildNode(withPrefix(key));
-
-    } catch (Exception e) {
-      throw new RuntimeException("Failure while accessing Zookeeper. " + e.getMessage(), e);
-    }
-  }
-
-  public void delete(String key) {
-    try {
-      if (framework.checkExists().forPath(withPrefix(key)) != null) {
-        framework.delete().forPath(withPrefix(key));
-        childrenCache.rebuildNode(withPrefix(key));
-      }
-    } catch (Exception e) {
-      throw new RuntimeException("Failure while accessing Zookeeper. " + e.getMessage(), e);
-    }
-  }
-
-  public boolean putIfAbsent(String key, V value) {
-    try {
-      if (childrenCache.getCurrentData(withPrefix(key)) != null) {
-        return false;
-      } else {
-        createWithPrefix(key, value);
-        childrenCache.rebuildNode(withPrefix(key));
-        return true;
-      }
-
-    } catch (Exception e) {
-      throw new RuntimeException("Failure while accessing Zookeeper", e);
-    }
-  }
-
-  /**
-   * Default {@link CreateMode create mode} that will be used in create operations referred in the see also section.
-   *
-   * @see #createOrUpdate(String, Object)
-   * @see #createWithPrefix(String, Object)
-   */
-  protected abstract CreateMode getCreateMode();
-
-
-  /**
-   * Creates a node in zookeeper with the {@link #getCreateMode() default create mode} and sets its value if supplied.
-   *
-   * @param path    target path
-   * @param value   value to set, null if none available
-   *
-   * @see #getCreateMode()
-   * @see #createOrUpdate(String, Object)
-   * @see #withPrefix(String)
-   */
-  protected void createWithPrefix(String path, V value) {
-    createOrUpdate(withPrefix(path), value);
-  }
-
-  /**
-   * Creates a node in zookeeper with the {@link #getCreateMode() default create mode} and sets its value if supplied
-   * or updates its value if the node already exists.
-   *
-   * Note that if node exists, its mode will not be changed.
-   *
-   * @param path    target path
-   * @param value   value to set, null if none available
-   *
-   * @see #getCreateMode()
-   * @see #createOrUpdate(String, Object, CreateMode)
-   */
-  protected void createOrUpdate(String path, V value) {
-    createOrUpdate(path, value, getCreateMode());
-  }
-
-  /**
-   * Creates a node in zookeeper with the given mode and sets its value if supplied or updates its value if the node
-   * already exists.
-   *
-   * Note that if the node exists, its mode will not be changed.
-   *
-   * Internally, the method suppresses {@link org.apache.zookeeper.KeeperException.NodeExistsException}. It is
-   * safe to do so since the implementation is idempotent.
-   *
-   * @param path    target path
-   * @param value   value to set, null if none available
-   * @param mode    creation mode
-   * @throws RuntimeException  throws a {@link RuntimeException} wrapping the root cause.
-   */
-  protected void createOrUpdate(String path, V value, CreateMode mode) {
-    try {
-      final boolean isUpdate = value != null;
-      final byte[] valueInBytes = isUpdate ? config.getSerializer().serialize(value) : null;
-      final boolean nodeExists = framework.checkExists().forPath(path) != null;
-      if (!nodeExists) {
-        final ACLBackgroundPathAndBytesable<String> creator = framework.create().withMode(mode);
-        if (isUpdate) {
-          creator.forPath(path, valueInBytes);
-        } else {
-          creator.forPath(path);
-        }
-      } else if (isUpdate) {
-        framework.setData().forPath(path, valueInBytes);
-      }
-    } catch (KeeperException.NodeExistsException ex) {
-      logger.warn("Node already exists in Zookeeper. Skipping... -- [path: {}, mode: {}]", path, mode);
-    } catch (Exception e) {
-      final String msg = String.format("Failed to create/update Zookeeper node. [path: %s, mode: %s]", path, mode);
-      throw new RuntimeException(msg, e);
-    }
-  }
-
-  private class Iter implements Iterator<Entry<String, V>>{
-
-    private Iterator<ChildData> keys;
-    private ChildData current;
-
-    public Iter(List<ChildData> children) {
-      super();
-      List<ChildData> sortedChildren = Lists.newArrayList(children);
-      Collections.sort(sortedChildren, new Comparator<ChildData>(){
-        @Override
-        public int compare(ChildData o1, ChildData o2) {
-          return o1.getPath().compareTo(o2.getPath());
-        }});
-      this.keys = sortedChildren.iterator();
-    }
-
-    @Override
-    public boolean hasNext() {
-      return keys.hasNext();
-    }
-
-    @Override
-    public Entry<String, V> next() {
-      current = keys.next();
-      return new DeferredEntry(current);
-    }
-
-    @Override
-    public void remove() {
-      delete(keyFromPath(current));
-    }
-
-    private String keyFromPath(ChildData data){
-      String path = data.getPath();
-      return path.substring(prefix.length(), path.length());
-    }
-
-    private class DeferredEntry implements Entry<String, V>{
-
-      private ChildData data;
-
-      public DeferredEntry(ChildData data) {
-        super();
-        this.data = data;
-      }
-
-      @Override
-      public String getKey() {
-        return keyFromPath(data);
-      }
-
-      @Override
-      public V getValue() {
-        try {
-          return config.getSerializer().deserialize(data.getData());
-        } catch (IOException e) {
-          throw new RuntimeException(e);
-        }
-      }
-
-      @Override
-      public V setValue(V value) {
-        throw new UnsupportedOperationException();
-      }
-
-    }
-
-  }
-
-  @Override
-  public void close() {
-    try{
-      childrenCache.close();
-    }catch(IOException e){
-      logger.warn("Failure while closing out abstract store.", e);
-    }
-  }
-
-
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkEStore.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkEStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkEStore.java
deleted file mode 100644
index 4706287..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkEStore.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.sys.zk;
-
-import java.io.IOException;
-
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.drill.exec.store.sys.EStore;
-import org.apache.drill.exec.store.sys.PStoreConfig;
-import org.apache.zookeeper.CreateMode;
-
-/**
- * Implementation of EStore using Zookeeper's EPHEMERAL node.
- * @param <V>
- */
-public class ZkEStore<V> extends ZkAbstractStore<V> implements EStore<V> {
-
-  public ZkEStore(CuratorFramework framework, PStoreConfig<V> config) throws IOException {
-    super(framework,config);
-  }
-
-  @Override
-  protected CreateMode getCreateMode() {
-    return CreateMode.EPHEMERAL;
-  }
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkEStoreProvider.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkEStoreProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkEStoreProvider.java
deleted file mode 100644
index 60277aa..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkEStoreProvider.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.drill.exec.store.sys.zk;
-
-import java.io.IOException;
-
-import com.google.common.base.Preconditions;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.drill.exec.store.sys.EStore;
-import org.apache.drill.exec.store.sys.EStoreProvider;
-import org.apache.drill.exec.store.sys.PStoreConfig;
-import org.apache.drill.exec.store.sys.PStoreConfig.Mode;
-
-public class ZkEStoreProvider implements EStoreProvider{
-  private final CuratorFramework curator;
-
-  public ZkEStoreProvider(CuratorFramework curator) {
-    this.curator = curator;
-  }
-
-  @Override
-  public <V> EStore<V> getStore(PStoreConfig<V> store) throws IOException {
-    Preconditions.checkArgument(store.getMode() == Mode.EPHEMERAL);
-    return new ZkEStore<V>(curator,store);
-  }
-
-  @Override
-  public void start() throws IOException {
-  }
-
-  @Override
-  public void close() throws Exception {
-  }
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStore.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStore.java
deleted file mode 100644
index da22996..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStore.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.sys.zk;
-
-import java.io.IOException;
-
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.drill.exec.store.sys.PStore;
-import org.apache.drill.exec.store.sys.PStoreConfig;
-import org.apache.zookeeper.CreateMode;
-
-/**
- * Implementation of PStore using Zookeeper's PERSISTENT node.
- * @param <V>
- */
-public class ZkPStore<V> extends ZkAbstractStore<V> implements PStore<V> {
-
-  public ZkPStore(CuratorFramework framework, PStoreConfig<V> config) throws IOException {
-    super(framework, config);
-  }
-
-  @Override
-  protected CreateMode getCreateMode() {
-    return CreateMode.PERSISTENT;
-  }
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStoreProvider.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStoreProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStoreProvider.java
index eb5df43..8cc2fde 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStoreProvider.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStoreProvider.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
+ * <p/>
  * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p/>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,83 +17,19 @@
  */
 package org.apache.drill.exec.store.sys.zk;
 
-import java.io.IOException;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.exec.coord.ClusterCoordinator;
 import org.apache.drill.exec.coord.zk.ZKClusterCoordinator;
-import org.apache.drill.exec.exception.DrillbitStartupException;
-import org.apache.drill.exec.store.dfs.DrillFileSystem;
-import org.apache.drill.exec.store.sys.EStoreProvider;
-import org.apache.drill.exec.store.sys.PStore;
-import org.apache.drill.exec.store.sys.PStoreConfig;
-import org.apache.drill.exec.store.sys.PStoreProvider;
-import org.apache.drill.exec.store.sys.PStoreRegistry;
-import org.apache.drill.exec.store.sys.local.FilePStore;
-import org.apache.hadoop.fs.Path;
-
-public class ZkPStoreProvider implements PStoreProvider {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ZkPStoreProvider.class);
-
-  private static final String DRILL_EXEC_SYS_STORE_PROVIDER_ZK_BLOBROOT = "drill.exec.sys.store.provider.zk.blobroot";
-
-  private final CuratorFramework curator;
+import org.apache.drill.exec.exception.StoreException;
+import org.apache.drill.exec.store.sys.PersistentStoreRegistry;
+import org.apache.drill.exec.store.sys.store.provider.ZookeeperPersistentStoreProvider;
 
-  private final DrillFileSystem fs;
-  private final Path blobRoot;
-  private final EStoreProvider zkEStoreProvider;
-
-  public ZkPStoreProvider(PStoreRegistry registry) throws DrillbitStartupException {
-    ClusterCoordinator coord = registry.getClusterCoordinator();
-    if (!(coord instanceof ZKClusterCoordinator)) {
-      throw new DrillbitStartupException("A ZkPStoreProvider was created without a ZKClusterCoordinator.");
-    }
-    this.curator = ((ZKClusterCoordinator)registry.getClusterCoordinator()).getCurator();
-
-    if (registry.getConfig().hasPath(DRILL_EXEC_SYS_STORE_PROVIDER_ZK_BLOBROOT)) {
-      blobRoot = new Path(registry.getConfig().getString(DRILL_EXEC_SYS_STORE_PROVIDER_ZK_BLOBROOT));
-    }else{
-      blobRoot = FilePStore.getLogDir();
-    }
-
-    try{
-      this.fs = FilePStore.getFileSystem(registry.getConfig(), blobRoot);
-    }catch(IOException e){
-      throw new DrillbitStartupException("Failure while attempting to set up blob store.", e);
-    }
-
-    this.zkEStoreProvider = new ZkEStoreProvider(curator);
-  }
-
-  @VisibleForTesting
-  public ZkPStoreProvider(DrillConfig config, CuratorFramework curator) throws IOException {
-    this.curator = curator;
-    this.blobRoot = FilePStore.getLogDir();
-    this.fs = FilePStore.getFileSystem(config, blobRoot);
-    this.zkEStoreProvider = new ZkEStoreProvider(curator);
-  }
-
-  @Override
-  public <V> PStore<V> getStore(PStoreConfig<V> config) throws IOException {
-    switch(config.getMode()){
-    case BLOB_PERSISTENT:
-      return new FilePStore<V>(fs, blobRoot, config);
-    case EPHEMERAL:
-      return zkEStoreProvider.getStore(config);
-    case PERSISTENT:
-      return new ZkPStore<V>(curator, config);
-    default:
-      throw new IllegalStateException();
-    }
-  }
-
-  @Override
-  public void start() {
-  }
-
-  @Override
-  public void close() {
+/**
+ * Kept for possible references to old class name in configuration.
+ *
+ * @deprecated will be removed in 1.7
+ *    use {@link ZookeeperPersistentStoreProvider} instead.
+ */
+public class ZkPStoreProvider extends ZookeeperPersistentStoreProvider {
+  public ZkPStoreProvider(PersistentStoreRegistry<ZKClusterCoordinator> registry) throws StoreException {
+    super(registry);
   }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/testing/store/NoWriteLocalStore.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/store/NoWriteLocalStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/store/NoWriteLocalStore.java
new file mode 100644
index 0000000..ff14f6d
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/store/NoWriteLocalStore.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.testing.store;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import org.apache.drill.exec.store.sys.BasePersistentStore;
+import org.apache.drill.exec.store.sys.PersistentStoreMode;
+
+public class NoWriteLocalStore<V> extends BasePersistentStore<V> {
+  private final ConcurrentMap<String, V> store = Maps.newConcurrentMap();
+
+  public void delete(final String key) {
+    store.remove(key);
+  }
+
+  @Override
+  public PersistentStoreMode getMode() {
+    return PersistentStoreMode.PERSISTENT;
+  }
+
+  @Override
+  public V get(final String key) {
+    return store.get(key);
+  }
+
+  @Override
+  public void put(final String key, final V value) {
+    store.put(key, value);
+  }
+
+  @Override
+  public boolean putIfAbsent(final String key, final V value) {
+    final V old = store.putIfAbsent(key, value);
+    return value != old;
+  }
+
+  @Override
+  public Iterator<Map.Entry<String, V>> getRange(final int skip, final int take) {
+    return Iterables.limit(Iterables.skip(store.entrySet(), skip), take).iterator();
+  }
+
+  @Override
+  public void close() throws Exception {
+    store.clear();
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
index 5fd6f1c..f2305c4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
@@ -39,7 +39,7 @@ import org.apache.drill.exec.rpc.control.WorkEventBus;
 import org.apache.drill.exec.rpc.data.DataConnectionCreator;
 import org.apache.drill.exec.server.BootStrapContext;
 import org.apache.drill.exec.server.DrillbitContext;
-import org.apache.drill.exec.store.sys.PStoreProvider;
+import org.apache.drill.exec.store.sys.PersistentStoreProvider;
 import org.apache.drill.exec.work.batch.ControlMessageHandler;
 import org.apache.drill.exec.work.foreman.Foreman;
 import org.apache.drill.exec.work.foreman.QueryManager;
@@ -101,7 +101,7 @@ public class WorkManager implements AutoCloseable {
       final Controller controller,
       final DataConnectionCreator data,
       final ClusterCoordinator coord,
-      final PStoreProvider provider) {
+      final PersistentStoreProvider provider) {
     dContext = new DrillbitContext(endpoint, bContext, coord, controller, data, workBus, provider);
     statusThread.start();
 
@@ -153,6 +153,8 @@ public class WorkManager implements AutoCloseable {
         }
       }
     }
+
+    getContext().close();
   }
 
   public DrillbitContext getContext() {

http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index bfc9dff..8527850 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -163,8 +163,8 @@ public class Foreman implements Runnable {
     closeFuture.addListener(closeListener);
 
     queryContext = new QueryContext(connection.getSession(), drillbitContext, queryId);
-    queryManager = new QueryManager(queryId, queryRequest, drillbitContext.getPersistentStoreProvider(),
-        stateListener, this); // TODO reference escapes before ctor is complete via stateListener, this
+    queryManager = new QueryManager(queryId, queryRequest, drillbitContext.getStoreProvider(),
+        drillbitContext.getClusterCoordinator(), stateListener, this); // TODO reference escapes before ctor is complete via stateListener, this
 
     final OptionManager optionManager = queryContext.getOptions();
     queuingEnabled = optionManager.getOption(ExecConstants.ENABLE_QUEUE);
@@ -768,6 +768,12 @@ public class Foreman implements Runnable {
       bee.retireForeman(Foreman.this);
 
       try {
+        queryManager.close();
+      } catch (final Exception e) {
+        logger.warn("unable to close query manager", e);
+      }
+
+      try {
         releaseLease();
       } finally {
         isClosed = true;

http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
index 060cffc..39fa5cb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
@@ -19,15 +19,18 @@ package org.apache.drill.exec.work.foreman;
 
 import io.netty.buffer.ByteBuf;
 
-import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.drill.common.AutoCloseables;
 import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.exceptions.UserRemoteException;
+import org.apache.drill.exec.coord.ClusterCoordinator;
+import org.apache.drill.exec.coord.store.TransientStore;
+import org.apache.drill.exec.coord.store.TransientStoreConfig;
 import org.apache.drill.exec.proto.BitControl.FragmentStatus;
 import org.apache.drill.exec.proto.BitControl.PlanFragment;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
@@ -45,9 +48,9 @@ import org.apache.drill.exec.proto.helper.QueryIdHelper;
 import org.apache.drill.exec.rpc.RpcException;
 import org.apache.drill.exec.rpc.control.Controller;
 import org.apache.drill.exec.server.DrillbitContext;
-import org.apache.drill.exec.store.sys.PStore;
-import org.apache.drill.exec.store.sys.PStoreConfig;
-import org.apache.drill.exec.store.sys.PStoreProvider;
+import org.apache.drill.exec.store.sys.PersistentStore;
+import org.apache.drill.exec.store.sys.PersistentStoreConfig;
+import org.apache.drill.exec.store.sys.PersistentStoreProvider;
 import org.apache.drill.exec.work.EndpointListener;
 import org.apache.drill.exec.work.foreman.Foreman.StateListener;
 
@@ -59,20 +62,18 @@ import com.google.common.collect.Maps;
 /**
  * Each Foreman holds its own QueryManager.  This manages the events associated with execution of a particular query across all fragments.
  */
-public class QueryManager {
+public class QueryManager implements AutoCloseable {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueryManager.class);
 
-  public static final PStoreConfig<QueryProfile> QUERY_PROFILE = PStoreConfig.
+  public static final PersistentStoreConfig<QueryProfile> QUERY_PROFILE = PersistentStoreConfig.
           newProtoBuilder(SchemaUserBitShared.QueryProfile.WRITE, SchemaUserBitShared.QueryProfile.MERGE)
       .name("profiles")
       .blob()
-      .max(100)
       .build();
 
-  public static final PStoreConfig<QueryInfo> RUNNING_QUERY_INFO = PStoreConfig.
-          newProtoBuilder(SchemaUserBitShared.QueryInfo.WRITE, SchemaUserBitShared.QueryInfo.MERGE)
+  public static final TransientStoreConfig<QueryInfo> RUNNING_QUERY_INFO = TransientStoreConfig
+      .newProtoBuilder(SchemaUserBitShared.QueryInfo.WRITE, SchemaUserBitShared.QueryInfo.MERGE)
       .name("running")
-      .ephemeral()
       .build();
 
   private final Map<DrillbitEndpoint, NodeTracker> nodeMap = Maps.newHashMap();
@@ -90,8 +91,8 @@ public class QueryManager {
       new IntObjectHashMap<>();
   private final List<FragmentData> fragmentDataSet = Lists.newArrayList();
 
-  private final PStore<QueryProfile> profilePStore;
-  private final PStore<QueryInfo> profileEStore;
+  private final PersistentStore<QueryProfile> profileStore;
+  private final TransientStore<QueryInfo> transientProfiles;
 
   // the following mutable variables are used to capture ongoing query status
   private String planText;
@@ -104,8 +105,8 @@ public class QueryManager {
   // How many fragments have finished their execution.
   private final AtomicInteger finishedFragments = new AtomicInteger(0);
 
-  public QueryManager(final QueryId queryId, final RunQuery runQuery, final PStoreProvider pStoreProvider,
-      final StateListener stateListener, final Foreman foreman) {
+  public QueryManager(final QueryId queryId, final RunQuery runQuery, final PersistentStoreProvider storeProvider,
+      final ClusterCoordinator coordinator, final StateListener stateListener, final Foreman foreman) {
     this.queryId =  queryId;
     this.runQuery = runQuery;
     this.stateListener = stateListener;
@@ -113,11 +114,11 @@ public class QueryManager {
 
     stringQueryId = QueryIdHelper.getQueryId(queryId);
     try {
-      profilePStore = pStoreProvider.getStore(QUERY_PROFILE);
-      profileEStore = pStoreProvider.getStore(RUNNING_QUERY_INFO);
-    } catch (final IOException e) {
+      profileStore = storeProvider.getOrCreateStore(QUERY_PROFILE);
+    } catch (final Exception e) {
       throw new DrillRuntimeException(e);
     }
+    transientProfiles = coordinator.getOrCreateTransientStore(RUNNING_QUERY_INFO);
   }
 
   private static boolean isTerminal(final FragmentState state) {
@@ -237,11 +238,14 @@ public class QueryManager {
     }
   }
 
+  @Override
+  public void close() throws Exception { }
+
   /*
-   * This assumes that the FragmentStatusListener implementation takes action when it hears
-   * that the target fragment has acknowledged the signal. As a result, this listener doesn't do anything
-   * but log messages.
-   */
+     * This assumes that the FragmentStatusListener implementation takes action when it hears
+     * that the target fragment has acknowledged the signal. As a result, this listener doesn't do anything
+     * but log messages.
+     */
   private static class SignalListener extends EndpointListener<Ack, FragmentHandle> {
     /**
      * An enum of possible signals that {@link SignalListener} listens to.
@@ -281,14 +285,14 @@ public class QueryManager {
       case STARTING:
       case RUNNING:
       case CANCELLATION_REQUESTED:
-        profileEStore.put(stringQueryId, getQueryInfo());  // store as ephemeral query profile.
+        transientProfiles.put(stringQueryId, getQueryInfo());  // store as ephemeral query profile.
         break;
 
       case COMPLETED:
       case CANCELED:
       case FAILED:
         try {
-          profileEStore.delete(stringQueryId);
+          transientProfiles.remove(stringQueryId);
         } catch(final Exception e) {
           logger.warn("Failure while trying to delete the estore profile for this query.", e);
         }
@@ -305,7 +309,7 @@ public class QueryManager {
   void writeFinalProfile(UserException ex) {
     try {
       // TODO(DRILL-2362) when do these ever get deleted?
-      profilePStore.put(stringQueryId, getQueryProfile(ex));
+      profileStore.put(stringQueryId, getQueryProfile(ex));
     } catch (Exception e) {
       logger.error("Failure while storing Query Profile", e);
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/resources/drill-module.conf
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index d6ba99a..b1b9b46 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -120,7 +120,7 @@ drill.exec: {
     affinity.factor: 1.2
   },
   sys.store.provider: {
-    class: "org.apache.drill.exec.store.sys.zk.ZkPStoreProvider",
+    class: "org.apache.drill.exec.store.sys.store.provider.ZookeeperPersistentStoreProvider",
     local: {
       path: "/tmp/drill",
       write: true

http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java b/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
index 26b7464..7ab73dc 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
@@ -248,7 +248,7 @@ public class BaseTestQuery extends ExecTest {
   }
 
   @AfterClass
-  public static void closeClient() throws IOException {
+  public static void closeClient() throws Exception {
     if (client != null) {
       client.close();
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java b/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java
index 24c8c63..9f39d15 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java
@@ -48,7 +48,7 @@ import org.apache.drill.exec.server.options.SystemOptionManager;
 import org.apache.drill.exec.store.SchemaConfig;
 import org.apache.drill.exec.store.StoragePluginRegistry;
 import org.apache.drill.exec.store.StoragePluginRegistryImpl;
-import org.apache.drill.exec.store.sys.local.LocalPStoreProvider;
+import org.apache.drill.exec.store.sys.store.provider.LocalPersistentStoreProvider;
 import org.apache.drill.exec.testing.ExecutionControls;
 import org.junit.Rule;
 import org.junit.rules.TestRule;
@@ -76,7 +76,7 @@ public class PlanningBase extends ExecTest{
 
   protected void testSqlPlan(String sqlCommands) throws Exception {
     final String[] sqlStrings = sqlCommands.split(";");
-    final LocalPStoreProvider provider = new LocalPStoreProvider(config);
+    final LocalPersistentStoreProvider provider = new LocalPersistentStoreProvider(config);
     provider.start();
     final ScanResult scanResult = ClassPathScanner.fromPrescan(config);
     final LogicalPlanPersistence logicalPlanPersistence = new LogicalPlanPersistence(config, scanResult);
@@ -97,7 +97,7 @@ public class PlanningBase extends ExecTest{
         result = config;
         dbContext.getOptionManager();
         result = systemOptions;
-        dbContext.getPersistentStoreProvider();
+        dbContext.getStoreProvider();
         result = provider;
         dbContext.getClasspathScan();
         result = scanResult;

http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/test/java/org/apache/drill/exec/compile/CodeCompilerTestFactory.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/compile/CodeCompilerTestFactory.java b/exec/java-exec/src/test/java/org/apache/drill/exec/compile/CodeCompilerTestFactory.java
index 9032946..f4903d8 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/compile/CodeCompilerTestFactory.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/compile/CodeCompilerTestFactory.java
@@ -19,19 +19,17 @@ package org.apache.drill.exec.compile;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
-import java.io.IOException;
-
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.config.LogicalPlanPersistence;
 import org.apache.drill.common.scanner.ClassPathScanner;
 import org.apache.drill.exec.server.options.SystemOptionManager;
-import org.apache.drill.exec.store.sys.local.LocalPStoreProvider;
+import org.apache.drill.exec.store.sys.store.provider.LocalPersistentStoreProvider;
 
 public class CodeCompilerTestFactory {
-  public static CodeCompiler getTestCompiler(DrillConfig c) throws IOException {
+  public static CodeCompiler getTestCompiler(DrillConfig c) throws Exception {
     DrillConfig config = checkNotNull(c);
     LogicalPlanPersistence persistence = new LogicalPlanPersistence(config, ClassPathScanner.fromPrescan(config));
-    LocalPStoreProvider provider = new LocalPStoreProvider(config);
+    LocalPersistentStoreProvider provider = new LocalPersistentStoreProvider(config);
     SystemOptionManager systemOptionManager = new SystemOptionManager(persistence, provider);
     return new CodeCompiler(config, systemOptionManager.init());
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/test/java/org/apache/drill/exec/compile/bytecode/ReplaceMethodInvoke.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/compile/bytecode/ReplaceMethodInvoke.java b/exec/java-exec/src/test/java/org/apache/drill/exec/compile/bytecode/ReplaceMethodInvoke.java
index 8486801..9b507fe 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/compile/bytecode/ReplaceMethodInvoke.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/compile/bytecode/ReplaceMethodInvoke.java
@@ -25,10 +25,9 @@ import java.net.URL;
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.exec.compile.DrillCheckClassAdapter;
 import org.apache.drill.exec.compile.QueryClassLoader;
-import org.apache.drill.exec.planner.PhysicalPlanReader;
 import org.apache.drill.exec.planner.PhysicalPlanReaderTestFactory;
 import org.apache.drill.exec.server.options.SystemOptionManager;
-import org.apache.drill.exec.store.sys.local.LocalPStoreProvider;
+import org.apache.drill.exec.store.sys.store.provider.LocalPersistentStoreProvider;
 import org.objectweb.asm.ClassReader;
 import org.objectweb.asm.ClassVisitor;
 import org.objectweb.asm.ClassWriter;
@@ -57,7 +56,7 @@ public class ReplaceMethodInvoke {
     check(output);
 
     final DrillConfig c = DrillConfig.forClient();
-    final SystemOptionManager m = new SystemOptionManager(PhysicalPlanReaderTestFactory.defaultLogicalPlanPersistence(c), new LocalPStoreProvider(c));
+    final SystemOptionManager m = new SystemOptionManager(PhysicalPlanReaderTestFactory.defaultLogicalPlanPersistence(c), new LocalPersistentStoreProvider(c));
     m.init();
     try (QueryClassLoader ql = new QueryClassLoader(DrillConfig.create(), m)) {
       ql.injectByteCode("org.apache.drill.Pickle$OutgoingBatch", output);

http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/test/java/org/apache/drill/exec/coord/zk/TestEphemeralStore.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/coord/zk/TestEphemeralStore.java b/exec/java-exec/src/test/java/org/apache/drill/exec/coord/zk/TestEphemeralStore.java
new file mode 100644
index 0000000..021a0b7
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/coord/zk/TestEphemeralStore.java
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.coord.zk;
+
+import java.io.IOException;
+
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.listen.ListenerContainer;
+import org.apache.curator.framework.recipes.cache.PathChildrenCache;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
+import org.apache.curator.retry.RetryNTimes;
+import org.apache.curator.test.TestingServer;
+import org.apache.drill.exec.coord.store.TransientStoreConfig;
+import org.apache.drill.exec.serialization.InstanceSerializer;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class TestEphemeralStore {
+  private final static String root = "/test";
+  private final static String path = "test-key";
+  private final static String value = "testing";
+
+  private TestingServer server;
+  private CuratorFramework curator;
+  private TransientStoreConfig<String> config;
+  private ZkEphemeralStore<String> store;
+
+  static class StoreWithMockClient<V> extends ZkEphemeralStore<V> {
+    private final ZookeeperClient client = Mockito.mock(ZookeeperClient.class);
+
+    public StoreWithMockClient(final TransientStoreConfig<V> config, final CuratorFramework curator) {
+      super(config, curator);
+    }
+
+    @Override
+    protected ZookeeperClient getClient() {
+      return client;
+    }
+  }
+
+
+  @Before
+  public void setUp() throws Exception {
+    server = new TestingServer();
+    final RetryPolicy policy = new RetryNTimes(2, 1000);
+    curator = CuratorFrameworkFactory.newClient(server.getConnectString(), policy);
+
+    config = Mockito.mock(TransientStoreConfig.class);
+    Mockito
+        .when(config.getName())
+        .thenReturn(root);
+
+    Mockito
+        .when(config.getSerializer())
+        .thenReturn(new InstanceSerializer<String>() {
+          @Override
+          public byte[] serialize(final String instance) throws IOException {
+            if (instance == null) {
+              return null;
+            }
+            return instance.getBytes();
+          }
+
+          @Override
+          public String deserialize(final byte[] raw) throws IOException {
+            if (raw == null) {
+              return null;
+            }
+            return new String(raw);
+          }
+        });
+
+    store = new ZkEphemeralStore<>(config, curator);
+
+    server.start();
+    curator.start();
+    store.start();
+  }
+
+  /**
+   * This test ensures store subscribes to receive events from underlying client. Dispatcher tests ensures listeners
+   * are fired on incoming events. These two sets of tests ensure observer pattern in {@code TransientStore} works fine.
+   */
+  @Test
+  public void testStoreRegistersDispatcherAndStartsItsClient() throws Exception {
+    final StoreWithMockClient<String> store = new StoreWithMockClient<>(config, curator);
+
+    final PathChildrenCache cache = Mockito.mock(PathChildrenCache.class);
+    final ZookeeperClient client = store.getClient();
+    Mockito
+        .when(client.getCache())
+        .thenReturn(cache);
+
+    final ListenerContainer<PathChildrenCacheListener> container = Mockito.mock(ListenerContainer.class);
+    Mockito
+        .when(cache.getListenable())
+        .thenReturn(container);
+
+    store.start();
+
+    Mockito
+        .verify(container)
+        .addListener(store.dispatcher);
+
+    Mockito
+        .verify(client)
+        .start();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    store.close();
+    curator.close();
+    server.close();
+  }
+
+  @Test
+  public void testPutAndGetWorksAntagonistacally() {
+    store.put(path, value);
+    final String actual = store.get(path);
+    Assert.assertEquals("value mismatch", value, actual);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/test/java/org/apache/drill/exec/coord/zk/TestEventDispatcher.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/coord/zk/TestEventDispatcher.java b/exec/java-exec/src/test/java/org/apache/drill/exec/coord/zk/TestEventDispatcher.java
new file mode 100644
index 0000000..f83ec00
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/coord/zk/TestEventDispatcher.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.coord.zk;
+
+import java.io.IOException;
+
+import org.apache.curator.framework.recipes.cache.ChildData;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
+import org.apache.drill.exec.coord.store.TransientStoreConfig;
+import org.apache.drill.exec.coord.store.TransientStoreEvent;
+import org.apache.drill.exec.serialization.InstanceSerializer;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class TestEventDispatcher {
+
+  private final static String key = "some-key";
+  private final static String value = "some-data";
+  private final static byte[] data = "some-data".getBytes();
+
+  private ZkEphemeralStore<String> store;
+  private EventDispatcher<String> dispatcher;
+  private ChildData child;
+
+  @Before
+  public void setUp() {
+    store = Mockito.mock(ZkEphemeralStore.class);
+    final TransientStoreConfig<String> config = Mockito.mock(TransientStoreConfig.class);
+    Mockito
+        .when(store.getConfig())
+        .thenReturn(config);
+
+    Mockito
+        .when(config.getSerializer())
+        .thenReturn(new InstanceSerializer<String>() {
+          @Override
+          public byte[] serialize(String instance) throws IOException {
+            return instance.getBytes();
+          }
+
+          @Override
+          public String deserialize(byte[] raw) throws IOException {
+            return new String(raw);
+          }
+        });
+
+    dispatcher = new EventDispatcher<>(store);
+    child = Mockito.mock(ChildData.class);
+    Mockito
+        .when(child.getPath())
+        .thenReturn(key);
+
+    Mockito
+        .when(child.getData())
+        .thenReturn(data);
+  }
+
+  @Test
+  public void testDispatcherPropagatesEvents() throws Exception {
+    final PathChildrenCacheEvent.Type[] types = new PathChildrenCacheEvent.Type[] {
+        PathChildrenCacheEvent.Type.CHILD_ADDED,
+        PathChildrenCacheEvent.Type.CHILD_REMOVED,
+        PathChildrenCacheEvent.Type.CHILD_UPDATED
+    };
+
+    for (final PathChildrenCacheEvent.Type type:types) {
+      dispatcher.childEvent(null, new PathChildrenCacheEvent(type, child));
+
+      final TransientStoreEvent event = TransientStoreEvent.of(EventDispatcher.MAPPINGS.get(type), key, value);
+      Mockito
+          .verify(store)
+          .fireListeners(event);
+    }
+
+    Assert.assertEquals("Number of event types that dispatcher can handle is different", types.length, EventDispatcher.MAPPINGS.size());
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/test/java/org/apache/drill/exec/coord/zk/TestPathUtils.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/coord/zk/TestPathUtils.java b/exec/java-exec/src/test/java/org/apache/drill/exec/coord/zk/TestPathUtils.java
new file mode 100644
index 0000000..d966744
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/coord/zk/TestPathUtils.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.coord.zk;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestPathUtils {
+
+  @Test(expected = NullPointerException.class)
+  public void testNullSegmentThrowsNPE() {
+    PathUtils.join("", null, "");
+  }
+
+  @Test
+  public void testJoinPreservesAbsoluteOrRelativePaths() {
+    final String actual = PathUtils.join("/a", "/b", "/c");
+    final String expected = "/a/b/c";
+    Assert.assertEquals("invalid path", expected, actual);
+
+    final String actual2 = PathUtils.join("/a", "b", "c");
+    final String expected2 = "/a/b/c";
+    Assert.assertEquals("invalid path", expected2, actual2);
+
+    final String actual3 = PathUtils.join("a", "b", "c");
+    final String expected3 = "a/b/c";
+    Assert.assertEquals("invalid path", expected3, actual3);
+
+    final String actual4 = PathUtils.join("a", "", "c");
+    final String expected4 = "a/c";
+    Assert.assertEquals("invalid path", expected4, actual4);
+
+    final String actual5 = PathUtils.join("", "", "c");
+    final String expected5 = "c";
+    Assert.assertEquals("invalid path", expected5, actual5);
+
+    final String actual6 = PathUtils.join("", "", "");
+    final String expected6 = "";
+    Assert.assertEquals("invalid path", expected6, actual6);
+
+    final String actual7 = PathUtils.join("", "", "/");
+    final String expected7 = "/";
+    Assert.assertEquals("invalid path", expected7, actual7);
+
+    final String actual8 = PathUtils.join("", "", "c/");
+    final String expected8 = "c/";
+    Assert.assertEquals("invalid path", expected8, actual8);
+  }
+
+
+  @Test
+  public void testNormalizeRemovesRedundantForwardSlashes() {
+    final String actual = PathUtils.normalize("/a/b/c");
+    final String expected = "/a/b/c";
+    Assert.assertEquals("invalid path", expected, actual);
+
+    final String actual2 = PathUtils.normalize("//a//b//c");
+    final String expected2 = "/a/b/c";
+    Assert.assertEquals("invalid path", expected2, actual2);
+
+    final String actual3 = PathUtils.normalize("///");
+    final String expected3 = "/";
+    Assert.assertEquals("invalid path", expected3, actual3);
+
+    final String actual4 = PathUtils.normalize("/a");
+    final String expected4 = "/a";
+    Assert.assertEquals("invalid path", expected4, actual4);
+
+    final String actual5 = PathUtils.normalize("//////");
+    final String expected5 = "/";
+    Assert.assertEquals("invalid path", expected5, actual5);
+
+    final String actual6 = PathUtils.normalize("");
+    final String expected6 = "";
+    Assert.assertEquals("invalid path", expected6, actual6);
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/test/java/org/apache/drill/exec/coord/zk/TestZookeeperClient.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/coord/zk/TestZookeeperClient.java b/exec/java-exec/src/test/java/org/apache/drill/exec/coord/zk/TestZookeeperClient.java
new file mode 100644
index 0000000..3007566
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/coord/zk/TestZookeeperClient.java
@@ -0,0 +1,201 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.coord.zk;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.collect.Lists;
+import org.apache.curator.CuratorZookeeperClient;
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.api.ACLBackgroundPathAndBytesable;
+import org.apache.curator.framework.api.CreateBuilder;
+import org.apache.curator.framework.api.DeleteBuilder;
+import org.apache.curator.framework.api.SetDataBuilder;
+import org.apache.curator.framework.recipes.cache.ChildData;
+import org.apache.curator.framework.recipes.cache.PathChildrenCache;
+import org.apache.curator.retry.RetryNTimes;
+import org.apache.curator.test.TestingServer;
+import org.apache.curator.utils.EnsurePath;
+import org.apache.drill.common.collections.ImmutableEntry;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.zookeeper.CreateMode;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class TestZookeeperClient {
+  private final static String root = "/test";
+  private final static String path = "test-key";
+  private final static String abspath = PathUtils.join(root, path);
+  private final static byte[] data = "testing".getBytes();
+  private final static CreateMode mode = CreateMode.PERSISTENT;
+
+  private TestingServer server;
+  private CuratorFramework curator;
+  private ZookeeperClient client;
+
+  static class ClientWithMockCache extends ZookeeperClient {
+    private final PathChildrenCache cacheMock = Mockito.mock(PathChildrenCache.class);
+
+    public ClientWithMockCache(final CuratorFramework curator, final String root, final CreateMode mode) {
+      super(curator, root, mode);
+    }
+
+    @Override
+    public PathChildrenCache getCache() {
+      return cacheMock;
+    }
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    server = new TestingServer();
+    final RetryPolicy policy = new RetryNTimes(1, 1000);
+    curator = CuratorFrameworkFactory.newClient(server.getConnectString(), policy);
+    client = new ClientWithMockCache(curator, root, mode);
+
+    server.start();
+    curator.start();
+    client.start();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    client.close();
+    curator.close();
+    server.close();
+  }
+
+  @Test
+  public void testStartingClientEnablesCacheAndEnsuresRootNodeExists() throws Exception {
+    Assert.assertTrue("start must create the root node", client.hasPath("", true));
+
+    Mockito
+        .verify(client.getCache())
+        .start();
+  }
+
+  @Test
+  public void testHasPathWithEventualConsistencyHitsCache() {
+    final String path = "test-key";
+    final String absPath = PathUtils.join(root, path);
+
+    Mockito
+        .when(client.getCache().getCurrentData(absPath))
+        .thenReturn(null);
+
+    Assert.assertFalse(client.hasPath(path)); // test convenience method
+
+    Mockito
+        .when(client.getCache().getCurrentData(absPath))
+        .thenReturn(new ChildData(absPath, null, null));
+
+    Assert.assertTrue(client.hasPath(path, false));    // test actual method
+  }
+
+  @Test(expected = DrillRuntimeException.class)
+  public void testHasPathThrowsDrillRuntimeException() {
+    final String path = "test-key";
+    final String absPath = PathUtils.join(root, path);
+
+    Mockito
+        .when(client.getCache().getCurrentData(absPath))
+        .thenThrow(Exception.class);
+
+    client.hasPath(path);
+  }
+
+  @Test
+  public void testPutAndGetWorks() {
+    client.put(path, data);
+    final byte[] actual = client.get(path, true);
+    Assert.assertArrayEquals("data mismatch", data, actual);
+  }
+
+  @Test
+  public void testGetWithEventualConsistencyHitsCache() {
+    Mockito
+        .when(client.getCache().getCurrentData(abspath))
+        .thenReturn(null);
+
+    Assert.assertEquals("get should return null", null, client.get(path));
+
+    Mockito
+        .when(client.getCache().getCurrentData(abspath))
+        .thenReturn(new ChildData(abspath, null, data));
+
+    Assert.assertEquals("get should return data", data, client.get(path, false));
+  }
+
+  @Test
+  public void testCreate() throws Exception {
+    client.create(path);
+    Assert.assertTrue("path must exist", client.hasPath(path, true));
+
+    // ensure invoking create also rebuilds cache
+    Mockito
+        .verify(client.getCache(), Mockito.times(1))
+        .rebuildNode(abspath);
+  }
+
+  @Test
+  public void testDelete() throws Exception {
+    client.create(path);
+    Assert.assertTrue("path must exist", client.hasPath(path, true));
+    client.delete(path);
+    Assert.assertFalse("path must not exist", client.hasPath(path, true));
+
+    // ensure cache is rebuilt
+    Mockito
+        .verify(client.getCache(), Mockito.times(2))
+        .rebuildNode(abspath);
+  }
+
+
+  @Test
+  public void testEntriesReturnsRelativePaths() throws Exception {
+    final ChildData child = Mockito.mock(ChildData.class);
+    Mockito
+        .when(child.getPath())
+        .thenReturn(abspath);
+
+    Mockito
+        .when(child.getData())
+        .thenReturn(data);
+
+    final List<ChildData> children = Lists.newArrayList(child);
+    Mockito
+        .when(client.getCache().getCurrentData())
+        .thenReturn(children);
+
+    final Iterator<Map.Entry<String, byte[]>> entries = client.entries();
+
+    // returned entry must contain the given relative path
+    final Map.Entry<String, byte[]> expected = new ImmutableEntry<>(path, data);
+
+    Assert.assertEquals("entries do not match", expected, entries.next());
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java
index 29aaf3a..bf56eb6 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java
@@ -55,7 +55,7 @@ import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.server.RemoteServiceSet;
 import org.apache.drill.exec.store.StoragePluginRegistry;
 import org.apache.drill.exec.store.StoragePluginRegistryImpl;
-import org.apache.drill.exec.store.sys.local.LocalPStoreProvider;
+import org.apache.drill.exec.store.sys.store.provider.LocalPersistentStoreProvider;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.VarBinaryVector;
 import org.junit.Ignore;
@@ -112,7 +112,7 @@ public class TestOptiqPlans extends ExecTest {
         controller,
         com,
         workBus,
-        new LocalPStoreProvider(config));
+        new LocalPersistentStoreProvider(config));
     final QueryContext qc = new QueryContext(UserSession.Builder.newBuilder().setSupportComplexTypes(true).build(),
         bitContext, QueryId.getDefaultInstance());
     final PhysicalPlanReader reader = bitContext.getPlanReader();

http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoin.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoin.java
index bd3145f..bed71f9 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoin.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoin.java
@@ -26,7 +26,6 @@ import java.util.List;
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.config.LogicalPlanPersistence;
 import org.apache.drill.common.scanner.ClassPathScanner;
-import org.apache.drill.common.scanner.persistence.ScanResult;
 import org.apache.drill.common.util.FileUtils;
 import org.apache.drill.common.util.TestTools;
 import org.apache.drill.exec.client.DrillClient;
@@ -51,7 +50,7 @@ import org.apache.drill.exec.server.Drillbit;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.server.RemoteServiceSet;
 import org.apache.drill.exec.server.options.SystemOptionManager;
-import org.apache.drill.exec.store.sys.local.LocalPStoreProvider;
+import org.apache.drill.exec.store.sys.store.provider.LocalPersistentStoreProvider;
 import org.apache.drill.exec.vector.ValueVector;
 import org.junit.Rule;
 import org.junit.Test;
@@ -73,7 +72,7 @@ public class TestHashJoin extends PopUnitTestBase {
   private final DrillConfig c = DrillConfig.create();
 
   private void testHJMockScanCommon(final DrillbitContext bitContext, UserServer.UserClientConnection connection, String physicalPlan, int expectedRows) throws Throwable {
-    final LocalPStoreProvider provider = new LocalPStoreProvider(c);
+    final LocalPersistentStoreProvider provider = new LocalPersistentStoreProvider(c);
     provider.start();
     final SystemOptionManager opt = new SystemOptionManager(PhysicalPlanReaderTestFactory.defaultLogicalPlanPersistence(c), provider);
     opt.init();

http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/PStoreTestUtil.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/PStoreTestUtil.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/PStoreTestUtil.java
index 1f5aa22..bcd2f5e 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/PStoreTestUtil.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/PStoreTestUtil.java
@@ -31,8 +31,8 @@ import com.google.common.collect.Maps;
 public class PStoreTestUtil {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PStoreTestUtil.class);
 
-  public static void test(PStoreProvider provider) throws Exception{
-    PStore<String> store = provider.getStore(PStoreConfig.newJacksonBuilder(new ObjectMapper(), String.class).name("sys.test").build());
+  public static void test(PersistentStoreProvider provider) throws Exception{
+    PersistentStore<String> store = provider.getOrCreateStore(PersistentStoreConfig.newJacksonBuilder(new ObjectMapper(), String.class).name("sys.test").build());
     String[] keys = {"first", "second"};
     String[] values = {"value1", "value2"};
     Map<String, String> expectedMap = Maps.newHashMap();
@@ -43,7 +43,7 @@ public class PStoreTestUtil {
     }
     // allow one second for puts to propagate back to cache
     {
-      Iterator<Map.Entry<String, String>> iter = store.iterator();
+      Iterator<Map.Entry<String, String>> iter = store.getAll();
       for(int i =0; i < keys.length; i++){
         Entry<String, String> e = iter.next();
         assertTrue(expectedMap.containsKey(e.getKey()));
@@ -54,15 +54,15 @@ public class PStoreTestUtil {
     }
 
     {
-      Iterator<Map.Entry<String, String>> iter = store.iterator();
+      Iterator<Map.Entry<String, String>> iter = store.getAll();
       while(iter.hasNext()){
-        iter.next();
-        iter.remove();
+        final String key = iter.next().getKey();
+        store.delete(key);
       }
     }
 
     // allow one second for deletes to propagate back to cache
 
-    assertFalse(store.iterator().hasNext());
+    assertFalse(store.getAll().hasNext());
   }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestPStoreProviders.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestPStoreProviders.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestPStoreProviders.java
index cbfbcd3..93e2497 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestPStoreProviders.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestPStoreProviders.java
@@ -23,18 +23,18 @@ import org.apache.curator.retry.RetryNTimes;
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.TestWithZookeeper;
-import org.apache.drill.exec.store.sys.local.LocalPStoreProvider;
-import org.apache.drill.exec.store.sys.zk.ZkPStoreProvider;
+import org.apache.drill.exec.store.sys.store.provider.LocalPersistentStoreProvider;
+import org.apache.drill.exec.store.sys.store.provider.ZookeeperPersistentStoreProvider;
 import org.junit.Test;
 
 public class TestPStoreProviders extends TestWithZookeeper {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestPStoreProviders.class);
 
-  static LocalPStoreProvider provider;
+  static LocalPersistentStoreProvider provider;
 
   @Test
   public void verifyLocalStore() throws Exception {
-    try(LocalPStoreProvider provider = new LocalPStoreProvider(DrillConfig.create())){
+    try(LocalPersistentStoreProvider provider = new LocalPersistentStoreProvider(DrillConfig.create())){
       PStoreTestUtil.test(provider);
     }
   }
@@ -51,7 +51,7 @@ public class TestPStoreProviders extends TestWithZookeeper {
 
     try(CuratorFramework curator = builder.build()){
       curator.start();
-      ZkPStoreProvider provider = new ZkPStoreProvider(config, curator);
+      ZookeeperPersistentStoreProvider provider = new ZookeeperPersistentStoreProvider(config, curator);
       PStoreTestUtil.test(provider);
     }
   }


Mime
View raw message