beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Work logged] (BEAM-5404) Inefficient Serialization of Spanner MutationGroup in pipeline
Date Fri, 21 Sep 2018 15:25:01 GMT

     [ https://issues.apache.org/jira/browse/BEAM-5404?focusedWorklogId=146413&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-146413 ]

ASF GitHub Bot logged work on BEAM-5404:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 21/Sep/18 15:24
            Start Date: 21/Sep/18 15:24
    Worklog Time Spent: 10m 
      Work Description: nielm closed pull request #6407: [BEAM-5404] Use Java serialization
for MutationGroup objects.
URL: https://github.com/apache/beam/pull/6407
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationGroupEncoder.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationGroupEncoder.java
index 4c97fac5074..491fa41b549 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationGroupEncoder.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationGroupEncoder.java
@@ -17,494 +17,58 @@
  */
 package org.apache.beam.sdk.io.gcp.spanner;
 
-import static com.google.common.base.Preconditions.checkArgument;
-
 import com.google.cloud.ByteArray;
 import com.google.cloud.Date;
 import com.google.cloud.Timestamp;
 import com.google.cloud.spanner.Key;
-import com.google.cloud.spanner.KeySet;
 import com.google.cloud.spanner.Mutation;
 import com.google.cloud.spanner.Type;
 import com.google.cloud.spanner.Value;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
 import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutput;
-import java.io.ObjectOutputStream;
 import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import org.apache.beam.sdk.util.VarInt;
+import org.apache.beam.sdk.coders.SerializableCoder;
 import org.joda.time.DateTime;
 import org.joda.time.Days;
 import org.joda.time.MutableDateTime;
 
 /** Given the Spanner Schema, efficiently encodes the mutation group. */
 class MutationGroupEncoder {
-  private static final DateTime MIN_DATE = new DateTime(1, 1, 1, 0, 0);
 
+  private static final SerializableCoder<MutationGroup> CODER =
+      SerializableCoder.of(MutationGroup.class);
+  private static final DateTime MIN_DATE = new DateTime(1, 1, 1, 0, 0);
   private final SpannerSchema schema;
-  private final List<String> tables;
-  private final Map<String, Integer> tablesIndexes = new HashMap<>();
 
   public MutationGroupEncoder(SpannerSchema schema) {
     this.schema = schema;
-    tables = schema.getTables();
-
-    for (int i = 0; i < tables.size(); i++) {
-      tablesIndexes.put(tables.get(i).toLowerCase(), i);
-    }
   }
 
   public byte[] encode(MutationGroup g) {
     ByteArrayOutputStream bos = new ByteArrayOutputStream();
-
     try {
-      VarInt.encode(g.attached().size(), bos);
-      for (Mutation m : g) {
-        encodeMutation(bos, m);
-      }
+      CODER.encode(g, bos);
     } catch (IOException e) {
       throw new RuntimeException(e);
     }
     return bos.toByteArray();
   }
 
-  private static void setBit(byte[] bytes, int i) {
-    int word = i / 8;
-    int bit = 7 - i % 8;
-    bytes[word] = (byte) (bytes[word] | 1 << bit);
-  }
-
-  private static boolean getBit(byte[] bytes, int i) {
-    int word = i / 8;
-    int bit = 7 - i % 8;
-    return (bytes[word] & 1 << (bit)) != 0;
-  }
-
-  private void encodeMutation(ByteArrayOutputStream bos, Mutation m) throws IOException {
-    Mutation.Op op = m.getOperation();
-    bos.write(op.ordinal());
-    if (op == Mutation.Op.DELETE) {
-      encodeDelete(bos, m);
-    } else {
-      encodeModification(bos, m);
-    }
-  }
-
-  private void encodeDelete(ByteArrayOutputStream bos, Mutation m) throws IOException {
-    String table = m.getTable().toLowerCase();
-    int tableIndex = getTableIndex(table);
-    VarInt.encode(tableIndex, bos);
-    ObjectOutput out = new ObjectOutputStream(bos);
-    out.writeObject(m.getKeySet());
-  }
-
-  private Integer getTableIndex(String table) {
-    Integer result = tablesIndexes.get(table.toLowerCase());
-    checkArgument(result != null, "Unknown table '%s'", table);
-    return result;
-  }
-
-  private Mutation decodeDelete(ByteArrayInputStream bis) throws IOException {
-    int tableIndex = VarInt.decodeInt(bis);
-    String tableName = tables.get(tableIndex);
-
-    ObjectInputStream in = new ObjectInputStream(bis);
-    KeySet keySet;
-    try {
-      keySet = (KeySet) in.readObject();
-    } catch (ClassNotFoundException e) {
-      throw new RuntimeException(e);
-    }
-    return Mutation.delete(tableName, keySet);
-  }
-
-  // Encodes a mutation that is not a delete one, using the following format
-  // [bitset of modified columns][value of column1][value of column2][value of column3]...
-  private void encodeModification(ByteArrayOutputStream bos, Mutation m) throws IOException
{
-    String tableName = m.getTable().toLowerCase();
-    int tableIndex = getTableIndex(tableName);
-    VarInt.encode(tableIndex, bos);
-    List<SpannerSchema.Column> columns = schema.getColumns(tableName);
-    checkArgument(columns != null, "Schema for table " + tableName + " not " + "found");
-    Map<String, Value> map = mutationAsMap(m);
-    // java.util.BitSet#toByteArray returns array of unpredictable length. Using byte arrays
-    // instead.
-    int bitsetSize = (columns.size() + 7) / 8;
-    byte[] exists = new byte[bitsetSize];
-    byte[] nulls = new byte[bitsetSize];
-    for (int i = 0; i < columns.size(); i++) {
-      String columnName = columns.get(i).getName();
-      boolean columnExists = map.containsKey(columnName);
-      boolean columnNull = columnExists && map.get(columnName).isNull();
-      if (columnExists) {
-        setBit(exists, i);
-      }
-      if (columnNull) {
-        setBit(nulls, i);
-        map.remove(columnName);
-      }
-    }
-    bos.write(exists);
-    bos.write(nulls);
-    for (int i = 0; i < columns.size(); i++) {
-      if (!getBit(exists, i) || getBit(nulls, i)) {
-        continue;
-      }
-      SpannerSchema.Column column = columns.get(i);
-      Value value = map.remove(column.getName());
-      encodeValue(bos, value);
-    }
-    checkArgument(
-        map.isEmpty(), "Columns %s were not defined in table %s", map.keySet(), m.getTable());
-  }
-
-  private void encodeValue(ByteArrayOutputStream bos, Value value) throws IOException {
-    switch (value.getType().getCode()) {
-      case ARRAY:
-        encodeArray(bos, value);
-        break;
-      default:
-        encodePrimitive(bos, value);
-    }
-  }
-
-  private void encodeArray(ByteArrayOutputStream bos, Value value) throws IOException {
-    // TODO: avoid using Java serialization here.
-    ObjectOutputStream out = new ObjectOutputStream(bos);
-    switch (value.getType().getArrayElementType().getCode()) {
-      case BOOL:
-        {
-          out.writeObject(new ArrayList<>(value.getBoolArray()));
-          break;
-        }
-      case INT64:
-        {
-          out.writeObject(new ArrayList<>(value.getInt64Array()));
-          break;
-        }
-      case FLOAT64:
-        {
-          out.writeObject(new ArrayList<>(value.getFloat64Array()));
-          break;
-        }
-      case STRING:
-        {
-          out.writeObject(new ArrayList<>(value.getStringArray()));
-          break;
-        }
-      case BYTES:
-        {
-          out.writeObject(new ArrayList<>(value.getBytesArray()));
-          break;
-        }
-      case TIMESTAMP:
-        {
-          out.writeObject(new ArrayList<>(value.getTimestampArray()));
-          break;
-        }
-      case DATE:
-        {
-          out.writeObject(new ArrayList<>(value.getDateArray()));
-          break;
-        }
-      default:
-        throw new IllegalArgumentException("Unknown type " + value.getType());
-    }
-  }
-
-  private void encodePrimitive(ByteArrayOutputStream bos, Value value) throws IOException
{
-    switch (value.getType().getCode()) {
-      case BOOL:
-        bos.write(value.getBool() ? 1 : 0);
-        break;
-      case INT64:
-        VarInt.encode(value.getInt64(), bos);
-        break;
-      case FLOAT64:
-        new DataOutputStream(bos).writeDouble(value.getFloat64());
-        break;
-      case STRING:
-        {
-          String str = value.getString();
-          byte[] bytes = str.getBytes(StandardCharsets.UTF_8);
-          VarInt.encode(bytes.length, bos);
-          bos.write(bytes);
-          break;
-        }
-      case BYTES:
-        {
-          ByteArray bytes = value.getBytes();
-          VarInt.encode(bytes.length(), bos);
-          bos.write(bytes.toByteArray());
-          break;
-        }
-      case TIMESTAMP:
-        {
-          Timestamp timestamp = value.getTimestamp();
-          VarInt.encode(timestamp.getSeconds(), bos);
-          VarInt.encode(timestamp.getNanos(), bos);
-          break;
-        }
-      case DATE:
-        {
-          Date date = value.getDate();
-          VarInt.encode(encodeDate(date), bos);
-          break;
-        }
-      default:
-        throw new IllegalArgumentException("Unknown type " + value.getType());
-    }
-  }
-
   public MutationGroup decode(byte[] bytes) {
     ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
 
     try {
-      int numMutations = VarInt.decodeInt(bis);
-      Mutation primary = decodeMutation(bis);
-      List<Mutation> attached = new ArrayList<>(numMutations);
-      for (int i = 0; i < numMutations; i++) {
-        attached.add(decodeMutation(bis));
-      }
-      return MutationGroup.create(primary, attached);
+      return CODER.decode(bis);
     } catch (IOException e) {
       throw new RuntimeException(e);
     }
   }
 
-  private Mutation decodeMutation(ByteArrayInputStream bis) throws IOException {
-    Mutation.Op op = Mutation.Op.values()[bis.read()];
-    if (op == Mutation.Op.DELETE) {
-      return decodeDelete(bis);
-    }
-    return decodeModification(bis, op);
-  }
-
-  private Mutation decodeModification(ByteArrayInputStream bis, Mutation.Op op) throws IOException
{
-    int tableIndex = VarInt.decodeInt(bis);
-    String tableName = tables.get(tableIndex);
-
-    Mutation.WriteBuilder m;
-    switch (op) {
-      case INSERT:
-        m = Mutation.newInsertBuilder(tableName);
-        break;
-      case INSERT_OR_UPDATE:
-        m = Mutation.newInsertOrUpdateBuilder(tableName);
-        break;
-      case REPLACE:
-        m = Mutation.newReplaceBuilder(tableName);
-        break;
-      case UPDATE:
-        m = Mutation.newUpdateBuilder(tableName);
-        break;
-      default:
-        throw new IllegalArgumentException("Unknown operation " + op);
-    }
-    List<SpannerSchema.Column> columns = schema.getColumns(tableName);
-    int bitsetSize = (columns.size() + 7) / 8;
-    byte[] exists = readBytes(bis, bitsetSize);
-    byte[] nulls = readBytes(bis, bitsetSize);
-
-    for (int i = 0; i < columns.size(); i++) {
-      if (!getBit(exists, i)) {
-        continue;
-      }
-      SpannerSchema.Column column = columns.get(i);
-      boolean isNull = getBit(nulls, i);
-      Type type = column.getType();
-      String fieldName = column.getName();
-      switch (type.getCode()) {
-        case ARRAY:
-          try {
-            decodeArray(bis, fieldName, type, isNull, m);
-          } catch (ClassNotFoundException e) {
-            throw new RuntimeException(e);
-          }
-          break;
-        default:
-          decodePrimitive(bis, fieldName, type, isNull, m);
-      }
-    }
-    return m.build();
-  }
-
-  private void decodeArray(
-      ByteArrayInputStream bis,
-      String fieldName,
-      Type type,
-      boolean isNull,
-      Mutation.WriteBuilder m)
-      throws IOException, ClassNotFoundException {
-    // TODO: avoid using Java serialization here.
-    switch (type.getArrayElementType().getCode()) {
-      case BOOL:
-        {
-          if (isNull) {
-            m.set(fieldName).toBoolArray((Iterable<Boolean>) null);
-          } else {
-            ObjectInputStream out = new ObjectInputStream(bis);
-            m.set(fieldName).toBoolArray((List<Boolean>) out.readObject());
-          }
-          break;
-        }
-      case INT64:
-        {
-          if (isNull) {
-            m.set(fieldName).toInt64Array((Iterable<Long>) null);
-          } else {
-            ObjectInputStream out = new ObjectInputStream(bis);
-            m.set(fieldName).toInt64Array((List<Long>) out.readObject());
-          }
-          break;
-        }
-      case FLOAT64:
-        {
-          if (isNull) {
-            m.set(fieldName).toFloat64Array((Iterable<Double>) null);
-          } else {
-            ObjectInputStream out = new ObjectInputStream(bis);
-            m.set(fieldName).toFloat64Array((List<Double>) out.readObject());
-          }
-          break;
-        }
-      case STRING:
-        {
-          if (isNull) {
-            m.set(fieldName).toStringArray(null);
-          } else {
-            ObjectInputStream out = new ObjectInputStream(bis);
-            m.set(fieldName).toStringArray((List<String>) out.readObject());
-          }
-          break;
-        }
-      case BYTES:
-        {
-          if (isNull) {
-            m.set(fieldName).toBytesArray(null);
-          } else {
-            ObjectInputStream out = new ObjectInputStream(bis);
-            m.set(fieldName).toBytesArray((List<ByteArray>) out.readObject());
-          }
-          break;
-        }
-      case TIMESTAMP:
-        {
-          if (isNull) {
-            m.set(fieldName).toTimestampArray(null);
-          } else {
-            ObjectInputStream out = new ObjectInputStream(bis);
-            m.set(fieldName).toTimestampArray((List<Timestamp>) out.readObject());
-          }
-          break;
-        }
-      case DATE:
-        {
-          if (isNull) {
-            m.set(fieldName).toDateArray(null);
-          } else {
-            ObjectInputStream out = new ObjectInputStream(bis);
-            m.set(fieldName).toDateArray((List<Date>) out.readObject());
-          }
-          break;
-        }
-      default:
-        throw new IllegalArgumentException("Unknown type " + type);
-    }
-  }
-
-  private void decodePrimitive(
-      ByteArrayInputStream bis,
-      String fieldName,
-      Type type,
-      boolean isNull,
-      Mutation.WriteBuilder m)
-      throws IOException {
-    switch (type.getCode()) {
-      case BOOL:
-        if (isNull) {
-          m.set(fieldName).to((Boolean) null);
-        } else {
-          m.set(fieldName).to(bis.read() != 0);
-        }
-        break;
-      case INT64:
-        if (isNull) {
-          m.set(fieldName).to((Long) null);
-        } else {
-          m.set(fieldName).to(VarInt.decodeLong(bis));
-        }
-        break;
-      case FLOAT64:
-        if (isNull) {
-          m.set(fieldName).to((Double) null);
-        } else {
-          m.set(fieldName).to(new DataInputStream(bis).readDouble());
-        }
-        break;
-      case STRING:
-        {
-          if (isNull) {
-            m.set(fieldName).to((String) null);
-          } else {
-            int len = VarInt.decodeInt(bis);
-            byte[] bytes = readBytes(bis, len);
-            m.set(fieldName).to(new String(bytes, StandardCharsets.UTF_8));
-          }
-          break;
-        }
-      case BYTES:
-        {
-          if (isNull) {
-            m.set(fieldName).to((ByteArray) null);
-          } else {
-            int len = VarInt.decodeInt(bis);
-            byte[] bytes = readBytes(bis, len);
-            m.set(fieldName).to(ByteArray.copyFrom(bytes));
-          }
-          break;
-        }
-      case TIMESTAMP:
-        {
-          if (isNull) {
-            m.set(fieldName).to((Timestamp) null);
-          } else {
-            long seconds = VarInt.decodeLong(bis);
-            int nanoseconds = VarInt.decodeInt(bis);
-            m.set(fieldName).to(Timestamp.ofTimeSecondsAndNanos(seconds, nanoseconds));
-          }
-          break;
-        }
-      case DATE:
-        {
-          if (isNull) {
-            m.set(fieldName).to((Date) null);
-          } else {
-            int days = VarInt.decodeInt(bis);
-            m.set(fieldName).to(decodeDate(days));
-          }
-          break;
-        }
-      default:
-        throw new IllegalArgumentException("Unknown type " + type);
-    }
-  }
-
-  private byte[] readBytes(ByteArrayInputStream bis, int len) throws IOException {
-    byte[] tmp = new byte[len];
-    new DataInputStream(bis).readFully(tmp);
-    return tmp;
-  }
-
   /**
    * Builds a lexicographically sortable binary key based on a primary key descriptor.
    *
@@ -680,12 +244,4 @@ private static int encodeDate(Date date) {
 
     return Days.daysBetween(MIN_DATE, jodaDate).getDays();
   }
-
-  private static Date decodeDate(int daysSinceEpoch) {
-
-    DateTime jodaDate = MIN_DATE.plusDays(daysSinceEpoch);
-
-    return Date.fromYearMonthDay(
-        jodaDate.getYear(), jodaDate.getMonthOfYear(), jodaDate.getDayOfMonth());
-  }
 }
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/MutationGroupEncoderTest.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/MutationGroupEncoderTest.java
index a600551ed76..c8e2d7d4d89 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/MutationGroupEncoderTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/MutationGroupEncoderTest.java
@@ -104,32 +104,6 @@ public void testAllTypesMultipleMutations() throws Exception {
             Mutation.delete("test", KeySet.range(KeyRange.closedClosed(Key.of(1L), Key.of(2L))))));
   }
 
-  @Test
-  public void testUnknownColumn() throws Exception {
-    SpannerSchema.Builder builder = SpannerSchema.builder();
-    builder.addKeyPart("test", "bool_field", false);
-    builder.addColumn("test", "bool_field", "BOOL");
-    SpannerSchema schema = builder.build();
-
-    Mutation mutation = Mutation.newInsertBuilder("test").set("unknown").to(true).build();
-    thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage("Columns [unknown] were not defined in table test");
-    encodeAndVerify(g(mutation), schema);
-  }
-
-  @Test
-  public void testUnknownTable() throws Exception {
-    SpannerSchema.Builder builder = SpannerSchema.builder();
-    builder.addKeyPart("test", "bool_field", false);
-    builder.addColumn("test", "bool_field", "BOOL");
-    SpannerSchema schema = builder.build();
-
-    Mutation mutation = Mutation.newInsertBuilder("unknown").set("bool_field").to(true).build();
-    thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage("Unknown table 'unknown'");
-    encodeAndVerify(g(mutation), schema);
-  }
-
   @Test
   public void testMutationCaseInsensitive() throws Exception {
     SpannerSchema.Builder builder = SpannerSchema.builder();


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 146413)
    Time Spent: 50m  (was: 40m)

> Inefficient Serialization of Spanner MutationGroup in pipeline
> --------------------------------------------------------------
>
>                 Key: BEAM-5404
>                 URL: https://issues.apache.org/jira/browse/BEAM-5404
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-gcp
>    Affects Versions: 2.3.0, 2.4.0, 2.5.0, 2.6.0
>            Reporter: Niel Markwick
>            Assignee: Chamikara Jayalath
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 50m
>  Remaining Estimate: 0h
>
> The Cloud Spanner connector uses a custom serialization mechanism to convert MutationGroup objects into a byte array.
> This mechanism is very inefficient, producing byte arrays approximately 10x larger than simple Java serialization of the MutationGroup objects, which increases the resources needed by the connector to ~40x the size of the original mutations.
> There are no obvious benefits to using this custom serialization system, as the objects are deserialized within the pipeline itself.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Mime
View raw message