kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From t...@apache.org
Subject [1/3] incubator-kudu git commit: KUDU-1004. java client: Supply byte array, get back ByteBuffer
Date Tue, 05 Jan 2016 18:53:13 GMT
Repository: incubator-kudu
Updated Branches:
  refs/heads/master 07bededa7 -> 63b9268f9


KUDU-1004. java client: Supply byte array, get back ByteBuffer

This patch adds the missing methods, which is simple, but to handle
BBs correctly we need to stop storing byte[]s inside PartialRow.

This patch does that refactoring and augments the unit tests.

Change-Id: Ia9b4a743a42e034ca1accb1966e1a61f1f54ae45
Reviewed-on: http://gerrit.cloudera.org:8080/1647
Reviewed-by: Mike Percy <mpercy@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kudu/commit/170c32c0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kudu/tree/170c32c0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kudu/diff/170c32c0

Branch: refs/heads/master
Commit: 170c32c08dc8afb9c3d0e067f7ba35244ed42e2e
Parents: 07beded
Author: Jean-Daniel Cryans <jdcryans@cloudera.com>
Authored: Wed Dec 16 10:15:03 2015 -0800
Committer: Jean-Daniel Cryans <jdcryans@gerrit.cloudera.org>
Committed: Wed Dec 23 16:42:21 2015 +0000

----------------------------------------------------------------------
 .../main/java/org/kududb/client/KeyEncoder.java | 58 +++++++--------
 .../main/java/org/kududb/client/Operation.java  | 34 +++++----
 .../main/java/org/kududb/client/PartialRow.java | 77 +++++++++++++++++---
 .../main/java/org/kududb/client/RowResult.java  |  6 +-
 .../java/org/kududb/client/BaseKuduTest.java    |  3 +-
 .../java/org/kududb/client/TestRowResult.java   | 27 ++++---
 6 files changed, 135 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/170c32c0/java/kudu-client/src/main/java/org/kududb/client/KeyEncoder.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/kududb/client/KeyEncoder.java b/java/kudu-client/src/main/java/org/kududb/client/KeyEncoder.java
index 5ec1359..b54828d 100644
--- a/java/kudu-client/src/main/java/org/kududb/client/KeyEncoder.java
+++ b/java/kudu-client/src/main/java/org/kududb/client/KeyEncoder.java
@@ -110,24 +110,42 @@ class KeyEncoder {
     final Type type = column.getType();
 
     if (type == Type.STRING || type == Type.BINARY) {
-      addComponent(row.getVarLengthData().get(columnIdx), type, isLast);
+      addBinaryComponent(row.getVarLengthData().get(columnIdx), isLast);
     } else {
       addComponent(row.getRowAlloc(),
                    schema.getColumnOffset(columnIdx),
                    type.getSize(),
-                   type,
-                   isLast);
+                   type);
     }
   }
 
   /**
-   * Encodes a value of the given type into the key.
+   * Encodes a byte buffer into the key.
    * @param value the value to encode
-   * @param type the type of the value to encode
    * @param isLast whether the value is the final component in the key
    */
-  private void addComponent(byte[] value, Type type, boolean isLast) {
-    addComponent(value, 0, value.length, type, isLast);
+  private void addBinaryComponent(ByteBuffer value, boolean isLast) {
+    value.reset();
+
+    // TODO find a way to not have to read byte-by-byte that doesn't require extra copies. This is
+    // especially slow now that users can pass direct byte buffers.
+    while (value.hasRemaining()) {
+      byte currentByte = value.get();
+      buf.write(currentByte);
+      if (!isLast && currentByte == 0x00) {
+        // If we're a middle component of a composite key, we need to add a \x00
+        // at the end in order to separate this component from the next one. However,
+        // if we just did that, we'd have issues where a key that actually has
+        // \x00 in it would compare wrong, so we have to instead add \x00\x00, and
+        // encode \x00 as \x00\x01. -- key_encoder.h
+        buf.write(0x01);
+      }
+    }
+
+    if (!isLast) {
+      buf.write(0x00);
+      buf.write(0x00);
+    }
   }
 
   /**
@@ -136,9 +154,8 @@ class KeyEncoder {
    * @param offset the offset into the {@code value} buffer that the value begins
    * @param len the length of the value
    * @param type the type of the value to encode
-   * @param isLast whether the value is the final component in the key
    */
-  private void addComponent(byte[] value, int offset, int len, Type type, boolean isLast) {
+  private void addComponent(byte[] value, int offset, int len, Type type) {
     switch (type) {
       case BOOL:
         assert len == 1;
@@ -149,7 +166,7 @@ class KeyEncoder {
       case INT32:
       case INT64:
       case TIMESTAMP:
-        // picking the first byte because big endian
+        // Picking the first byte because big endian.
         byte lastByte = value[offset + (len - 1)];
         lastByte = Bytes.xorLeftMostBit(lastByte);
         buf.write(lastByte);
@@ -159,27 +176,6 @@ class KeyEncoder {
           }
         }
         break;
-      case BINARY:
-      case STRING:
-        // if this is the last component, just add
-        if (isLast) {
-          buf.write(value, offset, len);
-        } else {
-          // If we're a middle component of a composite key, we need to add a \x00
-          // at the end in order to separate this component from the next one. However,
-          // if we just did that, we'd have issues where a key that actually has
-          // \x00 in it would compare wrong, so we have to instead add \x00\x00, and
-          // encode \x00 as \x00\x01. -- key_encoder.h
-          for (int b = offset; b < (offset + len); b++) {
-            buf.write(value[b]);
-            if (value[b] == 0x00) {
-              buf.write(0x01);
-            }
-          }
-          buf.write(0x00);
-          buf.write(0x00);
-        }
-        break;
       default:
         throw new IllegalArgumentException(String.format(
             "The column type %s is not a valid key component type", type));

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/170c32c0/java/kudu-client/src/main/java/org/kududb/client/Operation.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/kududb/client/Operation.java b/java/kudu-client/src/main/java/org/kududb/client/Operation.java
index 7c7e244..741acb3 100644
--- a/java/kudu-client/src/main/java/org/kududb/client/Operation.java
+++ b/java/kudu-client/src/main/java/org/kududb/client/Operation.java
@@ -13,7 +13,6 @@
 // limitations under the License.
 package org.kududb.client;
 
-import com.google.common.primitives.Longs;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.Message;
 import com.google.protobuf.ZeroCopyLiteralByteString;
@@ -29,9 +28,9 @@ import org.kududb.util.Pair;
 import org.jboss.netty.buffer.ChannelBuffer;
 
 import java.io.ByteArrayOutputStream;
-import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
+import java.util.ArrayList;
 import java.util.List;
 
 /**
@@ -170,7 +169,10 @@ public abstract class Operation extends KuduRpc<OperationResponse> implements Ku
   static class OperationsEncoder {
     private Schema schema;
     private ByteBuffer rows;
-    private ByteArrayOutputStream indirect;
+    // We're filling this list as we go through the operations in encodeRow() and at the same time
+    // compute the total size, which will be used to right-size the array in toPB().
+    private List<ByteBuffer> indirect;
+    private long indirectWrittenBytes;
 
     /**
      * Initializes the state of the encoder based on the schema and number of operations to encode.
@@ -195,7 +197,7 @@ public abstract class Operation extends KuduRpc<OperationResponse> implements Ku
       // instead of a doubling buffer like BAOS.
       this.rows = ByteBuffer.allocate(sizePerRow * numOperations)
                             .order(ByteOrder.LITTLE_ENDIAN);
-      this.indirect = new ByteArrayOutputStream();
+      this.indirect = new ArrayList<>(schema.getVarLengthColumnCount() * numOperations);
     }
 
     /**
@@ -214,7 +216,14 @@ public abstract class Operation extends KuduRpc<OperationResponse> implements Ku
       if (indirect.size() > 0) {
         // TODO: same as above, we could avoid a copy here by using an implementation that allows
         // zero-copy on a slice of an array.
-        rowOpsBuilder.setIndirectData(ZeroCopyLiteralByteString.wrap(indirect.toByteArray()));
+        byte[] indirectData = new byte[(int)indirectWrittenBytes];
+        int offset = 0;
+        for (ByteBuffer bb : indirect) {
+          int bbSize = bb.remaining();
+          bb.get(indirectData, offset, bbSize);
+          offset += bbSize;
+        }
+        rowOpsBuilder.setIndirectData(ZeroCopyLiteralByteString.wrap(indirectData));
       }
       return rowOpsBuilder.build();
     }
@@ -232,14 +241,13 @@ public abstract class Operation extends KuduRpc<OperationResponse> implements Ku
         // Keys should always be specified, maybe check?
         if (row.isSet(colIdx) && !row.isSetToNull(colIdx)) {
           if (col.getType() == Type.STRING || col.getType() == Type.BINARY) {
-            byte[] varLengthData = row.getVarLengthData().get(colIdx);
-            rows.putLong(indirect.size());
-            rows.putLong(varLengthData.length);
-            try {
-              indirect.write(varLengthData);
-            } catch (IOException e) {
-              throw new AssertionError(e); // cannot occur
-            }
+            ByteBuffer varLengthData = row.getVarLengthData().get(colIdx);
+            varLengthData.reset();
+            rows.putLong(indirectWrittenBytes);
+            int bbSize = varLengthData.remaining();
+            rows.putLong(bbSize);
+            indirect.add(varLengthData);
+            indirectWrittenBytes += bbSize;
           } else {
             // This is for cols other than strings
             rows.put(rowData, currentRowOffset, col.getType().getSize());

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/170c32c0/java/kudu-client/src/main/java/org/kududb/client/PartialRow.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/kududb/client/PartialRow.java b/java/kudu-client/src/main/java/org/kududb/client/PartialRow.java
index c3ae1ab..c13e894 100644
--- a/java/kudu-client/src/main/java/org/kududb/client/PartialRow.java
+++ b/java/kudu-client/src/main/java/org/kududb/client/PartialRow.java
@@ -13,6 +13,7 @@
 // limitations under the License.
 package org.kududb.client;
 
+import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.BitSet;
 import java.util.List;
@@ -25,24 +26,31 @@ import org.kududb.annotations.InterfaceAudience;
 import org.kududb.annotations.InterfaceStability;
 
 /**
- * Class used to represent parts of a row along with its schema.
+ * Class used to represent parts of a row along with its schema.<p>
  *
  * Values can be replaced as often as needed, but once the enclosing {@link Operation} is applied
- * then they cannot be changed again. This means that a PartialRow cannot be reused.
+ * then they cannot be changed again. This means that a PartialRow cannot be reused.<p>
  *
  * Each PartialRow is backed by an byte array where all the cells (except strings and binary data)
- * are written. The others are kept in a List.
+ * are written. The others are kept in a List.<p>
+ *
+ * This class isn't thread-safe.
  */
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
 public class PartialRow {
 
   private final Schema schema;
-  // Variable length data. If string, will be UTF-8 encoded.
-  private final List<byte[]> varLengthData;
+
+  // Variable length data. If string, will be UTF-8 encoded. Elements of this list _must_ have a
+  // mark that we can reset() to. Readers of these fields (encoders, etc) must call reset() before
+  // attempting to read these values.
+  private final List<ByteBuffer> varLengthData;
   private final byte[] rowAlloc;
+
   private final BitSet columnsBitSet;
   private final BitSet nullsBitSet;
+
   private boolean frozen = false;
 
   /**
@@ -57,7 +65,7 @@ public class PartialRow {
         new BitSet(this.schema.getColumnCount()) : null;
     this.rowAlloc = new byte[schema.getRowSize()];
     // Pre-fill the array with nulls. We'll only replace cells that have varlen values.
-    this.varLengthData = Arrays.asList(new byte[this.schema.getColumnCount()][]);
+    this.varLengthData = Arrays.asList(new ByteBuffer[this.schema.getColumnCount()]);
   }
 
   /**
@@ -68,8 +76,19 @@ public class PartialRow {
     this.schema = row.schema;
 
     this.varLengthData = Lists.newArrayListWithCapacity(row.varLengthData.size());
-    for (byte[] data: row.varLengthData) {
-      this.varLengthData.add(data == null ? null : data.clone());
+    for (ByteBuffer data: row.varLengthData) {
+      if (data == null) {
+        this.varLengthData.add(null);
+      } else {
+        data.reset();
+        // Deep copy the ByteBuffer.
+        ByteBuffer clone = ByteBuffer.allocate(data.remaining());
+        clone.put(data);
+        clone.flip();
+
+        clone.mark(); // We always expect a mark.
+        this.varLengthData.add(clone);
+      }
     }
 
     this.rowAlloc = row.rowAlloc.clone();
@@ -326,6 +345,21 @@ public class PartialRow {
   }
 
   /**
+   * Add binary data with the specified value, from the current ByteBuffer's position to its limit.
+   * This method duplicates the ByteBuffer but doesn't copy the data. This means that the wrapped
+   * data must not be mutated after this.
+   * @param columnIndex the column's index in the schema
+   * @param value byte buffer to get the value from
+   * @throws IllegalArgumentException if the column doesn't exist or if the value doesn't match
+   * the column's type
+   * @throws IllegalStateException if the row was already applied
+   */
+  public void addBinary(int columnIndex, ByteBuffer value) {
+    checkColumn(schema.getColumnByIndex(columnIndex), Type.BINARY);
+    addVarLengthData(columnIndex, value);
+  }
+
+  /**
    * Add binary data with the specified value.
    * Note that the provided value must not be mutated after this.
    * @param columnName Name of the column
@@ -338,8 +372,31 @@ public class PartialRow {
     addBinary(schema.getColumnIndex(columnName), val);
   }
 
+  /**
+   * Add binary data with the specified value, from the current ByteBuffer's position to its limit.
+   * This method duplicates the ByteBuffer but doesn't copy the data. This means that the wrapped
+   * data must not be mutated after this.
+   * @param columnName Name of the column
+   * @param value byte buffer to get the value from
+   * @throws IllegalArgumentException if the column doesn't exist or if the value doesn't match
+   * the column's type
+   * @throws IllegalStateException if the row was already applied
+   */
+  public void addBinary(String columnName, ByteBuffer value) {
+    addBinary(schema.getColumnIndex(columnName), value);
+  }
+
   private void addVarLengthData(int columnIndex, byte[] val) {
-    varLengthData.set(columnIndex, val);
+    addVarLengthData(columnIndex, ByteBuffer.wrap(val));
+  }
+
+  private void addVarLengthData(int columnIndex, ByteBuffer val) {
+    // A duplicate will copy all the original's metadata but still point to the same content.
+    ByteBuffer duplicate = val.duplicate();
+    // Mark the current position so we can reset to it.
+    duplicate.mark();
+
+    varLengthData.set(columnIndex, duplicate);
     // Set the usage bit but we don't care where it is.
     getPositionInRowAllocAndSetBitSet(columnIndex);
     // We don't set anything in row alloc, it will be managed at encoding time.
@@ -464,7 +521,7 @@ public class PartialRow {
    * Get the list variable length data cells that were added to this row.
    * @return a list of binary data, may be empty
    */
-  List<byte[]> getVarLengthData() {
+  List<ByteBuffer> getVarLengthData() {
     return varLengthData;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/170c32c0/java/kudu-client/src/main/java/org/kududb/client/RowResult.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/kududb/client/RowResult.java b/java/kudu-client/src/main/java/org/kududb/client/RowResult.java
index 2f005ad..503367f 100644
--- a/java/kudu-client/src/main/java/org/kududb/client/RowResult.java
+++ b/java/kudu-client/src/main/java/org/kududb/client/RowResult.java
@@ -363,8 +363,7 @@ public class RowResult {
   /**
    * Get the specified column's binary data.
    *
-   * This doesn't copy the data and instead returns a ByteBuffer that wraps it. The ByteBuffer
-   * is backed by 'indirectData' in this RowResult.
+   * This doesn't copy the data and instead returns a ByteBuffer that wraps it.
    *
    * @param columnName name of the column to get data for
    * @return a byte[] with the binary data.
@@ -378,8 +377,7 @@ public class RowResult {
   /**
    * Get the specified column's binary data.
    *
-   * This doesn't copy the data and instead returns a ByteBuffer that wraps it. The ByteBuffer
-   * is backed by 'indirectData' in this RowResult.
+   * This doesn't copy the data and instead returns a ByteBuffer that wraps it.
    *
    * @param columnIndex Column index in the schema
    * @return a byte[] with the binary data.

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/170c32c0/java/kudu-client/src/test/java/org/kududb/client/BaseKuduTest.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/kududb/client/BaseKuduTest.java b/java/kudu-client/src/test/java/org/kududb/client/BaseKuduTest.java
index e3d8435..f1d1ca1 100644
--- a/java/kudu-client/src/test/java/org/kududb/client/BaseKuduTest.java
+++ b/java/kudu-client/src/test/java/org/kududb/client/BaseKuduTest.java
@@ -240,7 +240,8 @@ public class BaseKuduTest {
             new ColumnSchema.ColumnSchemaBuilder("float", Type.FLOAT).build(),
             new ColumnSchema.ColumnSchemaBuilder("double", Type.DOUBLE).build(),
             new ColumnSchema.ColumnSchemaBuilder("string", Type.STRING).build(),
-            new ColumnSchema.ColumnSchemaBuilder("binary", Type.BINARY).build(),
+            new ColumnSchema.ColumnSchemaBuilder("binary-array", Type.BINARY).build(),
+            new ColumnSchema.ColumnSchemaBuilder("binary-bytebuffer", Type.BINARY).build(),
             new ColumnSchema.ColumnSchemaBuilder("null", Type.STRING).nullable(true).build(),
             new ColumnSchema.ColumnSchemaBuilder("timestamp", Type.TIMESTAMP).build());
 

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/170c32c0/java/kudu-client/src/test/java/org/kududb/client/TestRowResult.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/kududb/client/TestRowResult.java b/java/kudu-client/src/test/java/org/kududb/client/TestRowResult.java
index 840958a..ff19917 100644
--- a/java/kudu-client/src/test/java/org/kududb/client/TestRowResult.java
+++ b/java/kudu-client/src/test/java/org/kududb/client/TestRowResult.java
@@ -52,9 +52,12 @@ public class TestRowResult extends BaseKuduTest {
     row.addFloat(5, 5.6f);
     row.addDouble(6, 7.8);
     row.addString(7, "string-value");
-    row.addBinary(8, "binary-value".getBytes());
-    row.setNull(9);
-    row.addLong(10, 10l);
+    row.addBinary(8, "binary-array".getBytes());
+    ByteBuffer bb = ByteBuffer.wrap("binary-bytebuffer".getBytes());
+    bb.position(7); // We're only inserting the bytebuffer part of the original array.
+    row.addBinary(9, bb);
+    row.setNull(10);
+    row.addLong(11, 11l);
 
     KuduSession session = syncClient.newSession();
     session.apply(insert);
@@ -89,21 +92,23 @@ public class TestRowResult extends BaseKuduTest {
       assertEquals("string-value", rr.getString(7));
       assertEquals("string-value", rr.getString(allTypesSchema.getColumnByIndex(7).getName()));
 
-      assertArrayEquals("binary-value".getBytes(), rr.getBinaryCopy(8));
-      assertArrayEquals("binary-value".getBytes(),
+      assertArrayEquals("binary-array".getBytes(), rr.getBinaryCopy(8));
+      assertArrayEquals("binary-array".getBytes(),
           rr.getBinaryCopy(allTypesSchema.getColumnByIndex(8).getName()));
 
       ByteBuffer buffer = rr.getBinary(8);
       assertEquals(buffer, rr.getBinary(allTypesSchema.getColumnByIndex(8).getName()));
       byte[] binaryValue = new byte[buffer.remaining()];
       buffer.get(binaryValue);
-      assertArrayEquals("binary-value".getBytes(), binaryValue);
+      assertArrayEquals("binary-array".getBytes(), binaryValue);
 
-      assertEquals(true, rr.isNull(9));
-      assertEquals(true, rr.isNull(allTypesSchema.getColumnByIndex(9).getName()));
+      assertArrayEquals("bytebuffer".getBytes(), rr.getBinaryCopy(9));
 
-      assertEquals(10, rr.getLong(10));
-      assertEquals(10, rr.getLong(allTypesSchema.getColumnByIndex(10).getName()));
+      assertEquals(true, rr.isNull(10));
+      assertEquals(true, rr.isNull(allTypesSchema.getColumnByIndex(10).getName()));
+
+      assertEquals(11, rr.getLong(11));
+      assertEquals(11, rr.getLong(allTypesSchema.getColumnByIndex(11).getName()));
 
       // We test with the column name once since it's the same method for all types, unlike above.
       assertEquals(Type.INT8, rr.getColumnType(allTypesSchema.getColumnByIndex(0).getName()));
@@ -116,7 +121,7 @@ public class TestRowResult extends BaseKuduTest {
       assertEquals(Type.DOUBLE, rr.getColumnType(6));
       assertEquals(Type.STRING, rr.getColumnType(7));
       assertEquals(Type.BINARY, rr.getColumnType(8));
-      assertEquals(Type.TIMESTAMP, rr.getColumnType(10));
+      assertEquals(Type.TIMESTAMP, rr.getColumnType(11));
     }
   }
 }


Mime
View raw message