beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From reuven...@apache.org
Subject [beam] branch master updated: Merge pull request #11290: [BEAM-9670] Fix nullability widening in CoGroup key resolution
Date Thu, 02 Apr 2020 22:12:47 GMT
This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 5504f32  Merge pull request #11290: [BEAM-9670] Fix nullability widening in CoGroup key resolution
5504f32 is described below

commit 5504f32a70bd65b5fbc76bb4a270fa1cfe43a4eb
Author: reuvenlax <relax@google.com>
AuthorDate: Thu Apr 2 15:12:31 2020 -0700

    Merge pull request #11290: [BEAM-9670] Fix nullability widening in CoGroup key resolution
---
 .../org/apache/beam/sdk/schemas/SchemaUtils.java   | 101 +++++++++++++++++++
 .../beam/sdk/schemas/transforms/CoGroup.java       |   5 +-
 .../apache/beam/sdk/schemas/SchemaUtilsTest.java   | 107 +++++++++++++++++++++
 .../beam/sdk/schemas/transforms/CoGroupTest.java   |   2 +-
 .../sql/impl/rel/BeamSetOperatorRelBase.java       |  11 ---
 .../sql/zetasql/ZetaSQLDialectSpecTest.java        |  50 ++++++++++
 6 files changed, 261 insertions(+), 15 deletions(-)

diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaUtils.java
new file mode 100644
index 0000000..e2e749a
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaUtils.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas;
+
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+
+/** A set of utility functions for schemas. */
+public class SchemaUtils {
+  /**
+   * Given two schema that have matching types, return a nullable-widened schema.
+   *
+   * <p>The schemas must have matching types, except for field names which can differ. The returned
+   * schema will contain the field names in the first schema. All field types will be nullable if
+   * the corresponding field type is nullable in either of the input schemas.
+   */
+  public static Schema mergeWideningNullable(Schema schema1, Schema schema2) {
+    if (schema1.getFieldCount() != schema2.getFieldCount()) {
+      throw new IllegalArgumentException(
+          "Cannot merge schemas with different numbers of fields. "
+              + "schema1: "
+              + schema1
+              + " schema2: "
+              + schema2);
+    }
+    Schema.Builder builder = Schema.builder();
+    for (int i = 0; i < schema1.getFieldCount(); ++i) {
+      String name = schema1.getField(i).getName();
+      builder.addField(
+          name, widenNullableTypes(schema1.getField(i).getType(), schema2.getField(i).getType()));
+    }
+    return builder.build();
+  }
+
+  static FieldType widenNullableTypes(FieldType fieldType1, FieldType fieldType2) {
+    if (fieldType1.getTypeName() != fieldType2.getTypeName()) {
+      throw new IllegalArgumentException(
+          "Cannot merge two types: "
+              + fieldType1.getTypeName()
+              + " and "
+              + fieldType2.getTypeName());
+    }
+
+    FieldType result;
+    switch (fieldType1.getTypeName()) {
+      case ROW:
+        result =
+            FieldType.row(
+                mergeWideningNullable(fieldType1.getRowSchema(), fieldType2.getRowSchema()));
+        break;
+      case ARRAY:
+        FieldType arrayElementType =
+            widenNullableTypes(
+                fieldType1.getCollectionElementType(), fieldType2.getCollectionElementType());
+        result = FieldType.array(arrayElementType);
+        break;
+      case ITERABLE:
+        FieldType iterableElementType =
+            widenNullableTypes(
+                fieldType1.getCollectionElementType(), fieldType2.getCollectionElementType());
+        result = FieldType.iterable(iterableElementType);
+        break;
+      case MAP:
+        FieldType keyType =
+            widenNullableTypes(fieldType1.getMapKeyType(), fieldType2.getMapKeyType());
+        FieldType valueType =
+            widenNullableTypes(fieldType1.getMapValueType(), fieldType2.getMapValueType());
+        result = FieldType.map(keyType, valueType);
+        break;
+      case LOGICAL_TYPE:
+        if (!fieldType1
+            .getLogicalType()
+            .getIdentifier()
+            .equals(fieldType2.getLogicalType().getIdentifier())) {
+          throw new IllegalArgumentException(
+              "Logical types don't match and cannot be merged: "
+                  + fieldType1.getLogicalType().getIdentifier()
+                  + ".v.s"
+                  + fieldType2.getLogicalType().getIdentifier());
+        }
+        // fall through
+      default:
+        result = fieldType1;
+    }
+    return result.withNullable(fieldType1.getNullable() || fieldType2.getNullable());
+  }
+}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/CoGroup.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/CoGroup.java
index c4366e2..2f6da8d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/CoGroup.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/CoGroup.java
@@ -35,6 +35,7 @@ import org.apache.beam.sdk.schemas.FieldAccessDescriptor;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.schemas.Schema.FieldType;
 import org.apache.beam.sdk.schemas.SchemaCoder;
+import org.apache.beam.sdk.schemas.SchemaUtils;
 import org.apache.beam.sdk.schemas.transforms.CoGroup.ConvertCoGbkResult.ConvertType;
 import org.apache.beam.sdk.schemas.utils.RowSelector;
 import org.apache.beam.sdk.schemas.utils.SelectHelpers;
@@ -399,9 +400,7 @@ public class CoGroup {
         if (keySchema == null) {
           keySchema = currentKeySchema;
         } else {
-          if (!currentKeySchema.typesEqual(keySchema)) {
-            throw new IllegalStateException("All keys must have the same schema");
-          }
+          keySchema = SchemaUtils.mergeWideningNullable(keySchema, currentKeySchema);
         }
 
         // Create a new tag for the output.
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaUtilsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaUtilsTest.java
new file mode 100644
index 0000000..45c06f6
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaUtilsTest.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.junit.Test;
+
+/** Tests for {@link org.apache.beam.sdk.schemas.SchemaUtils}. */
+public class SchemaUtilsTest {
+  @Test
+  public void testWidenPrimitives() {
+    Schema schema1 =
+        Schema.builder()
+            .addField("field1", FieldType.INT32)
+            .addNullableField("field2", FieldType.STRING)
+            .build();
+    Schema schema2 =
+        Schema.builder()
+            .addNullableField("field3", FieldType.INT32)
+            .addField("field4", FieldType.STRING)
+            .build();
+    Schema expected =
+        Schema.builder()
+            .addNullableField("field1", FieldType.INT32)
+            .addNullableField("field2", FieldType.STRING)
+            .build();
+    assertEquals(expected, SchemaUtils.mergeWideningNullable(schema1, schema2));
+  }
+
+  @Test
+  public void testWidenNested() {
+    Schema schema1 =
+        Schema.builder()
+            .addField("field1", FieldType.INT32)
+            .addNullableField("field2", FieldType.STRING)
+            .build();
+    Schema schema2 =
+        Schema.builder()
+            .addNullableField("field3", FieldType.INT32)
+            .addField("field4", FieldType.STRING)
+            .build();
+    Schema top1 = Schema.builder().addField("top1", FieldType.row(schema1)).build();
+    Schema top2 = Schema.builder().addField("top2", FieldType.row(schema2)).build();
+    Schema expected =
+        Schema.builder()
+            .addNullableField("field1", FieldType.INT32)
+            .addNullableField("field2", FieldType.STRING)
+            .build();
+    Schema expectedTop = Schema.builder().addField("top1", FieldType.row(expected)).build();
+
+    assertEquals(expectedTop, SchemaUtils.mergeWideningNullable(top1, top2));
+  }
+
+  @Test
+  public void testWidenArray() {
+    Schema schema1 = Schema.builder().addArrayField("field1", FieldType.INT32).build();
+    Schema schema2 =
+        Schema.builder().addArrayField("field1", FieldType.INT32.withNullable(true)).build();
+    Schema expected =
+        Schema.builder().addArrayField("field1", FieldType.INT32.withNullable(true)).build();
+    assertEquals(expected, SchemaUtils.mergeWideningNullable(schema1, schema2));
+  }
+
+  @Test
+  public void testWidenIterable() {
+    Schema schema1 = Schema.builder().addIterableField("field1", FieldType.INT32).build();
+    Schema schema2 =
+        Schema.builder().addIterableField("field1", FieldType.INT32.withNullable(true)).build();
+    Schema expected =
+        Schema.builder().addIterableField("field1", FieldType.INT32.withNullable(true)).build();
+    assertEquals(expected, SchemaUtils.mergeWideningNullable(schema1, schema2));
+  }
+
+  @Test
+  public void testWidenMap() {
+    Schema schema1 =
+        Schema.builder().addMapField("field1", FieldType.INT32, FieldType.INT32).build();
+    Schema schema2 =
+        Schema.builder()
+            .addMapField(
+                "field1", FieldType.INT32.withNullable(true), FieldType.INT32.withNullable(true))
+            .build();
+    Schema expected =
+        Schema.builder()
+            .addMapField(
+                "field1", FieldType.INT32.withNullable(true), FieldType.INT32.withNullable(true))
+            .build();
+    assertEquals(expected, SchemaUtils.mergeWideningNullable(schema1, schema2));
+  }
+}
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/CoGroupTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/CoGroupTest.java
index 527a9c6..0008605 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/CoGroupTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/CoGroupTest.java
@@ -475,7 +475,7 @@ public class CoGroupTest {
                 Create.of(Row.withSchema(CG_SCHEMA_1).addValues("user1", 9, "us").build()))
             .setRowSchema(CG_SCHEMA_1);
 
-    thrown.expect(IllegalStateException.class);
+    thrown.expect(IllegalArgumentException.class);
     PCollection<Row> joined =
         PCollectionTuple.of("pc1", pc1, "pc2", pc2)
             .apply(
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSetOperatorRelBase.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSetOperatorRelBase.java
index a036148..ab76a7b 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSetOperatorRelBase.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSetOperatorRelBase.java
@@ -21,7 +21,6 @@ import static org.apache.beam.vendor.calcite.v1_20_0.com.google.common.base.Prec
 
 import java.io.Serializable;
 import org.apache.beam.sdk.extensions.sql.impl.transform.BeamSetOperatorsTransforms;
-import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.schemas.transforms.CoGroup;
 import org.apache.beam.sdk.schemas.transforms.CoGroup.By;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -63,16 +62,6 @@ public class BeamSetOperatorRelBase extends PTransform<PCollectionList<Row>, PCo
         inputs);
     PCollection<Row> leftRows = inputs.get(0);
     PCollection<Row> rightRows = inputs.get(1);
-    Schema leftSchema = leftRows.getSchema();
-    Schema rightSchema = rightRows.getSchema();
-    if (!leftSchema.typesEqual(rightSchema)) {
-      throw new IllegalArgumentException(
-          "Can't intersect two tables with different schemas."
-              + "lhsSchema: "
-              + leftSchema
-              + "  rhsSchema: "
-              + rightSchema);
-    }
 
     WindowFn leftWindow = leftRows.getWindowingStrategy().getWindowFn();
     WindowFn rightWindow = rightRows.getWindowingStrategy().getWindowFn();
diff --git a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLDialectSpecTest.java b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLDialectSpecTest.java
index 8af8d8d..4695846 100644
--- a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLDialectSpecTest.java
+++ b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLDialectSpecTest.java
@@ -2271,6 +2271,56 @@ public class ZetaSQLDialectSpecTest {
   }
 
   @Test
+  public void testSelectNullIntersectDistinct() {
+    String sql = "SELECT NULL INTERSECT DISTINCT SELECT 2";
+
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+    System.err.println("SCHEMA " + stream.getSchema());
+
+    PAssert.that(stream).empty();
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  @Test
+  public void testSelectNullIntersectAll() {
+    String sql = "SELECT NULL INTERSECT ALL SELECT 2";
+
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+    System.err.println("SCHEMA " + stream.getSchema());
+
+    PAssert.that(stream).empty();
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  @Test
+  public void testSelectNullExceptDistinct() {
+    String sql = "SELECT NULL EXCEPT DISTINCT SELECT 2";
+
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+    PAssert.that(stream).containsInAnyOrder(Row.nullRow(stream.getSchema()));
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  @Test
+  public void testSelectNullExceptAll() {
+    String sql = "SELECT NULL EXCEPT ALL SELECT 2";
+
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+    PAssert.that(stream).containsInAnyOrder(Row.nullRow(stream.getSchema()));
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  @Test
   public void testTimestampLiteralWithNonUTCTimeZone() {
     String sql = "SELECT TIMESTAMP '2018-12-10 10:38:59-10:00'";
 


Mime
View raw message