drill-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jacq...@apache.org
Subject [11/38] DRILL-257: Move SQL parsing to server side. Switch to Avatica based JDBC driver. Update QuerySubmitter to support SQL queries. Update SqlAccesors to support getObject() Remove ref, clean up SQL packages some. Various performance fixes. Updating
Date Tue, 04 Mar 2014 08:07:38 GMT
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b3460af8/exec/ref/src/main/java/org/apache/drill/exec/ref/eval/SimpleEvaluationVisitor.java
----------------------------------------------------------------------
diff --git a/exec/ref/src/main/java/org/apache/drill/exec/ref/eval/SimpleEvaluationVisitor.java b/exec/ref/src/main/java/org/apache/drill/exec/ref/eval/SimpleEvaluationVisitor.java
deleted file mode 100644
index 1b04880..0000000
--- a/exec/ref/src/main/java/org/apache/drill/exec/ref/eval/SimpleEvaluationVisitor.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.ref.eval;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.antlr.runtime.ANTLRStringStream;
-import org.antlr.runtime.CommonTokenStream;
-import org.apache.drill.common.expression.ExpressionPosition;
-import org.apache.drill.common.expression.FunctionCall;
-import org.apache.drill.common.expression.IfExpression;
-import org.apache.drill.common.expression.LogicalExpression;
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.common.expression.ValueExpressions.BooleanExpression;
-import org.apache.drill.common.expression.ValueExpressions.DoubleExpression;
-import org.apache.drill.common.expression.ValueExpressions.LongExpression;
-import org.apache.drill.common.expression.ValueExpressions.QuotedString;
-import org.apache.drill.common.expression.parser.ExprLexer;
-import org.apache.drill.common.expression.parser.ExprParser;
-import org.apache.drill.common.expression.visitors.AggregateChecker;
-import org.apache.drill.common.expression.visitors.ConstantChecker;
-import org.apache.drill.common.expression.visitors.SimpleExprVisitor;
-import org.apache.drill.exec.ref.RecordPointer;
-import org.apache.drill.exec.ref.UnbackedRecord;
-import org.apache.drill.exec.ref.eval.EvaluatorTypes.AggregatingEvaluator;
-import org.apache.drill.exec.ref.eval.EvaluatorTypes.BasicEvaluator;
-import org.apache.drill.exec.ref.eval.fn.FunctionArguments;
-import org.apache.drill.exec.ref.eval.fn.FunctionEvaluatorRegistry;
-import org.apache.drill.exec.ref.exceptions.SetupException;
-import org.apache.drill.exec.ref.values.DataValue;
-import org.apache.drill.exec.ref.values.ScalarValues.BooleanScalar;
-import org.apache.drill.exec.ref.values.ScalarValues.DoubleScalar;
-import org.apache.drill.exec.ref.values.ScalarValues.IntegerScalar;
-import org.apache.drill.exec.ref.values.ScalarValues.LongScalar;
-import org.apache.drill.exec.ref.values.ScalarValues.StringScalar;
-
-/**
- * Expression visitor that turns a {@link LogicalExpression} tree into a tree of
- * {@link BasicEvaluator}s bound to one {@link RecordPointer}. Every aggregating evaluator
- * created while visiting is remembered and exposed through {@link #getAggregators()}.
- */
-public class SimpleEvaluationVisitor extends SimpleExprVisitor<BasicEvaluator>{
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SimpleEvaluationVisitor.class);
-
-  /** Aggregating evaluators created so far, in creation order. */
-  private List<AggregatingEvaluator> aggregators = new ArrayList<AggregatingEvaluator>();
-  /** Record handed to every evaluator this visitor builds. */
-  private RecordPointer record;
-
-  /**
-   * @param record the record pointer passed on to each evaluator that gets created
-   */
-  public SimpleEvaluationVisitor(RecordPointer record) {
-    this.record = record;
-  }
-
-  /**
-   * @return the live list of aggregating evaluators registered by {@link #visitFunctionCall}
-   */
-  public List<AggregatingEvaluator> getAggregators() {
-    return aggregators;
-  }
-
-  /**
-   * Builds the evaluators for all arguments of {@code call} and then asks the
-   * {@link FunctionEvaluatorRegistry} for the evaluator of the function itself.
-   *
-   * @param call the function call to translate
-   * @return the evaluator for the call
-   * @throws SetupException if an aggregating function has an aggregating argument, or if the
-   *         evaluator registered for an aggregating function is not an {@link AggregatingEvaluator}
-   */
-  @Override
-  public BasicEvaluator visitFunctionCall(FunctionCall call) {
-    final List<BasicEvaluator> argumentEvaluators = new ArrayList<BasicEvaluator>();
-    boolean anyAggregate = false;
-    boolean allConstant = true;
-    for (LogicalExpression argument : call) {
-      // Non-short-circuit operators: both checkers run for every argument.
-      anyAggregate |= AggregateChecker.isAggregating(argument);
-      allConstant &= ConstantChecker.onlyIncludesConstants(argument);
-      argumentEvaluators.add(argument.accept(this, null));
-    }
-    final FunctionArguments args = new FunctionArguments(allConstant, anyAggregate, argumentEvaluators, call);
-
-    // Plain (non-aggregating) functions need no further validation.
-    if (!call.getDefinition().isAggregating()) {
-      return FunctionEvaluatorRegistry.getEvaluator(call.getDefinition().getName(), args, record);
-    }
-
-    if (args.includesAggregates()) {
-      throw new SetupException(String.format("The call for %s contains one or more arguments that also contain aggregating functions.  An aggregating function cannot contain aggregating expressions.", call.getDefinition()));
-    }
-
-    final BasicEvaluator created = FunctionEvaluatorRegistry.getEvaluator(call.getDefinition().getName(), args, record);
-    if (created instanceof AggregatingEvaluator) {
-      aggregators.add((AggregatingEvaluator) created);
-      return created;
-    }
-    throw new SetupException(String.format("Function %s is an aggregating function.  However, the provided evaluator (%s) is not an aggregating evaluator.", call.getDefinition(), created.getClass()));
-  }
-
-  /** Wraps the conditional in an {@link IfEvaluator} that receives this visitor and the record. */
-  @Override
-  public BasicEvaluator visitIfExpression(IfExpression ifExpr) {
-    return new IfEvaluator(ifExpr, this, record);
-  }
-
-  /** Creates a {@link FieldEvaluator} for the given path over the bound record. */
-  @Override
-  public BasicEvaluator visitSchemaPath(SchemaPath path) {
-    return new FieldEvaluator(path, record);
-  }
-
-  /** Long literal: returns a {@link LongScalar} holding its value. */
-  @Override
-  public BasicEvaluator visitLongConstant(LongExpression longExpr) {
-    return new LongScalar(longExpr.getLong());
-  }
-
-  /** Double literal: returns a {@link DoubleScalar} holding its value. */
-  @Override
-  public BasicEvaluator visitDoubleConstant(DoubleExpression dExpr) {
-    return new DoubleScalar(dExpr.getDouble());
-  }
-
-  /** Boolean literal: returns a {@link BooleanScalar} holding its value. */
-  @Override
-  public BasicEvaluator visitBooleanConstant(BooleanExpression e) {
-    return new BooleanScalar(e.getBoolean());
-  }
-
-  /** Quoted string literal: returns a {@link StringScalar} holding its value. */
-  @Override
-  public BasicEvaluator visitQuotedStringConstant(QuotedString e) {
-    return new StringScalar(e.value);
-  }
-
-  /** Any other expression kind is rejected with an {@link UnsupportedOperationException}. */
-  @Override
-  public BasicEvaluator visitUnknown(LogicalExpression e, Void value) throws RuntimeException {
-    throw new UnsupportedOperationException();
-  }
-
-  /**
-   * Small manual demo: parses a fixed if/then/else expression, evaluates it against a record
-   * whose field {@code a} is the integer 3, and prints the resulting value.
-   */
-  public static void main(String[] args) throws Exception {
-    final ExprLexer lexer = new ExprLexer(new ANTLRStringStream("if( a == 1) then 4 else 2 end"));
-    final ExprParser parser = new ExprParser(new CommonTokenStream(lexer));
-    final LogicalExpression parsed = parser.parse().e;
-
-    final RecordPointer sample = new UnbackedRecord();
-    sample.addField(new SchemaPath("a", ExpressionPosition.UNKNOWN), new IntegerScalar(3));
-
-    final BasicEvaluator evaluator = parsed.accept(new SimpleEvaluationVisitor(sample), null);
-    final DataValue result = evaluator.eval();
-    System.out.println(result);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b3460af8/exec/ref/src/main/java/org/apache/drill/exec/ref/eval/fn/ComparisonEvaluators.java
----------------------------------------------------------------------
diff --git a/exec/ref/src/main/java/org/apache/drill/exec/ref/eval/fn/ComparisonEvaluators.java b/exec/ref/src/main/java/org/apache/drill/exec/ref/eval/fn/ComparisonEvaluators.java
deleted file mode 100644
index 1b1b3af..0000000
--- a/exec/ref/src/main/java/org/apache/drill/exec/ref/eval/fn/ComparisonEvaluators.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.ref.eval.fn;
-
-import org.apache.drill.exec.ref.RecordPointer;
-import org.apache.drill.exec.ref.eval.BaseBasicEvaluator;
-import org.apache.drill.exec.ref.eval.EvaluatorTypes.BasicEvaluator;
-import org.apache.drill.exec.ref.exceptions.RecordException;
-import org.apache.drill.exec.ref.values.ComparableValue;
-import org.apache.drill.exec.ref.values.DataValue;
-import org.apache.drill.exec.ref.values.ScalarValues.BooleanScalar;
-
-/**
- * Holder for the comparison function evaluators ("equal", "less than", "greater than", ...).
- * Each nested evaluator takes its two operands from argument positions 0 and 1.
- */
-public class ComparisonEvaluators {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ComparisonEvaluators.class);
-  
-  /** Evaluator for "equal": compares the two evaluated operands with {@code equals}. */
-  @FunctionEvaluator("equal")
-  public static class EqualEvaluator extends BaseBasicEvaluator{
-    private final BasicEvaluator left;
-    private final BasicEvaluator right;
-    
-    public EqualEvaluator(RecordPointer record, FunctionArguments args){
-      super(args.isOnlyConstants(), record);
-      left = args.getEvaluator(0);
-      right = args.getEvaluator(1);
-    }
-    
-    @Override
-    public BooleanScalar eval() {
-      return new BooleanScalar(left.eval().equals(right.eval()));
-    }
-
-  }
-
-  /**
-   * @return true when both values are {@link ComparableValue}s and {@code a} reports that it
-   *         supports comparison against {@code b}
-   */
-  public static boolean isComparable(DataValue a, DataValue b) {
-      return a instanceof ComparableValue && b instanceof ComparableValue && ((ComparableValue) a).supportsCompare(b);
-  }
-  
-  /**
-   * Shared base for the ordering comparisons. Evaluates both operands, compares them, and lets
-   * the subclass decide via {@link #valid(int)} whether the comparison result satisfies it.
-   */
-  private abstract static class ComparisonEvaluator extends BaseBasicEvaluator{
-    private final BasicEvaluator left;
-    private final BasicEvaluator right;
-    
-    public ComparisonEvaluator(RecordPointer record, FunctionArguments args){
-      super(args.isOnlyConstants(), record);
-      left = args.getEvaluator(0);
-      right = args.getEvaluator(1);
-    }
-    
-    /**
-     * @param i the result of comparing the left operand to the right operand; only its sign
-     *          (negative, zero, positive) should be interpreted
-     * @return whether that result satisfies this comparison
-     */
-    public abstract boolean valid(int i);
-    
-    /**
-     * @return the outcome of the comparison as a {@link BooleanScalar}
-     * @throws RecordException if the two operand values cannot be compared with each other
-     */
-    @Override
-    public BooleanScalar eval() {
-      DataValue a = left.eval();
-      DataValue b = right.eval();
-      
-      if(isComparable(a, b)){
-        int i = ((ComparableValue)a).compareTo(b);
-        return new BooleanScalar(valid( i));
-      }else{
-        throw new RecordException(String.format("Values cannot be compared.  A %s cannot be compared to a %s.", a, b), null);
-      }
-    }
-  }
-  
-  /** Evaluator for "less than". */
-  @FunctionEvaluator("less than")
-  public static class LessThan extends ComparisonEvaluator{
-
-    public LessThan(RecordPointer record, FunctionArguments args) {
-      super(record, args);
-    }
-
-    @Override
-    public boolean valid(int i) {
-      // Test the sign rather than the exact value -1: a comparison result is not guaranteed
-      // to be normalized to -1/0/1, and the sibling evaluators already test by sign.
-      return i < 0;
-    }
-    
-  }
-  
-  /** Evaluator for "greater than". */
-  @FunctionEvaluator("greater than")
-  public static class GreaterThan extends ComparisonEvaluator{
-
-    public GreaterThan(RecordPointer record, FunctionArguments args) {
-      super(record, args);
-    }
-
-    @Override
-    public boolean valid(int i) {
-      // Test the sign rather than the exact value 1 (see LessThan).
-      return i > 0;
-    }
-    
-  }
-  
-  /** Evaluator for "greater than or equal to". */
-  @FunctionEvaluator("greater than or equal to")
-  public static class GreaterOrEqualTo extends ComparisonEvaluator{
-
-    public GreaterOrEqualTo(RecordPointer record, FunctionArguments args) {
-      super(record, args);
-    }
-
-    @Override
-    public boolean valid(int i) {
-      return i >= 0;
-    }
-    
-  }
-  
-  /** Evaluator for "less than or equal to". */
-  @FunctionEvaluator("less than or equal to")
-  public static class LessThanOrEqualTo extends ComparisonEvaluator{
-
-    public LessThanOrEqualTo(RecordPointer record, FunctionArguments args) {
-      super(record, args);
-    }
-
-    @Override
-    public boolean valid(int i) {
-      return i <= 0;
-    }
-    
-  }
-  
-
-  
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b3460af8/exec/ref/src/main/java/org/apache/drill/exec/ref/eval/fn/FunctionArguments.java
----------------------------------------------------------------------
diff --git a/exec/ref/src/main/java/org/apache/drill/exec/ref/eval/fn/FunctionArguments.java b/exec/ref/src/main/java/org/apache/drill/exec/ref/eval/fn/FunctionArguments.java
deleted file mode 100644
index 47c0405..0000000
--- a/exec/ref/src/main/java/org/apache/drill/exec/ref/eval/fn/FunctionArguments.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.ref.eval.fn;
-
-import java.util.List;
-
-import org.apache.drill.common.expression.FunctionCall;
-import org.apache.drill.exec.ref.eval.EvaluatorTypes.BasicEvaluator;
-import org.apache.drill.exec.ref.exceptions.SetupException;
-
-import com.carrotsearch.hppc.ObjectIntOpenHashMap;
-
-/**
- * The evaluated arguments of one function call: the argument evaluators in positional order,
- * a lookup from declared argument name to position, and two flags describing the arguments
- * (all constant / any aggregating).
- */
-public class FunctionArguments {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FunctionArguments.class);
-
-  private final boolean onlyConstants;
-  private final boolean includesAggregates;
-  /** Argument evaluators, indexed by argument position. */
-  private final List<BasicEvaluator> ev;
-  /** Declared argument name to argument position; unnamed (null) arguments are not indexed. */
-  private ObjectIntOpenHashMap<String> nameToPosition = new ObjectIntOpenHashMap<String>();
-
-  /**
-   * @param onlyConstants      value later reported by {@link #isOnlyConstants()}
-   * @param includesAggregates value later reported by {@link #includesAggregates()}
-   * @param evaluators         argument evaluators in positional order (stored, not copied)
-   * @param call               the call whose definition supplies the argument names
-   */
-  public FunctionArguments(boolean onlyConstants, boolean includesAggregates, List<BasicEvaluator> evaluators, FunctionCall call){
-    this.onlyConstants = onlyConstants;
-    this.includesAggregates = includesAggregates;
-
-    int position = 0;
-    for (String argumentName : call.getDefinition().getArgumentNames()) {
-      if (argumentName != null) {
-        nameToPosition.put(argumentName, position);
-      }
-      position++;
-    }
-    this.ev = evaluators;
-  }
-
-  /** @return the flag given at construction telling whether any argument is aggregating */
-  public boolean includesAggregates() {
-    return includesAggregates;
-  }
-
-  /** @return the flag given at construction telling whether all arguments are constant */
-  public boolean isOnlyConstants(){
-    return onlyConstants;
-  }
-
-  /**
-   * Looks an argument evaluator up by its declared name.
-   *
-   * @throws RuntimeException if no argument with that name was declared
-   */
-  public BasicEvaluator getEvaluator(String name){
-    if (nameToPosition.containsKey(name)) {
-      // lget() must directly follow containsKey(): it reads the slot that lookup located.
-      final int position = nameToPosition.lget();
-      return getEvaluator(position);
-    }
-    throw new RuntimeException("Unknown Item provided.");
-  }
-
-  /**
-   * Looks an argument evaluator up by position.
-   *
-   * @throws RuntimeException if the evaluator stored at that position is null
-   */
-  public BasicEvaluator getEvaluator(int argIndex){
-    final BasicEvaluator found = ev.get(argIndex);
-    if (found != null) {
-      return found;
-    }
-    throw new RuntimeException("Unknown Item provided.");
-  }
-
-  /**
-   * Returns the one and only argument evaluator.
-   *
-   * @return the single evaluator
-   * @throws SetupException if the number of arguments is anything other than one
-   */
-  public BasicEvaluator getOnlyEvaluator() throws SetupException{
-    final int count = ev.size();
-    if (count == 1) {
-      return ev.get(0);
-    }
-    throw new SetupException(String.format("Looking for a single argument.  Received %d arguments.", count));
-  }
-
-  /** @return the number of argument evaluators */
-  public int size(){
-    return ev.size();
-  }
-
-  /** @return a new array holding the argument evaluators in positional order */
-  public BasicEvaluator[] getArgsAsArray(){
-    final BasicEvaluator[] target = new BasicEvaluator[size()];
-    return ev.toArray(target);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b3460af8/exec/ref/src/main/java/org/apache/drill/exec/ref/eval/fn/FunctionEvaluator.java
----------------------------------------------------------------------
diff --git a/exec/ref/src/main/java/org/apache/drill/exec/ref/eval/fn/FunctionEvaluator.java b/exec/ref/src/main/java/org/apache/drill/exec/ref/eval/fn/FunctionEvaluator.java
deleted file mode 100644
index 03cee68..0000000
--- a/exec/ref/src/main/java/org/apache/drill/exec/ref/eval/fn/FunctionEvaluator.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.ref.eval.fn;
-
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-
-/**
- * Marks an evaluator class with the name of the function it implements.
- * The annotation is retained at runtime so it can be read reflectively.
- */
-@Retention(RetentionPolicy.RUNTIME)
-public @interface FunctionEvaluator {
-  /** The function name associated with the annotated evaluator class. */
-  String value();
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b3460af8/exec/ref/src/main/java/org/apache/drill/exec/ref/eval/fn/FunctionEvaluatorRegistry.java
----------------------------------------------------------------------
diff --git a/exec/ref/src/main/java/org/apache/drill/exec/ref/eval/fn/FunctionEvaluatorRegistry.java b/exec/ref/src/main/java/org/apache/drill/exec/ref/eval/fn/FunctionEvaluatorRegistry.java
deleted file mode 100644
index 363304a..0000000
--- a/exec/ref/src/main/java/org/apache/drill/exec/ref/eval/fn/FunctionEvaluatorRegistry.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.ref.eval.fn;
-
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.drill.exec.ref.RecordPointer;
-import org.apache.drill.exec.ref.eval.EvaluatorTypes.BasicEvaluator;
-import org.apache.drill.exec.ref.exceptions.SetupException;
-import org.reflections.Reflections;
-import org.reflections.scanners.ResourcesScanner;
-import org.reflections.scanners.SubTypesScanner;
-import org.reflections.scanners.TypeAnnotationsScanner;
-import org.reflections.util.ClasspathHelper;
-import org.reflections.util.ConfigurationBuilder;
-import org.reflections.util.FilterBuilder;
-
-/**
- * Registry of function evaluators. On class initialization it scans the
- * {@code org.apache.drill.exec} package for {@link BasicEvaluator} subtypes annotated with
- * {@link FunctionEvaluator} and indexes their {@code (RecordPointer, FunctionArguments)}
- * constructors by the annotation's function name.
- */
-public class FunctionEvaluatorRegistry {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FunctionEvaluatorRegistry.class);
-
-  /** Function name to evaluator constructor; unmodifiable once the static initializer has run. */
-  static Map<String, Constructor<? extends BasicEvaluator>> map;
-
-  static {
-    final String scanPackage = "org.apache.drill.exec";
-
-    String s = FilterBuilder.Include.prefix(scanPackage);
-
-    Reflections r = new Reflections(new ConfigurationBuilder().filterInputsBy(new FilterBuilder().include(s))
-        .setUrls(ClasspathHelper.forPackage(scanPackage))
-        .setScanners(new SubTypesScanner(), new TypeAnnotationsScanner(), new ResourcesScanner()));
-
-    Set<Class<? extends BasicEvaluator>> providerClasses = r.getSubTypesOf(BasicEvaluator.class);
-    Map<String, Constructor<? extends BasicEvaluator>> funcs = new HashMap<String, Constructor<? extends BasicEvaluator>>();
-    for (Class<? extends BasicEvaluator> c : providerClasses) {
-      try {
-        FunctionEvaluator annotation = c.getAnnotation(FunctionEvaluator.class);
-        if (annotation == null) {
-          // Only basic evaluators marked with the FunctionEvaluator annotation are examined.
-          continue;
-        }
-
-        // getConstructor only finds public constructors with exactly this parameter list.
-        Constructor<? extends BasicEvaluator> constructor = c.getConstructor(RecordPointer.class, FunctionArguments.class);
-
-        funcs.put(annotation.value(), constructor);
-      } catch (NoSuchMethodException e) {
-        logger.warn(
-                "Unable to register Basic Evaluator {} because it does not have a public constructor that accepts [RecordPointer, FunctionArguments] as its arguments.",
-                c);
-      } catch (SecurityException e) {
-        logger.warn("Unable to register Basic Evaluator {} because of security exception.", c, e);
-      }
-    }
-
-    map = Collections.unmodifiableMap(funcs);
-  }
-
-  /**
-   * Instantiates the evaluator registered under {@code name}.
-   *
-   * @param name   the function name the evaluator was registered under
-   * @param args   the evaluated arguments passed to the evaluator's constructor
-   * @param record the record pointer passed to the evaluator's constructor
-   * @return a new evaluator instance
-   * @throws SetupException if no evaluator is registered under {@code name}, or if creating it
-   *         fails. A {@code SetupException} thrown by the evaluator's own constructor is
-   *         propagated as-is; any other failure is wrapped with its cause attached.
-   */
-  public static BasicEvaluator getEvaluator(String name, FunctionArguments args, RecordPointer record) {
-    Constructor<? extends BasicEvaluator> c = map.get(name);
-    if (c == null) throw new SetupException(String.format("Unable to find requested basic evaluator %s.", name));
-    try {
-      return c.newInstance(record, args);
-    } catch (InvocationTargetException e) {
-      // The constructor itself threw. Surface its SetupException directly instead of burying
-      // it under a generic wrapper; wrap anything else while keeping the original cause.
-      Throwable cause = e.getCause();
-      if (cause instanceof SetupException) {
-        throw (SetupException) cause;
-      }
-      throw new SetupException(String.format("Failure while attempting to create a new evaluator of type '%s'.", name),
-          cause);
-    } catch (RuntimeException | IllegalAccessException | InstantiationException ex) {
-      throw new SetupException(String.format("Failure while attempting to create a new evaluator of type '%s'.", name),
-          ex);
-    }
-
-  }
-
-  /** Running this class triggers the static scan above and then prints a confirmation. */
-  public static void main(String[] args) {
-    System.out.println("loaded.");
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b3460af8/exec/ref/src/main/java/org/apache/drill/exec/ref/eval/fn/MathEvaluators.java
----------------------------------------------------------------------
diff --git a/exec/ref/src/main/java/org/apache/drill/exec/ref/eval/fn/MathEvaluators.java b/exec/ref/src/main/java/org/apache/drill/exec/ref/eval/fn/MathEvaluators.java
deleted file mode 100644
index 347a4f3..0000000
--- a/exec/ref/src/main/java/org/apache/drill/exec/ref/eval/fn/MathEvaluators.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.ref.eval.fn;
-
-import org.apache.drill.common.types.Types;
-import org.apache.drill.exec.ref.RecordPointer;
-import org.apache.drill.exec.ref.eval.BaseBasicEvaluator;
-import org.apache.drill.exec.ref.eval.EvaluatorTypes.BasicEvaluator;
-import org.apache.drill.exec.ref.exceptions.RecordException;
-import org.apache.drill.exec.ref.values.DataValue;
-import org.apache.drill.exec.ref.values.NumericValue;
-import org.apache.drill.exec.ref.values.NumericValue.NumericType;
-import org.apache.drill.exec.ref.values.ScalarValues;
-
-/**
- * Holder for the arithmetic function evaluators "add" and "multiply".
- */
-public class MathEvaluators {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MathEvaluators.class);
-
-  /** Evaluator for "add": evaluates every argument and delegates to {@link NumericValue#add}. */
-  @FunctionEvaluator("add")
-  public static class AddEvaluator extends BaseBasicEvaluator{
-    private final BasicEvaluator[] args;
-
-    public AddEvaluator(RecordPointer record, FunctionArguments args){
-      super(args.isOnlyConstants(), record);
-      this.args = args.getArgsAsArray();
-    }
-
-    /**
-     * @return the result of {@code NumericValue.add} over the evaluated arguments; the slot of
-     *         an argument whose data type is not numeric is left {@code null} in the array
-     *         handed to that method
-     */
-    @Override
-    public NumericValue eval() {
-      final NumericValue[] operands = new NumericValue[args.length];
-      int slot = 0;
-      for (BasicEvaluator argument : args) {
-        final DataValue evaluated = argument.eval();
-        if (Types.isNumericType(evaluated.getDataType())) {
-          operands[slot] = evaluated.getAsNumeric();
-        }
-        slot++;
-      }
-      return NumericValue.add(operands);
-    }
-
-  }
-
-  /**
-   * Evaluator for "multiply". The product is accumulated as a long until the first FLOAT or
-   * DOUBLE operand is met; from then on it is accumulated as a double.
-   */
-  @FunctionEvaluator("multiply")
-  public static class MultiplyE extends BaseBasicEvaluator{
-    private final BasicEvaluator[] args;
-
-    public MultiplyE(RecordPointer record, FunctionArguments args){
-      super(args.isOnlyConstants(), record);
-      this.args = args.getArgsAsArray();
-    }
-
-    /**
-     * @return a {@code DoubleScalar} if any operand was FLOAT or DOUBLE, otherwise a
-     *         {@code LongScalar}
-     * @throws RecordException if an argument evaluates to a value whose type is not numeric
-     */
-    @Override
-    public NumericValue eval() {
-      long longProduct = 1;
-      double doubleProduct = 1;
-      boolean floating = false;
-
-      for (BasicEvaluator argument : args) {
-        final DataValue evaluated = argument.eval();
-        if (!Types.isNumericType(evaluated.getDataType())) {
-          throw new RecordException(String.format("Unable to multiply a value of  %s.", evaluated), null);
-        }
-
-        final NumericValue number = evaluated.getAsNumeric();
-        final NumericType kind = number.getNumericType();
-        final boolean fractional = kind == NumericType.FLOAT || kind == NumericType.DOUBLE;
-
-        if (!floating && !fractional) {
-          // Still in pure integer mode.
-          longProduct *= number.getAsLong();
-          continue;
-        }
-
-        if (!floating) {
-          // First fractional operand: carry the integer product over into the double.
-          doubleProduct = longProduct;
-          floating = true;
-        }
-        doubleProduct *= number.getAsDouble();
-      }
-
-      if (floating) {
-        return new ScalarValues.DoubleScalar(doubleProduct);
-      }
-      return new ScalarValues.LongScalar(longProduct);
-    }
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b3460af8/exec/ref/src/main/java/org/apache/drill/exec/ref/eval/fn/StringEvaluators.java
----------------------------------------------------------------------
diff --git a/exec/ref/src/main/java/org/apache/drill/exec/ref/eval/fn/StringEvaluators.java b/exec/ref/src/main/java/org/apache/drill/exec/ref/eval/fn/StringEvaluators.java
deleted file mode 100644
index 5a3ee02..0000000
--- a/exec/ref/src/main/java/org/apache/drill/exec/ref/eval/fn/StringEvaluators.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.ref.eval.fn;
-
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import org.apache.drill.exec.ref.RecordPointer;
-import org.apache.drill.exec.ref.eval.BaseBasicEvaluator;
-import org.apache.drill.exec.ref.eval.EvaluatorTypes.BasicEvaluator;
-import org.apache.drill.exec.ref.values.ValueReader;
-import org.apache.drill.exec.ref.values.ScalarValues.BooleanScalar;
-
-/**
- * Holder for string function evaluators.
- */
-public class StringEvaluators {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StringEvaluators.class);
-
-  /**
-   * Evaluator for "regex_like". The "pattern" argument is evaluated and compiled once, at
-   * construction; each evaluation then searches the current "value" for that pattern.
-   */
-  @FunctionEvaluator("regex_like")
-  public static class RegexEvaluator extends BaseBasicEvaluator{
-    /** Evaluator producing the text that is searched on each call to {@link #eval()}. */
-    private final BasicEvaluator eval;
-    /** Matcher that is reset onto the new input for every evaluation. */
-    private final Matcher matcher;
-
-    public RegexEvaluator(RecordPointer record, FunctionArguments args){
-      super(args.isOnlyConstants(), record);
-      final String regex = ValueReader.getString(args.getEvaluator("pattern").eval());
-      this.matcher = Pattern.compile(regex).matcher("");
-      this.eval = args.getEvaluator("value");
-    }
-
-    /**
-     * @return a true scalar when the pattern is found anywhere in the evaluated value
-     *         ({@code Matcher.find()}), a false scalar otherwise
-     */
-    @Override
-    public BooleanScalar eval() {
-      matcher.reset(ValueReader.getChars(eval.eval()));
-      final boolean found = matcher.find();
-      return new BooleanScalar(found);
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b3460af8/exec/ref/src/main/java/org/apache/drill/exec/ref/eval/fn/agg/AggregatingWrapperEvaluator.java
----------------------------------------------------------------------
diff --git a/exec/ref/src/main/java/org/apache/drill/exec/ref/eval/fn/agg/AggregatingWrapperEvaluator.java b/exec/ref/src/main/java/org/apache/drill/exec/ref/eval/fn/agg/AggregatingWrapperEvaluator.java
deleted file mode 100644
index b56e5f4..0000000
--- a/exec/ref/src/main/java/org/apache/drill/exec/ref/eval/fn/agg/AggregatingWrapperEvaluator.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.ref.eval.fn.agg;
-
-import java.util.List;
-
-import org.apache.drill.exec.ref.eval.EvaluatorTypes.AggregatingEvaluator;
-import org.apache.drill.exec.ref.eval.EvaluatorTypes.BasicEvaluator;
-import org.apache.drill.exec.ref.values.DataValue;
-
-/**
- * Presents a group of aggregating evaluators plus a top-level evaluator as one
- * {@link AggregatingEvaluator}: records are fed to every aggregator in the group, while the
- * value and constant-ness are taken from the top-level evaluator.
- */
-public class AggregatingWrapperEvaluator implements AggregatingEvaluator{
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AggregatingWrapperEvaluator.class);
-
-  /** Evaluator whose result and constant flag this wrapper reports. */
-  private BasicEvaluator topEvaluator;
-  /** Aggregators that are notified on every {@link #addRecord()}. */
-  private List<AggregatingEvaluator> aggs;
-
-  /**
-   * @param aggs         the aggregators to notify of each record
-   * @param topEvaluator the evaluator that produces this wrapper's value
-   */
-  public AggregatingWrapperEvaluator(List<AggregatingEvaluator> aggs, BasicEvaluator topEvaluator) {
-    this.topEvaluator = topEvaluator;
-    this.aggs = aggs;
-  }
-
-  /** @return whatever the top-level evaluator evaluates to */
-  @Override
-  public DataValue eval() {
-    final DataValue result = topEvaluator.eval();
-    return result;
-  }
-
-  /** @return the constant flag of the top-level evaluator */
-  @Override
-  public boolean isConstant() {
-    final boolean constant = topEvaluator.isConstant();
-    return constant;
-  }
-
-  /** Forwards the notification to each wrapped aggregator, in list order. */
-  @Override
-  public void addRecord() {
-    for (AggregatingEvaluator aggregator : aggs) {
-      aggregator.addRecord();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b3460af8/exec/ref/src/main/java/org/apache/drill/exec/ref/eval/fn/agg/CountAggregator.java
----------------------------------------------------------------------
diff --git a/exec/ref/src/main/java/org/apache/drill/exec/ref/eval/fn/agg/CountAggregator.java b/exec/ref/src/main/java/org/apache/drill/exec/ref/eval/fn/agg/CountAggregator.java
deleted file mode 100644
index 247807b..0000000
--- a/exec/ref/src/main/java/org/apache/drill/exec/ref/eval/fn/agg/CountAggregator.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.ref.eval.fn.agg;
-
-import org.apache.drill.exec.ref.RecordPointer;
-import org.apache.drill.exec.ref.eval.EvaluatorTypes.AggregatingEvaluator;
-import org.apache.drill.exec.ref.eval.EvaluatorTypes.BasicEvaluator;
-import org.apache.drill.exec.ref.eval.fn.FunctionArguments;
-import org.apache.drill.exec.ref.eval.fn.FunctionEvaluator;
-import org.apache.drill.exec.ref.values.DataValue;
-import org.apache.drill.exec.ref.values.ScalarValues;
-
-@FunctionEvaluator("count")
-public class CountAggregator implements AggregatingEvaluator{
-  
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(CountAggregator.class);
-  
-  boolean isConstant;
-  long l = 0;
-  boolean checkForNull = false;
-  BasicEvaluator child;
-  
-  public CountAggregator(RecordPointer inputRecord, FunctionArguments e) {
-    isConstant = e.isOnlyConstants();
-    if(!e.getOnlyEvaluator().isConstant()){
-      checkForNull = true;
-      child = e.getOnlyEvaluator();
-    }
-  }
-  
-  
-  @Override
-  public void addRecord() {
-    if(checkForNull){
-      if(child.eval() != DataValue.NULL_VALUE){
-        l++;
-      }
-    }else{
-      l++;
-    }
-  }
-
-  @Override
-  public DataValue eval() {
-    DataValue v = new ScalarValues.LongScalar(l);
-    l = 0;
-    return v; 
-  }
-
-
-  @Override
-  public boolean isConstant() {
-    return isConstant;
-  }
-
-
- 
-
-  
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b3460af8/exec/ref/src/main/java/org/apache/drill/exec/ref/eval/fn/agg/SumAggregator.java
----------------------------------------------------------------------
diff --git a/exec/ref/src/main/java/org/apache/drill/exec/ref/eval/fn/agg/SumAggregator.java b/exec/ref/src/main/java/org/apache/drill/exec/ref/eval/fn/agg/SumAggregator.java
deleted file mode 100644
index f1e01c0..0000000
--- a/exec/ref/src/main/java/org/apache/drill/exec/ref/eval/fn/agg/SumAggregator.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.ref.eval.fn.agg;
-
-import org.apache.drill.exec.ref.RecordPointer;
-import org.apache.drill.exec.ref.eval.EvaluatorTypes.AggregatingEvaluator;
-import org.apache.drill.exec.ref.eval.EvaluatorTypes.BasicEvaluator;
-import org.apache.drill.exec.ref.eval.fn.FunctionArguments;
-import org.apache.drill.exec.ref.eval.fn.FunctionEvaluator;
-import org.apache.drill.exec.ref.values.DataValue;
-import org.apache.drill.exec.ref.values.NumericValue;
-import org.apache.drill.exec.ref.values.ScalarValues;
-
-@FunctionEvaluator("sum")
-public class SumAggregator implements AggregatingEvaluator {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SumAggregator.class);
-  private boolean constantsOnly;
-  private long l = 0;
-  private double d = 0;
-
-  boolean integer = true;
-  private BasicEvaluator child;
-
-  public SumAggregator(RecordPointer inputRecord, FunctionArguments e) {
-    this.child = e.getOnlyEvaluator();
-    this.constantsOnly = e.isOnlyConstants();
-  }
-
-  @Override
-  public void addRecord() {
-    DataValue dv = child.eval();
-    NumericValue v = dv.getAsNumeric();
-    if (integer) {
-      
-      switch (v.getNumericType()) {
-      case DOUBLE:
-      case FLOAT:
-        integer = false;
-        d = l; // loss of precision
-        d += v.getAsDouble();
-        break;
-      case INT:
-      case LONG:
-        l += v.getAsLong();
-        return;
-      default:
-        throw new UnsupportedOperationException();
-      }
-    }else{
-      switch (v.getNumericType()) {
-      case DOUBLE:
-      case FLOAT:
-      case INT:
-      case LONG:
-        integer = false;
-        d += v.getAsDouble();
-        return;
-      default:
-        throw new UnsupportedOperationException();
-      }
-      
-    }
-  }
-
-  @Override
-  public DataValue eval() {
-    DataValue v;
-    if (integer) {
-      v = new ScalarValues.LongScalar(l);
-    } else {
-      v = new ScalarValues.DoubleScalar(d);
-    }
-    reset();
-    return v;
-  }
-
-  private void reset() {
-    l = 0;
-    d = 0;
-    integer = true;
-  }
-
-  @Override
-  public boolean isConstant() {
-    return constantsOnly;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b3460af8/exec/ref/src/main/java/org/apache/drill/exec/ref/exceptions/MajorException.java
----------------------------------------------------------------------
diff --git a/exec/ref/src/main/java/org/apache/drill/exec/ref/exceptions/MajorException.java b/exec/ref/src/main/java/org/apache/drill/exec/ref/exceptions/MajorException.java
deleted file mode 100644
index 393c825..0000000
--- a/exec/ref/src/main/java/org/apache/drill/exec/ref/exceptions/MajorException.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.ref.exceptions;
-
-import org.apache.drill.common.exceptions.DrillRuntimeException;
-
-public class MajorException extends DrillRuntimeException{
-  @SuppressWarnings("unused") static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MajorException.class);
-
-  public MajorException() {
-    super();
-  }
-
-  public MajorException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
-    super(message, cause, enableSuppression, writableStackTrace);
-  }
-
-  public MajorException(String message, Throwable cause) {
-    super(message, cause);
-  }
-
-  public MajorException(String message) {
-    super(message);
-  }
-
-  public MajorException(Throwable cause) {
-    super(cause);
-  }
-  
-  
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b3460af8/exec/ref/src/main/java/org/apache/drill/exec/ref/exceptions/RecordException.java
----------------------------------------------------------------------
diff --git a/exec/ref/src/main/java/org/apache/drill/exec/ref/exceptions/RecordException.java b/exec/ref/src/main/java/org/apache/drill/exec/ref/exceptions/RecordException.java
deleted file mode 100644
index 4f8ccd6..0000000
--- a/exec/ref/src/main/java/org/apache/drill/exec/ref/exceptions/RecordException.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.ref.exceptions;
-
-import org.apache.drill.common.exceptions.DrillRuntimeException;
-import org.apache.drill.exec.ref.RecordPointer;
-
-public class RecordException extends DrillRuntimeException{
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RecordException.class);
-  
-  private RecordPointer problemRecord;
-  
-  public RecordException(String message, RecordPointer problemRecord) {
-    super(message);
-    this.problemRecord = problemRecord;
-  }
-
-  public RecordException(String message, RecordPointer problemRecord, Throwable cause) {
-    super(message, cause);
-    this.problemRecord = problemRecord;
-  }
-
-  public RecordPointer getProblemRecord(){
-    return problemRecord;
-  }
-
-  public void setProblemRecord(RecordPointer problemRecord) {
-    this.problemRecord = problemRecord;
-  }
-  
-  
-  
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b3460af8/exec/ref/src/main/java/org/apache/drill/exec/ref/exceptions/SetupException.java
----------------------------------------------------------------------
diff --git a/exec/ref/src/main/java/org/apache/drill/exec/ref/exceptions/SetupException.java b/exec/ref/src/main/java/org/apache/drill/exec/ref/exceptions/SetupException.java
deleted file mode 100644
index cf44c82..0000000
--- a/exec/ref/src/main/java/org/apache/drill/exec/ref/exceptions/SetupException.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.ref.exceptions;
-
-import org.apache.drill.common.exceptions.DrillRuntimeException;
-
-public class SetupException extends DrillRuntimeException{
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SetupException.class);
-
-  public SetupException() {
-    super();
-  }
-
-  public SetupException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
-    super(message, cause, enableSuppression, writableStackTrace);
-  }
-
-  public SetupException(String message, Throwable cause) {
-    super(message, cause);
-  }
-
-  public SetupException(String message) {
-    super(message);
-  }
-
-  public SetupException(Throwable cause) {
-    super(cause);
-  }
-  
-  
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b3460af8/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/AbstractBlockingOperator.java
----------------------------------------------------------------------
diff --git a/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/AbstractBlockingOperator.java b/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/AbstractBlockingOperator.java
deleted file mode 100644
index abbdc5c..0000000
--- a/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/AbstractBlockingOperator.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.ref.rops;
-
-import org.apache.drill.common.logical.data.SingleInputOperator;
-import org.apache.drill.exec.ref.RecordIterator;
-import org.apache.drill.exec.ref.RecordPointer;
-import org.apache.drill.exec.ref.RecordIterator.NextOutcome;
-
-
-public abstract class AbstractBlockingOperator<T extends SingleInputOperator> extends SingleInputROPBase<T> {
-
-  public AbstractBlockingOperator(T config) {
-    super(config);
-  }
-
-  private RecordIterator incoming;
-  protected RecordPointer inputRecord;
-  protected final ProxySimpleRecord outputRecord = new ProxySimpleRecord();
-
-  @Override
-  public void setInput(RecordIterator incoming) {
-    this.incoming = incoming;
-    inputRecord = incoming.getRecordPointer();
-  }
-
-  protected abstract void consumeRecord();
-  protected abstract RecordIterator doWork();
-
-  private RecordIterator consumeData(){
-    while (incoming.next() != NextOutcome.NONE_LEFT)  {
-      consumeRecord();
-    }
-    return doWork();
-  }
-  
-  @Override
-  protected RecordIterator getIteratorInternal() {
-    return new BlockingIterator();
-  }
-
-  private class BlockingIterator implements RecordIterator{
-    private RecordIterator iter;
-    
-    public BlockingIterator(){
-    }
-    
-    @Override
-    public NextOutcome next() {
-      if(this.iter == null){
-        this.iter = consumeData();
-      }
-      return this.iter.next();
-    }
-    
-    @Override
-    public ROP getParent() {
-      return AbstractBlockingOperator.this;
-    }
-    
-    @Override
-    public RecordPointer getRecordPointer() {
-      return outputRecord;
-    }
-    
-  }
-  
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b3460af8/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/BaseSinkROP.java
----------------------------------------------------------------------
diff --git a/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/BaseSinkROP.java b/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/BaseSinkROP.java
deleted file mode 100644
index 4b46f41..0000000
--- a/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/BaseSinkROP.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.ref.rops;
-
-import java.io.IOException;
-import java.util.concurrent.locks.ReadWriteLock;
-
-import org.apache.drill.common.logical.data.SinkOperator;
-import org.apache.drill.exec.ref.RunOutcome;
-import org.apache.drill.exec.ref.RecordIterator;
-import org.apache.drill.exec.ref.RecordPointer;
-import org.apache.drill.exec.ref.RunOutcome.OutcomeType;
-import org.apache.drill.exec.ref.RecordIterator.NextOutcome;
-import org.apache.drill.exec.ref.eval.EvaluatorFactory;
-import org.apache.drill.exec.ref.exceptions.SetupException;
-
-public abstract class BaseSinkROP<T extends SinkOperator> extends SingleInputROPBase<T> implements SinkROP{
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BaseSinkROP.class);
-  
-  protected RecordIterator iter;
-  protected RecordPointer record;
-  
-  public BaseSinkROP(T config) {
-    super(config);
-  }
-  
-  @Override
-  protected void setupEvals(EvaluatorFactory builder) throws SetupException {
-    try {
-      setupSink();
-    } catch (IOException e) {
-      throw new SetupException(String.format("failure setting up %s sink rop.", this.getClass()), e);
-    }
-  }
-  
-  @Override
-  protected void setInput(RecordIterator incoming) {
-    iter = incoming;
-    record = incoming.getRecordPointer();
-  }
-
-  @Override
-  public RecordIterator getIteratorInternal() {
-    throw new UnsupportedOperationException("A ReferenceSink");
-  }
-  
-  @Override
-  public RunOutcome run(StatusHandle handle) {
-    Throwable exception = null;
-    final int runsize = 1000;
-    int recordCount = 0;
-    OutcomeType outcome = OutcomeType.FAILED;
-    long pos = -1; 
-    try{
-    while(true){
-      boolean more = true;
-      for(;recordCount < runsize; recordCount++){
-        NextOutcome r = iter.next();
-        if(r == NextOutcome.NONE_LEFT){
-          more = false;
-          break;
-        }else{
-          pos = sinkRecord(record);
-        }
-      }
-      handle.progress(pos, recordCount);
-      if(!handle.okToContinue()){
-        logger.debug("Told to cancel, breaking run.");
-        outcome = OutcomeType.CANCELED;
-        break;
-      }else if(!more){
-        outcome = OutcomeType.SUCCESS;
-        logger.debug("Breaking because no more records were found.");
-        break;
-      }else{
-        logger.debug("No problems, doing next progress iteration.");
-      }
-      
-    }
-    }catch(Exception e){
-      exception = e ;
-    }
-    
-    cleanup(outcome);
-    return new RunOutcome(outcome, pos, recordCount, exception);
-    
-  }
-
-  /**
-   * 
-   * @param r RecordPointer to record
-   * @return The approximate amount of bytes written.
-   * @throws IOException
-   */
-  public abstract long sinkRecord(RecordPointer r) throws IOException;
-  protected abstract void setupSink() throws IOException;
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b3460af8/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/BoundaryListener.java
----------------------------------------------------------------------
diff --git a/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/BoundaryListener.java b/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/BoundaryListener.java
deleted file mode 100644
index 4d56885..0000000
--- a/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/BoundaryListener.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.ref.rops;
-
-import org.apache.drill.common.expression.SchemaPath;
-
-public interface BoundaryListener {
-  public SchemaPath[] getOrderedBoundaryPaths();
-  public void markBoundaryAsCrossed();
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b3460af8/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/CollapsingAggregateROP.java
----------------------------------------------------------------------
diff --git a/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/CollapsingAggregateROP.java b/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/CollapsingAggregateROP.java
deleted file mode 100644
index 030efbb..0000000
--- a/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/CollapsingAggregateROP.java
+++ /dev/null
@@ -1,269 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.ref.rops;
-
-import org.apache.drill.common.expression.FieldReference;
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.common.logical.data.CollapsingAggregate;
-import org.apache.drill.common.types.TypeProtos.MinorType;
-import org.apache.drill.exec.ref.RecordIterator;
-import org.apache.drill.exec.ref.RecordPointer;
-import org.apache.drill.exec.ref.UnbackedRecord;
-import org.apache.drill.exec.ref.eval.EvaluatorFactory;
-import org.apache.drill.exec.ref.eval.EvaluatorTypes.AggregatingEvaluator;
-import org.apache.drill.exec.ref.eval.EvaluatorTypes.BasicEvaluator;
-import org.apache.drill.exec.ref.values.DataValue;
-import org.apache.drill.exec.ref.values.ScalarValues;
-
-public class CollapsingAggregateROP extends SingleInputROPBase<CollapsingAggregate> implements BoundaryListener {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(CollapsingAggregateROP.class);
-
-  private SchemaPath[] boundaryPaths;
-  private BasicEvaluator targetEvaluator;
-  private boolean foundTarget = false;
-  private DataValue currentValue;
-  private DataValue previousValue;
-  private BasicEvaluator boundaryKey;
-  private RecordIterator incoming;
-  private BasicEvaluator[] carryovers;
-  private FieldReference[] carryoverNames;
-  private DataValue[] carryoverValues;
-  private AggregatingEvaluator[] aggs;
-  private SchemaPath[] aggNames;
-  private boolean boundaryCrossed = false;
-  private final boolean targetMode;
-  
-  public CollapsingAggregateROP(CollapsingAggregate config) {
-    super(config);
-    targetMode = config.getTarget() != null;
-  }
-  
-  @Override
-  protected void setupEvals(EvaluatorFactory builder) {
-    
-    if(config.getWithin() != null){
-      boundaryPaths = new SchemaPath[]{config.getWithin()};
-      boundaryKey = builder.getBasicEvaluator(record, config.getWithin());  
-    }else{
-      boundaryPaths = new SchemaPath[0];
-      boundaryKey = new ScalarValues.IntegerScalar(0);
-    }
-    
-    aggs = new AggregatingEvaluator[config.getAggregations().length];
-    carryovers = new BasicEvaluator[config.getCarryovers().length];
-    carryoverNames = new FieldReference[config.getCarryovers().length];
-    carryoverValues = new DataValue[config.getCarryovers().length];
-
-    if(targetMode){
-      targetEvaluator = builder.getBasicEvaluator(record, config.getTarget());
-    }
-    aggNames = new SchemaPath[aggs.length];
-    for(int i =0; i < aggs.length; i++){
-      aggs[i] = builder.getAggregatingOperator(record, config.getAggregations()[i].getExpr());
-      aggNames[i] = config.getAggregations()[i].getRef();
-    }
-    
-    for(int i =0; i < carryovers.length; i++){
-      carryovers[i] = builder.getBasicEvaluator(record, config.getCarryovers()[i]);
-      carryoverNames[i] = config.getCarryovers()[i];
-    }
-    
-    
-    
-  }
- 
-  @Override
-  protected void setInput(RecordIterator incoming) {
-    this.incoming = incoming;
-  }
-
-  @Override
-  protected RecordIterator getIteratorInternal() {
-    return new AggregatingIterator();
-  }
-
-  private boolean checkBoundaryCrossing() {
-    if (boundaryCrossed) {
-      boundaryCrossed = false;
-      return true;
-    } else {
-      return false;
-    }
-  }
-
-  @Override
-  public void markBoundaryAsCrossed() {
-    this.boundaryCrossed = true;
-  }
-
-  
-
-  
-  private class AggregatingIterator implements RecordIterator {
-    private boolean remainder = false;
-    private boolean more = true;
-    private UnbackedRecord outputRecord = new UnbackedRecord();
-    public AggregatingIterator() {
-    }
-
-    private NextOutcome readNext(){
-
-      // copy over the previous values.
-      previousValue = currentValue;
-      
-      // increment the parent forward.
-      NextOutcome n = incoming.next();
-
-      boolean changed = false;
-      
-      // read boundary values unless no new values were loaded.
-      if(n != NextOutcome.NONE_LEFT){
-        currentValue = boundaryKey.eval();
-        if(!currentValue.equals(previousValue)) changed = true;
-  
-        // skip first boundary.
-        if(previousValue == null) changed = false;
-        
-      }else{
-        
-        changed = true;
-      }
-      
-      if(changed){
-        markBoundaryAsCrossed();
-      }
-      
-      return n;
-    }
-    
-    private void consumeCurrent(){
-      for(int x = 0; x < aggs.length; x++){
-        aggs[x].addRecord();  
-      }
-      
-      // if we're in target mode and this row matches the target criteria, we're going to copy carry over values and mark foundTarget = true.
-      if(targetMode){
-        DataValue v = targetEvaluator.eval();
-        if(v.getDataType().getMinorType() == MinorType.BIT && v.getAsBooleanValue().getBoolean()){
-          foundTarget = true;
-          for(int i =0 ; i < carryovers.length; i++){
-            carryoverValues[i] = carryovers[i].eval();
-          }
-        }
-      }else{
-        for(int i =0 ; i < carryovers.length; i++){
-          carryoverValues[i] = carryovers[i].eval();
-        }
-      }
-    }
-    
-    /** 
-     * Write the output of the operation to the unbacked record.
-     * @return Whether or not the record should be included in the output.
-     */
-    private boolean writeOutputRecord(){
-      outputRecord.clear();
-      for(int x = 0; x < aggs.length; x++){
-        DataValue dv = aggs[x].eval();
-//        logger.debug("Adding Aggregated Values named {} with value {}", outputNames[x], dv);
-        outputRecord.addField(aggNames[x], dv);
-      }
-      
-      // Add the carryover keys.
-      if(targetMode){
-        if(foundTarget){
-          for(int y = 0; y < carryoverNames.length; y++){
-            outputRecord.addField(carryoverNames[y], carryoverValues[y]);
-          }
-          foundTarget = false;
-          return true;
-        }else{
-          return false;
-        }
-      }else{
-        for(int y = 0; y < carryoverNames.length; y++){
-          outputRecord.addField(carryoverNames[y], carryoverValues[y]);
-        }
-        return true;
-      }
-      
-
-      
-      
-    }
-    
-    @Override
-    public NextOutcome next() {
-      outside: while(true){
-      NextOutcome whatNext = null;
-      
-      // if we don't have more and there are no more values, exit.
-      if (!more) return NextOutcome.NONE_LEFT;
-      
-      while (true) {
-        
-        // we shouldn't increment the iterator since we have to consume our remainder.
-        if(!remainder){
-          whatNext = readNext();  
-
-          // if we've just crossed a boundary, we should output the previous values.
-          if(checkBoundaryCrossing()){
-            boolean wroteSomething = writeOutputRecord();
-            
-            // if there is no future input, we'll flag as !more.  Otherwise, will inform that there are pending records that should be consumed.
-            if(whatNext == NextOutcome.NONE_LEFT){
-              more = false;
-            }else{
-              remainder = true;
-            }
-            
-            // if we didn't write something, we'll retry the outside loop
-            if(!wroteSomething) continue outside;
-            
-            // always return a next outcome of true since we've just output a record.
-            return NextOutcome.INCREMENTED_SCHEMA_CHANGED;
-            
-          }
-        }
-        
-        // we don't consume the current record until after we've output a record associated with a boundary (as necessary).
-        consumeCurrent();
-        remainder = false;
-      }
-      
-      }
-    }
-
-    @Override
-    public ROP getParent() {
-      return CollapsingAggregateROP.this;
-    }
-
-    @Override
-    public RecordPointer getRecordPointer() {
-      return outputRecord;
-    }
-
-  }
-
-  @Override
-  public SchemaPath[] getOrderedBoundaryPaths() {
-    return boundaryPaths;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b3460af8/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/ConstantROP.java
----------------------------------------------------------------------
diff --git a/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/ConstantROP.java b/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/ConstantROP.java
deleted file mode 100644
index 1cfe8a8..0000000
--- a/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/ConstantROP.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.ref.rops;
-
-import java.util.Iterator;
-
-import org.apache.drill.common.logical.data.Constant;
-import org.apache.drill.exec.ref.IteratorRegistry;
-import org.apache.drill.exec.ref.RecordIterator;
-import org.apache.drill.exec.ref.RecordPointer;
-import org.apache.drill.exec.ref.RunOutcome.OutcomeType;
-import org.apache.drill.exec.ref.UnbackedRecord;
-import org.apache.drill.exec.ref.exceptions.SetupException;
-import org.apache.drill.exec.ref.rse.JSONRecordReader;
-
-import com.fasterxml.jackson.databind.JsonNode;
-
-public class ConstantROP extends ROPBase<Constant>{
-    static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ScanROP.class);
-
-    private UnbackedRecord record;
-
-    public ConstantROP(Constant config) {
-        super(config);
-        record = new UnbackedRecord();
-    }
-
-
-    @Override
-    protected void setupIterators(IteratorRegistry registry) throws SetupException {
-       // try{
-            super.setupIterators(registry);
-            // need to assign reader
-           // throw new IOException();
-        //}catch(IOException e){
-            //throw new SetupException("Failure while setting up reader.");
-        //}
-    }
-
-
-    @Override
-    protected RecordIterator getIteratorInternal() {
-        return new ConstantIterator(ConstantROP.this.config.getContent().getRoot());
-    }
-
-
-    @Override
-    public void cleanup(OutcomeType outcome) {
-        super.cleanup(outcome);
-    }
-
-
-    class ConstantIterator implements RecordIterator {
-
-        Iterator<JsonNode> jsonIter;
-
-        ConstantIterator(JsonNode json) {
-            jsonIter = json.elements();
-        }
-
-        public RecordPointer getRecordPointer(){
-            return record;
-        }
-
-        public NextOutcome next(){
-            if ( ! jsonIter.hasNext()){
-                return NextOutcome.NONE_LEFT;
-            }
-            JsonNode contentJSON = ConstantROP.this.config.getContent().getRoot();
-            if (contentJSON.isArray())
-            { // list of constant records was specified
-                JsonNode node;
-                node = jsonIter.next();
-                convertJsonToRP(node, record);
-                return NextOutcome.INCREMENTED_SCHEMA_UNCHANGED;
-            }
-            else{
-                convertJsonToRP(contentJSON, record);
-                return NextOutcome.NONE_LEFT;
-            }
-        }
-
-        private void convertJsonToRP(JsonNode node, RecordPointer rp){
-            record.clear();
-            record.merge(JSONRecordReader.convert(node));
-        }
-
-        public ROP getParent(){
-            return ConstantROP.this;
-        }
-
-    }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b3460af8/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/DataWriter.java
----------------------------------------------------------------------
diff --git a/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/DataWriter.java b/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/DataWriter.java
deleted file mode 100644
index 5c2a908..0000000
--- a/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/DataWriter.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.ref.rops;
-
-import java.io.IOException;
-
-public interface DataWriter {
-  public void startRecord() throws IOException;
-  public void writeArrayStart(int length) throws IOException;
-  public void writeArrayElementStart() throws IOException;
-  public void writeArrayElementEnd() throws IOException;
-  public void writeArrayEnd() throws IOException;
-  
-  public void writeMapStart() throws IOException;
-  public void writeMapKey(CharSequence seq) throws IOException;
-  public void writeMapValueStart() throws IOException;
-  public void writeMapValueEnd() throws IOException;
-  public void writeMapEnd() throws IOException;
-  
-  public void writeBoolean(boolean b) throws IOException;
-  public void writeSInt32(int value) throws IOException;
-  public void writeSInt64(long value) throws IOException;
-  public void writeBytes(byte[] bytes) throws IOException;
-  public void writeSFloat64(double value) throws IOException;
-  public void writeSFloat32(float value) throws IOException;
-  public void writeNullValue() throws IOException;
-  public void writeCharSequence(CharSequence value) throws IOException;
-  public void endRecord() throws IOException;
-  public void finish() throws IOException;
-  
-  public enum ConverterType{
-    JSON
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b3460af8/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/FilterROP.java
----------------------------------------------------------------------
diff --git a/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/FilterROP.java b/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/FilterROP.java
deleted file mode 100644
index ff1fd05..0000000
--- a/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/FilterROP.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.ref.rops;
-
-import org.apache.drill.common.logical.data.Filter;
-import org.apache.drill.exec.ref.RecordIterator;
-import org.apache.drill.exec.ref.RecordPointer;
-import org.apache.drill.exec.ref.eval.EvaluatorFactory;
-import org.apache.drill.exec.ref.eval.EvaluatorTypes.BooleanEvaluator;
-
-public class FilterROP extends SingleInputROPBase<Filter>{
-
-  private FilterIterator iter;
-  private BooleanEvaluator filterEval;
-  
-  public FilterROP(Filter config) {
-    super(config);
-  }
-
-  @Override
-  protected void setupEvals(EvaluatorFactory builder) {
-    filterEval = builder.getBooleanEvaluator(record, config.getExpr());
-  }
-
-  @Override
-  public void setInput(RecordIterator incoming) {
-    iter = new FilterIterator(incoming);
-  }
-
-  @Override
-  public RecordIterator getIteratorInternal() {
-    return iter;
-  }
-  
-  private class FilterIterator implements RecordIterator{
-    RecordIterator incoming;
-
-    public FilterIterator(RecordIterator incoming) {
-      this.incoming = incoming;
-    }
-
-    @Override
-    public NextOutcome next() {
-      NextOutcome r;
-      while(true){
-        r = incoming.next();
-        if(r == NextOutcome.NONE_LEFT) return NextOutcome.NONE_LEFT;
-        if(filterEval.eval()) return r;
-      }
-    }
-
-    @Override
-    public ROP getParent() {
-      return FilterROP.this;
-    }
-
-    @Override
-    public RecordPointer getRecordPointer() {
-      return record;
-    }
-    
-    
-    
-  }
-  
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b3460af8/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/FlattenROP.java
----------------------------------------------------------------------
diff --git a/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/FlattenROP.java b/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/FlattenROP.java
deleted file mode 100644
index db7c916..0000000
--- a/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/FlattenROP.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.ref.rops;
-
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.common.logical.data.Flatten;
-import org.apache.drill.common.types.TypeProtos.DataMode;
-import org.apache.drill.exec.ref.RecordIterator;
-import org.apache.drill.exec.ref.RecordPointer;
-import org.apache.drill.exec.ref.UnbackedRecord;
-import org.apache.drill.exec.ref.eval.EvaluatorFactory;
-import org.apache.drill.exec.ref.eval.EvaluatorTypes.BasicEvaluator;
-import org.apache.drill.exec.ref.values.BaseArrayValue;
-import org.apache.drill.exec.ref.values.DataValue;
-import org.apache.drill.exec.ref.values.SimpleArrayValue;
-
-
-public class FlattenROP extends SingleInputROPBase<Flatten> {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FlattenROP.class);
-
-  private RecordPointer outputRecord = new UnbackedRecord();
-  private BasicEvaluator evaluator;
-  private RecordIterator iter;
-
-  public FlattenROP(Flatten config) {
-    super(config);
-  }
-
-  @Override
-  protected void setInput(RecordIterator incoming) {
-    this.iter = new FlattenIterator(incoming);
-  }
-
-  @Override
-  protected RecordIterator getIteratorInternal() {
-    return iter;
-  }
-
-  @Override
-  protected void setupEvals(EvaluatorFactory builder) {
-    evaluator = builder.getBasicEvaluator(record, config.getExpr());
-  }
-
-  private class ArrayValueIterator {
-    private BaseArrayValue arrayValue;
-    private int currentIndex = 0;
-
-    public ArrayValueIterator(BaseArrayValue arrayValue) {
-      this.arrayValue = arrayValue;
-    }
-
-    public ArrayValueIterator() {
-      this(new SimpleArrayValue());
-    }
-
-    public DataValue next() {
-      DataValue v = null;
-      if (currentIndex < arrayValue.size()) {
-        v = arrayValue.getByArrayIndex(currentIndex);
-      }
-
-      currentIndex++;
-      return v;
-
-    }
-  }
-
-  private class FlattenIterator implements RecordIterator {
-    RecordIterator incoming;
-    NextOutcome currentOutcome;
-    int currentIndex = 0;
-    ArrayValueIterator arrayValueIterator = new ArrayValueIterator();
-
-    public FlattenIterator(RecordIterator incoming) {
-      super();
-      this.incoming = incoming;
-    }
-
-    @Override
-    public RecordPointer getRecordPointer() {
-      return outputRecord;
-    }
-
-    @Override
-    public NextOutcome next() {
-      DataValue v;
-      if ((v = arrayValueIterator.next()) != null)  //if we are already iterating through a sub-array, keep going
-        return mergeValue(v);
-      else //otherwise, get the next record
-        currentOutcome = incoming.next();
-
-
-      if (currentOutcome != NextOutcome.NONE_LEFT) {
-        if (evaluator.eval().getDataType().getMode() == DataMode.REPEATED) {
-          arrayValueIterator = new ArrayValueIterator(evaluator.eval().getAsContainer().getAsArray());
-
-          while ((v = arrayValueIterator.next()) != null) {
-            return mergeValue(v);
-          }
-        } else {
-          outputRecord.copyFrom(record);
-          outputRecord.addField(config.getName(), evaluator.eval());
-          if(config.isDrop())
-            outputRecord.removeField((SchemaPath)config.getExpr());
-        }
-      }
-      return currentOutcome;
-    }
-
-    // helper function to merge one of the values from a sub array into the parent record
-    private NextOutcome mergeValue(DataValue v) {
-      outputRecord.copyFrom(record);
-      outputRecord.addField(config.getName(), v);
-      if(config.isDrop())
-        outputRecord.removeField((SchemaPath)config.getExpr());
-      return NextOutcome.INCREMENTED_SCHEMA_CHANGED;
-    }
-
-    @Override
-    public ROP getParent() {
-      return FlattenROP.this;
-    }
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b3460af8/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/JoinROP.java
----------------------------------------------------------------------
diff --git a/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/JoinROP.java b/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/JoinROP.java
deleted file mode 100644
index 2b46f1a..0000000
--- a/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/JoinROP.java
+++ /dev/null
@@ -1,315 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.ref.rops;
-
-import com.google.common.base.Optional;
-import com.google.common.base.Predicate;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import org.apache.drill.common.exceptions.DrillRuntimeException;
-import org.apache.drill.common.logical.data.Join;
-import org.apache.drill.common.logical.data.JoinCondition;
-import org.apache.drill.exec.ref.IteratorRegistry;
-import org.apache.drill.exec.ref.RecordIterator;
-import org.apache.drill.exec.ref.RecordPointer;
-import org.apache.drill.exec.ref.UnbackedRecord;
-import org.apache.drill.exec.ref.eval.EvaluatorFactory;
-import org.apache.drill.exec.ref.eval.fn.ComparisonEvaluators;
-import org.apache.drill.exec.ref.exceptions.SetupException;
-import org.apache.drill.exec.ref.values.ComparableValue;
-import org.apache.drill.exec.ref.values.DataValue;
-
-import java.util.List;
-
-public class JoinROP extends ROPBase<Join> {
-    static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JoinROP.class);
-
-    private RecordIterator left;
-    private RecordIterator right;
-    private UnbackedRecord record;
-    private EvaluatorFactory factory;
-
-    public JoinROP(Join config) {
-        super(config);
-        record = new UnbackedRecord();
-    }
-
-    @Override
-    protected void setupIterators(IteratorRegistry builder) {
-        left = Iterables.getOnlyElement(builder.getOperator(config.getLeft()));
-        right = Iterables.getOnlyElement(builder.getOperator(config.getRight()));
-    }
-
-    @Override
-    protected void setupEvals(EvaluatorFactory builder) throws SetupException {
-        factory = builder;
-    }
-
-    @Override
-    protected RecordIterator getIteratorInternal() {
-        return createIteratorFromJoin(config.getJointType());
-    }
-
-    private RecordIterator createIteratorFromJoin(Join.JoinType type) {
-        switch (type) {
-            case LEFT:
-                return new LeftIterator();
-            case INNER:
-                return new InnerIterator();
-            case OUTER:
-                return new OuterIterator();
-            default:
-                throw new UnsupportedOperationException("Type not supported: " + type);
-        }
-    }
-
-    private class RecordBuffer {
-        final boolean schemaChanged;
-        final RecordPointer pointer;
-        boolean hasJoined = false;
-
-        private RecordBuffer(RecordPointer pointer, boolean schemaChanged) {
-            this.pointer = pointer;
-            this.schemaChanged = schemaChanged;
-        }
-
-        public void setHasJoined(boolean hasJoined) {
-            this.hasJoined = hasJoined;
-        }
-    }
-
-    abstract class JoinIterator implements RecordIterator {
-        protected List<RecordBuffer> buffer;
-        protected int curIdx = 0;
-        protected int bufferLength = 0;
-
-        protected abstract int setupBuffer();
-
-        protected int setupBufferForIterator(RecordIterator iterator) {
-            int count = 0;
-            NextOutcome outcome = iterator.next();
-            while (outcome != NextOutcome.NONE_LEFT) {
-                buffer.add(new RecordBuffer(
-                        iterator.getRecordPointer().copy(),
-                        outcome == NextOutcome.INCREMENTED_SCHEMA_CHANGED)
-                );
-                ++count;
-                outcome = iterator.next();
-            }
-            return count;
-        }
-
-        @Override
-        public RecordPointer getRecordPointer() {
-            return record;
-        }
-
-        public NextOutcome next() {
-            if (buffer == null) {
-                buffer = Lists.newArrayList();
-                setupBuffer();
-                bufferLength = buffer.size();
-            }
-            return getNext();
-        }
-
-        public abstract NextOutcome getNext();
-
-        protected void setOutputRecord(RecordPointer... inputs) {
-            boolean first = true;
-            for(RecordPointer input : inputs) {
-                if(input == null) {
-                    continue;
-                }
-
-                if(first) {
-                    first = false;
-                    record.copyFrom(input);
-                } else {
-                    record.merge(input);
-                }
-            }
-        }
-
-        public boolean eval(DataValue leftVal, DataValue rightVal, String relationship) {
-            // Skip join if no comparison can be made
-            if (!ComparisonEvaluators.isComparable(leftVal, rightVal)) {
-                return false;
-            }
-
-            //Somehow utilize ComparisonEvaluators?
-            switch (relationship) {
-                case "!=":
-                    return !leftVal.equals(rightVal);
-                case "==":
-                    return leftVal.equals(rightVal);
-                case "<":
-                    return ((ComparableValue) leftVal).compareTo(rightVal) < 0;
-                case "<=":
-                    return ((ComparableValue) leftVal).compareTo(rightVal) <= 0;
-                case ">":
-                    return ((ComparableValue) leftVal).compareTo(rightVal) > 0;
-                case ">=":
-                    return ((ComparableValue) leftVal).compareTo(rightVal) >= 0;
-                default:
-                    throw new DrillRuntimeException("Relationship not supported: " + relationship);
-            }
-        }
-
-        @Override
-        public ROP getParent() {
-            return JoinROP.this;
-        }
-    }
-
-    class InnerIterator extends JoinIterator {
-        NextOutcome rightOutcome;
-
-        @Override
-        protected int setupBuffer() {
-            return setupBufferForIterator(left);
-        }
-
-        @Override
-        public NextOutcome getNext() {
-            final RecordPointer rightPointer = right.getRecordPointer();
-            while (true) {
-                if (curIdx == 0) {
-                    rightOutcome = right.next();
-
-                    if (rightOutcome == NextOutcome.NONE_LEFT) {
-                        break;
-                    }
-                }
-
-                final RecordBuffer bufferObj = buffer.get(curIdx++);
-                Optional<JoinCondition> option = Iterables.tryFind(Lists.newArrayList(config.getConditions()), new Predicate<JoinCondition>() {
-                    @Override
-                    public boolean apply(JoinCondition condition) {
-                        return eval(factory.getBasicEvaluator(rightPointer, condition.getRight()).eval(),
-                                factory.getBasicEvaluator(bufferObj.pointer, condition.getLeft()).eval(), condition.getRelationship());
-                    }
-                });
-
-                if (curIdx >= bufferLength) {
-                   curIdx = 0;
-                }
-
-                if (option.isPresent()) {
-                    setOutputRecord(rightPointer, bufferObj.pointer);
-                    return (bufferObj.schemaChanged || rightOutcome == NextOutcome.INCREMENTED_SCHEMA_CHANGED) ?
-                            NextOutcome.INCREMENTED_SCHEMA_CHANGED :
-                            NextOutcome.INCREMENTED_SCHEMA_UNCHANGED;
-                }
-
-            }
-
-            return NextOutcome.NONE_LEFT;
-        }
-    }
-
-    class LeftIterator extends JoinIterator {
-        private NextOutcome leftOutcome;
-
-        @Override
-        protected int setupBuffer() {
-            return setupBufferForIterator(right);
-        }
-
-        @Override
-        public NextOutcome getNext() {
-            final RecordPointer leftPointer = left.getRecordPointer();
-            boolean isFound = true;
-            if(curIdx >= bufferLength) {
-                return NextOutcome.NONE_LEFT;
-            }
-
-            while (true) {
-                if (curIdx == 0) {
-                    if (!isFound) {
-                        setOutputRecord(leftPointer);
-                        return leftOutcome;
-                    }
-
-                    leftOutcome = left.next();
-
-                    if (leftOutcome == NextOutcome.NONE_LEFT) {
-                        break;
-                    }
-
-                    isFound = false;
-                }
-
-                final RecordBuffer bufferObj = buffer.get(curIdx++);
-                Optional<JoinCondition> option = Iterables.tryFind(Lists.newArrayList(config.getConditions()), new Predicate<JoinCondition>() {
-                    @Override
-                    public boolean apply(JoinCondition condition) {
-                        return eval(factory.getBasicEvaluator(leftPointer, condition.getLeft()).eval(),
-                                factory.getBasicEvaluator(bufferObj.pointer, condition.getRight()).eval(), condition.getRelationship());
-                    }
-                });
-
-                if (option.isPresent()) {
-                    setOutputRecord(leftPointer, bufferObj.pointer);
-                    bufferObj.setHasJoined(true);
-                    return (bufferObj.schemaChanged || leftOutcome == NextOutcome.INCREMENTED_SCHEMA_CHANGED) ?
-                            NextOutcome.INCREMENTED_SCHEMA_CHANGED :
-                            NextOutcome.INCREMENTED_SCHEMA_UNCHANGED;
-                }
-
-                if (curIdx >= bufferLength) {
-                    curIdx = 0;
-                }
-            }
-
-            return NextOutcome.NONE_LEFT;
-        }
-    }
-
-    class OuterIterator extends LeftIterator {
-        boolean innerJoinCompleted = false;
-
-        @Override
-        public NextOutcome getNext() {
-            if (innerJoinCompleted && curIdx >= bufferLength) {
-                return NextOutcome.NONE_LEFT;
-            }
-
-            if (!innerJoinCompleted) {
-                NextOutcome outcome = super.getNext();
-                if (outcome != NextOutcome.NONE_LEFT) {
-                    return outcome;
-                } else {
-                    innerJoinCompleted = true;
-                    curIdx = 0;
-                }
-            }
-
-            if (innerJoinCompleted) {
-                while (curIdx < bufferLength) {
-                    RecordBuffer recordBuffer = buffer.get(curIdx++);
-                    if (!recordBuffer.hasJoined) {
-                        setOutputRecord(recordBuffer.pointer, null);
-                        return recordBuffer.schemaChanged ? NextOutcome.INCREMENTED_SCHEMA_CHANGED : NextOutcome.INCREMENTED_SCHEMA_UNCHANGED;
-                    }
-                }
-            }
-            return NextOutcome.NONE_LEFT;
-        }
-    }
-}


Mime
View raw message