[BEAM-10647] Fixes get_query_location bug in BigQueryWrapper
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools.py b/sdks/python/apache_beam/io/gcp/bigquery_tools.py
index 1230d1d..826af40 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_tools.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_tools.py
@@ -63,7 +63,7 @@
 # Protect against environments where bigquery library is not available.
 # pylint: disable=wrong-import-order, wrong-import-position
 try:
-  from apitools.base.py.exceptions import HttpError
+  from apitools.base.py.exceptions import HttpError, HttpForbiddenError
 except ImportError:
   pass
 
@@ -290,9 +290,10 @@
     """
     Get the location of tables referenced in a query.
 
-    This method returns the location of the first referenced table in the query
-    and depends on the BigQuery service to provide error handling for
-    queries that reference tables in multiple locations.
+    This method returns the location of the first available referenced
+    table for user in the query and depends on the BigQuery service to
+    provide error handling for queries that reference tables in multiple
+    locations.
     """
     reference = bigquery.JobReference(
         jobId=uuid.uuid4().hex, projectId=project_id)
@@ -318,17 +319,25 @@
 
     referenced_tables = response.statistics.query.referencedTables
     if referenced_tables:  # Guards against both non-empty and non-None
-      table = referenced_tables[0]
-      location = self.get_table_location(
-          table.projectId, table.datasetId, table.tableId)
-      _LOGGER.info(
-          "Using location %r from table %r referenced by query %s",
-          location,
-          table,
-          query)
-      return location
+      for table in referenced_tables:
+        try:
+          location = self.get_table_location(
+              table.projectId, table.datasetId, table.tableId)
+        except HttpForbiddenError:
+          # Permission access for table (i.e. from authorized_view),
+          # try next one
+          continue
+        _LOGGER.info(
+            "Using location %r from table %r referenced by query %s",
+            location,
+            table,
+            query)
+        return location
 
-    _LOGGER.debug("Query %s does not reference any tables.", query)
+    _LOGGER.debug(
+        "Query %s does not reference any tables or "
+        "you don't have permission to inspect them.",
+        query)
     return None
 
   @retry.with_exponential_backoff(
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py b/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py
index 90716d0..985af6b 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py
@@ -39,7 +39,6 @@
 import apache_beam as beam
 from apache_beam.internal.gcp.json_value import to_json_value
 from apache_beam.io.gcp.bigquery import TableRowJsonCoder
-from apache_beam.io.gcp.bigquery_test import HttpError
 from apache_beam.io.gcp.bigquery_tools import JSON_COMPLIANCE_ERROR
 from apache_beam.io.gcp.bigquery_tools import AvroRowWriter
 from apache_beam.io.gcp.bigquery_tools import BigQueryJobTypes
@@ -50,6 +49,15 @@
 from apache_beam.io.gcp.internal.clients import bigquery
 from apache_beam.options.pipeline_options import PipelineOptions
 
+# Protect against environments where bigquery library is not available.
+# pylint: disable=wrong-import-order, wrong-import-position
+try:
+  from apitools.base.py.exceptions import HttpError, HttpForbiddenError
+except ImportError:
+  HttpError = None
+  HttpForbiddenError = None
+# pylint: enable=wrong-import-order, wrong-import-position
+
 
 @unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
 class TestTableSchemaParser(unittest.TestCase):
@@ -267,6 +275,37 @@
         'The maximum number of retries has been reached',
         str(context.exception))
 
+  def test_get_query_location(self):
+    client = mock.Mock()
+    query = """
+        SELECT
+            av.column1, table.column1
+        FROM `dataset.authorized_view` as av
+        JOIN `dataset.table` as table ON av.column2 = table.column2
+    """
+    job = mock.MagicMock(spec=bigquery.Job)
+    job.statistics.query.referencedTables = [
+        bigquery.TableReference(
+            projectId="first_project_id",
+            datasetId="first_dataset",
+            tableId="table_used_by_authorized_view"),
+        bigquery.TableReference(
+            projectId="second_project_id",
+            datasetId="second_dataset",
+            tableId="table"),
+    ]
+    client.jobs.Insert.return_value = job
+
+    wrapper = beam.io.gcp.bigquery_tools.BigQueryWrapper(client)
+    wrapper.get_table_location = mock.Mock(
+        side_effect=[
+            HttpForbiddenError(response={'status': '404'}, url='', content=''),
+            "US"
+        ])
+    location = wrapper.get_query_location(
+        project_id="second_project_id", query=query, use_legacy_sql=False)
+    self.assertEqual("US", location)
+
 
 @unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
 class TestBigQueryReader(unittest.TestCase):