beam-commits mailing list archives

From pabl...@apache.org
Subject [beam] branch master updated: [BEAM-6768] BigQuery dynamic destinations for Python SDK Streaming Inserts (#7677)
Date Mon, 04 Mar 2019 22:26:22 GMT
This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 932e802  [BEAM-6768] BigQuery dynamic destinations for Python SDK Streaming Inserts
(#7677)
932e802 is described below

commit 932e802279a2daa0ff7797a8fc81e952a4e4f252
Author: Pablo <pabloem@users.noreply.github.com>
AuthorDate: Mon Mar 4 14:26:09 2019 -0800

    [BEAM-6768] BigQuery dynamic destinations for Python SDK Streaming Inserts (#7677)
    
    [BEAM-6768] BigQuery dynamic destinations for Python SDK Streaming Inserts
---
 .../apache_beam/examples/snippets/snippets.py      |  19 +-
 sdks/python/apache_beam/io/gcp/bigquery.py         | 225 ++++++++++----
 .../apache_beam/io/gcp/bigquery_file_loads.py      |  21 +-
 sdks/python/apache_beam/io/gcp/bigquery_test.py    | 336 +++++++++++----------
 sdks/python/apache_beam/io/gcp/bigquery_tools.py   |  61 +++-
 .../apache_beam/io/gcp/bigquery_tools_test.py      |   2 +-
 sdks/python/apache_beam/io/gcp/tests/utils.py      |  17 ++
 sdks/python/apache_beam/transforms/display.py      |   9 +-
 sdks/python/apache_beam/utils/retry.py             |  11 +
 9 files changed, 441 insertions(+), 260 deletions(-)

diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py
index 422b388..f5cc4c9 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets.py
@@ -1116,21 +1116,10 @@ def model_bigqueryio(p, write_project='', write_dataset='', write_table=''):
   # [END model_bigqueryio_schema]
 
   # [START model_bigqueryio_schema_object]
-  from apache_beam.io.gcp.internal.clients import bigquery
-
-  table_schema = bigquery.TableSchema()
-
-  source_field = bigquery.TableFieldSchema()
-  source_field.name = 'source'
-  source_field.type = 'STRING'
-  source_field.mode = 'NULLABLE'
-  table_schema.fields.append(source_field)
-
-  quote_field = bigquery.TableFieldSchema()
-  quote_field.name = 'quote'
-  quote_field.type = 'STRING'
-  quote_field.mode = 'REQUIRED'
-  table_schema.fields.append(quote_field)
+  table_schema = {'fields': [
+      {'name': 'source', 'type': 'STRING', 'mode': 'NULLABLE'},
+      {'name': 'quote', 'type': 'STRING', 'mode': 'REQUIRED'}
+  ]}
   # [END model_bigqueryio_schema_object]
 
   if write_project and write_dataset and write_table:
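
The snippet above now builds the schema as a plain dict rather than TableSchema objects. A minimal sketch of passing such a dict schema to WriteToBigQuery; the table spec and sample row are illustrative, not part of the commit:

    import apache_beam as beam
    from apache_beam.io.gcp.bigquery import WriteToBigQuery

    table_schema = {'fields': [
        {'name': 'source', 'type': 'STRING', 'mode': 'NULLABLE'},
        {'name': 'quote', 'type': 'STRING', 'mode': 'REQUIRED'}
    ]}

    with beam.Pipeline() as p:
      _ = (p
           | beam.Create([{'source': 'mahatma_gandhi',
                           'quote': 'My life is my message.'}])
           | WriteToBigQuery(
               table='my_project:my_dataset.quotes',  # illustrative table spec
               schema=table_schema,
               write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))
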
diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py
index ff61065..a1b6f99 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -120,19 +120,24 @@ to BigQuery.
 from __future__ import absolute_import
 
 import collections
+import itertools
 import json
 import logging
+import time
 from builtins import object
 from builtins import zip
 
 from future.utils import itervalues
 from past.builtins import unicode
 
+import apache_beam as beam
 from apache_beam import coders
+from apache_beam import pvalue
 from apache_beam.internal.gcp.json_value import from_json_value
 from apache_beam.internal.gcp.json_value import to_json_value
 from apache_beam.io.gcp import bigquery_tools
 from apache_beam.io.gcp.internal.clients import bigquery
+from apache_beam.options import value_provider as vp
 from apache_beam.options.pipeline_options import GoogleCloudOptions
 from apache_beam.options.pipeline_options import StandardOptions
 from apache_beam.runners.dataflow.native_io import iobase as dataflow_io
@@ -140,6 +145,8 @@ from apache_beam.transforms import DoFn
 from apache_beam.transforms import ParDo
 from apache_beam.transforms import PTransform
 from apache_beam.transforms.display import DisplayDataItem
+from apache_beam.transforms.window import GlobalWindows
+from apache_beam.utils import retry
 from apache_beam.utils.annotations import deprecated
 
 __all__ = [
@@ -535,22 +542,25 @@ bigquery_v2_messages.TableSchema` object.
 
 
 class BigQueryWriteFn(DoFn):
-  """A ``DoFn`` that streams writes to BigQuery once the table is created.
-  """
-
-  def __init__(self, table_id, dataset_id, project_id, batch_size, schema,
-               create_disposition, write_disposition, kms_key, test_client):
+  """A ``DoFn`` that streams writes to BigQuery once the table is created."""
+
+  DEFAULT_MAX_BUFFERED_ROWS = 2000
+  DEFAULT_MAX_BATCH_SIZE = 500
+
+  FAILED_ROWS = 'FailedRows'
+
+  def __init__(
+      self,
+      batch_size,
+      create_disposition=None,
+      write_disposition=None,
+      kms_key=None,
+      test_client=None,
+      max_buffered_rows=None,
+      retry_strategy=None):
     """Initialize a WriteToBigQuery transform.
 
     Args:
-      table_id: The ID of the table. The ID must contain only letters
-        (a-z, A-Z), numbers (0-9), or underscores (_). If dataset argument is
-        None then the table argument must contain the entire table reference
-        specified as: 'DATASET.TABLE' or 'PROJECT:DATASET.TABLE'.
-      dataset_id: The ID of the dataset containing this table or null if the
-        table reference is specified entirely by the table argument.
-      project_id: The ID of the project containing this table or null if the
-        table reference is specified entirely by the table argument.
       batch_size: Number of rows to be written to BQ per streaming API insert.
       schema: The schema to be used if the BigQuery table to write has to be
         created. This can be either specified as a 'bigquery.TableSchema' object
@@ -572,25 +582,35 @@ class BigQueryWriteFn(DoFn):
       kms_key: Experimental. Optional Cloud KMS key name for use when creating
         new tables.
       test_client: Override the default bigquery client used for testing.
+
+      max_buffered_rows: The maximum number of rows that may stay buffered
+        across all destinations when writing to dynamic destinations. With
+        dynamic destinations it is important to keep buffers small even when
+        no single batch has been completely filled up.
+      retry_strategy: The strategy to use when retrying streaming inserts
+        into BigQuery. Options are listed as attributes of
+        bigquery_tools.RetryStrategy.
     """
-    self.table_id = table_id
-    self.dataset_id = dataset_id
-    self.project_id = project_id
-    self.schema = schema
     self.test_client = test_client
     self.create_disposition = create_disposition
     self.write_disposition = write_disposition
     self._rows_buffer = []
-    # The default batch size is 500
-    self._max_batch_size = batch_size or 500
+    self._reset_rows_buffer()
+
+    self._total_buffered_rows = 0
     self.kms_key = kms_key
+    self._max_batch_size = batch_size or BigQueryWriteFn.DEFAULT_MAX_BATCH_SIZE
+    self._max_buffered_rows = (max_buffered_rows
+                               or BigQueryWriteFn.DEFAULT_MAX_BUFFERED_ROWS)
+    self._retry_strategy = (
+        retry_strategy or bigquery_tools.RetryStrategy.RETRY_ON_TRANSIENT_ERROR)
 
   def display_data(self):
-    return {'table_id': self.table_id,
-            'dataset_id': self.dataset_id,
-            'project_id': self.project_id,
-            'schema': str(self.schema),
-            'max_batch_size': self._max_batch_size}
+    return {'max_batch_size': self._max_batch_size,
+            'max_buffered_rows': self._max_buffered_rows,
+            'retry_strategy': self._retry_strategy}
+
+  def _reset_rows_buffer(self):
+    self._rows_buffer = collections.defaultdict(lambda: [])
 
   @staticmethod
   def get_table_schema(schema):
@@ -612,37 +632,113 @@ class BigQueryWriteFn(DoFn):
       raise TypeError('Unexpected schema argument: %s.' % schema)
 
   def start_bundle(self):
-    self._rows_buffer = []
-    self.table_schema = self.get_table_schema(self.schema)
+    self._reset_rows_buffer()
 
     self.bigquery_wrapper = bigquery_tools.BigQueryWrapper(
         client=self.test_client)
+
+    self._observed_tables = set()
+
+    self._backoff_calculator = iter(retry.FuzzedExponentialIntervals(
+        initial_delay_secs=0.2,
+        num_retries=10000,
+        max_delay_secs=1500))
+
+  def _create_table_if_needed(self, schema, table_reference):
+    str_table_reference = '%s:%s.%s' % (
+        table_reference.projectId,
+        table_reference.datasetId,
+        table_reference.tableId)
+    if str_table_reference in self._observed_tables:
+      return
+
+    logging.debug('Creating or getting table %s with schema %s.',
+                  table_reference, schema)
+
+    table_schema = self.get_table_schema(schema)
+    if table_reference.projectId is None:
+      table_reference.projectId = vp.RuntimeValueProvider.get_value(
+          'project', str, '')
     self.bigquery_wrapper.get_or_create_table(
-        self.project_id, self.dataset_id, self.table_id, self.table_schema,
+        table_reference.projectId,
+        table_reference.datasetId,
+        table_reference.tableId,
+        table_schema,
         self.create_disposition, self.write_disposition)
+    self._observed_tables.add(str_table_reference)
 
   def process(self, element, unused_create_fn_output=None):
-    self._rows_buffer.append(element)
-    if len(self._rows_buffer) >= self._max_batch_size:
-      self._flush_batch()
+    destination = element[0]
+    if isinstance(destination, tuple):
+      schema = destination[1]
+      destination = destination[0]
+      self._create_table_if_needed(
+          schema,
+          bigquery_tools.parse_table_reference(destination))
+
+    row = element[1]
+    self._rows_buffer[destination].append(row)
+    self._total_buffered_rows += 1
+    if len(self._rows_buffer[destination]) >= self._max_batch_size:
+      return self._flush_batch(destination)
+    elif self._total_buffered_rows >= self._max_buffered_rows:
+      return self._flush_all_batches()
 
   def finish_bundle(self):
-    if self._rows_buffer:
-      self._flush_batch()
-    self._rows_buffer = []
+    return self._flush_all_batches()
+
+  def _flush_all_batches(self):
+    logging.debug('Attempting to flush to all destinations. Total buffered: %s',
+                  self._total_buffered_rows)
+
+    return itertools.chain(*[self._flush_batch(destination)
+                             for destination in list(self._rows_buffer.keys())
+                             if self._rows_buffer[destination]])
+
+  def _flush_batch(self, destination):
 
-  def _flush_batch(self):
     # Flush the current batch of rows to BigQuery.
-    passed, errors = self.bigquery_wrapper.insert_rows(
-        project_id=self.project_id, dataset_id=self.dataset_id,
-        table_id=self.table_id, rows=self._rows_buffer)
-    if not passed:
-      raise RuntimeError('Could not successfully insert rows to BigQuery'
-                         ' table [%s:%s.%s]. Errors: %s'%
-                         (self.project_id, self.dataset_id,
-                          self.table_id, errors))
-    logging.debug("Successfully wrote %d rows.", len(self._rows_buffer))
-    self._rows_buffer = []
+    rows = self._rows_buffer[destination]
+    table_reference = bigquery_tools.parse_table_reference(destination)
+
+    if table_reference.projectId is None:
+      table_reference.projectId = vp.RuntimeValueProvider.get_value(
+          'project', str, '')
+
+    logging.debug('Flushing data to %s. Total %s rows.',
+                  destination, len(rows))
+
+    while True:
+      # TODO: Figure out an insertId to make calls idempotent.
+      passed, errors = self.bigquery_wrapper.insert_rows(
+          project_id=table_reference.projectId,
+          dataset_id=table_reference.datasetId,
+          table_id=table_reference.tableId,
+          rows=rows,
+          skip_invalid_rows=True)
+
+      logging.debug("Passed: %s. Errors are %s", passed, errors)
+      failed_rows = [rows[entry.index] for entry in errors]
+      should_retry = any(
+          bigquery_tools.RetryStrategy.should_retry(
+              self._retry_strategy, entry.errors[0].reason)
+          for entry in errors)
+      rows = failed_rows
+
+      if not should_retry:
+        break
+      else:
+        retry_backoff = next(self._backoff_calculator)
+        logging.info('Sleeping %s seconds before retrying insertion.',
+                     retry_backoff)
+        time.sleep(retry_backoff)
+
+    self._total_buffered_rows -= len(self._rows_buffer[destination])
+    del self._rows_buffer[destination]
+
+    return [pvalue.TaggedOutput(BigQueryWriteFn.FAILED_ROWS,
+                                GlobalWindows.windowed_value(
+                                    (destination, row))) for row in failed_rows]
 
 
 class WriteToBigQuery(PTransform):
@@ -665,7 +761,8 @@ class WriteToBigQuery(PTransform):
                max_files_per_bundle=None,
                test_client=None,
                gs_location=None,
-               method=None):
+               method=None,
+               insert_retry_strategy=None):
     """Initialize a WriteToBigQuery transform.
 
     Args:
@@ -734,6 +831,8 @@ bigquery_v2_messages.TableSchema`
         data to BigQuery: https://cloud.google.com/bigquery/docs/loading-data.
         DEFAULT will use STREAMING_INSERTS on Streaming pipelines and
         FILE_LOADS on Batch pipelines.
+      insert_retry_strategy: The strategy to use when retrying streaming inserts
+        into BigQuery. Options are listed as attributes of
+        bigquery_tools.RetryStrategy.
     """
     self.table_reference = bigquery_tools.parse_table_reference(
         table, dataset, project)
@@ -741,7 +840,7 @@ bigquery_v2_messages.TableSchema`
         create_disposition)
     self.write_disposition = BigQueryDisposition.validate_write(
         write_disposition)
-    self.schema = schema
+    self.schema = WriteToBigQuery.get_dict_table_schema(schema)
     self.batch_size = batch_size
     self.kms_key = kms_key
     self.test_client = test_client
@@ -749,6 +848,7 @@ bigquery_v2_messages.TableSchema`
     self.max_file_size = max_file_size
     self.max_files_per_bundle = max_files_per_bundle
     self.method = method or WriteToBigQuery.Method.DEFAULT
+    self.insert_retry_strategy = insert_retry_strategy
 
   @staticmethod
   def get_table_schema_from_string(schema):
@@ -839,23 +939,32 @@ bigquery_v2_messages.TableSchema):
       self.table_reference.projectId = pcoll.pipeline.options.view_as(
           GoogleCloudOptions).project
 
-    if standard_options.streaming or self.method == 'STREAMING_INSERTS':
+    if (standard_options.streaming or
+        self.method == WriteToBigQuery.Method.STREAMING_INSERTS):
       # TODO: Support load jobs for streaming pipelines.
       bigquery_write_fn = BigQueryWriteFn(
-          table_id=self.table_reference.tableId,
-          dataset_id=self.table_reference.datasetId,
-          project_id=self.table_reference.projectId,
           batch_size=self.batch_size,
-          schema=self.get_dict_table_schema(self.schema),
           create_disposition=self.create_disposition,
           write_disposition=self.write_disposition,
           kms_key=self.kms_key,
+          retry_strategy=self.insert_retry_strategy,
           test_client=self.test_client)
-      return pcoll | 'WriteToBigQuery' >> ParDo(bigquery_write_fn)
+
+      # TODO: Use utility functions from BQTools
+      table_fn = self._get_table_fn()
+
+      outputs = (pcoll
+                 | 'AppendDestination' >> beam.Map(lambda x: (table_fn(x), x))
+                 | 'StreamInsertRows' >> ParDo(bigquery_write_fn).with_outputs(
+                     BigQueryWriteFn.FAILED_ROWS, main='main'))
+
+      return {BigQueryWriteFn.FAILED_ROWS: outputs[BigQueryWriteFn.FAILED_ROWS]}
     else:
+      if standard_options.streaming:
+        raise NotImplementedError(
+            'File Loads to BigQuery are only supported on Batch pipelines.')
+
       from apache_beam.io.gcp import bigquery_file_loads
-      assert not standard_options.streaming, (
-          'File Loads to BigQuery are only supported on Batch pipelines.')
       return pcoll | bigquery_file_loads.BigQueryBatchFileLoads(
           destination=self.table_reference,
           schema=self.get_dict_table_schema(self.schema),
@@ -866,6 +975,14 @@ bigquery_v2_messages.TableSchema):
           gs_location=self.gs_location,
           test_client=self.test_client)
 
+  def _get_table_fn(self):
+    if callable(self.table_reference):
+      return self.table_reference
+    elif not callable(self.table_reference) and self.schema is not None:
+      return lambda x: (self.table_reference, self.schema)
+    else:
+      return lambda x: self.table_reference
+
   def display_data(self):
     res = {}
     if self.table_reference is not None:
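
A minimal sketch of the dynamic-destinations API this file adds, modeled on the integration test later in this patch; the table specs, schemas, rows and pipeline_args are illustrative, and a real run needs a GCP project and credentials:

    import apache_beam as beam
    from apache_beam.io.gcp.bigquery import BigQueryWriteFn
    from apache_beam.io.gcp.bigquery import WriteToBigQuery

    schema1 = {'fields': [
        {'name': 'name', 'type': 'STRING', 'mode': 'NULLABLE'},
        {'name': 'language', 'type': 'STRING', 'mode': 'NULLABLE'}]}
    schema2 = {'fields': [
        {'name': 'name', 'type': 'STRING', 'mode': 'NULLABLE'},
        {'name': 'foundation', 'type': 'STRING', 'mode': 'NULLABLE'}]}

    with beam.Pipeline(argv=pipeline_args) as p:  # pipeline_args is assumed
      rows = p | beam.Create([
          {'name': 'beam', 'language': 'py'},
          {'name': 'spark', 'foundation': 'apache'}])

      # table may be a callable returning either a table spec or a
      # (table spec, schema) tuple, so each element picks its own destination.
      result = rows | WriteToBigQuery(
          table=lambda row: (('my_project:my_dataset.languages', schema1)
                             if 'language' in row
                             else ('my_project:my_dataset.foundations', schema2)),
          method='STREAMING_INSERTS',
          insert_retry_strategy='RETRY_ON_TRANSIENT_ERROR')

      # Rows BigQuery permanently rejects come back as (destination, row) pairs.
      failed_rows = result[BigQueryWriteFn.FAILED_ROWS]
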
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
index 72223a9..93c0e80 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
@@ -98,25 +98,6 @@ def _bq_uuid(seed=None):
     return str(hashlib.md5(seed.encode('utf8')).hexdigest())
 
 
-class _AppendDestinationsFn(beam.DoFn):
-  """Adds the destination to an element, making it a KV pair.
-
-  Outputs a PCollection of KV-pairs where the key is a TableReference for the
-  destination, and the value is the record itself.
-
-  Experimental; no backwards compatibility guarantees.
-  """
-
-  def __init__(self, destination):
-    if callable(destination):
-      self.destination = destination
-    else:
-      self.destination = lambda x: destination
-
-  def process(self, element):
-    yield (self.destination(element), element)
-
-
 class _ShardDestinations(beam.DoFn):
   """Adds a shard number to the key of the KV element.
 
@@ -524,7 +505,7 @@ class BigQueryBatchFileLoads(beam.PTransform):
     outputs = (
         pcoll
         | "ApplyGlobalWindow" >> beam.WindowInto(beam.window.GlobalWindows())
-        | "AppendDestination" >> beam.ParDo(_AppendDestinationsFn(
+        | "AppendDestination" >> beam.ParDo(bigquery_tools.AppendDestinationsFn(
             self.destination))
         | beam.ParDo(
             WriteRecordsToFile(max_files_per_bundle=self.max_files_per_bundle,
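
The _AppendDestinationsFn helper removed here now lives in bigquery_tools as AppendDestinationsFn (its definition appears later in this patch). A minimal sketch of what it does to each element, with an illustrative destination string:

    import apache_beam as beam
    from apache_beam.io.gcp import bigquery_tools

    with beam.Pipeline() as p:
      keyed = (p
               | beam.Create([{'month': 1}, {'month': 2}])
               | beam.ParDo(bigquery_tools.AppendDestinationsFn(
                   'my_project:my_dataset.my_table')))
      # Each element becomes a KV pair:
      #   ('my_project:my_dataset.my_table', {'month': 1}), ...
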
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_test.py b/sdks/python/apache_beam/io/gcp/bigquery_test.py
index 725a684..69717d2 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_test.py
@@ -21,17 +21,26 @@ from __future__ import absolute_import
 import decimal
 import json
 import logging
+import random
 import re
+import time
 import unittest
 
 import hamcrest as hc
 import mock
+from nose.plugins.attrib import attr
 
 import apache_beam as beam
 from apache_beam.internal.gcp.json_value import to_json_value
+from apache_beam.io.gcp import bigquery_tools
 from apache_beam.io.gcp.bigquery import TableRowJsonCoder
+from apache_beam.io.gcp.bigquery_file_loads_test import _ELEMENTS
 from apache_beam.io.gcp.bigquery_tools import JSON_COMPLIANCE_ERROR
 from apache_beam.io.gcp.internal.clients import bigquery
+from apache_beam.io.gcp.tests.bigquery_matcher import BigqueryFullResultMatcher
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
 from apache_beam.transforms.display import DisplayData
 from apache_beam.transforms.display_test import DisplayDataItemMatcher
 
@@ -274,56 +283,94 @@ class TestBigQuerySink(unittest.TestCase):
 @unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
 class WriteToBigQuery(unittest.TestCase):
 
-  def test_dofn_client_start_bundle_called(self):
-    client = mock.Mock()
-    client.tables.Get.return_value = bigquery.Table(
-        tableReference=bigquery.TableReference(
-            projectId='project_id', datasetId='dataset_id', tableId='table_id'))
-    create_disposition = beam.io.BigQueryDisposition.CREATE_NEVER
-    write_disposition = beam.io.BigQueryDisposition.WRITE_APPEND
+  def test_noop_schema_parsing(self):
+    expected_table_schema = None
+    table_schema = beam.io.gcp.bigquery.BigQueryWriteFn.get_table_schema(
+        schema=None)
+    self.assertEqual(expected_table_schema, table_schema)
+
+  def test_dict_schema_parsing(self):
     schema = {'fields': [
-        {'name': 'month', 'type': 'INTEGER', 'mode': 'NULLABLE'}]}
+        {'name': 's', 'type': 'STRING', 'mode': 'NULLABLE'},
+        {'name': 'n', 'type': 'INTEGER', 'mode': 'NULLABLE'},
+        {'name': 'r', 'type': 'RECORD', 'mode': 'NULLABLE', 'fields': [
+            {'name': 'x', 'type': 'INTEGER', 'mode': 'NULLABLE'}]}]}
+    table_schema = beam.io.gcp.bigquery.BigQueryWriteFn.get_table_schema(schema)
+    string_field = bigquery.TableFieldSchema(
+        name='s', type='STRING', mode='NULLABLE')
+    nested_field = bigquery.TableFieldSchema(
+        name='x', type='INTEGER', mode='NULLABLE')
+    number_field = bigquery.TableFieldSchema(
+        name='n', type='INTEGER', mode='NULLABLE')
+    record_field = bigquery.TableFieldSchema(
+        name='r', type='RECORD', mode='NULLABLE', fields=[nested_field])
+    expected_table_schema = bigquery.TableSchema(
+        fields=[string_field, number_field, record_field])
+    self.assertEqual(expected_table_schema, table_schema)
 
-    fn = beam.io.gcp.bigquery.BigQueryWriteFn(
-        table_id='table_id',
-        dataset_id='dataset_id',
-        project_id='project_id',
-        batch_size=2,
-        schema=schema,
-        create_disposition=create_disposition,
-        write_disposition=write_disposition,
-        kms_key=None,
-        test_client=client)
+  def test_string_schema_parsing(self):
+    schema = 's:STRING, n:INTEGER'
+    expected_dict_schema = {'fields': [
+        {'name': 's', 'type': 'STRING', 'mode': 'NULLABLE'},
+        {'name': 'n', 'type': 'INTEGER', 'mode': 'NULLABLE'}]}
+    dict_schema = (
+        beam.io.gcp.bigquery.WriteToBigQuery.get_dict_table_schema(schema))
+    self.assertEqual(expected_dict_schema, dict_schema)
 
-    fn.start_bundle()
-    self.assertTrue(client.tables.Get.called)
+  def test_table_schema_parsing(self):
+    string_field = bigquery.TableFieldSchema(
+        name='s', type='STRING', mode='NULLABLE')
+    nested_field = bigquery.TableFieldSchema(
+        name='x', type='INTEGER', mode='NULLABLE')
+    number_field = bigquery.TableFieldSchema(
+        name='n', type='INTEGER', mode='NULLABLE')
+    record_field = bigquery.TableFieldSchema(
+        name='r', type='RECORD', mode='NULLABLE', fields=[nested_field])
+    schema = bigquery.TableSchema(
+        fields=[string_field, number_field, record_field])
+    expected_dict_schema = {'fields': [
+        {'name': 's', 'type': 'STRING', 'mode': 'NULLABLE'},
+        {'name': 'n', 'type': 'INTEGER', 'mode': 'NULLABLE'},
+        {'name': 'r', 'type': 'RECORD', 'mode': 'NULLABLE', 'fields': [
+            {'name': 'x', 'type': 'INTEGER', 'mode': 'NULLABLE'}]}]}
+    dict_schema = (
+        beam.io.gcp.bigquery.WriteToBigQuery.get_dict_table_schema(schema))
+    self.assertEqual(expected_dict_schema, dict_schema)
 
-  def test_dofn_client_start_bundle_create_called(self):
-    client = mock.Mock()
-    client.tables.Get.side_effect = HttpError(
-        response={'status': 404}, content=None, url=None)
-    client.tables.Insert.return_value = bigquery.Table(
-        tableReference=bigquery.TableReference(
-            projectId='project_id', datasetId='dataset_id', tableId='table_id'))
-    create_disposition = beam.io.BigQueryDisposition.CREATE_IF_NEEDED
-    write_disposition = beam.io.BigQueryDisposition.WRITE_APPEND
+  def test_table_schema_parsing_end_to_end(self):
+    string_field = bigquery.TableFieldSchema(
+        name='s', type='STRING', mode='NULLABLE')
+    nested_field = bigquery.TableFieldSchema(
+        name='x', type='INTEGER', mode='NULLABLE')
+    number_field = bigquery.TableFieldSchema(
+        name='n', type='INTEGER', mode='NULLABLE')
+    record_field = bigquery.TableFieldSchema(
+        name='r', type='RECORD', mode='NULLABLE', fields=[nested_field])
+    schema = bigquery.TableSchema(
+        fields=[string_field, number_field, record_field])
+    table_schema = beam.io.gcp.bigquery.BigQueryWriteFn.get_table_schema(
+        beam.io.gcp.bigquery.WriteToBigQuery.get_dict_table_schema(schema))
+    self.assertEqual(table_schema, schema)
+
+  def test_none_schema_parsing(self):
+    schema = None
+    expected_dict_schema = None
+    dict_schema = (
+        beam.io.gcp.bigquery.WriteToBigQuery.get_dict_table_schema(schema))
+    self.assertEqual(expected_dict_schema, dict_schema)
+
+  def test_noop_dict_schema_parsing(self):
     schema = {'fields': [
-        {'name': 'month', 'type': 'INTEGER', 'mode': 'NULLABLE'}]}
+        {'name': 's', 'type': 'STRING', 'mode': 'NULLABLE'},
+        {'name': 'n', 'type': 'INTEGER', 'mode': 'NULLABLE'}]}
+    expected_dict_schema = schema
+    dict_schema = (
+        beam.io.gcp.bigquery.WriteToBigQuery.get_dict_table_schema(schema))
+    self.assertEqual(expected_dict_schema, dict_schema)
 
-    fn = beam.io.gcp.bigquery.BigQueryWriteFn(
-        table_id='table_id',
-        dataset_id='dataset_id',
-        project_id='project_id',
-        batch_size=2,
-        schema=schema,
-        create_disposition=create_disposition,
-        write_disposition=write_disposition,
-        kms_key='kms_key',
-        test_client=client)
 
-    fn.start_bundle()
-    self.assertTrue(client.tables.Get.called)
-    self.assertTrue(client.tables.Insert.called)
+@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
+class BigQueryStreamingInsertTransformTests(unittest.TestCase):
 
   def test_dofn_client_process_performs_batching(self):
     client = mock.Mock()
@@ -331,27 +378,19 @@ class WriteToBigQuery(unittest.TestCase):
         tableReference=bigquery.TableReference(
             projectId='project_id', datasetId='dataset_id', tableId='table_id'))
     client.tabledata.InsertAll.return_value = \
-        bigquery.TableDataInsertAllResponse(insertErrors=[])
+      bigquery.TableDataInsertAllResponse(insertErrors=[])
     create_disposition = beam.io.BigQueryDisposition.CREATE_NEVER
     write_disposition = beam.io.BigQueryDisposition.WRITE_APPEND
-    schema = {'fields': [
-        {'name': 'month', 'type': 'INTEGER', 'mode': 'NULLABLE'}]}
 
     fn = beam.io.gcp.bigquery.BigQueryWriteFn(
-        table_id='table_id',
-        dataset_id='dataset_id',
-        project_id='project_id',
         batch_size=2,
-        schema=schema,
         create_disposition=create_disposition,
         write_disposition=write_disposition,
         kms_key=None,
         test_client=client)
 
-    fn.start_bundle()
-    fn.process({'month': 1})
+    fn.process(('project_id:dataset_id.table_id', {'month': 1}))
 
-    self.assertTrue(client.tables.Get.called)
     # InsertRows not called as batch size is not hit yet
     self.assertFalse(client.tabledata.InsertAll.called)
 
@@ -364,24 +403,17 @@ class WriteToBigQuery(unittest.TestCase):
         bigquery.TableDataInsertAllResponse(insertErrors=[]))
     create_disposition = beam.io.BigQueryDisposition.CREATE_NEVER
     write_disposition = beam.io.BigQueryDisposition.WRITE_APPEND
-    schema = {'fields': [
-        {'name': 'month', 'type': 'INTEGER', 'mode': 'NULLABLE'}]}
 
     fn = beam.io.gcp.bigquery.BigQueryWriteFn(
-        table_id='table_id',
-        dataset_id='dataset_id',
-        project_id='project_id',
         batch_size=2,
-        schema=schema,
         create_disposition=create_disposition,
         write_disposition=write_disposition,
         kms_key=None,
         test_client=client)
 
     fn.start_bundle()
-    fn.process({'month': 1})
-    fn.process({'month': 2})
-    self.assertTrue(client.tables.Get.called)
+    fn.process(('project_id:dataset_id.table_id', {'month': 1}))
+    fn.process(('project_id:dataset_id.table_id', {'month': 2}))
     # InsertRows called as batch size is hit
     self.assertTrue(client.tabledata.InsertAll.called)
 
@@ -391,25 +423,22 @@ class WriteToBigQuery(unittest.TestCase):
         tableReference=bigquery.TableReference(
             projectId='project_id', datasetId='dataset_id', tableId='table_id'))
     client.tabledata.InsertAll.return_value = \
-        bigquery.TableDataInsertAllResponse(insertErrors=[])
+      bigquery.TableDataInsertAllResponse(insertErrors=[])
     create_disposition = beam.io.BigQueryDisposition.CREATE_NEVER
     write_disposition = beam.io.BigQueryDisposition.WRITE_APPEND
-    schema = {'fields': [
-        {'name': 'month', 'type': 'INTEGER', 'mode': 'NULLABLE'}]}
 
     fn = beam.io.gcp.bigquery.BigQueryWriteFn(
-        table_id='table_id',
-        dataset_id='dataset_id',
-        project_id='project_id',
         batch_size=2,
-        schema=schema,
         create_disposition=create_disposition,
         write_disposition=write_disposition,
         kms_key=None,
         test_client=client)
 
     fn.start_bundle()
-    fn.process({'month': 1})
+
+    # Destination is a tuple of (destination, schema) to ensure the table is
+    # created.
+    fn.process((('project_id:dataset_id.table_id', None), {'month': 1}))
 
     self.assertTrue(client.tables.Get.called)
     # InsertRows not called as batch size is not hit
@@ -425,25 +454,18 @@ class WriteToBigQuery(unittest.TestCase):
         tableReference=bigquery.TableReference(
             projectId='project_id', datasetId='dataset_id', tableId='table_id'))
     client.tabledata.InsertAll.return_value = \
-        bigquery.TableDataInsertAllResponse(insertErrors=[])
+      bigquery.TableDataInsertAllResponse(insertErrors=[])
     create_disposition = beam.io.BigQueryDisposition.CREATE_NEVER
     write_disposition = beam.io.BigQueryDisposition.WRITE_APPEND
-    schema = {'fields': [
-        {'name': 'month', 'type': 'INTEGER', 'mode': 'NULLABLE'}]}
 
     fn = beam.io.gcp.bigquery.BigQueryWriteFn(
-        table_id='table_id',
-        dataset_id='dataset_id',
-        project_id='project_id',
         batch_size=2,
-        schema=schema,
         create_disposition=create_disposition,
         write_disposition=write_disposition,
         kms_key=None,
         test_client=client)
 
     fn.start_bundle()
-    self.assertTrue(client.tables.Get.called)
     # InsertRows not called as batch size is not hit
     self.assertFalse(client.tabledata.InsertAll.called)
 
@@ -451,90 +473,82 @@ class WriteToBigQuery(unittest.TestCase):
     # InsertRows not called in finish bundle as no records
     self.assertFalse(client.tabledata.InsertAll.called)
 
-  def test_noop_schema_parsing(self):
-    expected_table_schema = None
-    table_schema = beam.io.gcp.bigquery.BigQueryWriteFn.get_table_schema(
-        schema=None)
-    self.assertEqual(expected_table_schema, table_schema)
-
-  def test_dict_schema_parsing(self):
-    schema = {'fields': [
-        {'name': 's', 'type': 'STRING', 'mode': 'NULLABLE'},
-        {'name': 'n', 'type': 'INTEGER', 'mode': 'NULLABLE'},
-        {'name': 'r', 'type': 'RECORD', 'mode': 'NULLABLE', 'fields': [
-            {'name': 'x', 'type': 'INTEGER', 'mode': 'NULLABLE'}]}]}
-    table_schema = beam.io.gcp.bigquery.BigQueryWriteFn.get_table_schema(schema)
-    string_field = bigquery.TableFieldSchema(
-        name='s', type='STRING', mode='NULLABLE')
-    nested_field = bigquery.TableFieldSchema(
-        name='x', type='INTEGER', mode='NULLABLE')
-    number_field = bigquery.TableFieldSchema(
-        name='n', type='INTEGER', mode='NULLABLE')
-    record_field = bigquery.TableFieldSchema(
-        name='r', type='RECORD', mode='NULLABLE', fields=[nested_field])
-    expected_table_schema = bigquery.TableSchema(
-        fields=[string_field, number_field, record_field])
-    self.assertEqual(expected_table_schema, table_schema)
-
-  def test_string_schema_parsing(self):
-    schema = 's:STRING, n:INTEGER'
-    expected_dict_schema = {'fields': [
-        {'name': 's', 'type': 'STRING', 'mode': 'NULLABLE'},
-        {'name': 'n', 'type': 'INTEGER', 'mode': 'NULLABLE'}]}
-    dict_schema = (
-        beam.io.gcp.bigquery.WriteToBigQuery.get_dict_table_schema(schema))
-    self.assertEqual(expected_dict_schema, dict_schema)
-
-  def test_table_schema_parsing(self):
-    string_field = bigquery.TableFieldSchema(
-        name='s', type='STRING', mode='NULLABLE')
-    nested_field = bigquery.TableFieldSchema(
-        name='x', type='INTEGER', mode='NULLABLE')
-    number_field = bigquery.TableFieldSchema(
-        name='n', type='INTEGER', mode='NULLABLE')
-    record_field = bigquery.TableFieldSchema(
-        name='r', type='RECORD', mode='NULLABLE', fields=[nested_field])
-    schema = bigquery.TableSchema(
-        fields=[string_field, number_field, record_field])
-    expected_dict_schema = {'fields': [
-        {'name': 's', 'type': 'STRING', 'mode': 'NULLABLE'},
-        {'name': 'n', 'type': 'INTEGER', 'mode': 'NULLABLE'},
-        {'name': 'r', 'type': 'RECORD', 'mode': 'NULLABLE', 'fields': [
-            {'name': 'x', 'type': 'INTEGER', 'mode': 'NULLABLE'}]}]}
-    dict_schema = (
-        beam.io.gcp.bigquery.WriteToBigQuery.get_dict_table_schema(schema))
-    self.assertEqual(expected_dict_schema, dict_schema)
 
-  def test_table_schema_parsing_end_to_end(self):
-    string_field = bigquery.TableFieldSchema(
-        name='s', type='STRING', mode='NULLABLE')
-    nested_field = bigquery.TableFieldSchema(
-        name='x', type='INTEGER', mode='NULLABLE')
-    number_field = bigquery.TableFieldSchema(
-        name='n', type='INTEGER', mode='NULLABLE')
-    record_field = bigquery.TableFieldSchema(
-        name='r', type='RECORD', mode='NULLABLE', fields=[nested_field])
-    schema = bigquery.TableSchema(
-        fields=[string_field, number_field, record_field])
-    table_schema = beam.io.gcp.bigquery.BigQueryWriteFn.get_table_schema(
-        beam.io.gcp.bigquery.WriteToBigQuery.get_dict_table_schema(schema))
-    self.assertEqual(table_schema, schema)
-
-  def test_none_schema_parsing(self):
-    schema = None
-    expected_dict_schema = None
-    dict_schema = (
-        beam.io.gcp.bigquery.WriteToBigQuery.get_dict_table_schema(schema))
-    self.assertEqual(expected_dict_schema, dict_schema)
-
-  def test_noop_dict_schema_parsing(self):
-    schema = {'fields': [
-        {'name': 's', 'type': 'STRING', 'mode': 'NULLABLE'},
-        {'name': 'n', 'type': 'INTEGER', 'mode': 'NULLABLE'}]}
-    expected_dict_schema = schema
-    dict_schema = (
-        beam.io.gcp.bigquery.WriteToBigQuery.get_dict_table_schema(schema))
-    self.assertEqual(expected_dict_schema, dict_schema)
+class BigQueryStreamingInsertTransformIntegrationTests(unittest.TestCase):
+  BIG_QUERY_DATASET_ID = 'python_bq_streaming_inserts_'
+
+  def setUp(self):
+    self.test_pipeline = TestPipeline(is_integration_test=True)
+    self.runner_name = type(self.test_pipeline.runner).__name__
+    self.project = self.test_pipeline.get_option('project')
+
+    self.dataset_id = '%s%s%d' % (self.BIG_QUERY_DATASET_ID,
+                                  str(int(time.time())),
+                                  random.randint(0, 10000))
+    self.bigquery_client = bigquery_tools.BigQueryWrapper()
+    self.bigquery_client.get_or_create_dataset(self.project, self.dataset_id)
+    self.output_table = "%s.output_table" % (self.dataset_id)
+    logging.info("Created dataset %s in project %s",
+                 self.dataset_id, self.project)
+
+  @attr('IT')
+  def test_multiple_destinations_transform(self):
+    output_table_1 = '%s%s' % (self.output_table, 1)
+    output_table_2 = '%s%s' % (self.output_table, 2)
+    schema1 = {'fields': [
+        {'name': 'name', 'type': 'STRING', 'mode': 'NULLABLE'},
+        {'name': 'language', 'type': 'STRING', 'mode': 'NULLABLE'}]}
+    schema2 = {'fields': [
+        {'name': 'name', 'type': 'STRING', 'mode': 'NULLABLE'},
+        {'name': 'foundation', 'type': 'STRING', 'mode': 'NULLABLE'}]}
+
+    bad_record = {'language': 1, 'manguage': 2}
+
+    pipeline_verifiers = [
+        BigqueryFullResultMatcher(
+            project=self.project,
+            query="SELECT * FROM %s" % output_table_1,
+            data=[(d['name'], d['language'])
+                  for d in _ELEMENTS
+                  if 'language' in d]),
+        BigqueryFullResultMatcher(
+            project=self.project,
+            query="SELECT * FROM %s" % output_table_2,
+            data=[(d['name'], d['foundation'])
+                  for d in _ELEMENTS
+                  if 'foundation' in d])]
+
+    args = self.test_pipeline.get_full_options_as_args(
+        on_success_matcher=hc.all_of(*pipeline_verifiers))
+
+    with beam.Pipeline(argv=args) as p:
+      input = p | beam.Create(_ELEMENTS)
+
+      input2 = p | "Broken record" >> beam.Create([bad_record])
+
+      input = (input, input2) | beam.Flatten()
+
+      r = (input
+           | "WriteWithMultipleDests" >> beam.io.gcp.bigquery.WriteToBigQuery(
+               table=lambda x: ((output_table_1, schema1)
+                                if 'language' in x
+                                else (output_table_2, schema2)),
+               method='STREAMING_INSERTS'))
+
+      assert_that(r[beam.io.gcp.bigquery.BigQueryWriteFn.FAILED_ROWS],
+                  equal_to([(output_table_1, bad_record)]))
+
+  def tearDown(self):
+    request = bigquery.BigqueryDatasetsDeleteRequest(
+        projectId=self.project, datasetId=self.dataset_id,
+        deleteContents=True)
+    try:
+      logging.info("Deleting dataset %s in project %s",
+                   self.dataset_id, self.project)
+      self.bigquery_client.client.datasets.Delete(request)
+    except HttpError:
+      logging.debug('Failed to clean up dataset %s in project %s',
+                    self.dataset_id, self.project)
 
 
 if __name__ == '__main__':
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools.py b/sdks/python/apache_beam/io/gcp/bigquery_tools.py
index 2af9bf5..5f97cf5 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_tools.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_tools.py
@@ -47,6 +47,7 @@ from apache_beam.internal.http_client import get_new_http
 from apache_beam.io.gcp.internal.clients import bigquery
 from apache_beam.options.pipeline_options import GoogleCloudOptions
 from apache_beam.runners.dataflow.native_io import iobase as dataflow_io
+from apache_beam.transforms import DoFn
 from apache_beam.utils import retry
 
 # Protect against environments where bigquery library is not available.
@@ -129,7 +130,8 @@ def parse_table_reference(table, dataset=None, project=None):
       argument.
 
   Returns:
-    A TableReference for the table name that was provided.
+    A TableReference object from the bigquery API. The object has the following
+    attributes: projectId, datasetId, and tableId.
 
   Raises:
     ValueError: if the table reference as a string does not match the expected
@@ -357,15 +359,20 @@ class BigQueryWrapper(object):
 
   @retry.with_exponential_backoff(
       num_retries=MAX_RETRIES,
-      retry_filter=retry.retry_on_server_errors_filter)
-  def _insert_all_rows(self, project_id, dataset_id, table_id, rows):
+      retry_filter=retry.retry_on_server_errors_timeout_or_quota_issues_filter)
+  def _insert_all_rows(self, project_id, dataset_id, table_id, rows,
+                       skip_invalid_rows=False):
+    """Calls the insertAll BigQuery API endpoint.
+
+    Docs for this BQ call: https://cloud.google.com/bigquery/docs/reference\
+      /rest/v2/tabledata/insertAll."""
     # The rows argument is a list of
     # bigquery.TableDataInsertAllRequest.RowsValueListEntry instances as
     # required by the InsertAll() method.
     request = bigquery.BigqueryTabledataInsertAllRequest(
         projectId=project_id, datasetId=dataset_id, tableId=table_id,
         tableDataInsertAllRequest=bigquery.TableDataInsertAllRequest(
-            # TODO(silviuc): Should have an option for skipInvalidRows?
+            skipInvalidRows=skip_invalid_rows,
             # TODO(silviuc): Should have an option for ignoreUnknownValues?
             rows=rows))
     response = self.client.tabledata.InsertAll(request)
@@ -664,7 +671,8 @@ class BigQueryWrapper(object):
         break
       page_token = response.pageToken
 
-  def insert_rows(self, project_id, dataset_id, table_id, rows):
+  def insert_rows(self, project_id, dataset_id, table_id, rows,
+                  skip_invalid_rows=False):
     """Inserts rows into the specified table.
 
     Args:
@@ -673,6 +681,8 @@ class BigQueryWrapper(object):
       table_id: The table id.
       rows: A list of plain Python dictionaries. Each dictionary is a row and
         each key in it is the name of a field.
+      skip_invalid_rows: Whether rows with insertion errors should be skipped
+        so that all remaining rows can still be inserted successfully.
 
     Returns:
       A tuple (bool, errors). If first element is False then the second element
@@ -702,7 +712,7 @@ class BigQueryWrapper(object):
               insertId=str(self.unique_row_id),
               json=json_object))
     result, errors = self._insert_all_rows(
-        project_id, dataset_id, table_id, final_rows)
+        project_id, dataset_id, table_id, final_rows, skip_invalid_rows)
     return result, errors
 
   def _convert_cell_value_to_dict(self, value, field):
@@ -953,3 +963,42 @@ class RowAsDictJsonCoder(coders.Coder):
 
   def decode(self, encoded_table_row):
     return json.loads(encoded_table_row.decode('utf-8'))
+
+
+class RetryStrategy(object):
+  RETRY_ALWAYS = 'RETRY_ALWAYS'
+  RETRY_NEVER = 'RETRY_NEVER'
+  RETRY_ON_TRANSIENT_ERROR = 'RETRY_ON_TRANSIENT_ERROR'
+
+  _NON_TRANSIENT_ERRORS = {'invalid', 'invalidQuery', 'notImplemented'}
+
+  @staticmethod
+  def should_retry(strategy, error_message):
+    if strategy == RetryStrategy.RETRY_ALWAYS:
+      return True
+    elif strategy == RetryStrategy.RETRY_NEVER:
+      return False
+    elif (strategy == RetryStrategy.RETRY_ON_TRANSIENT_ERROR and
+          error_message not in RetryStrategy._NON_TRANSIENT_ERRORS):
+      return True
+    else:
+      return False
+
+
+class AppendDestinationsFn(DoFn):
+  """Adds the destination to an element, making it a KV pair.
+
+  Outputs a PCollection of KV-pairs where the key is a TableReference for the
+  destination, and the value is the record itself.
+
+  Experimental; no backwards compatibility guarantees.
+  """
+
+  def __init__(self, destination):
+    if callable(destination):
+      self.destination = destination
+    else:
+      self.destination = lambda x: destination
+
+  def process(self, element):
+    yield (self.destination(element), element)
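
A short sketch of how BigQueryWriteFn._flush_batch consults RetryStrategy.should_retry for each per-row error returned by the insertAll call; 'backendError' is used here only as an example of a transient error reason:

    from apache_beam.io.gcp.bigquery_tools import RetryStrategy

    # Transient reasons are retried under the default strategy.
    RetryStrategy.should_retry(
        RetryStrategy.RETRY_ON_TRANSIENT_ERROR, 'backendError')  # True
    # Reasons in _NON_TRANSIENT_ERRORS are never retried by that strategy.
    RetryStrategy.should_retry(
        RetryStrategy.RETRY_ON_TRANSIENT_ERROR, 'invalid')       # False
    # RETRY_NEVER short-circuits regardless of the reason.
    RetryStrategy.should_retry(RetryStrategy.RETRY_NEVER, 'backendError')  # False
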
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py b/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py
index eeceadb..2dd9af8 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py
@@ -565,7 +565,7 @@ class TestBigQueryWriter(unittest.TestCase):
         bigquery.BigqueryTabledataInsertAllRequest(
             projectId='project', datasetId='dataset', tableId='table',
             tableDataInsertAllRequest=bigquery.TableDataInsertAllRequest(
-                rows=expected_rows)))
+                rows=expected_rows, skipInvalidRows=False,)))
 
   def test_table_schema_without_project(self):
     # Writer should pick executing project by default.
diff --git a/sdks/python/apache_beam/io/gcp/tests/utils.py b/sdks/python/apache_beam/io/gcp/tests/utils.py
index 60987f1..e72d917 100644
--- a/sdks/python/apache_beam/io/gcp/tests/utils.py
+++ b/sdks/python/apache_beam/io/gcp/tests/utils.py
@@ -23,6 +23,7 @@ from __future__ import absolute_import
 import logging
 import time
 
+from apache_beam.io import filesystems
 from apache_beam.utils import retry
 
 # Protect against environments where bigquery library is not available.
@@ -77,6 +78,9 @@ def delete_bq_dataset(project, dataset_ref):
   client.delete_dataset(dataset_ref, delete_contents=True)
 
 
+@retry.with_exponential_backoff(
+    num_retries=3,
+    retry_filter=retry.retry_on_server_errors_filter)
 def delete_bq_table(project, dataset_id, table_id):
   """Delete a BiqQuery table.
 
@@ -93,3 +97,16 @@ def delete_bq_table(project, dataset_id, table_id):
     client.delete_table(table_ref)
   except NotFound:
     raise GcpTestIOError('BigQuery table does not exist: %s' % table_ref)
+
+
+@retry.with_exponential_backoff(
+    num_retries=3,
+    retry_filter=retry.retry_on_server_errors_filter)
+def delete_directory(directory):
+  """Delete a directory in a filesystem.
+
+  Args:
+    directory: Full path to a directory supported by Beam filesystems (e.g.
+      "gs://mybucket/mydir/", "s3://...", ...)
+  """
+  filesystems.FileSystems.delete([directory])
diff --git a/sdks/python/apache_beam/transforms/display.py b/sdks/python/apache_beam/transforms/display.py
index 614c9a9..bcbf68e 100644
--- a/sdks/python/apache_beam/transforms/display.py
+++ b/sdks/python/apache_beam/transforms/display.py
@@ -232,11 +232,14 @@ class DisplayDataItem(object):
         value or type.
     """
     if self.key is None:
-      raise ValueError('Invalid DisplayDataItem. Key must not be None')
+      raise ValueError(
+          'Invalid DisplayDataItem %s. Key must not be None.' % self)
     if self.namespace is None:
-      raise ValueError('Invalid DisplayDataItem. Namespace must not be None')
+      raise ValueError(
+          'Invalid DisplayDataItem %s. Namespace must not be None' % self)
     if self.value is None:
-      raise ValueError('Invalid DisplayDataItem. Value must not be None')
+      raise ValueError(
+          'Invalid DisplayDataItem %s. Value must not be None' % self)
     if self.type is None:
       raise ValueError(
           'Invalid DisplayDataItem. Value {} is of an unsupported type.'
diff --git a/sdks/python/apache_beam/utils/retry.py b/sdks/python/apache_beam/utils/retry.py
index e30bd10..d27939b 100644
--- a/sdks/python/apache_beam/utils/retry.py
+++ b/sdks/python/apache_beam/utils/retry.py
@@ -110,6 +110,17 @@ def retry_on_server_errors_and_timeout_filter(exception):
   return retry_on_server_errors_filter(exception)
 
 
+def retry_on_server_errors_timeout_or_quota_issues_filter(exception):
+  """Retry on server, timeout and 403 errors.
+
+  403 errors include accessDenied and billingNotEnabled, as well as transient
+  quota issues such as quotaExceeded and rateLimitExceeded."""
+  if HttpError is not None and isinstance(exception, HttpError):
+    if exception.status_code == 403:
+      return True
+  return retry_on_server_errors_and_timeout_filter(exception)
+
+
 def retry_on_beam_io_error_filter(exception):
   """Filter allowing retries on Beam IO errors."""
   return isinstance(exception, BeamIOError)
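
This filter is what _insert_all_rows uses earlier in this patch; a minimal sketch of wrapping any other BigQuery call the same way, where call_bigquery and its body are placeholders:

    from apache_beam.utils import retry

    @retry.with_exponential_backoff(
        num_retries=4,
        retry_filter=retry.retry_on_server_errors_timeout_or_quota_issues_filter)
    def call_bigquery(request):
      # Placeholder for an apitools call that may raise HttpError; 5xx,
      # timeout and 403 responses are retried with exponential backoff.
      pass
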

