drill-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Ted Dunning <ted.dunn...@gmail.com>
Subject Re: [10/13] Update typing system. Update RPC system. Add Fragmenting Implementation. Working single node. Distributed failing due to threading issues.
Date Tue, 14 May 2013 03:54:01 GMT


Sent from my iPhone

On May 13, 2013, at 18:52, jacques@apache.org wrote:

> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractExchange.java
> ----------------------------------------------------------------------
> diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractExchange.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractExchange.java
> new file mode 100644
> index 0000000..42a15ae
> --- /dev/null
> +++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractExchange.java
> @@ -0,0 +1,90 @@
> +/*******************************************************************************
> + * Licensed to the Apache Software Foundation (ASF) under one
> + * or more contributor license agreements.  See the NOTICE file
> + * distributed with this work for additional information
> + * regarding copyright ownership.  The ASF licenses this file
> + * to you under the Apache License, Version 2.0 (the
> + * "License"); you may not use this file except in compliance
> + * with the License.  You may obtain a copy of the License at
> + * 
> + * http://www.apache.org/licenses/LICENSE-2.0
> + * 
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + ******************************************************************************/
> +package org.apache.drill.exec.physical.base;
> +
> +import java.util.List;
> +
> +import org.apache.drill.common.exceptions.PhysicalOperatorSetupException;
> +import org.apache.drill.exec.physical.OperatorCost;
> +import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
> +
> +import com.fasterxml.jackson.annotation.JsonIgnore;
> +import com.fasterxml.jackson.annotation.JsonProperty;
> +
> +public abstract class AbstractExchange extends AbstractSingle implements Exchange {
> +  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractExchange.class);
> +
> +  protected int senderMajorFragmentId;
> +  protected int receiverMajorFragmentId;
> +
> +  public AbstractExchange(PhysicalOperator child) {
> +    super(child);
> +  }
> +
> +  /**
> +   * Exchanges are not executable. The Execution layer first has to set their parallelization and convert them into
> +   * something executable
> +   */
> +  @Override
> +  public boolean isExecutable() {
> +    return false;
> +  }
> +
> +  protected abstract void setupSenders(List<DrillbitEndpoint> senderLocations) throws PhysicalOperatorSetupException ;
> +  protected abstract void setupReceivers(List<DrillbitEndpoint> senderLocations) throws PhysicalOperatorSetupException ;
> +  
> +  @Override
> +  public final void setupSenders(int majorFragmentId, List<DrillbitEndpoint> senderLocations) throws PhysicalOperatorSetupException {
> +    this.senderMajorFragmentId = majorFragmentId;
> +    setupSenders(senderLocations);
> +  }
> +  
> +
> +  @Override
> +  public final void setupReceivers(int majorFragmentId, List<DrillbitEndpoint> receiverLocations) throws PhysicalOperatorSetupException {
> +    this.receiverMajorFragmentId = majorFragmentId;
> +    setupReceivers(receiverLocations);
> +  }
> +  
> +  @Override
> +  public OperatorCost getAggregateSendCost() {
> +    return getExchangeCost().getSendCost();
> +  }
> +
> +  @Override
> +  public OperatorCost getAggregateReceiveCost() {
> +    return getExchangeCost().getReceiveCost();
> +  }
> +
> +  @Override
> +  public final <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E {
> +    return physicalVisitor.visitExchange(this, value);
> +  }
> +
> +  @Override
> +  public ExchangeCost getExchangeCost(){
> +    return ExchangeCost.getSimpleEstimate(getSize());
> +  }
> +
> +  @JsonIgnore
> +  @Override
> +  public OperatorCost getCost() {
> +    return getExchangeCost().getCombinedCost();
> +  }
> +
> +}
> 
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
> ----------------------------------------------------------------------
> diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
> new file mode 100644
> index 0000000..f782325
> --- /dev/null
> +++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
> @@ -0,0 +1,124 @@
> +/*******************************************************************************
> + * Licensed to the Apache Software Foundation (ASF) under one
> + * or more contributor license agreements.  See the NOTICE file
> + * distributed with this work for additional information
> + * regarding copyright ownership.  The ASF licenses this file
> + * to you under the Apache License, Version 2.0 (the
> + * "License"); you may not use this file except in compliance
> + * with the License.  You may obtain a copy of the License at
> + * 
> + * http://www.apache.org/licenses/LICENSE-2.0
> + * 
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + ******************************************************************************/
> +package org.apache.drill.exec.physical.base;
> +
> +import org.apache.drill.exec.physical.config.Filter;
> +import org.apache.drill.exec.physical.config.HashPartitionSender;
> +import org.apache.drill.exec.physical.config.HashToRandomExchange;
> +import org.apache.drill.exec.physical.config.Project;
> +import org.apache.drill.exec.physical.config.RandomReceiver;
> +import org.apache.drill.exec.physical.config.RangeSender;
> +import org.apache.drill.exec.physical.config.Screen;
> +import org.apache.drill.exec.physical.config.SingleSender;
> +import org.apache.drill.exec.physical.config.Sort;
> +import org.apache.drill.exec.physical.config.UnionExchange;
> +
> +public abstract class AbstractPhysicalVisitor<T, X, E extends Throwable> implements PhysicalVisitor<T, X, E> {
> +  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractPhysicalVisitor.class);
> +
> +  @Override
> +  public T visitExchange(Exchange exchange, X value) throws E{
> +    return visitOp(exchange, value);
> +  }
> +
> +  @Override
> +  public T visitFilter(Filter filter, X value) throws E{
> +    return visitOp(filter, value);
> +  }
> +
> +  @Override
> +  public T visitProject(Project project, X value) throws E{
> +    return visitOp(project, value);
> +  }
> +
> +  @Override
> +  public T visitSort(Sort sort, X value) throws E{
> +    return visitOp(sort, value);
> +  }
> +
> +  @Override
> +  public T visitSender(Sender sender, X value) throws E {
> +    return visitOp(sender, value);
> +  }
> +
> +  @Override
> +  public T visitReceiver(Receiver receiver, X value) throws E {
> +    return visitOp(receiver, value);
> +  }
> +
> +  @Override
> +  public T visitScan(Scan<?> scan, X value) throws E{
> +    return visitOp(scan, value);
> +  }
> +
> +  @Override
> +  public T visitStore(Store store, X value) throws E{
> +    return visitOp(store, value);
> +  }
> +
> +  
> +  public T visitChildren(PhysicalOperator op, X value) throws E{
> +    for(PhysicalOperator child : op){
> +      child.accept(this, value);
> +    }
> +    return null;
> +  }
> +  
> +  @Override
> +  public T visitHashPartitionSender(HashPartitionSender op, X value) throws E {
> +    return visitSender(op, value);
> +  }
> +
> +  @Override
> +  public T visitRandomReceiver(RandomReceiver op, X value) throws E {
> +    return visitReceiver(op, value);
> +  }
> +
> +  @Override
> +  public T visitHashPartitionSender(HashToRandomExchange op, X value) throws E {
> +    return visitExchange(op, value);
> +  }
> +
> +  @Override
> +  public T visitRangeSender(RangeSender op, X value) throws E {
> +    return visitSender(op, value);
> +  }
> +
> +  @Override
> +  public T visitScreen(Screen op, X value) throws E {
> +    return visitStore(op, value);
> +  }
> +
> +  @Override
> +  public T visitSingleSender(SingleSender op, X value) throws E {
> +    return visitSender(op, value);
> +  }
> +
> +  @Override
> +  public T visitUnionExchange(UnionExchange op, X value) throws E {
> +    return visitExchange(op, value);
> +  }
> +
> +  @Override
> +  public T visitOp(PhysicalOperator op, X value) throws E{
> +    throw new UnsupportedOperationException(String.format(
> +        "The PhysicalVisitor of type %s does not currently support visiting the PhysicalOperator type %s.", this
> +            .getClass().getCanonicalName(), op.getClass().getCanonicalName()));
> +  }
> +
> +}
> 
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractReceiver.java
> ----------------------------------------------------------------------
> diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractReceiver.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractReceiver.java
> new file mode 100644
> index 0000000..e8ba19c
> --- /dev/null
> +++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractReceiver.java
> @@ -0,0 +1,63 @@
> +/*******************************************************************************
> + * Licensed to the Apache Software Foundation (ASF) under one
> + * or more contributor license agreements.  See the NOTICE file
> + * distributed with this work for additional information
> + * regarding copyright ownership.  The ASF licenses this file
> + * to you under the Apache License, Version 2.0 (the
> + * "License"); you may not use this file except in compliance
> + * with the License.  You may obtain a copy of the License at
> + * 
> + * http://www.apache.org/licenses/LICENSE-2.0
> + * 
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + ******************************************************************************/
> +package org.apache.drill.exec.physical.base;
> +
> +import java.util.Iterator;
> +import java.util.List;
> +
> +import org.apache.drill.exec.physical.OperatorCost;
> +
> +import com.fasterxml.jackson.annotation.JsonIgnore;
> +import com.fasterxml.jackson.annotation.JsonProperty;
> +import com.google.common.base.Preconditions;
> +import com.google.common.collect.Iterators;
> +
> +public abstract class AbstractReceiver extends AbstractBase implements Receiver{
> +
> +  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractReceiver.class);
> +
> +  private final int oppositeMajorFragmentId; 
> +  
> +  public AbstractReceiver(int oppositeMajorFragmentId){
> +    this.oppositeMajorFragmentId = oppositeMajorFragmentId;
> +  }
> +  
> +  @Override
> +  public Iterator<PhysicalOperator> iterator() {
> +    return Iterators.emptyIterator();
> +  }
> +
> +  @Override
> +  public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E {
> +    return physicalVisitor.visitReceiver(this, value);
> +  }
> +
> +  @Override
> +  public final PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
> +    Preconditions.checkArgument(children.isEmpty());
> +    //rewriting is unnecessary since the inputs haven't changed.
> +    return this;
> +  }
> +
> +  @JsonProperty("sender-major-fragment")
> +  public int getOppositeMajorFragmentId() {
> +    return oppositeMajorFragmentId;
> +  }
> +
> +}
> +
> 
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractScan.java
> ----------------------------------------------------------------------
> diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractScan.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractScan.java
> new file mode 100644
> index 0000000..dbde9c5
> --- /dev/null
> +++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractScan.java
> @@ -0,0 +1,84 @@
> +/*******************************************************************************
> + * Licensed to the Apache Software Foundation (ASF) under one
> + * or more contributor license agreements.  See the NOTICE file
> + * distributed with this work for additional information
> + * regarding copyright ownership.  The ASF licenses this file
> + * to you under the Apache License, Version 2.0 (the
> + * "License"); you may not use this file except in compliance
> + * with the License.  You may obtain a copy of the License at
> + * 
> + * http://www.apache.org/licenses/LICENSE-2.0
> + * 
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + ******************************************************************************/
> +package org.apache.drill.exec.physical.base;
> +
> +import java.util.Iterator;
> +import java.util.List;
> +
> +import org.apache.drill.exec.physical.OperatorCost;
> +import org.apache.drill.exec.physical.ReadEntry;
> +
> +import com.fasterxml.jackson.annotation.JsonIgnore;
> +import com.fasterxml.jackson.annotation.JsonProperty;
> +import com.google.common.collect.Iterators;
> +
> +public abstract class AbstractScan<R extends ReadEntry> extends AbstractBase implements Scan<R>{
> +  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractScan.class);
> +  
> +  protected final List<R> readEntries;
> +  private final OperatorCost cost;
> +  private final Size size;
> +  
> +  public AbstractScan(List<R> readEntries) {
> +    this.readEntries = readEntries;
> +    OperatorCost cost = new OperatorCost(0,0,0,0);
> +    Size size = new Size(0,0);
> +    for(R r : readEntries){
> +      cost = cost.add(r.getCost());
> +      size = size.add(r.getSize());
> +    }
> +    this.cost = cost;
> +    this.size = size;
> +  }
> +
> +  @Override
> +  @JsonProperty("entries")
> +  public List<R> getReadEntries() {
> +    return readEntries;
> +  }
> +  
> +  @Override
> +  public Iterator<PhysicalOperator> iterator() {
> +    return Iterators.emptyIterator();
> +  }
> +
> +  @Override
> +  public boolean isExecutable() {
> +    return true;
> +  }
> +
> +  @Override
> +  public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E{
> +    return physicalVisitor.visitScan(this, value);
> +  }
> +
> +  @Override
> +  public OperatorCost getCost() {
> +    return cost;
> +  }
> +
> +  @Override
> +  public Size getSize() {
> +    return size;
> +  }
> +  
> +  
> +  
> +  
> +  
> +}
> 
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractSender.java
> ----------------------------------------------------------------------
> diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractSender.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractSender.java
> new file mode 100644
> index 0000000..f8c22b3
> --- /dev/null
> +++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractSender.java
> @@ -0,0 +1,53 @@
> +/*******************************************************************************
> + * Licensed to the Apache Software Foundation (ASF) under one
> + * or more contributor license agreements.  See

Mime
View raw message