From commits-return-1235-apmail-drill-commits-archive=drill.apache.org@drill.apache.org Fri Jan 9 21:06:28 2015 Return-Path: X-Original-To: apmail-drill-commits-archive@www.apache.org Delivered-To: apmail-drill-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 6431017223 for ; Fri, 9 Jan 2015 21:06:28 +0000 (UTC) Received: (qmail 47889 invoked by uid 500); 9 Jan 2015 21:06:29 -0000 Delivered-To: apmail-drill-commits-archive@drill.apache.org Received: (qmail 47861 invoked by uid 500); 9 Jan 2015 21:06:29 -0000 Mailing-List: contact commits-help@drill.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: commits@drill.apache.org Delivered-To: mailing list commits@drill.apache.org Received: (qmail 47852 invoked by uid 99); 9 Jan 2015 21:06:29 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 09 Jan 2015 21:06:29 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 460448173A5; Fri, 9 Jan 2015 21:06:29 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jacques@apache.org To: commits@drill.apache.org Message-Id: <16f6de8f9a514cb4a42f99917ee4bb18@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: drill git commit: DRILL-1947: Cache PStore/EStore instances rather than recreating on each need. As part of this, make sure that PStoreConfig doesn't use identity equality. Date: Fri, 9 Jan 2015 21:06:29 +0000 (UTC) Repository: drill Updated Branches: refs/heads/master 1552c96f4 -> 7638dbb82 DRILL-1947: Cache PStore/EStore instances rather than recreating on each need. As part of this, make sure that PStoreConfig doesn't use identity equality. Project: http://git-wip-us.apache.org/repos/asf/drill/repo Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/7638dbb8 Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/7638dbb8 Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/7638dbb8 Branch: refs/heads/master Commit: 7638dbb82606c9644d6ad02210fbfd5d8f6ae090 Parents: 1552c96 Author: Jacques Nadeau Authored: Tue Jan 6 21:52:40 2015 -0800 Committer: Jacques Nadeau Committed: Fri Jan 9 08:15:53 2015 -0800 ---------------------------------------------------------------------- .../exec/store/hbase/config/HBasePStore.java | 4 ++ .../exec/store/mongo/config/MongoPStore.java | 4 ++ .../org/apache/drill/exec/server/Drillbit.java | 9 ++- .../exec/store/sys/CachingStoreProvider.java | 70 ++++++++++++++++++++ .../drill/exec/store/sys/EStoreProvider.java | 4 +- .../org/apache/drill/exec/store/sys/PStore.java | 2 + .../drill/exec/store/sys/PStoreConfig.java | 47 +++++++++++++ .../drill/exec/store/sys/PStoreProvider.java | 4 +- .../drill/exec/store/sys/PStoreRegistry.java | 2 +- .../drill/exec/store/sys/local/FilePStore.java | 4 ++ .../store/sys/local/LocalEStoreProvider.java | 23 +++---- .../drill/exec/store/sys/local/MapEStore.java | 6 +- .../store/sys/local/NoWriteLocalPStore.java | 4 ++ .../store/sys/serialize/JacksonSerializer.java | 40 +++++++++++ .../store/sys/serialize/ProtoSerializer.java | 38 +++++++++++ .../exec/store/sys/zk/ZkAbstractStore.java | 17 ++++- .../exec/store/sys/zk/ZkEStoreProvider.java | 8 +++ .../exec/store/sys/zk/ZkPStoreProvider.java | 14 ++-- 18 files changed, 265 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/drill/blob/7638dbb8/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePStore.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePStore.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePStore.java index 594b9f8..17ddcb1 100644 --- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePStore.java +++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePStore.java @@ -229,4 +229,8 @@ public class HBasePStore implements PStore { } + @Override + public void close() { + } + } http://git-wip-us.apache.org/repos/asf/drill/blob/7638dbb8/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/config/MongoPStore.java ---------------------------------------------------------------------- diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/config/MongoPStore.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/config/MongoPStore.java index fc5c05b..ea3a5a2 100644 --- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/config/MongoPStore.java +++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/config/MongoPStore.java @@ -177,4 +177,8 @@ public class MongoPStore implements PStore, DrillMongoConstants { throw new UnsupportedOperationException(); } } + + @Override + public void close() { + } } http://git-wip-us.apache.org/repos/asf/drill/blob/7638dbb8/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java index 4b9b20d..67342c4 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java @@ -28,6 +28,7 @@ import org.apache.drill.exec.exception.DrillbitStartupException; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; import org.apache.drill.exec.server.rest.DrillRestServer; import org.apache.drill.exec.service.ServiceEngine; +import org.apache.drill.exec.store.sys.CachingStoreProvider; import org.apache.drill.exec.store.sys.PStoreProvider; import org.apache.drill.exec.store.sys.PStoreRegistry; import org.apache.drill.exec.store.sys.local.LocalPStoreProvider; @@ -106,7 +107,7 @@ public class Drillbit implements Closeable{ if(serviceSet != null) { this.coord = serviceSet.getCoordinator(); - this.storeProvider = new LocalPStoreProvider(config); + this.storeProvider = new CachingStoreProvider(new LocalPStoreProvider(config)); } else { Runtime.getRuntime().addShutdownHook(new ShutdownThread(config)); this.coord = new ZKClusterCoordinator(config); @@ -175,7 +176,11 @@ public class Drillbit implements Closeable{ logger.warn("Failure while shutting down embedded jetty server."); } Closeables.closeQuietly(engine); - Closeables.closeQuietly(storeProvider); + try{ + storeProvider.close(); + }catch(Exception e){ + logger.warn("Failure while closing store provider.", e); + } Closeables.closeQuietly(coord); Closeables.closeQuietly(manager); Closeables.closeQuietly(context); http://git-wip-us.apache.org/repos/asf/drill/blob/7638dbb8/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/CachingStoreProvider.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/CachingStoreProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/CachingStoreProvider.java new file mode 100644 index 0000000..68440cb --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/CachingStoreProvider.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.sys; + +import java.io.IOException; +import java.util.concurrent.ConcurrentMap; + +import org.apache.drill.exec.store.sys.PStoreConfig.Mode; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Maps; + +public class CachingStoreProvider implements PStoreProvider, AutoCloseable { + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(CachingStoreProvider.class); + + private final ConcurrentMap, PStore> storeCache = Maps.newConcurrentMap(); + private final PStoreProvider provider; + + public CachingStoreProvider(PStoreProvider provider) { + super(); + this.provider = provider; + } + + @SuppressWarnings("unchecked") + public PStore getStore(PStoreConfig config) throws IOException { + PStore s = storeCache.get(config); + if(s == null){ + PStore newStore = provider.getStore(config); + s = storeCache.putIfAbsent(config, newStore); + if(s == null){ + s = newStore; + }else{ + newStore.close(); + } + } + + return (PStore) s; + + } + + @Override + public void start() throws IOException { + provider.start(); + } + + @Override + public void close() throws Exception { + for(PStore store : storeCache.values()){ + store.close(); + } + storeCache.clear(); + provider.close(); + } + +} http://git-wip-us.apache.org/repos/asf/drill/blob/7638dbb8/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/EStoreProvider.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/EStoreProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/EStoreProvider.java index b09c5b4..4c79a28 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/EStoreProvider.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/EStoreProvider.java @@ -18,12 +18,10 @@ package org.apache.drill.exec.store.sys; -import java.io.IOException; /** * Interface to define the provider which return EStore. */ -public interface EStoreProvider { - public PStore getStore(PStoreConfig table) throws IOException; +public interface EStoreProvider extends PStoreProvider { } http://git-wip-us.apache.org/repos/asf/drill/blob/7638dbb8/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStore.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStore.java index 26c00ea..b629645 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStore.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStore.java @@ -19,6 +19,7 @@ package org.apache.drill.exec.store.sys; import java.util.Map; + /** * Interface for reading and writing values to a persistent storage provider. Iterators are guaranteed to be returned in key order. * @param @@ -28,4 +29,5 @@ public interface PStore extends Iterable> { public void put(String key, V value); public boolean putIfAbsent(String key, V value); public void delete(String key); + public void close(); } http://git-wip-us.apache.org/repos/asf/drill/blob/7638dbb8/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStoreConfig.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStoreConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStoreConfig.java index 83c2243..bd9d977 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStoreConfig.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStoreConfig.java @@ -116,4 +116,51 @@ public class PStoreConfig { } + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + maxIteratorSize; + result = prime * result + ((mode == null) ? 0 : mode.hashCode()); + result = prime * result + ((name == null) ? 0 : name.hashCode()); + result = prime * result + ((valueSerializer == null) ? 0 : valueSerializer.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + PStoreConfig other = (PStoreConfig) obj; + if (maxIteratorSize != other.maxIteratorSize) { + return false; + } + if (mode != other.mode) { + return false; + } + if (name == null) { + if (other.name != null) { + return false; + } + } else if (!name.equals(other.name)) { + return false; + } + if (valueSerializer == null) { + if (other.valueSerializer != null) { + return false; + } + } else if (!valueSerializer.equals(other.valueSerializer)) { + return false; + } + return true; + } + + } http://git-wip-us.apache.org/repos/asf/drill/blob/7638dbb8/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStoreProvider.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStoreProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStoreProvider.java index 6371dfa..efa223e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStoreProvider.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStoreProvider.java @@ -17,11 +17,9 @@ */ package org.apache.drill.exec.store.sys; -import java.io.Closeable; import java.io.IOException; -public interface PStoreProvider extends AutoCloseable, Closeable{ - +public interface PStoreProvider extends AutoCloseable { public PStore getStore(PStoreConfig config) throws IOException; public void start() throws IOException; } http://git-wip-us.apache.org/repos/asf/drill/blob/7638dbb8/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStoreRegistry.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStoreRegistry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStoreRegistry.java index 580d20a..532e6be 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStoreRegistry.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStoreRegistry.java @@ -53,7 +53,7 @@ public class PStoreRegistry { logger.info("Using the configured PStoreProvider class: '{}'.", storeProviderClassName); Class storeProviderClass = (Class) Class.forName(storeProviderClassName); Constructor c = storeProviderClass.getConstructor(PStoreRegistry.class); - return c.newInstance(this); + return new CachingStoreProvider(c.newInstance(this)); } catch (ConfigException.Missing | ClassNotFoundException | NoSuchMethodException | SecurityException | InstantiationException | IllegalAccessException | IllegalArgumentException | InvocationTargetException e) { logger.error(e.getMessage(), e); http://git-wip-us.apache.org/repos/asf/drill/blob/7638dbb8/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/FilePStore.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/FilePStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/FilePStore.java index 416a21a..40f25e7 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/FilePStore.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/FilePStore.java @@ -228,4 +228,8 @@ public class FilePStore implements PStore { } } + @Override + public void close() { + } + } http://git-wip-us.apache.org/repos/asf/drill/blob/7638dbb8/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/LocalEStoreProvider.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/LocalEStoreProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/LocalEStoreProvider.java index 094d093..e7c2f94 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/LocalEStoreProvider.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/LocalEStoreProvider.java @@ -19,7 +19,6 @@ package org.apache.drill.exec.store.sys.local; import java.io.IOException; -import java.util.concurrent.ConcurrentMap; import org.apache.drill.exec.store.sys.EStore; import org.apache.drill.exec.store.sys.EStoreProvider; @@ -27,24 +26,22 @@ import org.apache.drill.exec.store.sys.PStoreConfig; import org.apache.drill.exec.store.sys.PStoreConfig.Mode; import com.google.common.base.Preconditions; -import com.google.common.collect.Maps; public class LocalEStoreProvider implements EStoreProvider{ - private ConcurrentMap, EStore> estores = Maps.newConcurrentMap(); @Override public EStore getStore(PStoreConfig storeConfig) throws IOException { Preconditions.checkArgument(storeConfig.getMode() == Mode.EPHEMERAL, "Estore configurations must be set ephemeral."); - if (! (estores.containsKey(storeConfig)) ) { - EStore p = new MapEStore(); - EStore p2 = estores.putIfAbsent(storeConfig, p); - if(p2 != null) { - return (EStore) p2; - } - return p; - } else { - return (EStore) estores.get(storeConfig); - } + return new MapEStore(); } + + @Override + public void start() throws IOException { + } + + @Override + public void close() { + } + } http://git-wip-us.apache.org/repos/asf/drill/blob/7638dbb8/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/MapEStore.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/MapEStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/MapEStore.java index 2723916..96e51e6 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/MapEStore.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/MapEStore.java @@ -29,7 +29,7 @@ import java.util.concurrent.ConcurrentHashMap; * Implementation of EStore using ConcurrentHashMap. * @param */ -public class MapEStore implements EStore { +public class MapEStore implements EStore { ConcurrentHashMap store = new ConcurrentHashMap<>(); @Override @@ -57,4 +57,8 @@ public class MapEStore implements EStore { V out = store.putIfAbsent(key, value); return out == null; } + + @Override + public void close() { + } } http://git-wip-us.apache.org/repos/asf/drill/blob/7638dbb8/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/NoWriteLocalPStore.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/NoWriteLocalPStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/NoWriteLocalPStore.java index 71a41f0..c675618 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/NoWriteLocalPStore.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/NoWriteLocalPStore.java @@ -62,4 +62,8 @@ public class NoWriteLocalPStore implements PStore{ blobMap.remove(key); } + @Override + public void close() { + } + } http://git-wip-us.apache.org/repos/asf/drill/blob/7638dbb8/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/serialize/JacksonSerializer.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/serialize/JacksonSerializer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/serialize/JacksonSerializer.java index b8a5cdd..53452f3 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/serialize/JacksonSerializer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/serialize/JacksonSerializer.java @@ -43,4 +43,44 @@ public class JacksonSerializer implements PClassSerializer { public X deserialize(byte[] bytes) throws IOException { return reader.readValue(bytes); } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + ((reader == null) ? 0 : reader.hashCode()); + result = prime * result + ((writer == null) ? 0 : writer.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + JacksonSerializer other = (JacksonSerializer) obj; + if (reader == null) { + if (other.reader != null) { + return false; + } + } else if (!reader.equals(other.reader)) { + return false; + } + if (writer == null) { + if (other.writer != null) { + return false; + } + } else if (!writer.equals(other.writer)) { + return false; + } + return true; + } + + } http://git-wip-us.apache.org/repos/asf/drill/blob/7638dbb8/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/serialize/ProtoSerializer.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/serialize/ProtoSerializer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/serialize/ProtoSerializer.java index 1ea714e..52df7a4 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/serialize/ProtoSerializer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/serialize/ProtoSerializer.java @@ -51,5 +51,43 @@ public class ProtoSerializer implements PClassSeri return (X) b.build(); } + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + ((readSchema == null) ? 0 : readSchema.hashCode()); + result = prime * result + ((writeSchema == null) ? 0 : writeSchema.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + ProtoSerializer other = (ProtoSerializer) obj; + if (readSchema == null) { + if (other.readSchema != null) { + return false; + } + } else if (!readSchema.equals(other.readSchema)) { + return false; + } + if (writeSchema == null) { + if (other.writeSchema != null) { + return false; + } + } else if (!writeSchema.equals(other.writeSchema)) { + return false; + } + return true; + } + } http://git-wip-us.apache.org/repos/asf/drill/blob/7638dbb8/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkAbstractStore.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkAbstractStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkAbstractStore.java index d61f3b4..01059a4 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkAbstractStore.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkAbstractStore.java @@ -27,9 +27,8 @@ import java.util.Map.Entry; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.recipes.cache.ChildData; import org.apache.curator.framework.recipes.cache.PathChildrenCache; -import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; -import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode; +import org.apache.drill.exec.rpc.data.DataTunnel; import org.apache.drill.exec.store.sys.PStoreConfig; import org.apache.zookeeper.CreateMode; @@ -40,7 +39,8 @@ import com.google.common.collect.Lists; * This is the abstract class that is shared by ZkPStore (Persistent store) and ZkEStore (Ephemeral Store) * @param */ -public abstract class ZkAbstractStore { +public abstract class ZkAbstractStore implements AutoCloseable { + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ZkAbstractStore.class); protected CuratorFramework framework; protected PStoreConfig config; @@ -206,4 +206,15 @@ public abstract class ZkAbstractStore { } } + + @Override + public void close() { + try{ + childrenCache.close(); + }catch(IOException e){ + logger.warn("Failure while closing out abstract store.", e); + } + } + + } http://git-wip-us.apache.org/repos/asf/drill/blob/7638dbb8/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkEStoreProvider.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkEStoreProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkEStoreProvider.java index 1c2c3fd..7d7d475 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkEStoreProvider.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkEStoreProvider.java @@ -40,4 +40,12 @@ public class ZkEStoreProvider implements EStoreProvider{ Preconditions.checkArgument(store.getMode() == Mode.EPHEMERAL); return new ZkEStore(curator,store); } + + @Override + public void start() throws IOException { + } + + @Override + public void close() throws Exception { + } } http://git-wip-us.apache.org/repos/asf/drill/blob/7638dbb8/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStoreProvider.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStoreProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStoreProvider.java index 03d2441..f8fa2bc 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStoreProvider.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStoreProvider.java @@ -17,7 +17,6 @@ */ package org.apache.drill.exec.store.sys.zk; -import java.io.File; import java.io.IOException; import org.apache.curator.framework.CuratorFramework; @@ -26,7 +25,7 @@ import org.apache.drill.exec.coord.ClusterCoordinator; import org.apache.drill.exec.coord.zk.ZKClusterCoordinator; import org.apache.drill.exec.exception.DrillbitStartupException; import org.apache.drill.exec.store.dfs.shim.DrillFileSystem; -import org.apache.drill.exec.store.sys.EStore; +import org.apache.drill.exec.store.sys.EStoreProvider; import org.apache.drill.exec.store.sys.PStore; import org.apache.drill.exec.store.sys.PStoreConfig; import org.apache.drill.exec.store.sys.PStoreProvider; @@ -45,7 +44,7 @@ public class ZkPStoreProvider implements PStoreProvider { private final DrillFileSystem fs; private final Path blobRoot; - private final ZkEStoreProvider zkEStoreProvider; + private final EStoreProvider zkEStoreProvider; public ZkPStoreProvider(PStoreRegistry registry) throws DrillbitStartupException { ClusterCoordinator coord = registry.getClusterCoordinator(); @@ -66,7 +65,6 @@ public class ZkPStoreProvider implements PStoreProvider { throw new DrillbitStartupException("Failure while attempting to set up blob store.", e); } - this.zkEStoreProvider = new ZkEStoreProvider(curator); } @@ -79,11 +77,6 @@ public class ZkPStoreProvider implements PStoreProvider { } @Override - public void close() { - } - - - @Override public PStore getStore(PStoreConfig config) throws IOException { switch(config.getMode()){ case BLOB_PERSISTENT: @@ -101,4 +94,7 @@ public class ZkPStoreProvider implements PStoreProvider { public void start() { } + @Override + public void close() { + } }