drill-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jacq...@apache.org
Subject [14/14] git commit: DRILL-781: Use MapVector as the top level vector for HBase Column Families
Date Tue, 20 May 2014 03:02:02 GMT
DRILL-781: Use MapVector as the top level vector for HBase Column Families


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/5d7e3d3a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/5d7e3d3a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/5d7e3d3a

Branch: refs/heads/master
Commit: 5d7e3d3ab548eb2b23607df46ea843a9c1532b72
Parents: e9e63c4
Author: Aditya Kishore <aditya@maprtech.com>
Authored: Mon May 19 15:28:09 2014 -0700
Committer: Jacques Nadeau <jacques@apache.org>
Committed: Mon May 19 18:06:36 2014 -0700

----------------------------------------------------------------------
 .../exec/store/hbase/HBaseRecordReader.java     | 160 ++++++-------------
 .../exec/store/hbase/HBaseSchemaFactory.java    |   1 -
 .../org/apache/drill/hbase/HBaseTestsSuite.java |   1 -
 .../drill/hbase/TestHBaseFilterPushDown.java    |  16 +-
 .../drill/hbase/TestHBaseProjectPushDown.java   |   7 +-
 .../drill/exec/record/MaterializedField.java    |   3 +-
 6 files changed, 69 insertions(+), 119 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5d7e3d3a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
index ae9f833..439f97f 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.store.hbase;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
@@ -40,6 +41,7 @@ import org.apache.drill.exec.store.RecordReader;
 import org.apache.drill.exec.vector.NullableVarBinaryVector;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.VarBinaryVector;
+import org.apache.drill.exec.vector.complex.MapVector;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
@@ -50,17 +52,17 @@ import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
 
 import com.google.common.base.Stopwatch;
-import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 
 public class HBaseRecordReader implements RecordReader, DrillHBaseConstants {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HBaseRecordReader.class);
 
   private static final int TARGET_RECORD_COUNT = 4000;
 
-  private List<SchemaPath> columns;
+  private LinkedHashSet<SchemaPath> columns;
   private OutputMutator outputMutator;
 
-  private Map<FamilyQualifierWrapper, NullableVarBinaryVector> vvMap;
+  private Map<String, MapVector> familyVectorMap;
   private VarBinaryVector rowKeyVector;
   private SchemaPath rowKeySchemaPath;
 
@@ -78,34 +80,29 @@ public class HBaseRecordReader implements RecordReader, DrillHBaseConstants {
     hbaseTable = subScanSpec.getTableName();
     hbaseScan = new Scan(subScanSpec.getStartRow(), subScanSpec.getStopRow());
     boolean rowKeyOnly = true;
+    this.columns = Sets.newLinkedHashSet();
     if (projectedColumns != null && projectedColumns.size() != 0) {
-      /*
-       * This will change once the non-scaler value vectors are available.
-       * Then, each column family will have a single top level value vector
-       * and each column will be an item vector in its corresponding TLV.
-       */
-      this.columns = Lists.newArrayList(projectedColumns);
-      Iterator<SchemaPath> columnIterator = columns.iterator();
+      Iterator<SchemaPath> columnIterator = projectedColumns.iterator();
       while(columnIterator.hasNext()) {
         SchemaPath column = columnIterator.next();
-        if (column.getRootSegment().getPath().toString().equalsIgnoreCase(ROW_KEY)) {
+        if (column.getRootSegment().getPath().equalsIgnoreCase(ROW_KEY)) {
           rowKeySchemaPath = ROW_KEY_PATH;
+          this.columns.add(rowKeySchemaPath);
           continue;
         }
         rowKeyOnly = false;
         NameSegment root = column.getRootSegment();
-        byte[] family = root.getPath().toString().getBytes();
+        byte[] family = root.getPath().getBytes();
+        this.columns.add(SchemaPath.getSimplePath(root.getPath()));
         PathSegment child = root.getChild();
         if (child != null && child.isNamed()) {
-          byte[] qualifier = child.getNameSegment().getPath().toString().getBytes();
+          byte[] qualifier = child.getNameSegment().getPath().getBytes();
           hbaseScan.addColumn(family, qualifier);
         } else {
-          columnIterator.remove();
           hbaseScan.addFamily(family);
         }
       }
     } else {
-      this.columns = Lists.newArrayList();
       rowKeyOnly = false;
       rowKeySchemaPath = ROW_KEY_PATH;
       this.columns.add(rowKeySchemaPath);
@@ -128,16 +125,16 @@ public class HBaseRecordReader implements RecordReader, DrillHBaseConstants {
   @Override
   public void setup(OutputMutator output) throws ExecutionSetupException {
     this.outputMutator = output;
-    vvMap = new HashMap<FamilyQualifierWrapper, NullableVarBinaryVector>();
+    familyVectorMap = new HashMap<String, MapVector>();
 
     try {
       // Add Vectors to output in the order specified when creating reader
       for (SchemaPath column : columns) {
         if (column.equals(rowKeySchemaPath)) {
           MaterializedField field = MaterializedField.create(column, Types.required(TypeProtos.MinorType.VARBINARY));
-          rowKeyVector = output.addField(field, VarBinaryVector.class);
-        } else if (column.getRootSegment().getChild() != null) {
-          getOrCreateColumnVector(new FamilyQualifierWrapper(column), false);
+          rowKeyVector = outputMutator.addField(field, VarBinaryVector.class);
+        } else {
+          getOrCreateFamilyVector(column.getRootSegment().getPath(), false);
         }
       }
       logger.debug("Opening scanner for HBase table '{}', Zookeeper quorum '{}', port '{}', znode '{}'.",
@@ -158,12 +155,14 @@ public class HBaseRecordReader implements RecordReader, DrillHBaseConstants {
       rowKeyVector.clear();
       rowKeyVector.allocateNew();
     }
-    for (ValueVector v : vvMap.values()) {
+    for (ValueVector v : familyVectorMap.values()) {
       v.clear();
       v.allocateNew();
     }
 
-    for (int count = 0; count < TARGET_RECORD_COUNT; count++) {
+    int rowCount = 0;
+    done:
+    for (; rowCount < TARGET_RECORD_COUNT; rowCount++) {
       Result result = null;
       try {
         if (leftOver != null) {
@@ -176,54 +175,54 @@ public class HBaseRecordReader implements RecordReader, DrillHBaseConstants {
         throw new DrillRuntimeException(e);
       }
       if (result == null) {
-        setOutputValueCount(count);
-        logger.debug("Took {} ms to get {} records", watch.elapsed(TimeUnit.MILLISECONDS), count);
-        return count;
+        break done;
       }
 
       // parse the result and populate the value vectors
       KeyValue[] kvs = result.raw();
       byte[] bytes = result.getBytes().get();
       if (rowKeyVector != null) {
-        if (!rowKeyVector.getMutator().setSafe(count, bytes, kvs[0].getRowOffset(), kvs[0].getRowLength())) {
-          setOutputValueCount(count);
+        if (!rowKeyVector.getMutator().setSafe(rowCount, bytes, kvs[0].getRowOffset(), kvs[0].getRowLength())) {
           leftOver = result;
-          logger.debug("Took {} ms to get {} records", watch.elapsed(TimeUnit.MILLISECONDS), count);
-          return count;
+          break done;
         }
       }
+
       for (KeyValue kv : kvs) {
         int familyOffset = kv.getFamilyOffset();
         int familyLength = kv.getFamilyLength();
+        MapVector mv = getOrCreateFamilyVector(new String(bytes, familyOffset, familyLength), true);
+
         int qualifierOffset = kv.getQualifierOffset();
         int qualifierLength = kv.getQualifierLength();
+        NullableVarBinaryVector v = getOrCreateColumnVector(mv, new String(bytes, qualifierOffset, qualifierLength));
+
         int valueOffset = kv.getValueOffset();
         int valueLength = kv.getValueLength();
-        NullableVarBinaryVector v = getOrCreateColumnVector(
-            new FamilyQualifierWrapper(bytes, familyOffset, familyLength, qualifierOffset, qualifierLength), true);
-        if (!v.getMutator().setSafe(count, bytes, valueOffset, valueLength)) {
-          setOutputValueCount(count);
+        if (!v.getMutator().setSafe(rowCount, bytes, valueOffset, valueLength)) {
           leftOver = result;
-          logger.debug("Took {} ms to get {} records", watch.elapsed(TimeUnit.MILLISECONDS), count);
-          return count;
+          return rowCount;
         }
       }
     }
-    setOutputValueCount(TARGET_RECORD_COUNT);
-    logger.debug("Took {} ms to get {} records", watch.elapsed(TimeUnit.MILLISECONDS), TARGET_RECORD_COUNT);
-    return TARGET_RECORD_COUNT;
+
+    setOutputRowCount(rowCount);
+    logger.debug("Took {} ms to get {} records", watch.elapsed(TimeUnit.MILLISECONDS), rowCount);
+    return rowCount;
   }
 
-  private NullableVarBinaryVector getOrCreateColumnVector(FamilyQualifierWrapper column, boolean allocateOnCreate) {
+  private MapVector getOrCreateFamilyVector(String familyName, boolean allocateOnCreate) {
     try {
-      NullableVarBinaryVector v = vvMap.get(column);
+      MapVector v = familyVectorMap.get(familyName);
       if(v == null) {
-        MaterializedField field = MaterializedField.create(column.asSchemaPath(), Types.optional(TypeProtos.MinorType.VARBINARY));
-        v = outputMutator.addField(field, NullableVarBinaryVector.class);
+        SchemaPath column = SchemaPath.getSimplePath(familyName);
+        MaterializedField field = MaterializedField.create(column, Types.required(TypeProtos.MinorType.MAP));
+        v = outputMutator.addField(field, MapVector.class);
         if (allocateOnCreate) {
           v.allocateNew();
         }
-        vvMap.put(column, v);
+        columns.add(column);
+        familyVectorMap.put(familyName, v);
       }
       return v;
     } catch (SchemaChangeException e) {
@@ -231,6 +230,15 @@ public class HBaseRecordReader implements RecordReader, DrillHBaseConstants {
     }
   }
 
+  private NullableVarBinaryVector getOrCreateColumnVector(MapVector mv, String qualifier) {
+    int oldSize = mv.size();
+    NullableVarBinaryVector v = mv.addOrGet(qualifier, Types.optional(TypeProtos.MinorType.VARBINARY), NullableVarBinaryVector.class);
+    if (oldSize != mv.size()) {
+      v.allocateNew();
+    }
+    return v;
+  }
+
   @Override
   public void cleanup() {
     try {
@@ -245,8 +253,8 @@ public class HBaseRecordReader implements RecordReader, DrillHBaseConstants {
     }
   }
 
-  private void setOutputValueCount(int count) {
-    for (ValueVector vv : vvMap.values()) {
+  private void setOutputRowCount(int count) {
+    for (ValueVector vv : familyVectorMap.values()) {
       vv.getMutator().setValueCount(count);
     }
     if (rowKeyVector != null) {
@@ -254,68 +262,4 @@ public class HBaseRecordReader implements RecordReader, DrillHBaseConstants {
     }
   }
 
-  private static class FamilyQualifierWrapper implements Comparable<FamilyQualifierWrapper> {
-    int hashCode;
-    protected String stringVal;
-    protected String family;
-    protected String qualifier;
-
-    public FamilyQualifierWrapper(SchemaPath column) {
-      this(column.getRootSegment().getPath(), column.getRootSegment().getChild().getNameSegment().getPath());
-    }
-
-    public FamilyQualifierWrapper(byte[] bytes, int familyOffset, int familyLength, int qualifierOffset, int qualifierLength) {
-      this(new String(bytes, familyOffset, familyLength), new String(bytes, qualifierOffset, qualifierLength));
-    }
-
-    public FamilyQualifierWrapper(String family, String qualifier) {
-      this.family = family;
-      this.qualifier = qualifier;
-      hashCode = 31*family.hashCode() + qualifier.hashCode();
-    }
-
-    @Override
-    public int hashCode() {
-      return this.hashCode;
-    }
-
-    @Override
-    public boolean equals(Object anObject) {
-      if (this == anObject) {
-        return true;
-      }
-      if (anObject instanceof FamilyQualifierWrapper) {
-        FamilyQualifierWrapper that = (FamilyQualifierWrapper) anObject;
-        // we compare qualifier first since many columns will have same family
-        if (!qualifier.equals(that.qualifier)) {
-          return false;
-        }
-        return family.equals(that.family);
-      }
-      return false;
-    }
-
-    @Override
-    public String toString() {
-      if (stringVal == null) {
-        stringVal = new StringBuilder().append(new String(family)).append(".").append(new String(qualifier)).toString();
-      }
-      return stringVal;
-    }
-
-    public SchemaPath asSchemaPath() {
-      return SchemaPath.getCompoundPath(family, qualifier);
-    }
-
-    @Override
-    public int compareTo(FamilyQualifierWrapper o) {
-      int val = family.compareTo(o.family);
-      if (val != 0) {
-        return val;
-      }
-      return qualifier.compareTo(o.qualifier);
-    }
-
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5d7e3d3a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSchemaFactory.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSchemaFactory.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSchemaFactory.java
index ce3b9fd..84f363b 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSchemaFactory.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSchemaFactory.java
@@ -29,7 +29,6 @@ import org.apache.drill.exec.planner.logical.DynamicDrillTable;
 import org.apache.drill.exec.rpc.user.UserSession;
 import org.apache.drill.exec.store.AbstractSchema;
 import org.apache.drill.exec.store.SchemaFactory;
-import org.apache.drill.exec.store.dfs.FileSystemConfig;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5d7e3d3a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseTestsSuite.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseTestsSuite.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseTestsSuite.java
index 3881f4d..e30f79e 100644
--- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseTestsSuite.java
+++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseTestsSuite.java
@@ -28,7 +28,6 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
-import org.junit.Ignore;
 import org.junit.runner.RunWith;
 import org.junit.runners.Suite;
 import org.junit.runners.Suite.SuiteClasses;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5d7e3d3a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseFilterPushDown.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseFilterPushDown.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseFilterPushDown.java
index 76300b7..90404b7 100644
--- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseFilterPushDown.java
+++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseFilterPushDown.java
@@ -17,11 +17,6 @@
  */
 package org.apache.drill.hbase;
 
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
-import java.util.Arrays;
-
-import org.junit.Ignore;
 import org.junit.Test;
 
 public class TestHBaseFilterPushDown extends BaseHBaseTest {
@@ -49,6 +44,17 @@ public class TestHBaseFilterPushDown extends BaseHBaseTest {
   }
 
   @Test
+  public void testFilterPushDownRowKeyBetween() throws Exception {
+    runSQLVerifyCount("SELECT\n"
+        + "  *\n"
+        + "FROM\n"
+        + "  hbase.`[TABLE_NAME]` tableName\n"
+        + "WHERE\n"
+        + "  row_key BETWEEN 'a2' AND 'b4'"
+        , 3);
+  }
+
+  @Test
   public void testFilterPushDownMultiColumns() throws Exception {
     runSQLVerifyCount("SELECT\n"
         + "  *\n"

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5d7e3d3a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseProjectPushDown.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseProjectPushDown.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseProjectPushDown.java
index 88194d5..b66d2ed 100644
--- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseProjectPushDown.java
+++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseProjectPushDown.java
@@ -33,7 +33,7 @@ public class TestHBaseProjectPushDown extends BaseHBaseTest {
   @Test
   public void testColumnWith1RowPushDown() throws Exception{
     runSQLVerifyCount("SELECT\n"
-        + "f2['c7']\n"
+        + "f2['c7'] as `f[c7]`\n"
         + "FROM\n"
         + "  hbase.`[TABLE_NAME]` tableName"
         , 1);
@@ -43,7 +43,7 @@ public class TestHBaseProjectPushDown extends BaseHBaseTest {
   public void testRowKeyAndColumnPushDown() throws Exception{
     setColumnWidth(9);
     runSQLVerifyCount("SELECT\n"
-        + "row_key, f['c1']*31 as `f['c1']*31`, f['c2'] as `f['c2']`, 5 as `5`, 'abc' as `'abc'`\n"
+        + "row_key, f['c1']*31 as `f[c1]*31`, f['c2'] as `f[c2]`, 5 as `5`, 'abc' as `'abc'`\n"
         + "FROM\n"
         + "  hbase.`[TABLE_NAME]` tableName"
         , 6);
@@ -51,8 +51,9 @@ public class TestHBaseProjectPushDown extends BaseHBaseTest {
 
   @Test
   public void testColumnFamilyPushDown() throws Exception{
+    setColumnWidth(74);
     runSQLVerifyCount("SELECT\n"
-        + "f\n"
+        + "f, f2\n"
         + "FROM\n"
         + "  hbase.`[TABLE_NAME]` tableName"
         , 6);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5d7e3d3a/exec/java-exec/src/main/java/org/apache/drill/exec/record/MaterializedField.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/MaterializedField.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/MaterializedField.java
index 439552f..3d749d6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/MaterializedField.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/MaterializedField.java
@@ -24,6 +24,7 @@ import org.apache.drill.common.expression.PathSegment;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.types.TypeProtos.DataMode;
 import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.expr.TypeHelper;
 import org.apache.drill.exec.proto.UserBitShared.SerializedField;
 
@@ -202,7 +203,7 @@ public class MaterializedField{
 
   @Override
   public String toString() {
-    return "MaterializedField [path=" + path + ", type=" + type + "]";
+    return "MaterializedField [path=" + path + ", type=" + Types.toString(type) + "]";
   }
 
   public String toExpr(){


Mime
View raw message