beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ming...@apache.org
Subject [60/66] [abbrv] beam git commit: [BEAM-2744] rename BeamRecordType#size()
Date Mon, 11 Sep 2017 20:19:43 GMT
[BEAM-2744] rename BeamRecordType#size()


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3931bbb5
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3931bbb5
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3931bbb5

Branch: refs/heads/master
Commit: 3931bbb5179c704285218c9126e885b282fda047
Parents: 2f99bf6
Author: James Xu <xumingmingv@gmail.com>
Authored: Tue Aug 8 15:15:59 2017 +0800
Committer: mingmxu <mingmxu@ebay.com>
Committed: Mon Sep 11 10:56:58 2017 -0700

----------------------------------------------------------------------
 .../apache/beam/sdk/coders/BeamRecordCoder.java | 12 +++++------
 .../org/apache/beam/sdk/values/BeamRecord.java  | 22 +++++---------------
 .../apache/beam/sdk/values/BeamRecordType.java  |  7 ++++++-
 .../extensions/sql/impl/rel/BeamJoinRel.java    |  2 +-
 .../extensions/sql/impl/rel/BeamValuesRel.java  |  2 +-
 .../sql/impl/transform/BeamJoinTransforms.java  |  4 ++--
 .../extensions/sql/schema/BeamTableUtils.java   | 10 ++++-----
 .../sql/BeamSqlDslAggregationTest.java          | 16 +++++++-------
 .../beam/sdk/extensions/sql/TestUtils.java      |  6 +++---
 9 files changed, 38 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/3931bbb5/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BeamRecordCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BeamRecordCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BeamRecordCoder.java
index 4e24b82..cbed87d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BeamRecordCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BeamRecordCoder.java
@@ -43,7 +43,7 @@ public class BeamRecordCoder extends CustomCoder<BeamRecord> {
   }
 
   public static BeamRecordCoder of(BeamRecordType recordType, List<Coder> coderArray){
-    if (recordType.size() != coderArray.size()) {
+    if (recordType.getFieldCount() != coderArray.size()) {
       throw new IllegalArgumentException("Coder size doesn't match with field size");
     }
     return new BeamRecordCoder(recordType, coderArray);
@@ -57,7 +57,7 @@ public class BeamRecordCoder extends CustomCoder<BeamRecord> {
   public void encode(BeamRecord value, OutputStream outStream)
       throws CoderException, IOException {
     nullListCoder.encode(scanNullFields(value), outStream);
-    for (int idx = 0; idx < value.size(); ++idx) {
+    for (int idx = 0; idx < value.getFieldCount(); ++idx) {
       if (value.getFieldValue(idx) == null) {
         continue;
       }
@@ -70,8 +70,8 @@ public class BeamRecordCoder extends CustomCoder<BeamRecord> {
   public BeamRecord decode(InputStream inStream) throws CoderException, IOException {
     BitSet nullFields = nullListCoder.decode(inStream);
 
-    List<Object> fieldValues = new ArrayList<>(recordType.size());
-    for (int idx = 0; idx < recordType.size(); ++idx) {
+    List<Object> fieldValues = new ArrayList<>(recordType.getFieldCount());
+    for (int idx = 0; idx < recordType.getFieldCount(); ++idx) {
       if (nullFields.get(idx)) {
         fieldValues.add(null);
       } else {
@@ -87,8 +87,8 @@ public class BeamRecordCoder extends CustomCoder<BeamRecord> {
    * Scan {@link BeamRecord} to find fields with a NULL value.
    */
   private BitSet scanNullFields(BeamRecord record){
-    BitSet nullFields = new BitSet(record.size());
-    for (int idx = 0; idx < record.size(); ++idx) {
+    BitSet nullFields = new BitSet(record.getFieldCount());
+    for (int idx = 0; idx < record.getFieldCount(); ++idx) {
       if (record.getFieldValue(idx) == null) {
         nullFields.set(idx);
       }

http://git-wip-us.apache.org/repos/asf/beam/blob/3931bbb5/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecord.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecord.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecord.java
index a3ede3c..fa3b574 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecord.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecord.java
@@ -52,13 +52,13 @@ public class BeamRecord implements Serializable {
     }
 
     this.dataType = dataType;
-    this.dataValues = new ArrayList<>(dataType.size());
+    this.dataValues = new ArrayList<>(dataType.getFieldCount());
 
-    for (int idx = 0; idx < dataType.size(); ++idx) {
+    for (int idx = 0; idx < dataType.getFieldCount(); ++idx) {
       dataValues.add(null);
     }
 
-    for (int idx = 0; idx < dataType.size(); ++idx) {
+    for (int idx = 0; idx < dataType.getFieldCount(); ++idx) {
       addField(idx, rawDataValues.get(idx));
     }
   }
@@ -168,7 +168,7 @@ public class BeamRecord implements Serializable {
     return (Boolean) getFieldValue(idx);
   }
 
-  public int size() {
+  public int getFieldCount() {
     return dataValues.size();
   }
 
@@ -182,19 +182,7 @@ public class BeamRecord implements Serializable {
 
   @Override
   public String toString() {
-    return "BeamSqlRow [dataValues=" + dataValues + ", dataType=" + dataType + "]";
-  }
-
-  /**
-   * Return data fields as key=value.
-   */
-  public String valueInString() {
-    StringBuilder sb = new StringBuilder();
-    for (int idx = 0; idx < size(); ++idx) {
-      sb.append(
-          String.format(",%s=%s", getDataType().getFieldNames().get(idx), getFieldValue(idx)));
-    }
-    return sb.substring(1);
+    return "BeamRecord [dataValues=" + dataValues + ", dataType=" + dataType + "]";
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/3931bbb5/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecordType.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecordType.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecordType.java
index 6ab783c..29cc80d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecordType.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecordType.java
@@ -65,7 +65,12 @@ public class BeamRecordType implements Serializable{
      return fieldNames.indexOf(fieldName);
    }
 
-  public int size(){
+  public int getFieldCount(){
     return fieldNames.size();
   }
+
+  @Override
+  public String toString() {
+    return "BeamRecordType [fieldsName=" + fieldNames + "]";
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/3931bbb5/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java
index 9dceb25..5ac9575 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java
@@ -256,7 +256,7 @@ public class BeamJoinRel extends Join implements BeamRelNode {
 
   private BeamRecord buildNullRow(BeamRelNode relNode) {
     BeamRecordSqlType leftType = CalciteUtils.toBeamRowType(relNode.getRowType());
-    return new BeamRecord(leftType, Collections.nCopies(leftType.size(), null));
+    return new BeamRecord(leftType, Collections.nCopies(leftType.getFieldCount(), null));
   }
 
   private List<Pair<Integer, Integer>> extractJoinColumns(int leftRowColumnCount) {

http://git-wip-us.apache.org/repos/asf/beam/blob/3931bbb5/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRel.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRel.java
index fde002e..c4caff3 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRel.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRel.java
@@ -65,7 +65,7 @@ public class BeamValuesRel extends Values implements BeamRelNode {
 
     BeamRecordSqlType beamSQLRowType = CalciteUtils.toBeamRowType(this.getRowType());
     for (ImmutableList<RexLiteral> tuple : tuples) {
-      List<Object> fieldsValue = new ArrayList<>(beamSQLRowType.size());
+      List<Object> fieldsValue = new ArrayList<>(beamSQLRowType.getFieldCount());
       for (int i = 0; i < tuple.size(); i++) {
         fieldsValue.add(BeamTableUtils.autoCastField(
             beamSQLRowType.getFieldTypeByIndex(i), tuple.get(i).getValue()));

http://git-wip-us.apache.org/repos/asf/beam/blob/3931bbb5/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamJoinTransforms.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamJoinTransforms.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamJoinTransforms.java
index 9a48c53..7a8d10d 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamJoinTransforms.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamJoinTransforms.java
@@ -145,11 +145,11 @@ public class BeamJoinTransforms {
   private static BeamRecord combineTwoRowsIntoOneHelper(BeamRecord leftRow,
       BeamRecord rightRow) {
     // build the type
-    List<String> names = new ArrayList<>(leftRow.size() + rightRow.size());
+    List<String> names = new ArrayList<>(leftRow.getFieldCount() + rightRow.getFieldCount());
     names.addAll(leftRow.getDataType().getFieldNames());
     names.addAll(rightRow.getDataType().getFieldNames());
 
-    List<Integer> types = new ArrayList<>(leftRow.size() + rightRow.size());
+    List<Integer> types = new ArrayList<>(leftRow.getFieldCount() + rightRow.getFieldCount());
     types.addAll(BeamSqlRecordHelper.getSqlRecordType(leftRow).getFieldTypes());
     types.addAll(BeamSqlRecordHelper.getSqlRecordType(rightRow).getFieldTypes());
     BeamRecordSqlType type = BeamRecordSqlType.create(names, types);

http://git-wip-us.apache.org/repos/asf/beam/blob/3931bbb5/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamTableUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamTableUtils.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamTableUtils.java
index 99f9522..687a082 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamTableUtils.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamTableUtils.java
@@ -41,18 +41,18 @@ public final class BeamTableUtils {
       CSVFormat csvFormat,
       String line,
       BeamRecordSqlType beamRecordSqlType) {
-    List<Object> fieldsValue = new ArrayList<>(beamRecordSqlType.size());
+    List<Object> fieldsValue = new ArrayList<>(beamRecordSqlType.getFieldCount());
     try (StringReader reader = new StringReader(line)) {
       CSVParser parser = csvFormat.parse(reader);
       CSVRecord rawRecord = parser.getRecords().get(0);
 
-      if (rawRecord.size() != beamRecordSqlType.size()) {
+      if (rawRecord.size() != beamRecordSqlType.getFieldCount()) {
         throw new IllegalArgumentException(String.format(
             "Expect %d fields, but actually %d",
-            beamRecordSqlType.size(), rawRecord.size()
+            beamRecordSqlType.getFieldCount(), rawRecord.size()
         ));
       } else {
-        for (int idx = 0; idx < beamRecordSqlType.size(); idx++) {
+        for (int idx = 0; idx < beamRecordSqlType.getFieldCount(); idx++) {
           String raw = rawRecord.get(idx);
           fieldsValue.add(autoCastField(beamRecordSqlType.getFieldTypeByIndex(idx), raw));
         }
@@ -66,7 +66,7 @@ public final class BeamTableUtils {
   public static String beamSqlRow2CsvLine(BeamRecord row, CSVFormat csvFormat) {
     StringWriter writer = new StringWriter();
     try (CSVPrinter printer = csvFormat.print(writer)) {
-      for (int i = 0; i < row.size(); i++) {
+      for (int i = 0; i < row.getFieldCount(); i++) {
         printer.print(row.getFieldValue(i).toString());
       }
       printer.println();

http://git-wip-us.apache.org/repos/asf/beam/blob/3931bbb5/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java
index 4e74dbb..db562da 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java
@@ -49,7 +49,7 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase {
   }
 
   private void runAggregationWithoutWindow(PCollection<BeamRecord> input) throws Exception {
-    String sql = "SELECT f_int2, COUNT(*) AS `size` FROM PCOLLECTION GROUP BY f_int2";
+    String sql = "SELECT f_int2, COUNT(*) AS `getFieldCount` FROM PCOLLECTION GROUP BY f_int2";
 
     PCollection<BeamRecord> result =
         input.apply("testAggregationWithoutWindow", BeamSql.simpleQuery(sql));
@@ -57,6 +57,7 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase {
     BeamRecordSqlType resultType = BeamRecordSqlType.create(Arrays.asList("f_int2", "size"),
         Arrays.asList(Types.INTEGER, Types.BIGINT));
 
+
     BeamRecord record = new BeamRecord(resultType, 0, 4L);
 
     PAssert.that(result).containsInAnyOrder(record);
@@ -81,7 +82,7 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase {
   }
 
   private void runAggregationFunctions(PCollection<BeamRecord> input) throws Exception{
-    String sql = "select f_int2, count(*) as size, "
+    String sql = "select f_int2, count(*) as getFieldCount, "
         + "sum(f_long) as sum1, avg(f_long) as avg1, max(f_long) as max1, min(f_long) as min1,"
         + "sum(f_short) as sum2, avg(f_short) as avg2, max(f_short) as max2, min(f_short) as min2,"
         + "sum(f_byte) as sum3, avg(f_byte) as avg3, max(f_byte) as max3, min(f_byte) as min3,"
@@ -171,7 +172,7 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase {
   }
 
   private void runTumbleWindow(PCollection<BeamRecord> input) throws Exception {
-    String sql = "SELECT f_int2, COUNT(*) AS `size`,"
+    String sql = "SELECT f_int2, COUNT(*) AS `getFieldCount`,"
         + " TUMBLE_START(f_timestamp, INTERVAL '1' HOUR) AS `window_start`"
         + " FROM TABLE_A"
         + " GROUP BY f_int2, TUMBLE(f_timestamp, INTERVAL '1' HOUR)";
@@ -208,7 +209,7 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase {
   }
 
   private void runHopWindow(PCollection<BeamRecord> input) throws Exception {
-    String sql = "SELECT f_int2, COUNT(*) AS `size`,"
+    String sql = "SELECT f_int2, COUNT(*) AS `getFieldCount`,"
         + " HOP_START(f_timestamp, INTERVAL '1' HOUR, INTERVAL '30' MINUTE) AS `window_start`"
         + " FROM PCOLLECTION"
         + " GROUP BY f_int2, HOP(f_timestamp, INTERVAL '1' HOUR, INTERVAL '30' MINUTE)";
@@ -246,7 +247,7 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase {
   }
 
   private void runSessionWindow(PCollection<BeamRecord> input) throws Exception {
-    String sql = "SELECT f_int2, COUNT(*) AS `size`,"
+    String sql = "SELECT f_int2, COUNT(*) AS `getFieldCount`,"
         + " SESSION_START(f_timestamp, INTERVAL '5' MINUTE) AS `window_start`"
         + " FROM TABLE_A"
         + " GROUP BY f_int2, SESSION(f_timestamp, INTERVAL '5' MINUTE)";
@@ -273,7 +274,7 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase {
         "Cannot apply 'TUMBLE' to arguments of type 'TUMBLE(<BIGINT>, <INTERVAL HOUR>)'");
     pipeline.enableAbandonedNodeEnforcement(false);
 
-    String sql = "SELECT f_int2, COUNT(*) AS `size` FROM TABLE_A "
+    String sql = "SELECT f_int2, COUNT(*) AS `getFieldCount` FROM TABLE_A "
         + "GROUP BY f_int2, TUMBLE(f_long, INTERVAL '1' HOUR)";
     PCollection<BeamRecord> result =
         PCollectionTuple.of(new TupleTag<BeamRecord>("TABLE_A"), boundedInput1)
@@ -288,7 +289,8 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase {
     exceptions.expectMessage("Encountered \"*\"");
     pipeline.enableAbandonedNodeEnforcement(false);
 
-    String sql = "SELECT f_int2, COUNT(DISTINCT *) AS `size` FROM PCOLLECTION GROUP BY f_int2";
+    String sql = "SELECT f_int2, COUNT(DISTINCT *) AS `size` "
+        + "FROM PCOLLECTION GROUP BY f_int2";
 
     PCollection<BeamRecord> result =
         boundedInput1.apply("testUnsupportedDistinct", BeamSql.simpleQuery(sql));

http://git-wip-us.apache.org/repos/asf/beam/blob/3931bbb5/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/TestUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/TestUtils.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/TestUtils.java
index aa1fc29..373deb7 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/TestUtils.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/TestUtils.java
@@ -35,7 +35,7 @@ public class TestUtils {
   public static class BeamSqlRow2StringDoFn extends DoFn<BeamRecord, String> {
     @ProcessElement
     public void processElement(ProcessContext ctx) {
-      ctx.output(ctx.element().valueInString());
+      ctx.output(ctx.element().toString());
     }
   }
 
@@ -45,7 +45,7 @@ public class TestUtils {
   public static List<String> beamSqlRows2Strings(List<BeamRecord> rows) {
     List<String> strs = new ArrayList<>();
     for (BeamRecord row : rows) {
-      strs.add(row.valueInString());
+      strs.add(row.toString());
     }
 
     return strs;
@@ -181,7 +181,7 @@ public class TestUtils {
    */
   public static List<BeamRecord> buildRows(BeamRecordSqlType type, List args) {
     List<BeamRecord> rows = new ArrayList<>();
-    int fieldCount = type.size();
+    int fieldCount = type.getFieldCount();
 
     for (int i = 0; i < args.size(); i += fieldCount) {
       rows.add(new BeamRecord(type, args.subList(i, i + fieldCount)));


Mime
View raw message