drill-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jacq...@apache.org
Subject [08/14] git commit: DRILL-776: Support SelectionVector SV2 and SV4 in Partition Sender.
Date Tue, 20 May 2014 03:01:56 GMT
DRILL-776: Support SelectionVector SV2 and SV4 in Partition Sender.


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/e9ac37db
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/e9ac37db
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/e9ac37db

Branch: refs/heads/master
Commit: e9ac37dbff8f0b606673bb5827c0daac1d3275b0
Parents: c40735e
Author: Jinfeng Ni <jni@maprtech.com>
Authored: Mon May 19 08:35:34 2014 -0700
Committer: Jacques Nadeau <jacques@apache.org>
Committed: Mon May 19 17:46:19 2014 -0700

----------------------------------------------------------------------
 .../PartitionSenderRootExec.java                | 69 +++++++++++++++-----
 .../impl/partitionsender/Partitioner.java       |  5 ++
 .../partitionsender/PartitionerSV2Template.java | 60 +++++++++++++++++
 .../partitionsender/PartitionerSV4Template.java | 61 +++++++++++++++++
 .../physical/HashToRandomExchangePrel.java      |  5 ++
 .../org/apache/drill/TestExampleQueries.java    | 10 +++
 6 files changed, 193 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e9ac37db/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
index bcd484c..f574351 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
@@ -182,8 +182,24 @@ public class PartitionSenderRootExec implements RootExec {
     // set up partitioning function
     final LogicalExpression expr = operator.getExpr();
     final ErrorCollector collector = new ErrorCollectorImpl();
-    final ClassGenerator<Partitioner> cg = CodeGenerator.getRoot(Partitioner.TEMPLATE_DEFINITION,
-                                                                         context.getFunctionRegistry());
+    final ClassGenerator<Partitioner> cg ;
+
+    boolean hyper = false;
+
+    switch(incoming.getSchema().getSelectionVectorMode()){
+    case NONE:
+      cg = CodeGenerator.getRoot(Partitioner.TEMPLATE_DEFINITION, context.getFunctionRegistry());
+      break;
+    case TWO_BYTE:
+      cg = CodeGenerator.getRoot(Partitioner.TEMPLATE_DEFINITION_SV2, context.getFunctionRegistry());
+      break;
+    case FOUR_BYTE:
+      cg = CodeGenerator.getRoot(Partitioner.TEMPLATE_DEFINITION_SV4, context.getFunctionRegistry());
+      hyper = true;
+      break;
+    default:
+      throw new UnsupportedOperationException();
+    }
 
     final LogicalExpression materializedExpr = ExpressionTreeMaterializer.materialize(expr, incoming, collector, context.getFunctionRegistry());
     if (collector.hasErrors()) {
@@ -255,22 +271,41 @@ public class PartitionSenderRootExec implements RootExec {
       Class<?> vvType = TypeHelper.getValueVectorClass(vvIn.getField().getType().getMinorType(),
                                                        vvIn.getField().getType().getMode());
       JClass vvClass = cg.getModel().ref(vvType);
-      // the following block generates calls to copyFrom(); e.g.:
-      // ((IntVector) outgoingVectors[bucket][0]).copyFrom(inIndex,
-      //                                                     outgoingBatches[bucket].getRecordCount(),
-      //                                                     vv1);
-      cg.getEvalBlock()._if(
-        ((JExpression) JExpr.cast(vvClass,
-              ((JExpression)
-                     outgoingVectors
-                       .component(bucket))
-                       .component(JExpr.lit(fieldId))))
-                       .invoke("copyFromSafe")
-                       .arg(inIndex)
-                       .arg(((JExpression) outgoingBatches.component(bucket)).invoke("getRecordCount"))
-                       .arg(incomingVV).not())._then().add(((JExpression) outgoingBatches.component(bucket)).invoke("flush"))
-                       ._return();
 
+      if (!hyper) {
+        // the following block generates calls to copyFrom(); e.g.:
+        // ((IntVector) outgoingVectors[bucket][0]).copyFrom(inIndex,
+        //                                                     outgoingBatches[bucket].getRecordCount(),
+        //                                                     vv1);
+        cg.getEvalBlock()._if(
+          ((JExpression) JExpr.cast(vvClass,
+                ((JExpression)
+                       outgoingVectors
+                         .component(bucket))
+                         .component(JExpr.lit(fieldId))))
+                         .invoke("copyFromSafe")
+                         .arg(inIndex)
+                         .arg(((JExpression) outgoingBatches.component(bucket)).invoke("getRecordCount"))
+                         .arg(incomingVV).not())._then().add(((JExpression) outgoingBatches.component(bucket)).invoke("flush"))
+                         ._return();
+      } else {
+        // the following block generates calls to copyFrom(); e.g.:
+        // ((IntVector) outgoingVectors[bucket][0]).copyFrom(inIndex,
+        //                                                     outgoingBatches[bucket].getRecordCount(),
+        //                                                     vv1[((inIndex)>>> 16)]);
+        cg.getEvalBlock()._if(
+          ((JExpression) JExpr.cast(vvClass,
+                ((JExpression)
+                       outgoingVectors
+                         .component(bucket))
+                         .component(JExpr.lit(fieldId))))
+                         .invoke("copyFromSafe")
+                         .arg(inIndex)
+                         .arg(((JExpression) outgoingBatches.component(bucket)).invoke("getRecordCount"))
+                         .arg(incomingVV.component(inIndex.shrz(JExpr.lit(16)))).not())._then().add(((JExpression) outgoingBatches.component(bucket)).invoke("flush"))
+                         ._return();
+
+      }
       ++fieldId;
     }
     // generate the OutgoingRecordBatch helper invocations

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e9ac37db/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java
index 7d3998b..3ffead0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java
@@ -31,4 +31,9 @@ public interface Partitioner {
   public abstract void partitionBatch(RecordBatch incoming);
 
   public static TemplateClassDefinition<Partitioner> TEMPLATE_DEFINITION = new TemplateClassDefinition<>(Partitioner.class, PartitionerTemplate.class);
+
+  public static TemplateClassDefinition<Partitioner> TEMPLATE_DEFINITION_SV2 = new TemplateClassDefinition<>(Partitioner.class, PartitionerSV2Template.class);
+
+  public static TemplateClassDefinition<Partitioner> TEMPLATE_DEFINITION_SV4 = new TemplateClassDefinition<>(Partitioner.class, PartitionerSV4Template.class);
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e9ac37db/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerSV2Template.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerSV2Template.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerSV2Template.java
new file mode 100644
index 0000000..981055a
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerSV2Template.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.partitionsender;
+
+import javax.inject.Named;
+
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.selection.SelectionVector2;
+
+public abstract class PartitionerSV2Template implements Partitioner {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PartitionerSV2Template.class);
+
+  private SelectionVector2 sv2;
+
+  public PartitionerSV2Template() throws SchemaChangeException {
+  }
+
+  @Override
+  public final void setup(FragmentContext context,
+                          RecordBatch incoming,
+                          OutgoingRecordBatch[] outgoing) throws SchemaChangeException {
+
+    this.sv2 = incoming.getSelectionVector2();
+
+    doSetup(context, incoming, outgoing);
+
+  }
+
+  @Override
+  public void partitionBatch(RecordBatch incoming) {
+
+    for (int recordId = 0; recordId < incoming.getRecordCount(); ++recordId) {
+      // for each record
+      doEval(sv2.getIndex(recordId), 0);
+    }
+
+  }
+
+  public abstract void doSetup(@Named("context") FragmentContext context, @Named("incoming") RecordBatch incoming, @Named("outgoing") OutgoingRecordBatch[] outgoing) throws SchemaChangeException;
+  public abstract void doEval(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex);
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e9ac37db/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerSV4Template.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerSV4Template.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerSV4Template.java
new file mode 100644
index 0000000..2e00f9b
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerSV4Template.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.partitionsender;
+
+import javax.inject.Named;
+
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.selection.SelectionVector2;
+import org.apache.drill.exec.record.selection.SelectionVector4;
+
+public abstract class PartitionerSV4Template implements Partitioner {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PartitionerSV4Template.class);
+
+  private SelectionVector4 sv4;
+
+  public PartitionerSV4Template() throws SchemaChangeException {
+  }
+
+  @Override
+  public final void setup(FragmentContext context,
+                          RecordBatch incoming,
+                          OutgoingRecordBatch[] outgoing) throws SchemaChangeException {
+
+    this.sv4 = incoming.getSelectionVector4();
+
+    doSetup(context, incoming, outgoing);
+
+  }
+
+  @Override
+  public void partitionBatch(RecordBatch incoming) {
+
+    for (int recordId = 0; recordId < incoming.getRecordCount(); ++recordId) {
+      // for each record
+      doEval(sv4.get(recordId), 0);
+    }
+
+  }
+
+  public abstract void doSetup(@Named("context") FragmentContext context, @Named("incoming") RecordBatch incoming, @Named("outgoing") OutgoingRecordBatch[] outgoing) throws SchemaChangeException;
+  public abstract void doEval(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex);
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e9ac37db/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashToRandomExchangePrel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashToRandomExchangePrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashToRandomExchangePrel.java
index d582684..9756a76 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashToRandomExchangePrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashToRandomExchangePrel.java
@@ -115,5 +115,10 @@ public class HashToRandomExchangePrel extends SinglePrel {
     return SelectionVectorMode.NONE;
   }
 
+  @Override
+  public SelectionVectorMode[] getSupportedEncodings() {
+    return SelectionVectorMode.ALL;
+  }
+
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e9ac37db/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java b/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
index 83b43fb..1757290 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
@@ -49,6 +49,16 @@ public class TestExampleQueries extends BaseTestQuery{
   }
 
   @Test
+  public void testHashPartitionSV2 () throws Exception{
+    test("select count(n_nationkey) from cp.`tpch/nation.parquet` where n_nationkey > 8 group by n_regionkey");
+  }
+
+  @Test
+  public void testHashPartitionSV4 () throws Exception{
+    test("select count(n_nationkey) as cnt from cp.`tpch/nation.parquet` group by n_regionkey order by cnt");
+  }
+
+  @Test
   public void testSelectWithLimit() throws Exception{
     test("select employee_id,  first_name, last_name from cp.`employee.json` limit 5 ");
   }


Mime
View raw message