beam-commits mailing list archives

From dhalp...@apache.org
Subject [1/3] beam git commit: Fix BEAM-1301. Support table description in BigQuery IO.
Date Wed, 01 Feb 2017 18:22:03 GMT
Repository: beam
Updated Branches:
  refs/heads/master 92021c5dc -> 2689ca43c


Fix BEAM-1301. Support table description in BigQuery IO.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3e8810b8
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3e8810b8
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3e8810b8

Branch: refs/heads/master
Commit: 3e8810b888b40b4b3a9548db8ffc457b16d41eab
Parents: 92021c5
Author: Rafal Wojdyla <rav@spotify.com>
Authored: Mon Jan 23 18:14:15 2017 -0500
Committer: Rafal Wojdyla <rav@spotify.com>
Committed: Wed Feb 1 12:34:42 2017 -0500

----------------------------------------------------------------------
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java    | 128 ++++++++++++++-----
 .../sdk/io/gcp/bigquery/BigQueryServices.java   |   4 +
 .../io/gcp/bigquery/BigQueryServicesImpl.java   |  25 ++++
 .../sdk/io/gcp/bigquery/BigQueryIOTest.java     |   2 +-
 4 files changed, 127 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
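
For context: this change adds a withTableDescription(...) option to BigQueryIO.Write, so a
pipeline can attach a human-readable description to the destination BigQuery table. A minimal
usage sketch follows (the table spec, schema, and description are hypothetical placeholders,
not part of this commit):

    import java.util.Collections;

    import com.google.api.services.bigquery.model.TableFieldSchema;
    import com.google.api.services.bigquery.model.TableRow;
    import com.google.api.services.bigquery.model.TableSchema;

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
    import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Create;

    public class TableDescriptionExample {
      public static void main(String[] args) {
        Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

        // Hypothetical one-column schema for the destination table.
        TableSchema schema = new TableSchema().setFields(Collections.singletonList(
            new TableFieldSchema().setName("word").setType("STRING")));

        p.apply(Create.of(new TableRow().set("word", "beam")).withCoder(TableRowJsonCoder.of()))
            .apply(BigQueryIO.Write
                .to("my-project:my_dataset.my_table")  // placeholder table spec
                .withSchema(schema)
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_EMPTY)
                // New in this commit: also set a description on the destination table.
                .withTableDescription("Word counts written by TableDescriptionExample"));

        p.run();
      }
    }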


http://git-wip-us.apache.org/repos/asf/beam/blob/3e8810b8/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index b15807e..5dbec54 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -1632,6 +1632,11 @@ public class BigQueryIO {
       return new Bound().withWriteDisposition(disposition);
     }
 
+    /** Creates a write transformation with the specified table description. */
+    public static Bound withTableDescription(@Nullable String tableDescription) {
+      return new Bound().withTableDescription(tableDescription);
+    }
+
     /**
      * Creates a write transformation with BigQuery table validation disabled.
      */
@@ -1672,6 +1677,8 @@ public class BigQueryIO {
       // WRITE_APPEND and WRITE_EMPTY.
       final WriteDisposition writeDisposition;
 
+      @Nullable final String tableDescription;
+
       // An option to indicate if table validation is desired. Default is true.
       final boolean validate;
 
@@ -1708,6 +1715,7 @@ public class BigQueryIO {
             null /* jsonSchema */,
             CreateDisposition.CREATE_IF_NEEDED,
             WriteDisposition.WRITE_EMPTY,
+            null /* tableDescription */,
             true /* validate */,
             null /* bigQueryServices */);
       }
@@ -1715,7 +1723,10 @@ public class BigQueryIO {
       private Bound(String name, @Nullable ValueProvider<String> jsonTableRef,
           @Nullable SerializableFunction<BoundedWindow, TableReference> tableRefFunction,
           @Nullable ValueProvider<String> jsonSchema,
-          CreateDisposition createDisposition, WriteDisposition writeDisposition, boolean validate,
+          CreateDisposition createDisposition,
+          WriteDisposition writeDisposition,
+          @Nullable String tableDescription,
+          boolean validate,
           @Nullable BigQueryServices bigQueryServices) {
         super(name);
         this.jsonTableRef = jsonTableRef;
@@ -1723,6 +1734,7 @@ public class BigQueryIO {
         this.jsonSchema = jsonSchema;
         this.createDisposition = checkNotNull(createDisposition, "createDisposition");
         this.writeDisposition = checkNotNull(writeDisposition, "writeDisposition");
+        this.tableDescription = tableDescription;
         this.validate = validate;
         this.bigQueryServices = bigQueryServices;
       }
@@ -1766,7 +1778,7 @@ public class BigQueryIO {
         return new Bound(name,
             NestedValueProvider.of(table, new TableRefToJson()),
             tableRefFunction, jsonSchema, createDisposition,
-            writeDisposition, validate, bigQueryServices);
+            writeDisposition, tableDescription, validate, bigQueryServices);
       }
 
       /**
@@ -1795,7 +1807,7 @@ public class BigQueryIO {
       public Bound toTableReference(
           SerializableFunction<BoundedWindow, TableReference> tableRefFunction) {
         return new Bound(name, jsonTableRef, tableRefFunction, jsonSchema, createDisposition,
-            writeDisposition, validate, bigQueryServices);
+            writeDisposition, tableDescription, validate, bigQueryServices);
       }
 
       /**
@@ -1807,7 +1819,7 @@ public class BigQueryIO {
       public Bound withSchema(TableSchema schema) {
         return new Bound(name, jsonTableRef, tableRefFunction,
             StaticValueProvider.of(toJsonString(schema)),
-            createDisposition, writeDisposition, validate, bigQueryServices);
+            createDisposition, writeDisposition, tableDescription, validate, bigQueryServices);
       }
 
       /**
@@ -1816,7 +1828,7 @@ public class BigQueryIO {
       public Bound withSchema(ValueProvider<TableSchema> schema) {
         return new Bound(name, jsonTableRef, tableRefFunction,
             NestedValueProvider.of(schema, new TableSchemaToJsonSchema()),
-            createDisposition, writeDisposition, validate, bigQueryServices);
+            createDisposition, writeDisposition, tableDescription, validate, bigQueryServices);
       }
 
       /**
@@ -1826,7 +1838,7 @@ public class BigQueryIO {
        */
       public Bound withCreateDisposition(CreateDisposition createDisposition) {
         return new Bound(name, jsonTableRef, tableRefFunction, jsonSchema,
-            createDisposition, writeDisposition, validate, bigQueryServices);
+            createDisposition, writeDisposition, tableDescription, validate, bigQueryServices);
       }
 
       /**
@@ -1836,7 +1848,17 @@ public class BigQueryIO {
        */
       public Bound withWriteDisposition(WriteDisposition writeDisposition) {
         return new Bound(name, jsonTableRef, tableRefFunction, jsonSchema,
-            createDisposition, writeDisposition, validate, bigQueryServices);
+            createDisposition, writeDisposition, tableDescription, validate, bigQueryServices);
+      }
+
+      /**
+       * Returns a copy of this write transformation, but using the specified table description.
+       *
+       * <p>Does not modify this object.
+       */
+      public Bound withTableDescription(@Nullable String tableDescription) {
+        return new Bound(name, jsonTableRef, tableRefFunction, jsonSchema,
+            createDisposition, writeDisposition, tableDescription, validate, bigQueryServices);
       }
 
       /**
@@ -1846,13 +1868,13 @@ public class BigQueryIO {
        */
       public Bound withoutValidation() {
         return new Bound(name, jsonTableRef, tableRefFunction, jsonSchema, createDisposition,
-            writeDisposition, false, bigQueryServices);
+            writeDisposition, tableDescription, false, bigQueryServices);
       }
 
       @VisibleForTesting
       Bound withTestServices(BigQueryServices testServices) {
         return new Bound(name, jsonTableRef, tableRefFunction, jsonSchema, createDisposition,
-            writeDisposition, validate, testServices);
+            writeDisposition, tableDescription, validate, testServices);
       }
 
       private static void verifyTableNotExistOrEmpty(
@@ -1962,7 +1984,9 @@ public class BigQueryIO {
               new StreamWithDeDup(getTable(), tableRefFunction,
                   jsonSchema == null ? null : NestedValueProvider.of(
                       jsonSchema, new JsonSchemaToTableSchema()),
-                  createDisposition, bqServices));
+                  createDisposition,
+                  tableDescription,
+                  bqServices));
         }
 
         ValueProvider<TableReference> table = getTableWithDefaultProject(options);
@@ -2024,7 +2048,8 @@ public class BigQueryIO {
                 NestedValueProvider.of(table, new TableRefToJson()),
                 jsonSchema,
                 WriteDisposition.WRITE_EMPTY,
-                CreateDisposition.CREATE_IF_NEEDED)));
+                CreateDisposition.CREATE_IF_NEEDED,
+                tableDescription)));
 
         PCollectionView<Iterable<String>> tempTablesView = tempTables
             .apply("TempTablesView", View.<String>asIterable());
@@ -2035,7 +2060,8 @@ public class BigQueryIO {
                 NestedValueProvider.of(table, new TableRefToJson()),
                 writeDisposition,
                 createDisposition,
-                tempTablesView))
+                tempTablesView,
+                tableDescription))
             .withSideInputs(tempTablesView));
 
         // Write single partition to final table
@@ -2049,7 +2075,8 @@ public class BigQueryIO {
                 NestedValueProvider.of(table, new TableRefToJson()),
                 jsonSchema,
                 writeDisposition,
-                createDisposition)));
+                createDisposition,
+                tableDescription)));
 
         return PDone.in(input.getPipeline());
       }
@@ -2128,7 +2155,9 @@ public class BigQueryIO {
             .add(DisplayData.item("writeDisposition", writeDisposition.toString())
               .withLabel("Table WriteDisposition"))
             .addIfNotDefault(DisplayData.item("validation", validate)
-              .withLabel("Validation Enabled"), true);
+              .withLabel("Validation Enabled"), true)
+            .addIfNotNull(DisplayData.item("tableDescription", tableDescription)
+              .withLabel("Table Description"));
       }
 
       /** Returns the create disposition. */
@@ -2302,6 +2331,7 @@ public class BigQueryIO {
       private final ValueProvider<String> jsonSchema;
       private final WriteDisposition writeDisposition;
       private final CreateDisposition createDisposition;
+      @Nullable private final String tableDescription;
 
       public WriteTables(
           boolean singlePartition,
@@ -2311,7 +2341,8 @@ public class BigQueryIO {
           ValueProvider<String> jsonTableRef,
           ValueProvider<String> jsonSchema,
           WriteDisposition writeDisposition,
-          CreateDisposition createDisposition) {
+          CreateDisposition createDisposition,
+          @Nullable String tableDescription) {
         this.singlePartition = singlePartition;
         this.bqServices = bqServices;
         this.jobIdToken = jobIdToken;
@@ -2320,6 +2351,7 @@ public class BigQueryIO {
         this.jsonSchema = jsonSchema;
         this.writeDisposition = writeDisposition;
         this.createDisposition = createDisposition;
+        this.tableDescription = tableDescription;
       }
 
       @ProcessElement
@@ -2333,13 +2365,15 @@ public class BigQueryIO {
 
         load(
             bqServices.getJobService(c.getPipelineOptions().as(BigQueryOptions.class)),
+            bqServices.getDatasetService(c.getPipelineOptions().as(BigQueryOptions.class)),
             jobIdPrefix,
             ref,
             fromJsonString(
                 jsonSchema == null ? null : jsonSchema.get(), TableSchema.class),
             partition,
             writeDisposition,
-            createDisposition);
+            createDisposition,
+            tableDescription);
         c.output(toJsonString(ref));
 
         removeTemporaryFiles(c.getPipelineOptions(), tempFilePrefix, partition);
@@ -2347,12 +2381,14 @@ public class BigQueryIO {
 
       private void load(
           JobService jobService,
+          DatasetService datasetService,
           String jobIdPrefix,
           TableReference ref,
           @Nullable TableSchema schema,
           List<String> gcsUris,
           WriteDisposition writeDisposition,
-          CreateDisposition createDisposition) throws InterruptedException, IOException {
+          CreateDisposition createDisposition,
+          @Nullable String tableDescription) throws InterruptedException, IOException {
         JobConfigurationLoad loadConfig = new JobConfigurationLoad()
             .setDestinationTable(ref)
             .setSchema(schema)
@@ -2373,6 +2409,9 @@ public class BigQueryIO {
           Status jobStatus = parseStatus(loadJob);
           switch (jobStatus) {
             case SUCCEEDED:
+              if (tableDescription != null) {
+                datasetService.patchTableDescription(ref, tableDescription);
+              }
               return;
             case UNKNOWN:
               throw new RuntimeException(String.format(
@@ -2428,7 +2467,9 @@ public class BigQueryIO {
             .addIfNotNull(DisplayData.item("jsonTableRef", jsonTableRef)
                 .withLabel("Table Reference"))
             .addIfNotNull(DisplayData.item("jsonSchema", jsonSchema)
-                .withLabel("Table Schema"));
+                .withLabel("Table Schema"))
+            .addIfNotNull(DisplayData.item("tableDescription", tableDescription)
+                .withLabel("Table Description"));
       }
     }
 
@@ -2442,6 +2483,7 @@ public class BigQueryIO {
       private final WriteDisposition writeDisposition;
       private final CreateDisposition createDisposition;
       private final PCollectionView<Iterable<String>> tempTablesView;
+      @Nullable private final String tableDescription;
 
       public WriteRename(
           BigQueryServices bqServices,
@@ -2449,13 +2491,15 @@ public class BigQueryIO {
           ValueProvider<String> jsonTableRef,
           WriteDisposition writeDisposition,
           CreateDisposition createDisposition,
-          PCollectionView<Iterable<String>> tempTablesView) {
+          PCollectionView<Iterable<String>> tempTablesView,
+          @Nullable String tableDescription) {
         this.bqServices = bqServices;
         this.jobIdToken = jobIdToken;
         this.jsonTableRef = jsonTableRef;
         this.writeDisposition = writeDisposition;
         this.createDisposition = createDisposition;
         this.tempTablesView = tempTablesView;
+        this.tableDescription = tableDescription;
       }
 
       @ProcessElement
@@ -2473,11 +2517,13 @@ public class BigQueryIO {
         }
         copy(
             bqServices.getJobService(c.getPipelineOptions().as(BigQueryOptions.class)),
+            bqServices.getDatasetService(c.getPipelineOptions().as(BigQueryOptions.class)),
             jobIdToken.get(),
             fromJsonString(jsonTableRef.get(), TableReference.class),
             tempTables,
             writeDisposition,
-            createDisposition);
+            createDisposition,
+            tableDescription);
 
         DatasetService tableService =
             bqServices.getDatasetService(c.getPipelineOptions().as(BigQueryOptions.class));
@@ -2486,11 +2532,13 @@ public class BigQueryIO {
 
       private void copy(
           JobService jobService,
+          DatasetService datasetService,
           String jobIdPrefix,
           TableReference ref,
           List<TableReference> tempTables,
           WriteDisposition writeDisposition,
-          CreateDisposition createDisposition) throws InterruptedException, IOException {
+          CreateDisposition createDisposition,
+          @Nullable String tableDescription) throws InterruptedException, IOException {
         JobConfigurationTableCopy copyConfig = new JobConfigurationTableCopy()
             .setSourceTables(tempTables)
             .setDestinationTable(ref)
@@ -2509,6 +2557,9 @@ public class BigQueryIO {
           Status jobStatus = parseStatus(copyJob);
           switch (jobStatus) {
             case SUCCEEDED:
+              if (tableDescription != null) {
+                datasetService.patchTableDescription(ref, tableDescription);
+              }
               return;
             case UNKNOWN:
               throw new RuntimeException(String.format(
@@ -2628,6 +2679,8 @@ public class BigQueryIO {
     /** TableSchema in JSON. Use String to make the class Serializable. */
     @Nullable private final ValueProvider<String> jsonTableSchema;
 
+    @Nullable private final String tableDescription;
+
     private final BigQueryServices bqServices;
 
     /** JsonTableRows to accumulate BigQuery rows in order to batch writes. */
@@ -2649,12 +2702,13 @@ public class BigQueryIO {
 
     /** Constructor. */
     StreamingWriteFn(@Nullable ValueProvider<TableSchema> schema,
-        Write.CreateDisposition createDisposition,
-        BigQueryServices bqServices) {
+                     Write.CreateDisposition createDisposition,
+                     @Nullable String tableDescription, BigQueryServices bqServices) {
       this.jsonTableSchema = schema == null ? null :
           NestedValueProvider.of(schema, new TableSchemaToJsonSchema());
       this.createDisposition = createDisposition;
       this.bqServices = checkNotNull(bqServices, "bqServices");
+      this.tableDescription = tableDescription;
     }
 
     /**
@@ -2702,8 +2756,11 @@ public class BigQueryIO {
     public void populateDisplayData(DisplayData.Builder builder) {
       super.populateDisplayData(builder);
 
-      builder.addIfNotNull(DisplayData.item("schema", jsonTableSchema)
-        .withLabel("Table Schema"));
+      builder
+          .addIfNotNull(DisplayData.item("schema", jsonTableSchema)
+            .withLabel("Table Schema"))
+          .addIfNotNull(DisplayData.item("tableDescription", tableDescription)
+            .withLabel("Table Description"));
     }
 
     public TableReference getOrCreateTable(BigQueryOptions options, String tableSpec)
@@ -2721,7 +2778,10 @@ public class BigQueryIO {
               TableSchema tableSchema = JSON_FACTORY.fromString(
                   jsonTableSchema.get(), TableSchema.class);
               datasetService.createTable(
-                  new Table().setTableReference(tableReference).setSchema(tableSchema));
+                  new Table()
+                      .setTableReference(tableReference)
+                      .setSchema(tableSchema)
+                      .setDescription(tableDescription));
             }
             createdTables.add(tableSpec);
           }
@@ -2967,18 +3027,20 @@ public class BigQueryIO {
     @Nullable private final transient ValueProvider<TableSchema> tableSchema;
     private final Write.CreateDisposition createDisposition;
     private final BigQueryServices bqServices;
+    @Nullable private final String tableDescription;
 
     /** Constructor. */
     StreamWithDeDup(ValueProvider<TableReference> tableReference,
-        @Nullable SerializableFunction<BoundedWindow, TableReference> tableRefFunction,
-        @Nullable ValueProvider<TableSchema> tableSchema,
-        Write.CreateDisposition createDisposition,
-        BigQueryServices bqServices) {
+                    @Nullable SerializableFunction<BoundedWindow, TableReference> tableRefFunction,
+                    @Nullable ValueProvider<TableSchema> tableSchema,
+                    Write.CreateDisposition createDisposition,
+                    @Nullable String tableDescription, BigQueryServices bqServices) {
       this.tableReference = tableReference;
       this.tableRefFunction = tableRefFunction;
       this.tableSchema = tableSchema;
       this.createDisposition = createDisposition;
       this.bqServices = checkNotNull(bqServices, "bqServices");
+      this.tableDescription = tableDescription;
     }
 
     @Override
@@ -3009,7 +3071,11 @@ public class BigQueryIO {
       tagged
           .setCoder(KvCoder.of(ShardedKeyCoder.of(StringUtf8Coder.of()), TableRowInfoCoder.of()))
           .apply(Reshuffle.<ShardedKey<String>, TableRowInfo>of())
-          .apply(ParDo.of(new StreamingWriteFn(tableSchema, createDisposition, bqServices)));
+          .apply(ParDo.of(new StreamingWriteFn(
+              tableSchema,
+              createDisposition,
+              tableDescription,
+              bqServices)));
 
       // Note that the implementation to return PDone here breaks the
       // implicit assumption about the job execution order. If a user
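
Note on the two write paths changed above: for batch loads, WriteTables and WriteRename apply
the description by patching the destination table after the load or copy job succeeds, while
the streaming path sets it directly on the Table that StreamingWriteFn creates under
CREATE_IF_NEEDED. A small self-contained sketch of building such a Table resource (the project,
dataset, table, schema, and description below are placeholders, not taken from this commit):

    import java.util.Collections;

    import com.google.api.services.bigquery.model.Table;
    import com.google.api.services.bigquery.model.TableFieldSchema;
    import com.google.api.services.bigquery.model.TableReference;
    import com.google.api.services.bigquery.model.TableSchema;

    public class TableWithDescription {
      public static void main(String[] args) {
        // Placeholder destination table reference.
        TableReference ref = new TableReference()
            .setProjectId("my-project")
            .setDatasetId("my_dataset")
            .setTableId("my_table");

        // Placeholder one-column schema.
        TableSchema schema = new TableSchema().setFields(Collections.singletonList(
            new TableFieldSchema().setName("word").setType("STRING")));

        // Mirrors what StreamingWriteFn.getOrCreateTable now builds: the description
        // travels on the Table resource handed to DatasetService.createTable.
        Table table = new Table()
            .setTableReference(ref)
            .setSchema(schema)
            .setDescription("Word counts written by the streaming path");

        System.out.println(table);
      }
    }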

http://git-wip-us.apache.org/repos/asf/beam/blob/3e8810b8/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
index a85d16d..ebff6c1 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
@@ -166,6 +166,10 @@ interface BigQueryServices extends Serializable {
      */
    long insertAll(TableReference ref, List<TableRow> rowList, @Nullable List<String> insertIdList)
         throws IOException, InterruptedException;
+
+    /** Patch BigQuery {@link Table} description. */
+    Table patchTableDescription(TableReference tableReference, @Nullable String tableDescription)
+        throws IOException, InterruptedException;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/beam/blob/3e8810b8/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
index b958c8d..15ca262 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
@@ -789,6 +789,31 @@ class BigQueryServicesImpl implements BigQueryServices {
       return insertAll(
           ref, rowList, insertIdList, INSERT_BACKOFF_FACTORY.backoff(), Sleeper.DEFAULT);
     }
+
+
+    @Override
+    public Table patchTableDescription(TableReference tableReference,
+                                       @Nullable String tableDescription)
+        throws IOException, InterruptedException {
+      Table table = new Table();
+      table.setDescription(tableDescription);
+
+      BackOff backoff =
+          FluentBackoff.DEFAULT
+              .withMaxRetries(MAX_RPC_RETRIES).withInitialBackoff(INITIAL_RPC_BACKOFF).backoff();
+      return executeWithRetries(
+          client.tables().patch(
+              tableReference.getProjectId(),
+              tableReference.getDatasetId(),
+              tableReference.getTableId(),
+              table),
+          String.format(
+              "Unable to patch table description: %s, aborting after %d retries.",
+              tableReference, MAX_RPC_RETRIES),
+          Sleeper.DEFAULT,
+          backoff,
+          ALWAYS_RETRY);
+    }
   }
 
   private static class BigQueryJsonReaderImpl implements BigQueryJsonReader {
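
The new patchTableDescription implementation wraps BigQuery's tables().patch call in Beam's
usual retry/backoff handling. Stripped of that machinery, the underlying call looks roughly
like the sketch below (credential setup is simplified and the project, dataset, and table IDs
are placeholders); since patch only updates the fields set on the request body, sending a
Table carrying only a description leaves the schema and other table metadata untouched:

    import java.util.Collections;

    import com.google.api.client.googleapis.auth.oauth2.GoogleCredential;
    import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport;
    import com.google.api.client.json.jackson2.JacksonFactory;
    import com.google.api.services.bigquery.Bigquery;
    import com.google.api.services.bigquery.BigqueryScopes;
    import com.google.api.services.bigquery.model.Table;

    public class PatchDescriptionExample {
      public static void main(String[] args) throws Exception {
        // Application-default credentials, scoped for BigQuery.
        GoogleCredential credential = GoogleCredential.getApplicationDefault()
            .createScoped(Collections.singleton(BigqueryScopes.BIGQUERY));

        Bigquery bigquery = new Bigquery.Builder(
                GoogleNetHttpTransport.newTrustedTransport(),
                JacksonFactory.getDefaultInstance(),
                credential)
            .setApplicationName("patch-description-example")
            .build();

        // Only the description field is set, so nothing else on the table changes.
        Table patch = new Table().setDescription("Patched by PatchDescriptionExample");
        bigquery.tables()
            .patch("my-project", "my_dataset", "my_table", patch)
            .execute();
      }
    }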

http://git-wip-us.apache.org/repos/asf/beam/blob/3e8810b8/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index c0ce027..8b1b6dd 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -1556,7 +1556,7 @@ public class BigQueryIOTest implements Serializable {
   @Test
   public void testStreamingWriteFnCreateNever() throws Exception {
     BigQueryIO.StreamingWriteFn fn = new BigQueryIO.StreamingWriteFn(
-        null, CreateDisposition.CREATE_NEVER, new FakeBigQueryServices());
+        null, CreateDisposition.CREATE_NEVER, null, new FakeBigQueryServices());
     assertEquals(BigQueryIO.parseTableSpec("dataset.table"),
         fn.getOrCreateTable(null, "dataset.table"));
   }

