drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From h.@apache.org
Subject [2/4] drill git commit: DRILL-4275: create TransientStore for short-lived objects; refactor PersistentStore to introduce pagination mechanism
Date Fri, 19 Feb 2016 05:29:04 GMT
http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/EStoreProvider.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/EStoreProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/EStoreProvider.java
deleted file mode 100644
index 4c79a28..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/EStoreProvider.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.drill.exec.store.sys;
-
-
-/**
- * Interface to define the provider which return EStore.
- */
-
-public interface EStoreProvider extends PStoreProvider {
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/OptionIterator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/OptionIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/OptionIterator.java
index 85a1c3c..5c8a641 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/OptionIterator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/OptionIterator.java
@@ -21,15 +21,14 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.server.options.DrillConfigIterator;
 import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.server.options.OptionValue;
 import org.apache.drill.exec.server.options.OptionValue.Kind;
 import org.apache.drill.exec.server.options.OptionValue.OptionType;
-
-import com.google.common.collect.Iterators;
-import com.google.common.collect.Lists;
 import org.apache.drill.exec.server.options.SystemOptionManager;
 
 public class OptionIterator implements Iterator<Object> {

http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStore.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStore.java
deleted file mode 100644
index b629645..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStore.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.sys;
-
-import java.util.Map;
-
-
-/**
- * Interface for reading and writing values to a persistent storage provider.  Iterators are guaranteed to be returned in key order.
- * @param <V>
- */
-public interface PStore<V> extends Iterable<Map.Entry<String, V>> {
-  public V get(String key);
-  public void put(String key, V value);
-  public boolean putIfAbsent(String key, V value);
-  public void delete(String key);
-  public void close();
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStoreConfig.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStoreConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStoreConfig.java
deleted file mode 100644
index bd9d977..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStoreConfig.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.sys;
-
-import org.apache.drill.exec.store.sys.serialize.JacksonSerializer;
-import org.apache.drill.exec.store.sys.serialize.PClassSerializer;
-import org.apache.drill.exec.store.sys.serialize.ProtoSerializer;
-
-import com.dyuproject.protostuff.Schema;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Preconditions;
-import com.google.protobuf.Message;
-import com.google.protobuf.Message.Builder;
-
-public class PStoreConfig<V> {
-
-  private final String name;
-  private final PClassSerializer<V> valueSerializer;
-  private final Mode mode;
-  private final int maxIteratorSize;
-
-  public static enum Mode {PERSISTENT, EPHEMERAL, BLOB_PERSISTENT};
-
-  private PStoreConfig(String name, PClassSerializer<V> valueSerializer, Mode mode, int maxIteratorSize) {
-    super();
-    this.name = name;
-    this.valueSerializer = valueSerializer;
-    this.mode = mode;
-    this.maxIteratorSize = Math.abs(maxIteratorSize);
-  }
-
-  public Mode getMode() {
-    return mode;
-  }
-
-  public int getMaxIteratorSize() {
-    return maxIteratorSize;
-  }
-
-  public String getName() {
-    return name;
-  }
-
-  public PClassSerializer<V> getSerializer() {
-    return valueSerializer;
-  }
-
-  public static <V extends Message, X extends Builder> PStoreConfigBuilder<V> newProtoBuilder(Schema<V> writeSchema, Schema<X> readSchema) {
-    return new PStoreConfigBuilder<V>(new ProtoSerializer<V, X>(writeSchema, readSchema));
-  }
-
-  public static <V> PStoreConfigBuilder<V> newJacksonBuilder(ObjectMapper mapper, Class<V> clazz) {
-    return new PStoreConfigBuilder<V>(new JacksonSerializer<V>(mapper, clazz));
-  }
-
-  public static class PStoreConfigBuilder<V> {
-    String name;
-    PClassSerializer<V> serializer;
-    Mode mode = Mode.PERSISTENT;
-    int maxIteratorSize = Integer.MAX_VALUE;
-
-    PStoreConfigBuilder(PClassSerializer<V> serializer) {
-      super();
-      this.serializer = serializer;
-    }
-
-    public PStoreConfigBuilder<V> name(String name) {
-      this.name = name;
-      return this;
-    }
-
-    public PStoreConfigBuilder<V> persist(){
-      this.mode = Mode.PERSISTENT;
-      return this;
-    }
-
-    public PStoreConfigBuilder<V> ephemeral(){
-      this.mode = Mode.EPHEMERAL;
-      return this;
-    }
-
-    public PStoreConfigBuilder<V> blob(){
-      this.mode = Mode.BLOB_PERSISTENT;
-      return this;
-    }
-
-    /**
-     * Set the maximum size of the iterator.  Positive numbers start from the start of the list.  Negative numbers start from the end of the list.
-     * @param size
-     * @return
-     */
-    public PStoreConfigBuilder<V> max(int size){
-      this.maxIteratorSize = size;
-      return this;
-    }
-
-    public PStoreConfig<V> build(){
-      Preconditions.checkNotNull(name);
-      return new PStoreConfig<V>(name, serializer, mode, maxIteratorSize);
-    }
-
-  }
-
-  @Override
-  public int hashCode() {
-    final int prime = 31;
-    int result = 1;
-    result = prime * result + maxIteratorSize;
-    result = prime * result + ((mode == null) ? 0 : mode.hashCode());
-    result = prime * result + ((name == null) ? 0 : name.hashCode());
-    result = prime * result + ((valueSerializer == null) ? 0 : valueSerializer.hashCode());
-    return result;
-  }
-
-  @Override
-  public boolean equals(Object obj) {
-    if (this == obj) {
-      return true;
-    }
-    if (obj == null) {
-      return false;
-    }
-    if (getClass() != obj.getClass()) {
-      return false;
-    }
-    PStoreConfig other = (PStoreConfig) obj;
-    if (maxIteratorSize != other.maxIteratorSize) {
-      return false;
-    }
-    if (mode != other.mode) {
-      return false;
-    }
-    if (name == null) {
-      if (other.name != null) {
-        return false;
-      }
-    } else if (!name.equals(other.name)) {
-      return false;
-    }
-    if (valueSerializer == null) {
-      if (other.valueSerializer != null) {
-        return false;
-      }
-    } else if (!valueSerializer.equals(other.valueSerializer)) {
-      return false;
-    }
-    return true;
-  }
-
-
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStoreProvider.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStoreProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStoreProvider.java
deleted file mode 100644
index efa223e..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStoreProvider.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.sys;
-
-import java.io.IOException;
-
-public interface PStoreProvider extends AutoCloseable {
-  public <V> PStore<V> getStore(PStoreConfig<V> config) throws IOException;
-  public void start() throws IOException;
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStoreRegistry.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStoreRegistry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStoreRegistry.java
deleted file mode 100644
index 532e6be..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStoreRegistry.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.sys;
-
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
-
-import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.exec.coord.ClusterCoordinator;
-
-import com.typesafe.config.ConfigException;
-
-public class PStoreRegistry {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PStoreRegistry.class);
-
-  private DrillConfig config;
-  private ClusterCoordinator coord;
-
-  public PStoreRegistry(ClusterCoordinator coord, DrillConfig config) {
-    this.coord = coord;
-    this.config = config;
-  }
-
-  public ClusterCoordinator getClusterCoordinator() {
-    return this.coord;
-  }
-
-  public DrillConfig getConfig() {
-    return this.config;
-  }
-
-  @SuppressWarnings("unchecked")
-  public PStoreProvider newPStoreProvider() throws ExecutionSetupException {
-    try {
-      String storeProviderClassName = config.getString(ExecConstants.SYS_STORE_PROVIDER_CLASS);
-      logger.info("Using the configured PStoreProvider class: '{}'.", storeProviderClassName);
-      Class<? extends PStoreProvider> storeProviderClass = (Class<? extends PStoreProvider>) Class.forName(storeProviderClassName);
-      Constructor<? extends PStoreProvider> c = storeProviderClass.getConstructor(PStoreRegistry.class);
-      return new CachingStoreProvider(c.newInstance(this));
-    } catch (ConfigException.Missing | ClassNotFoundException | NoSuchMethodException | SecurityException
-        | InstantiationException | IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
-      logger.error(e.getMessage(), e);
-      throw new ExecutionSetupException("A System Table provider was either not specified or could not be found or instantiated", e);
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PersistentStore.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PersistentStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PersistentStore.java
new file mode 100644
index 0000000..767b1d5
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PersistentStore.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.sys;
+
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * An abstraction used to store and retrieve instances of given value type.
+ *
+ * @param <V>  value type
+ */
+public interface PersistentStore<V> extends AutoCloseable {
+  /**
+   * Returns storage {@link PersistentStoreMode mode} of this store.
+   */
+  PersistentStoreMode getMode();
+
+  /**
+   * Returns the value for the given key if exists, null otherwise.
+   * @param key  lookup key
+   */
+  V get(String key);
+
+  /**
+   * Stores the (key, value) tuple in the store. Lifetime of the tuple depends upon store {@link #getMode mode}.
+   *
+   * @param key  lookup key
+   * @param value  value to store
+   */
+  void put(String key, V value);
+
+
+  /**
+   * Removes the value corresponding to the given key if exists, nothing happens otherwise.
+   * @param key  lookup key
+   */
+  void delete(String key);
+
+  /**
+   * Stores the (key, value) tuple in the store only if it does not exists.
+   *
+   * @param key  lookup key
+   * @param value  value to store
+   * @return  true if put takes place, false otherwise.
+   */
+  boolean putIfAbsent(String key, V value);
+
+  /**
+   * Returns an iterator of desired number of entries offsetting by the skip value.
+   *
+   * @param skip  number of records to skip from beginning
+   * @param take  max number of records to return
+   */
+  Iterator<Map.Entry<String, V>> getRange(int skip, int take);
+
+  /**
+   * Returns an iterator of entries.
+   */
+  Iterator<Map.Entry<String, V>> getAll();
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PersistentStoreConfig.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PersistentStoreConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PersistentStoreConfig.java
new file mode 100644
index 0000000..ca319f2
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PersistentStoreConfig.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.sys;
+
+import com.dyuproject.protostuff.Schema;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
+import com.google.protobuf.Message;
+import com.google.protobuf.Message.Builder;
+import org.apache.drill.exec.serialization.InstanceSerializer;
+import org.apache.drill.exec.serialization.JacksonSerializer;
+import org.apache.drill.exec.serialization.ProtoSerializer;
+
+
+/**
+ * An abstraction for configurations that are used to create a {@link PersistentStore store}.
+ *
+ * @param <V>  value type of which {@link PersistentStore} uses to store & retrieve instances
+ */
+public class PersistentStoreConfig<V> {
+
+  private final String name;
+  private final InstanceSerializer<V> valueSerializer;
+  private final PersistentStoreMode mode;
+
+  protected PersistentStoreConfig(String name, InstanceSerializer<V> valueSerializer, PersistentStoreMode mode) {
+    this.name = name;
+    this.valueSerializer = valueSerializer;
+    this.mode = mode;
+  }
+
+  public PersistentStoreMode getMode() {
+    return mode;
+  }
+
+  public String getName() {
+    return name;
+  }
+
+  public InstanceSerializer<V> getSerializer() {
+    return valueSerializer;
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(name, valueSerializer, mode);
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj instanceof PersistentStoreConfig) {
+      final PersistentStoreConfig other = PersistentStoreConfig.class.cast(obj);
+      return Objects.equal(name, other.name)
+          && Objects.equal(valueSerializer, other.valueSerializer)
+          && Objects.equal(mode, other.mode);
+    }
+    return false;
+  }
+
+  public static <V extends Message, X extends Builder> StoreConfigBuilder<V> newProtoBuilder(Schema<V> writeSchema, Schema<X> readSchema) {
+    return new StoreConfigBuilder<>(new ProtoSerializer<>(readSchema, writeSchema));
+  }
+
+  public static <V> StoreConfigBuilder<V> newJacksonBuilder(ObjectMapper mapper, Class<V> clazz) {
+    return new StoreConfigBuilder<>(new JacksonSerializer<>(mapper, clazz));
+  }
+
+  public static class StoreConfigBuilder<V> {
+    private String name;
+    private InstanceSerializer<V> serializer;
+    private PersistentStoreMode mode = PersistentStoreMode.PERSISTENT;
+
+    protected StoreConfigBuilder(InstanceSerializer<V> serializer) {
+      super();
+      this.serializer = serializer;
+    }
+
+    public StoreConfigBuilder<V> name(String name) {
+      this.name = name;
+      return this;
+    }
+
+    public StoreConfigBuilder<V> persist(){
+      this.mode = PersistentStoreMode.PERSISTENT;
+      return this;
+    }
+
+    public StoreConfigBuilder<V> blob(){
+      this.mode = PersistentStoreMode.BLOB_PERSISTENT;
+      return this;
+    }
+
+    public PersistentStoreConfig<V> build(){
+      Preconditions.checkNotNull(name);
+      return new PersistentStoreConfig<>(name, serializer, mode);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PersistentStoreMode.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PersistentStoreMode.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PersistentStoreMode.java
new file mode 100644
index 0000000..68d0cb6
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PersistentStoreMode.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.sys;
+
+/**
+ * Defines operation mode of a {@link PersistentStore} instance.
+ */
+public enum PersistentStoreMode {
+  PERSISTENT,
+  BLOB_PERSISTENT
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PersistentStoreProvider.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PersistentStoreProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PersistentStoreProvider.java
new file mode 100644
index 0000000..75b89b4
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PersistentStoreProvider.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.sys;
+
+import org.apache.drill.exec.exception.StoreException;
+
+/**
+ * A factory used to create {@link PersistentStore store} instances.
+ *
+ */
+public interface PersistentStoreProvider extends AutoCloseable {
+  /**
+   * Gets or creates a {@link PersistentStore persistent store} for the given configuration.
+   *
+   * Note that implementors have liberty to cache previous {@link PersistentStore store} instances.
+   *
+   * @param config  store configuration
+   * @param <V>  store value type
+   */
+  <V> PersistentStore<V> getOrCreateStore(PersistentStoreConfig<V> config) throws StoreException;
+
+
+  /**
+   * Sets up the provider.
+   */
+  void start() throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PersistentStoreRegistry.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PersistentStoreRegistry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PersistentStoreRegistry.java
new file mode 100644
index 0000000..b117513
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PersistentStoreRegistry.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.sys;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+
+import com.google.common.base.Preconditions;
+import com.typesafe.config.ConfigException;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.coord.ClusterCoordinator;
+import org.apache.drill.exec.store.sys.store.provider.CachingPersistentStoreProvider;
+
+public class PersistentStoreRegistry<C extends ClusterCoordinator> {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PersistentStoreRegistry.class);
+
+  private final DrillConfig config;
+  private final C coordinator;
+
+  public PersistentStoreRegistry(C coordinator, DrillConfig config) {
+    this.coordinator = Preconditions.checkNotNull(coordinator, "coordinator cannot be null");
+    this.config = Preconditions.checkNotNull(config, "config cannot be null");
+  }
+
+  public C getCoordinator() {
+    return this.coordinator;
+  }
+
+  public DrillConfig getConfig() {
+    return this.config;
+  }
+
+  @SuppressWarnings("unchecked")
+  public PersistentStoreProvider newPStoreProvider() throws ExecutionSetupException {
+    try {
+      String storeProviderClassName = config.getString(ExecConstants.SYS_STORE_PROVIDER_CLASS);
+      logger.info("Using the configured PStoreProvider class: '{}'.", storeProviderClassName);
+      Class<? extends PersistentStoreProvider> storeProviderClass = (Class<? extends PersistentStoreProvider>) Class.forName(storeProviderClassName);
+      Constructor<? extends PersistentStoreProvider> c = storeProviderClass.getConstructor(PersistentStoreRegistry.class);
+      return new CachingPersistentStoreProvider(c.newInstance(this));
+    } catch (ConfigException.Missing | ClassNotFoundException | NoSuchMethodException | SecurityException
+        | InstantiationException | IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
+      logger.error(e.getMessage(), e);
+      throw new ExecutionSetupException("A System Table provider was either not specified or could not be found or instantiated", e);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/StaticDrillTable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/StaticDrillTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/StaticDrillTable.java
index 8b7225e..0a9b9b3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/StaticDrillTable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/StaticDrillTable.java
@@ -17,12 +17,11 @@
  */
 package org.apache.drill.exec.store.sys;
 
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
 import org.apache.drill.exec.planner.logical.DrillTable;
 import org.apache.drill.exec.store.RecordDataType;
 import org.apache.drill.exec.store.StoragePlugin;
-import org.apache.drill.exec.store.pojo.PojoDataType;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rel.type.RelDataTypeFactory;
 
 /**
  * A {@link org.apache.drill.exec.planner.logical.DrillTable} with a defined schema

http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTablePlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTablePlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTablePlugin.java
index 33f030b..4fb0475 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTablePlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTablePlugin.java
@@ -34,8 +34,8 @@ import org.apache.drill.exec.planner.logical.DrillTable;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.store.AbstractSchema;
 import org.apache.drill.exec.store.AbstractStoragePlugin;
-import org.apache.drill.exec.store.pojo.PojoDataType;
 import org.apache.drill.exec.store.SchemaConfig;
+import org.apache.drill.exec.store.pojo.PojoDataType;
 
 /**
  * A "storage" plugin for system tables.

http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableScan.java
index 8ade25c..2c3bba4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableScan.java
@@ -21,7 +21,11 @@ import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
 
+import com.fasterxml.jackson.annotation.JacksonInject;
 import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
 import com.google.common.collect.Lists;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.SchemaPath;
@@ -36,11 +40,6 @@ import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
 import org.apache.drill.exec.store.StoragePluginRegistry;
 
-import com.fasterxml.jackson.annotation.JacksonInject;
-import com.fasterxml.jackson.annotation.JsonIgnore;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.annotation.JsonTypeName;
-
 @JsonTypeName("sys")
 public class SystemTableScan extends AbstractGroupScan implements SubScan {
   // private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SystemTableScan.class);

http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/ThreadsIterator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/ThreadsIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/ThreadsIterator.java
index e9bc7ff..681119d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/ThreadsIterator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/ThreadsIterator.java
@@ -17,13 +17,13 @@
  */
 package org.apache.drill.exec.store.sys;
 
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
-
 import java.lang.management.ManagementFactory;
 import java.lang.management.ThreadMXBean;
 import java.util.Iterator;
 
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+
 public class ThreadsIterator implements Iterator<Object> {
 
   private boolean beforeFirst = true;

http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/VersionIterator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/VersionIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/VersionIterator.java
index 5620ece..9bfb700 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/VersionIterator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/VersionIterator.java
@@ -19,10 +19,8 @@ package org.apache.drill.exec.store.sys;
 
 import java.io.IOException;
 import java.net.URL;
-import java.util.Enumeration;
 import java.util.Iterator;
 import java.util.Properties;
-import java.util.jar.Manifest;
 
 import com.google.common.io.Resources;
 import org.apache.drill.common.util.DrillVersionInfo;

http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/FilePStore.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/FilePStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/FilePStore.java
deleted file mode 100644
index ebee7a8..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/FilePStore.java
+++ /dev/null
@@ -1,234 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.sys.local;
-
-import static org.apache.drill.exec.ExecConstants.DRILL_SYS_FILE_SUFFIX;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map.Entry;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.exec.store.dfs.DrillFileSystem;
-import org.apache.drill.exec.store.sys.PStore;
-import org.apache.drill.exec.store.sys.PStoreConfig;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
-public class FilePStore<V> implements PStore<V> {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FilePStore.class);
-
-
-  private final Path basePath;
-  private final PStoreConfig<V> config;
-  private final DrillFileSystem fs;
-
-  public FilePStore(DrillFileSystem fs, Path base, PStoreConfig<V> config) {
-    super();
-    this.basePath = new Path(base, config.getName());
-    this.config = config;
-    this.fs = fs;
-
-    try {
-      mkdirs(basePath);
-    } catch (IOException e) {
-      throw new RuntimeException("Failure setting pstore configuration path.");
-    }
-  }
-
-  private void mkdirs(Path path) throws IOException{
-    fs.mkdirs(path);
-  }
-
-  public static Path getLogDir(){
-    String drillLogDir = System.getenv("DRILL_LOG_DIR");
-    if (drillLogDir == null) {
-      drillLogDir = "/var/log/drill";
-    }
-    return new Path(new File(drillLogDir).getAbsoluteFile().toURI());
-  }
-
-  public static DrillFileSystem getFileSystem(DrillConfig config, Path root) throws IOException{
-    Path blobRoot = root == null ? getLogDir() : root;
-    Configuration fsConf = new Configuration();
-    if(blobRoot.toUri().getScheme() != null){
-      fsConf.set(FileSystem.FS_DEFAULT_NAME_KEY, blobRoot.toUri().toString());
-    }
-
-
-    DrillFileSystem fs = new DrillFileSystem(fsConf);
-    fs.mkdirs(blobRoot);
-    return fs;
-  }
-
-  @Override
-  public Iterator<Entry<String, V>> iterator() {
-    try{
-      List<FileStatus> f = fs.list(false, basePath);
-      if (f == null || f.isEmpty()) {
-        return Collections.emptyIterator();
-      }
-      List<String> files = Lists.newArrayList();
-
-      for (FileStatus stat : f) {
-        String s = stat.getPath().getName();
-        if (s.endsWith(DRILL_SYS_FILE_SUFFIX)) {
-          files.add(s.substring(0, s.length() - DRILL_SYS_FILE_SUFFIX.length()));
-        }
-      }
-
-      Collections.sort(files);
-      files = files.subList(0, Math.min(files.size(), config.getMaxIteratorSize()));
-      return new Iter(files.iterator());
-
-    }catch(IOException e){
-      throw new RuntimeException(e);
-    }
-  }
-
-  private Path makePath(String name) {
-    Preconditions.checkArgument(
-        !name.contains("/") &&
-        !name.contains(":") &&
-        !name.contains(".."));
-
-    final Path path = new Path(basePath, name + DRILL_SYS_FILE_SUFFIX);
-    // do this to check file name.
-    return path;
-  }
-
-  public V get(String key) {
-    try{
-      Path path = makePath(key);
-      if(!fs.exists(path)){
-        return null;
-      }
-    }catch(IOException e){
-      throw new RuntimeException(e);
-    }
-
-    final Path path = makePath(key);
-    try (InputStream is = fs.open(path)) {
-      return config.getSerializer().deserialize(IOUtils.toByteArray(is));
-    } catch (IOException e) {
-      throw new RuntimeException("Unable to deserialize \"" + path + "\"", e);
-    }
-  }
-
-  public void put(String key, V value) {
-    try (OutputStream os = fs.create(makePath(key))) {
-      IOUtils.write(config.getSerializer().serialize(value), os);
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  @Override
-  public boolean putIfAbsent(String key, V value) {
-    try {
-      Path p = makePath(key);
-      if (fs.exists(p)) {
-        return false;
-      } else {
-        put(key, value);
-        return true;
-      }
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  public void delete(String key) {
-    try {
-      fs.delete(makePath(key), false);
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  private class Iter implements Iterator<Entry<String, V>>{
-
-    private Iterator<String> keys;
-    private String current;
-
-    public Iter(Iterator<String> keys) {
-      super();
-      this.keys = keys;
-    }
-
-    @Override
-    public boolean hasNext() {
-      return keys.hasNext();
-    }
-
-    @Override
-    public Entry<String, V> next() {
-      current = keys.next();
-      return new DeferredEntry(current);
-    }
-
-    @Override
-    public void remove() {
-      delete(current);
-      keys.remove();
-    }
-
-    private class DeferredEntry implements Entry<String, V> {
-
-      private String name;
-
-
-      public DeferredEntry(String name) {
-        super();
-        this.name = name;
-      }
-
-      @Override
-      public String getKey() {
-        return name;
-      }
-
-      @Override
-      public V getValue() {
-        return get(name);
-      }
-
-      @Override
-      public V setValue(V value) {
-        throw new UnsupportedOperationException();
-      }
-
-    }
-  }
-
-  @Override
-  public void close() {
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/LocalEStoreProvider.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/LocalEStoreProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/LocalEStoreProvider.java
deleted file mode 100644
index e7c2f94..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/LocalEStoreProvider.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.drill.exec.store.sys.local;
-
-import java.io.IOException;
-
-import org.apache.drill.exec.store.sys.EStore;
-import org.apache.drill.exec.store.sys.EStoreProvider;
-import org.apache.drill.exec.store.sys.PStoreConfig;
-import org.apache.drill.exec.store.sys.PStoreConfig.Mode;
-
-import com.google.common.base.Preconditions;
-
-public class LocalEStoreProvider implements EStoreProvider{
-
-  @Override
-  public <V> EStore<V> getStore(PStoreConfig<V> storeConfig) throws IOException {
-    Preconditions.checkArgument(storeConfig.getMode() == Mode.EPHEMERAL, "Estore configurations must be set ephemeral.");
-
-    return new MapEStore<V>();
-  }
-
-  @Override
-  public void start() throws IOException {
-  }
-
-  @Override
-  public void close() {
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/LocalPStoreProvider.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/LocalPStoreProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/LocalPStoreProvider.java
index 3131290..cda1180 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/LocalPStoreProvider.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/LocalPStoreProvider.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
+ * <p/>
  * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p/>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,77 +17,18 @@
  */
 package org.apache.drill.exec.store.sys.local;
 
-import java.io.IOException;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.exec.store.dfs.DrillFileSystem;
-import org.apache.drill.exec.store.sys.PStore;
-import org.apache.drill.exec.store.sys.PStoreConfig;
-import org.apache.drill.exec.store.sys.PStoreProvider;
-import org.apache.drill.exec.store.sys.PStoreRegistry;
-import org.apache.hadoop.fs.Path;
+import org.apache.drill.exec.exception.StoreException;
+import org.apache.drill.exec.store.sys.PersistentStoreRegistry;
+import org.apache.drill.exec.store.sys.store.provider.LocalPersistentStoreProvider;
 
 /**
- * A really simple provider that stores data in the local file system, one value per file.
+ * Kept for possible references to old class name in configuration.
+ *
+ * @deprecated will be removed in 1.7
+ *    use {@link org.apache.drill.exec.store.sys.store.provider.LocalPersistentStoreProvider} instead.
  */
-public class LocalPStoreProvider implements PStoreProvider {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LocalPStoreProvider.class);
-
-  private final Path path;
-  private final boolean enableWrite;
-  private final ConcurrentMap<PStoreConfig<?>, PStore<?>> pstores;
-  private final LocalEStoreProvider estoreProvider;
-  private final DrillFileSystem fs;
-
-  public LocalPStoreProvider(DrillConfig config) throws IOException {
-    this.path = new Path(config.getString(ExecConstants.SYS_STORE_PROVIDER_LOCAL_PATH));
-    this.enableWrite = config.getBoolean(ExecConstants.SYS_STORE_PROVIDER_LOCAL_ENABLE_WRITE);
-    this.pstores = enableWrite ? null : new ConcurrentHashMap<PStoreConfig<?>, PStore<?>>();
-    this.estoreProvider = new LocalEStoreProvider();
-    this.fs = FilePStore.getFileSystem(config, path);
-  }
-
-  public LocalPStoreProvider(PStoreRegistry registry) throws IOException {
-    this(registry.getConfig());
-  }
-
-  @Override
-  public void close() {
+public class LocalPStoreProvider extends LocalPersistentStoreProvider {
+  public LocalPStoreProvider(PersistentStoreRegistry registry) throws StoreException {
+    super(registry);
   }
-
-  @Override
-  public <V> PStore<V> getStore(PStoreConfig<V> storeConfig) throws IOException {
-    switch(storeConfig.getMode()){
-    case EPHEMERAL:
-      return estoreProvider.getStore(storeConfig);
-    case BLOB_PERSISTENT:
-    case PERSISTENT:
-      return getPStore(storeConfig);
-    default:
-      throw new IllegalStateException();
-    }
-
-  }
-
-  private <V> PStore<V> getPStore(PStoreConfig<V> storeConfig) throws IOException {
-    if (enableWrite) {
-      return new FilePStore<V>(fs, path, storeConfig);
-    } else {
-      PStore<V> p = new NoWriteLocalPStore<V>();
-      PStore<?> p2 = pstores.putIfAbsent(storeConfig, p);
-      if(p2 != null) {
-        return (PStore<V>) p2;
-      }
-      return p;
-    }
-  }
-
-
-  @Override
-  public void start() {
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/MapEStore.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/MapEStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/MapEStore.java
deleted file mode 100644
index 96e51e6..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/MapEStore.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.drill.exec.store.sys.local;
-
-import org.apache.drill.exec.store.sys.EStore;
-
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-/**
- * Implementation of EStore using ConcurrentHashMap.
- * @param <V>
- */
-public class MapEStore<V> implements EStore<V> {
-  ConcurrentHashMap<String, V> store = new ConcurrentHashMap<>();
-
-  @Override
-  public V get(String key) {
-    return store.get(key);
-  }
-
-  @Override
-  public void put(String key, V value) {
-    store.put(key, value);
-  }
-
-  @Override
-  public void delete(String key) {
-    store.remove(key);
-  }
-
-  @Override
-  public Iterator<Map.Entry<String, V>> iterator() {
-    return store.entrySet().iterator();
-  }
-
-  @Override
-  public boolean putIfAbsent(String key, V value) {
-    V out = store.putIfAbsent(key, value);
-    return out == null;
-  }
-
-  @Override
-  public void close() {
-  }
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/NoWriteLocalPStore.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/NoWriteLocalPStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/NoWriteLocalPStore.java
deleted file mode 100644
index c675618..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/NoWriteLocalPStore.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.sys.local;
-
-import java.util.Iterator;
-import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentMap;
-
-import org.apache.drill.exec.store.sys.PStore;
-
-import com.google.common.collect.Maps;
-
-public class NoWriteLocalPStore<V> implements PStore<V>{
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(NoWriteLocalPStore.class);
-
-  private ConcurrentMap<String, V> map = Maps.newConcurrentMap();
-
-  private ConcurrentMap<String, V> blobMap = Maps.newConcurrentMap();
-
-  public NoWriteLocalPStore() {
-    super();
-  }
-
-  @Override
-  public Iterator<Entry<String, V>> iterator() {
-    return map.entrySet().iterator();
-  }
-
-  @Override
-  public V get(String key) {
-    return map.get(key);
-  }
-
-  @Override
-  public void put(String key, V value) {
-    map.put(key, value);
-  }
-
-  @Override
-  public boolean putIfAbsent(String key, V value) {
-    return null == map.putIfAbsent(key, value);
-  }
-
-  @Override
-  public void delete(String key) {
-    map.remove(key);
-    blobMap.remove(key);
-  }
-
-  @Override
-  public void close() {
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/serialize/JacksonSerializer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/serialize/JacksonSerializer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/serialize/JacksonSerializer.java
deleted file mode 100644
index 53452f3..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/serialize/JacksonSerializer.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.sys.serialize;
-
-import java.io.IOException;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.ObjectReader;
-import com.fasterxml.jackson.databind.ObjectWriter;
-
-public class JacksonSerializer<X> implements PClassSerializer<X> {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JacksonSerializer.class);
-
-  private ObjectWriter writer;
-  private ObjectReader reader;
-
-  public JacksonSerializer(ObjectMapper mapper, Class<X> clazz){
-    this.reader = mapper.reader(clazz);
-    this.writer = mapper.writer();
-  }
-
-  @Override
-  public byte[] serialize(X val) throws IOException {
-    return writer.writeValueAsBytes(val);
-  }
-
-  @Override
-  public X deserialize(byte[] bytes) throws IOException {
-    return reader.readValue(bytes);
-  }
-
-  @Override
-  public int hashCode() {
-    final int prime = 31;
-    int result = 1;
-    result = prime * result + ((reader == null) ? 0 : reader.hashCode());
-    result = prime * result + ((writer == null) ? 0 : writer.hashCode());
-    return result;
-  }
-
-  @Override
-  public boolean equals(Object obj) {
-    if (this == obj) {
-      return true;
-    }
-    if (obj == null) {
-      return false;
-    }
-    if (getClass() != obj.getClass()) {
-      return false;
-    }
-    JacksonSerializer other = (JacksonSerializer) obj;
-    if (reader == null) {
-      if (other.reader != null) {
-        return false;
-      }
-    } else if (!reader.equals(other.reader)) {
-      return false;
-    }
-    if (writer == null) {
-      if (other.writer != null) {
-        return false;
-      }
-    } else if (!writer.equals(other.writer)) {
-      return false;
-    }
-    return true;
-  }
-
-
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/serialize/PClassSerializer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/serialize/PClassSerializer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/serialize/PClassSerializer.java
deleted file mode 100644
index a3b0c1b..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/serialize/PClassSerializer.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.sys.serialize;
-
-import java.io.IOException;
-
-public interface PClassSerializer<X> {
-  public byte[] serialize(X val) throws IOException;
-  public X deserialize(byte[] bytes) throws IOException;
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/serialize/ProtoSerializer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/serialize/ProtoSerializer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/serialize/ProtoSerializer.java
deleted file mode 100644
index 52df7a4..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/serialize/ProtoSerializer.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.sys.serialize;
-
-import java.io.IOException;
-
-import org.apache.commons.io.output.ByteArrayOutputStream;
-
-import com.dyuproject.protostuff.JsonIOUtil;
-import com.dyuproject.protostuff.Schema;
-import com.google.protobuf.Message;
-
-public class ProtoSerializer<X, B extends Message.Builder> implements PClassSerializer<X> {
-
-  private final Schema<X> writeSchema;
-  private final Schema<B> readSchema;
-
-  public ProtoSerializer(Schema<X> writeSchema, Schema<B> readSchema) {
-    super();
-    this.writeSchema = writeSchema;
-    this.readSchema = readSchema;
-  }
-
-  @Override
-  public byte[] serialize(X val) throws IOException {
-    ByteArrayOutputStream baos = new ByteArrayOutputStream();
-    JsonIOUtil.writeTo(baos, val, writeSchema, false);
-    return baos.toByteArray();
-  }
-
-  @SuppressWarnings("unchecked")
-  @Override
-  public X deserialize(byte[] bytes) throws IOException {
-    B b = readSchema.newMessage();
-    JsonIOUtil.mergeFrom(bytes, b, readSchema, false);
-    return (X) b.build();
-  }
-
-  @Override
-  public int hashCode() {
-    final int prime = 31;
-    int result = 1;
-    result = prime * result + ((readSchema == null) ? 0 : readSchema.hashCode());
-    result = prime * result + ((writeSchema == null) ? 0 : writeSchema.hashCode());
-    return result;
-  }
-
-  @Override
-  public boolean equals(Object obj) {
-    if (this == obj) {
-      return true;
-    }
-    if (obj == null) {
-      return false;
-    }
-    if (getClass() != obj.getClass()) {
-      return false;
-    }
-    ProtoSerializer other = (ProtoSerializer) obj;
-    if (readSchema == null) {
-      if (other.readSchema != null) {
-        return false;
-      }
-    } else if (!readSchema.equals(other.readSchema)) {
-      return false;
-    }
-    if (writeSchema == null) {
-      if (other.writeSchema != null) {
-        return false;
-      }
-    } else if (!writeSchema.equals(other.writeSchema)) {
-      return false;
-    }
-    return true;
-  }
-
-
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/LocalPersistentStore.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/LocalPersistentStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/LocalPersistentStore.java
new file mode 100644
index 0000000..1ef8d12
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/LocalPersistentStore.java
@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.sys.store;
+
+import static org.apache.drill.exec.ExecConstants.DRILL_SYS_FILE_SUFFIX;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import javax.annotation.Nullable;
+
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import org.apache.commons.io.IOUtils;
+import org.apache.drill.common.collections.ImmutableEntry;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.store.sys.BasePersistentStore;
+import org.apache.drill.exec.store.sys.PersistentStore;
+import org.apache.drill.exec.store.sys.PersistentStoreConfig;
+import org.apache.drill.exec.store.sys.PersistentStoreMode;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class LocalPersistentStore<V> extends BasePersistentStore<V> {
+  private static final Logger logger = LoggerFactory.getLogger(LocalPersistentStore.class);
+
+  private final Path basePath;
+  private final PersistentStoreConfig<V> config;
+  private final DrillFileSystem fs;
+
+  public LocalPersistentStore(DrillFileSystem fs, Path base, PersistentStoreConfig<V> config) {
+    super();
+    this.basePath = new Path(base, config.getName());
+    this.config = config;
+    this.fs = fs;
+
+    try {
+      mkdirs(basePath);
+    } catch (IOException e) {
+      throw new RuntimeException("Failure setting pstore configuration path.");
+    }
+  }
+
+  @Override
+  public PersistentStoreMode getMode() {
+    return PersistentStoreMode.PERSISTENT;
+  }
+
+  private void mkdirs(Path path) throws IOException{
+    fs.mkdirs(path);
+  }
+
+  public static Path getLogDir(){
+    String drillLogDir = System.getenv("DRILL_LOG_DIR");
+    if (drillLogDir == null) {
+      drillLogDir = "/var/log/drill";
+    }
+    return new Path(new File(drillLogDir).getAbsoluteFile().toURI());
+  }
+
+  public static DrillFileSystem getFileSystem(DrillConfig config, Path root) throws IOException{
+    Path blobRoot = root == null ? getLogDir() : root;
+    Configuration fsConf = new Configuration();
+    if(blobRoot.toUri().getScheme() != null){
+      fsConf.set(FileSystem.FS_DEFAULT_NAME_KEY, blobRoot.toUri().toString());
+    }
+
+
+    DrillFileSystem fs = new DrillFileSystem(fsConf);
+    fs.mkdirs(blobRoot);
+    return fs;
+  }
+
+  @Override
+  public Iterator<Map.Entry<String, V>> getRange(int skip, int take) {
+    try{
+      List<FileStatus> f = fs.list(false, basePath);
+      if (f == null || f.isEmpty()) {
+        return Collections.emptyIterator();
+      }
+      List<String> files = Lists.newArrayList();
+
+      for (FileStatus stat : f) {
+        String s = stat.getPath().getName();
+        if (s.endsWith(DRILL_SYS_FILE_SUFFIX)) {
+          files.add(s.substring(0, s.length() - DRILL_SYS_FILE_SUFFIX.length()));
+        }
+      }
+
+      Collections.sort(files);
+      return Iterables.transform(Iterables.limit(Iterables.skip(files, skip), take), new Function<String, Entry<String, V>>() {
+        @Nullable
+        @Override
+        public Entry<String, V> apply(String key) {
+          return new ImmutableEntry<>(key, get(key));
+        }
+      }).iterator();
+    }catch(IOException e){
+      throw new RuntimeException(e);
+    }
+  }
+
+  private Path makePath(String name) {
+    Preconditions.checkArgument(
+        !name.contains("/") &&
+        !name.contains(":") &&
+        !name.contains(".."));
+
+    final Path path = new Path(basePath, name + DRILL_SYS_FILE_SUFFIX);
+    // do this to check file name.
+    return path;
+  }
+
+  public V get(String key) {
+    try{
+      Path path = makePath(key);
+      if(!fs.exists(path)){
+        return null;
+      }
+    }catch(IOException e){
+      throw new RuntimeException(e);
+    }
+
+    final Path path = makePath(key);
+    try (InputStream is = fs.open(path)) {
+      return config.getSerializer().deserialize(IOUtils.toByteArray(is));
+    } catch (IOException e) {
+      throw new RuntimeException("Unable to deserialize \"" + path + "\"", e);
+    }
+  }
+
+  public void put(String key, V value) {
+    try (OutputStream os = fs.create(makePath(key))) {
+      IOUtils.write(config.getSerializer().serialize(value), os);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public boolean putIfAbsent(String key, V value) {
+    try {
+      Path p = makePath(key);
+      if (fs.exists(p)) {
+        return false;
+      } else {
+        put(key, value);
+        return true;
+      }
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  public void delete(String key) {
+    try {
+      fs.delete(makePath(key), false);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public void close() {
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/ZookeeperPersistentStore.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/ZookeeperPersistentStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/ZookeeperPersistentStore.java
new file mode 100644
index 0000000..3dde4b8
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/ZookeeperPersistentStore.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.sys.store;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+
+import javax.annotation.Nullable;
+
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterators;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.drill.common.collections.ImmutableEntry;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.exec.coord.zk.PathUtils;
+import org.apache.drill.exec.coord.zk.ZookeeperClient;
+import org.apache.drill.exec.exception.StoreException;
+import org.apache.drill.exec.serialization.InstanceSerializer;
+import org.apache.drill.exec.store.sys.BasePersistentStore;
+import org.apache.drill.exec.store.sys.PersistentStoreConfig;
+import org.apache.drill.exec.store.sys.PersistentStoreMode;
+import org.apache.zookeeper.CreateMode;
+
+/**
+ * Zookeeper based implementation of {@link org.apache.drill.exec.store.sys.PersistentStore}.
+ */
+public class ZookeeperPersistentStore<V> extends BasePersistentStore<V> {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ZookeeperPersistentStore.class);
+
+  private final PersistentStoreConfig<V> config;
+  private final ZookeeperClient client;
+
+  public ZookeeperPersistentStore(final CuratorFramework framework, final PersistentStoreConfig<V> config) throws StoreException {
+    this.config = Preconditions.checkNotNull(config);
+    this.client = new ZookeeperClient(framework, PathUtils.join("/", config.getName()), CreateMode.PERSISTENT);
+  }
+
+  public void start() throws Exception {
+    client.start();
+  }
+
+  @Override
+  public PersistentStoreMode getMode() {
+    return config.getMode();
+  }
+
+  @Override
+  public V get(final String key) {
+    final byte[] bytes = client.get(key);
+    if (bytes == null) {
+      return null;
+    }
+    try {
+      return config.getSerializer().deserialize(bytes);
+    } catch (final IOException e) {
+      throw new DrillRuntimeException(String.format("unable to deserialize value at %s", key), e);
+    }
+  }
+
+  @Override
+  public void put(final String key, final V value) {
+    final InstanceSerializer<V> serializer = config.getSerializer();
+    try {
+      final byte[] bytes = serializer.serialize(value);
+      client.put(key, bytes);
+    } catch (final IOException e) {
+      throw new DrillRuntimeException(String.format("unable to de/serialize value of type %s", value.getClass()), e);
+    }
+  }
+
+  @Override
+  public boolean putIfAbsent(final String key, final V value) {
+    final V old = get(key);
+    if (old == null) {
+      try {
+        final byte[] bytes = config.getSerializer().serialize(value);
+        client.put(key, bytes);
+        return true;
+      } catch (final IOException e) {
+        throw new DrillRuntimeException(String.format("unable to serialize value of type %s", value.getClass()), e);
+      }
+    }
+    return false;
+  }
+
+  @Override
+  public void delete(final String key) {
+    client.delete(key);
+  }
+
+  @Override
+  public Iterator<Map.Entry<String, V>> getRange(final int skip, final int take) {
+    final Iterator<Map.Entry<String, byte[]>> entries = client.entries();
+    Iterators.advance(entries, skip);
+    return Iterators.transform(Iterators.limit(entries, take), new Function<Map.Entry<String, byte[]>, Map.Entry<String, V>>() {
+      @Nullable
+      @Override
+      public Map.Entry<String, V> apply(@Nullable Map.Entry<String, byte[]> input) {
+        try {
+          final V value = config.getSerializer().deserialize(input.getValue());
+          return new ImmutableEntry<>(input.getKey(), value);
+        } catch (final IOException e) {
+          throw new DrillRuntimeException(String.format("unable to deserialize value at key %s", input.getKey()), e);
+        }
+      }
+    });
+  }
+
+  @Override
+  public void close() {
+    try{
+      client.close();
+    } catch(final Exception e) {
+      logger.warn("Failure while closing out %s.", getClass().getSimpleName(), e);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/provider/BasePersistentStoreProvider.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/provider/BasePersistentStoreProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/provider/BasePersistentStoreProvider.java
new file mode 100644
index 0000000..e497a4c
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/provider/BasePersistentStoreProvider.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.sys.store.provider;
+
+import org.apache.drill.exec.store.sys.PersistentStoreProvider;
+
+public abstract class BasePersistentStoreProvider implements PersistentStoreProvider {
+  @Override
+  public void start() throws Exception { }
+
+  @Override
+  public void close() throws Exception { }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/provider/CachingPersistentStoreProvider.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/provider/CachingPersistentStoreProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/provider/CachingPersistentStoreProvider.java
new file mode 100644
index 0000000..99ccc8e
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/provider/CachingPersistentStoreProvider.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.sys.store.provider;
+
+import java.util.List;
+import java.util.concurrent.ConcurrentMap;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.exec.exception.StoreException;
+import org.apache.drill.exec.store.sys.PersistentStore;
+import org.apache.drill.exec.store.sys.PersistentStoreConfig;
+import org.apache.drill.exec.store.sys.PersistentStoreProvider;
+
+public class CachingPersistentStoreProvider extends BasePersistentStoreProvider {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(CachingPersistentStoreProvider.class);
+
+  private final ConcurrentMap<PersistentStoreConfig<?>, PersistentStore<?>> storeCache = Maps.newConcurrentMap();
+  private final PersistentStoreProvider provider;
+
+  public CachingPersistentStoreProvider(PersistentStoreProvider provider) {
+    this.provider = provider;
+  }
+
+  @SuppressWarnings("unchecked")
+  public <V> PersistentStore<V> getOrCreateStore(final PersistentStoreConfig<V> config) throws StoreException {
+    final PersistentStore<?> store = storeCache.get(config);
+    if (store == null) {
+      final PersistentStore<?> newStore = provider.getOrCreateStore(config);
+      final PersistentStore<?> finalStore = storeCache.putIfAbsent(config, newStore);
+      if (finalStore == null) {
+        return (PersistentStore<V>)newStore;
+      }
+      try {
+        newStore.close();
+      } catch (Exception ex) {
+        throw new StoreException(ex);
+      }
+    }
+
+    return (PersistentStore<V>) store;
+  }
+
+  @Override
+  public void start() throws Exception {
+    provider.start();
+  }
+
+  @Override
+  public void close() throws Exception {
+    final List<AutoCloseable> closeables = Lists.newArrayList();
+    for (final AutoCloseable store : storeCache.values()) {
+      closeables.add(store);
+    }
+    closeables.add(provider);
+    storeCache.clear();
+    AutoCloseables.close(closeables);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/provider/LocalPersistentStoreProvider.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/provider/LocalPersistentStoreProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/provider/LocalPersistentStoreProvider.java
new file mode 100644
index 0000000..9bf18ab
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/provider/LocalPersistentStoreProvider.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.sys.store.provider;
+
+import java.io.IOException;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.exception.StoreException;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.store.sys.PersistentStoreRegistry;
+import org.apache.drill.exec.store.sys.PersistentStore;
+import org.apache.drill.exec.store.sys.PersistentStoreConfig;
+import org.apache.drill.exec.store.sys.store.LocalPersistentStore;
+import org.apache.drill.exec.testing.store.NoWriteLocalStore;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A really simple provider that stores data in the local file system, one value per file.
+ */
+public class LocalPersistentStoreProvider extends BasePersistentStoreProvider {
+  private static final Logger logger = LoggerFactory.getLogger(LocalPersistentStoreProvider.class);
+
+  private final Path path;
+  private final DrillFileSystem fs;
+  // This flag is used in testing. Ideally, tests should use a specific PersistentStoreProvider that knows
+  // how to handle this flag.
+  private final boolean enableWrite;
+
+  public LocalPersistentStoreProvider(final PersistentStoreRegistry registry) throws StoreException {
+    this(registry.getConfig());
+  }
+
+  public LocalPersistentStoreProvider(final DrillConfig config) throws StoreException {
+    this.path = new Path(config.getString(ExecConstants.SYS_STORE_PROVIDER_LOCAL_PATH));
+    this.enableWrite = config.getBoolean(ExecConstants.SYS_STORE_PROVIDER_LOCAL_ENABLE_WRITE);
+    try {
+      this.fs = LocalPersistentStore.getFileSystem(config, path);
+    } catch (IOException e) {
+      throw new StoreException("unable to get filesystem", e);
+    }
+  }
+
+  @Override
+  public <V> PersistentStore<V> getOrCreateStore(PersistentStoreConfig<V> storeConfig) {
+    switch(storeConfig.getMode()){
+    case BLOB_PERSISTENT:
+    case PERSISTENT:
+      if (enableWrite) {
+        return new LocalPersistentStore<>(fs, path, storeConfig);
+      }
+      return new NoWriteLocalStore<>();
+    default:
+      throw new IllegalStateException();
+    }
+  }
+
+
+  @Override
+  public void close() throws Exception {
+    fs.close();
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/provider/ZookeeperPersistentStoreProvider.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/provider/ZookeeperPersistentStoreProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/provider/ZookeeperPersistentStoreProvider.java
new file mode 100644
index 0000000..58c46a7
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/provider/ZookeeperPersistentStoreProvider.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.sys.store.provider;
+
+import java.io.IOException;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.exec.coord.zk.ZKClusterCoordinator;
+import org.apache.drill.exec.exception.StoreException;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.store.sys.PersistentStore;
+import org.apache.drill.exec.store.sys.PersistentStoreRegistry;
+import org.apache.drill.exec.store.sys.PersistentStoreConfig;
+import org.apache.drill.exec.store.sys.store.LocalPersistentStore;
+import org.apache.drill.exec.store.sys.store.ZookeeperPersistentStore;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ZookeeperPersistentStoreProvider extends BasePersistentStoreProvider {
+  private static final Logger logger = LoggerFactory.getLogger(ZookeeperPersistentStoreProvider.class);
+
+  private static final String DRILL_EXEC_SYS_STORE_PROVIDER_ZK_BLOBROOT = "drill.exec.sys.store.provider.zk.blobroot";
+
+  private final CuratorFramework curator;
+  private final DrillFileSystem fs;
+  private final Path blobRoot;
+
+  public ZookeeperPersistentStoreProvider(final PersistentStoreRegistry<ZKClusterCoordinator> registry) throws StoreException {
+    this(registry.getConfig(), registry.getCoordinator().getCurator());
+  }
+
+  @VisibleForTesting
+  public ZookeeperPersistentStoreProvider(final DrillConfig config, final CuratorFramework curator) throws StoreException {
+    this.curator = curator;
+
+    if (config.hasPath(DRILL_EXEC_SYS_STORE_PROVIDER_ZK_BLOBROOT)) {
+      blobRoot = new Path(config.getString(DRILL_EXEC_SYS_STORE_PROVIDER_ZK_BLOBROOT));
+    }else{
+      blobRoot = LocalPersistentStore.getLogDir();
+    }
+
+    try {
+      this.fs = LocalPersistentStore.getFileSystem(config, blobRoot);
+    } catch (IOException ex) {
+      throw new StoreException("unable to get filesystem", ex);
+    }
+  }
+
+  @Override
+  public <V> PersistentStore<V> getOrCreateStore(final PersistentStoreConfig<V> config) throws StoreException {
+    switch(config.getMode()){
+    case BLOB_PERSISTENT:
+      return new LocalPersistentStore<>(fs, blobRoot, config);
+    case PERSISTENT:
+      final ZookeeperPersistentStore<V> store = new ZookeeperPersistentStore<>(curator, config);
+      try {
+        store.start();
+      } catch (Exception e) {
+        throw new StoreException("unable to start zookeeper store", e);
+      }
+      return store;
+    default:
+      throw new IllegalStateException();
+    }
+  }
+
+  @Override
+  public void close() throws Exception {
+    fs.close();
+  }
+}


Mime
View raw message