http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8621b682/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/ZookeeperCacheStore.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/ZookeeperCacheStore.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/ZookeeperCacheStore.java
new file mode 100644
index 0000000..46d4eca
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/ZookeeperCacheStore.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.cache.infinispan;
+
+import org.infinispan.marshall.core.MarshalledEntry;
+import org.infinispan.persistence.spi.ExternalStore;
+import org.infinispan.persistence.spi.InitializationContext;
+
+/**
+ * Cache store intended to keep the cached objects in zookeeper. Objects are stored in
+ * /start/cache_name/key_name = data
+ *
+ * NOTE(review): every method in this class is currently a placeholder. Nothing is read from
+ * or written to any backing store yet: loads return null, lookups and deletes return false,
+ * and writes are ignored.
+ *
+ * @param <K> type of the keys handled by this store
+ * @param <V> type of the values handled by this store
+ */
+public class ZookeeperCacheStore<K, V> implements ExternalStore<K, V>{
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ZookeeperCacheStore.class);
+
+ // Never assigned or read in this class yet.
+ private String cacheName;
+
+ /**
+  * Fetches the configuration from the supplied context and discards the result; no state
+  * is initialised.
+  */
+ @Override
+ public void init(InitializationContext ctx) {
+ ctx.getConfiguration();
+
+ }
+
+ /** Placeholder: always returns null, whatever the key. */
+ @Override
+ public MarshalledEntry<K, V> load(K key) {
+ return null;
+ }
+
+ /** Placeholder: always returns false, whatever the key. */
+ @Override
+ public boolean contains(K key) {
+ return false;
+ }
+
+ /** Placeholder: does nothing. */
+ @Override
+ public void start() {
+ }
+
+ /** Placeholder: does nothing. */
+ @Override
+ public void stop() {
+ }
+
+ /** Placeholder: the entry is ignored and nothing is persisted. */
+ @Override
+ public void write(MarshalledEntry<K, V> entry) {
+ }
+
+ /** Placeholder: always returns false; nothing is removed. */
+ @Override
+ public boolean delete(K key) {
+ return false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8621b682/exec/java-exec/src/main/java/org/apache/drill/exec/cache/local/LocalCache.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/local/LocalCache.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/local/LocalCache.java
new file mode 100644
index 0000000..e66cc90
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/local/LocalCache.java
@@ -0,0 +1,309 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.cache.local;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.lang.reflect.InvocationTargetException;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.util.DataInputInputStream;
+import org.apache.drill.common.util.DataOutputOutputStream;
+import org.apache.drill.exec.cache.Counter;
+import org.apache.drill.exec.cache.DistributedCache;
+import org.apache.drill.exec.cache.DistributedMap;
+import org.apache.drill.exec.cache.DistributedMultiMap;
+import org.apache.drill.exec.cache.DrillSerializable;
+import org.apache.drill.exec.cache.JacksonSerializable;
+import org.apache.drill.exec.exception.DrillbitStartupException;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.memory.TopLevelAllocator;
+import org.apache.drill.exec.proto.BitControl.PlanFragment;
+import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.io.ByteArrayDataInput;
+import com.google.common.io.ByteArrayDataOutput;
+import com.google.common.io.ByteStreams;
+
+/**
+ * {@link DistributedCache} implementation that keeps everything in maps inside the current JVM.
+ *
+ * Values placed in the maps and multimaps are stored in serialized form (see
+ * {@link #serialize(DrillSerializable)}) and deserialized again on every read, so callers
+ * always receive a fresh object rather than the instance they stored.
+ *
+ * The backing maps are created by {@link #run()}; calling any accessor before {@code run()}
+ * fails with a NullPointerException.
+ */
+public class LocalCache implements DistributedCache {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LocalCache.class);
+
+  private volatile Map<FragmentHandle, PlanFragment> handles;
+  private volatile ConcurrentMap<String, DistributedMap<?>> namedMaps;
+  private volatile ConcurrentMap<Class<?>, DistributedMap<?>> maps;
+  private volatile ConcurrentMap<Class<?>, DistributedMultiMap<?>> multiMaps;
+  private volatile ConcurrentMap<String, Counter> counters;
+  private static final BufferAllocator allocator = new TopLevelAllocator();
+
+  private static final ObjectMapper mapper = DrillConfig.create().getMapper();
+
+  /**
+   * Drops the fragment map. The other maps are left untouched. After this call
+   * {@link #getFragment} and {@link #storeFragment} fail until {@link #run()} is called again.
+   */
+  @Override
+  public void close() throws IOException {
+    handles = null;
+  }
+
+  /** Creates fresh, empty backing maps, discarding anything stored previously. */
+  @Override
+  public void run() throws DrillbitStartupException {
+    handles = Maps.newConcurrentMap();
+    maps = Maps.newConcurrentMap();
+    multiMaps = Maps.newConcurrentMap();
+    counters = Maps.newConcurrentMap();
+    namedMaps = Maps.newConcurrentMap();
+  }
+
+  /**
+   * @param handle handle identifying the fragment
+   * @return the fragment stored under {@code handle}, or null when none was stored
+   */
+  @Override
+  public PlanFragment getFragment(FragmentHandle handle) {
+    return handles.get(handle);
+  }
+
+  /** Stores {@code fragment} under its own handle, replacing any previous entry. */
+  @Override
+  public void storeFragment(PlanFragment fragment) {
+    handles.put(fragment.getHandle(), fragment);
+  }
+
+  /**
+   * Returns the multimap associated with {@code clazz}, creating it on first use.
+   *
+   * @param clazz value type; also the key under which the multimap is registered
+   * @return the single multimap instance registered for {@code clazz}
+   */
+  @Override
+  @SuppressWarnings("unchecked") // entries are only ever registered under their own value class
+  public <V extends DrillSerializable> DistributedMultiMap<V> getMultiMap(Class<V> clazz) {
+    DistributedMultiMap<V> existing = (DistributedMultiMap<V>) multiMaps.get(clazz);
+    if (existing != null) {
+      return existing;
+    }
+    DistributedMultiMap<V> created = new LocalDistributedMultiMapImpl<V>(clazz);
+    // putIfAbsent returns the winner of a concurrent registration, or null if ours went in.
+    DistributedMultiMap<V> winner = (DistributedMultiMap<V>) multiMaps.putIfAbsent(clazz, created);
+    return winner == null ? created : winner;
+  }
+
+  /**
+   * Returns the map associated with {@code clazz}, creating it on first use.
+   *
+   * @param clazz value type; also the key under which the map is registered
+   * @return the single map instance registered for {@code clazz}
+   */
+  @Override
+  @SuppressWarnings("unchecked") // entries are only ever registered under their own value class
+  public <V extends DrillSerializable> DistributedMap<V> getMap(Class<V> clazz) {
+    DistributedMap<V> existing = (DistributedMap<V>) maps.get(clazz);
+    if (existing != null) {
+      return existing;
+    }
+    DistributedMap<V> created = new LocalDistributedMapImpl<V>(clazz);
+    DistributedMap<V> winner = (DistributedMap<V>) maps.putIfAbsent(clazz, created);
+    return winner == null ? created : winner;
+  }
+
+  /**
+   * Returns the map registered under {@code name}, creating it on first use.
+   *
+   * The registry is keyed by name only: if {@code name} was first requested with a different
+   * value class, the map created for that first request is returned.
+   *
+   * @param name  registry key
+   * @param clazz value type used when the map has to be created
+   * @return the single map instance registered for {@code name}
+   */
+  @Override
+  @SuppressWarnings("unchecked") // value type is not tracked per name; see javadoc
+  public <V extends DrillSerializable> DistributedMap<V> getNamedMap(String name, Class<V> clazz) {
+    // Look up by name: namedMaps is keyed by String, so a lookup by class can never hit.
+    DistributedMap<V> existing = (DistributedMap<V>) namedMaps.get(name);
+    if (existing != null) {
+      return existing;
+    }
+    DistributedMap<V> created = new LocalDistributedMapImpl<V>(clazz);
+    DistributedMap<V> winner = (DistributedMap<V>) namedMaps.putIfAbsent(name, created);
+    return winner == null ? created : winner;
+  }
+
+  /**
+   * Returns the counter registered under {@code name}, creating it (starting at zero) on
+   * first use.
+   */
+  @Override
+  public Counter getCounter(String name) {
+    Counter existing = counters.get(name);
+    if (existing != null) {
+      return existing;
+    }
+    Counter created = new LocalCounterImpl();
+    Counter winner = counters.putIfAbsent(name, created);
+    return winner == null ? created : winner;
+  }
+
+  /**
+   * Serializes {@code obj} into an in-memory buffer.
+   *
+   * {@link JacksonSerializable} objects are written through the shared Jackson mapper; all
+   * other objects write themselves via {@link DrillSerializable#writeToStream}.
+   *
+   * @param obj object to serialize
+   * @return buffer holding the serialized bytes
+   * @throws RuntimeException wrapping any failure raised while serializing
+   */
+  public static ByteArrayDataOutput serialize(DrillSerializable obj) {
+    ByteArrayDataOutput out = ByteStreams.newDataOutput();
+
+    if (obj instanceof JacksonSerializable) {
+      try {
+        out.write(mapper.writeValueAsBytes(obj));
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+      return out;
+    }
+
+    OutputStream outputStream = DataOutputOutputStream.constructOutputStream(out);
+    try {
+      obj.writeToStream(outputStream);
+      outputStream.flush();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    return out;
+  }
+
+  /**
+   * Rebuilds an object of type {@code clazz} from bytes produced by {@link #serialize}.
+   *
+   * Non-Jackson types are instantiated reflectively and therefore must expose a public
+   * constructor taking a single {@link BufferAllocator}.
+   *
+   * @param bytes serialized form
+   * @param clazz type to materialize
+   * @return the deserialized object
+   * @throws RuntimeException wrapping any reflection or I/O failure
+   */
+  public static <V extends DrillSerializable> V deserialize(byte[] bytes, Class<V> clazz) {
+    if (JacksonSerializable.class.isAssignableFrom(clazz)) {
+      try {
+        return mapper.readValue(bytes, clazz);
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    ByteArrayDataInput in = ByteStreams.newDataInput(bytes);
+    InputStream inputStream = DataInputInputStream.constructInputStream(in);
+    try {
+      V obj = clazz.getConstructor(BufferAllocator.class).newInstance(allocator);
+      obj.readFromStream(inputStream);
+      return obj;
+    } catch (InstantiationException | IllegalAccessException | IOException | NoSuchMethodException
+        | InvocationTargetException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Multimap that stores values in serialized form and deserializes them on every read.
+   *
+   * NOTE(review): the backing ArrayListMultimap is not synchronized here, while instances are
+   * handed out through a shared registry. Confirm that callers never mutate one instance
+   * from several threads at once.
+   */
+  public static class LocalDistributedMultiMapImpl<V extends DrillSerializable> implements DistributedMultiMap<V> {
+    private final ArrayListMultimap<String, ByteArrayDataOutput> mmap;
+    private final Class<V> clazz;
+
+    public LocalDistributedMultiMapImpl(Class<V> clazz) {
+      this.mmap = ArrayListMultimap.create();
+      this.clazz = clazz;
+    }
+
+    /**
+     * @return freshly deserialized copies of every value stored under {@code key}; an empty
+     *         list when the key is unknown
+     */
+    @Override
+    public Collection<V> get(String key) {
+      List<V> list = Lists.newArrayList();
+      for (ByteArrayDataOutput o : mmap.get(key)) {
+        list.add(deserialize(o.toByteArray(), this.clazz));
+      }
+      return list;
+    }
+
+    /** Appends the serialized form of {@code value} to the values stored under {@code key}. */
+    @Override
+    public void put(String key, DrillSerializable value) {
+      mmap.put(key, serialize(value));
+    }
+  }
+
+  /** Map that stores values in serialized form and deserializes them on every read. */
+  public static class LocalDistributedMapImpl<V extends DrillSerializable> implements DistributedMap<V> {
+    protected ConcurrentMap<String, ByteArrayDataOutput> m;
+    protected Class<V> clazz;
+
+    public LocalDistributedMapImpl(Class<V> clazz) {
+      m = Maps.newConcurrentMap();
+      this.clazz = clazz;
+    }
+
+    /**
+     * @return a freshly deserialized copy of the value stored under {@code key}, or null when
+     *         the key is absent
+     */
+    @Override
+    public V get(String key) {
+      // Single lookup: the null check and the read must see the same entry.
+      ByteArrayDataOutput stored = m.get(key);
+      if (stored == null) {
+        return null;
+      }
+      return deserialize(stored.toByteArray(), this.clazz);
+    }
+
+    @Override
+    public void put(String key, V value) {
+      m.put(key, serialize(value));
+    }
+
+    @Override
+    public void putIfAbsent(String key, V value) {
+      m.putIfAbsent(key, serialize(value));
+    }
+
+    /** Same as {@link #putIfAbsent(String, DrillSerializable)}; the expiration arguments are ignored. */
+    @Override
+    public void putIfAbsent(String key, V value, long ttl, TimeUnit timeUnit) {
+      m.putIfAbsent(key, serialize(value));
+      logger.warn("Expiration not implemented in local map cache");
+    }
+
+    /** Read-only iterator adapter that deserializes each value lazily, on {@code getValue()}. */
+    private class DeserializingTransformer implements Iterator<Map.Entry<String, V>> {
+      private final Iterator<Map.Entry<String, ByteArrayDataOutput>> inner;
+
+      public DeserializingTransformer(Iterator<Entry<String, ByteArrayDataOutput>> inner) {
+        this.inner = inner;
+      }
+
+      @Override
+      public boolean hasNext() {
+        return inner.hasNext();
+      }
+
+      @Override
+      public Entry<String, V> next() {
+        return newEntry(inner.next());
+      }
+
+      @Override
+      public void remove() {
+        throw new UnsupportedOperationException();
+      }
+
+      /** Wraps a stored entry so that its value is deserialized each time it is requested. */
+      public Entry<String, V> newEntry(final Entry<String, ByteArrayDataOutput> input) {
+        return new Map.Entry<String, V>() {
+
+          @Override
+          public String getKey() {
+            return input.getKey();
+          }
+
+          @Override
+          public V getValue() {
+            return deserialize(input.getValue().toByteArray(), clazz);
+          }
+
+          @Override
+          public V setValue(V value) {
+            throw new UnsupportedOperationException();
+          }
+
+        };
+      }
+
+    }
+
+    /** @return a read-only iterator over the entries; values are deserialized on access */
+    @Override
+    public Iterator<Entry<String, V>> iterator() {
+      return new DeserializingTransformer(m.entrySet().iterator());
+    }
+  }
+
+  /** Counter backed by an {@link AtomicLong}, starting at zero. */
+  public static class LocalCounterImpl implements Counter {
+    private final AtomicLong al = new AtomicLong();
+
+    @Override
+    public long get() {
+      return al.get();
+    }
+
+    @Override
+    public long incrementAndGet() {
+      return al.incrementAndGet();
+    }
+
+    @Override
+    public long decrementAndGet() {
+      return al.decrementAndGet();
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8621b682/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
index f105363..a0c439e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
@@ -25,13 +25,13 @@ import java.util.concurrent.TimeUnit;
import org.apache.drill.common.expression.ErrorCollector;
import org.apache.drill.common.expression.ErrorCollectorImpl;
-import org.apache.drill.common.expression.ExpressionPosition;
import org.apache.drill.common.expression.FieldReference;
import org.apache.drill.common.expression.LogicalExpression;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.logical.data.Order.Ordering;
import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.cache.CachedVectorContainer;
import org.apache.drill.exec.cache.Counter;
import org.apache.drill.exec.cache.DistributedCache;
import org.apache.drill.exec.cache.DistributedMap;
@@ -115,9 +115,9 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
private int recordCount;
private final IntVector partitionKeyVector;
- private final DistributedMap<VectorAccessibleSerializable> tableMap;
+ private final DistributedMap<CachedVectorContainer> tableMap;
private final Counter minorFragmentSampleCount;
- private final DistributedMultiMap<VectorAccessibleSerializable> mmap;
+ private final DistributedMultiMap<CachedVectorContainer> mmap;
private final String mapKey;
private List<VectorContainer> sampledIncomingBatches;
@@ -131,8 +131,8 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
this.completionFactor = pop.getCompletionFactor();
DistributedCache cache = context.getDrillbitContext().getCache();
- this.mmap = cache.getMultiMap(VectorAccessibleSerializable.class);
- this.tableMap = cache.getMap(VectorAccessibleSerializable.class);
+ this.mmap = cache.getMultiMap(CachedVectorContainer.class);
+ this.tableMap = cache.getMap(CachedVectorContainer.class);
Preconditions.checkNotNull(tableMap);
this.mapKey = String.format("%s_%d", context.getHandle().getQueryId(), context.getHandle().getMajorFragmentId());
@@ -220,7 +220,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
// into a serializable wrapper object, and then add to distributed map
WritableBatch batch = WritableBatch.getBatchNoHVWrap(containerToCache.getRecordCount(),
containerToCache, false);
- VectorAccessibleSerializable sampleToSave = new VectorAccessibleSerializable(batch, oContext.getAllocator());
+ CachedVectorContainer sampleToSave = new CachedVectorContainer(batch, context.getAllocator());
mmap.put(mapKey, sampleToSave);
this.sampledIncomingBatches = builder.getHeldRecordBatches();
@@ -251,7 +251,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
return false;
}
- VectorAccessibleSerializable finalTable = null;
+ CachedVectorContainer finalTable = null;
long val = minorFragmentSampleCount.incrementAndGet();
logger.debug("Incremented mfsc, got {}", val);
@@ -301,8 +301,8 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
// Get all samples from distributed map
- SortRecordBatchBuilder containerBuilder = new SortRecordBatchBuilder(oContext.getAllocator(),
MAX_SORT_BYTES);
- for (VectorAccessibleSerializable w : mmap.get(mapKey)) {
+ SortRecordBatchBuilder containerBuilder = new SortRecordBatchBuilder(context.getAllocator(),
MAX_SORT_BYTES);
+ for (CachedVectorContainer w : mmap.get(mapKey)) {
containerBuilder.add(w.get());
}
VectorContainer allSamplesContainer = new VectorContainer();
@@ -346,7 +346,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
}
candidatePartitionTable.setRecordCount(copier.getOutputRecords());
WritableBatch batch = WritableBatch.getBatchNoHVWrap(candidatePartitionTable.getRecordCount(),
candidatePartitionTable, false);
- VectorAccessibleSerializable wrap = new VectorAccessibleSerializable(batch, context.getDrillbitContext().getAllocator());
+ CachedVectorContainer wrap = new CachedVectorContainer(batch, context.getDrillbitContext().getAllocator());
tableMap.putIfAbsent(mapKey + "final", wrap, 1, TimeUnit.MINUTES);
candidatePartitionTable.clear();
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8621b682/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
index 7297dc3..0e3181d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
@@ -22,7 +22,7 @@ import java.io.Closeable;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.cache.DistributedCache;
-import org.apache.drill.exec.cache.HazelCache;
+import org.apache.drill.exec.cache.hazel.HazelCache;
import org.apache.drill.exec.coord.ClusterCoordinator;
import org.apache.drill.exec.coord.ClusterCoordinator.RegistrationHandle;
import org.apache.drill.exec.coord.ZKClusterCoordinator;
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8621b682/exec/java-exec/src/main/java/org/apache/drill/exec/server/RemoteServiceSet.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/RemoteServiceSet.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/server/RemoteServiceSet.java
index c0b82bd..2078107 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/RemoteServiceSet.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/RemoteServiceSet.java
@@ -21,17 +21,17 @@ import java.io.Closeable;
import java.io.IOException;
import org.apache.drill.exec.cache.DistributedCache;
-import org.apache.drill.exec.cache.LocalCache;
+import org.apache.drill.exec.cache.local.LocalCache;
import org.apache.drill.exec.coord.ClusterCoordinator;
import org.apache.drill.exec.coord.LocalClusterCoordinator;
import org.apache.drill.exec.exception.DrillbitStartupException;
public class RemoteServiceSet implements Closeable{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RemoteServiceSet.class);
-
+
private final DistributedCache cache;
private final ClusterCoordinator coordinator;
-
+
public RemoteServiceSet(DistributedCache cache, ClusterCoordinator coordinator) {
super();
this.cache = cache;
@@ -46,16 +46,21 @@ public class RemoteServiceSet implements Closeable{
public ClusterCoordinator getCoordinator() {
return coordinator;
}
-
-
+
+
@Override
public void close() throws IOException {
+ try{
cache.close();
+ }catch(Exception e){
+ if(e instanceof IOException) throw (IOException) e;
+ throw new IOException("Failure while closing cache", e);
+ }
coordinator.close();
}
public static RemoteServiceSet getLocalServiceSet(){
return new RemoteServiceSet(new LocalCache(), new LocalClusterCoordinator());
}
-
+
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8621b682/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java b/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java
index 99e712b..20722d9 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java
@@ -29,7 +29,7 @@ import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.common.util.TestTools;
import org.apache.drill.exec.ExecTest;
import org.apache.drill.exec.cache.DistributedCache;
-import org.apache.drill.exec.cache.LocalCache;
+import org.apache.drill.exec.cache.local.LocalCache;
import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
import org.apache.drill.exec.memory.TopLevelAllocator;
import org.apache.drill.exec.ops.QueryContext;
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8621b682/exec/java-exec/src/test/java/org/apache/drill/exec/cache/ISpan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/cache/ISpan.java b/exec/java-exec/src/test/java/org/apache/drill/exec/cache/ISpan.java
new file mode 100644
index 0000000..13322f1
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/cache/ISpan.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.cache;
+
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.io.OutputStream;
+import java.util.List;
+
+import org.infinispan.Cache;
+import org.infinispan.configuration.cache.CacheMode;
+import org.infinispan.configuration.cache.Configuration;
+import org.infinispan.configuration.cache.ConfigurationBuilder;
+import org.infinispan.configuration.global.GlobalConfiguration;
+import org.infinispan.configuration.global.GlobalConfigurationBuilder;
+import org.infinispan.manager.DefaultCacheManager;
+import org.infinispan.manager.EmbeddedCacheManager;
+
+import com.google.hive12.common.collect.Lists;
+
+/**
+ * Manual smoke test for an embedded Infinispan cache: stores a list of serializable items
+ * under one key, reads it back and prints each item's value.
+ */
+public class ISpan {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ISpan.class);
+
+  /**
+   * Starts a cache manager with the default transport and a DIST_ASYNC, store-as-binary
+   * cache, round-trips two items through it, and shuts the manager down again.
+   *
+   * @param args ignored
+   * @throws Exception on any failure while configuring or using the cache
+   */
+  public static void main(String[] args) throws Exception{
+    GlobalConfiguration gc = new GlobalConfigurationBuilder().transport().defaultTransport().build();
+    Configuration c = new ConfigurationBuilder() //
+        .clustering().cacheMode(CacheMode.DIST_ASYNC) //
+        .storeAsBinary()
+        .build();
+    EmbeddedCacheManager ecm = new DefaultCacheManager(gc, c);
+
+    try {
+      Cache<String, List<XT>> cache = ecm.getCache();
+      // Plain JDK list: no need to reach into a shaded third-party collections helper.
+      List<XT> items = new java.util.ArrayList<XT>();
+      items.add(new XT(1));
+      items.add(new XT(2));
+
+      cache.put("items", items);
+      for(XT x : cache.get("items")){
+        System.out.println(x.i);
+      }
+    } finally {
+      // Stop the manager even on failure so it is not left running after main() ends.
+      ecm.stop();
+    }
+  }
+
+  /**
+   * Minimal serializable payload carrying a single int.
+   *
+   * NOTE(review): only an int-arg constructor is declared. If AbstractDataSerializable relies
+   * on reflective instantiation, a no-arg constructor may be required. Confirm.
+   */
+  private static class XT extends AbstractDataSerializable{
+
+    int i =0;
+
+    public XT(int i) {
+      super();
+      this.i = i;
+    }
+
+    /** Reads the int written by {@link #write(DataOutput)}. */
+    @Override
+    public void read(DataInput input) throws IOException {
+      i = input.readInt();
+    }
+
+    /** Writes the single int field. */
+    @Override
+    public void write(DataOutput output) throws IOException {
+      output.writeInt(i);
+    }
+
+    @Override
+    public String toString() {
+      return "XT [i=" + i + "]";
+    }
+
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8621b682/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestVectorCache.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestVectorCache.java
b/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestVectorCache.java
index a3d39a3..7686614 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestVectorCache.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestVectorCache.java
@@ -17,7 +17,7 @@
*/
package org.apache.drill.exec.cache;
-import com.beust.jcommander.internal.Lists;
+import java.util.List;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.common.expression.ExpressionPosition;
@@ -25,67 +25,105 @@ import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.common.types.Types;
import org.apache.drill.exec.ExecTest;
+import org.apache.drill.exec.cache.hazel.HazelCache;
+import org.apache.drill.exec.cache.infinispan.ICache;
import org.apache.drill.exec.expr.TypeHelper;
-import org.apache.drill.exec.record.*;
+import org.apache.drill.exec.memory.TopLevelAllocator;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.VectorAccessible;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.record.WritableBatch;
import org.apache.drill.exec.server.Drillbit;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.server.RemoteServiceSet;
-import org.apache.drill.exec.vector.*;
+import org.apache.drill.exec.vector.AllocationHelper;
+import org.apache.drill.exec.vector.IntVector;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.VarBinaryVector;
import org.junit.Test;
-import java.util.List;
+import com.beust.jcommander.internal.Lists;
-public class TestVectorCache extends ExecTest{
+public class TestVectorCache extends ExecTest{
- @Test
- public void testVectorCache() throws Exception {
+ private void testCache(DrillConfig config, DistributedCache dcache) throws Exception {
List<ValueVector> vectorList = Lists.newArrayList();
RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet();
- DrillConfig config = DrillConfig.create();
- Drillbit bit = new Drillbit(config, serviceSet);
- bit.run();
- DrillbitContext context = bit.getContext();
- HazelCache cache = new HazelCache(config, context.getAllocator());
- cache.run();
- MaterializedField intField = MaterializedField.create(SchemaPath.getSimplePath("int"),
Types.required(TypeProtos.MinorType.INT));
- IntVector intVector = (IntVector)TypeHelper.getNewVector(intField, context.getAllocator());
- MaterializedField binField = MaterializedField.create(new SchemaPath("binary", ExpressionPosition.UNKNOWN),
Types.required(TypeProtos.MinorType.VARBINARY));
- VarBinaryVector binVector = (VarBinaryVector)TypeHelper.getNewVector(binField, context.getAllocator());
- AllocationHelper.allocate(intVector, 4, 4);
- AllocationHelper.allocate(binVector, 4, 5);
- vectorList.add(intVector);
- vectorList.add(binVector);
-
- intVector.getMutator().setSafe(0, 0); binVector.getMutator().setSafe(0, "ZERO".getBytes());
- intVector.getMutator().setSafe(1, 1); binVector.getMutator().setSafe(1, "ONE".getBytes());
- intVector.getMutator().setSafe(2, 2); binVector.getMutator().setSafe(2, "TWO".getBytes());
- intVector.getMutator().setSafe(3, 3); binVector.getMutator().setSafe(3, "THREE".getBytes());
- intVector.getMutator().setValueCount(4);
- binVector.getMutator().setValueCount(4);
-
- VectorContainer container = new VectorContainer();
- container.addCollection(vectorList);
- container.setRecordCount(4);
- WritableBatch batch = WritableBatch.getBatchNoHVWrap(container.getRecordCount(), container,
false);
- VectorAccessibleSerializable wrap = new VectorAccessibleSerializable(batch, context.getAllocator());
-
- DistributedMultiMap<VectorAccessibleSerializable> mmap = cache.getMultiMap(VectorAccessibleSerializable.class);
- mmap.put("vectors", wrap);
- VectorAccessibleSerializable newWrap = (VectorAccessibleSerializable)mmap.get("vectors").iterator().next();
-
- VectorAccessible newContainer = newWrap.get();
- for (VectorWrapper w : newContainer) {
- ValueVector vv = w.getValueVector();
- int values = vv.getAccessor().getValueCount();
- for (int i = 0; i < values; i++) {
- Object o = vv.getAccessor().getObject(i);
- if (o instanceof byte[]) {
- System.out.println(new String((byte[])o));
- } else {
- System.out.println(o);
+ try (Drillbit bit = new Drillbit(config, serviceSet); DistributedCache cache = dcache)
{
+ bit.run();
+ cache.run();
+
+ DrillbitContext context = bit.getContext();
+
+
+ MaterializedField intField = MaterializedField.create(new SchemaPath("int", ExpressionPosition.UNKNOWN),
+ Types.required(TypeProtos.MinorType.INT));
+ IntVector intVector = (IntVector) TypeHelper.getNewVector(intField, context.getAllocator());
+ MaterializedField binField = MaterializedField.create(new SchemaPath("binary", ExpressionPosition.UNKNOWN),
+ Types.required(TypeProtos.MinorType.VARBINARY));
+ VarBinaryVector binVector = (VarBinaryVector) TypeHelper.getNewVector(binField, context.getAllocator());
+ AllocationHelper.allocate(intVector, 4, 4);
+ AllocationHelper.allocate(binVector, 4, 5);
+ vectorList.add(intVector);
+ vectorList.add(binVector);
+
+ intVector.getMutator().set(0, 0);
+ binVector.getMutator().set(0, "ZERO".getBytes());
+ intVector.getMutator().set(1, 1);
+ binVector.getMutator().set(1, "ONE".getBytes());
+ intVector.getMutator().set(2, 2);
+ binVector.getMutator().set(2, "TWO".getBytes());
+ intVector.getMutator().set(3, 3);
+ binVector.getMutator().set(3, "THREE".getBytes());
+ intVector.getMutator().setValueCount(4);
+ binVector.getMutator().setValueCount(4);
+
+ VectorContainer container = new VectorContainer();
+ container.addCollection(vectorList);
+ container.setRecordCount(4);
+ WritableBatch batch = WritableBatch.getBatchNoHVWrap(container.getRecordCount(), container,
false);
+ CachedVectorContainer wrap = new CachedVectorContainer(batch, context.getAllocator());
+
+ DistributedMultiMap<CachedVectorContainer> mmap = cache.getMultiMap(CachedVectorContainer.class);
+ mmap.put("vectors", wrap);
+
+ CachedVectorContainer newWrap = (CachedVectorContainer) mmap.get("vectors").iterator().next();
+
+ VectorAccessible newContainer = newWrap.get();
+ for (VectorWrapper<?> w : newContainer) {
+ ValueVector vv = w.getValueVector();
+ int values = vv.getAccessor().getValueCount();
+ for (int i = 0; i < values; i++) {
+ Object o = vv.getAccessor().getObject(i);
+ if (o instanceof byte[]) {
+ System.out.println(new String((byte[]) o));
+ } else {
+ System.out.println(o);
+ }
}
}
+
+ newWrap.clear();
}
+
+ }
+
+ /**
+  * Runs the shared vector round-trip scenario against a Hazelcast-backed cache.
+  *
+  * testCache() starts the cache and closes it through its try-with-resources block, so the
+  * cache is deliberately neither run nor closed here; doing so would start and close it twice.
+  */
+ @Test
+ public void testHazelVectorCache() throws Exception {
+   DrillConfig c = DrillConfig.create();
+   HazelCache cache = new HazelCache(c, new TopLevelAllocator());
+   testCache(c, cache);
+ }
+
+ /**
+  * Runs the shared vector round-trip scenario against an Infinispan-backed cache.
+  * Starting and closing the cache is left to testCache().
+  */
+ @Test
+ public void testICache() throws Exception {
+   final DrillConfig config = DrillConfig.create();
+   testCache(config, new ICache(config, new TopLevelAllocator()));
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8621b682/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestOrphanSchema.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestOrphanSchema.java
b/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestOrphanSchema.java
index 63bc0a9..a5dbfe5 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestOrphanSchema.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestOrphanSchema.java
@@ -23,7 +23,7 @@ import net.hydromatic.optiq.tools.Frameworks;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.exec.ExecTest;
-import org.apache.drill.exec.cache.LocalCache;
+import org.apache.drill.exec.cache.local.LocalCache;
import org.apache.drill.exec.memory.TopLevelAllocator;
import org.apache.drill.exec.rpc.user.UserSession;
import org.apache.drill.exec.server.DrillbitContext;
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8621b682/exec/java-exec/src/test/java/org/apache/drill/exec/store/ischema/OrphanSchema.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/ischema/OrphanSchema.java
b/exec/java-exec/src/test/java/org/apache/drill/exec/store/ischema/OrphanSchema.java
index acb5929..3ccb96b 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/ischema/OrphanSchema.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/ischema/OrphanSchema.java
@@ -18,12 +18,13 @@
package org.apache.drill.exec.store.ischema;
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
import net.hydromatic.optiq.SchemaPlus;
import net.hydromatic.optiq.tools.Frameworks;
import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.exec.cache.LocalCache;
+import org.apache.drill.exec.cache.local.LocalCache;
import org.apache.drill.exec.memory.TopLevelAllocator;
import org.apache.drill.exec.rpc.user.UserSession;
import org.apache.drill.exec.server.DrillbitContext;
@@ -40,7 +41,7 @@ public class OrphanSchema {
* @return root node of the created schema.
*/
public static SchemaPlus create() throws Exception {
-
+
final DrillConfig c = DrillConfig.create();
// Mock up a context which will allow us to create a schema.
@@ -51,7 +52,7 @@ public class OrphanSchema {
when(bitContext.getCache()).thenReturn(new LocalCache());
bitContext.getCache().run();
-
+
// Using the mock context, get the orphan schema.
StoragePluginRegistry r = new StoragePluginRegistry(bitContext);
r.init();
|