http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/coord/store/TransientStoreConfigBuilder.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/store/TransientStoreConfigBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/store/TransientStoreConfigBuilder.java
new file mode 100644
index 0000000..a019a77
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/store/TransientStoreConfigBuilder.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.coord.store;
+
+import com.google.common.base.Preconditions;
+import org.apache.drill.exec.serialization.InstanceSerializer;
+
+public class TransientStoreConfigBuilder<T> {
+ private String name;
+ private InstanceSerializer<T> serializer;
+
+ protected TransientStoreConfigBuilder() { }
+
+ public String name() {
+ return name;
+ }
+
+ public TransientStoreConfigBuilder<T> name(final String name) {
+ this.name = Preconditions.checkNotNull(name);
+ return this;
+ }
+
+ public InstanceSerializer<T> serializer() {
+ return serializer;
+ }
+
+ public TransientStoreConfigBuilder<T> serializer(final InstanceSerializer<T> serializer) {
+ this.serializer = Preconditions.checkNotNull(serializer);
+ return this;
+ }
+
+ public TransientStoreConfig<T> build() {
+ return new TransientStoreConfig<>(name, serializer);
+ }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/coord/store/TransientStoreEvent.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/store/TransientStoreEvent.java b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/store/TransientStoreEvent.java
new file mode 100644
index 0000000..a0b5725
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/store/TransientStoreEvent.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.coord.store;
+
+import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
+
+/**
+ * Represents an event created as a result of an operation over a particular (key, value) entry in a
+ * {@link TransientStore store} instance.
+ *
+ * Types of operations are enumerated in {@link TransientStoreEventType}
+ *
+ * @param <V> value type
+ */
+public class TransientStoreEvent<V> {
+ private final TransientStoreEventType type;
+ private final String key;
+ private final V value;
+
+ public TransientStoreEvent(final TransientStoreEventType type, final String key, final V value) {
+ this.type = Preconditions.checkNotNull(type);
+ this.key = Preconditions.checkNotNull(key);
+ this.value = Preconditions.checkNotNull(value);
+ }
+
+ public String getKey() {
+ return key;
+ }
+
+ public TransientStoreEventType getType() {
+ return type;
+ }
+
+ public V getValue() {
+ return value;
+ }
+
+ @Override
+ public boolean equals(final Object obj) {
+ if (obj instanceof TransientStoreEvent && obj.getClass().equals(getClass())) {
+ final TransientStoreEvent<V> other = (TransientStoreEvent<V>)obj;
+ return Objects.equal(type, other.type) && Objects.equal(key, other.key) && Objects.equal(value, other.value);
+ }
+ return super.equals(obj);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(type, key, value);
+ }
+
+ public static <T> TransientStoreEvent<T>of(final TransientStoreEventType type, final String key, final T value) {
+ return new TransientStoreEvent<>(type, key, value);
+ }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/coord/store/TransientStoreEventType.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/store/TransientStoreEventType.java b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/store/TransientStoreEventType.java
new file mode 100644
index 0000000..51ae2c7
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/store/TransientStoreEventType.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.coord.store;
+
+/**
+ * Types of store events.
+ */
+public enum TransientStoreEventType {
+ CREATE,
+ UPDATE,
+ DELETE
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/coord/store/TransientStoreFactory.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/store/TransientStoreFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/store/TransientStoreFactory.java
new file mode 100644
index 0000000..c3d351d
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/store/TransientStoreFactory.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.coord.store;
+
+import org.apache.drill.exec.store.sys.PersistentStore;
+
+/**
+ * Factory that is used to obtain a {@link TransientStore store} instance.
+ */
+public interface TransientStoreFactory extends AutoCloseable {
+
+ /**
+ * Returns a {@link TransientStore transient store} instance for the given configuration.
+ *
+ * Note that implementors have liberty to cache previous {@link PersistentStore store} instances.
+ *
+ * @param config store configuration
+ * @param <V> store value type
+ */
+ <V> TransientStore<V> getOrCreateStore(TransientStoreConfig<V> config);
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/coord/store/TransientStoreListener.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/store/TransientStoreListener.java b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/store/TransientStoreListener.java
new file mode 100644
index 0000000..ca8fa9d
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/store/TransientStoreListener.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.coord.store;
+
+/**
+ * A listener used for observing {@link TransientStore transient store} {@link TransientStoreEvent events}.
+ */
+public interface TransientStoreListener {
+
+ /**
+ * {@link TransientStore transient store} fires this method with event details upon an observed change.
+ *
+ * @param event event details
+ */
+ void onChange(TransientStoreEvent event);
+
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/EventDispatcher.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/EventDispatcher.java b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/EventDispatcher.java
new file mode 100644
index 0000000..580cfcd
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/EventDispatcher.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.coord.zk;
+
+import java.util.Map;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
+import org.apache.drill.exec.coord.store.TransientStoreEvent;
+import org.apache.drill.exec.coord.store.TransientStoreEventType;
+
+/**
+ * An abstraction used for dispatching store {@link TransientStoreEvent events}.
+ *
+ * @param <V> value type
+ */
+public class EventDispatcher<V> implements PathChildrenCacheListener {
+ public final static Map<PathChildrenCacheEvent.Type, TransientStoreEventType> MAPPINGS = ImmutableMap
+ .<PathChildrenCacheEvent.Type, TransientStoreEventType>builder()
+ .put(PathChildrenCacheEvent.Type.CHILD_ADDED, TransientStoreEventType.CREATE)
+ .put(PathChildrenCacheEvent.Type.CHILD_REMOVED, TransientStoreEventType.DELETE)
+ .put(PathChildrenCacheEvent.Type.CHILD_UPDATED, TransientStoreEventType.UPDATE)
+ .build();
+
+ private final ZkEphemeralStore<V> store;
+
+ protected EventDispatcher(final ZkEphemeralStore<V> store) {
+ this.store = Preconditions.checkNotNull(store, "store is required");
+ }
+
+ @Override
+ public void childEvent(final CuratorFramework client, final PathChildrenCacheEvent event) throws Exception {
+ final PathChildrenCacheEvent.Type original = event.getType();
+ final TransientStoreEventType mapped = MAPPINGS.get(original);
+ if (mapped != null) { // dispatch the event to listeners only if it can be mapped
+ final String path = event.getData().getPath();
+ final byte[] bytes = event.getData().getData();
+ final V value = store.getConfig().getSerializer().deserialize(bytes);
+ store.fireListeners(TransientStoreEvent.of(mapped, path, value));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/PathUtils.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/PathUtils.java b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/PathUtils.java
new file mode 100644
index 0000000..f01b989
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/PathUtils.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.coord.zk;
+
+import com.google.common.base.Preconditions;
+import org.apache.parquet.Strings;
+
+/**
+ * A convenience class used to expedite zookeeper paths manipulations.
+ */
+public final class PathUtils {
+
+ /**
+ * Returns a normalized, combined path out of the given path segments.
+ *
+ * @param parts path segments to combine
+ * @see #normalize(String)
+ */
+ public static final String join(final String... parts) {
+ final StringBuilder sb = new StringBuilder();
+ for (final String part:parts) {
+ Preconditions.checkNotNull(part, "parts cannot contain null");
+ if (!Strings.isNullOrEmpty(part)) {
+ sb.append(part).append("/");
+ }
+ }
+ if (sb.length() > 0) {
+ sb.deleteCharAt(sb.length() - 1);
+ }
+ final String path = sb.toString();
+ return normalize(path);
+ }
+
+ /**
+ * Normalizes the given path eliminating repeated forward slashes.
+ *
+ * @return normalized path
+ */
+ public static final String normalize(final String path) {
+ if (Strings.isNullOrEmpty(Preconditions.checkNotNull(path))) {
+ return path;
+ }
+
+ final StringBuilder builder = new StringBuilder();
+ char last = path.charAt(0);
+ builder.append(last);
+ for (int i=1; i<path.length(); i++) {
+ char cur = path.charAt(i);
+ if (last == '/' && cur == last) {
+ continue;
+ }
+ builder.append(cur);
+ last = cur;
+ }
+ return builder.toString();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZKClusterCoordinator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZKClusterCoordinator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZKClusterCoordinator.java
index b831852..4926f9c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZKClusterCoordinator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZKClusterCoordinator.java
@@ -24,13 +24,14 @@ import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
+import java.util.List;
import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import com.google.common.collect.Lists;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
@@ -42,15 +43,19 @@ import org.apache.curator.x.discovery.ServiceDiscovery;
import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
import org.apache.curator.x.discovery.ServiceInstance;
import org.apache.curator.x.discovery.details.ServiceCacheListener;
+import org.apache.drill.common.AutoCloseables;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.coord.ClusterCoordinator;
import org.apache.drill.exec.coord.DistributedSemaphore;
import org.apache.drill.exec.coord.DrillServiceInstanceHelper;
+import org.apache.drill.exec.coord.store.CachingTransientStoreFactory;
+import org.apache.drill.exec.coord.store.TransientStore;
+import org.apache.drill.exec.coord.store.TransientStoreConfig;
+import org.apache.drill.exec.coord.store.TransientStoreFactory;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import com.google.common.base.Function;
-import org.apache.drill.exec.work.foreman.DrillbitStatusListener;
/**
* Manages cluster coordination utilizing zookeeper. *
@@ -60,15 +65,14 @@ public class ZKClusterCoordinator extends ClusterCoordinator {
private CuratorFramework curator;
private ServiceDiscovery<DrillbitEndpoint> discovery;
- private ServiceCache<DrillbitEndpoint> serviceCache;
private volatile Collection<DrillbitEndpoint> endpoints = Collections.emptyList();
private final String serviceName;
private final CountDownLatch initialConnection = new CountDownLatch(1);
+ private final TransientStoreFactory factory;
+ private ServiceCache<DrillbitEndpoint> serviceCache;
private static final Pattern ZK_COMPLEX_STRING = Pattern.compile("(^.*?)/(.*)/([^/]*)$");
-
-
public ZKClusterCoordinator(DrillConfig config) throws IOException{
this(config, null);
}
@@ -100,11 +104,8 @@ public class ZKClusterCoordinator extends ClusterCoordinator {
.build();
curator.getConnectionStateListenable().addListener(new InitialConnectionListener());
curator.start();
- discovery = getDiscovery();
- serviceCache = discovery.
- serviceCacheBuilder()
- .name(serviceName)
- .build();
+ discovery = newDiscovery();
+ factory = CachingTransientStoreFactory.of(new ZkTransientStoreFactory(curator));
}
public CuratorFramework getCurator() {
@@ -115,8 +116,6 @@ public class ZKClusterCoordinator extends ClusterCoordinator {
public void start(long millisToWait) throws Exception {
logger.debug("Starting ZKClusterCoordination.");
discovery.start();
- serviceCache.start();
- serviceCache.addListener(new ZKListener());
if(millisToWait != 0) {
boolean success = this.initialConnection.await(millisToWait, TimeUnit.MILLISECONDS);
@@ -127,6 +126,12 @@ public class ZKClusterCoordinator extends ClusterCoordinator {
this.initialConnection.await();
}
+ serviceCache = discovery
+ .serviceCacheBuilder()
+ .name(serviceName)
+ .build();
+ serviceCache.addListener(new EndpointListener());
+ serviceCache.start();
updateEndpoints();
}
@@ -142,29 +147,28 @@ public class ZKClusterCoordinator extends ClusterCoordinator {
}
- private class ZKListener implements ServiceCacheListener {
-
+ private class EndpointListener implements ServiceCacheListener {
@Override
- public void stateChanged(CuratorFramework client, ConnectionState newState) {
- }
+ public void stateChanged(CuratorFramework client, ConnectionState newState) { }
@Override
public void cacheChanged() {
- logger.debug("Cache changed, updating.");
+ logger.debug("Got cache changed --> updating endpoints");
updateEndpoints();
}
}
- public void close() throws IOException {
- serviceCache.close();
- discovery.close();
- curator.close();
+ public void close() throws Exception {
+ // discovery attempts to close its caches(ie serviceCache) already. however, being good citizens we make sure to
+ // explicitly close serviceCache. Not only that we make sure to close serviceCache before discovery to prevent
+ // double releasing and disallowing jvm to spit bothering warnings. simply put, we are great!
+ AutoCloseables.close(serviceCache, discovery, curator, factory);
}
@Override
public RegistrationHandle register(DrillbitEndpoint data) {
try {
- ServiceInstance<DrillbitEndpoint> serviceInstance = getServiceInstance(data);
+ ServiceInstance<DrillbitEndpoint> serviceInstance = newServiceInstance(data);
discovery.registerService(serviceInstance);
return new ZKRegistrationHandle(serviceInstance.getId());
} catch (Exception e) {
@@ -206,6 +210,11 @@ public class ZKClusterCoordinator extends ClusterCoordinator {
return new ZkDistributedSemaphore(curator, "/semaphore/" + name, maximumLeases);
}
+ @Override
+ public <V> TransientStore<V> getOrCreateTransientStore(final TransientStoreConfig<V> config) {
+ final ZkEphemeralStore<V> store = (ZkEphemeralStore<V>)factory.getOrCreateStore(config);
+ return store;
+ }
private synchronized void updateEndpoints() {
try {
@@ -253,7 +262,7 @@ public class ZKClusterCoordinator extends ClusterCoordinator {
}
}
- private ServiceInstance<DrillbitEndpoint> getServiceInstance(DrillbitEndpoint endpoint) throws Exception {
+ protected ServiceInstance<DrillbitEndpoint> newServiceInstance(DrillbitEndpoint endpoint) throws Exception {
return ServiceInstance.<DrillbitEndpoint>builder()
.name(serviceName)
.payload(endpoint)
@@ -261,7 +270,7 @@ public class ZKClusterCoordinator extends ClusterCoordinator {
}
- public ServiceDiscovery<DrillbitEndpoint> getDiscovery() {
+ protected ServiceDiscovery<DrillbitEndpoint> newDiscovery() {
return ServiceDiscoveryBuilder
.builder(DrillbitEndpoint.class)
.basePath("/")
http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZkEphemeralStore.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZkEphemeralStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZkEphemeralStore.java
new file mode 100644
index 0000000..94e03ad
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZkEphemeralStore.java
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.coord.zk;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+
+import javax.annotation.Nullable;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
+import org.apache.drill.common.collections.ImmutableEntry;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.exec.coord.store.BaseTransientStore;
+import org.apache.drill.exec.coord.store.TransientStoreConfig;
+import org.apache.drill.exec.coord.store.TransientStoreEvent;
+import org.apache.drill.exec.serialization.InstanceSerializer;
+import org.apache.zookeeper.CreateMode;
+
+public class ZkEphemeralStore<V> extends BaseTransientStore<V> {
+
+ @VisibleForTesting
+ protected final PathChildrenCacheListener dispatcher = new EventDispatcher<>(this);
+ private final ZookeeperClient client;
+
+ public ZkEphemeralStore(final TransientStoreConfig<V> config, final CuratorFramework curator) {
+ super(config);
+ this.client = new ZookeeperClient(curator, PathUtils.join("/", config.getName()), CreateMode.EPHEMERAL);
+ }
+
+ public void start() throws Exception {
+ getClient().getCache().getListenable().addListener(dispatcher);
+ getClient().start();
+ }
+
+ protected ZookeeperClient getClient() {
+ return client;
+ }
+
+ @Override
+ public V get(final String key) {
+ final byte[] bytes = getClient().get(key);
+ if (bytes == null) {
+ return null;
+ }
+ try {
+ return config.getSerializer().deserialize(bytes);
+ } catch (final IOException e) {
+ throw new DrillRuntimeException(String.format("unable to deserialize value at %s", key), e);
+ }
+ }
+
+ @Override
+ public V put(final String key, final V value) {
+ final InstanceSerializer<V> serializer = config.getSerializer();
+ try {
+ final byte[] old = getClient().get(key);
+ final byte[] bytes = serializer.serialize(value);
+ getClient().put(key, bytes);
+ if (old == null) {
+ return null;
+ }
+ return serializer.deserialize(old);
+ } catch (final IOException e) {
+ throw new DrillRuntimeException(String.format("unable to de/serialize value of type %s", value.getClass()), e);
+ }
+ }
+
+ @Override
+ public V putIfAbsent(final String key, final V value) {
+ final V old = get(key);
+ if (old == null) {
+ try {
+ final byte[] bytes = config.getSerializer().serialize(value);
+ getClient().put(key, bytes);
+ } catch (final IOException e) {
+ throw new DrillRuntimeException(String.format("unable to serialize value of type %s", value.getClass()), e);
+ }
+ }
+ return old;
+ }
+
+ @Override
+ public V remove(final String key) {
+ final V existing = get(key);
+ if (existing != null) {
+ getClient().delete(key);
+ }
+ return existing;
+ }
+
+ @Override
+ public Iterator<Map.Entry<String, V>> entries() {
+ return Iterators.transform(getClient().entries(), new Function<Map.Entry<String, byte[]>, Map.Entry<String, V>>() {
+ @Nullable
+ @Override
+ public Map.Entry<String, V> apply(@Nullable Map.Entry<String, byte[]> input) {
+ try {
+ final V value = config.getSerializer().deserialize(input.getValue());
+ return new ImmutableEntry<>(input.getKey(), value);
+ } catch (final IOException e) {
+ throw new DrillRuntimeException(String.format("unable to deserialize value at key %s", input.getKey()), e);
+ }
+ }
+ });
+ }
+
+ @Override
+ public int size() {
+ return getClient().getCache().getCurrentData().size();
+ }
+
+ @Override
+ public void close() throws Exception {
+ getClient().close();
+ }
+
+ /**
+ * This method override ensures package level method visibility.
+ */
+ @Override
+ protected void fireListeners(TransientStoreEvent event) {
+ super.fireListeners(event);
+ }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZkTransientStoreFactory.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZkTransientStoreFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZkTransientStoreFactory.java
new file mode 100644
index 0000000..a58c376
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZkTransientStoreFactory.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.coord.zk;
+
+import com.google.common.base.Preconditions;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.exec.coord.store.TransientStore;
+import org.apache.drill.exec.coord.store.TransientStoreConfig;
+import org.apache.drill.exec.coord.store.TransientStoreFactory;
+
+public class ZkTransientStoreFactory implements TransientStoreFactory {
+
+ private final CuratorFramework curator;
+
+ public ZkTransientStoreFactory(final CuratorFramework curator) {
+ this.curator = Preconditions.checkNotNull(curator, "curator is required");
+ }
+
+ @Override
+ public <V> ZkEphemeralStore<V> getOrCreateStore(TransientStoreConfig<V> config) {
+ final ZkEphemeralStore<V> store = new ZkEphemeralStore<>(config, curator);
+ try {
+ store.start();
+ } catch (final Exception e) {
+ throw new DrillRuntimeException("unable to start zookeeper transient store", e);
+ }
+ return store;
+ }
+
+ @Override
+ public void close() throws Exception {
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZookeeperClient.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZookeeperClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZookeeperClient.java
new file mode 100644
index 0000000..1c33f71
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZookeeperClient.java
@@ -0,0 +1,238 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.coord.zk;
+
+import java.util.Iterator;
+import java.util.Map;
+
+import javax.annotation.Nullable;
+
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.collect.Iterables;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.cache.ChildData;
+import org.apache.curator.framework.recipes.cache.PathChildrenCache;
+import org.apache.drill.common.collections.ImmutableEntry;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.zookeeper.CreateMode;
+
+/**
+ * A namespace aware Zookeeper client.
+ *
+ * The implementation only operates under the given namespace and is safe to use.
+ *
+ * Note that instance of this class holds onto resources that must be released via {@code #close()}.
+ */
+public class ZookeeperClient implements AutoCloseable {
+ private final CuratorFramework curator;
+ private final String root;
+ private final PathChildrenCache cache;
+ private final CreateMode mode;
+
+ public ZookeeperClient(final CuratorFramework curator, final String root, final CreateMode mode) {
+ this.curator = Preconditions.checkNotNull(curator, "curator is required");
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(root), "root path is required");
+ Preconditions.checkArgument(root.charAt(0) == '/', "root path must be absolute");
+ this.root = root;
+ this.mode = Preconditions.checkNotNull(mode, "mode is required");
+ this.cache = new PathChildrenCache(curator, root, true);
+ }
+
+ /**
+ * Starts the client. This call ensures the creation of the root path.
+ *
+ * @throws Exception if cache fails to start or root path creation fails.
+ * @see #close()
+ */
+ public void start() throws Exception {
+ curator.newNamespaceAwareEnsurePath(root).ensure(curator.getZookeeperClient()); // ensure root is created
+ getCache().start();
+ }
+
+ public PathChildrenCache getCache() {
+ return cache;
+ }
+
+ public String getRoot() {
+ return root;
+ }
+
+ public CreateMode getMode() {
+ return mode;
+ }
+
+ /**
+ * Returns true if path exists in the cache, false otherwise.
+ *
+ * Note that calls to this method are eventually consistent.
+ *
+ * @param path path to check
+ */
+ public boolean hasPath(final String path) {
+ return hasPath(path, false);
+ }
+
+ /**
+ * Checks if the given path exists.
+ *
+ * If the flag consistent is set, the check is consistent as it is made against Zookeeper directly. Otherwise,
+ * the check is eventually consistent.
+ *
+ * @param path path to check
+ * @param consistent whether the check should be consistent
+ * @return
+ */
+ public boolean hasPath(final String path, final boolean consistent) {
+ Preconditions.checkNotNull(path, "path is required");
+
+ final String target = PathUtils.join(root, path);
+ try {
+ if (consistent) {
+ return curator.checkExists().forPath(target) != null;
+ } else {
+ return getCache().getCurrentData(target) != null;
+ }
+ } catch (final Exception e) {
+ throw new DrillRuntimeException("error while checking path on zookeeper", e);
+ }
+ }
+
+ /**
+ * Returns a value corresponding to the given path if path exists in the cache, null otherwise.
+ *
+ * Note that calls to this method are eventually consistent.
+ *
+ * @param path target path
+ */
+ public byte[] get(final String path) {
+ return get(path, false);
+ }
+
+ /**
+ * Returns the value corresponding to the given key, null otherwise.
+ *
+ * If the flag consistent is set, the check is consistent as it is made against Zookeeper directly. Otherwise,
+ * the check is eventually consistent.
+ *
+ * @param path target path
+ */
+ public byte[] get(final String path, final boolean consistent) {
+ Preconditions.checkNotNull(path, "path is required");
+
+ final String target = PathUtils.join(root, path);
+ if (consistent) {
+ try {
+ return curator.getData().forPath(target);
+ } catch (final Exception ex) {
+ throw new DrillRuntimeException(String.format("error retrieving value for [%s]", path), ex);
+ }
+ } else {
+ final ChildData data = getCache().getCurrentData(target);
+ if (data != null) {
+ return data.getData();
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Creates the given path without placing any data in.
+ *
+ * @param path target path
+ */
+ public void create(final String path) {
+ Preconditions.checkNotNull(path, "path is required");
+
+ final String target = PathUtils.join(root, path);
+ try {
+ curator.create().withMode(mode).forPath(target);
+ getCache().rebuildNode(target);
+ } catch (final Exception e) {
+ throw new DrillRuntimeException("unable to put ", e);
+ }
+ }
+
+ /**
+ * Puts the given byte sequence into the given path.
+ *
+ * If path does not exists, this call creates it.
+ *
+ * @param path target path
+ * @param data data to store
+ */
+ public void put(final String path, final byte[] data) {
+ Preconditions.checkNotNull(path, "path is required");
+ Preconditions.checkNotNull(data, "data is required");
+
+ final String target = PathUtils.join(root, path);
+ try {
+ // we make a consistent read to ensure this call won't fail upon consecutive calls on the same path
+ // before cache is updated
+ if (hasPath(path, true)) {
+ curator.setData().forPath(target, data);
+ } else {
+ curator.create().withMode(mode).forPath(target, data);
+ }
+ getCache().rebuildNode(target);
+
+ } catch (final Exception e) {
+ throw new DrillRuntimeException("unable to put ", e);
+ }
+ }
+
+ /**
+ * Deletes the given node residing at the given path
+ *
+ * @param path target path to delete
+ */
+ public void delete(final String path) {
+ Preconditions.checkNotNull(path, "path is required");
+
+ final String target = PathUtils.join(root, path);
+ try {
+ curator.delete().forPath(target);
+ getCache().rebuildNode(target);
+ } catch (final Exception e) {
+ throw new DrillRuntimeException(String.format("unable to delete node at %s", target), e);
+ }
+ }
+
+ /**
+ * Returns an iterator of (key, value) pairs residing under {@link #getRoot() root} path.
+ */
+ public Iterator<Map.Entry<String, byte[]>> entries() {
+ final String prefix = PathUtils.join(root, "/");
+ return Iterables.transform(getCache().getCurrentData(), new Function<ChildData, Map.Entry<String, byte[]>>() {
+ @Nullable
+ @Override
+ public Map.Entry<String, byte[]> apply(final ChildData data) {
+ // normalize key name removing the root prefix. resultant key must be a relative path, not beginning with a '/'.
+ final String key = data.getPath().replace(prefix, "");
+ return new ImmutableEntry<>(key, data.getData());
+ }
+ }).iterator();
+ }
+
+ @Override
+ public void close() throws Exception {
+ getCache().close();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/exception/StoreException.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/exception/StoreException.java b/exec/java-exec/src/main/java/org/apache/drill/exec/exception/StoreException.java
new file mode 100644
index 0000000..506d485
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/exception/StoreException.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.exception;
+
+import org.apache.drill.common.exceptions.DrillException;
+
+public class StoreException extends DrillException {
+ public StoreException() {
+ super();
+ }
+
+ public StoreException(Throwable cause) {
+ super(cause);
+ }
+
+ public StoreException(String message) {
+ super(message);
+ }
+
+ public StoreException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public StoreException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
+ super(message, cause, enableSuppression, writableStackTrace);
+ }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/serialization/InstanceSerializer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/serialization/InstanceSerializer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/serialization/InstanceSerializer.java
new file mode 100644
index 0000000..f44d835
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/serialization/InstanceSerializer.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.serialization;
+
+import java.io.IOException;
+
+public interface InstanceSerializer<T> {
+ byte[] serialize(T instance) throws IOException;
+ T deserialize(byte[] raw) throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/serialization/JacksonSerializer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/serialization/JacksonSerializer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/serialization/JacksonSerializer.java
new file mode 100644
index 0000000..676929d
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/serialization/JacksonSerializer.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.serialization;
+
+import java.io.IOException;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectReader;
+import com.fasterxml.jackson.databind.ObjectWriter;
+import com.google.common.base.Objects;
+
+public class JacksonSerializer<T> implements InstanceSerializer<T> {
+ private final ObjectReader reader;
+ private final ObjectWriter writer;
+
+ public JacksonSerializer(final ObjectMapper mapper, final Class<T> klazz) {
+ this.reader = mapper.readerFor(klazz);
+ this.writer = mapper.writer();
+ }
+
+ @Override
+ public T deserialize(final byte[] raw) throws IOException {
+ return reader.readValue(raw);
+ }
+
+ @Override
+ public byte[] serialize(final T instance) throws IOException {
+ return writer.writeValueAsBytes(instance);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj instanceof JacksonSerializer && obj.getClass().equals(getClass())) {
+ final JacksonSerializer<T> other = (JacksonSerializer<T>)obj;
+ return Objects.equal(reader, other.reader) && Objects.equal(writer, other.writer);
+ }
+ return false;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(reader, writer);
+ }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/serialization/ProtoSerializer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/serialization/ProtoSerializer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/serialization/ProtoSerializer.java
new file mode 100644
index 0000000..e3ee5f6
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/serialization/ProtoSerializer.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.serialization;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+
+import com.dyuproject.protostuff.JsonIOUtil;
+import com.dyuproject.protostuff.Schema;
+import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
+import com.google.protobuf.Message;
+
+public class ProtoSerializer<T, B extends Message.Builder> implements InstanceSerializer<T> {
+ private final Schema<B> readSchema;
+ private final Schema<T> writeSchema;
+
+ public ProtoSerializer(final Schema<B> readSchema, final Schema<T> writeSchema) {
+ this.readSchema = Preconditions.checkNotNull(readSchema);
+ this.writeSchema = Preconditions.checkNotNull(writeSchema);
+ }
+
+ @Override
+ public T deserialize(final byte[] raw) throws IOException {
+ final B builder = readSchema.newMessage();
+ JsonIOUtil.mergeFrom(raw, builder, readSchema, false);
+ return (T)builder.build();
+ }
+
+ @Override
+ public byte[] serialize(final T instance) throws IOException {
+ final ByteArrayOutputStream out = new ByteArrayOutputStream();
+ JsonIOUtil.writeTo(out, instance, writeSchema, false);
+ return out.toByteArray();
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(readSchema, writeSchema);
+ }
+
+ @Override
+ public boolean equals(final Object obj) {
+ if (obj instanceof ProtoSerializer && obj.getClass().equals(getClass())) {
+ final ProtoSerializer<T, B> other = (ProtoSerializer<T, B>)obj;
+ return Objects.equal(readSchema, other.readSchema) && Objects.equal(writeSchema, other.writeSchema);
+ }
+ return false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
index c781493..441fa91 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
@@ -37,10 +37,10 @@ import org.apache.drill.exec.server.options.OptionValue.OptionType;
import org.apache.drill.exec.server.rest.WebServer;
import org.apache.drill.exec.service.ServiceEngine;
import org.apache.drill.exec.store.StoragePluginRegistry;
-import org.apache.drill.exec.store.sys.CachingStoreProvider;
-import org.apache.drill.exec.store.sys.PStoreProvider;
-import org.apache.drill.exec.store.sys.PStoreRegistry;
-import org.apache.drill.exec.store.sys.local.LocalPStoreProvider;
+import org.apache.drill.exec.store.sys.store.provider.CachingPersistentStoreProvider;
+import org.apache.drill.exec.store.sys.PersistentStoreProvider;
+import org.apache.drill.exec.store.sys.PersistentStoreRegistry;
+import org.apache.drill.exec.store.sys.store.provider.LocalPersistentStoreProvider;
import org.apache.drill.exec.work.WorkManager;
import org.apache.zookeeper.Environment;
@@ -63,7 +63,7 @@ public class Drillbit implements AutoCloseable {
private final ClusterCoordinator coord;
private final ServiceEngine engine;
- private final PStoreProvider storeProvider;
+ private final PersistentStoreProvider storeProvider;
private final WorkManager manager;
private final BootStrapContext context;
private final WebServer webServer;
@@ -93,10 +93,10 @@ public class Drillbit implements AutoCloseable {
if (serviceSet != null) {
coord = serviceSet.getCoordinator();
- storeProvider = new CachingStoreProvider(new LocalPStoreProvider(config));
+ storeProvider = new CachingPersistentStoreProvider(new LocalPersistentStoreProvider(config));
} else {
coord = new ZKClusterCoordinator(config);
- storeProvider = new PStoreRegistry(this.coord, config).newPStoreProvider();
+ storeProvider = new PersistentStoreRegistry(this.coord, config).newPStoreProvider();
}
logger.info("Construction completed ({} ms).", w.elapsed(TimeUnit.MILLISECONDS));
}
http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
index aa6a0da..1af6d11 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
@@ -39,11 +39,11 @@ import org.apache.drill.exec.rpc.data.DataConnectionCreator;
import org.apache.drill.exec.server.options.SystemOptionManager;
import org.apache.drill.exec.store.SchemaFactory;
import org.apache.drill.exec.store.StoragePluginRegistry;
-import org.apache.drill.exec.store.sys.PStoreProvider;
+import org.apache.drill.exec.store.sys.PersistentStoreProvider;
import com.codahale.metrics.MetricRegistry;
-public class DrillbitContext {
+public class DrillbitContext implements AutoCloseable {
// private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillbitContext.class);
private final BootStrapContext context;
@@ -57,7 +57,7 @@ public class DrillbitContext {
private final WorkEventBus workBus;
private final FunctionImplementationRegistry functionRegistry;
private final SystemOptionManager systemOptions;
- private final PStoreProvider provider;
+ private final PersistentStoreProvider provider;
private final CodeCompiler compiler;
private final ScanResult classpathScan;
private final LogicalPlanPersistence lpPersistence;
@@ -70,7 +70,7 @@ public class DrillbitContext {
Controller controller,
DataConnectionCreator connectionsPool,
WorkEventBus workBus,
- PStoreProvider provider) {
+ PersistentStoreProvider provider) {
this.classpathScan = context.getClasspathScan();
this.workBus = workBus;
this.controller = checkNotNull(controller);
@@ -152,7 +152,7 @@ public class DrillbitContext {
return reader;
}
- public PStoreProvider getPersistentStoreProvider() {
+ public PersistentStoreProvider getStoreProvider() {
return provider;
}
@@ -180,4 +180,8 @@ public class DrillbitContext {
return classpathScan;
}
+ @Override
+ public void close() throws Exception {
+ getOptionManager().close();
+ }
}
http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/server/RemoteServiceSet.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/RemoteServiceSet.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/RemoteServiceSet.java
index 72eb306..06bb686 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/RemoteServiceSet.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/RemoteServiceSet.java
@@ -17,15 +17,12 @@
*/
package org.apache.drill.exec.server;
-import java.io.Closeable;
-import java.io.IOException;
-
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.exec.coord.ClusterCoordinator;
import org.apache.drill.exec.coord.local.LocalClusterCoordinator;
import org.apache.drill.exec.memory.BufferAllocator;
-public class RemoteServiceSet implements Closeable {
+public class RemoteServiceSet implements AutoCloseable {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RemoteServiceSet.class);
private final ClusterCoordinator coordinator;
@@ -41,7 +38,7 @@ public class RemoteServiceSet implements Closeable {
}
@Override
- public void close() throws IOException {
+ public void close() throws Exception {
coordinator.close();
}
http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionValue.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionValue.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionValue.java
index a2b2e93..8753a51 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionValue.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionValue.java
@@ -23,11 +23,12 @@ import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonInclude.Include;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
+import org.apache.drill.exec.store.sys.PersistentStore;
/**
* An {@link OptionValue option value} is used by an {@link OptionManager} to store a run-time setting. This setting,
* for example, could affect a query in execution stage. Instances of this class are JSON serializable and can be stored
- * in a {@link org.apache.drill.exec.store.sys.PStore persistent store} (see {@link SystemOptionManager#options}), or
+ * in a {@link PersistentStore persistent store} (see {@link SystemOptionManager#options}), or
* in memory (see {@link InMemoryOptionManager#options}).
*/
@JsonInclude(Include.NON_NULL)
http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index e54b914..8b14076 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -24,6 +24,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
+import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.apache.commons.collections.IteratorUtils;
import org.apache.drill.common.config.LogicalPlanPersistence;
@@ -34,9 +35,9 @@ import org.apache.drill.exec.compile.ClassTransformer;
import org.apache.drill.exec.compile.QueryClassLoader;
import org.apache.drill.exec.planner.physical.PlannerSettings;
import org.apache.drill.exec.server.options.OptionValue.OptionType;
-import org.apache.drill.exec.store.sys.PStore;
-import org.apache.drill.exec.store.sys.PStoreConfig;
-import org.apache.drill.exec.store.sys.PStoreProvider;
+import org.apache.drill.exec.store.sys.PersistentStore;
+import org.apache.drill.exec.store.sys.PersistentStoreConfig;
+import org.apache.drill.exec.store.sys.PersistentStoreProvider;
import org.apache.drill.exec.util.AssertionUtil;
import static com.google.common.base.Preconditions.checkArgument;
@@ -46,7 +47,7 @@ import static com.google.common.base.Preconditions.checkArgument;
* Only one instance of this class exists per drillbit. Options set at the system level affect the entire system and
* persist between restarts.
*/
-public class SystemOptionManager extends BaseOptionManager {
+public class SystemOptionManager extends BaseOptionManager implements AutoCloseable {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SystemOptionManager.class);
private static final CaseInsensitiveMap<OptionValidator> VALIDATORS;
@@ -143,19 +144,19 @@ public class SystemOptionManager extends BaseOptionManager {
VALIDATORS = CaseInsensitiveMap.newImmutableMap(tmp);
}
- private final PStoreConfig<OptionValue> config;
+ private final PersistentStoreConfig<OptionValue> config;
- private final PStoreProvider provider;
+ private final PersistentStoreProvider provider;
/**
* Persistent store for options that have been changed from default.
* NOTE: CRUD operations must use lowercase keys.
*/
- private PStore<OptionValue> options;
+ private PersistentStore<OptionValue> options;
- public SystemOptionManager(LogicalPlanPersistence lpPersistence, final PStoreProvider provider) {
+ public SystemOptionManager(LogicalPlanPersistence lpPersistence, final PersistentStoreProvider provider) {
this.provider = provider;
- this.config = PStoreConfig.newJacksonBuilder(lpPersistence.getMapper(), OptionValue.class)
+ this.config = PersistentStoreConfig.newJacksonBuilder(lpPersistence.getMapper(), OptionValue.class)
.name("sys.options")
.build();
}
@@ -166,10 +167,10 @@ public class SystemOptionManager extends BaseOptionManager {
* @return this option manager
* @throws IOException
*/
- public SystemOptionManager init() throws IOException {
- options = provider.getStore(config);
+ public SystemOptionManager init() throws Exception {
+ options = provider.getOrCreateStore(config);
// if necessary, deprecate and replace options from persistent store
- for (final Entry<String, OptionValue> option : options) {
+ for (final Entry<String, OptionValue> option : Lists.newArrayList(options.getAll())) {
final String name = option.getKey();
final OptionValidator validator = VALIDATORS.get(name);
if (validator == null) {
@@ -215,7 +216,7 @@ public class SystemOptionManager extends BaseOptionManager {
buildList.put(entry.getKey(), entry.getValue().getDefault());
}
// override if changed
- for (final Map.Entry<String, OptionValue> entry : options) {
+ for (final Map.Entry<String, OptionValue> entry : Lists.newArrayList(options.getAll())) {
buildList.put(entry.getKey(), entry.getValue());
}
return buildList.values().iterator();
@@ -260,7 +261,7 @@ public class SystemOptionManager extends BaseOptionManager {
public void deleteAllOptions(OptionType type) {
checkArgument(type == OptionType.SYSTEM, "OptionType must be SYSTEM.");
final Set<String> names = Sets.newHashSet();
- for (final Map.Entry<String, OptionValue> entry : options) {
+ for (final Map.Entry<String, OptionValue> entry : Lists.newArrayList(options.getAll())) {
names.add(entry.getKey());
}
for (final String name : names) {
@@ -272,4 +273,9 @@ public class SystemOptionManager extends BaseOptionManager {
public OptionList getOptionList() {
return (OptionList) IteratorUtils.toList(iterator());
}
+
+ @Override
+ public void close() throws Exception {
+ options.close();
+ }
}
http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRestServer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRestServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRestServer.java
index 300c617..d8533b0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRestServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRestServer.java
@@ -22,7 +22,7 @@ import org.apache.drill.exec.server.rest.auth.AuthDynamicFeature;
import org.apache.drill.exec.server.rest.auth.DrillUserPrincipal;
import org.apache.drill.exec.server.rest.profile.ProfileResources;
import org.apache.drill.exec.store.StoragePluginRegistry;
-import org.apache.drill.exec.store.sys.PStoreProvider;
+import org.apache.drill.exec.store.sys.PersistentStoreProvider;
import org.apache.drill.exec.work.WorkManager;
import org.glassfish.hk2.api.Factory;
import org.glassfish.hk2.utilities.binding.AbstractBinder;
@@ -80,7 +80,7 @@ public class DrillRestServer extends ResourceConfig {
protected void configure() {
bind(workManager).to(WorkManager.class);
bind(workManager.getContext().getLpPersistence().getMapper()).to(ObjectMapper.class);
- bind(workManager.getContext().getPersistentStoreProvider()).to(PStoreProvider.class);
+ bind(workManager.getContext().getStoreProvider()).to(PersistentStoreProvider.class);
bind(workManager.getContext().getStorage()).to(StoragePluginRegistry.class);
bindFactory(DrillUserPrincipalProvider.class).to(DrillUserPrincipal.class);
}
http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/StorageResources.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/StorageResources.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/StorageResources.java
index 2af9cac..3266eda 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/StorageResources.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/StorageResources.java
@@ -73,7 +73,7 @@ public class StorageResources {
public List<PluginConfigWrapper> getStoragePluginsJSON() {
List<PluginConfigWrapper> list = Lists.newArrayList();
- for (Map.Entry<String, StoragePluginConfig> entry : storage.getStore()) {
+ for (Map.Entry<String, StoragePluginConfig> entry : Lists.newArrayList(storage.getStore().getAll())) {
PluginConfigWrapper plugin = new PluginConfigWrapper(entry.getKey(), entry.getValue());
list.add(plugin);
}
http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileResources.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileResources.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileResources.java
index 0c04c9e..ddc9da1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileResources.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileResources.java
@@ -17,7 +17,6 @@
*/
package org.apache.drill.exec.server.rest.profile;
-import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collections;
@@ -36,7 +35,10 @@ import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.SecurityContext;
import javax.xml.bind.annotation.XmlRootElement;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.coord.ClusterCoordinator;
+import org.apache.drill.exec.coord.store.TransientStore;
import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
import org.apache.drill.exec.proto.UserBitShared.QueryId;
import org.apache.drill.exec.proto.UserBitShared.QueryInfo;
@@ -44,8 +46,8 @@ import org.apache.drill.exec.proto.UserBitShared.QueryProfile;
import org.apache.drill.exec.proto.helper.QueryIdHelper;
import org.apache.drill.exec.server.rest.ViewableWithPermissions;
import org.apache.drill.exec.server.rest.auth.DrillUserPrincipal;
-import org.apache.drill.exec.store.sys.PStore;
-import org.apache.drill.exec.store.sys.PStoreProvider;
+import org.apache.drill.exec.store.sys.PersistentStore;
+import org.apache.drill.exec.store.sys.PersistentStoreProvider;
import org.apache.drill.exec.work.WorkManager;
import org.apache.drill.exec.work.foreman.Foreman;
import org.apache.drill.exec.work.foreman.QueryManager;
@@ -58,6 +60,8 @@ import com.google.common.collect.Lists;
public class ProfileResources {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ProfileResources.class);
+ public final static int MAX_PROFILES = 100;
+
@Inject WorkManager work;
@Inject DrillUserPrincipal principal;
@Inject SecurityContext sc;
@@ -119,8 +123,12 @@ public class ProfileResources {
}
- private PStoreProvider provider(){
- return work.getContext().getPersistentStoreProvider();
+ protected PersistentStoreProvider getProvider() {
+ return work.getContext().getStoreProvider();
+ }
+
+ protected ClusterCoordinator getCoordinator() {
+ return work.getContext().getClusterCoordinator();
}
@XmlRootElement
@@ -146,38 +154,37 @@ public class ProfileResources {
@Path("/profiles.json")
@Produces(MediaType.APPLICATION_JSON)
public QProfiles getProfilesJSON() {
- PStore<QueryProfile> completed = null;
- PStore<QueryInfo> running = null;
try {
- completed = provider().getStore(QueryManager.QUERY_PROFILE);
- running = provider().getStore(QueryManager.RUNNING_QUERY_INFO);
- } catch (IOException e) {
- logger.debug("Failed to get profiles from persistent or ephemeral store.");
- return new QProfiles(new ArrayList<ProfileInfo>(), new ArrayList<ProfileInfo>());
- }
+ final PersistentStore<QueryProfile> completed = getProvider().getOrCreateStore(QueryManager.QUERY_PROFILE);
+ final TransientStore<QueryInfo> running = getCoordinator().getOrCreateTransientStore(QueryManager.RUNNING_QUERY_INFO);
- List<ProfileInfo> runningQueries = Lists.newArrayList();
+ final List<ProfileInfo> runningQueries = Lists.newArrayList();
- for (Map.Entry<String, QueryInfo> entry : running) {
- QueryInfo profile = entry.getValue();
- if (principal.canManageProfileOf(profile.getUser())) {
- runningQueries.add(new ProfileInfo(entry.getKey(), profile.getStart(), profile.getForeman().getAddress(),
- profile.getQuery(), profile.getState().name(), profile.getUser()));
+ for (final Map.Entry<String, QueryInfo> entry: Lists.newArrayList(running.entries())) {
+ final QueryInfo profile = entry.getValue();
+ if (principal.canManageProfileOf(profile.getUser())) {
+ runningQueries.add(new ProfileInfo(entry.getKey(), profile.getStart(), profile.getForeman().getAddress(),
+ profile.getQuery(), profile.getState().name(), profile.getUser()));
+ }
}
- }
- Collections.sort(runningQueries, Collections.reverseOrder());
+ Collections.sort(runningQueries, Collections.reverseOrder());
- List<ProfileInfo> finishedQueries = Lists.newArrayList();
- for (Map.Entry<String, QueryProfile> entry : completed) {
- QueryProfile profile = entry.getValue();
- if (principal.canManageProfileOf(profile.getUser())) {
- finishedQueries.add(new ProfileInfo(entry.getKey(), profile.getStart(), profile.getForeman().getAddress(),
- profile.getQuery(), profile.getState().name(), profile.getUser()));
+ List<ProfileInfo> finishedQueries = Lists.newArrayList();
+ for (Map.Entry<String, QueryProfile> entry : Lists.newArrayList(completed.getRange(0, MAX_PROFILES))) {
+ QueryProfile profile = entry.getValue();
+ if (principal.canManageProfileOf(profile.getUser())) {
+ finishedQueries.add(new ProfileInfo(entry.getKey(), profile.getStart(), profile.getForeman().getAddress(),
+ profile.getQuery(), profile.getState().name(), profile.getUser()));
+ }
}
+
+ return new QProfiles(runningQueries, finishedQueries);
+ } catch (Exception e) {
+ logger.debug("Failed to get profiles from persistent or ephemeral store.");
+ return new QProfiles(new ArrayList<ProfileInfo>(), new ArrayList<ProfileInfo>());
}
- return new QProfiles(runningQueries, finishedQueries);
}
@GET
@@ -188,7 +195,7 @@ public class ProfileResources {
return ViewableWithPermissions.create("/rest/profile/list.ftl", sc, profiles);
}
- private QueryProfile getQueryProfile(String queryId) throws IOException {
+ private QueryProfile getQueryProfile(String queryId) {
QueryId id = QueryIdHelper.getQueryIdFromString(queryId);
// first check local running
@@ -200,9 +207,9 @@ public class ProfileResources {
}
// then check remote running
- try{
- PStore<QueryInfo> runningQueries = provider().getStore(QueryManager.RUNNING_QUERY_INFO);
- QueryInfo info = runningQueries.get(queryId);
+ try {
+ final TransientStore<QueryInfo> running = getCoordinator().getOrCreateTransientStore(QueryManager.RUNNING_QUERY_INFO);
+ final QueryInfo info = running.get(queryId);
if (info != null) {
QueryProfile queryProfile = work.getContext()
.getController()
@@ -217,11 +224,15 @@ public class ProfileResources {
}
// then check blob store
- PStore<QueryProfile> profiles = provider().getStore(QueryManager.QUERY_PROFILE);
- QueryProfile queryProfile = profiles.get(queryId);
- if (queryProfile != null) {
- checkOrThrowProfileViewAuthorization(queryProfile);
- return queryProfile;
+ try {
+ final PersistentStore<QueryProfile> profiles = getProvider().getOrCreateStore(QueryManager.QUERY_PROFILE);
+ final QueryProfile queryProfile = profiles.get(queryId);
+ if (queryProfile != null) {
+ checkOrThrowProfileViewAuthorization(queryProfile);
+ return queryProfile;
+ }
+ } catch (final Exception e) {
+ throw new DrillRuntimeException("error while retrieving profile", e);
}
throw UserException.validationError()
@@ -236,7 +247,7 @@ public class ProfileResources {
public String getProfileJSON(@PathParam("queryid") String queryId) {
try {
return new String(QueryManager.QUERY_PROFILE.getSerializer().serialize(getQueryProfile(queryId)));
- } catch (IOException e) {
+ } catch (Exception e) {
logger.debug("Failed to serialize profile for: " + queryId);
return ("{ 'message' : 'error (unable to serialize profile)' }");
}
@@ -245,7 +256,7 @@ public class ProfileResources {
@GET
@Path("/profiles/{queryid}")
@Produces(MediaType.TEXT_HTML)
- public Viewable getProfile(@PathParam("queryid") String queryId) throws IOException {
+ public Viewable getProfile(@PathParam("queryid") String queryId){
ProfileWrapper wrapper = new ProfileWrapper(getQueryProfile(queryId));
return ViewableWithPermissions.create("/rest/profile/profile.ftl", sc, wrapper);
}
@@ -254,7 +265,7 @@ public class ProfileResources {
@GET
@Path("/profiles/cancel/{queryid}")
@Produces(MediaType.TEXT_PLAIN)
- public String cancelQuery(@PathParam("queryid") String queryId) throws IOException {
+ public String cancelQuery(@PathParam("queryid") String queryId) {
QueryId id = QueryIdHelper.getQueryIdFromString(queryId);
@@ -267,9 +278,9 @@ public class ProfileResources {
}
// then check remote running
- try{
- PStore<QueryInfo> runningQueries = provider().getStore(QueryManager.RUNNING_QUERY_INFO);
- QueryInfo info = runningQueries.get(queryId);
+ try {
+ final TransientStore<QueryInfo> running = getCoordinator().getOrCreateTransientStore(QueryManager.RUNNING_QUERY_INFO);
+ final QueryInfo info = running.get(queryId);
checkOrThrowQueryCancelAuthorization(info.getUser(), queryId);
Ack a = work.getContext().getController().getTunnel(info.getForeman()).requestCancelQuery(id).checkedGet(2, TimeUnit.SECONDS);
if(a.getOk()){
http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java
index c7d364b..b6eed2d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java
@@ -26,7 +26,7 @@ import org.apache.drill.common.logical.StoragePluginConfig;
import org.apache.drill.exec.exception.DrillbitStartupException;
import org.apache.drill.exec.ops.OptimizerRulesContext;
import org.apache.drill.exec.store.dfs.FormatPlugin;
-import org.apache.drill.exec.store.sys.PStore;
+import org.apache.drill.exec.store.sys.PersistentStore;
public interface StoragePluginRegistry extends Iterable<Map.Entry<String, StoragePlugin>>, AutoCloseable {
final String SYS_PLUGIN = "sys";
@@ -104,7 +104,7 @@ public interface StoragePluginRegistry extends Iterable<Map.Entry<String, Storag
* Get the PStore for this StoragePluginRegistry. (Used in the management layer.)
* @return PStore for StoragePlugin configuration objects.
*/
- PStore<StoragePluginConfig> getStore();
+ PersistentStore<StoragePluginConfig> getStore();
/**
* Return StoragePlugin rule sets.
http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistryImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistryImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistryImpl.java
index fefa183..e680502 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistryImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistryImpl.java
@@ -46,6 +46,7 @@ import org.apache.drill.common.scanner.ClassPathScanner;
import org.apache.drill.common.scanner.persistence.ScanResult;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.exception.DrillbitStartupException;
+import org.apache.drill.exec.exception.StoreException;
import org.apache.drill.exec.ops.OptimizerRulesContext;
import org.apache.drill.exec.planner.logical.DrillRuleSets;
import org.apache.drill.exec.planner.logical.StoragePlugins;
@@ -54,8 +55,8 @@ import org.apache.drill.exec.store.dfs.FileSystemPlugin;
import org.apache.drill.exec.store.dfs.FormatPlugin;
import org.apache.drill.exec.store.ischema.InfoSchemaConfig;
import org.apache.drill.exec.store.ischema.InfoSchemaStoragePlugin;
-import org.apache.drill.exec.store.sys.PStore;
-import org.apache.drill.exec.store.sys.PStoreConfig;
+import org.apache.drill.exec.store.sys.PersistentStore;
+import org.apache.drill.exec.store.sys.PersistentStoreConfig;
import org.apache.drill.exec.store.sys.SystemTablePlugin;
import org.apache.drill.exec.store.sys.SystemTablePluginConfig;
@@ -82,7 +83,7 @@ public class StoragePluginRegistryImpl implements StoragePluginRegistry {
private DrillbitContext context;
private final DrillSchemaFactory schemaFactory = new DrillSchemaFactory();
- private final PStore<StoragePluginConfig> pluginSystemTable;
+ private final PersistentStore<StoragePluginConfig> pluginSystemTable;
private final LogicalPlanPersistence lpPersistence;
private final ScanResult classpathScan;
private final LoadingCache<StoragePluginConfig, StoragePlugin> ephemeralPlugins;
@@ -93,12 +94,12 @@ public class StoragePluginRegistryImpl implements StoragePluginRegistry {
this.classpathScan = checkNotNull(context.getClasspathScan());
try {
this.pluginSystemTable = context //
- .getPersistentStoreProvider() //
- .getStore(PStoreConfig //
+ .getStoreProvider() //
+ .getOrCreateStore(PersistentStoreConfig //
.newJacksonBuilder(lpPersistence.getMapper(), StoragePluginConfig.class) //
.name(PSTORE_NAME) //
.build());
- } catch (IOException | RuntimeException e) {
+ } catch (StoreException | RuntimeException e) {
logger.error("Failure while loading storage plugin registry.", e);
throw new RuntimeException("Failure while reading and loading storage plugin configuration.", e);
}
@@ -120,7 +121,7 @@ public class StoragePluginRegistryImpl implements StoragePluginRegistry {
});
}
- public PStore<StoragePluginConfig> getStore() {
+ public PersistentStore<StoragePluginConfig> getStore() {
return pluginSystemTable;
}
@@ -137,7 +138,7 @@ public class StoragePluginRegistryImpl implements StoragePluginRegistry {
* Check if the storage plugins system table has any entries. If not, load the boostrap-storage-plugin file into
* the system table.
*/
- if (!pluginSystemTable.iterator().hasNext()) {
+ if (!pluginSystemTable.getAll().hasNext()) {
// bootstrap load the config since no plugins are stored.
logger.info("No storage plugin instances configured in persistent store, loading bootstrap configuration.");
Collection<URL> urls = ClassPathScanner.forResource(ExecConstants.BOOTSTRAP_STORAGE_PLUGINS_FILE, false);
@@ -162,7 +163,7 @@ public class StoragePluginRegistryImpl implements StoragePluginRegistry {
}
Map<String, StoragePlugin> activePlugins = new HashMap<String, StoragePlugin>();
- for (Map.Entry<String, StoragePluginConfig> entry : pluginSystemTable) {
+ for (Map.Entry<String, StoragePluginConfig> entry : Lists.newArrayList(pluginSystemTable.getAll())) {
String name = entry.getKey();
StoragePluginConfig config = entry.getValue();
if (config.isEnabled()) {
@@ -385,7 +386,7 @@ public class StoragePluginRegistryImpl implements StoragePluginRegistry {
Set<String> currentPluginNames = Sets.newHashSet(plugins.names());
// iterate through the plugin instances in the persistence store adding
// any new ones and refreshing those whose configuration has changed
- for (Map.Entry<String, StoragePluginConfig> config : pluginSystemTable) {
+ for (Map.Entry<String, StoragePluginConfig> config : Lists.newArrayList(pluginSystemTable.getAll())) {
if (config.getValue().isEnabled()) {
getPlugin(config.getKey());
currentPluginNames.remove(config.getKey());
@@ -460,6 +461,7 @@ public class StoragePluginRegistryImpl implements StoragePluginRegistry {
public synchronized void close() throws Exception {
ephemeralPlugins.invalidateAll();
plugins.close();
+ pluginSystemTable.close();
}
/**
http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/BasePersistentStore.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/BasePersistentStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/BasePersistentStore.java
new file mode 100644
index 0000000..248c3cb
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/BasePersistentStore.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.sys;
+
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.drill.common.collections.ImmutableEntry;
+
+public abstract class BasePersistentStore<V> implements PersistentStore<V> {
+
+ @Override
+ public Iterator<Map.Entry<String, V>> getAll() {
+ return getRange(0, Integer.MAX_VALUE);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/CachingStoreProvider.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/CachingStoreProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/CachingStoreProvider.java
deleted file mode 100644
index 68440cb..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/CachingStoreProvider.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.sys;
-
-import java.io.IOException;
-import java.util.concurrent.ConcurrentMap;
-
-import org.apache.drill.exec.store.sys.PStoreConfig.Mode;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Maps;
-
-public class CachingStoreProvider implements PStoreProvider, AutoCloseable {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(CachingStoreProvider.class);
-
- private final ConcurrentMap<PStoreConfig<?>, PStore<?>> storeCache = Maps.newConcurrentMap();
- private final PStoreProvider provider;
-
- public CachingStoreProvider(PStoreProvider provider) {
- super();
- this.provider = provider;
- }
-
- @SuppressWarnings("unchecked")
- public <V> PStore<V> getStore(PStoreConfig<V> config) throws IOException {
- PStore<?> s = storeCache.get(config);
- if(s == null){
- PStore<?> newStore = provider.getStore(config);
- s = storeCache.putIfAbsent(config, newStore);
- if(s == null){
- s = newStore;
- }else{
- newStore.close();
- }
- }
-
- return (PStore<V>) s;
-
- }
-
- @Override
- public void start() throws IOException {
- provider.start();
- }
-
- @Override
- public void close() throws Exception {
- for(PStore<?> store : storeCache.values()){
- store.close();
- }
- storeCache.clear();
- provider.close();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/EStore.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/EStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/EStore.java
deleted file mode 100644
index 2d04957..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/EStore.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.drill.exec.store.sys;
-
-
-/**
- * Interfaces to define EStore, which is keep track of status/information for running queries. The information
- * would be gone, if the query is completed, or the foreman drillbit is not responding.
- * @param <V>
- */
-public interface EStore <V> extends PStore<V> {
-}
|