beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chamik...@apache.org
Subject [1/2] beam git commit: Updates BigQueryTableSource to consider data in streaming buffer when determining estimated size.
Date Thu, 02 Nov 2017 22:20:54 GMT
Repository: beam
Updated Branches:
  refs/heads/master 482d17889 -> 492547835


Updates BigQueryTableSource to consider data in streaming buffer when determining estimated size.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b2c9fba4
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b2c9fba4
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b2c9fba4

Branch: refs/heads/master
Commit: b2c9fba4dd5f7f5a0ac0045f9ff8f30d55088a34
Parents: 482d178
Author: chamikara@google.com <chamikara@google.com>
Authored: Sat Oct 21 19:20:07 2017 -0700
Committer: chamikara@google.com <chamikara@google.com>
Committed: Thu Nov 2 15:20:27 2017 -0700

----------------------------------------------------------------------
 .../io/gcp/bigquery/BigQueryTableSource.java    | 10 ++-
 .../sdk/io/gcp/bigquery/BigQueryIOTest.java     | 81 +++++++++++++++++++-
 2 files changed, 88 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/b2c9fba4/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java
index f717cb7..dbac00f 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java
@@ -21,6 +21,7 @@ package org.apache.beam.sdk.io.gcp.bigquery;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 
+import com.google.api.services.bigquery.model.Table;
 import com.google.api.services.bigquery.model.TableReference;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Strings;
@@ -102,8 +103,13 @@ class BigQueryTableSource<T> extends BigQuerySourceBase<T> {
       TableReference table = setDefaultProjectIfAbsent(options.as(BigQueryOptions.class),
           BigQueryIO.JSON_FACTORY.fromString(jsonTable.get(), TableReference.class));
 
-      Long numBytes = bqServices.getDatasetService(options.as(BigQueryOptions.class))
-          .getTable(table).getNumBytes();
+      Table tableRef = bqServices.getDatasetService(options.as(BigQueryOptions.class))
+              .getTable(table);
+      Long numBytes = tableRef.getNumBytes();
+      if (tableRef.getStreamingBuffer() != null) {
+        numBytes += tableRef.getStreamingBuffer().getEstimatedBytes().longValue();
+      }
+
       tableSizeBytes.compareAndSet(null, numBytes);
     }
     return tableSizeBytes.get();

http://git-wip-us.apache.org/repos/asf/beam/blob/b2c9fba4/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index aa818c6..5b4b7e6 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -41,6 +41,7 @@ import com.google.api.services.bigquery.model.JobStatistics;
 import com.google.api.services.bigquery.model.JobStatistics2;
 import com.google.api.services.bigquery.model.JobStatistics4;
 import com.google.api.services.bigquery.model.JobStatus;
+import com.google.api.services.bigquery.model.Streamingbuffer;
 import com.google.api.services.bigquery.model.Table;
 import com.google.api.services.bigquery.model.TableDataInsertAllResponse;
 import com.google.api.services.bigquery.model.TableFieldSchema;
@@ -64,6 +65,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.Serializable;
+import java.math.BigInteger;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
@@ -343,7 +345,7 @@ public class BigQueryIOTest implements Serializable {
             }));
     PAssert.that(output).containsInAnyOrder(ImmutableList.of(KV.of("a", 1L), KV.of("b", 2L),
         KV.of("c", 3L), KV.of("d", 4L), KV.of("e", 5L), KV.of("f", 6L)));
-     p.run();
+    p.run();
   }
 
   @Test
@@ -1697,6 +1699,83 @@ public class BigQueryIOTest implements Serializable {
   }
 
   @Test
+  public void testEstimatedSizeWithoutStreamingBuffer() throws Exception {
+    FakeDatasetService fakeDatasetService = new FakeDatasetService();
+    FakeJobService fakeJobService = new FakeJobService();
+    FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
+            .withJobService(fakeJobService)
+            .withDatasetService(fakeDatasetService);
+
+    List<TableRow> data = ImmutableList.of(
+            new TableRow().set("name", "a").set("number", 1L),
+            new TableRow().set("name", "b").set("number", 2L),
+            new TableRow().set("name", "c").set("number", 3L),
+            new TableRow().set("name", "d").set("number", 4L),
+            new TableRow().set("name", "e").set("number", 5L),
+            new TableRow().set("name", "f").set("number", 6L));
+
+    TableReference table = BigQueryHelpers.parseTableSpec("project:data_set.table_name");
+    fakeDatasetService.createDataset("project", "data_set", "", "", null);
+    fakeDatasetService.createTable(new Table().setTableReference(table)
+            .setSchema(new TableSchema()
+                    .setFields(
+                            ImmutableList.of(
+                                    new TableFieldSchema().setName("name").setType("STRING"),
+                                    new TableFieldSchema().setName("number").setType("INTEGER")))));
+    fakeDatasetService.insertAll(table, data, null);
+
+    String stepUuid = "testStepUuid";
+    BoundedSource<TableRow> bqSource = BigQueryTableSource.create(
+            stepUuid,
+            StaticValueProvider.of(table),
+            fakeBqServices,
+            TableRowJsonCoder.of(),
+            BigQueryIO.TableRowParser.INSTANCE);
+
+    PipelineOptions options = PipelineOptionsFactory.create();
+    assertEquals(108, bqSource.getEstimatedSizeBytes(options));
+  }
+
+  @Test
+  public void testEstimatedSizeWithStreamingBuffer() throws Exception {
+    FakeDatasetService fakeDatasetService = new FakeDatasetService();
+    FakeJobService fakeJobService = new FakeJobService();
+    FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
+            .withJobService(fakeJobService)
+            .withDatasetService(fakeDatasetService);
+
+    List<TableRow> data = ImmutableList.of(
+            new TableRow().set("name", "a").set("number", 1L),
+            new TableRow().set("name", "b").set("number", 2L),
+            new TableRow().set("name", "c").set("number", 3L),
+            new TableRow().set("name", "d").set("number", 4L),
+            new TableRow().set("name", "e").set("number", 5L),
+            new TableRow().set("name", "f").set("number", 6L));
+
+    TableReference table = BigQueryHelpers.parseTableSpec("project:data_set.table_name");
+    fakeDatasetService.createDataset("project", "data_set", "", "", null);
+    fakeDatasetService.createTable(new Table().setTableReference(table)
+            .setSchema(new TableSchema()
+                    .setFields(
+                            ImmutableList.of(
+                                    new TableFieldSchema().setName("name").setType("STRING"),
+                                    new TableFieldSchema().setName("number").setType("INTEGER"))))
+            .setStreamingBuffer(new Streamingbuffer().setEstimatedBytes(BigInteger.valueOf(10))));
+    fakeDatasetService.insertAll(table, data, null);
+
+    String stepUuid = "testStepUuid";
+    BoundedSource<TableRow> bqSource = BigQueryTableSource.create(
+            stepUuid,
+            StaticValueProvider.of(table),
+            fakeBqServices,
+            TableRowJsonCoder.of(),
+            BigQueryIO.TableRowParser.INSTANCE);
+
+    PipelineOptions options = PipelineOptionsFactory.create();
+    assertEquals(118, bqSource.getEstimatedSizeBytes(options));
+  }
+
+  @Test
   public void testBigQueryQuerySourceInitSplit() throws Exception {
     TableReference dryRunTable = new TableReference();
 


Mime
View raw message