beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From pabl...@apache.org
Subject [beam] branch master updated: Reducing how much we call BigQuery Get Table API. The BQ API is queried once per bundle. With this change, it will be queried once per worker thread. This will help with throughput for BQ streaming inserts
Date Fri, 17 Jul 2020 02:49:22 GMT
This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 8250a7b  Reducing how much we call BigQuery Get Table API. The BQ API is queried
             once per bundle. With this change, it will be queried once per worker thread.
             This will help with throughput for BQ streaming inserts.
     new e34fef8  Merge pull request #12125 from pabloem/BQ_API_quota
8250a7b is described below

commit 8250a7bf88fc71acc748cd6976b72e8bdd44ec75
Author: Pablo Estrada <pabloem@apache.org>
AuthorDate: Mon Jun 29 15:30:02 2020 -0700

    Reducing how much we call BigQuery Get Table API. The BQ API is queried once per bundle.
    With this change, it will be queried once per worker thread. This will help with throughput
    for BQ streaming inserts.
---
 sdks/python/apache_beam/io/gcp/bigquery.py | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py
index ccaca4d..34dc946 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -963,6 +963,9 @@ bigquery_v2_messages.TableSchema` object.
         buffer_size=buffer_size)
 
 
+_KNOWN_TABLES = set()
+
+
 class BigQueryWriteFn(DoFn):
   """A ``DoFn`` that streams writes to BigQuery once the table is created."""
 
@@ -1023,7 +1026,6 @@ class BigQueryWriteFn(DoFn):
     self.write_disposition = write_disposition
     self._rows_buffer = []
     self._reset_rows_buffer()
-    self._observed_tables = set()
 
     self._total_buffered_rows = 0
     self.kms_key = kms_key
@@ -1075,8 +1077,6 @@ class BigQueryWriteFn(DoFn):
     self.bigquery_wrapper = bigquery_tools.BigQueryWrapper(
         client=self.test_client)
 
-    self._observed_tables = set()
-
     self._backoff_calculator = iter(
         retry.FuzzedExponentialIntervals(
             initial_delay_secs=0.2, num_retries=10000, max_delay_secs=1500))
@@ -1086,7 +1086,7 @@ class BigQueryWriteFn(DoFn):
         table_reference.projectId,
         table_reference.datasetId,
         table_reference.tableId)
-    if str_table_reference in self._observed_tables:
+    if str_table_reference in _KNOWN_TABLES:
       return
 
     if self.create_disposition == BigQueryDisposition.CREATE_NEVER:
@@ -1110,7 +1110,7 @@ class BigQueryWriteFn(DoFn):
         self.create_disposition,
         self.write_disposition,
         additional_create_parameters=self.additional_bq_parameters)
-    self._observed_tables.add(str_table_reference)
+    _KNOWN_TABLES.add(str_table_reference)
 
   def process(self, element, *schema_side_inputs):
     destination = element[0]


Mime
View raw message