beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From reuven...@apache.org
Subject [1/3] beam git commit: Move stripping code into BigQueryHelpers and add better unit-test coverage.
Date Fri, 22 Sep 2017 03:23:19 GMT
Repository: beam
Updated Branches:
  refs/heads/master 0a073af40 -> 66b864f2b


Move stripping code into BigQueryHelpers and add better unit-test coverage.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/fb6417f8
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/fb6417f8
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/fb6417f8

Branch: refs/heads/master
Commit: fb6417f81482d22ef1cff9505a6589360e506dc0
Parents: 0f50eb7
Author: Reuven Lax <relax@relax-macbookpro2.roam.corp.google.com>
Authored: Tue Sep 19 20:24:18 2017 -0700
Committer: Reuven Lax <relax@relax-macbookpro2.roam.corp.google.com>
Committed: Thu Sep 21 20:16:22 2017 -0700

----------------------------------------------------------------------
 .../sdk/io/gcp/bigquery/BigQueryHelpers.java    |  8 ++++
 .../beam/sdk/io/gcp/bigquery/CreateTables.java  |  8 ++--
 .../sdk/io/gcp/bigquery/TableDestination.java   | 11 -----
 .../sdk/io/gcp/bigquery/BigQueryIOTest.java     | 47 ++++++++++++++++----
 .../sdk/io/gcp/bigquery/FakeDatasetService.java | 15 ++++++-
 5 files changed, 65 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/fb6417f8/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
index 7f9e27a..02a47c2 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
@@ -112,6 +112,14 @@ public class BigQueryHelpers {
     return ref.setDatasetId(match.group("DATASET")).setTableId(match.group("TABLE"));
   }
 
+  /**
+   * Strip off any partition decorator information from a tablespec.
+   */
+  public static String stripPartitionDecorator(String tableSpec) {
+    int index = tableSpec.lastIndexOf('$');
+    return  (index  == -1) ? tableSpec : tableSpec.substring(0, index);
+  }
+
   static String jobToPrettyString(@Nullable Job job) throws IOException {
     return job == null ? "null" : job.toPrettyString();
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/fb6417f8/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java
index aff5ff1..fedd2fe 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java
@@ -113,9 +113,7 @@ public class CreateTables<DestinationT>
   private void possibleCreateTable(
       BigQueryOptions options, TableDestination tableDestination, TableSchema tableSchema)
       throws InterruptedException, IOException {
-    String tableSpec = tableDestination.getStrippedTableSpec();
-    TableReference tableReference = tableDestination.getTableReference();
-    String tableDescription = tableDestination.getTableDescription();
+    String tableSpec = BigQueryHelpers.stripPartitionDecorator(tableDestination.getTableSpec());
     if (createDisposition != createDisposition.CREATE_NEVER && !createdTables.contains(tableSpec)) {
       synchronized (createdTables) {
         // Another thread may have succeeded in creating the table in the meanwhile, so
@@ -123,6 +121,10 @@ public class CreateTables<DestinationT>
         // every thread from attempting a create and overwhelming our BigQuery quota.
         DatasetService datasetService = bqServices.getDatasetService(options);
         if (!createdTables.contains(tableSpec)) {
+          TableReference tableReference = tableDestination.getTableReference();
+          String tableDescription = tableDestination.getTableDescription();
+          tableReference.setTableId(
+              BigQueryHelpers.stripPartitionDecorator(tableReference.getTableId()));
           if (datasetService.getTable(tableReference) == null) {
             Table table = new Table()
                 .setTableReference(tableReference)

http://git-wip-us.apache.org/repos/asf/beam/blob/fb6417f8/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java
index 4a4f66b..ecc34d3 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java
@@ -30,7 +30,6 @@ import javax.annotation.Nullable;
 public class TableDestination implements Serializable {
   private static final long serialVersionUID = 1L;
   private final String tableSpec;
-  @Nullable private String strippedTableSpec;
   @Nullable
   private final String tableDescription;
   @Nullable
@@ -60,24 +59,14 @@ public class TableDestination implements Serializable {
   public TableDestination(String tableSpec, @Nullable String tableDescription,
       @Nullable String jsonTimePartitioning) {
     this.tableSpec = tableSpec;
-    this.strippedTableSpec = null;
     this.tableDescription = tableDescription;
     this.jsonTimePartitioning = jsonTimePartitioning;
   }
 
-
   public String getTableSpec() {
     return tableSpec;
   }
 
-  public String getStrippedTableSpec() {
-    if (strippedTableSpec == null) {
-      int index = tableSpec.lastIndexOf('$');
-      strippedTableSpec = (index  == -1) ? tableSpec : tableSpec.substring(0, index);
-    }
-    return strippedTableSpec;
-  }
-
   public TableReference getTableReference() {
     return BigQueryHelpers.parseTableSpec(tableSpec);
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/fb6417f8/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index 7927282..ad4cbaa 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -670,7 +670,7 @@ public class BigQueryIOTest implements Serializable {
         .setFields(
             ImmutableList.of(
                 new TableFieldSchema().setName("number").setType("INTEGER")));
-    p.apply(Create.of(row1, row1))
+    p.apply(Create.of(row1, row2))
         .apply(
             BigQueryIO.writeTableRows()
                 .to("project-id:dataset-id.table-id")
@@ -1820,7 +1820,7 @@ public class BigQueryIOTest implements Serializable {
 
     options.setTempLocation(baseDir.toString());
 
-    List<TableRow> read = convertBigDecimaslToLong(
+    List<TableRow> read = convertBigDecimalsToLong(
         SourceTestUtils.readFromSource(bqSource, options));
     assertThat(read, containsInAnyOrder(Iterables.toArray(expected, TableRow.class)));
     SourceTestUtils.assertSplitAtFractionBehavior(
@@ -2329,7 +2329,7 @@ public class BigQueryIOTest implements Serializable {
             IntervalWindow.getCoder()));
   }
 
-  List<TableRow> convertBigDecimaslToLong(List<TableRow> toConvert) {
+  List<TableRow> convertBigDecimalsToLong(List<TableRow> toConvert) {
     // The numbers come back as BigDecimal objects after JSON serialization. Change them back to
     // longs so that we can assert the output.
     List<TableRow> converted = Lists.newArrayList();
@@ -2345,13 +2345,42 @@ public class BigQueryIOTest implements Serializable {
   }
 
   @Test
-  public void testTableDecoratorStripping() {
-    TableDestination tableDestination = tableDestination = new TableDestination(
-        "project:dataset.table$decorator", "");
-    assertEquals("project:dataset.table", tableDestination.getStrippedTableSpec());
+  public void testWriteToTableDecorator() throws Exception {
+    BigQueryOptions bqOptions = TestPipeline.testingPipelineOptions().as(BigQueryOptions.class);
+    bqOptions.setProject("project-id");
+    bqOptions.setTempLocation(testFolder.newFolder("BigQueryIOTest").getAbsolutePath());
 
-    tableDestination = new TableDestination("project:dataset.table", "");
-    assertEquals("project:dataset.table", tableDestination.getStrippedTableSpec());
+    FakeDatasetService datasetService = new FakeDatasetService();
+    FakeBigQueryServices fakeBqServices =
+        new FakeBigQueryServices()
+            .withJobService(new FakeJobService())
+            .withDatasetService(datasetService);
+    datasetService.createDataset("project-id", "dataset-id", "", "");
 
+    Pipeline p = TestPipeline.create(bqOptions);
+    TableRow row1 = new TableRow().set("name", "a").set("number", "1");
+    TableRow row2 = new TableRow().set("name", "b").set("number", "2");
+
+    TableSchema schema = new TableSchema()
+        .setFields(
+            ImmutableList.of(
+                new TableFieldSchema().setName("number").setType("INTEGER")));
+    p.apply(Create.of(row1, row2))
+        .apply(
+            BigQueryIO.writeTableRows()
+                .to("project-id:dataset-id.table-id$decorator")
+                .withTestServices(fakeBqServices)
+                .withMethod(Method.STREAMING_INSERTS)
+                .withSchema(schema)
+                .withoutValidation());
+    p.run();
+  }
+
+  @Test
+  public void testTableDecoratorStripping() {
+    assertEquals("project:dataset.table",
+        BigQueryHelpers.stripPartitionDecorator("project:dataset.table$decorator"));
+    assertEquals("project:dataset.table",
+        BigQueryHelpers.stripPartitionDecorator("project:dataset.table"));
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/fb6417f8/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeDatasetService.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeDatasetService.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeDatasetService.java
index bcd84f7..323f663 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeDatasetService.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeDatasetService.java
@@ -36,6 +36,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.regex.Pattern;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
 import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy.Context;
@@ -111,7 +112,17 @@ class FakeDatasetService implements DatasetService, Serializable {
 
   @Override
   public void createTable(Table table) throws IOException {
+    final Pattern tableRegexp = Pattern.compile("[-\\w]{1,1024}");
+
     TableReference tableReference = table.getTableReference();
+    if (!tableRegexp.matcher(tableReference.getTableId()).matches()) {
+      throw new IOException(
+          String.format(
+              "invalid table ID %s. Table IDs must be alphanumeric "
+                  + "(plus underscores) and must be at most 1024 characters long. Also, table"
+                  + " decorators cannot be used.",
+              tableReference.getTableId()));
+    }
     synchronized (BigQueryIOTest.tables) {
       Map<String, TableContainer> dataset =
           BigQueryIOTest.tables.get(tableReference.getProjectId(), tableReference.getDatasetId());
@@ -202,7 +213,9 @@ class FakeDatasetService implements DatasetService, Serializable {
 
       long dataSize = 0;
       TableContainer tableContainer = getTableContainer(
-          ref.getProjectId(), ref.getDatasetId(), ref.getTableId());
+          ref.getProjectId(),
+          ref.getDatasetId(),
+          BigQueryHelpers.stripPartitionDecorator(ref.getTableId()));
       for (int i = 0; i < rowList.size(); ++i) {
         TableRow row = rowList.get(i).getValue();
         List<TableDataInsertAllResponse.InsertErrors> allErrors = insertErrors.get(row);


Mime
View raw message