drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j..@apache.org
Subject [04/12] drill git commit: DRILL-5481: Allow to persist profiles in-memory only with a max capacity
Date Sat, 03 Jun 2017 04:45:59 GMT
DRILL-5481: Allow to persist profiles in-memory only with a max capacity

1. Introduced an InMemoryStoreProvider with the ability to maintain a max capacity
2. DrillbitContext now explicitly has a profileStoreProvider that, by default, re-uses the
general PersistentStoreProvider; when the in-memory profile store is enabled, the provider from #1 is used instead.
3. Cleanly separated out QueryProfileStoreContext
4. Converted literal values to constants within ExecConstants
5. Updated drill-module.conf for default capacity

closes #834


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/9ba4af86
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/9ba4af86
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/9ba4af86

Branch: refs/heads/master
Commit: 9ba4af860e3def8f880eef13e353a730cb3b18ea
Parents: d7bc213
Author: Kunal Khatua <kkhatua@maprtech.com>
Authored: Mon May 15 13:33:49 2017 -0700
Committer: Jinfeng Ni <jni@apache.org>
Committed: Fri Jun 2 21:43:14 2017 -0700

----------------------------------------------------------------------
 .../org/apache/drill/exec/ExecConstants.java    |   2 +
 .../org/apache/drill/exec/ops/QueryContext.java |   5 +
 .../org/apache/drill/exec/server/Drillbit.java  |  20 ++-
 .../drill/exec/server/DrillbitContext.java      |  22 ++-
 .../exec/server/QueryProfileStoreContext.java   |  79 ++++++++++
 .../server/rest/profile/ProfileResources.java   |  14 +-
 .../exec/store/sys/PersistentStoreConfig.java   |  16 ++-
 .../exec/store/sys/store/InMemoryStore.java     | 143 +++++++++++++++++++
 .../store/provider/InMemoryStoreProvider.java   |  51 +++++++
 .../org/apache/drill/exec/work/WorkManager.java |   5 +-
 .../drill/exec/work/foreman/QueryManager.java   |  32 ++---
 .../src/main/resources/drill-module.conf        |   4 +
 12 files changed, 357 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/9ba4af86/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 83ffb20..ba98532 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -107,6 +107,8 @@ public interface ExecConstants {
   String SYS_STORE_PROVIDER_CLASS = "drill.exec.sys.store.provider.class";
   String SYS_STORE_PROVIDER_LOCAL_PATH = "drill.exec.sys.store.provider.local.path";
   String SYS_STORE_PROVIDER_LOCAL_ENABLE_WRITE = "drill.exec.sys.store.provider.local.write";
+  String PROFILES_STORE_INMEMORY = "drill.exec.profiles.store.inmemory";
+  String PROFILES_STORE_CAPACITY = "drill.exec.profiles.store.capacity";
   String IMPERSONATION_ENABLED = "drill.exec.impersonation.enabled";
   String IMPERSONATION_MAX_CHAINED_USER_HOPS = "drill.exec.impersonation.max_chained_user_hops";
   String AUTHENTICATION_MECHANISMS = "drill.exec.security.auth.mechanisms";

http://git-wip-us.apache.org/repos/asf/drill/blob/9ba4af86/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
index df3f4f4..0dbeea5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
@@ -39,6 +39,7 @@ import org.apache.drill.exec.proto.UserBitShared.QueryId;
 import org.apache.drill.exec.proto.helper.QueryIdHelper;
 import org.apache.drill.exec.rpc.user.UserSession;
 import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.server.QueryProfileStoreContext;
 import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.server.options.OptionValue;
 import org.apache.drill.exec.server.options.QueryOptionManager;
@@ -209,6 +210,10 @@ public class QueryContext implements AutoCloseable, OptimizerRulesContext, Schem
     return drillbitContext.getConfig();
   }
 
+  public QueryProfileStoreContext getProfileStoreContext() {
+    return drillbitContext.getProfileStoreContext();
+  }
+
   @Override
   public FunctionImplementationRegistry getFunctionRegistry() {
     return drillbitContext.getFunctionImplementationRegistry();

http://git-wip-us.apache.org/repos/asf/drill/blob/9ba4af86/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
index f225714..0d341df 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
@@ -38,6 +38,7 @@ import org.apache.drill.exec.server.rest.WebServer;
 import org.apache.drill.exec.service.ServiceEngine;
 import org.apache.drill.exec.store.StoragePluginRegistry;
 import org.apache.drill.exec.store.sys.store.provider.CachingPersistentStoreProvider;
+import org.apache.drill.exec.store.sys.store.provider.InMemoryStoreProvider;
 import org.apache.drill.exec.store.sys.PersistentStoreProvider;
 import org.apache.drill.exec.store.sys.PersistentStoreRegistry;
 import org.apache.drill.exec.store.sys.store.provider.LocalPersistentStoreProvider;
@@ -76,6 +77,7 @@ public class Drillbit implements AutoCloseable {
   private final WebServer webServer;
   private RegistrationHandle registrationHandle;
   private volatile StoragePluginRegistry storageRegistry;
+  private final PersistentStoreProvider profileStoreProvider;
 
   @VisibleForTesting
   public Drillbit(
@@ -105,6 +107,14 @@ public class Drillbit implements AutoCloseable {
       isDistributedMode = true;
     }
 
+    //Check if InMemory Profile Store, else use Default Store Provider
+    if (config.getBoolean(ExecConstants.PROFILES_STORE_INMEMORY)) {
+      profileStoreProvider = new InMemoryStoreProvider(config.getInt(ExecConstants.PROFILES_STORE_CAPACITY));
+      logger.info("Upto {} latest query profiles will be retained in-memory", config.getInt(ExecConstants.PROFILES_STORE_CAPACITY));
+    } else {
+      profileStoreProvider = storeProvider;
+    }
+
     engine = new ServiceEngine(manager, context, allowPortHunting, isDistributedMode);
 
     logger.info("Construction completed ({} ms).", w.elapsed(TimeUnit.MILLISECONDS));
@@ -115,8 +125,11 @@ public class Drillbit implements AutoCloseable {
     logger.debug("Startup begun.");
     coord.start(10000);
     storeProvider.start();
+    if (profileStoreProvider != storeProvider) {
+      profileStoreProvider.start();
+    }
     final DrillbitEndpoint md = engine.start();
-    manager.start(md, engine.getController(), engine.getDataConnectionCreator(), coord, storeProvider);
+    manager.start(md, engine.getController(), engine.getDataConnectionCreator(), coord, storeProvider, profileStoreProvider);
     final DrillbitContext drillbitContext = manager.getContext();
     storageRegistry = drillbitContext.getStorage();
     storageRegistry.init();
@@ -164,6 +177,11 @@ public class Drillbit implements AutoCloseable {
           manager,
           storageRegistry,
           context);
+
+      //Closing the profile store provider if distinct
+      if (storeProvider != profileStoreProvider) {
+        AutoCloseables.close(profileStoreProvider);
+      }
     } catch(Exception e) {
       logger.warn("Failure on close()", e);
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/9ba4af86/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
index 6c68ab2..b8d3e68 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
@@ -65,7 +65,7 @@ public class DrillbitContext implements AutoCloseable {
   private final LogicalPlanPersistence lpPersistence;
   // operator table for standard SQL operators and functions, Drill built-in UDFs
   private final DrillOperatorTable table;
-
+  private final QueryProfileStoreContext profileStoreContext;
 
   public DrillbitContext(
       DrillbitEndpoint endpoint,
@@ -75,6 +75,19 @@ public class DrillbitContext implements AutoCloseable {
       DataConnectionCreator connectionsPool,
       WorkEventBus workBus,
       PersistentStoreProvider provider) {
+    //PersistentStoreProvider is re-used for providing Query Profile Store as well
+    this(endpoint, context, coord, controller, connectionsPool, workBus, provider, provider);
+  }
+
+  public DrillbitContext(
+      DrillbitEndpoint endpoint,
+      BootStrapContext context,
+      ClusterCoordinator coord,
+      Controller controller,
+      DataConnectionCreator connectionsPool,
+      WorkEventBus workBus,
+      PersistentStoreProvider provider,
+      PersistentStoreProvider profileStoreProvider) {
     this.classpathScan = context.getClasspathScan();
     this.workBus = workBus;
     this.controller = checkNotNull(controller);
@@ -97,6 +110,13 @@ public class DrillbitContext implements AutoCloseable {
 
     // This operator table is built once and used for all queries which do not need dynamic UDF support.
     this.table = new DrillOperatorTable(functionRegistry, systemOptions);
+
+    //This profile store context is built from the profileStoreProvider
+    this.profileStoreContext = new QueryProfileStoreContext(context.getConfig(), profileStoreProvider, coord);
+  }
+
+  public QueryProfileStoreContext getProfileStoreContext() {
+    return profileStoreContext;
   }
 
   public FunctionImplementationRegistry getFunctionImplementationRegistry() {

http://git-wip-us.apache.org/repos/asf/drill/blob/9ba4af86/exec/java-exec/src/main/java/org/apache/drill/exec/server/QueryProfileStoreContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/QueryProfileStoreContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/QueryProfileStoreContext.java
new file mode 100644
index 0000000..7f282d5
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/QueryProfileStoreContext.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.server;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.coord.ClusterCoordinator;
+import org.apache.drill.exec.coord.store.TransientStore;
+import org.apache.drill.exec.coord.store.TransientStoreConfig;
+import org.apache.drill.exec.proto.SchemaUserBitShared;
+import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.exec.proto.UserBitShared.QueryInfo;
+import org.apache.drill.exec.proto.UserBitShared.QueryProfile;
+import org.apache.drill.exec.store.sys.PersistentStore;
+import org.apache.drill.exec.store.sys.PersistentStoreConfig;
+import org.apache.drill.exec.store.sys.PersistentStoreProvider;
+import org.apache.drill.exec.store.sys.PersistentStoreConfig.StoreConfigBuilder;
+
+public class QueryProfileStoreContext {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueryProfileStoreContext.class);
+
+  private static final String PROFILES = "profiles";
+
+  private static final String RUNNING = "running";
+
+  private final PersistentStore<UserBitShared.QueryProfile> completedProfiles;
+
+  private final TransientStore<UserBitShared.QueryInfo> runningProfiles;
+
+  private final PersistentStoreConfig<QueryProfile> profileStoreConfig;
+
+  public QueryProfileStoreContext(DrillConfig config, PersistentStoreProvider storeProvider,
+                                  ClusterCoordinator coordinator) {
+    profileStoreConfig = PersistentStoreConfig.newProtoBuilder(SchemaUserBitShared.QueryProfile.WRITE,
+        SchemaUserBitShared.QueryProfile.MERGE)
+        .name(PROFILES)
+        .blob()
+        .build();
+
+    try {
+      completedProfiles = storeProvider.getOrCreateStore(profileStoreConfig);
+    } catch (final Exception e) {
+      throw new DrillRuntimeException(e);
+    }
+
+    runningProfiles = coordinator.getOrCreateTransientStore(TransientStoreConfig
+        .newProtoBuilder(SchemaUserBitShared.QueryInfo.WRITE, SchemaUserBitShared.QueryInfo.MERGE)
+        .name(RUNNING)
+        .build());
+  }
+
+  public PersistentStoreConfig<QueryProfile> getProfileStoreConfig() {
+    return profileStoreConfig;
+  }
+
+  public PersistentStore<QueryProfile> getCompletedProfileStore() {
+    return completedProfiles;
+  }
+
+  public TransientStore<QueryInfo> getRunningProfileStore() {
+    return runningProfiles;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/9ba4af86/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileResources.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileResources.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileResources.java
index 044b792..468ec56 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileResources.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileResources.java
@@ -48,6 +48,7 @@ import org.apache.drill.exec.proto.UserBitShared.QueryInfo;
 import org.apache.drill.exec.proto.UserBitShared.QueryProfile;
 import org.apache.drill.exec.proto.helper.QueryIdHelper;
 import org.apache.drill.exec.server.rest.DrillRestServer.UserAuthEnabled;
+import org.apache.drill.exec.server.QueryProfileStoreContext;
 import org.apache.drill.exec.server.rest.ViewableWithPermissions;
 import org.apache.drill.exec.server.rest.auth.DrillUserPrincipal;
 import org.apache.drill.exec.store.sys.PersistentStore;
@@ -180,8 +181,9 @@ public class ProfileResources {
   @Produces(MediaType.APPLICATION_JSON)
   public QProfiles getProfilesJSON(@Context UriInfo uriInfo) {
     try {
-      final PersistentStore<QueryProfile> completed = getProvider().getOrCreateStore(QueryManager.QUERY_PROFILE);
-      final TransientStore<QueryInfo> running = getCoordinator().getOrCreateTransientStore(QueryManager.RUNNING_QUERY_INFO);
+      final QueryProfileStoreContext profileStoreContext = work.getContext().getProfileStoreContext();
+      final PersistentStore<QueryProfile> completed = profileStoreContext.getCompletedProfileStore();
+      final TransientStore<QueryInfo> running = profileStoreContext.getRunningProfileStore();
 
       final List<String> errors = Lists.newArrayList();
 
@@ -258,7 +260,7 @@ public class ProfileResources {
 
     // then check remote running
     try {
-      final TransientStore<QueryInfo> running = getCoordinator().getOrCreateTransientStore(QueryManager.RUNNING_QUERY_INFO);
+      final TransientStore<QueryInfo> running = work.getContext().getProfileStoreContext().getRunningProfileStore();
       final QueryInfo info = running.get(queryId);
       if (info != null) {
         QueryProfile queryProfile = work.getContext()
@@ -275,7 +277,7 @@ public class ProfileResources {
 
     // then check blob store
     try {
-      final PersistentStore<QueryProfile> profiles = getProvider().getOrCreateStore(QueryManager.QUERY_PROFILE);
+      final PersistentStore<QueryProfile> profiles = work.getContext().getProfileStoreContext().getCompletedProfileStore();
       final QueryProfile queryProfile = profiles.get(queryId);
       if (queryProfile != null) {
         checkOrThrowProfileViewAuthorization(queryProfile);
@@ -296,7 +298,7 @@ public class ProfileResources {
   @Produces(MediaType.APPLICATION_JSON)
   public String getProfileJSON(@PathParam("queryid") String queryId) {
     try {
-      return new String(QueryManager.QUERY_PROFILE.getSerializer().serialize(getQueryProfile(queryId)));
+      return new String(work.getContext().getProfileStoreContext().getProfileStoreConfig().getSerializer().serialize(getQueryProfile(queryId)));
     } catch (Exception e) {
       logger.debug("Failed to serialize profile for: " + queryId);
       return ("{ 'message' : 'error (unable to serialize profile)' }");
@@ -329,7 +331,7 @@ public class ProfileResources {
 
     // then check remote running
     try {
-      final TransientStore<QueryInfo> running = getCoordinator().getOrCreateTransientStore(QueryManager.RUNNING_QUERY_INFO);
+      final TransientStore<QueryInfo> running = work.getContext().getProfileStoreContext().getRunningProfileStore();
       final QueryInfo info = running.get(queryId);
       checkOrThrowQueryCancelAuthorization(info.getUser(), queryId);
       Ack a = work.getContext().getController().getTunnel(info.getForeman()).requestCancelQuery(id).checkedGet(2, TimeUnit.SECONDS);

http://git-wip-us.apache.org/repos/asf/drill/blob/9ba4af86/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PersistentStoreConfig.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PersistentStoreConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PersistentStoreConfig.java
index 00a75a2..3b5e7ca 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PersistentStoreConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PersistentStoreConfig.java
@@ -38,11 +38,17 @@ public class PersistentStoreConfig<V> {
   private final String name;
   private final InstanceSerializer<V> valueSerializer;
   private final PersistentStoreMode mode;
+  private final int capacity;
 
-  protected PersistentStoreConfig(String name, InstanceSerializer<V> valueSerializer, PersistentStoreMode mode) {
+  protected PersistentStoreConfig(String name, InstanceSerializer<V> valueSerializer, PersistentStoreMode mode, int capacity) {
     this.name = name;
     this.valueSerializer = valueSerializer;
     this.mode = mode;
+    this.capacity = capacity;
+  }
+
+  public int getCapacity() {
+    return capacity;
   }
 
   public PersistentStoreMode getMode() {
@@ -85,6 +91,7 @@ public class PersistentStoreConfig<V> {
     private String name;
     private InstanceSerializer<V> serializer;
     private PersistentStoreMode mode = PersistentStoreMode.PERSISTENT;
+    private int capacity;
 
     protected StoreConfigBuilder(InstanceSerializer<V> serializer) {
       super();
@@ -106,9 +113,14 @@ public class PersistentStoreConfig<V> {
       return this;
     }
 
+    public StoreConfigBuilder<V> setCapacity(int capacity) {
+      this.capacity = capacity;
+      return this;
+    }
+
     public PersistentStoreConfig<V> build(){
       Preconditions.checkNotNull(name);
-      return new PersistentStoreConfig<>(name, serializer, mode);
+      return new PersistentStoreConfig<>(name, serializer, mode, capacity);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/9ba4af86/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/InMemoryStore.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/InMemoryStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/InMemoryStore.java
new file mode 100644
index 0000000..10da92d
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/InMemoryStore.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.sys.store;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.drill.common.concurrent.AutoCloseableLock;
+import org.apache.drill.exec.exception.VersionMismatchException;
+import org.apache.drill.exec.store.sys.BasePersistentStore;
+import org.apache.drill.exec.store.sys.PersistentStoreConfig;
+import org.apache.drill.exec.store.sys.PersistentStoreMode;
+
+import com.google.common.collect.Iterables;
+
+public class InMemoryStore<V> extends BasePersistentStore<V> {
+  // private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(InMemoryPersistentStore.class);
+
+  private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+  private final AutoCloseableLock readLock = new AutoCloseableLock(readWriteLock.readLock());
+  private final AutoCloseableLock writeLock = new AutoCloseableLock(readWriteLock.writeLock());
+  private final ConcurrentSkipListMap<String, V> store;
+  private int version = -1;
+  private final int capacity;
+  private final AtomicInteger currentSize = new AtomicInteger();
+
+  public InMemoryStore(int capacity) {
+    this.capacity = capacity;
+    //Allows us to trim out the oldest elements to maintain finite max size
+    this.store = new ConcurrentSkipListMap<String, V>();
+  }
+
+  @Override
+  public void delete(final String key) {
+    try (AutoCloseableLock lock = writeLock.open()) {
+      store.remove(key);
+      version++;
+    }
+  }
+
+  @Override
+  public PersistentStoreMode getMode() {
+    return PersistentStoreMode.BLOB_PERSISTENT;
+  }
+
+  @Override
+  public boolean contains(final String key) {
+    return contains(key, null);
+  }
+
+  @Override
+  public boolean contains(final String key, final DataChangeVersion dataChangeVersion) {
+    try (AutoCloseableLock lock = readLock.open()) {
+      if (dataChangeVersion != null) {
+        dataChangeVersion.setVersion(version);
+      }
+      return store.containsKey(key);
+    }
+  }
+
+  @Override
+  public V get(final String key) {
+    return get(key, null);
+  }
+
+  @Override
+  public V get(final String key, final DataChangeVersion dataChangeVersion) {
+    try (AutoCloseableLock lock = readLock.open()) {
+      if (dataChangeVersion != null) {
+        dataChangeVersion.setVersion(version);
+      }
+      return store.get(key);
+    }
+  }
+
+  @Override
+  public void put(final String key, final V value) {
+    put(key, value, null);
+  }
+
+  @Override
+  public void put(final String key, final V value, final DataChangeVersion dataChangeVersion) {
+    try (AutoCloseableLock lock = writeLock.open()) {
+      if (dataChangeVersion != null && dataChangeVersion.getVersion() != version) {
+        throw new VersionMismatchException("Version mismatch detected", dataChangeVersion.getVersion());
+      }
+      store.put(key, value);
+      if (currentSize.incrementAndGet() > capacity) {
+        //Pop Out Oldest
+        store.pollLastEntry();
+        currentSize.decrementAndGet();
+      }
+
+      version++;
+    }
+  }
+
+  @Override
+  public boolean putIfAbsent(final String key, final V value) {
+    try (AutoCloseableLock lock = writeLock.open()) {
+      final V old = store.putIfAbsent(key, value);
+      if (old == null) {
+        version++;
+        return true;
+      }
+      return false;
+    }
+  }
+
+  @Override
+  public Iterator<Map.Entry<String, V>> getRange(final int skip, final int take) {
+    try (AutoCloseableLock lock = readLock.open()) {
+      return Iterables.limit(Iterables.skip(store.entrySet(), skip), take).iterator();
+    }
+  }
+
+  @Override
+  public void close() throws Exception {
+    try (AutoCloseableLock lock = writeLock.open()) {
+      store.clear();
+      version = -1;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/9ba4af86/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/provider/InMemoryStoreProvider.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/provider/InMemoryStoreProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/provider/InMemoryStoreProvider.java
new file mode 100644
index 0000000..ffe7b18
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/provider/InMemoryStoreProvider.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.sys.store.provider;
+
+import org.apache.drill.exec.exception.StoreException;
+import org.apache.drill.exec.store.sys.PersistentStore;
+import org.apache.drill.exec.store.sys.PersistentStoreConfig;
+import org.apache.drill.exec.store.sys.PersistentStoreProvider;
+import org.apache.drill.exec.store.sys.store.InMemoryStore;
+
+public class InMemoryStoreProvider implements PersistentStoreProvider {
+
+  private int capacity;
+
+  public InMemoryStoreProvider(int capacity) {
+    this.capacity = capacity;
+  }
+
+  @Override
+  public void close() throws Exception {
+    // TODO Auto-generated method stub
+
+  }
+
+  @Override
+  public <V> PersistentStore<V> getOrCreateStore(PersistentStoreConfig<V> config) throws StoreException {
+    return new InMemoryStore<>(capacity);
+  }
+
+  @Override
+  public void start() throws Exception {
+    // TODO Auto-generated method stub
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/9ba4af86/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
index c352861..2d37b8c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
@@ -102,8 +102,9 @@ public class WorkManager implements AutoCloseable {
       final Controller controller,
       final DataConnectionCreator data,
       final ClusterCoordinator coord,
-      final PersistentStoreProvider provider) {
-    dContext = new DrillbitContext(endpoint, bContext, coord, controller, data, workBus, provider);
+      final PersistentStoreProvider provider,
+      final PersistentStoreProvider profilesProvider) {
+    dContext = new DrillbitContext(endpoint, bContext, coord, controller, data, workBus, provider, profilesProvider);
     statusThread.start();
 
     DrillMetrics.register("drill.fragments.running",

http://git-wip-us.apache.org/repos/asf/drill/blob/9ba4af86/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
index 77c20a5..ecbccf3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
@@ -31,7 +31,6 @@ import org.apache.drill.common.exceptions.UserRemoteException;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.coord.ClusterCoordinator;
 import org.apache.drill.exec.coord.store.TransientStore;
-import org.apache.drill.exec.coord.store.TransientStoreConfig;
 import org.apache.drill.exec.proto.BitControl.FragmentStatus;
 import org.apache.drill.exec.proto.BitControl.PlanFragment;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
@@ -52,7 +51,6 @@ import org.apache.drill.exec.rpc.control.Controller;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.server.options.OptionList;
 import org.apache.drill.exec.store.sys.PersistentStore;
-import org.apache.drill.exec.store.sys.PersistentStoreConfig;
 import org.apache.drill.exec.store.sys.PersistentStoreProvider;
 import org.apache.drill.exec.work.EndpointListener;
 
@@ -68,17 +66,6 @@ import com.google.common.collect.Maps;
 public class QueryManager implements AutoCloseable {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueryManager.class);
 
-  public static final PersistentStoreConfig<QueryProfile> QUERY_PROFILE = PersistentStoreConfig.
-          newProtoBuilder(SchemaUserBitShared.QueryProfile.WRITE, SchemaUserBitShared.QueryProfile.MERGE)
-      .name("profiles")
-      .blob()
-      .build();
-
-  public static final TransientStoreConfig<QueryInfo> RUNNING_QUERY_INFO = TransientStoreConfig
-      .newProtoBuilder(SchemaUserBitShared.QueryInfo.WRITE, SchemaUserBitShared.QueryInfo.MERGE)
-      .name("running")
-      .build();
-
   private final Map<DrillbitEndpoint, NodeTracker> nodeMap = Maps.newHashMap();
   private final QueryId queryId;
   private final String stringQueryId;
@@ -93,8 +80,8 @@ public class QueryManager implements AutoCloseable {
       new IntObjectHashMap<>();
   private final List<FragmentData> fragmentDataSet = Lists.newArrayList();
 
-  private final PersistentStore<QueryProfile> profileStore;
-  private final TransientStore<QueryInfo> transientProfiles;
+  private final PersistentStore<QueryProfile> completedProfileStore;
+  private final TransientStore<QueryInfo> runningProfileStore;
 
   // the following mutable variables are used to capture ongoing query status
   private String planText;
@@ -119,12 +106,9 @@ public class QueryManager implements AutoCloseable {
     this.foreman = foreman;
 
     stringQueryId = QueryIdHelper.getQueryId(queryId);
-    try {
-      profileStore = storeProvider.getOrCreateStore(QUERY_PROFILE);
-    } catch (final Exception e) {
-      throw new DrillRuntimeException(e);
-    }
-    transientProfiles = coordinator.getOrCreateTransientStore(RUNNING_QUERY_INFO);
+
+    this.completedProfileStore = foreman.getQueryContext().getProfileStoreContext().getCompletedProfileStore();
+    this.runningProfileStore = foreman.getQueryContext().getProfileStoreContext().getRunningProfileStore();
   }
 
   private static boolean isTerminal(final FragmentState state) {
@@ -298,7 +282,7 @@ public class QueryManager implements AutoCloseable {
       case STARTING:
       case RUNNING:
       case CANCELLATION_REQUESTED:
-        transientProfiles.put(stringQueryId, getQueryInfo());  // store as ephemeral query profile.
+        runningProfileStore.put(stringQueryId, getQueryInfo());  // store as ephemeral query profile.
         inTransientStore = true;
         break;
 
@@ -306,7 +290,7 @@ public class QueryManager implements AutoCloseable {
       case CANCELED:
       case FAILED:
         try {
-          transientProfiles.remove(stringQueryId);
+          runningProfileStore.remove(stringQueryId);
           inTransientStore = false;
         } catch(final Exception e) {
           logger.warn("Failure while trying to delete the estore profile for this query.", e);
@@ -321,7 +305,7 @@ public class QueryManager implements AutoCloseable {
   void writeFinalProfile(UserException ex) {
     try {
       // TODO(DRILL-2362) when do these ever get deleted?
-      profileStore.put(stringQueryId, getQueryProfile(ex));
+      completedProfileStore.put(stringQueryId, getQueryProfile(ex));
     } catch (Exception e) {
       logger.error("Failure while storing Query Profile", e);
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/9ba4af86/exec/java-exec/src/main/resources/drill-module.conf
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index 7c095ac..5ba4526 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -141,6 +141,10 @@ drill.exec: {
       write: true
     }
   },
+  profiles.store: {
+    inmemory: false,
+    capacity: 1000
+  },
   impersonation: {
     enabled: false,
     max_chained_user_hops: 3


Mime
View raw message