| # |
| # Licensed to the Apache Software Foundation (ASF) under one or more |
| # contributor license agreements. See the NOTICE file distributed with |
| # this work for additional information regarding copyright ownership. |
| # The ASF licenses this file to You under the Apache License, Version 2.0 |
| # (the "License"); you may not use this file except in compliance with |
| # the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| # |
| |
| """Unit tests for BigQuery sources and sinks.""" |
| # pytype: skip-file |
| |
| import datetime |
| import decimal |
| import gc |
| import json |
| import logging |
| import os |
| import pickle |
| import re |
| import secrets |
| import time |
| import unittest |
| import uuid |
| |
| import hamcrest as hc |
| import mock |
| import pytest |
| import pytz |
| import requests |
| from parameterized import param |
| from parameterized import parameterized |
| |
| import apache_beam as beam |
| from apache_beam.internal import pickler |
| from apache_beam.internal.gcp.json_value import to_json_value |
| from apache_beam.io.filebasedsink_test import _TestCaseWithTempDirCleanUp |
| from apache_beam.io.filesystems import FileSystems |
| from apache_beam.io.gcp import bigquery as beam_bq |
| from apache_beam.io.gcp import bigquery_tools |
| from apache_beam.io.gcp.bigquery import MAX_INSERT_RETRIES |
| from apache_beam.io.gcp.bigquery import ReadFromBigQuery |
| from apache_beam.io.gcp.bigquery import TableRowJsonCoder |
| from apache_beam.io.gcp.bigquery import WriteToBigQuery |
| from apache_beam.io.gcp.bigquery import _StreamToBigQuery |
| from apache_beam.io.gcp.bigquery_read_internal import _BigQueryReadSplit |
| from apache_beam.io.gcp.bigquery_read_internal import _JsonToDictCoder |
| from apache_beam.io.gcp.bigquery_read_internal import bigquery_export_destination_uri |
| from apache_beam.io.gcp.bigquery_tools import JSON_COMPLIANCE_ERROR |
| from apache_beam.io.gcp.bigquery_tools import BigQueryWrapper |
| from apache_beam.io.gcp.bigquery_tools import RetryStrategy |
| from apache_beam.io.gcp.internal.clients import bigquery |
| from apache_beam.io.gcp.pubsub import ReadFromPubSub |
| from apache_beam.io.gcp.tests import utils |
| from apache_beam.io.gcp.tests.bigquery_matcher import BigqueryFullResultMatcher |
| from apache_beam.io.gcp.tests.bigquery_matcher import BigqueryFullResultStreamingMatcher |
| from apache_beam.io.gcp.tests.bigquery_matcher import BigQueryTableMatcher |
| from apache_beam.metrics.metric import Lineage |
| from apache_beam.options import value_provider |
| from apache_beam.options.pipeline_options import PipelineOptions |
| from apache_beam.options.pipeline_options import StandardOptions |
| from apache_beam.options.value_provider import RuntimeValueProvider |
| from apache_beam.options.value_provider import StaticValueProvider |
| from apache_beam.runners.dataflow.test_dataflow_runner import TestDataflowRunner |
| from apache_beam.runners.runner import PipelineState |
| from apache_beam.testing import test_utils |
| from apache_beam.testing.pipeline_verifiers import PipelineStateMatcher |
| from apache_beam.testing.test_pipeline import TestPipeline |
| from apache_beam.testing.test_stream import TestStream |
| from apache_beam.testing.util import assert_that |
| from apache_beam.testing.util import equal_to |
| from apache_beam.transforms.display import DisplayData |
| from apache_beam.transforms.display_test import DisplayDataItemMatcher |
| |
| # Protect against environments where the bigquery library is not available. |
| # pylint: disable=wrong-import-order, wrong-import-position |
| |
| try: |
| from apache_beam.io.gcp.internal.clients.bigquery import bigquery_v2_client |
| from apitools.base.py.exceptions import HttpError |
| from apitools.base.py.exceptions import HttpForbiddenError |
| from google.cloud import bigquery as gcp_bigquery |
| from google.cloud import bigquery_storage_v1 as bq_storage |
| from google.api_core import exceptions |
| except ImportError: |
| gcp_bigquery = None |
| bq_storage = None |
| HttpError = None |
| HttpForbiddenError = None |
| exceptions = None |
| # pylint: enable=wrong-import-order, wrong-import-position |
| |
| _LOGGER = logging.getLogger(__name__) |
| |
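| # Sample elements used as input data by tests in this module. |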
| _ELEMENTS = [ |
| { |
| 'name': 'beam', 'language': 'py' |
| }, |
| { |
| 'name': 'beam', 'language': 'java' |
| }, |
| { |
| 'name': 'beam', 'language': 'go' |
| }, |
| { |
| 'name': 'flink', 'language': 'java' |
| }, |
| { |
| 'name': 'flink', 'language': 'scala' |
| }, |
| { |
| 'name': 'spark', 'language': 'scala' |
| }, |
| { |
| 'name': 'spark', 'language': 'py' |
| }, |
| { |
| 'name': 'spark', 'language': 'scala' |
| }, |
| { |
| 'name': 'beam', 'foundation': 'apache' |
| }, |
| { |
| 'name': 'flink', 'foundation': 'apache' |
| }, |
| { |
| 'name': 'spark', 'foundation': 'apache' |
| }, |
| ] |
| |
| |
| def _load_or_default(filename): |
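| """Load JSON from a file, returning an empty dict on any error.""" |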
| try: |
| with open(filename) as f: |
| return json.load(f) |
| except: # pylint: disable=bare-except |
| return {} |
| |
| |
| @unittest.skipIf( |
| HttpError is None or gcp_bigquery is None, |
| 'GCP dependencies are not installed') |
| class TestTableRowJsonCoder(unittest.TestCase): |
| def test_row_as_table_row(self): |
| schema_definition = [('s', 'STRING'), ('i', 'INTEGER'), ('f', 'FLOAT'), |
| ('b', 'BOOLEAN'), ('n', 'NUMERIC'), ('r', 'RECORD'), |
| ('g', 'GEOGRAPHY')] |
| data_definition = [ |
| 'abc', |
| 123, |
| 123.456, |
| True, |
| decimal.Decimal('987654321.987654321'), { |
| 'a': 'b' |
| }, |
| 'LINESTRING(1 2, 3 4, 5 6, 7 8)' |
| ] |
| str_def = ( |
| '{"s": "abc", ' |
| '"i": 123, ' |
| '"f": 123.456, ' |
| '"b": true, ' |
| '"n": "987654321.987654321", ' |
| '"r": {"a": "b"}, ' |
| '"g": "LINESTRING(1 2, 3 4, 5 6, 7 8)"}') |
| schema = bigquery.TableSchema( |
| fields=[ |
| bigquery.TableFieldSchema(name=k, type=v) |
| for k, v in schema_definition |
| ]) |
| coder = TableRowJsonCoder(table_schema=schema) |
| |
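| # NUMERIC values are JSON-encoded as strings, so Decimals are stringified |
| # before conversion (matching the "n" field in str_def above). |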
| def value_or_decimal_to_json(val): |
| if isinstance(val, decimal.Decimal): |
| return to_json_value(str(val)) |
| else: |
| return to_json_value(val) |
| |
| test_row = bigquery.TableRow( |
| f=[ |
| bigquery.TableCell(v=value_or_decimal_to_json(e)) |
| for e in data_definition |
| ]) |
| |
| self.assertEqual(str_def, coder.encode(test_row)) |
| self.assertEqual(test_row, coder.decode(coder.encode(test_row))) |
| # A coder without a schema can still decode. |
| self.assertEqual( |
| test_row, TableRowJsonCoder().decode(coder.encode(test_row))) |
| |
| def test_row_and_no_schema(self): |
| coder = TableRowJsonCoder() |
| test_row = bigquery.TableRow( |
| f=[ |
| bigquery.TableCell(v=to_json_value(e)) |
| for e in ['abc', 123, 123.456, True] |
| ]) |
| with self.assertRaisesRegex(AttributeError, |
| r'^The TableRowJsonCoder requires'): |
| coder.encode(test_row) |
| |
| def json_compliance_exception(self, value): |
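| """Assert that encoding value as FLOAT raises the JSON compliance error.""" |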
| with self.assertRaisesRegex(ValueError, re.escape(JSON_COMPLIANCE_ERROR)): |
| schema_definition = [('f', 'FLOAT')] |
| schema = bigquery.TableSchema( |
| fields=[ |
| bigquery.TableFieldSchema(name=k, type=v) |
| for k, v in schema_definition |
| ]) |
| coder = TableRowJsonCoder(table_schema=schema) |
| test_row = bigquery.TableRow( |
| f=[bigquery.TableCell(v=to_json_value(value))]) |
| coder.encode(test_row) |
| |
| def test_invalid_json_nan(self): |
| self.json_compliance_exception(float('nan')) |
| |
| def test_invalid_json_inf(self): |
| self.json_compliance_exception(float('inf')) |
| |
| def test_invalid_json_neg_inf(self): |
| self.json_compliance_exception(float('-inf')) |
| |
| |
| @unittest.skipIf(HttpError is None, 'GCP dependencies are not installed') |
| class TestJsonToDictCoder(unittest.TestCase): |
| @staticmethod |
| def _make_schema(fields): |
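| """Build a TableSchema from (name, type, mode, nested_fields) tuples.""" |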
| def _fill_schema(fields): |
| for field in fields: |
| table_field = bigquery.TableFieldSchema() |
| (table_field.name, table_field.type, table_field.mode, |
| nested_fields) = field |
| if nested_fields: |
| table_field.fields = list(_fill_schema(nested_fields)) |
| yield table_field |
| |
| schema = bigquery.TableSchema() |
| schema.fields = list(_fill_schema(fields)) |
| return schema |
| |
| def test_coder_is_picklable(self): |
| try: |
| schema = self._make_schema([ |
| ( |
| 'record', |
| 'RECORD', |
| 'NULLABLE', [ |
| ('float', 'FLOAT', 'NULLABLE', []), |
| ]), |
| ('integer', 'INTEGER', 'NULLABLE', []), |
| ]) |
| coder = _JsonToDictCoder(schema) |
| pickler.loads(pickler.dumps(coder)) |
| except pickle.PicklingError: |
| self.fail('{} is not picklable'.format(coder.__class__.__name__)) |
| |
| def test_values_are_converted(self): |
| input_row = b'{"float": "10.5", "string": "abc"}' |
| expected_row = {'float': 10.5, 'string': 'abc'} |
| schema = self._make_schema([('float', 'FLOAT', 'NULLABLE', []), |
| ('string', 'STRING', 'NULLABLE', [])]) |
| coder = _JsonToDictCoder(schema) |
| |
| actual = coder.decode(input_row) |
| self.assertEqual(expected_row, actual) |
| |
| def test_null_fields_are_preserved(self): |
| input_row = b'{"float": "10.5"}' |
| expected_row = {'float': 10.5, 'string': None} |
| schema = self._make_schema([('float', 'FLOAT', 'NULLABLE', []), |
| ('string', 'STRING', 'NULLABLE', [])]) |
| coder = _JsonToDictCoder(schema) |
| |
| actual = coder.decode(input_row) |
| self.assertEqual(expected_row, actual) |
| |
| def test_record_field_is_properly_converted(self): |
| input_row = b'{"record": {"float": "55.5"}, "integer": 10}' |
| expected_row = {'record': {'float': 55.5}, 'integer': 10} |
| schema = self._make_schema([ |
| ( |
| 'record', |
| 'RECORD', |
| 'NULLABLE', [ |
| ('float', 'FLOAT', 'NULLABLE', []), |
| ]), |
| ('integer', 'INTEGER', 'NULLABLE', []), |
| ]) |
| coder = _JsonToDictCoder(schema) |
| |
| actual = coder.decode(input_row) |
| self.assertEqual(expected_row, actual) |
| |
| def test_record_and_repeatable_field_is_properly_converted(self): |
| input_row = b'{"record": [{"float": "55.5"}, {"float": "65.5"}], ' \ |
| b'"integer": 10}' |
| expected_row = {'record': [{'float': 55.5}, {'float': 65.5}], 'integer': 10} |
| schema = self._make_schema([ |
| ( |
| 'record', |
| 'RECORD', |
| 'REPEATED', [ |
| ('float', 'FLOAT', 'NULLABLE', []), |
| ]), |
| ('integer', 'INTEGER', 'NULLABLE', []), |
| ]) |
| coder = _JsonToDictCoder(schema) |
| |
| actual = coder.decode(input_row) |
| self.assertEqual(expected_row, actual) |
| |
| def test_repeatable_field_is_properly_converted(self): |
| input_row = b'{"repeated": ["55.5", "65.5"], "integer": "10"}' |
| expected_row = {'repeated': [55.5, 65.5], 'integer': 10} |
| schema = self._make_schema([ |
| ('repeated', 'FLOAT', 'REPEATED', []), |
| ('integer', 'INTEGER', 'NULLABLE', []), |
| ]) |
| coder = _JsonToDictCoder(schema) |
| |
| actual = coder.decode(input_row) |
| self.assertEqual(expected_row, actual) |
| |
| |
| @unittest.skipIf( |
| HttpError is None or HttpForbiddenError is None, |
| 'GCP dependencies are not installed') |
| class TestReadFromBigQuery(unittest.TestCase): |
| @classmethod |
| def setUpClass(cls): |
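| # Define a PipelineOptions subclass with a value-provider argument so the |
| # destination-URI tests below can exercise gcs_location as a ValueProvider. |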
| class UserDefinedOptions(PipelineOptions): |
| @classmethod |
| def _add_argparse_args(cls, parser): |
| parser.add_value_provider_argument('--gcs_location') |
| |
| cls.UserDefinedOptions = UserDefinedOptions |
| |
| def tearDown(self): |
| # Reset runtime options to avoid side-effects caused by other tests. |
| RuntimeValueProvider.set_runtime_options(None) |
| |
| @classmethod |
| def tearDownClass(cls): |
| # Unset the option added in setUpClass to avoid interfering with other |
| # tests. Force a gc so PipelineOptions.__subclasses__() no longer |
| # contains it. |
| del cls.UserDefinedOptions |
| gc.collect() |
| |
| def test_get_destination_uri_empty_runtime_vp(self): |
| with self.assertRaisesRegex(ValueError, |
| '^ReadFromBigQuery requires a GCS ' |
| 'location to be provided'): |
| # Don't provide any runtime values. |
| RuntimeValueProvider.set_runtime_options({}) |
| options = self.UserDefinedOptions() |
| |
| bigquery_export_destination_uri( |
| options.gcs_location, None, uuid.uuid4().hex) |
| |
| def test_get_destination_uri_none(self): |
| with self.assertRaisesRegex(ValueError, |
| '^ReadFromBigQuery requires a GCS ' |
| 'location to be provided'): |
| bigquery_export_destination_uri(None, None, uuid.uuid4().hex) |
| |
| def test_get_destination_uri_runtime_vp(self): |
| # Provide values at job-execution time. |
| RuntimeValueProvider.set_runtime_options({'gcs_location': 'gs://bucket'}) |
| options = self.UserDefinedOptions() |
| unique_id = uuid.uuid4().hex |
| |
| uri = bigquery_export_destination_uri(options.gcs_location, None, unique_id) |
| self.assertEqual( |
| uri, 'gs://bucket/' + unique_id + '/bigquery-table-dump-*.json') |
| |
| def test_get_destination_uri_static_vp(self): |
| unique_id = uuid.uuid4().hex |
| uri = bigquery_export_destination_uri( |
| StaticValueProvider(str, 'gs://bucket'), None, unique_id) |
| self.assertEqual( |
| uri, 'gs://bucket/' + unique_id + '/bigquery-table-dump-*.json') |
| |
| def test_get_destination_uri_fallback_temp_location(self): |
| # Don't provide any runtime values. |
| RuntimeValueProvider.set_runtime_options({}) |
| options = self.UserDefinedOptions() |
| |
| with self.assertLogs('apache_beam.io.gcp.bigquery_read_internal', |
| level='DEBUG') as context: |
| bigquery_export_destination_uri( |
| options.gcs_location, 'gs://bucket', uuid.uuid4().hex) |
| self.assertEqual( |
| context.output, |
| [ |
| 'DEBUG:apache_beam.io.gcp.bigquery_read_internal:gcs_location is ' |
| 'empty, using temp_location instead' |
| ]) |
| |
| @mock.patch.object(BigQueryWrapper, '_delete_table') |
| @mock.patch.object(BigQueryWrapper, '_delete_dataset') |
| @mock.patch('apache_beam.io.gcp.internal.clients.bigquery.BigqueryV2') |
| def test_temp_dataset_is_configurable( |
| self, api, delete_dataset, delete_table): |
| temp_dataset = bigquery.DatasetReference( |
| projectId='temp-project', datasetId='bq_dataset') |
| bq = BigQueryWrapper(client=api, temp_dataset_id=temp_dataset.datasetId) |
| gcs_location = 'gs://gcs_location' |
| |
| c = beam.io.gcp.bigquery._CustomBigQuerySource( |
| query='select * from test_table', |
| gcs_location=gcs_location, |
| method=beam.io.ReadFromBigQuery.Method.EXPORT, |
| validate=True, |
| pipeline_options=beam.options.pipeline_options.PipelineOptions(), |
| job_name='job_name', |
| step_name='step_name', |
| project='execution_project', |
| **{'temp_dataset': temp_dataset}) |
| |
| c._setup_temporary_dataset(bq) |
| api.datasets.assert_not_called() |
| |
| # The user-provided temporary dataset should not be deleted, but the |
| # temporary table created by Beam should be deleted. |
| bq.clean_up_temporary_dataset(temp_dataset.projectId) |
| delete_dataset.assert_not_called() |
| delete_table.assert_called_with( |
| temp_dataset.projectId, temp_dataset.datasetId, mock.ANY) |
| |
| @parameterized.expand([ |
| param( |
| exception_type=exceptions.Forbidden if exceptions else None, |
| error_message='accessDenied'), |
| param( |
| exception_type=exceptions.ServiceUnavailable if exceptions else None, |
| error_message='backendError'), |
| ]) |
| def test_create_temp_dataset_exception(self, exception_type, error_message): |
| |
| # Uses the FnApiRunner to ensure errors are mocked/passed through correctly |
| with mock.patch.object(bigquery_v2_client.BigqueryV2.JobsService, |
| 'Insert'),\ |
| mock.patch.object(BigQueryWrapper, |
| 'get_or_create_dataset') as mock_insert, \ |
| mock.patch('time.sleep'), \ |
| self.assertRaises(Exception) as exc,\ |
| beam.Pipeline('FnApiRunner') as p: |
| |
| mock_insert.side_effect = exception_type(error_message) |
| |
| _ = p | ReadFromBigQuery( |
| project='apache-beam-testing', |
| query='SELECT * FROM `project.dataset.table`', |
| gcs_location='gs://temp_location') |
| |
| mock_insert.assert_called() |
| self.assertIn(error_message, str(exc.exception)) |
| |
| @parameterized.expand([ |
| # read without exception |
| param(responses=[], expected_retries=0), |
| # first attempt returns an HTTP 500 blank error and retries, |
| # second attempt returns an HTTP 408 blank error and retries, |
| # third attempt passes |
| param( |
| responses=[ |
| HttpForbiddenError( |
| response={'status': 500}, content="something", url="") |
| if HttpForbiddenError else None, |
| HttpForbiddenError( |
| response={'status': 408}, content="blank", url="") |
| if HttpForbiddenError else None |
| ], |
| expected_retries=2), |
| # first attempt returns an HTTP 403 rateLimitExceeded error, |
| # second attempt returns an HTTP 429 blank error, |
| # third attempt returns an HTTP 403 rateLimitExceeded error, |
| # fourth attempt passes |
| param( |
| responses=[ |
| exceptions.Forbidden( |
| "some message", |
| errors=({ |
| "message": "transient", "reason": "rateLimitExceeded" |
| }, )) if exceptions else None, |
| exceptions.ResourceExhausted("some message") |
| if exceptions else None, |
| HttpForbiddenError( |
| response={'status': 403}, |
| content={ |
| "error": { |
| "errors": [{ |
| "message": "transient", |
| "reason": "rateLimitExceeded" |
| }] |
| } |
| }, |
| url="") if HttpForbiddenError else None, |
| ], |
| expected_retries=3), |
| ]) |
| def test_get_table_transient_exception(self, responses, expected_retries): |
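| # Minimal stand-in for a TablesService.Get response: exposes numBytes and |
| # an empty schema. |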
| class DummyTable: |
| class DummySchema: |
| fields = [] |
| |
| numBytes = 5 |
| schema = DummySchema() |
| |
| # TODO(https://github.com/apache/beam/issues/34549): This test relies on |
| # lineage metrics which Prism doesn't seem to handle correctly. Defaulting |
| # to FnApiRunner instead. |
| with mock.patch('time.sleep'), \ |
| mock.patch.object(bigquery_v2_client.BigqueryV2.TablesService, |
| 'Get') as mock_get_table, \ |
| mock.patch.object(BigQueryWrapper, |
| 'wait_for_bq_job'), \ |
| mock.patch.object(BigQueryWrapper, |
| 'perform_extract_job'), \ |
| mock.patch.object(FileSystems, |
| 'match'), \ |
| mock.patch.object(FileSystems, |
| 'delete'), \ |
| beam.Pipeline('FnApiRunner') as p: |
| call_counter = 0 |
| |
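| # Raise the canned exceptions in order; once they are exhausted, return a |
| # DummyTable to simulate a successful TablesService.Get call. |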
| def store_callback(unused_request): |
| nonlocal call_counter |
| if call_counter < len(responses): |
| exception = responses[call_counter] |
| call_counter += 1 |
| raise exception |
| else: |
| call_counter += 1 |
| return DummyTable() |
| |
| mock_get_table.side_effect = store_callback |
| _ = p | beam.io.ReadFromBigQuery( |
| table="project.dataset.table", gcs_location="gs://some_bucket") |
| |
| # ReadFromBigQuery export mode calls get_table() twice: once to get |
| # metadata (numBytes), and once to retrieve the table's schema. |
| # Any additional calls are retries. |
| self.assertEqual(expected_retries, mock_get_table.call_count - 2) |
| self.assertSetEqual( |
| Lineage.query(p.result.metrics(), Lineage.SOURCE), |
| set(["bigquery:project.dataset.table"])) |
| |
| @parameterized.expand([ |
| # first attempt returns an HTTP 429 with a transient reason and retries, |
| # second attempt returns an HTTP 403 with a non-transient reason and fails |
| param( |
| responses=[ |
| HttpForbiddenError( |
| response={'status': 429}, |
| content={ |
| "error": { |
| "errors": [{ |
| "message": "transient", |
| "reason": "rateLimitExceeded" |
| }] |
| } |
| }, |
| url="") if HttpForbiddenError else None, |
| HttpForbiddenError( |
| response={'status': 403}, |
| content={ |
| "error": { |
| "errors": [{ |
| "message": "transient", "reason": "accessDenied" |
| }] |
| } |
| }, |
| url="") if HttpForbiddenError else None |
| ], |
| expected_retries=1), |
| # first attempt returns a transient 403 error and retries |
| # second attempt returns a 403 error with bad contents and fails |
| param( |
| responses=[ |
| HttpForbiddenError( |
| response={'status': 403}, |
| content={ |
| "error": { |
| "errors": [{ |
| "message": "transient", |
| "reason": "rateLimitExceeded" |
| }] |
| } |
| }, |
| url="") if HttpForbiddenError else None, |
| HttpError( |
| response={'status': 403}, content="bad contents", url="") |
| if HttpError else None |
| ], |
| expected_retries=1), |
| # first attempt returns a transient 403 error and retries |
| # second attempt returns a 429 error and retries |
| # third attempt returns a 403 with non-transient reason and fails |
| param( |
| responses=[ |
| exceptions.Forbidden( |
| "some error", |
| errors=({ |
| "message": "transient", "reason": "rateLimitExceeded" |
| }, )) if exceptions else None, |
| exceptions.ResourceExhausted("some transient error") |
| if exceptions else None, |
| exceptions.Forbidden( |
| "some error", |
| errors=({ |
| "message": "transient", "reason": "accessDenied" |
| }, )) if exceptions else None, |
| ], |
| expected_retries=2), |
| ]) |
| def test_get_table_non_transient_exception(self, responses, expected_retries): |
| class DummyTable: |
| class DummySchema: |
| fields = [] |
| |
| numBytes = 5 |
| schema = DummySchema() |
| |
| with mock.patch('time.sleep'), \ |
| mock.patch.object(bigquery_v2_client.BigqueryV2.TablesService, |
| 'Get') as mock_get_table, \ |
| mock.patch.object(BigQueryWrapper, |
| 'wait_for_bq_job'), \ |
| mock.patch.object(BigQueryWrapper, |
| 'perform_extract_job'), \ |
| mock.patch.object(FileSystems, |
| 'match'), \ |
| mock.patch.object(FileSystems, |
| 'delete'), \ |
| self.assertRaises(Exception), \ |
| beam.Pipeline() as p: |
| call_counter = 0 |
| |
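| # Raise the canned exceptions in order, returning a DummyTable once they |
| # are exhausted. |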
| def store_callback(unused_request): |
| nonlocal call_counter |
| if call_counter < len(responses): |
| exception = responses[call_counter] |
| call_counter += 1 |
| raise exception |
| else: |
| call_counter += 1 |
| return DummyTable() |
| |
| mock_get_table.side_effect = store_callback |
| _ = p | beam.io.ReadFromBigQuery( |
| table="project.dataset.table", gcs_location="gs://some_bucket") |
| |
| # ReadFromBigQuery export mode calls get_table() twice: once to get |
| # metadata (numBytes), and once to retrieve the table's schema. |
| # However, the second call is never reached because this test always |
| # fails before it gets there. |
| # After the first call, any additional calls are retries. |
| self.assertEqual(expected_retries, mock_get_table.call_count - 1) |
| |
| @parameterized.expand([ |
| param( |
| exception_type=exceptions.BadRequest if exceptions else None, |
| error_message='invalidQuery'), |
| param( |
| exception_type=exceptions.NotFound if exceptions else None, |
| error_message='notFound'), |
| param( |
| exception_type=exceptions.Forbidden if exceptions else None, |
| error_message='responseTooLarge') |
| ]) |
| def test_query_job_exception(self, exception_type, error_message): |
| |
| # TODO(https://github.com/apache/beam/issues/34549): This test relies on |
| # mocking which Prism doesn't seem to fully handle correctly (mocks get |
| # mixed between test runs). Pinning to FnApiRunner for now. |
| with mock.patch.object(beam.io.gcp.bigquery._CustomBigQuerySource, |
| 'estimate_size') as mock_estimate,\ |
| mock.patch.object(BigQueryWrapper, |
| 'get_query_location') as mock_query_location,\ |
| mock.patch.object(bigquery_v2_client.BigqueryV2.JobsService, |
| 'Insert') as mock_query_job,\ |
| mock.patch.object(bigquery_v2_client.BigqueryV2.DatasetsService, 'Get'), \ |
| mock.patch('time.sleep'), \ |
| self.assertRaises(Exception) as exc, \ |
| beam.Pipeline('FnApiRunner') as p: |
| |
| mock_estimate.return_value = None |
| mock_query_location.return_value = None |
| mock_query_job.side_effect = exception_type(error_message) |
| |
| _ = p | ReadFromBigQuery( |
| query='SELECT * FROM `project.dataset.table`', |
| gcs_location='gs://temp_location') |
| |
| mock_query_job.assert_called() |
| self.assertIn(error_message, exc.exception.args[0]) |
| |
| @parameterized.expand([ |
| param( |
| exception_type=exceptions.BadRequest if exceptions else None, |
| error_message='invalid'), |
| param( |
| exception_type=exceptions.Forbidden if exceptions else None, |
| error_message='accessDenied') |
| ]) |
| def test_read_export_exception(self, exception_type, error_message): |
| |
| with mock.patch.object(beam.io.gcp.bigquery._CustomBigQuerySource, |
| 'estimate_size') as mock_estimate,\ |
| mock.patch.object(bigquery_v2_client.BigqueryV2.TablesService, 'Get'),\ |
| mock.patch.object(bigquery_v2_client.BigqueryV2.JobsService, |
| 'Insert') as mock_query_job, \ |
| mock.patch('time.sleep'), \ |
| self.assertRaises(Exception) as exc,\ |
| beam.Pipeline() as p: |
| |
| mock_estimate.return_value = None |
| mock_query_job.side_effect = exception_type(error_message) |
| |
| _ = p | ReadFromBigQuery( |
| project='apache-beam-testing', |
| method=ReadFromBigQuery.Method.EXPORT, |
| table='project:dataset.table', |
| gcs_location="gs://temp_location") |
| |
| mock_query_job.assert_called() |
| self.assertIn(error_message, str(exc.exception)) |
| |
| def test_read_direct_lineage(self): |
| # TODO(https://github.com/apache/beam/issues/34549): This test relies on |
| # lineage metrics which Prism doesn't seem to handle correctly. Defaulting |
| # to FnApiRunner instead. |
| with mock.patch.object(bigquery_tools.BigQueryWrapper, |
| '_bigquery_client'),\ |
| mock.patch.object(bq_storage.BigQueryReadClient, |
| 'create_read_session'),\ |
| beam.Pipeline('FnApiRunner') as p: |
| |
| _ = p | ReadFromBigQuery( |
| method=ReadFromBigQuery.Method.DIRECT_READ, |
| table='project:dataset.table') |
| self.assertSetEqual( |
| Lineage.query(p.result.metrics(), Lineage.SOURCE), |
| set(["bigquery:project.dataset.table"])) |
| |
| def test_read_all_lineage(self): |
| # TODO(https://github.com/apache/beam/issues/34549): This test relies on |
| # lineage metrics which Prism doesn't seem to handle correctly. Defaulting |
| # to FnApiRunner instead. |
| with mock.patch.object(_BigQueryReadSplit, '_export_files') as export, \ |
| beam.Pipeline('FnApiRunner') as p: |
| |
| export.return_value = (None, []) |
| |
| _ = ( |
| p |
| | beam.Create([ |
| beam.io.ReadFromBigQueryRequest(table='project1:dataset1.table1'), |
| beam.io.ReadFromBigQueryRequest(table='project2:dataset2.table2') |
| ]) |
| | beam.io.ReadAllFromBigQuery(gcs_location='gs://bucket/tmp')) |
| self.assertSetEqual( |
| Lineage.query(p.result.metrics(), Lineage.SOURCE), |
| set([ |
| 'bigquery:project1.dataset1.table1', |
| 'bigquery:project2.dataset2.table2' |
| ])) |
| |
| |
| @unittest.skipIf(HttpError is None, 'GCP dependencies are not installed') |
| class TestBigQuerySink(unittest.TestCase): |
| def test_table_spec_display_data(self): |
| sink = beam.io.BigQuerySink('dataset.table') |
| dd = DisplayData.create_from(sink) |
| expected_items = [ |
| DisplayDataItemMatcher('table', 'dataset.table'), |
| DisplayDataItemMatcher('validation', False) |
| ] |
| hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items)) |
| |
| def test_parse_schema_descriptor(self): |
| sink = beam.io.BigQuerySink('dataset.table', schema='s:STRING, n:INTEGER') |
| self.assertEqual(sink.table_reference.datasetId, 'dataset') |
| self.assertEqual(sink.table_reference.tableId, 'table') |
| result_schema = { |
| field['name']: field['type'] |
| for field in sink.schema['fields'] |
| } |
| self.assertEqual({'n': 'INTEGER', 's': 'STRING'}, result_schema) |
| |
| def test_project_table_display_data(self): |
| sinkq = beam.io.BigQuerySink('project:dataset.table') |
| dd = DisplayData.create_from(sinkq) |
| expected_items = [ |
| DisplayDataItemMatcher('table', 'project:dataset.table'), |
| DisplayDataItemMatcher('validation', False) |
| ] |
| hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items)) |
| |
| |
| @unittest.skipIf(HttpError is None, 'GCP dependencies are not installed') |
| class TestWriteToBigQuery(unittest.TestCase): |
| def _cleanup_files(self): |
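| """Remove the 'insert_calls1'/'insert_calls2' files if they exist.""" |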
| if os.path.exists('insert_calls1'): |
| os.remove('insert_calls1') |
| |
| if os.path.exists('insert_calls2'): |
| os.remove('insert_calls2') |
| |
| def setUp(self): |
| self._cleanup_files() |
| |
| def tearDown(self): |
| self._cleanup_files() |
| |
| def test_noop_schema_parsing(self): |
| expected_table_schema = None |
| table_schema = beam.io.gcp.bigquery.BigQueryWriteFn.get_table_schema( |
| schema=None) |
| self.assertEqual(expected_table_schema, table_schema) |
| |
| def test_dict_schema_parsing(self): |
| schema = { |
| 'fields': [{ |
| 'name': 's', 'type': 'STRING', 'mode': 'NULLABLE' |
| }, { |
| 'name': 'n', 'type': 'INTEGER', 'mode': 'NULLABLE' |
| }, |
| { |
| 'name': 'r', |
| 'type': 'RECORD', |
| 'mode': 'NULLABLE', |
| 'fields': [{ |
| 'name': 'x', 'type': 'INTEGER', 'mode': 'NULLABLE' |
| }] |
| }] |
| } |
| table_schema = beam.io.gcp.bigquery.BigQueryWriteFn.get_table_schema(schema) |
| string_field = bigquery.TableFieldSchema( |
| name='s', type='STRING', mode='NULLABLE') |
| nested_field = bigquery.TableFieldSchema( |
| name='x', type='INTEGER', mode='NULLABLE') |
| number_field = bigquery.TableFieldSchema( |
| name='n', type='INTEGER', mode='NULLABLE') |
| record_field = bigquery.TableFieldSchema( |
| name='r', type='RECORD', mode='NULLABLE', fields=[nested_field]) |
| expected_table_schema = bigquery.TableSchema( |
| fields=[string_field, number_field, record_field]) |
| self.assertEqual(expected_table_schema, table_schema) |
| |
| def test_string_schema_parsing(self): |
| schema = 's:STRING, n:INTEGER' |
| expected_dict_schema = { |
| 'fields': [{ |
| 'name': 's', 'type': 'STRING', 'mode': 'NULLABLE' |
| }, { |
| 'name': 'n', 'type': 'INTEGER', 'mode': 'NULLABLE' |
| }] |
| } |
| dict_schema = ( |
| beam.io.gcp.bigquery.WriteToBigQuery.get_dict_table_schema(schema)) |
| self.assertEqual(expected_dict_schema, dict_schema) |
| |
| def test_table_schema_parsing(self): |
| string_field = bigquery.TableFieldSchema( |
| name='s', type='STRING', mode='NULLABLE') |
| nested_field = bigquery.TableFieldSchema( |
| name='x', type='INTEGER', mode='NULLABLE') |
| number_field = bigquery.TableFieldSchema( |
| name='n', type='INTEGER', mode='NULLABLE') |
| record_field = bigquery.TableFieldSchema( |
| name='r', type='RECORD', mode='NULLABLE', fields=[nested_field]) |
| schema = bigquery.TableSchema( |
| fields=[string_field, number_field, record_field]) |
| expected_dict_schema = { |
| 'fields': [{ |
| 'name': 's', 'type': 'STRING', 'mode': 'NULLABLE' |
| }, { |
| 'name': 'n', 'type': 'INTEGER', 'mode': 'NULLABLE' |
| }, |
| { |
| 'name': 'r', |
| 'type': 'RECORD', |
| 'mode': 'NULLABLE', |
| 'fields': [{ |
| 'name': 'x', 'type': 'INTEGER', 'mode': 'NULLABLE' |
| }] |
| }] |
| } |
| dict_schema = ( |
| beam.io.gcp.bigquery.WriteToBigQuery.get_dict_table_schema(schema)) |
| self.assertEqual(expected_dict_schema, dict_schema) |
| |
| def test_table_schema_parsing_end_to_end(self): |
| string_field = bigquery.TableFieldSchema( |
| name='s', type='STRING', mode='NULLABLE') |
| nested_field = bigquery.TableFieldSchema( |
| name='x', type='INTEGER', mode='NULLABLE') |
| number_field = bigquery.TableFieldSchema( |
| name='n', type='INTEGER', mode='NULLABLE') |
| record_field = bigquery.TableFieldSchema( |
| name='r', type='RECORD', mode='NULLABLE', fields=[nested_field]) |
| schema = bigquery.TableSchema( |
| fields=[string_field, number_field, record_field]) |
| table_schema = beam.io.gcp.bigquery.BigQueryWriteFn.get_table_schema( |
| beam.io.gcp.bigquery.WriteToBigQuery.get_dict_table_schema(schema)) |
| self.assertEqual(table_schema, schema) |
| |
| def test_none_schema_parsing(self): |
| schema = None |
| expected_dict_schema = None |
| dict_schema = ( |
| beam.io.gcp.bigquery.WriteToBigQuery.get_dict_table_schema(schema)) |
| self.assertEqual(expected_dict_schema, dict_schema) |
| |
| def test_noop_dict_schema_parsing(self): |
| schema = { |
| 'fields': [{ |
| 'name': 's', 'type': 'STRING', 'mode': 'NULLABLE' |
| }, { |
| 'name': 'n', 'type': 'INTEGER', 'mode': 'NULLABLE' |
| }] |
| } |
| expected_dict_schema = schema |
| dict_schema = ( |
| beam.io.gcp.bigquery.WriteToBigQuery.get_dict_table_schema(schema)) |
| self.assertEqual(expected_dict_schema, dict_schema) |
| |
| def test_schema_autodetect_not_allowed_with_avro_file_loads(self): |
| with TestPipeline() as p: |
| pc = p | beam.Impulse() |
| |
| with self.assertRaisesRegex(ValueError, '^A schema must be provided'): |
| _ = ( |
| pc |
| | 'No Schema' >> beam.io.gcp.bigquery.WriteToBigQuery( |
| "dataset.table", |
| schema=None, |
| temp_file_format=bigquery_tools.FileFormat.AVRO)) |
| |
| with self.assertRaisesRegex(ValueError, |
| '^Schema auto-detection is not supported'): |
| _ = ( |
| pc |
| | 'Schema Autodetected' >> beam.io.gcp.bigquery.WriteToBigQuery( |
| "dataset.table", |
| schema=beam.io.gcp.bigquery.SCHEMA_AUTODETECT, |
| temp_file_format=bigquery_tools.FileFormat.AVRO)) |
| |
| def test_to_from_runner_api(self): |
| """Tests that serialization of WriteToBigQuery is correct. |
| |
| This is not intended to be a change-detector test. As such, this only tests |
| the more complicated serialization logic of parameters: ValueProviders, |
| callables, and side inputs. |
| """ |
| FULL_OUTPUT_TABLE = 'test_project:output_table' |
| |
| p = TestPipeline() |
| |
| # Used for testing side input parameters. |
| table_record_pcv = beam.pvalue.AsDict( |
| p | "MakeTable" >> beam.Create([('table', FULL_OUTPUT_TABLE)])) |
| |
| # Used for testing value provider parameters. |
| schema = value_provider.StaticValueProvider(str, '"a:str"') |
| |
| original = WriteToBigQuery( |
| table=lambda _, side_input: side_input['table'], |
| table_side_inputs=(table_record_pcv, ), |
| schema=schema) |
| |
| # pylint: disable=expression-not-assigned |
| p | beam.Create([]) | 'MyWriteToBigQuery' >> original |
| |
| # Run the pipeline through to generate a pipeline proto from an empty |
| # context. This ensures that the serialization code ran. |
| pipeline_proto, context = TestPipeline.from_runner_api( |
| p.to_runner_api(), p.runner, p.get_pipeline_options()).to_runner_api( |
| return_context=True) |
| |
| # Find the transform from the context. |
| write_to_bq_id = [ |
| k for k, v in pipeline_proto.components.transforms.items() |
| if v.unique_name == 'MyWriteToBigQuery' |
| ][0] |
| deserialized_node = context.transforms.get_by_id(write_to_bq_id) |
| deserialized = deserialized_node.transform |
| self.assertIsInstance(deserialized, WriteToBigQuery) |
| |
| # Test that the serialization of a value provider is correct. |
| self.assertEqual(original.schema, deserialized.schema) |
| |
| # Test that the serialization of a callable is correct. |
| self.assertEqual( |
| deserialized._table(None, {'table': FULL_OUTPUT_TABLE}), |
| FULL_OUTPUT_TABLE) |
| |
| # Test that the serialization of a side input is correct. |
| self.assertEqual( |
| len(original.table_side_inputs), len(deserialized.table_side_inputs)) |
| original_side_input_data = original.table_side_inputs[0]._side_input_data() |
| deserialized_side_input_data = deserialized.table_side_inputs[ |
| 0]._side_input_data() |
| self.assertEqual( |
| original_side_input_data.access_pattern, |
| deserialized_side_input_data.access_pattern) |
| self.assertEqual( |
| original_side_input_data.window_mapping_fn, |
| deserialized_side_input_data.window_mapping_fn) |
| self.assertEqual( |
| original_side_input_data.view_fn, deserialized_side_input_data.view_fn) |
| |
| def test_streaming_triggering_frequency_without_auto_sharding(self): |
| def noop(table, **kwargs): |
| return [] |
| |
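| # Stub out insert_rows_json so no real BigQuery requests are made. |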
| client = mock.Mock() |
| client.insert_rows_json = mock.Mock(side_effect=noop) |
| opt = StandardOptions() |
| opt.streaming = True |
| with self.assertRaises(ValueError, |
| msg="triggering_frequency with STREAMING_INSERTS" + |
| "can only be used with with_auto_sharding=True"): |
| with beam.Pipeline(runner='BundleBasedDirectRunner', options=opt) as p: |
| _ = ( |
| p |
| | beam.Create([{ |
| 'columnA': 'value1' |
| }]) |
| | WriteToBigQuery( |
| table='project:dataset.table', |
| schema={ |
| 'fields': [{ |
| 'name': 'columnA', 'type': 'STRING', 'mode': 'NULLABLE' |
| }] |
| }, |
| create_disposition='CREATE_NEVER', |
| triggering_frequency=1, |
| with_auto_sharding=False, |
| test_client=client)) |
| |
| def test_streaming_triggering_frequency_with_auto_sharding(self): |
| def noop(table, **kwargs): |
| return [] |
| |
| client = mock.Mock() |
| client.insert_rows_json = mock.Mock(side_effect=noop) |
| opt = StandardOptions() |
| opt.streaming = True |
| with beam.Pipeline(runner='BundleBasedDirectRunner', options=opt) as p: |
| _ = ( |
| p |
| | beam.Create([{ |
| 'columnA': 'value1' |
| }]) |
| | WriteToBigQuery( |
| table='project:dataset.table', |
| schema={ |
| 'fields': [{ |
| 'name': 'columnA', 'type': 'STRING', 'mode': 'NULLABLE' |
| }] |
| }, |
| create_disposition='CREATE_NEVER', |
| triggering_frequency=1, |
| with_auto_sharding=True, |
| test_client=client)) |
| |
| @mock.patch('google.cloud.bigquery.Client.insert_rows_json') |
| def test_streaming_inserts_flush_on_byte_size_limit(self, mock_insert): |
| mock_insert.return_value = [] |
| table = 'project:dataset.table' |
| rows = [ |
| { |
| 'columnA': 'value1' |
| }, |
| { |
| 'columnA': 'value2' |
| }, |
| # This very large row exceeds max_insert_payload_size, so it should be |
| # routed to the failed-rows (DLQ) output. |
| { |
| 'columnA': "large_string" * 100 |
| } |
| ] |
| with beam.Pipeline() as p: |
| failed_rows = ( |
| p |
| | beam.Create(rows) |
| | WriteToBigQuery( |
| table=table, |
| method='STREAMING_INSERTS', |
| create_disposition='CREATE_NEVER', |
| schema='columnA:STRING', |
| max_insert_payload_size=500)) |
| |
| expected_failed_rows = [(table, rows[2])] |
| assert_that(failed_rows.failed_rows, equal_to(expected_failed_rows)) |
| self.assertEqual(2, mock_insert.call_count) |
| |
| def test_max_retries_exceeds_limit(self): |
| table = 'project:dataset.table' |
| rows = [{'columnA': 'value1'}, {'columnA': 'value2'}] |
| with beam.Pipeline() as p: |
| data = p | beam.Create(rows) |
| |
| with self.assertRaises(ValueError) as context: |
| _ = data | 'WriteToBQ' >> WriteToBigQuery( |
| table=table, |
| schema='columnA:STRING', |
| method='STREAMING_INSERTS', |
| max_retries=MAX_INSERT_RETRIES + 1 # Exceeds the limit of 10000 |
| ) |
| |
| self.assertIn( |
| 'max_retries cannot be more than 10000', str(context.exception)) |
| |
| @parameterized.expand([ |
| param( |
| exception_type=exceptions.Forbidden if exceptions else None, |
| error_message='accessDenied'), |
| param( |
| exception_type=exceptions.ServiceUnavailable if exceptions else None, |
| error_message='backendError') |
| ]) |
| @unittest.skip('Not compatible with new GCS client. See GH issue #26334.') |
| def test_load_job_exception(self, exception_type, error_message): |
| |
| with mock.patch.object(bigquery_v2_client.BigqueryV2.JobsService, |
| 'Insert') as mock_load_job,\ |
| mock.patch('apache_beam.io.gcp.internal.clients' |
| '.storage.storage_v1_client.StorageV1.ObjectsService'),\ |
| mock.patch('time.sleep'),\ |
| self.assertRaises(Exception) as exc,\ |
| beam.Pipeline() as p: |
| |
| mock_load_job.side_effect = exception_type(error_message) |
| |
| _ = ( |
| p |
| | beam.Create([{ |
| 'columnA': 'value1' |
| }]) |
| | WriteToBigQuery( |
| table='project:dataset.table', |
| schema={ |
| 'fields': [{ |
| 'name': 'columnA', 'type': 'STRING', 'mode': 'NULLABLE' |
| }] |
| }, |
| create_disposition='CREATE_NEVER', |
| custom_gcs_temp_location="gs://temp_location", |
| method='FILE_LOADS')) |
| |
| mock_load_job.assert_called() |
| self.assertIn(error_message, exc.exception.args[0]) |
| |
| @parameterized.expand([ |
| param( |
| exception_type=exceptions.ServiceUnavailable if exceptions else None, |
| error_message='backendError'), |
| param( |
| exception_type=exceptions.InternalServerError if exceptions else None, |
| error_message='internalError'), |
| ]) |
| @unittest.skip('Not compatible with new GCS client. See GH issue #26334.') |
| def test_copy_load_job_exception(self, exception_type, error_message): |
| |
| from apache_beam.io.gcp import bigquery_file_loads |
| |
| old_max_file_size = bigquery_file_loads._DEFAULT_MAX_FILE_SIZE |
| old_max_partition_size = bigquery_file_loads._MAXIMUM_LOAD_SIZE |
| old_max_files_per_partition = bigquery_file_loads._MAXIMUM_SOURCE_URIS |
| bigquery_file_loads._DEFAULT_MAX_FILE_SIZE = 15 |
| bigquery_file_loads._MAXIMUM_LOAD_SIZE = 30 |
| bigquery_file_loads._MAXIMUM_SOURCE_URIS = 1 |
| |
| with mock.patch.object(bigquery_v2_client.BigqueryV2.JobsService, |
| 'Insert') as mock_insert_copy_job, \ |
| mock.patch.object(BigQueryWrapper, |
| 'perform_load_job') as mock_load_job, \ |
| mock.patch.object(BigQueryWrapper, |
| 'wait_for_bq_job'), \ |
| mock.patch('apache_beam.io.gcp.internal.clients' |
| '.storage.storage_v1_client.StorageV1.ObjectsService'),\ |
| mock.patch('time.sleep'), \ |
| self.assertRaises(Exception) as exc, \ |
| beam.Pipeline() as p: |
| |
| mock_insert_copy_job.side_effect = exception_type(error_message) |
| |
| dummy_job_reference = beam.io.gcp.internal.clients.bigquery.JobReference() |
| dummy_job_reference.jobId = 'job_id' |
| dummy_job_reference.location = 'US' |
| dummy_job_reference.projectId = 'apache-beam-testing' |
| |
| mock_load_job.return_value = dummy_job_reference |
| |
| _ = ( |
| p |
| | beam.Create([{ |
| 'columnA': 'value1' |
| }, { |
| 'columnA': 'value2' |
| }, { |
| 'columnA': 'value3' |
| }]) |
| | WriteToBigQuery( |
| table='project:dataset.table', |
| schema={ |
| 'fields': [{ |
| 'name': 'columnA', 'type': 'STRING', 'mode': 'NULLABLE' |
| }] |
| }, |
| create_disposition='CREATE_NEVER', |
| custom_gcs_temp_location="gs://temp_location", |
| method='FILE_LOADS')) |
| |
| bigquery_file_loads._DEFAULT_MAX_FILE_SIZE = old_max_file_size |
| bigquery_file_loads._MAXIMUM_LOAD_SIZE = old_max_partition_size |
| bigquery_file_loads._MAXIMUM_SOURCE_URIS = old_max_files_per_partition |
| |
| self.assertEqual(4, mock_insert_copy_job.call_count) |
| self.assertIn(error_message, exc.exception.args[0]) |
| |
| |
| @unittest.skipIf( |
| HttpError is None or exceptions is None, |
| 'GCP dependencies are not installed') |
| class BigQueryStreamingInsertsErrorHandling(unittest.TestCase): |
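| """Tests for retry and error-handling behavior of streaming inserts.""" |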
| |
| # Running tests with a variety of exceptions from https://googleapis.dev |
| # /python/google-api-core/latest/_modules/google/api_core/exceptions.html. |
| # Choosing some exceptions that produce reasons included in |
| # bigquery_tools._NON_TRANSIENT_ERRORS and some that are not |
| @parameterized.expand([ |
| # reason not in _NON_TRANSIENT_ERRORS for row 1 on first attempt |
| # transient error retried and succeeds on second attempt, 0 rows sent to |
| # failed rows |
| param( |
| insert_response=[ |
| exceptions.TooManyRequests if exceptions else None, None |
| ], |
| error_reason='Too Many Requests', # not in _NON_TRANSIENT_ERRORS |
| failed_rows=[]), |
| # reason not in _NON_TRANSIENT_ERRORS for row 1 on both attempts, sent to |
| # failed rows after hitting max_retries |
| param( |
| insert_response=[ |
| exceptions.InternalServerError if exceptions else None, |
| exceptions.InternalServerError if exceptions else None |
| ], |
| error_reason='Internal Server Error', # not in _NON_TRANSIENT_ERRORS |
| failed_rows=['value1', 'value3', 'value5']), |
| # reason in _NON_TRANSIENT_ERRORS for row 1 on both attempts, sent to |
| # failed_rows after hitting max_retries |
| param( |
| insert_response=[ |
| exceptions.Forbidden if exceptions else None, |
| exceptions.Forbidden if exceptions else None |
| ], |
| error_reason='Forbidden', # in _NON_TRANSIENT_ERRORS |
| failed_rows=['value1', 'value3', 'value5']), |
| ]) |
| def test_insert_rows_json_exception_retry_always( |
| self, insert_response, error_reason, failed_rows): |
| # In this test, a pipeline will always retry all caught exception types |
| # since RetryStrategy is not set and defaults to RETRY_ALWAYS |
| with mock.patch('time.sleep'): |
| call_counter = 0 |
| mock_response = mock.Mock() |
| mock_response.reason = error_reason |
| |
| def store_callback(table, **kwargs): |
| nonlocal call_counter |
| # raise exception if insert_response element is an exception |
| if insert_response[call_counter]: |
| exception_type = insert_response[call_counter] |
| call_counter += 1 |
| raise exception_type('some exception', response=mock_response) |
| # return empty list if not insert_response element, indicating |
| # successful call to insert_rows_json |
| else: |
| call_counter += 1 |
| return [] |
| |
| client = mock.Mock() |
| client.insert_rows_json.side_effect = store_callback |
| |
| # Using the bundle based direct runner to avoid pickling problems |
| # with mocks. |
| with beam.Pipeline(runner='BundleBasedDirectRunner') as p: |
| bq_write_out = ( |
| p |
| | beam.Create([{ |
| 'columnA': 'value1', 'columnB': 'value2' |
| }, { |
| 'columnA': 'value3', 'columnB': 'value4' |
| }, { |
| 'columnA': 'value5', 'columnB': 'value6' |
| }]) |
| # Using _StreamToBigQuery directly so max_retries can be passed, which |
| # limits the run time of this test with RETRY_ALWAYS. |
| | _StreamToBigQuery( |
| table_reference='project:dataset.table', |
| table_side_inputs=[], |
| schema_side_inputs=[], |
| schema='anyschema', |
| batch_size=None, |
| triggering_frequency=None, |
| create_disposition='CREATE_NEVER', |
| write_disposition=None, |
| kms_key=None, |
| retry_strategy=RetryStrategy.RETRY_ALWAYS, |
| additional_bq_parameters=[], |
| ignore_insert_ids=False, |
| ignore_unknown_columns=False, |
| with_auto_sharding=False, |
| test_client=client, |
| max_retries=len(insert_response) - 1, |
| num_streaming_keys=500)) |
| |
| failed_values = ( |
| bq_write_out[beam_bq.BigQueryWriteFn.FAILED_ROWS] |
| | beam.Map(lambda x: x[1]['columnA'])) |
| |
| assert_that(failed_values, equal_to(failed_rows)) |
| |
| # Running tests with a variety of exceptions from https://googleapis.dev |
| # /python/google-api-core/latest/_modules/google/api_core/exceptions.html. |
| # Choosing some exceptions that produce reasons that are included in |
| # bigquery_tools._NON_TRANSIENT_ERRORS and some that are not |
| @parameterized.expand([ |
| param( |
| # not in _NON_TRANSIENT_ERRORS |
| exception_type=exceptions.BadGateway if exceptions else None, |
| error_reason='Bad Gateway'), |
| param( |
| # in _NON_TRANSIENT_ERRORS |
| exception_type=exceptions.Unauthorized if exceptions else None, |
| error_reason='Unauthorized'), |
| ]) |
| @mock.patch('time.sleep') |
| @mock.patch('google.cloud.bigquery.Client.insert_rows_json') |
| def test_insert_rows_json_exception_retry_never( |
| self, mock_send, unused_mock_sleep, exception_type, error_reason): |
| # In this test, a pipeline will never retry caught exception types |
| # since RetryStrategy is set to RETRY_NEVER |
| mock_response = mock.Mock() |
| mock_response.reason = error_reason |
| mock_send.side_effect = [ |
| exception_type('some exception', response=mock_response) |
| ] |
| |
| with beam.Pipeline(runner='BundleBasedDirectRunner') as p: |
| bq_write_out = ( |
| p |
| | beam.Create([{ |
| 'columnA': 'value1' |
| }, { |
| 'columnA': 'value2' |
| }]) |
| | WriteToBigQuery( |
| table='project:dataset.table', |
| schema={ |
| 'fields': [{ |
| 'name': 'columnA', 'type': 'STRING', 'mode': 'NULLABLE' |
| }] |
| }, |
| create_disposition='CREATE_NEVER', |
| method='STREAMING_INSERTS', |
| insert_retry_strategy=RetryStrategy.RETRY_NEVER)) |
| failed_values = ( |
| bq_write_out[beam_bq.BigQueryWriteFn.FAILED_ROWS_WITH_ERRORS] |
| | beam.Map(lambda x: x[1]['columnA'])) |
| |
| assert_that(failed_values, equal_to(['value1', 'value2'])) |
| |
| self.assertEqual(1, mock_send.call_count) |
| |
| # Running tests with a variety of exceptions from https://googleapis.dev |
| # /python/google-api-core/latest/_modules/google/api_core/exceptions.html. |
| # Choosing some exceptions that produce reasons that are included in |
| # bigquery_tools._NON_TRANSIENT_ERRORS and some that are not |
| @parameterized.expand([ |
| param( |
| exception_type=exceptions.DeadlineExceeded if exceptions else None, |
| error_reason='Deadline Exceeded', # not in _NON_TRANSIENT_ERRORS |
| failed_values=[], |
| expected_call_count=2), |
| param( |
| exception_type=exceptions.Conflict if exceptions else None, |
| error_reason='Conflict', # not in _NON_TRANSIENT_ERRORS |
| failed_values=[], |
| expected_call_count=2), |
| param( |
| exception_type=exceptions.TooManyRequests if exceptions else None, |
| error_reason='Too Many Requests', # not in _NON_TRANSIENT_ERRORS |
| failed_values=[], |
| expected_call_count=2), |
| param( |
| exception_type=exceptions.InternalServerError if exceptions else None, |
| error_reason='Internal Server Error', # not in _NON_TRANSIENT_ERRORS |
| failed_values=[], |
| expected_call_count=2), |
| param( |
| exception_type=exceptions.BadGateway if exceptions else None, |
| error_reason='Bad Gateway', # not in _NON_TRANSIENT_ERRORS |
| failed_values=[], |
| expected_call_count=2), |
| param( |
| exception_type=exceptions.ServiceUnavailable if exceptions else None, |
| error_reason='Service Unavailable', # not in _NON_TRANSIENT_ERRORS |
| failed_values=[], |
| expected_call_count=2), |
| param( |
| exception_type=exceptions.GatewayTimeout if exceptions else None, |
| error_reason='Gateway Timeout', # not in _NON_TRANSIENT_ERRORS |
| failed_values=[], |
| expected_call_count=2), |
| param( |
| exception_type=exceptions.BadRequest if exceptions else None, |
| error_reason='Bad Request', # in _NON_TRANSIENT_ERRORS |
| failed_values=['value1', 'value2'], |
| expected_call_count=1), |
| param( |
| exception_type=exceptions.Unauthorized if exceptions else None, |
| error_reason='Unauthorized', # in _NON_TRANSIENT_ERRORS |
| failed_values=['value1', 'value2'], |
| expected_call_count=1), |
| param( |
| exception_type=exceptions.Forbidden if exceptions else None, |
| error_reason='Forbidden', # in _NON_TRANSIENT_ERRORS |
| failed_values=['value1', 'value2'], |
| expected_call_count=1), |
| param( |
| exception_type=exceptions.NotFound if exceptions else None, |
| error_reason='Not Found', # in _NON_TRANSIENT_ERRORS |
| failed_values=['value1', 'value2'], |
| expected_call_count=1), |
| param( |
| exception_type=exceptions.MethodNotImplemented |
| if exceptions else None, |
| error_reason='Not Implemented', # in _NON_TRANSIENT_ERRORS |
| failed_values=['value1', 'value2'], |
| expected_call_count=1), |
| ]) |
| @mock.patch('time.sleep') |
| @mock.patch('google.cloud.bigquery.Client.insert_rows_json') |
| def test_insert_rows_json_exception_retry_on_transient_error( |
| self, |
| mock_send, |
| unused_mock_sleep, |
| exception_type, |
| error_reason, |
| failed_values, |
| expected_call_count): |
| # In this test, a pipeline will only retry caught exception types |
| # with reasons that are not in _NON_TRANSIENT_ERRORS since RetryStrategy is |
| # set to RETRY_ON_TRANSIENT_ERROR |
| mock_response = mock.Mock() |
| mock_response.reason = error_reason |
| mock_send.side_effect = [ |
| exception_type('some exception', response=mock_response), |
| # Return no exception and no errors on 2nd call, if there is a 2nd call |
| [] |
| ] |
| |
| with beam.Pipeline(runner='BundleBasedDirectRunner') as p: |
| bq_write_out = ( |
| p |
| | beam.Create([{ |
| 'columnA': 'value1' |
| }, { |
| 'columnA': 'value2' |
| }]) |
| | WriteToBigQuery( |
| table='project:dataset.table', |
| schema={ |
| 'fields': [{ |
| 'name': 'columnA', 'type': 'STRING', 'mode': 'NULLABLE' |
| }] |
| }, |
| create_disposition='CREATE_NEVER', |
| method='STREAMING_INSERTS', |
| insert_retry_strategy=RetryStrategy.RETRY_ON_TRANSIENT_ERROR)) |
| failed_values_out = ( |
| bq_write_out[beam_bq.BigQueryWriteFn.FAILED_ROWS] |
| | beam.Map(lambda x: x[1]['columnA'])) |
| |
| assert_that(failed_values_out, equal_to(failed_values)) |
| self.assertEqual(expected_call_count, mock_send.call_count) |
| |
| # Running tests with persistent exceptions with exception types not |
| # caught in BigQueryWrapper._insert_all_rows but retriable by |
| # retry.with_exponential_backoff |
| @parameterized.expand([ |
| param( |
| exception_type=requests.exceptions.ConnectionError, |
| error_message='some connection error'), |
| param( |
| exception_type=requests.exceptions.Timeout, |
| error_message='some timeout error'), |
| ]) |
| @mock.patch('time.sleep') |
| @mock.patch('google.cloud.bigquery.Client.insert_rows_json') |
| def test_insert_rows_json_persistent_retriable_exception( |
| self, mock_send, unused_mock_sleep, exception_type, error_message): |
| # In this test, each insert_rows_json call will result in an exception |
| # and be retried with retry.with_exponential_backoff until MAX_RETRIES is |
| # reached |
| mock_send.side_effect = exception_type(error_message) |
| |
| # Expecting 1 initial call plus maximum number of retries |
| expected_call_count = 1 + bigquery_tools.MAX_RETRIES |
| |
| with self.assertRaises(Exception) as exc: |
| with beam.Pipeline() as p: |
| _ = ( |
| p |
| | beam.Create([{ |
| 'columnA': 'value1' |
| }, { |
| 'columnA': 'value2' |
| }]) |
| | WriteToBigQuery( |
| table='project:dataset.table', |
| schema={ |
| 'fields': [{ |
| 'name': 'columnA', 'type': 'STRING', 'mode': 'NULLABLE' |
| }] |
| }, |
| create_disposition='CREATE_NEVER', |
| method='STREAMING_INSERTS')) |
| |
| self.assertEqual(expected_call_count, mock_send.call_count) |
| self.assertIn(error_message, exc.exception.args[0]) |
| |
| # Running tests with intermittent exceptions with exception types not |
| # caught in BigQueryWrapper._insert_all_rows but retriable by |
| # retry.with_exponential_backoff |
| @parameterized.expand([ |
| param( |
| exception_type=requests.exceptions.ConnectionError, |
| error_message='some connection error'), |
| param( |
| exception_type=requests.exceptions.Timeout, |
| error_message='some timeout error'), |
| ]) |
| @mock.patch('time.sleep') |
| @mock.patch('google.cloud.bigquery.Client.insert_rows_json') |
| def test_insert_rows_json_intermittent_retriable_exception( |
| self, mock_send, unused_mock_sleep, exception_type, error_message): |
| # In this test, the first 2 insert_rows_json calls will result in an |
| # exception and be retried with retry.with_exponential_backoff. The last |
| # call will not raise an exception and will succeed. |
| mock_send.side_effect = [ |
| exception_type(error_message), exception_type(error_message), [] |
| ] |
| |
| # This relies on DirectRunner-specific mocking behavior which can be |
| # inconsistent on Prism |
| with beam.Pipeline('FnApiRunner') as p: |
| _ = ( |
| p |
| | beam.Create([{ |
| 'columnA': 'value1' |
| }, { |
| 'columnA': 'value2' |
| }]) |
| | WriteToBigQuery( |
| table='project:dataset.table', |
| schema={ |
| 'fields': [{ |
| 'name': 'columnA', 'type': 'STRING', 'mode': 'NULLABLE' |
| }] |
| }, |
| create_disposition='CREATE_NEVER', |
| method='STREAMING_INSERTS')) |
| |
| self.assertEqual(3, mock_send.call_count) |
| |
| # Running tests with a variety of error reasons from |
| # https://cloud.google.com/bigquery/docs/error-messages |
| # This covers the scenario when |
| # the google.cloud.bigquery.Client.insert_rows_json call returns an error list |
| # rather than raising an exception. |
| # Choosing some error reasons that are included in |
| # bigquery_tools._NON_TRANSIENT_ERRORS and some that are not |
| @parameterized.expand([ |
| # reason in _NON_TRANSIENT_ERRORS for row 1, sent to failed_rows |
| param( |
| insert_response=[ |
| [{ |
| 'index': 0, 'errors': [{ |
| 'reason': 'invalid' |
| }] |
| }], |
| [{ |
| 'index': 0, 'errors': [{ |
| 'reason': 'invalid' |
| }] |
| }], |
| ], |
| failed_rows=['value1']), |
| # reason in _NON_TRANSIENT_ERRORS for row 1 |
| # reason not in _NON_TRANSIENT_ERRORS for row 2 on 1st run |
| # row 1 sent to failed_rows |
| param( |
| insert_response=[ |
| [{ |
| 'index': 0, 'errors': [{ |
| 'reason': 'invalid' |
| }] |
| }, { |
| 'index': 1, 'errors': [{ |
| 'reason': 'internalError' |
| }] |
| }], |
| [{ |
| 'index': 0, 'errors': [{ |
| 'reason': 'invalid' |
| }] |
| }], |
| ], |
| failed_rows=['value1']), |
| # reason not in _NON_TRANSIENT_ERRORS for row 1 on first attempt |
| # transient error succeeds on second attempt, 0 rows sent to failed rows |
| param( |
| insert_response=[ |
| [{ |
| 'index': 0, 'errors': [{ |
| 'reason': 'internalError' |
| }] |
| }], |
| [], |
| ], |
| failed_rows=[]), |
| ]) |
| def test_insert_rows_json_errors_retry_always( |
| self, insert_response, failed_rows): |
| # In this test, a pipeline will always retry all errors |
| # since RetryStrategy is not set and defaults to RETRY_ALWAYS |
| with mock.patch('time.sleep'): |
| call_counter = 0 |
| |
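| # Return the next canned error list from insert_rows_json. |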
| def store_callback(table, **kwargs): |
| nonlocal call_counter |
| response = insert_response[call_counter] |
| call_counter += 1 |
| return response |
| |
| client = mock.Mock() |
| client.insert_rows_json = mock.Mock(side_effect=store_callback) |
| |
| # Using the bundle based direct runner to avoid pickling problems |
| # with mocks. |
| with beam.Pipeline(runner='BundleBasedDirectRunner') as p: |
| bq_write_out = ( |
| p |
| | beam.Create([{ |
| 'columnA': 'value1', 'columnB': 'value2' |
| }, { |
| 'columnA': 'value3', 'columnB': 'value4' |
| }, { |
| 'columnA': 'value5', 'columnB': 'value6' |
| }]) |
            # Use _StreamToBigQuery directly so that max_retries can be set,
            # bounding the run time of the test under RETRY_ALWAYS.
| | _StreamToBigQuery( |
| table_reference='project:dataset.table', |
| table_side_inputs=[], |
| schema_side_inputs=[], |
| schema='anyschema', |
| batch_size=None, |
| triggering_frequency=None, |
| create_disposition='CREATE_NEVER', |
| write_disposition=None, |
| kms_key=None, |
| retry_strategy=RetryStrategy.RETRY_ALWAYS, |
| additional_bq_parameters=[], |
| ignore_insert_ids=False, |
| ignore_unknown_columns=False, |
| with_auto_sharding=False, |
| test_client=client, |
| max_retries=len(insert_response) - 1, |
| num_streaming_keys=500)) |
| |
| failed_values = ( |
| bq_write_out[beam_bq.BigQueryWriteFn.FAILED_ROWS] |
| | beam.Map(lambda x: x[1]['columnA'])) |
| |
| assert_that(failed_values, equal_to(failed_rows)) |
| |
  # Run tests with a variety of error reasons from
  # https://cloud.google.com/bigquery/docs/error-messages
  # This covers the scenario in which the
  # google.cloud.bigquery.Client.insert_rows_json call returns an error list
  # rather than raising an exception. Some of the chosen error reasons are in
  # bigquery_tools._NON_TRANSIENT_ERRORS and some are not.
| @parameterized.expand([ |
| # reason in _NON_TRANSIENT_ERRORS for row 1, sent to failed_rows |
| param( |
| insert_response=[ |
| [{ |
| 'index': 0, 'errors': [{ |
| 'reason': 'invalidQuery' |
| }] |
| }], |
| ], |
| streaming=False), |
| # reason not in _NON_TRANSIENT_ERRORS for row 1, sent to failed_rows |
| param( |
| insert_response=[ |
| [{ |
| 'index': 0, 'errors': [{ |
| 'reason': 'internalError' |
| }] |
| }], |
| ], |
| streaming=False), |
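      # reason in _NON_TRANSIENT_ERRORS for row 1, sent to failed_rows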
| param( |
| insert_response=[ |
| [{ |
| 'index': 0, 'errors': [{ |
| 'reason': 'invalid' |
| }] |
| }], |
| ], |
| streaming=True), |
| # reason not in _NON_TRANSIENT_ERRORS for row 1, sent to failed_rows |
| param( |
| insert_response=[ |
| [{ |
| 'index': 0, 'errors': [{ |
| 'reason': 'internalError' |
| }] |
| }], |
| ], |
| streaming=True), |
| ]) |
| @mock.patch('time.sleep') |
| @mock.patch('google.cloud.bigquery.Client.insert_rows_json') |
| def test_insert_rows_json_errors_retry_never( |
| self, mock_send, unused_mock_sleep, insert_response, streaming): |
| # In this test, a pipeline will never retry errors since RetryStrategy is |
| # set to RETRY_NEVER |
| mock_send.side_effect = insert_response |
| opt = StandardOptions() |
| opt.streaming = streaming |
| with beam.Pipeline(runner='BundleBasedDirectRunner', options=opt) as p: |
| bq_write_out = ( |
| p |
| | beam.Create([{ |
| 'columnA': 'value1' |
| }, { |
| 'columnA': 'value2' |
| }]) |
| | WriteToBigQuery( |
| table='project:dataset.table', |
| schema={ |
| 'fields': [{ |
| 'name': 'columnA', 'type': 'STRING', 'mode': 'NULLABLE' |
| }] |
| }, |
| create_disposition='CREATE_NEVER', |
| method='STREAMING_INSERTS', |
| insert_retry_strategy=RetryStrategy.RETRY_NEVER)) |
| failed_values = ( |
| bq_write_out[beam_bq.BigQueryWriteFn.FAILED_ROWS_WITH_ERRORS] |
| | beam.Map(lambda x: x[1]['columnA'])) |
| |
| assert_that(failed_values, equal_to(['value1'])) |
| |
| self.assertEqual(1, mock_send.call_count) |
| |
  # Run tests with a variety of error reasons from
  # https://cloud.google.com/bigquery/docs/error-messages
  # This covers the scenario in which the
  # google.cloud.bigquery.Client.insert_rows_json call returns an error list
  # rather than raising an exception. Some of the chosen error reasons are in
  # bigquery_tools._NON_TRANSIENT_ERRORS and some are not.
| @parameterized.expand([ |
| # reason in _NON_TRANSIENT_ERRORS for row 1, sent to failed_rows |
| param( |
| insert_response=[ |
| [{ |
| 'index': 0, 'errors': [{ |
| 'reason': 'invalid' |
| }] |
| }], |
| ], |
| failed_rows=['value1'], |
| streaming=False), |
| # reason not in _NON_TRANSIENT_ERRORS for row 1 on 1st attempt |
| # transient error succeeds on 2nd attempt, 0 rows sent to failed rows |
| param( |
| insert_response=[ |
| [{ |
| 'index': 0, 'errors': [{ |
| 'reason': 'internalError' |
| }] |
| }], |
| [], |
| ], |
| failed_rows=[], |
| streaming=False), |
| # reason in _NON_TRANSIENT_ERRORS for row 1 |
| # reason not in _NON_TRANSIENT_ERRORS for row 2 on 1st and 2nd attempt |
| # all rows with errors are retried when any row has a retriable error |
| # row 1 sent to failed_rows after final attempt |
| param( |
| insert_response=[ |
| [{ |
| 'index': 0, 'errors': [{ |
| 'reason': 'invalid' |
| }] |
| }, { |
| 'index': 1, 'errors': [{ |
| 'reason': 'internalError' |
| }] |
| }], |
| [ |
| { |
| 'index': 0, 'errors': [{ |
| 'reason': 'invalid' |
| }] |
| }, |
| ], |
| ], |
| failed_rows=['value1'], |
| streaming=False), |
| # reason in _NON_TRANSIENT_ERRORS for row 1, sent to failed_rows |
| param( |
| insert_response=[ |
| [{ |
| 'index': 0, 'errors': [{ |
| 'reason': 'invalid' |
| }] |
| }], |
| ], |
| failed_rows=['value1'], |
| streaming=True), |
| # reason not in _NON_TRANSIENT_ERRORS for row 1 on 1st attempt |
| # transient error succeeds on 2nd attempt, 0 rows sent to failed rows |
| param( |
| insert_response=[ |
| [{ |
| 'index': 0, 'errors': [{ |
| 'reason': 'internalError' |
| }] |
| }], |
| [], |
| ], |
| failed_rows=[], |
| streaming=True), |
| # reason in _NON_TRANSIENT_ERRORS for row 1 |
| # reason not in _NON_TRANSIENT_ERRORS for row 2 on 1st and 2nd attempt |
| # all rows with errors are retried when any row has a retriable error |
| # row 1 sent to failed_rows after final attempt |
| param( |
| insert_response=[ |
| [{ |
| 'index': 0, 'errors': [{ |
| 'reason': 'invalid' |
| }] |
| }, { |
| 'index': 1, 'errors': [{ |
| 'reason': 'internalError' |
| }] |
| }], |
| [ |
| { |
| 'index': 0, 'errors': [{ |
| 'reason': 'invalid' |
| }] |
| }, |
| ], |
| ], |
| failed_rows=['value1'], |
| streaming=True), |
| ]) |
| @mock.patch('time.sleep') |
| @mock.patch('google.cloud.bigquery.Client.insert_rows_json') |
| def test_insert_rows_json_errors_retry_on_transient_error( |
| self, |
| mock_send, |
| unused_mock_sleep, |
| insert_response, |
| failed_rows, |
| streaming=False): |
| # In this test, a pipeline will only retry errors with reasons that are not |
| # in _NON_TRANSIENT_ERRORS since RetryStrategy is set to |
| # RETRY_ON_TRANSIENT_ERROR |
| call_counter = 0 |
| |
| def store_callback(table, **kwargs): |
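      # Replay the canned responses: each attempt observes the next error list
      # from insert_response.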
| nonlocal call_counter |
| response = insert_response[call_counter] |
| call_counter += 1 |
| return response |
| |
| mock_send.side_effect = store_callback |
| |
| opt = StandardOptions() |
| opt.streaming = streaming |
| |
| # Using the bundle based direct runner to avoid pickling problems |
| # with mocks. |
| with beam.Pipeline(runner='BundleBasedDirectRunner', options=opt) as p: |
| bq_write_out = ( |
| p |
| | beam.Create([{ |
| 'columnA': 'value1' |
| }, { |
| 'columnA': 'value2' |
| }, { |
| 'columnA': 'value3' |
| }]) |
| | WriteToBigQuery( |
| table='project:dataset.table', |
| schema={ |
| 'fields': [{ |
| 'name': 'columnA', 'type': 'STRING', 'mode': 'NULLABLE' |
| }] |
| }, |
| create_disposition='CREATE_NEVER', |
| method='STREAMING_INSERTS', |
| insert_retry_strategy=RetryStrategy.RETRY_ON_TRANSIENT_ERROR)) |
| |
| failed_values = ( |
| bq_write_out[beam_bq.BigQueryWriteFn.FAILED_ROWS] |
| | beam.Map(lambda x: x[1]['columnA'])) |
| |
| assert_that(failed_values, equal_to(failed_rows)) |
| |
| |
| def test_with_batched_input_exceeds_size_limit(self): |
| from apache_beam.utils.windowed_value import WindowedValue |
| from apache_beam.transforms import window |
| |
| client = mock.Mock() |
| client.tables.Get.return_value = bigquery.Table( |
| tableReference=bigquery.TableReference( |
| projectId='project-id', datasetId='dataset_id', tableId='table_id')) |
| client.insert_rows_json.return_value = [] |
| fn = beam.io.gcp.bigquery.BigQueryWriteFn( |
| batch_size=10, |
| create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED, |
| write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND, |
| kms_key=None, |
| with_batched_input=True, |
| max_insert_payload_size=500, # Small size to trigger the check |
| test_client=client) |
| |
| fn.start_bundle() |
| |
| # Create rows where the third one exceeds the size limit |
| rows = [ |
| ({ |
| 'month': 1 |
| }, 'insertid1'), |
| ({ |
| 'month': 2 |
| }, 'insertid2'), |
| ({ |
| 'columnA': 'large_string' * 100 |
| }, 'insertid3') # This exceeds 500 bytes |
| ] |
| |
| # Create a windowed value for testing |
| test_window = window.GlobalWindow() |
| test_timestamp = window.MIN_TIMESTAMP |
| windowed_value = WindowedValue(None, test_timestamp, [test_window]) |
| |
| # Process the batched input |
| result = fn.process(('project-id:dataset_id.table_id', rows), windowed_value) |
| |
| # Convert generator to list to check results |
| failed_rows = list(result) |
| |
| # Should have 2 failed outputs (FAILED_ROWS_WITH_ERRORS and FAILED_ROWS) |
| self.assertEqual(len(failed_rows), 2) |
| |
| # Check that the large row was sent to DLQ |
| failed_with_errors = [ |
| r for r in failed_rows if r.tag == fn.FAILED_ROWS_WITH_ERRORS |
| ] |
| failed_without_errors = [r for r in failed_rows if r.tag == fn.FAILED_ROWS] |
| |
| self.assertEqual(len(failed_with_errors), 1) |
| self.assertEqual(len(failed_without_errors), 1) |
| |
| # Verify the error message |
| destination, row, error = failed_with_errors[0].value.value |
| self.assertEqual(destination, 'project-id:dataset_id.table_id') |
| self.assertEqual(row, {'columnA': 'large_string' * 100}) |
| self.assertIn('exceeds the maximum insert payload size', error) |
| |
| # Verify that only the valid rows were sent to BigQuery |
| self.assertTrue(client.insert_rows_json.called) |
| # Check that the rows were inserted (might be in multiple batches) |
| total_rows = [] |
| for call in client.insert_rows_json.call_args_list: |
| total_rows.extend(call[1]['json_rows']) |
| |
| self.assertEqual(len(total_rows), 2) |
| self.assertEqual(total_rows[0], {'month': 1}) |
| self.assertEqual(total_rows[1], {'month': 2}) |
| |
| |
| def test_with_batched_input_splits_large_batch(self): |
| from apache_beam.utils.windowed_value import WindowedValue |
| from apache_beam.transforms import window |
| |
| client = mock.Mock() |
| client.tables.Get.return_value = bigquery.Table( |
| tableReference=bigquery.TableReference( |
| projectId='project-id', datasetId='dataset_id', tableId='table_id')) |
| client.insert_rows_json.return_value = [] |
| create_disposition = beam.io.BigQueryDisposition.CREATE_IF_NEEDED |
| write_disposition = beam.io.BigQueryDisposition.WRITE_APPEND |
| |
| fn = beam.io.gcp.bigquery.BigQueryWriteFn( |
| batch_size=10, |
| create_disposition=create_disposition, |
| write_disposition=write_disposition, |
| kms_key=None, |
| with_batched_input=True, |
| max_insert_payload_size=800, # Small size to force batch splitting |
| test_client=client) |
| |
| fn.start_bundle() |
| |
    # Create rows that together exceed the max insert payload size.
    # Each serialized row is a few hundred bytes, so with an 800-byte limit
    # two rows fit in the first batch and the third triggers a second batch.
| rows = [ |
| ({ |
| 'data': 'x' * 10 |
| }, 'insertid1'), |
| ({ |
| 'data': 'y' * 10 |
| }, 'insertid2'), |
| ({ |
| 'data': 'z' * 10 |
| }, 'insertid3'), # This should go in second batch |
| ] |
| |
| # Create a windowed value for testing |
| test_window = window.GlobalWindow() |
| test_timestamp = window.MIN_TIMESTAMP |
| windowed_value = WindowedValue(None, test_timestamp, [test_window]) |
| |
| # Process the batched input |
| result = fn.process(('project-id:dataset_id.table_id', rows), windowed_value) |
| |
| # Convert generator to list (should be empty as no failures) |
| failed_rows = list(result) |
| self.assertEqual(len(failed_rows), 0) |
| |
    # With an 800-byte limit and rows of roughly 341 bytes each, two rows fit
    # per batch, so we expect 2 calls: [row1, row2] and [row3].
| self.assertEqual(client.insert_rows_json.call_count, 2) |
| |
| # Check first batch (2 rows) |
| first_call = client.insert_rows_json.call_args_list[0][1] |
| self.assertEqual(len(first_call['json_rows']), 2) |
| self.assertEqual(first_call['json_rows'][0], {'data': 'x' * 10}) |
| self.assertEqual(first_call['json_rows'][1], {'data': 'y' * 10}) |
| |
| # Check second batch (1 row) |
| second_call = client.insert_rows_json.call_args_list[1][1] |
| self.assertEqual(len(second_call['json_rows']), 1) |
| self.assertEqual(second_call['json_rows'][0], {'data': 'z' * 10}) |
| |
| |
| @unittest.skipIf(HttpError is None, 'GCP dependencies are not installed') |
| class BigQueryStreamingInsertTransformTests(unittest.TestCase): |
| def test_dofn_client_process_performs_batching(self): |
| client = mock.Mock() |
| client.tables.Get.return_value = bigquery.Table( |
| tableReference=bigquery.TableReference( |
| projectId='project-id', datasetId='dataset_id', tableId='table_id')) |
| client.insert_rows_json.return_value = [] |
| create_disposition = beam.io.BigQueryDisposition.CREATE_NEVER |
| write_disposition = beam.io.BigQueryDisposition.WRITE_APPEND |
| |
| fn = beam.io.gcp.bigquery.BigQueryWriteFn( |
| batch_size=2, |
| create_disposition=create_disposition, |
| write_disposition=write_disposition, |
| kms_key=None, |
| test_client=client) |
| |
    fn.start_bundle()
    fn.process(('project-id:dataset_id.table_id', ({'month': 1}, 'insertid1')))
| |
| # InsertRows not called as batch size is not hit yet |
| self.assertFalse(client.insert_rows_json.called) |
| |
| def test_dofn_client_process_flush_called(self): |
| client = mock.Mock() |
| client.tables.Get.return_value = bigquery.Table( |
| tableReference=bigquery.TableReference( |
| projectId='project-id', datasetId='dataset_id', tableId='table_id')) |
| client.insert_rows_json.return_value = [] |
| create_disposition = beam.io.BigQueryDisposition.CREATE_NEVER |
| write_disposition = beam.io.BigQueryDisposition.WRITE_APPEND |
| |
| fn = beam.io.gcp.bigquery.BigQueryWriteFn( |
| batch_size=2, |
| create_disposition=create_disposition, |
| write_disposition=write_disposition, |
| kms_key=None, |
| test_client=client) |
| |
| fn.start_bundle() |
| fn.process(('project-id:dataset_id.table_id', ({'month': 1}, 'insertid1'))) |
| fn.process(('project-id:dataset_id.table_id', ({'month': 2}, 'insertid2'))) |
| fn.finish_bundle() |
| # InsertRows called as batch size is hit |
| self.assertTrue(client.insert_rows_json.called) |
| |
| def test_dofn_client_finish_bundle_flush_called(self): |
| client = mock.Mock() |
| client.tables.Get.return_value = bigquery.Table( |
| tableReference=bigquery.TableReference( |
| projectId='project-id', datasetId='dataset_id', tableId='table_id')) |
| client.insert_rows_json.return_value = [] |
| create_disposition = beam.io.BigQueryDisposition.CREATE_IF_NEEDED |
| write_disposition = beam.io.BigQueryDisposition.WRITE_APPEND |
| |
| fn = beam.io.gcp.bigquery.BigQueryWriteFn( |
| batch_size=2, |
| create_disposition=create_disposition, |
| write_disposition=write_disposition, |
| kms_key=None, |
| test_client=client) |
| |
| fn.start_bundle() |
| |
    # With CREATE_IF_NEEDED, processing the first row for a destination
    # triggers a table lookup (and creation if the table does not exist).
| fn.process(('project-id:dataset_id.table_id', ({'month': 1}, 'insertid3'))) |
| |
| self.assertTrue(client.tables.Get.called) |
| # InsertRows not called as batch size is not hit |
| self.assertFalse(client.insert_rows_json.called) |
| |
| fn.finish_bundle() |
| # InsertRows called in finish bundle |
| self.assertTrue(client.insert_rows_json.called) |
| |
| def test_dofn_client_no_records(self): |
| client = mock.Mock() |
| client.tables.Get.return_value = bigquery.Table( |
| tableReference=bigquery.TableReference( |
| projectId='project-id', datasetId='dataset_id', tableId='table_id')) |
    client.insert_rows_json.return_value = []
| create_disposition = beam.io.BigQueryDisposition.CREATE_NEVER |
| write_disposition = beam.io.BigQueryDisposition.WRITE_APPEND |
| |
| fn = beam.io.gcp.bigquery.BigQueryWriteFn( |
| batch_size=2, |
| create_disposition=create_disposition, |
| write_disposition=write_disposition, |
| kms_key=None, |
| test_client=client) |
| |
| fn.start_bundle() |
    # insert_rows_json not called since no rows have been processed
    self.assertFalse(client.insert_rows_json.called)
| |
| fn.finish_bundle() |
    # insert_rows_json not called in finish_bundle since there are no records
    self.assertFalse(client.insert_rows_json.called)
| |
| def test_with_batched_input(self): |
| client = mock.Mock() |
| client.tables.Get.return_value = bigquery.Table( |
| tableReference=bigquery.TableReference( |
| projectId='project-id', datasetId='dataset_id', tableId='table_id')) |
| client.insert_rows_json.return_value = [] |
| create_disposition = beam.io.BigQueryDisposition.CREATE_IF_NEEDED |
| write_disposition = beam.io.BigQueryDisposition.WRITE_APPEND |
| |
| fn = beam.io.gcp.bigquery.BigQueryWriteFn( |
| batch_size=10, |
| create_disposition=create_disposition, |
| write_disposition=write_disposition, |
| kms_key=None, |
| with_batched_input=True, |
| test_client=client) |
| |
| fn.start_bundle() |
| |
    # With with_batched_input=True, the element value is a pre-batched list of
    # (row, insert_id) tuples for the destination table.
| fn.process(( |
| 'project-id:dataset_id.table_id', |
| [({ |
| 'month': 1 |
| }, 'insertid3'), ({ |
| 'month': 2 |
| }, 'insertid2'), ({ |
| 'month': 3 |
| }, 'insertid1')])) |
| |
| # InsertRows called since the input is already batched. |
| self.assertTrue(client.insert_rows_json.called) |
| |
| |
| @unittest.skipIf(HttpError is None, 'GCP dependencies are not installed') |
| class PipelineBasedStreamingInsertTest(_TestCaseWithTempDirCleanUp): |
| @mock.patch('time.sleep') |
| def test_failure_has_same_insert_ids(self, unused_mock_sleep): |
| tempdir = '%s%s' % (self._new_tempdir(), os.sep) |
| file_name_1 = os.path.join(tempdir, 'file1') |
| file_name_2 = os.path.join(tempdir, 'file2') |
| |
| def store_callback(table, **kwargs): |
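      # The first insert attempt records its rows and insert ids to file1 and
      # then raises; the retried attempt records to file2 so the two attempts
      # can be compared for identical insert ids.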
| insert_ids = [r for r in kwargs['row_ids']] |
| colA_values = [r['columnA'] for r in kwargs['json_rows']] |
| json_output = {'insertIds': insert_ids, 'colA_values': colA_values} |
| # The first time we try to insert, we save those insertions in |
| # file insert_calls1. |
| if not os.path.exists(file_name_1): |
| with open(file_name_1, 'w') as f: |
| json.dump(json_output, f) |
| raise RuntimeError() |
| else: |
| with open(file_name_2, 'w') as f: |
| json.dump(json_output, f) |
| |
| return [] |
| |
| client = mock.Mock() |
| client.insert_rows_json = mock.Mock(side_effect=store_callback) |
| |
| # Using the bundle based direct runner to avoid pickling problems |
| # with mocks. |
| with beam.Pipeline(runner='BundleBasedDirectRunner') as p: |
| _ = ( |
| p |
| | beam.Create([{ |
| 'columnA': 'value1', 'columnB': 'value2' |
| }, { |
| 'columnA': 'value3', 'columnB': 'value4' |
| }, { |
| 'columnA': 'value5', 'columnB': 'value6' |
| }]) |
| | _StreamToBigQuery( |
| table_reference='project:dataset.table', |
| table_side_inputs=[], |
| schema_side_inputs=[], |
| schema='anyschema', |
| batch_size=None, |
| triggering_frequency=None, |
| create_disposition='CREATE_NEVER', |
| write_disposition=None, |
| kms_key=None, |
| retry_strategy=None, |
| additional_bq_parameters=[], |
| ignore_insert_ids=False, |
| ignore_unknown_columns=False, |
| with_auto_sharding=False, |
| test_client=client, |
| num_streaming_keys=500)) |
| |
| with open(file_name_1) as f1, open(file_name_2) as f2: |
| self.assertEqual(json.load(f1), json.load(f2)) |
| |
| @parameterized.expand([ |
| param(retry_strategy=RetryStrategy.RETRY_ALWAYS), |
| param(retry_strategy=RetryStrategy.RETRY_NEVER), |
| param(retry_strategy=RetryStrategy.RETRY_ON_TRANSIENT_ERROR), |
| ]) |
| def test_failure_in_some_rows_does_not_duplicate(self, retry_strategy=None): |
| with mock.patch('time.sleep'): |
| # In this test we simulate a failure to write out two out of three rows. |
| # Row 0 and row 2 fail to be written on the first attempt, and then |
| # succeed on the next attempt (if there is one). |
| tempdir = '%s%s' % (self._new_tempdir(), os.sep) |
| file_name_1 = os.path.join(tempdir, 'file1_partial') |
| file_name_2 = os.path.join(tempdir, 'file2_partial') |
| |
| def store_callback(table, **kwargs): |
| insert_ids = [r for r in kwargs['row_ids']] |
| colA_values = [r['columnA'] for r in kwargs['json_rows']] |
| |
| # The first time this function is called, all rows are included |
| # so we need to filter out 'failed' rows. |
| json_output_1 = { |
| 'insertIds': [insert_ids[1]], 'colA_values': [colA_values[1]] |
| } |
        # The second time this function is called, only rows 0 and 2 are
        # included, so we don't need to filter them; we just write them all
        # out.
| json_output_2 = {'insertIds': insert_ids, 'colA_values': colA_values} |
| |
| # The first time we try to insert, we save those insertions in |
| # file insert_calls1. |
| if not os.path.exists(file_name_1): |
| with open(file_name_1, 'w') as f: |
| json.dump(json_output_1, f) |
| return [ |
| { |
| 'index': 0, |
| 'errors': [{ |
| 'reason': 'i dont like this row' |
| }, { |
| 'reason': 'its bad' |
| }] |
| }, |
| { |
| 'index': 2, |
| 'errors': [{ |
| 'reason': 'i het this row' |
| }, { |
| 'reason': 'its no gud' |
| }] |
| }, |
| ] |
| else: |
| with open(file_name_2, 'w') as f: |
| json.dump(json_output_2, f) |
| return [] |
| |
| client = mock.Mock() |
| client.insert_rows_json = mock.Mock(side_effect=store_callback) |
| |
      # The rows expected to be successfully inserted, given the retry strategy
      if retry_strategy == RetryStrategy.RETRY_NEVER:
        result = ['value3']
      else:  # RETRY_ALWAYS and RETRY_ON_TRANSIENT_ERROR should insert all rows
        result = ['value1', 'value3', 'value5']
| |
| # Using the bundle based direct runner to avoid pickling problems |
| # with mocks. |
| with beam.Pipeline(runner='BundleBasedDirectRunner') as p: |
| bq_write_out = ( |
| p |
| | beam.Create([{ |
| 'columnA': 'value1', 'columnB': 'value2' |
| }, { |
| 'columnA': 'value3', 'columnB': 'value4' |
| }, { |
| 'columnA': 'value5', 'columnB': 'value6' |
| }]) |
| | _StreamToBigQuery( |
| table_reference='project:dataset.table', |
| table_side_inputs=[], |
| schema_side_inputs=[], |
| schema='anyschema', |
| batch_size=None, |
| triggering_frequency=None, |
| create_disposition='CREATE_NEVER', |
| write_disposition=None, |
| kms_key=None, |
| retry_strategy=retry_strategy, |
| additional_bq_parameters=[], |
| ignore_insert_ids=False, |
| ignore_unknown_columns=False, |
| with_auto_sharding=False, |
| test_client=client, |
| num_streaming_keys=500)) |
| |
| failed_values = ( |
| bq_write_out[beam_bq.BigQueryWriteFn.FAILED_ROWS_WITH_ERRORS] |
| | beam.Map(lambda x: x[1]['columnA'])) |
| |
| assert_that( |
| failed_values, |
| equal_to(list({'value1', 'value3', 'value5'}.difference(result)))) |
| |
| data1 = _load_or_default(file_name_1) |
| data2 = _load_or_default(file_name_2) |
| |
| self.assertListEqual( |
| sorted(data1.get('colA_values', []) + data2.get('colA_values', [])), |
| result) |
| self.assertEqual(len(data1['colA_values']), 1) |
| |
| @parameterized.expand([ |
| param(with_auto_sharding=False), |
| param(with_auto_sharding=True), |
| ]) |
| def test_batch_size_with_auto_sharding(self, with_auto_sharding): |
| tempdir = '%s%s' % (self._new_tempdir(), os.sep) |
| file_name_1 = os.path.join(tempdir, 'file1') |
| file_name_2 = os.path.join(tempdir, 'file2') |
| |
| def store_callback(table, **kwargs): |
| insert_ids = [r for r in kwargs['row_ids']] |
| colA_values = [r['columnA'] for r in kwargs['json_rows']] |
| json_output = {'insertIds': insert_ids, 'colA_values': colA_values} |
| # Expect two batches of rows will be inserted. Store them separately. |
| if not os.path.exists(file_name_1): |
| with open(file_name_1, 'w') as f: |
| json.dump(json_output, f) |
| else: |
| with open(file_name_2, 'w') as f: |
| json.dump(json_output, f) |
| |
| return [] |
| |
| client = mock.Mock() |
| client.insert_rows_json = mock.Mock(side_effect=store_callback) |
| |
| # Using the bundle based direct runner to avoid pickling problems |
| # with mocks. |
| with beam.Pipeline(runner='BundleBasedDirectRunner') as p: |
| _ = ( |
| p |
| | beam.Create([{ |
| 'columnA': 'value1', 'columnB': 'value2' |
| }, { |
| 'columnA': 'value3', 'columnB': 'value4' |
| }, { |
| 'columnA': 'value5', 'columnB': 'value6' |
| }]) |
| | _StreamToBigQuery( |
| table_reference='project:dataset.table', |
| table_side_inputs=[], |
| schema_side_inputs=[], |
| schema='anyschema', |
| # Set a batch size such that the input elements will be inserted |
| # in 2 batches. |
| batch_size=2, |
| triggering_frequency=None, |
| create_disposition='CREATE_NEVER', |
| write_disposition=None, |
| kms_key=None, |
| retry_strategy=None, |
| additional_bq_parameters=[], |
| ignore_insert_ids=False, |
| ignore_unknown_columns=False, |
| with_auto_sharding=with_auto_sharding, |
| test_client=client, |
| num_streaming_keys=500)) |
| |
| with open(file_name_1) as f1, open(file_name_2) as f2: |
| out1 = json.load(f1)['colA_values'] |
| out2 = json.load(f2)['colA_values'] |
| out_all = out1 + out2 |
| out_all.sort() |
| self.assertEqual(out_all, ['value1', 'value3', 'value5']) |
| self.assertEqual(len(out1), 2) |
| self.assertEqual(len(out2), 1) |
| |
| |
| @unittest.skipIf(HttpError is None, 'GCP dependencies are not installed') |
| class BigQueryStreamingInsertTransformIntegrationTests(unittest.TestCase): |
| BIG_QUERY_DATASET_ID = 'python_bq_streaming_inserts_' |
| |
| def setUp(self): |
| self.test_pipeline = TestPipeline(is_integration_test=True) |
| self.runner_name = type(self.test_pipeline.runner).__name__ |
| self.project = self.test_pipeline.get_option('project') |
| |
    # The timestamp and random suffix keep dataset names unique across
    # concurrent test runs.
    self.dataset_id = '%s%d%s' % (
        self.BIG_QUERY_DATASET_ID, int(time.time()), secrets.token_hex(3))
| self.bigquery_client = bigquery_tools.BigQueryWrapper() |
| self.bigquery_client.get_or_create_dataset(self.project, self.dataset_id) |
| self.output_table = "%s.output_table" % (self.dataset_id) |
| _LOGGER.info( |
| "Created dataset %s in project %s", self.dataset_id, self.project) |
| |
| @pytest.mark.it_postcommit |
| def test_value_provider_transform(self): |
| output_table_1 = '%s%s' % (self.output_table, 1) |
| output_table_2 = '%s%s' % (self.output_table, 2) |
| schema = { |
| 'fields': [{ |
| 'name': 'name', 'type': 'STRING', 'mode': 'NULLABLE' |
| }, { |
| 'name': 'language', 'type': 'STRING', 'mode': 'NULLABLE' |
| }] |
| } |
| |
| additional_bq_parameters = { |
| 'timePartitioning': { |
| 'type': 'DAY' |
| }, |
| 'clustering': { |
| 'fields': ['language'] |
| } |
| } |
| |
| table_ref = bigquery_tools.parse_table_reference(output_table_1) |
| table_ref2 = bigquery_tools.parse_table_reference(output_table_2) |
| |
| pipeline_verifiers = [ |
| BigQueryTableMatcher( |
| project=self.project, |
| dataset=table_ref.datasetId, |
| table=table_ref.tableId, |
| expected_properties=additional_bq_parameters), |
| BigQueryTableMatcher( |
| project=self.project, |
| dataset=table_ref2.datasetId, |
| table=table_ref2.tableId, |
| expected_properties=additional_bq_parameters), |
| BigqueryFullResultMatcher( |
| project=self.project, |
| query="SELECT name, language FROM %s" % output_table_1, |
| data=[(d['name'], d['language']) for d in _ELEMENTS |
| if 'language' in d]), |
| BigqueryFullResultMatcher( |
| project=self.project, |
| query="SELECT name, language FROM %s" % output_table_2, |
| data=[(d['name'], d['language']) for d in _ELEMENTS |
| if 'language' in d]) |
| ] |
| |
| args = self.test_pipeline.get_full_options_as_args( |
| on_success_matcher=hc.all_of(*pipeline_verifiers)) |
| |
| with beam.Pipeline(argv=args) as p: |
| input = p | beam.Create([row for row in _ELEMENTS if 'language' in row]) |
| |
| _ = ( |
| input |
| | "WriteWithMultipleDests" >> beam.io.gcp.bigquery.WriteToBigQuery( |
| table=value_provider.StaticValueProvider( |
| str, '%s:%s' % (self.project, output_table_1)), |
| schema=value_provider.StaticValueProvider(dict, schema), |
| additional_bq_parameters=additional_bq_parameters, |
| method='STREAMING_INSERTS')) |
| _ = ( |
| input |
| | "WriteWithMultipleDests2" >> beam.io.gcp.bigquery.WriteToBigQuery( |
| table=value_provider.StaticValueProvider( |
| str, '%s:%s' % (self.project, output_table_2)), |
| schema=beam.io.gcp.bigquery.SCHEMA_AUTODETECT, |
| additional_bq_parameters=lambda _: additional_bq_parameters, |
| method='FILE_LOADS')) |
| |
| @pytest.mark.it_postcommit |
| def test_multiple_destinations_transform(self): |
| streaming = self.test_pipeline.options.view_as(StandardOptions).streaming |
| if streaming and isinstance(self.test_pipeline.runner, TestDataflowRunner): |
| self.skipTest("TestStream is not supported on TestDataflowRunner") |
| |
| output_table_1 = '%s%s' % (self.output_table, 1) |
| output_table_2 = '%s%s' % (self.output_table, 2) |
| |
| full_output_table_1 = '%s:%s' % (self.project, output_table_1) |
| full_output_table_2 = '%s:%s' % (self.project, output_table_2) |
| |
| schema1 = { |
| 'fields': [{ |
| 'name': 'name', 'type': 'STRING', 'mode': 'NULLABLE' |
| }, { |
| 'name': 'language', 'type': 'STRING', 'mode': 'NULLABLE' |
| }] |
| } |
| schema2 = { |
| 'fields': [{ |
| 'name': 'name', 'type': 'STRING', 'mode': 'NULLABLE' |
| }, { |
| 'name': 'foundation', 'type': 'STRING', 'mode': 'NULLABLE' |
| }] |
| } |
| |
| bad_record = {'language': 1, 'manguage': 2} |
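    # bad_record is routed to table 1 (it has a 'language' key) but does not
    # match schema1, so it should end up in the failed-rows outputs.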
| |
| if streaming: |
| pipeline_verifiers = [ |
| PipelineStateMatcher(PipelineState.RUNNING), |
| BigqueryFullResultStreamingMatcher( |
| project=self.project, |
| query="SELECT name, language FROM %s" % output_table_1, |
| data=[(d['name'], d['language']) for d in _ELEMENTS |
| if 'language' in d]), |
| BigqueryFullResultStreamingMatcher( |
| project=self.project, |
| query="SELECT name, foundation FROM %s" % output_table_2, |
| data=[(d['name'], d['foundation']) for d in _ELEMENTS |
| if 'foundation' in d]) |
| ] |
| else: |
| pipeline_verifiers = [ |
| BigqueryFullResultMatcher( |
| project=self.project, |
| query="SELECT name, language FROM %s" % output_table_1, |
| data=[(d['name'], d['language']) for d in _ELEMENTS |
| if 'language' in d]), |
| BigqueryFullResultMatcher( |
| project=self.project, |
| query="SELECT name, foundation FROM %s" % output_table_2, |
| data=[(d['name'], d['foundation']) for d in _ELEMENTS |
| if 'foundation' in d]) |
| ] |
| |
| args = self.test_pipeline.get_full_options_as_args( |
| on_success_matcher=hc.all_of(*pipeline_verifiers)) |
| |
| with beam.Pipeline(argv=args) as p: |
| if streaming: |
| _SIZE = len(_ELEMENTS) |
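        # Feed the elements in two batches separated by watermark advances.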
| test_stream = ( |
| TestStream().advance_watermark_to(0).add_elements( |
| _ELEMENTS[:_SIZE // 2]).advance_watermark_to(100).add_elements( |
| _ELEMENTS[_SIZE // 2:]).advance_watermark_to_infinity()) |
| input = p | test_stream |
| else: |
| input = p | beam.Create(_ELEMENTS) |
| |
| schema_table_pcv = beam.pvalue.AsDict( |
| p | "MakeSchemas" >> beam.Create([(full_output_table_1, schema1), |
| (full_output_table_2, schema2)])) |
| |
| table_record_pcv = beam.pvalue.AsDict( |
| p | "MakeTables" >> beam.Create([('table1', full_output_table_1), |
| ('table2', full_output_table_2)])) |
| |
| input2 = p | "Broken record" >> beam.Create([bad_record]) |
| |
| input = (input, input2) | beam.Flatten() |
| |
| r = ( |
| input |
| | "WriteWithMultipleDests" >> beam.io.gcp.bigquery.WriteToBigQuery( |
| table=lambda x, tables: |
| (tables['table1'] if 'language' in x else tables['table2']), |
| table_side_inputs=(table_record_pcv, ), |
| schema=lambda dest, table_map: table_map.get(dest, None), |
| schema_side_inputs=(schema_table_pcv, ), |
| insert_retry_strategy=RetryStrategy.RETRY_ON_TRANSIENT_ERROR, |
| method='STREAMING_INSERTS')) |
| |
| assert_that( |
| r[beam.io.gcp.bigquery.BigQueryWriteFn.FAILED_ROWS_WITH_ERRORS] |
| | beam.Map(lambda elm: (elm[0], elm[1])), |
| equal_to([(full_output_table_1, bad_record)])) |
| |
| assert_that( |
| r[beam.io.gcp.bigquery.BigQueryWriteFn.FAILED_ROWS], |
| equal_to([(full_output_table_1, bad_record)]), |
| label='FailedRowsMatch') |
| |
| def tearDown(self): |
| request = bigquery.BigqueryDatasetsDeleteRequest( |
| projectId=self.project, datasetId=self.dataset_id, deleteContents=True) |
| try: |
| _LOGGER.info( |
| "Deleting dataset %s in project %s", self.dataset_id, self.project) |
| self.bigquery_client.client.datasets.Delete(request) |
| except HttpError: |
| _LOGGER.debug( |
| 'Failed to clean up dataset %s in project %s', |
| self.dataset_id, |
| self.project) |
| |
| |
| @unittest.skipIf(HttpError is None, 'GCP dependencies are not installed') |
| class PubSubBigQueryIT(unittest.TestCase): |
| |
| INPUT_TOPIC = 'psit_topic_output' |
| INPUT_SUB = 'psit_subscription_input' |
| |
| BIG_QUERY_DATASET_ID = 'python_pubsub_bq_' |
| SCHEMA = { |
| 'fields': [{ |
| 'name': 'number', 'type': 'INTEGER', 'mode': 'NULLABLE' |
| }] |
| } |
| |
| _SIZE = 4 |
| |
  WAIT_UNTIL_FINISH_DURATION = 15 * 60 * 1000  # 15 minutes, in milliseconds
| |
| def setUp(self): |
| # Set up PubSub |
| self.test_pipeline = TestPipeline(is_integration_test=True) |
| self.runner_name = type(self.test_pipeline.runner).__name__ |
| self.project = self.test_pipeline.get_option('project') |
| self.uuid = str(uuid.uuid4()) |
| from google.cloud import pubsub |
| self.pub_client = pubsub.PublisherClient() |
| self.input_topic = self.pub_client.create_topic( |
| name=self.pub_client.topic_path( |
| self.project, self.INPUT_TOPIC + self.uuid)) |
| self.sub_client = pubsub.SubscriberClient() |
| self.input_sub = self.sub_client.create_subscription( |
| name=self.sub_client.subscription_path( |
| self.project, self.INPUT_SUB + self.uuid), |
| topic=self.input_topic.name) |
| |
| # Set up BQ |
| self.dataset_ref = utils.create_bq_dataset( |
| self.project, self.BIG_QUERY_DATASET_ID) |
| self.output_table = "%s.output_table" % (self.dataset_ref.dataset_id) |
| |
| def tearDown(self): |
| # Tear down PubSub |
| test_utils.cleanup_topics(self.pub_client, [self.input_topic]) |
| test_utils.cleanup_subscriptions(self.sub_client, [self.input_sub]) |
| # Tear down BigQuery |
| utils.delete_bq_dataset(self.project, self.dataset_ref) |
| |
| def _run_pubsub_bq_pipeline(self, method, triggering_frequency=None): |
| l = [i for i in range(self._SIZE)] |
| |
| matchers = [ |
| PipelineStateMatcher(PipelineState.RUNNING), |
| BigqueryFullResultStreamingMatcher( |
| project=self.project, |
| query="SELECT number FROM %s" % self.output_table, |
| data=[(i, ) for i in l]) |
| ] |
| |
| args = self.test_pipeline.get_full_options_as_args( |
| on_success_matcher=hc.all_of(*matchers), |
| wait_until_finish_duration=self.WAIT_UNTIL_FINISH_DURATION, |
| streaming=True, |
| allow_unsafe_triggers=True) |
| |
| def add_schema_info(element): |
| yield {'number': element} |
| |
| messages = [str(i).encode('utf-8') for i in l] |
| for message in messages: |
| self.pub_client.publish(self.input_topic.name, message) |
| |
| with beam.Pipeline(argv=args) as p: |
      messages_with_schema = (
          p
          | ReadFromPubSub(subscription=self.input_sub.name)
          | beam.ParDo(add_schema_info))
      _ = messages_with_schema | WriteToBigQuery(
| self.output_table, |
| schema=self.SCHEMA, |
| method=method, |
| triggering_frequency=triggering_frequency) |
| |
| @pytest.mark.it_postcommit |
| def test_streaming_inserts(self): |
| self._run_pubsub_bq_pipeline(WriteToBigQuery.Method.STREAMING_INSERTS) |
| |
| @pytest.mark.it_postcommit |
| def test_file_loads(self): |
| self._run_pubsub_bq_pipeline( |
| WriteToBigQuery.Method.FILE_LOADS, triggering_frequency=20) |
| |
| |
| @unittest.skipIf(HttpError is None, 'GCP dependencies are not installed') |
| class BigQueryFileLoadsIntegrationTests(unittest.TestCase): |
| BIG_QUERY_DATASET_ID = 'python_bq_file_loads_' |
| |
| def setUp(self): |
| self.test_pipeline = TestPipeline(is_integration_test=True) |
| self.runner_name = type(self.test_pipeline.runner).__name__ |
| self.project = self.test_pipeline.get_option('project') |
| |
| self.dataset_id = '%s%d%s' % ( |
| self.BIG_QUERY_DATASET_ID, int(time.time()), secrets.token_hex(3)) |
| self.bigquery_client = bigquery_tools.BigQueryWrapper() |
| self.bigquery_client.get_or_create_dataset(self.project, self.dataset_id) |
| self.output_table = '%s.output_table' % (self.dataset_id) |
| self.table_ref = bigquery_tools.parse_table_reference(self.output_table) |
| _LOGGER.info( |
| 'Created dataset %s in project %s', self.dataset_id, self.project) |
| |
| @pytest.mark.it_postcommit |
| def test_avro_file_load(self): |
| # Construct elements such that they can be written via Avro but not via |
| # JSON. See BEAM-8841. |
| from apache_beam.io.gcp import bigquery_file_loads |
| old_max_files = bigquery_file_loads._MAXIMUM_SOURCE_URIS |
| old_max_file_size = bigquery_file_loads._DEFAULT_MAX_FILE_SIZE |
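    # Shrink the temp file size and source URI limits so that even this tiny
    # input is split across multiple files and load jobs.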
| bigquery_file_loads._MAXIMUM_SOURCE_URIS = 1 |
| bigquery_file_loads._DEFAULT_MAX_FILE_SIZE = 100 |
| elements = [ |
| { |
| 'name': 'Negative infinity', |
| 'value': -float('inf'), |
| 'timestamp': datetime.datetime(1970, 1, 1, tzinfo=pytz.utc), |
| }, |
| { |
| 'name': 'Not a number', |
| 'value': float('nan'), |
| 'timestamp': datetime.datetime(2930, 12, 9, tzinfo=pytz.utc), |
| }, |
| ] |
| |
| schema = beam.io.gcp.bigquery.WriteToBigQuery.get_dict_table_schema( |
| bigquery.TableSchema( |
| fields=[ |
| bigquery.TableFieldSchema( |
| name='name', type='STRING', mode='REQUIRED'), |
| bigquery.TableFieldSchema( |
| name='value', type='FLOAT', mode='REQUIRED'), |
| bigquery.TableFieldSchema( |
| name='timestamp', type='TIMESTAMP', mode='REQUIRED'), |
| ])) |
| |
| pipeline_verifiers = [ |
| # Some gymnastics here to avoid comparing NaN since NaN is not equal to |
| # anything, including itself. |
| BigqueryFullResultMatcher( |
| project=self.project, |
| query="SELECT name, value, timestamp FROM {} WHERE value<0".format( |
| self.output_table), |
| data=[(d['name'], d['value'], d['timestamp']) |
| for d in elements[:1]], |
| ), |
| BigqueryFullResultMatcher( |
| project=self.project, |
| query="SELECT name, timestamp FROM {}".format(self.output_table), |
| data=[(d['name'], d['timestamp']) for d in elements], |
| ), |
| ] |
| |
| args = self.test_pipeline.get_full_options_as_args( |
| on_success_matcher=hc.all_of(*pipeline_verifiers), |
| ) |
| |
| with beam.Pipeline(argv=args) as p: |
| input = p | 'CreateInput' >> beam.Create(elements) |
| schema_pc = p | 'CreateSchema' >> beam.Create([schema]) |
| |
| _ = ( |
| input |
| | 'WriteToBigQuery' >> beam.io.gcp.bigquery.WriteToBigQuery( |
| table='%s:%s' % (self.project, self.output_table), |
| schema=lambda _, schema: schema, |
| schema_side_inputs=(beam.pvalue.AsSingleton(schema_pc), ), |
| method='FILE_LOADS', |
| temp_file_format=bigquery_tools.FileFormat.AVRO, |
| )) |
| bigquery_file_loads._MAXIMUM_SOURCE_URIS = old_max_files |
| bigquery_file_loads._DEFAULT_MAX_FILE_SIZE = old_max_file_size |
| |
| def tearDown(self): |
| request = bigquery.BigqueryDatasetsDeleteRequest( |
| projectId=self.project, datasetId=self.dataset_id, deleteContents=True) |
| try: |
| _LOGGER.info( |
| "Deleting dataset %s in project %s", self.dataset_id, self.project) |
| self.bigquery_client.client.datasets.Delete(request) |
| except HttpError: |
| _LOGGER.debug( |
| 'Failed to clean up dataset %s in project %s', |
| self.dataset_id, |
| self.project) |
| |
| |
| if __name__ == '__main__': |
| logging.getLogger().setLevel(logging.INFO) |
| unittest.main() |