beam-commits mailing list archives

From: reuven...@apache.org
Subject: [beam] branch master updated: Merge pull request #8188: [BEAM-6953] Make bq constants args
Date: Mon, 01 Apr 2019 22:06:32 GMT
This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new dcf6ad3  Merge pull request #8188: [BEAM-6953] Make bq constants args
dcf6ad3 is described below

commit dcf6ad301069e4d2cfaec5db6b178acb7bb67f49
Author: reuvenlax <relax@google.com>
AuthorDate: Mon Apr 1 15:06:16 2019 -0700

    Merge pull request #8188: [BEAM-6953] Make bq constants args
---
 .../beam/sdk/io/gcp/bigquery/BigQueryOptions.java      | 18 ++++++++++++++++++
 .../beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java | 18 +++++++++---------
 .../beam/sdk/io/gcp/bigquery/StreamingWriteTables.java |  5 ++++-
 3 files changed, 31 insertions(+), 10 deletions(-)

diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java
index 48af213..4506f64 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java
@@ -53,4 +53,22 @@ public interface BigQueryOptions
   Integer getInsertBundleParallelism();
 
   void setInsertBundleParallelism(Integer parallelism);
+
+  @Description("The number of keys used per table when doing streaming inserts to BigQuery.")
+  @Default.Integer(50)
+  Integer getNumStreamingKeys();
+
+  void setNumStreamingKeys(Integer value);
+
+  @Description("The maximum number of rows to batch in a single streaming insert to BigQuery.")
+  @Default.Long(500)
+  Long getMaxStreamingRowsToBatch();
+
+  void setMaxStreamingRowsToBatch(Long value);
+
+  @Description("The maximum byte size of a single streaming insert to BigQuery.")
+  @Default.Long(64L * 1024L)
+  Long getMaxStreamingBatchSize();
+
+  void setMaxStreamingBatchSize(Long value);
 }
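
The three options above are ordinary Beam pipeline options, so the previously hard-coded constants can now be overridden per pipeline. A minimal sketch of how a user might set them through the standard PipelineOptionsFactory flow (the class name and flag values below are illustrative, not part of this commit):

    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    public class SetStreamingInsertOptions {
      public static void main(String[] args) {
        // Flag names are derived from the getter names in BigQueryOptions;
        // the values here are arbitrary examples, not recommendations.
        BigQueryOptions options =
            PipelineOptionsFactory.fromArgs(
                    "--numStreamingKeys=100",          // default: 50
                    "--maxStreamingRowsToBatch=1000",  // default: 500
                    "--maxStreamingBatchSize=131072")  // default: 64 * 1024
                .as(BigQueryOptions.class);
        System.out.println(options.getNumStreamingKeys());  // prints 100
      }
    }
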
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
index 3d3164e..b7f51b5 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
@@ -376,12 +376,6 @@ class BigQueryServicesImpl implements BigQueryServices {
 
   @VisibleForTesting
   static class DatasetServiceImpl implements DatasetService {
-    // Approximate amount of table data to upload per InsertAll request.
-    private static final long UPLOAD_BATCH_SIZE_BYTES = 64L * 1024L;
-
-    // The maximum number of rows to upload per InsertAll request.
-    private static final long MAX_ROWS_PER_BATCH = 500;
-
     private static final FluentBackoff INSERT_BACKOFF_FACTORY =
         FluentBackoff.DEFAULT.withInitialBackoff(Duration.millis(200)).withMaxRetries(5);
 
@@ -395,24 +389,29 @@ class BigQueryServicesImpl implements BigQueryServices {
     private final Bigquery client;
     private final PipelineOptions options;
     private final long maxRowsPerBatch;
+    private final long maxRowBatchSize;
 
     private ExecutorService executor;
 
     @VisibleForTesting
     DatasetServiceImpl(Bigquery client, PipelineOptions options) {
+      BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
       this.errorExtractor = new ApiErrorExtractor();
       this.client = client;
       this.options = options;
-      this.maxRowsPerBatch = MAX_ROWS_PER_BATCH;
+      this.maxRowsPerBatch = bqOptions.getMaxStreamingRowsToBatch();
+      this.maxRowBatchSize = bqOptions.getMaxStreamingBatchSize();
       this.executor = null;
     }
 
     @VisibleForTesting
     DatasetServiceImpl(Bigquery client, PipelineOptions options, long maxRowsPerBatch) {
+      BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
       this.errorExtractor = new ApiErrorExtractor();
       this.client = client;
       this.options = options;
       this.maxRowsPerBatch = maxRowsPerBatch;
+      this.maxRowBatchSize = bqOptions.getMaxStreamingBatchSize();
       this.executor = null;
     }
 
@@ -420,7 +419,8 @@ class BigQueryServicesImpl implements BigQueryServices {
       this.errorExtractor = new ApiErrorExtractor();
       this.client = newBigQueryClient(bqOptions).build();
       this.options = bqOptions;
-      this.maxRowsPerBatch = MAX_ROWS_PER_BATCH;
+      this.maxRowsPerBatch = bqOptions.getMaxStreamingRowsToBatch();
+      this.maxRowBatchSize = bqOptions.getMaxStreamingBatchSize();
       this.executor = null;
     }
 
@@ -752,7 +752,7 @@ class BigQueryServicesImpl implements BigQueryServices {
           rows.add(out);
 
           dataSize += row.toString().length();
-          if (dataSize >= UPLOAD_BATCH_SIZE_BYTES
+          if (dataSize >= maxRowBatchSize
               || rows.size() >= maxRowsPerBatch
               || i == rowsToPublish.size() - 1) {
             TableDataInsertAllRequest content = new TableDataInsertAllRequest();
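
The flush condition above now reads its two thresholds from the options instead of the removed constants. It reduces to a three-way OR, restated here as a standalone predicate (the helper name is hypothetical, for illustration only):

    // Hypothetical restatement of the flush rule in the insert loop above.
    static boolean shouldFlush(
        long batchBytes, long batchRows, boolean isLastRow,
        long maxBatchBytes, long maxBatchRows) {
      return batchBytes >= maxBatchBytes  // default cap: 64 * 1024 bytes
          || batchRows >= maxBatchRows    // default cap: 500 rows
          || isLastRow;                   // always flush the final batch
    }

With the defaults, shouldFlush(70000, 120, false, 64L * 1024L, 500) is true: the byte cap (65536) fires before the row cap does.
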
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteTables.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteTables.java
index 3ca7aba..ea2c020 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteTables.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteTables.java
@@ -190,6 +190,9 @@ public class StreamingWriteTables<ElementT>
       TupleTag<T> failedInsertsTag,
       AtomicCoder<T> coder,
       ErrorContainer<T> errorContainer) {
+    BigQueryOptions options = input.getPipeline().getOptions().as(BigQueryOptions.class);
+    int numShards = options.getNumStreamingKeys();
+
     // A naive implementation would be to simply stream data directly to BigQuery.
     // However, this could occasionally lead to duplicated data, e.g., when
     // a VM that runs this code is restarted and the code is re-run.
@@ -204,7 +207,7 @@ public class StreamingWriteTables<ElementT>
     // streaming insert quota.
     PCollection<KV<ShardedKey<String>, TableRowInfo<ElementT>>> tagged =
         input
-            .apply("ShardTableWrites", ParDo.of(new GenerateShardedTable<>(50)))
+            .apply("ShardTableWrites", ParDo.of(new GenerateShardedTable<>(numShards)))
             .setCoder(KvCoder.of(ShardedKeyCoder.of(StringUtf8Coder.of()), elementCoder))
             .apply("TagWithUniqueIds", ParDo.of(new TagWithUniqueIds<>()))
             .setCoder(
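
GenerateShardedTable, which now receives numShards instead of the literal 50, is the transform that bounds per-table parallelism: each element is keyed by its destination table plus a shard number in [0, numShards). An illustrative stand-in for that DoFn, assuming random shard assignment (a sketch only, not the Beam source; ShardedKey is the Beam-internal key type used in the diff above):

    import java.util.concurrent.ThreadLocalRandom;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.values.KV;

    // Sketch only: spread each table's rows across numShards keys so that
    // at most numStreamingKeys grouped-write keys exist per table.
    class GenerateShardedTableSketch<T>
        extends DoFn<KV<String, T>, KV<ShardedKey<String>, T>> {
      private final int numShards;

      GenerateShardedTableSketch(int numShards) {
        this.numShards = numShards;
      }

      @ProcessElement
      public void process(ProcessContext c) {
        int shard = ThreadLocalRandom.current().nextInt(numShards);
        c.output(
            KV.of(ShardedKey.of(c.element().getKey(), shard), c.element().getValue()));
      }
    }

Fewer keys mean fewer parallel streaming-insert writers per table, which keeps the pipeline under BigQuery's streaming insert quota; raising numStreamingKeys trades quota pressure for throughput.
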

