beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From apill...@apache.org
Subject [beam] branch master updated: [BEAM-9658] Plumb through WITH OFFSET
Date Tue, 28 Apr 2020 17:24:47 GMT
This is an automated email from the ASF dual-hosted git repository.

apilloud pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 7f66ea2  [BEAM-9658] Plumb through WITH OFFSET
     new 7c22589  Merge pull request #11540 from apilloud/9658
7f66ea2 is described below

commit 7f66ea25417d70dff9982a8b1c88795f30642e73
Author: Andrew Pilloud <apilloud@google.com>
AuthorDate: Thu Apr 23 16:09:30 2020 -0700

    [BEAM-9658] Plumb through WITH OFFSET
---
 .../translation/ArrayScanToJoinConverter.java      | 30 ++++++++++++++++------
 .../translation/ArrayScanToUncollectConverter.java |  9 +++++--
 .../sql/zetasql/ZetaSQLDialectSpecTest.java        | 10 ++++++++
 3 files changed, 39 insertions(+), 10 deletions(-)

diff --git a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/ArrayScanToJoinConverter.java b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/ArrayScanToJoinConverter.java
index cd7a4fd..d222dac 100644
--- a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/ArrayScanToJoinConverter.java
+++ b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/ArrayScanToJoinConverter.java
@@ -29,6 +29,7 @@ import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.core.JoinRe
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.core.Uncollect;
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.logical.LogicalJoin;
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.logical.LogicalProject;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexInputRef;
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexNode;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
@@ -80,14 +81,27 @@ class ArrayScanToJoinConverter extends RelConverter<ResolvedArrayScan> {
         LogicalProject.create(createOneRow(getCluster()), projects, ImmutableList.of(columnName));
 
     // Create an UnCollect
-    // TODO: how to handle ordinality.
-    Uncollect uncollectNode = Uncollect.create(projectNode.getTraitSet(), projectNode, false);
-    // The InputRef should only be 0 because Uncollect has only one field.
-    RelNode rightInput =
-        LogicalProject.create(
-            uncollectNode,
-            ImmutableList.of(getCluster().getRexBuilder().makeInputRef(uncollectNode, 0)),
-            ImmutableList.of(columnName));
+    boolean ordinality = (zetaNode.getArrayOffsetColumn() != null);
+
+    // These asserts guaranteed by the parser code, but not the data structure.
+    // If they aren't true we need the Project to reorder columns.
+    assert zetaNode.getElementColumn().getId() == 1;
+    assert !ordinality || zetaNode.getArrayOffsetColumn().getColumn().getId() == 2;
+    Uncollect uncollectNode = Uncollect.create(projectNode.getTraitSet(), projectNode, ordinality);
+
+    List<RexInputRef> rightProjects = new ArrayList<>();
+    List<String> rightNames = new ArrayList<>();
+    rightProjects.add(getCluster().getRexBuilder().makeInputRef(uncollectNode, 0));
+    rightNames.add(columnName);
+    if (ordinality) {
+      rightProjects.add(getCluster().getRexBuilder().makeInputRef(uncollectNode, 1));
+      rightNames.add(
+          String.format(
+              zetaNode.getArrayOffsetColumn().getColumn().getTableName(),
+              zetaNode.getArrayOffsetColumn().getColumn().getName()));
+    }
+
+    RelNode rightInput = LogicalProject.create(uncollectNode, rightProjects, rightNames);
 
     // Join condition should be a RexNode converted from join_expr.
     RexNode condition =
diff --git a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/ArrayScanToUncollectConverter.java b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/ArrayScanToUncollectConverter.java
index 21eee28..87d777ff 100644
--- a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/ArrayScanToUncollectConverter.java
+++ b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/ArrayScanToUncollectConverter.java
@@ -54,7 +54,12 @@ class ArrayScanToUncollectConverter extends RelConverter<ResolvedArrayScan> {
             Collections.singletonList(arrayLiteralExpression),
             ImmutableList.of(fieldName));
 
-    // TODO: how to handle ordinarily.
-    return Uncollect.create(projectNode.getTraitSet(), projectNode, false);
+    boolean ordinality = (zetaNode.getArrayOffsetColumn() != null);
+
+    // These asserts guaranteed by the parser code, but not the data structure.
+    // If they aren't true we need to add a Project to reorder columns.
+    assert zetaNode.getElementColumn().getId() == 1;
+    assert !ordinality || zetaNode.getArrayOffsetColumn().getColumn().getId() == 2;
+    return Uncollect.create(projectNode.getTraitSet(), projectNode, ordinality);
   }
 }
diff --git a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLDialectSpecTest.java b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLDialectSpecTest.java
index 6d4eb3e..26e34ca 100644
--- a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLDialectSpecTest.java
+++ b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLDialectSpecTest.java
@@ -2981,6 +2981,16 @@ public class ZetaSQLDialectSpecTest {
   }
 
   @Test
+  public void testNamedUNNESTLiteralOffset() {
+    String sql = "SELECT x, p FROM UNNEST([3, 4]) AS x WITH OFFSET p";
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+
+    thrown.expect(UnsupportedOperationException.class);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+  }
+
+  @Test
   @Ignore("Seeing exception in Beam, need further investigation on the cause of this failed query.")
   public void testNamedUNNESTJoin() {
     String sql =


Mime
View raw message