drill-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jacq...@apache.org
Subject [12/13] Update typing system. Update RPC system. Add Fragmenting Implementation. Working single node. Distributed failing due to threading issues.
Date Tue, 14 May 2013 01:52:52 GMT
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/base/AbstractStore.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/base/AbstractStore.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/base/AbstractStore.java
deleted file mode 100644
index 58edf03..0000000
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/base/AbstractStore.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.common.physical.pop.base;
-
-import java.util.List;
-
-import org.apache.drill.common.physical.EndpointAffinity;
-import org.apache.drill.common.proto.CoordinationProtos.DrillbitEndpoint;
-
-
-
-public abstract class AbstractStore extends AbstractSingle implements Store{
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractStore.class);
-
-  public AbstractStore(PhysicalOperator child) {
-    super(child);
-  }
-
-  @Override
-  public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E{
-    return physicalVisitor.visitStore(this, value);
-  }
-
-
-  
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/base/Exchange.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/base/Exchange.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/base/Exchange.java
deleted file mode 100644
index d779eb8..0000000
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/base/Exchange.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.common.physical.pop.base;
-
-import java.util.List;
-
-import org.apache.drill.common.physical.OperatorCost;
-import org.apache.drill.common.proto.CoordinationProtos.DrillbitEndpoint;
-
-import com.fasterxml.jackson.annotation.JsonIgnore;
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-public interface Exchange extends PhysicalOperator{
-
-  @JsonIgnore
-  public abstract OperatorCost getAggregateSendCost();
-
-  @JsonIgnore
-  public abstract OperatorCost getAggregateReceiveCost();
-
-  @JsonProperty("cost")
-  public abstract ExchangeCost getExchangeCost();
-
-  /**
-   * Inform this Exchange node about its sender locations.
-   * @param senderLocations
-   */
-  public abstract void setupSenders(List<DrillbitEndpoint> senderLocations);
-
-  /**
-   * Inform this Exchange node about its receiver locations.
-   * @param receiverLocations
-   */
-  public abstract void setupReceivers(List<DrillbitEndpoint> receiverLocations);
-
-  /**
-   * Get the Sender associated with the given minorFragmentId.  
-   * @param minorFragmentId The minor fragment id, must be in the range [0, fragment.width).
-   * @param child The feeding node for the requested sender.
-   * @return The materialized sender for the given arguments.
-   */
-  public abstract Sender getSender(int minorFragmentId, PhysicalOperator child);
-
-  /**
-   * Get the Receiver associated with the given minorFragmentId.
-   * @param minorFragmentId The minor fragment id, must be in the range [0, fragment.width).
-   * @return The materialized recevier for the given arguments.
-   */
-  public abstract Receiver getReceiver(int minorFragmentId);
-
-  public abstract int getMaxSendWidth();
-
-  public PhysicalOperator getChild();
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/base/ExchangeCost.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/base/ExchangeCost.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/base/ExchangeCost.java
deleted file mode 100644
index f17203e..0000000
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/base/ExchangeCost.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.common.physical.pop.base;
-
-import org.apache.drill.common.physical.OperatorCost;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonIgnore;
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-public class ExchangeCost {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ExchangeCost.class);
-  
-  private final OperatorCost send;
-  private final OperatorCost receive;
-  private final OperatorCost combined;
-  
-  @JsonCreator
-  public ExchangeCost(@JsonProperty("send") OperatorCost send, @JsonProperty("receive") OperatorCost receive) {
-    this.send = send;
-    this.receive = receive;
-    this.combined =  OperatorCost.combine(send,  receive);
-  }
-
-  @JsonIgnore
-  public OperatorCost getCombinedCost(){
-    return combined;
-  }
-
-  public OperatorCost getSend() {
-    return send;
-  }
-
-  public OperatorCost getReceive() {
-    return receive;
-  }
-  
-  
-  
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/base/FragmentLeaf.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/base/FragmentLeaf.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/base/FragmentLeaf.java
deleted file mode 100644
index 4557df4..0000000
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/base/FragmentLeaf.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.common.physical.pop.base;
-
-
-/**
- * A POP which relies on no other nodes within the current fragment.
- */
-public interface FragmentLeaf extends PhysicalOperator{
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/base/FragmentRoot.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/base/FragmentRoot.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/base/FragmentRoot.java
deleted file mode 100644
index 8d87d56..0000000
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/base/FragmentRoot.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.common.physical.pop.base;
-
-
-/**
- * Describes the root operation within a particular Fragment. This includes things like Sinks, and Sender nodes. 
- */
-public interface FragmentRoot extends PhysicalOperator{
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/base/HasAffinity.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/base/HasAffinity.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/base/HasAffinity.java
deleted file mode 100644
index feb32ec..0000000
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/base/HasAffinity.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.common.physical.pop.base;
-
-import java.util.List;
-
-import org.apache.drill.common.physical.EndpointAffinity;
-
-public interface HasAffinity extends PhysicalOperator{
-  public List<EndpointAffinity> getOperatorAffinity();
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/base/Leaf.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/base/Leaf.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/base/Leaf.java
deleted file mode 100644
index 28efb94..0000000
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/base/Leaf.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.common.physical.pop.base;
-
-public interface Leaf extends PhysicalOperator{
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/base/PhysicalOperator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/base/PhysicalOperator.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/base/PhysicalOperator.java
deleted file mode 100644
index d8d1b64..0000000
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/base/PhysicalOperator.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.common.physical.pop.base;
-
-import org.apache.drill.common.graph.GraphValue;
-import org.apache.drill.common.physical.OperatorCost;
-
-import com.fasterxml.jackson.annotation.JsonIdentityInfo;
-import com.fasterxml.jackson.annotation.JsonIgnore;
-import com.fasterxml.jackson.annotation.JsonInclude;
-import com.fasterxml.jackson.annotation.JsonInclude.Include;
-import com.fasterxml.jackson.annotation.JsonPropertyOrder;
-import com.fasterxml.jackson.annotation.JsonTypeInfo;
-import com.fasterxml.jackson.annotation.ObjectIdGenerators;
-
-@JsonInclude(Include.NON_NULL)
-@JsonPropertyOrder({ "@id" })
-@JsonIdentityInfo(generator = ObjectIdGenerators.IntSequenceGenerator.class, property = "@id")
-@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "pop")
-public interface PhysicalOperator extends GraphValue<PhysicalOperator> {
-
-  public OperatorCost getCost();
-
-  /**
-   * Describes whether or not a particular physical operator can actually be executed. Most physical operators can be
-   * executed. However, Exchange nodes cannot be executed. In order to be executed, they must be converted into their
-   * Exec sub components.
-   * 
-   * @return
-   */
-  @JsonIgnore
-  public boolean isExecutable();
-  
-  /**
-   * Provides capability to build a set of output based on traversing a query graph tree.
-   * @param physicalVisitor
-   * @return
-   */
-  public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E;
-
-
-  
-  
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/base/PhysicalOperatorUtil.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/base/PhysicalOperatorUtil.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/base/PhysicalOperatorUtil.java
deleted file mode 100644
index fb1fdcd..0000000
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/base/PhysicalOperatorUtil.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.common.physical.pop.base;
-
-import org.apache.drill.common.config.CommonConstants;
-import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.common.util.PathScanner;
-
-public class PhysicalOperatorUtil {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PhysicalOperatorUtil.class);
-  
-  private PhysicalOperatorUtil(){}
-  
-  public synchronized static Class<?>[] getSubTypes(DrillConfig config){
-    Class<?>[] ops = PathScanner.scanForImplementationsArr(PhysicalOperator.class, config.getStringList(CommonConstants.PHYSICAL_OPERATOR_SCAN_PACKAGES));
-    logger.debug("Adding Physical Operator sub types: {}", ((Object) ops) );
-    return ops;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/base/PhysicalVisitor.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/base/PhysicalVisitor.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/base/PhysicalVisitor.java
deleted file mode 100644
index 2ecc6ce..0000000
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/base/PhysicalVisitor.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.common.physical.pop.base;
-
-import org.apache.drill.common.physical.pop.Filter;
-import org.apache.drill.common.physical.pop.PartitionToRandomExchange;
-import org.apache.drill.common.physical.pop.Project;
-import org.apache.drill.common.physical.pop.Sort;
-
-public interface PhysicalVisitor<RETURN, EXTRA, EXCEP extends Throwable> {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PhysicalVisitor.class);
-  
-  
-  public RETURN visitExchange(Exchange exchange, EXTRA value) throws EXCEP;
-  public RETURN visitScan(Scan<?> scan, EXTRA value) throws EXCEP;
-  public RETURN visitStore(Store store, EXTRA value) throws EXCEP;
-
-  public RETURN visitFilter(Filter filter, EXTRA value) throws EXCEP;
-  public RETURN visitProject(Project project, EXTRA value) throws EXCEP;
-  public RETURN visitSort(Sort sort, EXTRA value) throws EXCEP;
-  public RETURN visitSender(Sender sender, EXTRA value) throws EXCEP;
-  public RETURN visitReceiver(Receiver receiver, EXTRA value) throws EXCEP;
-  
-  public RETURN visitUnknown(PhysicalOperator op, EXTRA value) throws EXCEP;
-  
-  public RETURN visitPartitionToRandomExchange(PartitionToRandomExchange partitionToRandom, EXTRA value) throws EXCEP; 
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/base/Receiver.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/base/Receiver.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/base/Receiver.java
deleted file mode 100644
index db8f71f..0000000
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/base/Receiver.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.common.physical.pop.base;
-
-import java.util.List;
-
-import org.apache.drill.common.proto.CoordinationProtos.DrillbitEndpoint;
-
-public interface Receiver extends FragmentLeaf {
-  public abstract List<DrillbitEndpoint> getProvidingEndpoints();
-
-  /**
-   * Whether or not this receive supports out of order exchange. This provides a hint for the scheduling node on whether
-   * the receiver can start work if only a subset of all sending endpoints are currently providing data. A random
-   * receiver would supports this form of operation. A NWAY receiver would not.
-   * 
-   * @return True if this receiver supports working on a streaming/out of order input.
-   */
-  public abstract boolean supportsOutOfOrderExchange();
-  
-  
-  public int getSenderCount();
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/base/Root.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/base/Root.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/base/Root.java
deleted file mode 100644
index c4f9982..0000000
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/base/Root.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.common.physical.pop.base;
-
-/**
- * Marker interface describe the root of a query plan.
- */
-public interface Root extends PhysicalOperator{
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/base/Scan.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/base/Scan.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/base/Scan.java
deleted file mode 100644
index c7b45a8..0000000
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/base/Scan.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.common.physical.pop.base;
-
-import java.util.List;
-
-import org.apache.drill.common.physical.ReadEntry;
-import org.apache.drill.common.proto.CoordinationProtos.DrillbitEndpoint;
-
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-public interface Scan<R extends ReadEntry> extends Leaf, HasAffinity{
-
-  @JsonProperty("entries")
-  public abstract List<R> getReadEntries();
-
-  public abstract void applyAssignments(List<DrillbitEndpoint> endpoints);
-
-  public abstract Scan<?> getSpecificScan(int minorFragmentId);
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/base/Sender.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/base/Sender.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/base/Sender.java
deleted file mode 100644
index 1859657..0000000
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/base/Sender.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.common.physical.pop.base;
-
-import java.util.List;
-
-import org.apache.drill.common.proto.CoordinationProtos.DrillbitEndpoint;
-
-
-
-
-public interface Sender extends FragmentRoot{
-  public abstract List<DrillbitEndpoint> getDestinations();
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/base/Store.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/base/Store.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/base/Store.java
deleted file mode 100644
index eec4a6c..0000000
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/base/Store.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.common.physical.pop.base;
-
-import java.util.List;
-
-import org.apache.drill.common.proto.CoordinationProtos.DrillbitEndpoint;
-
-public interface Store extends Root, HasAffinity{
-
-  public abstract void applyAssignments(List<DrillbitEndpoint> endpoints);
-  public abstract Store getSpecificStore(PhysicalOperator child, int minorFragmentId);
-  public abstract int getMaxWidth();
-  public abstract PhysicalOperator getChild();
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/common/src/main/protobuf/Coordination.proto
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/protobuf/Coordination.proto b/sandbox/prototype/common/src/main/protobuf/Coordination.proto
deleted file mode 100644
index f98d2c5..0000000
--- a/sandbox/prototype/common/src/main/protobuf/Coordination.proto
+++ /dev/null
@@ -1,26 +0,0 @@
-package exec;
-
-option java_package = "org.apache.drill.common.proto";
-option java_outer_classname = "CoordinationProtos";
-option optimize_for = LITE_RUNTIME;
-
-message DrillbitEndpoint{
-  optional string address = 1;
-  optional int32 user_port = 2;
-  optional int32 bit_port = 3;
-  optional Roles roles = 4;
-}
-
-message DrillServiceInstance{
-  optional string id = 1;
-  optional int64 registrationTimeUTC = 2;
-  optional DrillbitEndpoint endpoint = 3;
-}
-
-message Roles{
-	optional bool sql_query = 1 [default = true];
-	optional bool logical_plan = 2 [default = true];
-	optional bool physical_plan = 3 [default = true];
-	optional bool java_executor = 4 [default = true];
-	optional bool distributed_cache = 5 [default = true];
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/common/src/test/java/org/apache/drill/common/physical/MockScanPOP.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/test/java/org/apache/drill/common/physical/MockScanPOP.java b/sandbox/prototype/common/src/test/java/org/apache/drill/common/physical/MockScanPOP.java
deleted file mode 100644
index 1b042c5..0000000
--- a/sandbox/prototype/common/src/test/java/org/apache/drill/common/physical/MockScanPOP.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.common.physical;
-
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.drill.common.physical.pop.base.AbstractScan;
-import org.apache.drill.common.physical.pop.base.Scan;
-import org.apache.drill.common.proto.CoordinationProtos.DrillbitEndpoint;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.annotation.JsonTypeName;
-
-@JsonTypeName("mock-scan")
-public class MockScanPOP extends AbstractScan<MockScanPOP.MockScanEntry>{
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MockScanPOP.class);
-  
-  private final String url;
-  
-  @JsonCreator
-  public MockScanPOP(@JsonProperty("url") String url, @JsonProperty("entries") List<MockScanEntry> readEntries) {
-    super(readEntries);
-    this.url = url;
-  }
-  
-  public String getUrl() {
-    return url;
-  }
-
-  public static class MockScanEntry implements ReadEntry{
-    public int id;
-  }
-
-  @Override
-  public List<EndpointAffinity> getOperatorAffinity() {
-    return Collections.emptyList();
-  }
-
-  @Override
-  public void applyAssignments(List<DrillbitEndpoint> endpoints) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public Scan<?> getSpecificScan(int minorFragmentId) {
-    return this;
-  }
-  
-
-  
-  
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/common/src/test/java/org/apache/drill/common/physical/MockStorePOP.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/test/java/org/apache/drill/common/physical/MockStorePOP.java b/sandbox/prototype/common/src/test/java/org/apache/drill/common/physical/MockStorePOP.java
deleted file mode 100644
index f48c539..0000000
--- a/sandbox/prototype/common/src/test/java/org/apache/drill/common/physical/MockStorePOP.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.common.physical;
-
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.drill.common.physical.pop.base.AbstractStore;
-import org.apache.drill.common.physical.pop.base.PhysicalOperator;
-import org.apache.drill.common.physical.pop.base.Store;
-import org.apache.drill.common.proto.CoordinationProtos.DrillbitEndpoint;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonIgnore;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.annotation.JsonTypeName;
-
-@JsonTypeName("mock-store")
-public class MockStorePOP extends AbstractStore {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MockStorePOP.class);
-
-  @JsonCreator
-  public MockStorePOP(@JsonProperty("child") PhysicalOperator child) {
-    super(child);
-  }
-
-  public int getMaxWidth() {
-    return 1;
-  }
-
-  @Override
-  public List<EndpointAffinity> getOperatorAffinity() {
-    return Collections.emptyList();
-  }
-
-  @Override
-  public void applyAssignments(List<DrillbitEndpoint> endpoints) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public Store getSpecificStore(PhysicalOperator child, int minorFragmentId) {
-    return this;
-  }
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/common/src/test/java/org/apache/drill/common/physical/ParsePhysicalPlan.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/test/java/org/apache/drill/common/physical/ParsePhysicalPlan.java b/sandbox/prototype/common/src/test/java/org/apache/drill/common/physical/ParsePhysicalPlan.java
deleted file mode 100644
index 0ad1f76..0000000
--- a/sandbox/prototype/common/src/test/java/org/apache/drill/common/physical/ParsePhysicalPlan.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.common.physical;
-
-import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.common.util.FileUtils;
-import org.junit.Test;
-
-import com.fasterxml.jackson.databind.ObjectReader;
-import com.fasterxml.jackson.databind.ObjectWriter;
-import com.google.common.base.Charsets;
-import com.google.common.io.Files;
-
-public class ParsePhysicalPlan {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParsePhysicalPlan.class);
-  
-  
-  @Test 
-  public void parseSimplePlan() throws Exception{
-    DrillConfig c = DrillConfig.create();
-    ObjectReader r = c.getMapper().reader(PhysicalPlan.class);
-    ObjectWriter writer = c.getMapper().writer();
-    PhysicalPlan plan = PhysicalPlan.parse(r, Files.toString(FileUtils.getResourceAsFile("/physical_test1.json"), Charsets.UTF_8));
-    System.out.println(plan.unparse(writer));
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/common/src/test/resources/drill-module.conf
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/test/resources/drill-module.conf b/sandbox/prototype/common/src/test/resources/drill-module.conf
index 86e828a..0e2c84e 100644
--- a/sandbox/prototype/common/src/test/resources/drill-module.conf
+++ b/sandbox/prototype/common/src/test/resources/drill-module.conf
@@ -1,2 +1 @@
 drill.logical.storage.packages += "org.apache.drill.storage"
-drill.physical.operator.packages += "org.apache.drill.common.physical.pop"
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/common/src/test/resources/physical_test1.json
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/test/resources/physical_test1.json b/sandbox/prototype/common/src/test/resources/physical_test1.json
deleted file mode 100644
index 16bc87a..0000000
--- a/sandbox/prototype/common/src/test/resources/physical_test1.json
+++ /dev/null
@@ -1,33 +0,0 @@
-{
-    head:{
-        type:"APACHE_DRILL_PHYSICAL",
-        version:"1",
-        generator:{
-            type:"manual"
-        }
-    },
-    graph:[
-        {
-            @id:1,
-            pop:"mock-scan",
-            url: "http://apache.org",
-            entries:[
-            	{id:1}
-            ],
-            cost: { disk: 1.0, memory: 1.0, cpu: 1.0, network: 1.0}
-        },
-        {
-            @id:2,
-            child: 1,
-            pop:"filter",
-            expr: "b > 5",
-            cost: { disk: 1.0, memory: 1.0, cpu: 1.0, network: 1.0}
-        },
-        {
-            @id: 3,
-            child: 2,
-            pop: "mock-store",
-            cost: { disk: 1.0, memory: 1.0, cpu: 1.0, network: 1.0}
-        }
-    ]
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/pom.xml
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/pom.xml b/sandbox/prototype/exec/java-exec/pom.xml
index 9766df7..f5ece33 100644
--- a/sandbox/prototype/exec/java-exec/pom.xml
+++ b/sandbox/prototype/exec/java-exec/pom.xml
@@ -56,7 +56,7 @@
 			<groupId>org.apache.drill</groupId>
 			<artifactId>common</artifactId>
 			<version>1.0-SNAPSHOT</version>
-			<classifier>test</classifier>
+			<classifier>tests</classifier>
 		</dependency>
 		<dependency>
 			<groupId>com.beust</groupId>
@@ -110,7 +110,7 @@
 		<dependency>
 			<groupId>io.netty</groupId>
 			<artifactId>netty-all</artifactId>
-			<version>4.0.0.CR1</version>
+			<version>4.0.0.CR2</version>
 		</dependency>
 		<dependency>
 			<groupId>com.google.protobuf</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ByteReorder.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ByteReorder.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ByteReorder.java
deleted file mode 100644
index 82a8a85..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ByteReorder.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.exec;
-
-import java.util.Arrays;
-
-import com.google.common.base.Charsets;
-import com.google.common.primitives.Bytes;
-import com.google.common.primitives.UnsignedBytes;
-
-public class ByteReorder {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ByteReorder.class);
-  
-  public static void main(String[] args){
-    String[] strings = {"hello", "goodbye", "my friend"};
-    byte[][] bytes = new byte[strings.length][];
-    for(int i =0; i < strings.length; i++){
-      bytes[i] = strings[i].getBytes(Charsets.UTF_8);
-    }
-    
-    for(int i =0; i < bytes.length; i++){
-      for(int v = 0; v < bytes[i].length; v++){
-        bytes[i][v] = (byte) ~bytes[i][v];
-      }
-    }
-    
-    Arrays.sort(bytes, UnsignedBytes.lexicographicalComparator());
-
-    for(int i =0; i < bytes.length; i++){
-      for(int v = 0; v < bytes[i].length; v++){
-        bytes[i][v] = (byte) ~bytes[i][v];
-      }
-    }
-
-    for(int i =0; i < bytes.length; i++){
-      System.out.println(new String(bytes[i]));
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedCache.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedCache.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedCache.java
index 2928dbe..ba2c26b 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedCache.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedCache.java
@@ -20,8 +20,9 @@ package org.apache.drill.exec.cache;
 import java.io.Closeable;
 import java.util.List;
 
-import org.apache.drill.common.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.exception.DrillbitStartupException;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.proto.ExecProtos.PlanFragment;
 import org.apache.drill.exec.proto.ExecProtos.WorkQueueStatus;
 
@@ -29,14 +30,11 @@ import org.apache.drill.exec.proto.ExecProtos.WorkQueueStatus;
 public interface DistributedCache extends Closeable{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DistributedCache.class);
   
-  public void run(DrillbitEndpoint endpoint) throws DrillbitStartupException;
+  public void run() throws DrillbitStartupException;
   
-  public void saveOptimizedPlan(TemplatizedLogicalPlan logical, TemplatizedPhysicalPlan physical);
-  public TemplatizedPhysicalPlan getOptimizedPlan(TemplatizedLogicalPlan logical);
+//  public void updateLocalQueueLength(int length);
+//  public List<WorkQueueStatus> getQueueLengths(); 
   
-  public void updateLocalQueueLength(int length);
-  public List<WorkQueueStatus> getQueueLengths(); 
-  
-  public PlanFragment getFragment(long fragmentId);
+  public PlanFragment getFragment(FragmentHandle handle);
   public void storeFragment(PlanFragment fragment);
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HazelCache.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HazelCache.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HazelCache.java
index 943031d..f4fdbfa 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HazelCache.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HazelCache.java
@@ -17,20 +17,22 @@
  ******************************************************************************/
 package org.apache.drill.exec.cache;
 
-import java.io.DataInput;
-import java.io.DataOutput;
 import java.io.IOException;
 import java.util.List;
 
 import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.common.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.cache.ProtoBufImpl.HWorkQueueStatus;
+import org.apache.drill.exec.cache.ProtoBufImpl.HandlePlan;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.proto.ExecProtos.PlanFragment;
 import org.apache.drill.exec.proto.ExecProtos.WorkQueueStatus;
 
 import com.beust.jcommander.internal.Lists;
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
+import com.google.protobuf.InvalidProtocolBufferException;
 import com.hazelcast.config.Config;
 import com.hazelcast.core.Hazelcast;
 import com.hazelcast.core.HazelcastInstance;
@@ -38,39 +40,36 @@ import com.hazelcast.core.IMap;
 import com.hazelcast.core.ITopic;
 import com.hazelcast.core.Message;
 import com.hazelcast.core.MessageListener;
-import com.hazelcast.nio.DataSerializable;
 
 public class HazelCache implements DistributedCache {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HazelCache.class);
 
   private final String instanceName;
   private HazelcastInstance instance;
-  private ITopic<WrappedWorkQueueStatus> workQueueLengths;
-  private DrillbitEndpoint endpoint;
+  private ITopic<HWorkQueueStatus> workQueueLengths;
+  private HandlePlan fragments;
   private Cache<WorkQueueStatus, Integer>  endpoints;
-  private IMap<TemplatizedLogicalPlan, TemplatizedPhysicalPlan> optimizedPlans;
   
   public HazelCache(DrillConfig config) {
     this.instanceName = config.getString(ExecConstants.SERVICE_NAME);
   }
 
-  private class Listener implements MessageListener<WrappedWorkQueueStatus>{
+  private class Listener implements MessageListener<HWorkQueueStatus>{
 
     @Override
-    public void onMessage(Message<WrappedWorkQueueStatus> wrapped) {
+    public void onMessage(Message<HWorkQueueStatus> wrapped) {
       logger.debug("Received new queue length message.");
-      endpoints.put(wrapped.getMessageObject().status, 0);
+      endpoints.put(wrapped.getMessageObject().get(), 0);
     }
     
   }
   
-  public void run(DrillbitEndpoint endpoint) {
+  public void run() {
     Config c = new Config();
     c.setInstanceName(instanceName);
     instance = getInstanceOrCreateNew(c);
     workQueueLengths = instance.getTopic("queue-length");
-    optimizedPlans = instance.getMap("plan-optimizations");
-    this.endpoint = endpoint;
+    fragments = new HandlePlan(instance);
     endpoints = CacheBuilder.newBuilder().maximumSize(2000).build();
     workQueueLengths.addMessageListener(new Listener());
   }
@@ -83,52 +82,16 @@ public class HazelCache implements DistributedCache {
     return Hazelcast.newHazelcastInstance(c);
   }
 
-  @Override
-  public void saveOptimizedPlan(TemplatizedLogicalPlan logical, TemplatizedPhysicalPlan physical) {
-    optimizedPlans.put(logical, physical);
-  }
-
-  @Override
-  public TemplatizedPhysicalPlan getOptimizedPlan(TemplatizedLogicalPlan logical) {
-    return optimizedPlans.get(logical);
-  }
-
-  @Override
-  public void updateLocalQueueLength(int length) {
-    workQueueLengths.publish(new WrappedWorkQueueStatus(WorkQueueStatus.newBuilder().setEndpoint(endpoint)
-        .setQueueLength(length).setReportTime(System.currentTimeMillis()).build()));
-  }
-
-  @Override
-  public List<WorkQueueStatus> getQueueLengths() {
-    return Lists.newArrayList(endpoints.asMap().keySet());
-  }
-
-  public class WrappedWorkQueueStatus implements DataSerializable {
-
-    public WorkQueueStatus status;
-
-    public WrappedWorkQueueStatus(WorkQueueStatus status) {
-      this.status = status;
-    }
-
-    @Override
-    public void readData(DataInput arg0) throws IOException {
-      int len = arg0.readShort();
-      byte[] b = new byte[len];
-      arg0.readFully(b);
-      this.status = WorkQueueStatus.parseFrom(b);
-    }
-
-    @Override
-    public void writeData(DataOutput arg0) throws IOException {
-      byte[] b = status.toByteArray();
-      if (b.length > Short.MAX_VALUE) throw new IOException("Unexpectedly long value.");
-      arg0.writeShort(b.length);
-      arg0.write(b);
-    }
-
-  }
+//  @Override
+//  public void updateLocalQueueLength(int length) {
+//    workQueueLengths.publish(new HWorkQueueStatus(WorkQueueStatus.newBuilder().setEndpoint(endpoint)
+//        .setQueueLength(length).setReportTime(System.currentTimeMillis()).build()));
+//  }
+//
+//  @Override
+//  public List<WorkQueueStatus> getQueueLengths() {
+//    return Lists.newArrayList(endpoints.asMap().keySet());
+//  }
 
   @Override
   public void close() throws IOException {
@@ -136,13 +99,13 @@ public class HazelCache implements DistributedCache {
   }
 
   @Override
-  public PlanFragment getFragment(long fragmentId) {
-    throw new UnsupportedOperationException();
+  public PlanFragment getFragment(FragmentHandle handle) {
+    return this.fragments.get(handle);
   }
 
   @Override
   public void storeFragment(PlanFragment fragment) {
-    throw new UnsupportedOperationException();
+    fragments.put(fragment.getHandle(), fragment);
   }
   
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/LocalCache.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/LocalCache.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/LocalCache.java
new file mode 100644
index 0000000..ddb2a02
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/LocalCache.java
@@ -0,0 +1,55 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.cache;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.drill.exec.exception.DrillbitStartupException;
+import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.proto.ExecProtos.PlanFragment;
+
+import com.google.common.collect.Maps;
+
+public class LocalCache implements DistributedCache {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LocalCache.class);
+
+  private volatile Map<FragmentHandle, PlanFragment> handles;
+  
+  @Override
+  public void close() throws IOException {
+    handles = null;
+  }
+
+  @Override
+  public void run() throws DrillbitStartupException {
+    handles = Maps.newConcurrentMap();
+  }
+
+  @Override
+  public PlanFragment getFragment(FragmentHandle handle) {
+    return handles.get(handle);
+  }
+
+  @Override
+  public void storeFragment(PlanFragment fragment) {
+    handles.put(fragment.getHandle(), fragment);
+  }
+  
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/ProtoBufImpl.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/ProtoBufImpl.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/ProtoBufImpl.java
new file mode 100644
index 0000000..46bb9ee
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/ProtoBufImpl.java
@@ -0,0 +1,50 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.cache;
+
+import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.proto.ExecProtos.PlanFragment;
+import org.apache.drill.exec.proto.ExecProtos.WorkQueueStatus;
+
+import com.google.protobuf.Parser;
+import com.hazelcast.core.HazelcastInstance;
+
+public class ProtoBufImpl {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ProtoBufImpl.class);
+  
+  public static class HWorkQueueStatus extends ProtoBufWrap<WorkQueueStatus>{
+    public HWorkQueueStatus() {super(WorkQueueStatus.PARSER);}
+    public HWorkQueueStatus(WorkQueueStatus value) {super(value, WorkQueueStatus.PARSER);}
+  }
+  
+  public static class HFragmentHandle extends ProtoBufWrap<FragmentHandle>{
+    public HFragmentHandle() {super(FragmentHandle.PARSER);}
+    public HFragmentHandle(FragmentHandle value) {super(value, FragmentHandle.PARSER);}
+  }
+  
+  public static class HPlanFragment extends ProtoBufWrap<PlanFragment>{
+    public HPlanFragment() {super(PlanFragment.PARSER);}
+    public HPlanFragment(PlanFragment value) {super(value, PlanFragment.PARSER);}
+  }
+  
+  public static class HandlePlan extends ProtoMap<FragmentHandle, PlanFragment, HFragmentHandle, HPlanFragment>{
+    public HandlePlan(HazelcastInstance instance) {super(instance, "plan-fragment-cache");}
+    public HFragmentHandle getNewKey(FragmentHandle key) {return new HFragmentHandle(key);}
+    public HPlanFragment getNewValue(PlanFragment value) {return new HPlanFragment(value);}
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/ProtoBufWrap.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/ProtoBufWrap.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/ProtoBufWrap.java
new file mode 100644
index 0000000..c3a9160
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/ProtoBufWrap.java
@@ -0,0 +1,69 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.cache;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.drill.exec.proto.ExecProtos.WorkQueueStatus;
+
+import com.google.protobuf.MessageLite;
+import com.google.protobuf.Parser;
+import com.hazelcast.nio.DataSerializable;
+
+public abstract class ProtoBufWrap<T extends MessageLite> implements DataSerializable{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ProtoBufWrap.class);
+  
+  T value;
+  final Parser<T> parser;
+  
+  public ProtoBufWrap(Parser<T> parser){
+    this(null, parser);
+  }
+  
+  public ProtoBufWrap(T value, Parser<T> parser){
+    this.value = value;
+    this.parser = parser;
+  }
+  
+  @Override
+  public void readData(DataInput arg0) throws IOException {
+    int len = arg0.readShort();
+    byte[] b = new byte[len];
+    arg0.readFully(b);
+    this.value = parser.parseFrom(b);
+  }
+
+  @Override
+  public void writeData(DataOutput arg0) throws IOException {
+    byte[] b = value.toByteArray();
+    if (b.length > Short.MAX_VALUE) throw new IOException("Unexpectedly long value.");
+    arg0.writeShort(b.length);
+    arg0.write(b);
+  }
+
+  protected T get() {
+    return value;
+  }
+
+  protected void set(T value) {
+    this.value = value;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/ProtoMap.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/ProtoMap.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/ProtoMap.java
new file mode 100644
index 0000000..dac8201
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/ProtoMap.java
@@ -0,0 +1,52 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.cache;
+
+import com.google.common.base.Preconditions;
+import com.google.protobuf.MessageLite;
+import com.google.protobuf.Parser;
+import com.hazelcast.core.HazelcastInstance;
+import com.hazelcast.core.IMap;
+
+public abstract class ProtoMap<K extends MessageLite, V extends MessageLite, HK extends ProtoBufWrap<K>, HV extends ProtoBufWrap<V>> {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ProtoMap.class);
+
+  private IMap<HK, HV> hzMap;
+  
+  public ProtoMap(HazelcastInstance instance, String mapName){
+    hzMap = instance.getMap(mapName);
+  }
+  
+  public V get(K key){
+    Preconditions.checkNotNull(key);
+    HK hk = getNewKey(key);
+    HV hv = hzMap.get(hk);
+    if(hv == null) return null;
+    return hv.get();
+  }
+  
+  public V put(K key, V value){
+    Preconditions.checkNotNull(key);
+    Preconditions.checkNotNull(value);
+    HV oldValue = hzMap.put(getNewKey(key), getNewValue(value));
+    return oldValue.get();
+  }
+  
+  public abstract HK getNewKey(K key);
+  public abstract HV getNewValue(V key);
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/TemplatizedLogicalPlan.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/TemplatizedLogicalPlan.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/TemplatizedLogicalPlan.java
deleted file mode 100644
index 5ad9ef1..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/TemplatizedLogicalPlan.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.exec.cache;
-
-public class TemplatizedLogicalPlan {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TemplatizedLogicalPlan.class);
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/TemplatizedPhysicalPlan.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/TemplatizedPhysicalPlan.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/TemplatizedPhysicalPlan.java
deleted file mode 100644
index 643720c..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/TemplatizedPhysicalPlan.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.exec.cache;
-
-public class TemplatizedPhysicalPlan {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TemplatizedPhysicalPlan.class);
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
index ee63213..bb7f77e 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
@@ -25,24 +25,34 @@ import io.netty.buffer.ByteBufAllocator;
 import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.channel.nio.NioEventLoopGroup;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.util.Collection;
+import java.util.List;
+import java.util.Vector;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 
 import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.common.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.coord.ClusterCoordinator;
 import org.apache.drill.exec.coord.ZKClusterCoordinator;
-import org.apache.drill.exec.proto.UserProtos.QueryHandle;
-import org.apache.drill.exec.rpc.DrillRpcFuture;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.proto.UserProtos.QueryType;
+import org.apache.drill.exec.proto.UserProtos.RpcType;
+import org.apache.drill.exec.proto.UserProtos.UserToBitHandshake;
 import org.apache.drill.exec.rpc.NamedThreadFactory;
 import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.user.QueryResultBatch;
 import org.apache.drill.exec.rpc.user.UserClient;
+import org.apache.drill.exec.rpc.user.UserResultsListener;
+import org.apache.drill.exec.rpc.user.UserRpcConfig;
 
 /**
  * Thin wrapper around a UserClient that handles connect/close and transforms String into ByteBuf
  */
-public class DrillClient {
-
+public class DrillClient implements Closeable{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillClient.class);
+  
   DrillConfig config;
   private UserClient client;
   private ClusterCoordinator clusterCoordinator;
@@ -56,8 +66,17 @@ public class DrillClient {
   }
 
   public DrillClient(DrillConfig config) {
+    this(config, null);
+  }
+  
+  public DrillClient(DrillConfig config, ClusterCoordinator coordinator){
     this.config = config;
+    this.clusterCoordinator = coordinator;
   }
+  
+  
+  
+
 
   /**
    * Connects the client to a Drillbit server
@@ -65,9 +84,11 @@ public class DrillClient {
    * @throws IOException
    */
   public void connect() throws Exception {
-    this.clusterCoordinator = new ZKClusterCoordinator(this.config);
-    this.clusterCoordinator.start();
-    Thread.sleep(10000);
+    if(clusterCoordinator == null){
+      this.clusterCoordinator = new ZKClusterCoordinator(this.config);
+      this.clusterCoordinator.start(10000);
+    }
+    
     Collection<DrillbitEndpoint> endpoints = clusterCoordinator.getAvailableEndpoints();
     checkState(!endpoints.isEmpty(), "No DrillbitEndpoint can be found");
     // just use the first endpoint for now
@@ -75,7 +96,8 @@ public class DrillClient {
     ByteBufAllocator bb = new PooledByteBufAllocator(true);
     this.client = new UserClient(bb, new NioEventLoopGroup(1, new NamedThreadFactory("Client-")));
     try {
-      this.client.connectAsClient(endpoint.getAddress(), endpoint.getUserPort());
+      logger.debug("Connecting to server {}:{}", endpoint.getAddress(), endpoint.getUserPort());
+      this.client.connect(endpoint);
     } catch (InterruptedException e) {
       throw new IOException(e);
     }
@@ -97,8 +119,37 @@ public class DrillClient {
    * @return a handle for the query result
    * @throws RpcException
    */
-  public DrillRpcFuture<QueryHandle> submitPlan(String plan) throws RpcException {
-    return this.client.submitQuery(newBuilder().setMode(STREAM_FULL).setPlan(plan).build(), null);
+  public List<QueryResultBatch> runQuery(QueryType type, String plan) throws RpcException {
+    try {
+      ListHoldingResultsListener listener = new ListHoldingResultsListener();
+      Future<Void> f = client.submitQuery(newBuilder().setResultsMode(STREAM_FULL).setType(type).setPlan(plan).build(), listener);
+      f.get();
+      if(listener.ex != null){
+        throw listener.ex;
+      }else{
+        return listener.results;
+      }
+    } catch (InterruptedException | ExecutionException e) {
+      throw new RpcException(e);
+    }
+  }
+  
+  private class ListHoldingResultsListener extends UserResultsListener{
+    private RpcException ex;
+    private Vector<QueryResultBatch> results = new Vector<QueryResultBatch>();
+    
+    @Override
+    public void submissionFailed(RpcException ex) {
+      logger.debug("Submission failed.", ex);
+      this.ex = ex;
+    }
+
+    @Override
+    public void resultArrived(QueryResultBatch result) {
+      logger.debug("Result arrived.  Is Last Chunk: {}.  Full Result: {}", result.getHeader().getIsLastChunk(), result);
+      results.add(result);
+    }
+    
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/coord/ClusterCoordinator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/coord/ClusterCoordinator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/coord/ClusterCoordinator.java
index d3580b5..7fb1f5b 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/coord/ClusterCoordinator.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/coord/ClusterCoordinator.java
@@ -20,7 +20,7 @@ package org.apache.drill.exec.coord;
 import java.io.Closeable;
 import java.util.Collection;
 
-import org.apache.drill.common.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 
 /**
  * Pluggable interface built to manage cluster coordination. Allows Drillbit or DrillClient to register its capabilities
@@ -29,7 +29,12 @@ import org.apache.drill.common.proto.CoordinationProtos.DrillbitEndpoint;
 public abstract class ClusterCoordinator implements Closeable {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ClusterCoordinator.class);
 
-  public abstract void start() throws Exception;
+  /**
+   * Start the cluster coordinator.  Millis to wait is   
+   * @param millisToWait The maximum time to wait before throwing an exception if the cluster coordination service has not successfully started.  Use 0 to wait indefinitely.
+   * @throws Exception
+   */
+  public abstract void start(long millisToWait) throws Exception;
 
   public abstract RegistrationHandle register(DrillbitEndpoint data);
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/coord/DrillServiceInstanceHelper.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/coord/DrillServiceInstanceHelper.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/coord/DrillServiceInstanceHelper.java
index ce0fb92..289aa3c 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/coord/DrillServiceInstanceHelper.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/coord/DrillServiceInstanceHelper.java
@@ -17,9 +17,9 @@
  ******************************************************************************/
 package org.apache.drill.exec.coord;
 
-import org.apache.drill.common.proto.CoordinationProtos.DrillServiceInstance;
-import org.apache.drill.common.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillServiceInstance;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 
 import com.netflix.curator.x.discovery.ServiceInstance;
 import com.netflix.curator.x.discovery.ServiceInstanceBuilder;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/coord/DrillbitEndpointSerDe.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/coord/DrillbitEndpointSerDe.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/coord/DrillbitEndpointSerDe.java
new file mode 100644
index 0000000..5886c2c
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/coord/DrillbitEndpointSerDe.java
@@ -0,0 +1,65 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.coord;
+
+import java.io.IOException;
+
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+
+import com.fasterxml.jackson.core.JsonGenerationException;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+import com.fasterxml.jackson.databind.ser.std.StdSerializer;
+
+public class DrillbitEndpointSerDe {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillbitEndpointSerDe.class);
+  
+  public static class De extends StdDeserializer<DrillbitEndpoint> {
+
+    public De() {
+      super(DrillbitEndpoint.class);
+    }
+
+    @Override
+    public DrillbitEndpoint deserialize(JsonParser jp, DeserializationContext ctxt) throws IOException,
+        JsonProcessingException {
+      return DrillbitEndpoint.parseFrom(jp.getBinaryValue());
+    }
+    
+    
+  }
+  
+  
+  public static class Se extends StdSerializer<DrillbitEndpoint> {
+
+    public Se() {
+      super(DrillbitEndpoint.class);
+    }
+
+    @Override
+    public void serialize(DrillbitEndpoint value, JsonGenerator jgen, SerializerProvider provider) throws IOException,
+        JsonGenerationException {
+      jgen.writeBinary(value.toByteArray());
+    }
+    
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/coord/LocalClusterCoordinator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/coord/LocalClusterCoordinator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/coord/LocalClusterCoordinator.java
new file mode 100644
index 0000000..43a5430
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/coord/LocalClusterCoordinator.java
@@ -0,0 +1,95 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.coord;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+
+import com.google.common.collect.Maps;
+
+public class LocalClusterCoordinator extends ClusterCoordinator{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LocalClusterCoordinator.class);
+
+  private volatile Map<RegistrationHandle, DrillbitEndpoint> endpoints;
+  
+  @Override
+  public void close() throws IOException {
+    endpoints = null;
+  }
+
+  @Override
+  public void start(long millis) throws Exception {
+    logger.debug("Local Cluster Coordinator started.");
+    endpoints = Maps.newConcurrentMap();
+  }
+
+  @Override
+  public RegistrationHandle register(DrillbitEndpoint data) {
+    logger.debug("Endpoint registered {}.", data);
+    Handle h = new Handle();
+    endpoints.put(h, data);
+    return h;
+  }
+
+  @Override
+  public void unregister(RegistrationHandle handle) {
+    endpoints.remove(handle);
+  }
+
+  @Override
+  public Collection<DrillbitEndpoint> getAvailableEndpoints() {
+    return endpoints.values();
+  }
+  
+  
+  private class Handle implements RegistrationHandle{
+    UUID id = UUID.randomUUID();
+
+    @Override
+    public int hashCode() {
+      final int prime = 31;
+      int result = 1;
+      result = prime * result + getOuterType().hashCode();
+      result = prime * result + ((id == null) ? 0 : id.hashCode());
+      return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (this == obj) return true;
+      if (obj == null) return false;
+      if (getClass() != obj.getClass()) return false;
+      Handle other = (Handle) obj;
+      if (!getOuterType().equals(other.getOuterType())) return false;
+      if (id == null) {
+        if (other.id != null) return false;
+      } else if (!id.equals(other.id)) return false;
+      return true;
+    }
+
+    private LocalClusterCoordinator getOuterType() {
+      return LocalClusterCoordinator.this;
+    }
+    
+  }
+  
+}


Mime
View raw message