drill-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Ted Dunning <ted.dunn...@gmail.com>
Subject Re: [09/13] Update typing system. Update RPC system. Add Fragmenting Implementation. Working single node. Distributed failing due to threading issues.
Date Tue, 14 May 2013 03:54:25 GMT


Sent from my iPhone

On May 13, 2013, at 18:52, jacques@apache.org wrote:

> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockStorageEngine.java
> ----------------------------------------------------------------------
> diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockStorageEngine.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockStorageEngine.java
> new file mode 100644
> index 0000000..0044628
> --- /dev/null
> +++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockStorageEngine.java
> @@ -0,0 +1,58 @@
> +/*******************************************************************************
> + * Licensed to the Apache Software Foundation (ASF) under one
> + * or more contributor license agreements.  See the NOTICE file
> + * distributed with this work for additional information
> + * regarding copyright ownership.  The ASF licenses this file
> + * to you under the Apache License, Version 2.0 (the
> + * "License"); you may not use this file except in compliance
> + * with the License.  You may obtain a copy of the License at
> + * 
> + * http://www.apache.org/licenses/LICENSE-2.0
> + * 
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + ******************************************************************************/
> +package org.apache.drill.exec.physical.config;
> +
> +import java.io.IOException;
> +import java.util.Collection;
> +
> +import org.apache.drill.common.logical.data.Scan;
> +import org.apache.drill.exec.ops.FragmentContext;
> +import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
> +import org.apache.drill.exec.store.AbstractStorageEngine;
> +import org.apache.drill.exec.store.RecordReader;
> +import org.apache.drill.exec.store.StorageEngine;
> +import org.apache.drill.exec.store.StorageEngine.ReadEntry;
> +
> +import com.google.common.collect.ListMultimap;
> +
> +public class MockStorageEngine extends AbstractStorageEngine{
> +  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MockStorageEngine.class);
> +
> +  @Override
> +  public boolean supportsRead() {
> +    return true;
> +  }
> +
> +  @Override
> +  public Collection<ReadEntry> getReadEntries(Scan scan) throws IOException {
> +    return null;
> +  }
> +
> +  @Override
> +  public ListMultimap<ReadEntry, DrillbitEndpoint> getReadLocations(Collection<ReadEntry> entries) {
> +    return null;
> +  }
> +
> +  @Override
> +  public RecordReader getReader(FragmentContext context, ReadEntry readEntry) throws IOException {
> +    return null;
> +  }
> +
> +  
> +  
> +}
> 
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockStorePOP.java
> ----------------------------------------------------------------------
> diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockStorePOP.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockStorePOP.java
> new file mode 100644
> index 0000000..639d0d2
> --- /dev/null
> +++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockStorePOP.java
> @@ -0,0 +1,75 @@
> +/*******************************************************************************
> + * Licensed to the Apache Software Foundation (ASF) under one
> + * or more contributor license agreements.  See the NOTICE file
> + * distributed with this work for additional information
> + * regarding copyright ownership.  The ASF licenses this file
> + * to you under the Apache License, Version 2.0 (the
> + * "License"); you may not use this file except in compliance
> + * with the License.  You may obtain a copy of the License at
> + * 
> + * http://www.apache.org/licenses/LICENSE-2.0
> + * 
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + ******************************************************************************/
> +package org.apache.drill.exec.physical.config;
> +
> +import java.util.Collections;
> +import java.util.List;
> +
> +import org.apache.drill.exec.physical.EndpointAffinity;
> +import org.apache.drill.exec.physical.OperatorCost;
> +import org.apache.drill.exec.physical.base.AbstractStore;
> +import org.apache.drill.exec.physical.base.PhysicalOperator;
> +import org.apache.drill.exec.physical.base.Store;
> +import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
> +
> +import com.fasterxml.jackson.annotation.JsonCreator;
> +import com.fasterxml.jackson.annotation.JsonProperty;
> +import com.fasterxml.jackson.annotation.JsonTypeName;
> +
> +@JsonTypeName("mock-store")
> +public class MockStorePOP extends AbstractStore {
> +  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MockStorePOP.class);
> +
> +  @JsonCreator
> +  public MockStorePOP(@JsonProperty("child") PhysicalOperator child) {
> +    super(child);
> +  }
> +
> +  public int getMaxWidth() {
> +    return 1;
> +  }
> +
> +  @Override
> +  public List<EndpointAffinity> getOperatorAffinity() {
> +    return Collections.emptyList();
> +  }
> +
> +  @Override
> +  public void applyAssignments(List<DrillbitEndpoint> endpoints) {
> +    
> +  }
> +
> +  @Override
> +  public Store getSpecificStore(PhysicalOperator child, int minorFragmentId) {
> +    return new MockStorePOP(child);
> +  }
> +
> +  @Override
> +  public OperatorCost getCost() {
> +    return new OperatorCost(1,getSize().getRecordCount()*getSize().getRecordSize(),1,1);
> +  }
> +
> +  @Override
> +  protected PhysicalOperator getNewWithChild(PhysicalOperator child) {
> +    return new MockStorePOP(child);
> +  }
> +
> +
> +  
> +
> +}
> 
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/PartitionRange.java
> ----------------------------------------------------------------------
> diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/PartitionRange.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/PartitionRange.java
> new file mode 100644
> index 0000000..eb77eeb
> --- /dev/null
> +++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/PartitionRange.java
> @@ -0,0 +1,47 @@
> +/*******************************************************************************
> + * Licensed to the Apache Software Foundation (ASF) under one
> + * or more contributor license agreements.  See the NOTICE file
> + * distributed with this work for additional information
> + * regarding copyright ownership.  The ASF licenses this file
> + * to you under the Apache License, Version 2.0 (the
> + * "License"); you may not use this file except in compliance
> + * with the License.  You may obtain a copy of the License at
> + * 
> + * http://www.apache.org/licenses/LICENSE-2.0
> + * 
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + ******************************************************************************/
> +package org.apache.drill.exec.physical.config;
> +
> +import org.apache.drill.common.expression.LogicalExpression;
> +
> +import com.fasterxml.jackson.annotation.JsonCreator;
> +import com.fasterxml.jackson.annotation.JsonProperty;
> +
> +public class PartitionRange {
> +  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PartitionRange.class);
> +  
> +  private LogicalExpression start;
> +  private LogicalExpression finish;
> +  
> +  @JsonCreator
> +  public PartitionRange(@JsonProperty("start") LogicalExpression start, @JsonProperty("finish") LogicalExpression finish) {
> +    super();
> +    this.start = start;
> +    this.finish = finish;
> +  }
> +
> +  public LogicalExpression getStart() {
> +    return start;
> +  }
> +
> +  public LogicalExpression getFinish() {
> +    return finish;
> +  }
> +  
> +  
> +}
> 
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Project.java
> ----------------------------------------------------------------------
> diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Project.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Project.java
> new file mode 100644
> index 0000000..e869393
> --- /dev/null
> +++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Project.java
> @@ -0,0 +1,72 @@
> +/*******************************************************************************
> + * Licensed to the Apache Software Foundation (ASF) under one
> + * or more contributor license agreements.  See the NOTICE file
> + * distributed with this work for additional information
> + * regarding copyright ownership.  The ASF licenses this file
> + * to you under the Apache License, Version 2.0 (the
> + * "License"); you may not use this file except in compliance
> + * with the License.  You may obtain a copy of the License at
> + * 
> + * http://www.apache.org/licenses/LICENSE-2.0
> + * 
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + ******************************************************************************/
> +package org.apache.drill.exec.physical.config;
> +
> +import java.util.Iterator;
> +import java.util.List;
> +
> +import org.apache.drill.common.logical.data.NamedExpression;
> +import org.apache.drill.exec.physical.OperatorCost;
> +import org.apache.drill.exec.physical.base.AbstractSingle;
> +import org.apache.drill.exec.physical.base.PhysicalOperator;
> +import org.apache.drill.exec.physical.base.PhysicalVisitor;
> +import org.apache.drill.exec.physical.base.Size;
> +
> +import com.fasterxml.jackson.annotation.JsonCreator;
> +import com.fasterxml.jackson.annotation.JsonProperty;
> +import com.fasterxml.jackson.annotation.JsonTypeName;
> +
> +@JsonTypeName("project")
> +public class Project extends AbstractSingle{
> +  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Project.class);
> +
> +  private final List<NamedExpression> exprs;
> +  
> +  @JsonCreator
> +  public Project(@JsonProperty("exprs") List<NamedExpression> exprs, @JsonProperty("child") PhysicalOperator child) {
> +    super(child);
> +    this.exprs = exprs;
> +  }
> +
> +  public List<NamedExpression> getExprs() {
> +    return exprs;
> +  }
> +
> +  @Override
> +  public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E{
> +    return physicalVisitor.visitProject(this, value);
> +  }
> +
> +  @Override
> +  public OperatorCost getCost() {
> +    return new OperatorCost(0, 0, 1000, child.getSize().getRecordCount());
> +  }
> +  
> +  @Override
> +  public Size getSize() {
> +    //TODO: This should really change the row width...
> +    return child.getSize();
> +  }
> +
> +  @Override
> +  protected PhysicalOperator getNewWithChild(PhysicalOperator child) {
> +    return new Project(exprs, child);
> +  }
> +  
> +  
> +}
> 
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/RandomReceiver.java
> ----------------------------------------------------------------------
> diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/RandomReceiver.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/RandomReceiver.java
> new file mode 100644
> index 0000000..ed41586
> --- /dev/null
> +++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/RandomReceiver.java
> @@ -0,0 +1,83 @@
> +/*******************************************************************************
> + * Licensed to the Apache Software Foundation (ASF) under one
> + * or more contributor license agreements.  See the NOTICE file
> + * distributed with this work for additional information
> + * regarding copyright ownership.  The ASF licenses this file
> + * to you under the Apache License, Version 2.0 (the
> + * "License"); you may not use this file except in compliance
> + * with the License.  You may obtain a copy of the License at
> + * 
> + * http://www.apache.org/licenses/LICENSE-2.0
> + * 
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + ******************************************************************************/
> +package org.apache.drill.exec.physical.config;
> +
> +import java.util.List;
> +
> +import org.apache.drill.exec.physical.OperatorCost;
> +import org.apache.drill.exec.physical.base.AbstractReceiver;
> +import org.apache.drill.exec.physical.base.PhysicalOperator;
> +import org.apache.drill.exec.physical.base.PhysicalVisitor;
> +import org.apache.drill.exec.physical.base.Size;
> +import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
> +
> +import com.fasterxml.jackson.annotation.JsonCreator;
> +import com.fasterxml.jackson.annotation.JsonIgnore;
> +import com.fasterxml.jackson.annotation.JsonProperty;
> +import com.fasterxml.jackson.annotation.JsonTypeName;
> +import com.google.common.base.Preconditions;
> +
> +@JsonTypeName("random-receiver")
> +public class RandomReceiver extends AbstractReceiver{
> +  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RandomReceiver.class);
> +
> +  private List<DrillbitEndpoint> senders;
> +  
> +  @JsonCreator
> +  public RandomReceiver(@JsonProperty("sender-major-fragment") int oppositeMajorFragmentId, @JsonProperty("senders") List<DrillbitEndpoint> senders){
> +    super(oppositeMajorFragmentId);
> +    this.senders = senders;
> +  }
> +  
> +  @Override
> +  public List<DrillbitEndpoint> getProvidingEndpoints() {
> +    return senders;
> +  }
> +
> +  @Override
> +  public boolean supportsOutOfOrderExchange() {
> +    return true;
> +  }
> +
> +  @Override
> +  public OperatorCost getCost() {
> +    //TODO: deal with receiver cost through exchange.
> +    return new OperatorCost(1,1,1,1);
> +  }
> +
> +  
> +  @Override
> +  public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E {
> +    return physicalVisitor.visitRandomReceiver(this, value);
> +  }
> +
> +  @Override
> +  public Size getSize() {
> +    //TODO: deal with size info through exchange.
> +    return new Size(1,1);
> +  }
> +
> +  @Override
> +  public int getOppositeMajorFragmentId() {
> +    return 0;
> +  }
> +
> +  
> +
> +  
> +}
> 
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/RangeSender.java
> ----------------------------------------------------------------------
> diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/RangeSender.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/RangeSender.java
> new file mode 100644
> index 0000000..7d64dba
> --- /dev/null
> +++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/RangeSender.java
> @@ -0,0 +1,72 @@
> +/*******************************************************************************
> + * Licensed to the Apache Software Foundation (ASF) under one
> + * or more contributor license agreements.  See the NOTICE file
> + * distributed with this work for additional information
> + * regarding copyright ownership.  The ASF licenses this file
> + * to you under the Apache License, Version 2.0 (the
> + * "License"); you may not use this file except in compliance
> + * with the License.  You may obtain a copy of the License at
> + * 
> + * http://www.apache.org/licenses/LICENSE-2.0
> + * 
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + ******************************************************************************/
> +package org.apache.drill.exec.physical.config;
> +
> +import java.util.List;
> +
> +import org.apache.drill.exec.physical.base.AbstractSender;
> +import org.apache.drill.exec.physical.base.PhysicalOperator;
> +import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
> +
> +import com.fasterxml.jackson.annotation.JsonCreator;
> +import com.fasterxml.jackson.annotation.JsonProperty;
> +import com.fasterxml.jackson.annotation.JsonTypeName;
> +
> +@JsonTypeName("range-sender")
> +public class RangeSender extends AbstractSender{
> +
> +  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RangeSender.class);
> +
> +  List<EndpointPartition> partitions;
> +  
> +  @JsonCreator
> +  public RangeSender(@JsonProperty("receiver-major-fragment") int oppositeMajorFragmentId, @JsonProperty("child") PhysicalOperator child, @JsonProperty("partitions") List<EndpointPartition> partitions) {
> +    super(oppositeMajorFragmentId, child);
> +    this.partitions = partitions;
> +  }
> +
> +  @Override
> +  public List<DrillbitEndpoint> getDestinations() {
> +    return null;
> +  }
> +
> +
> +  @Override
> +  protected PhysicalOperator getNewWithChild(PhysicalOperator child) {
> +    return new RangeSender(oppositeMajorFragmentId, child, partitions);
> +  }
> +
> +
> +  public static class EndpointPartition{
> +    private final PartitionRange range;
> +    private final DrillbitEndpoint endpoint;
> +    
> +    @JsonCreator
> +    public EndpointPartition(@JsonProperty("range") PartitionRange range, @JsonProperty("endpoint") DrillbitEndpoint endpoint) {
> +      super();
> +      this.range = range;
> +      this.endpoint = endpoint;
> +    }
> +    public PartitionRange getRange() {
> +      return range;
> +    }
> +    public DrillbitEndpoint getEndpoint() {
> +      return endpoint;
> +    }
> +  }
> +}
> 
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Screen.java
> ----------------------------------------------------------------------
> diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Screen.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Screen.java
> new file mode 100644
> index 0000000..86a201d
> --- /dev/null
> +++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Screen.java
> @@ -0,0 +1,106 @@
> +/*******************************************************************************
> + * Licensed to the Apache Software Foundation (ASF) under one
> + * or more contributor license agreements.  See the NOTICE file
> + * distributed with this work for additional information
> + * regarding copyright ownership.  The ASF licenses this file
> + * to you under the Apache License, Version 2.0 (the
> + * "License"); you may not use this file except in compliance
> + * with the License.  You may obtain a copy of the License at
> + * 
> + * http://www.apache.org/licenses/LICENSE-2.0
> + * 
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + ******************************************************************************/
> +package org.apache.drill.exec.physical.config;
> +
> +import java.util.Collections;
> +import java.util.List;
> +
> +import org.apache.drill.common.exceptions.PhysicalOperatorSetupException;
> +import org.apache.drill.exec.physical.EndpointAffinity;
> +import org.apache.drill.exec.physical.OperatorCost;
> +import org.apache.drill.exec.physical.base.AbstractStore;
> +import org.apache.drill.exec.physical.base.PhysicalOperator;
> +import org.apache.drill.exec.physical.base.PhysicalVisitor;
> +import org.apache.drill.exec.physical.base.Root;
> +import org.apache.drill.exec.physical.base.Size;
> +import org.apache.drill.exec.physical.base.Store;
> +import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
> +
> +import com.fasterxml.jackson.annotation.JacksonInject;
> +import com.fasterxml.jackson.annotation.JsonIgnore;
> +import com.fasterxml.jackson.annotation.JsonProperty;
> +import com.fasterxml.jackson.annotation.JsonTypeName;
> +import com.google.common.base.Preconditions;
> +
> +@JsonTypeName("screen")
> +public class Screen extends AbstractStore implements Root{
> +  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Screen.class);
> +
> +  private final DrillbitEndpoint endpoint;
> +
> +  public Screen(@JsonProperty("child") PhysicalOperator child, @JacksonInject DrillbitEndpoint endpoint) {
> +    super(child);
> +    this.endpoint = endpoint;
> +  }
> +
> +  @Override
> +  public List<EndpointAffinity> getOperatorAffinity() {
> +    return Collections.singletonList(new EndpointAffinity(endpoint, 1000000000000l));
> +  }
> +
> +  @Override
> +  public int getMaxWidth() {
> +    return 1;
> +  }
> +
> +  @Override
> +  public void applyAssignments(List<DrillbitEndpoint> endpoints) throws PhysicalOperatorSetupException {
> +    // we actually don't have to do anything since nothing should have changed. we'll check just check that things
> +    // didn't get screwed up.
> +    if (endpoints.size() != 1) throw new PhysicalOperatorSetupException("A Screen operator can only be assigned to a single node.");
> +    DrillbitEndpoint endpoint = endpoints.iterator().next();
> +    logger.debug("Endpoint this: {}, assignment: {}", this.endpoint, endpoint);
> +    if (!endpoint.equals(this.endpoint)) {
> +      throw new PhysicalOperatorSetupException(String.format(
> +          "A Screen operator can only be assigned to its home node.  Expected endpoint %s, Actual endpoint: %s",
> +          this.endpoint, endpoint));
> +    }
> +  }
> +
> +  @Override
> +  public Store getSpecificStore(PhysicalOperator child, int minorFragmentId) {
> +    return new Screen(child, endpoint);
> +  }
> +
> +  @JsonIgnore
> +  public DrillbitEndpoint getEndpoint() {
> +    return endpoint;
> +  }
> +
> +  @Override
> +  public String toString() {
> +    return "Screen [endpoint=" + endpoint + ", getChild()=" + getChild() + "]";
> +  }
> +
> +  @Override
> +  public OperatorCost getCost() {
> +    return new OperatorCost(0, 0, 1000, child.getSize().getRecordCount());
> +  }
> +
> +  @Override
> +  protected PhysicalOperator getNewWithChild(PhysicalOperator child) {
> +    return new Screen(child, endpoint);
> +  }
> +
> +  @Override
> +  public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E {
> +    return physicalVisitor.visitScreen(this, value);
> +  }
> +
> +  
> +}
> 
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SingleSender.java
> ----------------------------------------------------------------------
> diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SingleSender.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SingleSender.java
> new file mode 100644
> index 0000000..79d937a
> --- /dev/null
> +++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SingleSender.java
> @@ -0,0 +1,78 @@
> +/*******************************************************************************
> + * Licensed to the Apache Software Foundation (ASF) under one
> + * or more contributor license agreements.  See the NOTICE file
> + * distributed with this work for additional information
> + * regarding copyright ownership.  The ASF licenses this file
> + * to you under the Apache License, Version 2.0 (the
> + * "License"); you may not use this file except in compliance
> + * with the License.  You may obtain a copy of the License at
> + * 
> + * http://www.apache.org/licenses/LICENSE-2.0
> + * 
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + ******************************************************************************/
> +package org.apache.drill.exec.physical.config;
> +
> +import java.util.Collections;
> +import java.util.List;
> +
> +import org.apache.drill.common.graph.GraphVisitor;
> +import org.apache.drill.exec.physical.OperatorCost;
> +import org.apache.drill.exec.physical.base.AbstractSender;
> +import org.apache.drill.exec.physical.base.PhysicalOperator;
> +import org.apache.drill.exec.physical.base.PhysicalVisitor;
> +import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
> +
> +import com.fasterxml.jackson.annotation.JsonCreator;
> +import com.fasterxml.jackson.annotation.JsonIgnore;
> +import com.fasterxml.jackson.annotation.JsonProperty;
> +import com.fasterxml.jackson.annotation.JsonTypeName;
> +
> +/**
> + * Sender that pushes all data to a single destination node.
> + */
> +@JsonTypeName("single-sender")
> +public class SingleSender extends AbstractSender {
> +  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SingleSender.class);
> +
> +  private final DrillbitEndpoint destination;
> +  
> +  @JsonCreator
> +  public SingleSender(@JsonProperty("receiver-major-fragment") int oppositeMajorFragmentId, @JsonProperty("child") PhysicalOperator child, @JsonProperty("destination") DrillbitEndpoint destination) {
> +    super(oppositeMajorFragmentId, child);
> +    this.destination = destination;
> +  }
> +
> +  @Override
> +  @JsonIgnore
> +  public List<DrillbitEndpoint> getDestinations() {
> +    return Collections.singletonList(destination);
> +  }
> +
> +  @Override
> +  public OperatorCost getCost() {
> +    long recordSize = child.getSize().getRecordSize() * child.getSize().getRecordCount();
> +    return new OperatorCost((float) recordSize, recordSize, 0, child.getSize().getRecordCount()/(1<<16));
> +  }
> +
> +  @Override
> +  protected PhysicalOperator getNewWithChild(PhysicalOperator child) {
> +    return new SingleSender(oppositeMajorFragmentId, child, destination);
> +  }
> +
> +  @Override
> +  public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E {
> +    return physicalVisitor.visitSingleSender(this, value);
> +  }
> + 
> +
> +  public DrillbitEndpoint getDestination() {
> +    return destination;
> +  }
> + 
> +  
> +}
> 
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Sort.java
> ----------------------------------------------------------------------
> diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Sort.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Sort.java
> new file mode 100644
> index 0000000..e4ece6b
> --- /dev/null
> +++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Sort.java
> @@ -0,0 +1,86 @@
> +/*******************************************************************************
> + * Licensed to the Apache Software Foundation (ASF) under one
> + * or more contributor license agreements.  See the NOTICE file
> + * distributed with this work for additional information
> + * regarding copyright ownership.  The ASF licenses this file
> + * to you under the Apache License, Version 2.0 (the
> + * "License"); you may not use this file except in compliance
> + * with the License.  You may obtain a copy of the License at
> + * 
> + * http://www.apache.org/licenses/LICENSE-2.0
> + * 
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + ******************************************************************************/
> +package org.apache.drill.exec.physical.config;
> +
> +import java.util.List;
> +
> +import org.apache.drill.common.defs.OrderDef;
> +import org.apache.drill.exec.physical.OperatorCost;
> +import org.apache.drill.exec.physical.base.AbstractSingle;
> +import org.apache.drill.exec.physical.base.PhysicalOperator;
> +import org.apache.drill.exec.physical.base.PhysicalVisitor;
> +import org.apache.drill.exec.physical.base.Size;
> +
> +import com.fasterxml.jackson.annotation.JsonCreator;
> +import com.fasterxml.jackson.annotation.JsonIgnore;
> +import com.fasterxml.jackson.annotation.JsonProperty;
> +import com.fasterxml.jackson.annotation.JsonTypeName;
> +
> +@JsonTypeName("sort")
> +public class Sort extends AbstractSingle{
> +  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Sort.class);
> +  
> +  private final List<OrderDef> orderings;
> +  private boolean reverse = false;
> +  
> +  @JsonCreator
> +  public Sort(@JsonProperty("child") PhysicalOperator child, @JsonProperty("orderings") List<OrderDef> orderings, @JsonProperty("reverse") boolean reverse) {
> +    super(child);
> +    this.orderings = orderings;
> +    this.reverse = reverse;
> +  }
> +
> +  public List<OrderDef> getOrderings() {
> +    return orderings;
> +  }
> +
> +  public boolean getReverse() {
> +    return reverse;
> +  }
> +
> +  @Override
> +  public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E{
> +    return physicalVisitor.visitSort(this, value);
> +  }
> +
> +  @Override
> +  public OperatorCost getCost() {
> +    Size childSize = child.getSize();
> +    long n = childSize.getRecordCount();
> +    long width = childSize.getRecordSize();
> +
> +    //TODO: Magic Number, let's assume 1/10 of data can fit in memory. 
> +    int k = 10;
> +    long n2 = n/k;
> +    double cpuCost = 
> +        k * n2 * (Math.log(n2)/Math.log(2)) + // 
> +        n * (Math.log(k)/Math.log(2));
> +    double diskCost = n*width*2;
> +    
> +    return new OperatorCost(0, (float) diskCost, (float) n2*width, (float) cpuCost);
> +  }
> +
> +  @Override
> +  protected PhysicalOperator getNewWithChild(PhysicalOperator child) {
> +    return new Sort(child, orderings, reverse);
> +  }
> +
> +    
> +  
> +  
> +}
> 
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnionExchange.java
> ----------------------------------------------------------------------
> diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnionExchange.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnionExchange.java
> new file mode 100644
> index 0000000..56467ce
> --- /dev/null
> +++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnionExchange.java
> @@ -0,0 +1,79 @@
> +/*******************************************************************************
> + * Licensed to the Apache Software Foundation (ASF) under one
> + * or more contributor license agreements.  See the NOTICE file
> + * distributed with this work for additional information
> + * regarding copyright ownership.  The ASF licenses this file
> + * to you under the Apache License, Version 2.0 (the
> + * "License"); you may not use this file except in compliance
> + * with the License.  You may obtain a copy of the License at
> + * 
> + * http://www.apache.org/licenses/LICENSE-2.0
> + * 
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + ******************************************************************************/
> +package org.apache.drill.exec.physical.config;
> +
> +import java.util.List;
> +
> +import org.apache.drill.common.exceptions.PhysicalOperatorSetupException;
> +import org.apache.drill.exec.physical.base.AbstractExchange;
> +import org.apache.drill.exec.physical.base.ExchangeCost;
> +import org.apache.drill.exec.physical.base.PhysicalOperator;
> +import org.apache.drill.exec.physical.base.PhysicalVisitor;
> +import org.apache.drill.exec.physical.base.Receiver;
> +import org.apache.drill.exec.physical.base.Sender;
> +import org.apache.drill.exec.physical.base.Size;
> +import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
> +
> +import com.fasterxml.jackson.annotation.JsonProperty;
> +import com.fasterxml.jackson.annotation.JsonTypeName;
> +
> +@JsonTypeName("union-exchange")
> +public class UnionExchange extends AbstractExchange{
> +
> +  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UnionExchange.class);
> +
> +  private List<DrillbitEndpoint> senderLocations;
> +  private DrillbitEndpoint destinationLocation;
> +  
> +  public UnionExchange(@JsonProperty("child") PhysicalOperator child) {
> +    super(child);
> +  }
> +  
> +  @Override
> +  public void setupSenders(List<DrillbitEndpoint> senderLocations) {
> +    this.senderLocations = senderLocations;
> +  }
> +
> +  @Override
> +  protected void setupReceivers(List<DrillbitEndpoint> receiverLocations) throws PhysicalOperatorSetupException {
> +    if(receiverLocations.size() != 1) throw new PhysicalOperatorSetupException("A Union Exchange only supports a single receiver endpoint.");
> +    this.destinationLocation = receiverLocations.iterator().next();
> +  }
> +
> +  @Override
> +  public Sender getSender(int minorFragmentId, PhysicalOperator child) {
> +    return new SingleSender(this.receiverMajorFragmentId, child, destinationLocation);
> +  }
> +
> +  @Override
> +  public Receiver getReceiver(int minorFragmentId) {
> +    return new RandomReceiver(this.senderMajorFragmentId, senderLocations);
> +  }
> +
> +  @Override
> +  public int getMaxSendWidth() {
> +    return Integer.MAX_VALUE;
> +  }
> +
> +  @Override
> +  protected PhysicalOperator getNewWithChild(PhysicalOperator child) {
> +    return new UnionExchange(child);
> +  }
> +  
> +  
> +}
> 
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BatchCreator.java
> ----------------------------------------------------------------------
> diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BatchCreator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BatchCreator.java
> new file mode 100644
> index 0000000..9a7df56
> --- /dev/null
> +++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BatchCreator.java
> @@ -0,0 +1,31 @@
> +/*******************************************************************************
> + * Licensed to the Apache Software Foundation (ASF) under one
> + * or more contributor license agreements.  See the NOTICE file
> + * distributed with this work for additional information
> + * regarding copyright ownership.  The ASF licenses this file
> + * to you under the Apache License, Version 2.0 (the
> + * "License"); you may not use this file except in compliance
> + * with the License.  You may obtain a copy of the License at
> + * 
> + * http://www.apache.org/licenses/LICENSE-2.0
> + * 
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + ******************************************************************************/
> +package org.apache.drill.exec.physical.impl;
> +
> +import java.util.List;
> +
> +import org.apache.drill.common.exceptions.ExecutionSetupException;
> +import org.apache.drill.exec.ops.FragmentContext;
> +import org.apache.drill.exec.physical.base.PhysicalOperator;
> +import org.apache.drill.exec.record.RecordBatch;
> +
> +public interface BatchCreator<T extends PhysicalOperator> {
> +  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BatchCreator.class);
> +  
> +  public RecordBatch getBatch(FragmentContext context, T config, List<RecordBatch> children) throws ExecutionSetupException;
> +}
> 
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/FilterRecordBatch.java
> ----------------------------------------------------------------------
> diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/FilterRecordBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/FilterRecordBatch.java
> new file mode 100644
> index 0000000..6592ca1
> --- /dev/null
> +++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/FilterRecordBatch.java
> @@ -0,0 +1,108 @@
> +/*******************************************************************************
> + * Licensed to the Apache Software Foundation (ASF) under one
> + * or more contributor license agreements.  See the NOTICE file
> + * distributed with this work for additional information
> + * regarding copyright ownership.  The ASF licenses this file
> + * to you under the Apache License, Version 2.0 (the
> + * "License"); you may not use this file except in compliance
> + * with the License.  You may obtain a copy of the License at
> + * 
> + * http://www.apache.org/licenses/LICENSE-2.0
> + * 
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + ******************************************************************************/
> +package org.apache.drill.exec.physical.impl;
> +
> +import org.apache.drill.exec.ops.FragmentContext;
> +import org.apache.drill.exec.record.BatchSchema;
> +import org.apache.drill.exec.record.InvalidValueAccessor;
> +import org.apache.drill.exec.record.RecordBatch;
> +import org.apache.drill.exec.record.vector.SelectionVector;
> +import org.apache.drill.exec.record.vector.ValueVector;
> +
> +public abstract class FilterRecordBatch implements RecordBatch {
> +  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FilterRecordBatch.class);
> +
> +  private RecordBatch incoming;
> +  private SelectionVector selectionVector;
> +  private BatchSchema schema;
> +  private FilteringRecordBatchTransformer transformer;
> +  private int outstanding;
> +
> +  public FilterRecordBatch(RecordBatch batch) {
> +    this.incoming = batch;
> +  }
> +
> +  @Override
> +  public FragmentContext getContext() {
> +    return incoming.getContext();
> +  }
> +
> +  @Override
> +  public BatchSchema getSchema() {
> +    return schema;
> +  }
> +
> +  @Override
> +  public int getRecordCount() {
> +    return 0;
> +  }
> +
> +  @Override
> +  public void kill() {
> +    incoming.kill();
> +  }
> +
> +  @Override
> +  public <T extends ValueVector<T>> T getValueVector(int fieldId, Class<T> clazz) throws InvalidValueAccessor {
> +    return null;
> +  }
> +
> +  abstract int applyFilter(SelectionVector vector, int count);
> +
> +  /**
> +   * Release all assets.
> +   */
> +  private void close() {
> +
> +  }
> +
> +  @Override
> +  public IterOutcome next() {
> +    while (true) {
> +      IterOutcome o = incoming.next();
> +      switch (o) {
> +      case OK_NEW_SCHEMA:
> +        transformer = incoming.getContext().getFilteringExpression(null);
> +        schema = transformer.getSchema();
> +        // fall through to ok.
> +      case OK:
> +
> +      case NONE:
> +      case STOP:
> +        close();
> +        return IterOutcome.STOP;
> +      }
> +
> +      if (outstanding > 0) {
> +        // move data to output location.
> +
> +        for (int i = incoming.getRecordCount() - outstanding; i < incoming.getRecordCount(); i++) {
> +
> +        }
> +      }
> +
> +      // make sure the bit vector is as large as the current record batch.
> +      if (selectionVector.capacity() < incoming.getRecordCount()) {
> +        selectionVector.allocateNew(incoming.getRecordCount());
> +      }
> +
> +      return null;
> +    }
> +
> +  }
> +}
> 
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/FilteringRecordBatchTransformer.java
> ----------------------------------------------------------------------
> diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/FilteringRecordBatchTransformer.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/FilteringRecordBatchTransformer.java
> new file mode 100644
> index 0000000..191521a
> --- /dev/null
> +++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/FilteringRecordBatchTransformer.java
> @@ -0,0 +1,58 @@
> +/*******************************************************************************
> + * Licensed to the Apache Software Foundation (ASF) under one
> + * or more contributor license agreements.  See the NOTICE file
> + * distributed with this work for additional information
> + * regarding copyright ownership.  The ASF licenses this file
> + * to you under the Apache License, Version 2.0 (the
> + * "License"); you may not use this file except in compliance
> + * with the License.  You may obtain a copy of the License at
> + * 
> + * http://www.apache.org/licenses/LICENSE-2.0
> + * 
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + ******************************************************************************/
> +package org.apache.drill.exec.physical.impl;
> +
> +import org.apache.drill.exec.record.BatchSchema;
> +import org.apache.drill.exec.record.RecordBatch;
> +import org.apache.drill.exec.record.vector.SelectionVector;
> +
> +public abstract class FilteringRecordBatchTransformer {
> +  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FilteringRecordBatchTransformer.class);
> +  
> +  final RecordBatch incoming;
> +  final SelectionVector selectionVector;
> +  final BatchSchema schema;
> +  
> +  public FilteringRecordBatchTransformer(RecordBatch incoming, OutputMutator output, SelectionVector selectionVector) {
> +    super();
> +    this.incoming = incoming;
> +    this.selectionVector = selectionVector;
> +    this.schema = innerSetup();
> +  }
> +
> +  public abstract BatchSchema innerSetup();
> +  
> +  /**
> +   * Applies the filter to the selection index.  Ignores any values in the selection vector, instead creating a.
> +   * @return
> +   */
> +  public abstract int apply();
> +  
> +  /**
> +   * Applies the filter to the selection index.  Utilizes the existing selection index and only evaluates on those records.
> +   * @return
> +   */
> +  public abstract int applyWithSelection();
> +
> +  public BatchSchema getSchema() {
> +    return schema;
> +  }
> +  
> +  
> +  
> +}
> 
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
> ----------------------------------------------------------------------
> diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
> new file mode 100644
> index 0000000..d98c107
> --- /dev/null
> +++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
> @@ -0,0 +1,102 @@
> +/*******************************************************************************
> + * Licensed to the Apache Software Foundation (ASF) under one
> + * or more contributor license agreements.  See the NOTICE file
> + * distributed with this work for additional information
> + * regarding copyright ownership.  The ASF licenses this file
> + * to you under the Apache License, Version 2.0 (the
> + * "License"); you may not use this file except in compliance
> + * with the License.  You may obtain a copy of the License at
> + * 
> + * http://www.apache.org/licenses/LICENSE-2.0
> + * 
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + ******************************************************************************/
> +package org.apache.drill.exec.physical.impl;
> +
> +import java.util.Collections;
> +import java.util.List;
> +
> +import org.apache.drill.common.exceptions.ExecutionSetupException;
> +import org.apache.drill.exec.ops.FragmentContext;
> +import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor;
> +import org.apache.drill.exec.physical.base.FragmentRoot;
> +import org.apache.drill.exec.physical.base.PhysicalOperator;
> +import org.apache.drill.exec.physical.base.Scan;
> +import org.apache.drill.exec.physical.config.MockScanBatchCreator;
> +import org.apache.drill.exec.physical.config.MockScanPOP;
> +import org.apache.drill.exec.physical.config.RandomReceiver;
> +import org.apache.drill.exec.physical.config.Screen;
> +import org.apache.drill.exec.physical.config.SingleSender;
> +import org.apache.drill.exec.record.RecordBatch;
> +
> +import com.google.common.base.Preconditions;
> +import com.google.common.collect.Lists;
> +
> +public class ImplCreator extends AbstractPhysicalVisitor<RecordBatch, FragmentContext, ExecutionSetupException>{
> +  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ImplCreator.class);
> +
> +  private MockScanBatchCreator msc = new MockScanBatchCreator();
> +  private ScreenCreator sc = new ScreenCreator();
> +  private RandomReceiverCreator rrc = new RandomReceiverCreator();
> +  private SingleSenderCreator ssc = new SingleSenderCreator();
> +  private RootExec root = null;
> +  
> +  private ImplCreator(){}
> +  
> +  public RootExec getRoot(){
> +    return root;
> +  }
> +  
> +  
> +  @Override
> +  public RecordBatch visitScan(Scan<?> scan, FragmentContext context) throws ExecutionSetupException {
> +    Preconditions.checkNotNull(scan);
> +    Preconditions.checkNotNull(context);
> +    
> +    if(scan instanceof MockScanPOP){
> +      return msc.getBatch(context, (MockScanPOP) scan, Collections.<RecordBatch> emptyList());
> +    }else{
> +      return super.visitScan(scan, context);  
> +    }
> +    
> +  }
> +
> +  @Override
> +  public RecordBatch visitScreen(Screen op, FragmentContext context) throws ExecutionSetupException {
> +    Preconditions.checkArgument(root == null);
> +    root = sc.getRoot(context, op, getChildren(op, context));
> +    return null;
> +  }
> +
> +  
> +  
> +  @Override
> +  public RecordBatch visitSingleSender(SingleSender op, FragmentContext context) throws ExecutionSetupException {
> +    root = ssc.getRoot(context, op, getChildren(op, context));
> +    return null;
> +  }
> +
> +  @Override
> +  public RecordBatch visitRandomReceiver(RandomReceiver op, FragmentContext context) throws ExecutionSetupException {
> +    return rrc.getBatch(context, op, null);
> +  }
> +
> +  private List<RecordBatch> getChildren(PhysicalOperator op, FragmentContext context) throws ExecutionSetupException{
> +    List<RecordBatch> children = Lists.newArrayList();
> +    for(PhysicalOperator child : op){
> +      children.add(child.accept(this, context));
> +    }
> +    return children;
> +  }
> +  
> +  public static RootExec getExec(FragmentContext context, FragmentRoot root) throws ExecutionSetupException{
> +    ImplCreator i = new ImplCreator();
> +    root.accept(i, context);
> +    if(i.root == null) throw new ExecutionSetupException("The provided fragment did not have a root node that correctly created a RootExec value.");
> +    return i.getRoot();
> +  }
> +}
> 
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/OutputMutator.java
> ----------------------------------------------------------------------
> diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/OutputMutator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/OutputMutator.java
> new file mode 100644
> index 0000000..ce0cf66
> --- /dev/null
> +++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/OutputMutator.java
> @@ -0,0 +1,28 @@
> +/*******************************************************************************
> + * Licensed to the Apache Software Foundation (ASF) under one
> + * or more contributor license agreements.  See the NOTICE file
> + * distributed with this work for additional information
> + * regarding copyright ownership.  The ASF licenses this file
> + * to you under the Apache License, Version 2.0 (the
> + * "License"); you may not use this file except in compliance
> + * with the License.  You may obtain a copy of the License at
> + * 
> + * http://www.apache.org/licenses/LICENSE-2.0
> + * 
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + ******************************************************************************/
> +package org.apache.drill.exec.physical.impl;
> +
> +import org.apache.drill.exec.exception.SchemaChangeException;
> +import org.apache.drill.exec.record.BatchSchema;
> +import org.apache.drill.exec.record.vector.ValueVector;
> +
> +public interface OutputMutator {
> +  public void removeField(int fieldId) throws SchemaChangeException;
> +  public void addField(int fieldId, ValueVector<?> vector) throws SchemaChangeException ;
> +  public void setNewSchema() throws SchemaChangeException ;
> +}
> 
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/PhysicalConfig.java
> ----------------------------------------------------------------------
> diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/PhysicalConfig.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/PhysicalConfig.java
> new file mode 100644
> index 0000000..9995bc2
> --- /dev/null
> +++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/PhysicalConfig.java
> @@ -0,0 +1,29 @@
> +/*******************************************************************************
> + * Licensed to the Apache Software Foundation (ASF) under one
> + * or more contributor license agreements.  See the NOTICE file
> + * distributed with this work for additional information
> + * regarding copyright ownership.  The ASF licenses this file
> + * to you under the Apache License, Version 2.0 (the
> + * "License"); you may not use this file except in compliance
> + * with the License.  You may obtain a copy of the License at
> + * 
> + * http://www.apache.org/licenses/LICENSE-2.0
> + * 
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + ******************************************************************************/
> +package org.apache.drill.exec.physical.impl;
> +
> +import java.lang.annotation.ElementType;
> +import java.lang.annotation.Retention;
> +import java.lang.annotation.RetentionPolicy;
> +import java.lang.annotation.Target;
> +
> +@Target({ElementType.TYPE})
> +@Retention(RetentionPolicy.RUNTIME)
> +public @interface PhysicalConfig {
> +  Class<?> value();
> +}
> 
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RandomReceiverCreator.java
> ----------------------------------------------------------------------
> diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RandomReceiverCreator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RandomReceiverCreator.java
> new file mode 100644
> index 0000000..4b991f8
> --- /dev/null
> +++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RandomReceiverCreator.java
> @@ -0,0 +1,46 @@
> +/*******************************************************************************
> + * Licensed to the Apache Software Foundation (ASF) under one
> + * or more contributor license agreements.  See the NOTICE file
> + * distributed with this work for additional information
> + * regarding copyright ownership.  The ASF licenses this file
> + * to you under the Apache License, Version 2.0 (the
> + * "License"); you may not use this file except in compliance
> + * with the License.  You may obtain a copy of the License at
> + * 
> + * http://www.apache.org/licenses/LICENSE-2.0
> + * 
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + ******************************************************************************/
> +package org.apache.drill.exec.physical.impl;
> +
> +import java.util.List;
> +
> +import org.apache.drill.common.exceptions.ExecutionSetupException;
> +import org.apache.drill.exec.ops.FragmentContext;
> +import org.apache.drill.exec.physical.config.RandomReceiver;
> +import org.apache.drill.exec.record.RecordBatch;
> +import org.apache.drill.exec.work.batch.IncomingBuffers;
> +import org.apache.drill.exec.work.batch.RawBatchBuffer;
> +
> +public class RandomReceiverCreator implements BatchCreator<RandomReceiver>{
> +  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RandomReceiverCreator.class);
> +
> +  @Override
> +  public RecordBatch getBatch(FragmentContext context, RandomReceiver receiver, List<RecordBatch> children)
> +      throws ExecutionSetupException {
> +    assert children == null || children.isEmpty();
> +    IncomingBuffers bufHolder = context.getBuffers();
> +    assert bufHolder != null : "IncomingBuffers must be defined for any place a receiver is declared.";
> +    
> +    RawBatchBuffer[] buffers = bufHolder.getBuffers(receiver.getOppositeMajorFragmentId());
> +    assert buffers.length == 1;
> +    RawBatchBuffer buffer = buffers[0];
> +    return new WireRecordBatch(context, buffer);
> +  }
> +  
> +  
> +}
> 
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootCreator.java
> ----------------------------------------------------------------------
> diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootCreator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootCreator.java
> new file mode 100644
> index 0000000..80def05
> --- /dev/null
> +++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootCreator.java
> @@ -0,0 +1,31 @@
> +/*******************************************************************************
> + * Licensed to the Apache Software Foundation (ASF) under one
> + * or more contributor license agreements.  See the NOTICE file
> + * distributed with this work for additional information
> + * regarding copyright ownership.  The ASF licenses this file
> + * to you under the Apache License, Version 2.0 (the
> + * "License"); you may not use this file except in compliance
> + * with the License.  You may obtain a copy of the License at
> + * 
> + * http://www.apache.org/licenses/LICENSE-2.0
> + * 
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + ******************************************************************************/
> +package org.apache.drill.exec.physical.impl;
> +
> +import java.util.List;
> +
> +import org.apache.drill.common.exceptions.ExecutionSetupException;
> +import org.apache.drill.exec.ops.FragmentContext;
> +import org.apache.drill.exec.physical.base.PhysicalOperator;
> +import org.apache.drill.exec.record.RecordBatch;
> +
> +public interface RootCreator<T extends PhysicalOperator> {
> +  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RootCreator.class);
> +  
> +  public RootExec getRoot(FragmentContext context, T config, List<RecordBatch> children) throws ExecutionSetupException;
> +}
> 
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootExec.java
> ----------------------------------------------------------------------
> diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootExec.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootExec.java
> new file mode 100644
> index 0000000..3f8aac7
> --- /dev/null
> +++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootExec.java
> @@ -0,0 +1,40 @@
> +/*******************************************************************************
> + * Licensed to the Apache Software Foundation (ASF) under one
> + * or more contributor license agreements.  See the NOTICE file
> + * distributed with this work for additional information
> + * regarding copyright ownership.  The ASF licenses this file
> + * to you under the Apache License, Version 2.0 (the
> + * "License"); you may not use this file except in compliance
> + * with the License.  You may obtain a copy of the License at
> + * 
> + * http://www.apache.org/licenses/LICENSE-2.0
> + * 
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + ******************************************************************************/
> +package org.apache.drill.exec.physical.impl;
> +
> +import org.apache.drill.exec.exception.FragmentSetupException;
> +
> +/**
> + * A FragmentRoot is a node which is the last processing node in a query plan. FragmentTerminals include Exchange
> + * output nodes and storage nodes.  They are there driving force behind the completion of a query.
> + */
> +public interface RootExec {
> +  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RootExec.class);
> +  
> +  /**
> +   * Do the next batch of work.  
> +   * @return Whether or not additional batches of work are necessary.  False means that this fragment is done.
> +   */
> +  public boolean next();
> +  
> +  /**
> +   * Inform all children to clean up and go away.
> +   */
> +  public void stop();
> +  
> +}
> 
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
> ----------------------------------------------------------------------
> diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
> new file mode 100644
> index 0000000..33c1e29
> --- /dev/null
> +++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
> @@ -0,0 +1,172 @@
> +/*******************************************************************************
> + * Licensed to the Apache Software Foundation (ASF) under one
> + * or more contributor license agreements.  See the NOTICE file
> + * distributed with this work for additional information
> + * regarding copyright ownership.  The ASF licenses this file
> + * to you under the Apache License, Version 2.0 (the
> + * "License"); you may not use this file except in compliance
> + * with the License.  You may obtain a copy of the License at
> + * 
> + * http://www.apache.org/licenses/LICENSE-2.0
> + * 
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + ******************************************************************************/
> +package org.apache.drill.exec.physical.impl;
> +
> +import io.netty.buffer.ByteBuf;
> +
> +import java.util.Iterator;
> +import java.util.List;
> +
> +import org.apache.drill.common.exceptions.ExecutionSetupException;
> +import org.apache.drill.exec.exception.SchemaChangeException;
> +import org.apache.drill.exec.ops.FragmentContext;
> +import org.apache.drill.exec.proto.UserBitShared.FieldMetadata;
> +import org.apache.drill.exec.proto.UserBitShared.RecordBatchDef;
> +import org.apache.drill.exec.record.BatchSchema;
> +import org.apache.drill.exec.record.InvalidValueAccessor;
> +import org.apache.drill.exec.record.RecordBatch;
> +import org.apache.drill.exec.record.SchemaBuilder;
> +import org.apache.drill.exec.record.WritableBatch;
> +import org.apache.drill.exec.record.vector.ValueVector;
> +import org.apache.drill.exec.store.RecordReader;
> +
> +import com.carrotsearch.hppc.IntObjectOpenHashMap;
> +import com.carrotsearch.hppc.procedures.IntObjectProcedure;
> +import com.google.common.collect.Lists;
> +
> +/**
> + * Record batch used for a particular scan. Operators against one or more
> + */
> +public class ScanBatch implements RecordBatch {
> +  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ScanBatch.class);
> +
> +  private IntObjectOpenHashMap<ValueVector<?>> fields = new IntObjectOpenHashMap<ValueVector<?>>();
> +  private BatchSchema schema;
> +  private int recordCount;
> +  private boolean schemaChanged = true;
> +  private final FragmentContext context;
> +  private Iterator<RecordReader> readers;
> +  private RecordReader currentReader;
> +  private final Mutator mutator = new Mutator();
> +
> +  public ScanBatch(FragmentContext context, Iterator<RecordReader> readers)
> +      throws ExecutionSetupException {
> +    this.context = context;
> +    this.readers = readers;
> +    if (!readers.hasNext()) throw new ExecutionSetupException("A scan batch must contain at least one reader.");
> +    this.currentReader = readers.next();
> +    this.currentReader.setup(mutator);
> +  }
> +
> +  private void schemaChanged() {
> +    schema = null;
> +    schemaChanged = true;
> +  }
> +
> +  @Override
> +  public FragmentContext getContext() {
> +    return context;
> +  }
> +
> +  @Override
> +  public BatchSchema getSchema() {
> +    return schema;
> +  }
> +
> +  @Override
> +  public int getRecordCount() {
> +    return recordCount;
> +  }
> +
> +  @Override
> +  public void kill() {
> +    releaseAssets();
> +  }
> +
> +  private void releaseAssets() {
> +    fields.forEach(new IntObjectProcedure<ValueVector<?>>() {
> +      @Override
> +      public void apply(int key, ValueVector<?> value) {
> +        value.close();
> +      }
> +    });
> +  }
> +
> +  @SuppressWarnings("unchecked")
> +  @Override
> +  public <T extends ValueVector<T>> T getValueVector(int fieldId, Class<T> clazz) throws InvalidValueAccessor {
> +    if (fields.containsKey(fieldId)) throw new InvalidValueAccessor(String.format("Unknown value accesor for field id %d."));
> +    ValueVector<?> vector = this.fields.lget();
> +    if (vector.getClass().isAssignableFrom(clazz)) {
> +      return (T) vector;
> +    } else {
> +      throw new InvalidValueAccessor(String.format(
> +          "You requested a field accessor of type %s for field id %d but the actual type was %s.",
> +          clazz.getCanonicalName(), fieldId, vector.getClass().getCanonicalName()));
> +    }
> +  }
> +
> +  @Override
> +  public IterOutcome next() {
> +    while ((recordCount = currentReader.next()) == 0) {
> +      try {
> +        if (!readers.hasNext()) {
> +          currentReader.cleanup();
> +          releaseAssets();
> +          return IterOutcome.NONE;
> +        }
> +        currentReader.cleanup();
> +        currentReader = readers.next();
> +        currentReader.setup(mutator);
> +      } catch (ExecutionSetupException e) {
> +        this.context.fail(e);
> +        releaseAssets();
> +        return IterOutcome.STOP;
> +      }
> +    }
> +
> +    if (schemaChanged) {
> +      schemaChanged = false;
> +      return IterOutcome.OK_NEW_SCHEMA;
> +    } else {
> +      return IterOutcome.OK;
> +    }
> +  }
> +
> +  private class Mutator implements OutputMutator {
> +    private SchemaBuilder builder = BatchSchema.newBuilder();
> +    
> +    public void removeField(int fieldId) throws SchemaChangeException {
> +      schemaChanged();
> +      ValueVector<?> v = fields.remove(fieldId);
> +      if (v == null) throw new SchemaChangeException("Failure attempting to remove an unknown field.");
> +      v.close();
> +    }
> +
> +    public void addField(int fieldId, ValueVector<?> vector) {
> +      schemaChanged();
> +      ValueVector<?> v = fields.put(fieldId, vector);
> +      vector.getField();
> +      builder.addField(vector.getField());
> +      if (v != null) v.close();
> +    }
> +
> +    @Override
> +    public void setNewSchema() throws SchemaChangeException {
> +      ScanBatch.this.schema = this.builder.build();
> +      ScanBatch.this.schemaChanged = true;
> +    }
> +
> +  }
> +
> +  @Override
> +  public WritableBatch getWritableBatch() {
> +    return WritableBatch.get(this.getRecordCount(), fields);
> +  }
> +  
> +}
> 
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
> ----------------------------------------------------------------------
> diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
> new file mode 100644
> index 0000000..c0711db
> --- /dev/null
> +++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
> @@ -0,0 +1,90 @@
> +/*******************************************************************************
> + * Licensed to the Apache Software Foundation (ASF) under one
> + * or more contributor license agreements.  See the NOTICE file
> + * distributed with this work for additional information
> + * regarding copyright ownership.  The ASF licenses this file
> + * to you under the Apache License, Version 2.0 (the
> + * "License"); you may not use this file except in compliance
> + * with the License.  You may obtain a copy of the License at
> + * 
> + * http://www.apache.org/licenses/LICENSE-2.0
> + * 
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + ******************************************************************************/
> +package org.apache.drill.exec.physical.impl;
> +
> +import java.util.List;
> +
> +import org.apache.drill.exec.ops.FragmentContext;
> +import org.apache.drill.exec.physical.config.Screen;
> +import org.apache.drill.exec.physical.impl.materialize.RecordMaterializer;
> +import org.apache.drill.exec.physical.impl.materialize.VectorRecordMaterializer;
> +import org.apache.drill.exec.record.RecordBatch;
> +import org.apache.drill.exec.record.RecordBatch.IterOutcome;
> +import org.apache.drill.exec.rpc.user.UserServer.UserClientConnection;
> +
> +import com.google.common.base.Preconditions;
> +
> +public class ScreenCreator implements RootCreator<Screen>{
> +  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ScreenCreator.class);
> +
> +  @Override
> +  public RootExec getRoot(FragmentContext context, Screen config, List<RecordBatch> children) {
> +    Preconditions.checkArgument(children.size() == 1);
> +    return new ScreenRoot(context, children.iterator().next());
> +  }
> +  
> +  
> +  private static class ScreenRoot implements RootExec{
> +
> +    final RecordBatch incoming;
> +    final FragmentContext context;
> +    final UserClientConnection connection;
> +    private RecordMaterializer materializer;
> +    
> +    public ScreenRoot(FragmentContext context, RecordBatch incoming){
> +      assert context.getConnection() != null : "A screen root should only be run on the driving node which is connected directly to the client.  As such, this should always be true.";
> +
> +      this.context = context;
> +      this.incoming = incoming;
> +      this.connection = context.getConnection();
> +    }
> +    
> +    @Override
> +    public boolean next() {
> +      IterOutcome outcome = incoming.next();
> +      boolean isLast = false;
> +      switch(outcome){
> +      case NONE:
> +      case STOP:
> +        connection.sendResult(materializer.convertNext(true));
> +        context.batchesCompleted.inc(1);
> +        context.recordsCompleted.inc(incoming.getRecordCount());
> +        return false;
> +        
> +      case OK_NEW_SCHEMA:
> +        materializer = new VectorRecordMaterializer(context, incoming);
> +        // fall through.
> +        // fall through
> +      case OK:
> +        connection.sendResult(materializer.convertNext(false));
> +        context.batchesCompleted.inc(1);
> +        context.recordsCompleted.inc(incoming.getRecordCount());
> +        return !isLast;
> +      default:
> +        throw new UnsupportedOperationException();
> +      }
> +    }
> +
> +    @Override
> +    public void stop() {
> +      incoming.kill();
> +    }
> +
> +    
> +  }
> +}
> 
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
> ----------------------------------------------------------------------
> diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
> new file mode 100644
> index 0000000..60c2d78
> --- /dev/null
> +++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
> @@ -0,0 +1,89 @@
> +/*******************************************************************************
> + * Licensed to the Apache Software Foundation (ASF) under one
> + * or more contributor license agreements.  See the NOTICE file
> + * distributed with this work for additional information
> + * regarding copyright ownership.  The ASF licenses this file
> + * to you under the Apache License, Version 2.0 (the
> + * "License"); you may not use this file except in compliance
> + * with the License.  You may obtain a copy of the License at
> + * 
> + * http://www.apache.org/licenses/LICENSE-2.0
> + * 
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + ******************************************************************************/
> +package org.apache.drill.exec.physical.impl;
> +
> +import java.util.List;
> +
> +import org.apache.drill.common.exceptions.ExecutionSetupException;
> +import org.apache.drill.exec.ops.FragmentContext;
> +import org.apache.drill.exec.physical.config.SingleSender;
> +import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
> +import org.apache.drill.exec.record.FragmentWritableBatch;
> +import org.apache.drill.exec.record.RecordBatch;
> +import org.apache.drill.exec.record.RecordBatch.IterOutcome;
> +import org.apache.drill.exec.rpc.bit.BitTunnel;
> +
> +public class SingleSenderCreator implements RootCreator<SingleSender>{
> +
> +  @Override
> +  public RootExec getRoot(FragmentContext context, SingleSender config, List<RecordBatch> children)
> +      throws ExecutionSetupException {
> +    assert children != null && children.size() == 1;
> +    return new SingleSenderRootExec(context, children.iterator().next(), config);
> +  }
> +  
> +  
> +  private static class SingleSenderRootExec implements RootExec{
> +    static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SingleSenderRootExec.class);
> +    private RecordBatch incoming;
> +    private BitTunnel tunnel;
> +    private FragmentHandle handle;
> +    private int recMajor;
> +    private FragmentContext context;
> +    
> +    public SingleSenderRootExec(FragmentContext context, RecordBatch batch, SingleSender config){
> +      logger.debug("Creating single sender root exec base on config: {}", config);
> +      this.incoming = batch;
> +      this.handle = context.getHandle();
> +      this.recMajor = config.getOppositeMajorFragmentId();
> +      this.tunnel = context.getCommunicator().getTunnel(config.getDestination());
> +      this.context = context;
> +    }
> +    
> +    @Override
> +    public boolean next() {
> +      IterOutcome out = incoming.next();
> +      logger.debug("Outcome of sender next {}", out);
> +      switch(out){
> +      case STOP:
> +      case NONE:
> +        FragmentWritableBatch b2 = new FragmentWritableBatch(false, handle.getQueryId(), handle.getMajorFragmentId(), handle.getMinorFragmentId(), recMajor, 0, incoming.getWritableBatch());
> +        tunnel.sendRecordBatch(context, b2);
> +        return false;
> +        
> +
> +      case OK:
> +      case OK_NEW_SCHEMA:
> +        FragmentWritableBatch batch = new FragmentWritableBatch(true, handle.getQueryId(), handle.getMajorFragmentId(), handle.getMinorFragmentId(), recMajor, 0, incoming.getWritableBatch());
> +        tunnel.sendRecordBatch(context, batch);
> +        return true;
> +
> +      case NOT_YET:
> +      default:
> +        throw new IllegalStateException();
> +      }
> +    }
> +
> +    @Override
> +    public void stop() {
> +    }
> +    
> +    
> +    
> +  }
> +}
> 
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java
> ----------------------------------------------------------------------
> diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java
> new file mode 100644
> index 0000000..fc7f833
> --- /dev/null
> +++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java
> @@ -0,0 +1,99 @@
> +/*******************************************************************************
> + * Licensed to the Apache Software Foundation (ASF) under one
> + * or more contributor license agreements.  See the NOTICE file
> + * distributed with this work for additional information
> + * regarding copyright ownership.  The ASF licenses this file
> + * to you under the Apache License, Version 2.0 (the
> + * "License"); you may not use this file except in compliance
> + * with the License.  You may obtain a copy of the License at
> + * 
> + * http://www.apache.org/licenses/LICENSE-2.0
> + * 
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + ******************************************************************************/
> +package org.apache.drill.exec.physical.impl;
> +
> +import java.util.Iterator;
> +
> +import org.apache.drill.exec.exception.SchemaChangeException;
> +import org.apache.drill.exec.ops.FragmentContext;
> +import org.apache.drill.exec.proto.UserBitShared.RecordBatchDef;
> +import org.apache.drill.exec.record.BatchSchema;
> +import org.apache.drill.exec.record.InvalidValueAccessor;
> +import org.apache.drill.exec.record.RawFragmentBatch;
> +import org.apache.drill.exec.record.RawFragmentBatchProvider;
> +import org.apache.drill.exec.record.RecordBatch;
> +import org.apache.drill.exec.record.RecordBatchLoader;
> +import org.apache.drill.exec.record.WritableBatch;
> +import org.apache.drill.exec.record.RecordBatch.IterOutcome;
> +import org.apache.drill.exec.record.vector.ValueVector;
> +
> +public class WireRecordBatch implements RecordBatch{
> +  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(WireRecordBatch.class);
> +
> +  private RecordBatchLoader batchLoader;
> +  private RawFragmentBatchProvider fragProvider;
> +  private FragmentContext context;
> +
> +  
> +  public WireRecordBatch(FragmentContext context, RawFragmentBatchProvider fragProvider) {
> +    this.fragProvider = fragProvider;
> +    this.context = context;
> +    this.batchLoader = new RecordBatchLoader(context.getAllocator());
> +  }
> +
> +  @Override
> +  public FragmentContext getContext() {
> +    return context;
> +  }
> +
> +  @Override
> +  public BatchSchema getSchema() {
> +    return null;
> +  }
> +
> +  @Override
> +  public int getRecordCount() {
> +    return batchLoader.getRecordCount();
> +  }
> +
> +  @Override
> +  public void kill() {
> +    fragProvider.kill(context);
> +  }
> +
> +  @Override
> +  public <T extends ValueVector<T>> T getValueVector(int fieldId, Class<T> clazz) throws InvalidValueAccessor {
> +    return batchLoader.getValueVector(fieldId, clazz);
> +  }
> +
> +  @Override
> +  public IterOutcome next() {
> +    RawFragmentBatch batch = this.fragProvider.getNext();
> +    try{
> +      if(batch == null) return IterOutcome.NONE;
> +
> +      RecordBatchDef rbd = batch.getHeader().getDef();
> +      boolean schemaChanged = batchLoader.load(rbd, batch.getBody());
> +      if(schemaChanged){
> +        return IterOutcome.OK_NEW_SCHEMA;
> +      }else{
> +        return IterOutcome.OK;
> +      }
> +    }catch(SchemaChangeException ex){
> +      context.fail(ex);
> +      return IterOutcome.STOP;
> +    }
> +  }
> +
> +  @Override
> +  public WritableBatch getWritableBatch() {
> +    return batchLoader.getWritableBatch();
> +  }
> +  
> +  
> +}
> 
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/QueryWritableBatch.java
> ----------------------------------------------------------------------
> diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/QueryWritableBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/QueryWritableBatch.java
> new file mode 100644
> index 0000000..187e6e9
> --- /dev/null
> +++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/QueryWritableBatch.java
> @@ -0,0 +1,46 @@
> +/*******************************************************************************
> + * Licensed to the Apache Software Foundation (ASF) under one
> + * or more contributor license agreements.  See the NOTICE file
> + * distributed with this work for additional information
> + * regarding copyright ownership.  The ASF licenses this file
> + * to you under the Apache License, Version 2.0 (the
> + * "License"); you may not use this file except in compliance
> + * with the License.  You may obtain a copy of the License at
> + * 
> + * http://www.apache.org/licenses/LICENSE-2.0
> + * 
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + ******************************************************************************/
> +package org.apache.drill.exec.physical.impl.materialize;
> +
> +import io.netty.buffer.ByteBuf;
> +
> +import org.apache.drill.exec.proto.UserProtos.QueryResult;
> +import org.apache.drill.exec.record.WritableBatch;
> +
> +public class QueryWritableBatch {
> +  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueryWritableBatch.class);
> +  
> +  private final QueryResult header;
> +  private final ByteBuf[] buffers;
> +  
> +  
> +  public QueryWritableBatch(QueryResult header, ByteBuf... buffers) {
> +    super();
> +    this.header = header;
> +    this.buffers = buffers;
> +  }
> +
> +  public ByteBuf[] getBuffers(){
> +    return buffers;
> +  }
> +
> +  public QueryResult getHeader() {
> +    return header;
> +  }
> +  
> +}
> 
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/RecordMaterializer.java
> ----------------------------------------------------------------------
> diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/RecordMaterializer.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/RecordMaterializer.java
> new file mode 100644
> index 0000000..17c65e9
> --- /dev/null
> +++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/RecordMaterializer.java
> @@ -0,0 +1,31 @@
> +/*******************************************************************************
> + * Licensed to the Apache Software Foundation (ASF) under one
> + * or more contributor license agreements.  See the NOTICE file
> + * distributed with this work for additional information
> + * regarding copyright ownership.  The ASF licenses this file
> + * to you under the Apache License, Version 2.0 (the
> + * "License"); you may not use this file except in compliance
> + * with the License.  You may obtain a copy of the License at
> + * 
> + * http://www.apache.org/licenses/LICENSE-2.0
> + * 
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + ******************************************************************************/
> +package org.apache.drill.exec.physical.impl.materialize;
> +
> +import org.apache.drill.exec.ops.FragmentContext;
> +import org.apache.drill.exec.proto.UserBitShared.QueryId;
> +import org.apache.drill.exec.proto.UserProtos.QueryResult;
> +import org.apache.drill.exec.record.MaterializedField;
> +import org.apache.drill.exec.record.RecordBatch;
> +import org.apache.drill.exec.record.WritableBatch;
> +
> +public interface RecordMaterializer {
> +  
> +  public QueryWritableBatch convertNext(boolean isLast);
> +  
> +}
> 
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/VectorRecordMaterializer.java
> ----------------------------------------------------------------------
> diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/VectorRecordMaterializer.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/VectorRecordMaterializer.java
> new file mode 100644
> index 0000000..e2d2eb9
> --- /dev/null
> +++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/VectorRecordMaterializer.java
> @@ -0,0 +1,52 @@
> +/*******************************************************************************
> + * Licensed to the Apache Software Foundation (ASF) under one
> + * or more contributor license agreements.  See the NOTICE file
> + * distributed with this work for additional information
> + * regarding copyright ownership.  The ASF licenses this file
> + * to you under the Apache License, Version 2.0 (the
> + * "License"); you may not use this file except in compliance
> + * with the License.  You may obtain a copy of the License at
> + * 
> + * http://www.apache.org/licenses/LICENSE-2.0
> + * 
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + ******************************************************************************/
> +package org.apache.drill.exec.physical.impl.materialize;
> +
> +import org.apache.drill.exec.ops.FragmentContext;
> +import org.apache.drill.exec.proto.UserBitShared.QueryId;
> +import org.apache.drill.exec.proto.UserProtos.QueryResult;
> +import org.apache.drill.exec.record.MaterializedField;
> +import org.apache.drill.exec.record.RecordBatch;
> +import org.apache.drill.exec.record.WritableBatch;
> +
> +public class VectorRecordMaterializer implements RecordMaterializer{
> +  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(VectorRecordMaterializer.class);
> +
> +  private QueryId queryId;
> +  private RecordBatch batch;
> +
> +  public VectorRecordMaterializer(FragmentContext context, RecordBatch batch) {
> +    this.queryId = context.getHandle().getQueryId();
> +    this.batch = batch;
> +
> +    for (MaterializedField f : batch.getSchema()) {
> +      logger.debug("New Field: {}", f);
> +    }
> +  }
> +
> +  public QueryWritableBatch convertNext(boolean isLast) {
> +    WritableBatch w = batch.getWritableBatch();
> +
> +    QueryResult header = QueryResult.newBuilder() //
> +        .setQueryId(queryId) //
> +        .setRowCount(batch.getRecordCount()) //
> +        .setDef(w.getDef()).setIsLastChunk(isLast).build();
> +    QueryWritableBatch batch = new QueryWritableBatch(header, w.getBuffers());
> +    return batch;
> +  }
> +}
> 
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/AbstractOpWrapperVisitor.java
> ----------------------------------------------------------------------
> diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/AbstractOpWrapperVisitor.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/AbstractOpWrapperVisitor.java
> new file mode 100644
> index 0000000..9b2cb85
> --- /dev/null
> +++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/AbstractOpWrapperVisitor.java
> @@ -0,0 +1,45 @@
> +/*******************************************************************************
> + * Licensed to the Apache Software Foundation (ASF) under one
> + * or more contributor license agreements.  See the NOTICE file
> + * distributed with this work for additional information
> + * regarding copyright ownership.  The ASF licenses this file
> + * to you under the Apache License, Version 2.0 (the
> + * "License"); you may not use this file except in compliance
> + * with the License.  You may obtain a copy of the License at
> + * 
> + * http://www.apache.org/licenses/LICENSE-2.0
> + * 
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + ******************************************************************************/
> +package org.apache.drill.exec.planner;
> +
> +import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor;
> +import org.apache.drill.exec.physical.base.Exchange;
> +import org.apache.drill.exec.planner.fragment.Wrapper;
> +
> +public abstract class AbstractOpWrapperVisitor<RET, EXCEP extends Throwable> extends
> +    AbstractPhysicalVisitor<RET, Wrapper, EXCEP> {
> +  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractOpWrapperVisitor.class);
> +
> +  @Override
> +  public final RET visitExchange(Exchange exchange, Wrapper wrapper) throws EXCEP {
> +    if (wrapper.getNode().getSendingExchange() == exchange) {
> +      return visitSendingExchange(exchange, wrapper);
> +    } else {
> +      return visitReceivingExchange(exchange, wrapper);
> +    }
> +  }
> +
> +  public RET visitSendingExchange(Exchange exchange, Wrapper wrapper) throws EXCEP {
> +    return visitOp(exchange, wrapper);
> +  }
> +
> +  public RET visitReceivingExchange(Exchange exchange, Wrapper wrapper) throws EXCEP {
> +    return visitOp(exchange, wrapper);
> +  }
> +
> +}
> 

Mime
View raw message