[BEAM-10647] Fixes get_query_location bug in BigQueryWrapper
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools.py b/sdks/python/apache_beam/io/gcp/bigquery_tools.py
index 1230d1d..826af40 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_tools.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_tools.py
@@ -63,7 +63,7 @@
# Protect against environments where bigquery library is not available.
# pylint: disable=wrong-import-order, wrong-import-position
try:
- from apitools.base.py.exceptions import HttpError
+ from apitools.base.py.exceptions import HttpError, HttpForbiddenError
except ImportError:
pass
@@ -290,9 +290,10 @@
"""
Get the location of tables referenced in a query.
- This method returns the location of the first referenced table in the query
- and depends on the BigQuery service to provide error handling for
- queries that reference tables in multiple locations.
+ This method returns the location of the first referenced table in the
+ query that the user is permitted to access, and depends on the BigQuery
+ service to provide error handling for queries that reference tables in
+ multiple locations.
"""
reference = bigquery.JobReference(
jobId=uuid.uuid4().hex, projectId=project_id)
@@ -318,17 +319,25 @@
referenced_tables = response.statistics.query.referencedTables
if referenced_tables: # Guards against both non-empty and non-None
- table = referenced_tables[0]
- location = self.get_table_location(
- table.projectId, table.datasetId, table.tableId)
- _LOGGER.info(
- "Using location %r from table %r referenced by query %s",
- location,
- table,
- query)
- return location
+ for table in referenced_tables:
+ try:
+ location = self.get_table_location(
+ table.projectId, table.datasetId, table.tableId)
+ except HttpForbiddenError:
+ # No permission on this table (e.g. it backs an authorized view);
+ # try the next referenced table.
+ continue
+ _LOGGER.info(
+ "Using location %r from table %r referenced by query %s",
+ location,
+ table,
+ query)
+ return location
- _LOGGER.debug("Query %s does not reference any tables.", query)
+ _LOGGER.debug(
+ "Query %s does not reference any tables or "
+ "you don't have permission to inspect them.",
+ query)
return None
@retry.with_exponential_backoff(
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py b/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py
index 90716d0..985af6b 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py
@@ -39,7 +39,6 @@
import apache_beam as beam
from apache_beam.internal.gcp.json_value import to_json_value
from apache_beam.io.gcp.bigquery import TableRowJsonCoder
-from apache_beam.io.gcp.bigquery_test import HttpError
from apache_beam.io.gcp.bigquery_tools import JSON_COMPLIANCE_ERROR
from apache_beam.io.gcp.bigquery_tools import AvroRowWriter
from apache_beam.io.gcp.bigquery_tools import BigQueryJobTypes
@@ -50,6 +49,15 @@
from apache_beam.io.gcp.internal.clients import bigquery
from apache_beam.options.pipeline_options import PipelineOptions
+# Protect against environments where bigquery library is not available.
+# pylint: disable=wrong-import-order, wrong-import-position
+try:
+ from apitools.base.py.exceptions import HttpError, HttpForbiddenError
+except ImportError:
+ HttpError = None
+ HttpForbiddenError = None
+# pylint: enable=wrong-import-order, wrong-import-position
+
@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
class TestTableSchemaParser(unittest.TestCase):
@@ -267,6 +275,37 @@
'The maximum number of retries has been reached',
str(context.exception))
+ def test_get_query_location(self):
+ client = mock.Mock()
+ query = """
+ SELECT
+ av.column1, table.column1
+ FROM `dataset.authorized_view` as av
+ JOIN `dataset.table` as table ON av.column2 = table.column2
+ """
+ job = mock.MagicMock(spec=bigquery.Job)
+ job.statistics.query.referencedTables = [
+ bigquery.TableReference(
+ projectId="first_project_id",
+ datasetId="first_dataset",
+ tableId="table_used_by_authorized_view"),
+ bigquery.TableReference(
+ projectId="second_project_id",
+ datasetId="second_dataset",
+ tableId="table"),
+ ]
+ client.jobs.Insert.return_value = job
+
+ wrapper = beam.io.gcp.bigquery_tools.BigQueryWrapper(client)
+ wrapper.get_table_location = mock.Mock(
+ side_effect=[  # first table lookup is forbidden, second one succeeds
+ HttpForbiddenError(response={'status': '403'}, url='', content=''),
+ "US"
+ ])
+ location = wrapper.get_query_location(
+ project_id="second_project_id", query=query, use_legacy_sql=False)
+ self.assertEqual("US", location)
+
@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
class TestBigQueryReader(unittest.TestCase):