| # |
| # Licensed to the Apache Software Foundation (ASF) under one or more |
| # contributor license agreements. See the NOTICE file distributed with |
| # this work for additional information regarding copyright ownership. |
| # The ASF licenses this file to You under the Apache License, Version 2.0 |
| # (the "License"); you may not use this file except in compliance with |
| # the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| # |
| |
| from __future__ import absolute_import |
| |
| import datetime |
| import decimal |
| import json |
| import logging |
| import re |
| import time |
| import unittest |
| |
| import mock |
| from future.utils import iteritems |
| |
| import apache_beam as beam |
| from apache_beam.internal.gcp.json_value import to_json_value |
| from apache_beam.io.gcp.bigquery import TableRowJsonCoder |
| from apache_beam.io.gcp.bigquery_test import HttpError |
| from apache_beam.io.gcp.bigquery_tools import JSON_COMPLIANCE_ERROR |
| from apache_beam.io.gcp.bigquery_tools import RowAsDictJsonCoder |
| from apache_beam.io.gcp.bigquery_tools import parse_table_schema_from_json |
| from apache_beam.io.gcp.internal.clients import bigquery |
| from apache_beam.options.pipeline_options import PipelineOptions |
| |
| |
@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
class TestTableSchemaParser(unittest.TestCase):
  """Tests for parse_table_schema_from_json."""

  def test_parse_table_schema_from_json(self):
    """A JSON schema string is parsed into the matching TableSchema proto."""
    # Expected proto: one REQUIRED RECORD field wrapping a nullable string
    # sub-field and a required integer sub-field.
    sub_fields = [
        bigquery.TableFieldSchema(
            name='s', type='STRING', mode='NULLABLE',
            description='s description'),
        bigquery.TableFieldSchema(
            name='n', type='INTEGER', mode='REQUIRED',
            description='n description'),
    ]
    record_field = bigquery.TableFieldSchema(
        name='r', type='RECORD', mode='REQUIRED',
        description='r description', fields=sub_fields)
    expected_schema = bigquery.TableSchema(fields=[record_field])

    # The same schema expressed as the JSON dict shape the BigQuery API uses.
    schema_dict = {
        'fields': [{
            'name': 'r', 'type': 'RECORD', 'mode': 'REQUIRED',
            'description': 'r description',
            'fields': [
                {'name': 's', 'type': 'STRING', 'mode': 'NULLABLE',
                 'description': 's description'},
                {'name': 'n', 'type': 'INTEGER', 'mode': 'REQUIRED',
                 'description': 'n description'},
            ],
        }],
    }
    self.assertEqual(
        parse_table_schema_from_json(json.dumps(schema_dict)),
        expected_schema)
| |
| |
@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
class TestBigQueryWrapper(unittest.TestCase):
  """Tests for BigQueryWrapper's dataset/table delete and create helpers."""

  def test_delete_non_existing_dataset(self):
    """A 404 on dataset deletion is swallowed: delete is best-effort."""
    client = mock.Mock()
    client.datasets.Delete.side_effect = HttpError(
        response={'status': '404'}, url='', content='')
    wrapper = beam.io.gcp.bigquery_tools.BigQueryWrapper(client)
    wrapper._delete_dataset('', '')
    self.assertTrue(client.datasets.Delete.called)

  @mock.patch('time.sleep', return_value=None)
  def test_delete_dataset_retries_fail(self, patched_time_sleep):
    """A persistent error is re-raised after all retries are exhausted."""
    client = mock.Mock()
    client.datasets.Delete.side_effect = ValueError("Cannot delete")
    wrapper = beam.io.gcp.bigquery_tools.BigQueryWrapper(client)
    with self.assertRaises(ValueError):
      wrapper._delete_dataset('', '')
    # One initial attempt plus MAX_RETRIES retries.
    self.assertEqual(
        beam.io.gcp.bigquery_tools.MAX_RETRIES + 1,
        client.datasets.Delete.call_count)
    self.assertTrue(client.datasets.Delete.called)

  def test_delete_non_existing_table(self):
    """A 404 on table deletion is swallowed: delete is best-effort."""
    client = mock.Mock()
    client.tables.Delete.side_effect = HttpError(
        response={'status': '404'}, url='', content='')
    wrapper = beam.io.gcp.bigquery_tools.BigQueryWrapper(client)
    wrapper._delete_table('', '', '')
    self.assertTrue(client.tables.Delete.called)

  @mock.patch('time.sleep', return_value=None)
  def test_delete_table_retries_fail(self, patched_time_sleep):
    """A persistent error is re-raised after all retries are exhausted."""
    client = mock.Mock()
    client.tables.Delete.side_effect = ValueError("Cannot delete")
    wrapper = beam.io.gcp.bigquery_tools.BigQueryWrapper(client)
    with self.assertRaises(ValueError):
      wrapper._delete_table('', '', '')
    # Mirror test_delete_dataset_retries_fail: verify the retry count as
    # well, not merely that Delete was invoked at least once.
    self.assertEqual(
        beam.io.gcp.bigquery_tools.MAX_RETRIES + 1,
        client.tables.Delete.call_count)
    self.assertTrue(client.tables.Delete.called)

  @mock.patch('time.sleep', return_value=None)
  def test_delete_dataset_retries_for_timeouts(self, patched_time_sleep):
    """A transient 408 on dataset deletion is retried until it succeeds."""
    client = mock.Mock()
    client.datasets.Delete.side_effect = [
        HttpError(response={'status': '408'}, url='', content=''),
        bigquery.BigqueryDatasetsDeleteResponse()
    ]
    wrapper = beam.io.gcp.bigquery_tools.BigQueryWrapper(client)
    wrapper._delete_dataset('', '')
    self.assertTrue(client.datasets.Delete.called)

  @mock.patch('time.sleep', return_value=None)
  def test_delete_table_retries_for_timeouts(self, patched_time_sleep):
    """A transient 408 on table deletion is retried until it succeeds."""
    client = mock.Mock()
    client.tables.Delete.side_effect = [
        HttpError(response={'status': '408'}, url='', content=''),
        bigquery.BigqueryTablesDeleteResponse()
    ]
    wrapper = beam.io.gcp.bigquery_tools.BigQueryWrapper(client)
    wrapper._delete_table('', '', '')
    self.assertTrue(client.tables.Delete.called)

  @mock.patch('time.sleep', return_value=None)
  def test_temporary_dataset_is_unique(self, patched_time_sleep):
    """Creating a temp dataset fails if one with that name already exists."""
    client = mock.Mock()
    # Get returning a dataset (rather than raising a 404) means the
    # temporary dataset name is already taken.
    client.datasets.Get.return_value = bigquery.Dataset(
        datasetReference=bigquery.DatasetReference(
            projectId='project_id', datasetId='dataset_id'))
    wrapper = beam.io.gcp.bigquery_tools.BigQueryWrapper(client)
    with self.assertRaises(RuntimeError):
      wrapper.create_temporary_dataset('project_id', 'location')
    self.assertTrue(client.datasets.Get.called)

  def test_get_or_create_dataset_created(self):
    """A missing dataset (Get raises 404) is created via Insert."""
    client = mock.Mock()
    client.datasets.Get.side_effect = HttpError(
        response={'status': '404'}, url='', content='')
    client.datasets.Insert.return_value = bigquery.Dataset(
        datasetReference=bigquery.DatasetReference(
            projectId='project_id', datasetId='dataset_id'))
    wrapper = beam.io.gcp.bigquery_tools.BigQueryWrapper(client)
    new_dataset = wrapper.get_or_create_dataset('project_id', 'dataset_id')
    self.assertEqual(new_dataset.datasetReference.datasetId, 'dataset_id')

  def test_get_or_create_dataset_fetched(self):
    """An existing dataset is returned as-is without an Insert call."""
    client = mock.Mock()
    client.datasets.Get.return_value = bigquery.Dataset(
        datasetReference=bigquery.DatasetReference(
            projectId='project_id', datasetId='dataset_id'))
    wrapper = beam.io.gcp.bigquery_tools.BigQueryWrapper(client)
    new_dataset = wrapper.get_or_create_dataset('project_id', 'dataset_id')
    self.assertEqual(new_dataset.datasetReference.datasetId, 'dataset_id')

  def test_get_or_create_table(self):
    """A missing table (first Get returns None) is created and returned."""
    client = mock.Mock()
    client.tables.Insert.return_value = 'table_id'
    client.tables.Get.side_effect = [None, 'table_id']
    wrapper = beam.io.gcp.bigquery_tools.BigQueryWrapper(client)
    new_table = wrapper.get_or_create_table(
        'project_id', 'dataset_id', 'table_id',
        bigquery.TableSchema(fields=[
            bigquery.TableFieldSchema(name='b', type='BOOLEAN',
                                      mode='REQUIRED')]), False, False)
    self.assertEqual(new_table, 'table_id')

  def test_get_or_create_table_race_condition(self):
    """A 409 on Insert (table created concurrently) falls back to Get."""
    client = mock.Mock()
    client.tables.Insert.side_effect = HttpError(
        response={'status': '409'}, url='', content='')
    client.tables.Get.side_effect = [None, 'table_id']
    wrapper = beam.io.gcp.bigquery_tools.BigQueryWrapper(client)
    new_table = wrapper.get_or_create_table(
        'project_id', 'dataset_id', 'table_id',
        bigquery.TableSchema(fields=[
            bigquery.TableFieldSchema(name='b', type='BOOLEAN',
                                      mode='REQUIRED')]), False, False)
    self.assertEqual(new_table, 'table_id')

  def test_get_or_create_table_intermittent_exception(self):
    """A transient 408 on Insert is retried and eventually succeeds."""
    client = mock.Mock()
    client.tables.Insert.side_effect = [
        HttpError(response={'status': '408'}, url='', content=''), 'table_id'
    ]
    client.tables.Get.side_effect = [None, 'table_id']
    wrapper = beam.io.gcp.bigquery_tools.BigQueryWrapper(client)
    new_table = wrapper.get_or_create_table(
        'project_id', 'dataset_id', 'table_id',
        bigquery.TableSchema(fields=[
            bigquery.TableFieldSchema(
                name='b', type='BOOLEAN', mode='REQUIRED')
        ]), False, False)
    self.assertEqual(new_table, 'table_id')
| |
| |
@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
class TestBigQueryReader(unittest.TestCase):
  """Tests for the native BigQuery source reader against a mocked client."""

  def get_test_rows(self):
    """Builds one shared fixture used by the reader tests.

    Returns:
      A tuple (table_rows, schema, expected_rows): table_rows are the
      API-level TableRow messages a query response would carry, schema is
      the matching TableSchema, and expected_rows are the plain dicts the
      reader is expected to yield for those rows.
    """
    now = time.time()
    dt = datetime.datetime.utcfromtimestamp(float(now))
    ts = dt.strftime('%Y-%m-%d %H:%M:%S.%f UTC')
    expected_rows = [
        {
            'i': 1,
            's': 'abc',
            'f': 2.3,
            'b': True,
            't': ts,
            'dt': '2016-10-31',
            'ts': '22:39:12.627498',
            'dt_ts': '2008-12-25T07:30:00',
            'r': {'s2': 'b'},
            'rpr': [{'s3': 'c', 'rpr2': [{'rs': ['d', 'e'], 's4': None}]}]
        },
        {
            # Second row: all NULLABLE fields are null; the null REPEATED
            # cell below is expected to decode to an empty list.
            'i': 10,
            's': 'xyz',
            'f': -3.14,
            'b': False,
            'rpr': [],
            't': None,
            'dt': None,
            'ts': None,
            'dt_ts': None,
            'r': None,
        }]

    # Sub-schema for the nullable RECORD field 'r'.
    nested_schema = [
        bigquery.TableFieldSchema(
            name='s2', type='STRING', mode='NULLABLE')]
    # Sub-schema for the repeated RECORD field 'rpr', which itself nests a
    # repeated record 'rpr2' containing a repeated string field.
    nested_schema_2 = [
        bigquery.TableFieldSchema(
            name='s3', type='STRING', mode='NULLABLE'),
        bigquery.TableFieldSchema(
            name='rpr2', type='RECORD', mode='REPEATED', fields=[
                bigquery.TableFieldSchema(
                    name='rs', type='STRING', mode='REPEATED'),
                bigquery.TableFieldSchema(
                    name='s4', type='STRING', mode='NULLABLE')])]

    schema = bigquery.TableSchema(
        fields=[
            bigquery.TableFieldSchema(
                name='b', type='BOOLEAN', mode='REQUIRED'),
            bigquery.TableFieldSchema(
                name='f', type='FLOAT', mode='REQUIRED'),
            bigquery.TableFieldSchema(
                name='i', type='INTEGER', mode='REQUIRED'),
            bigquery.TableFieldSchema(
                name='s', type='STRING', mode='REQUIRED'),
            bigquery.TableFieldSchema(
                name='t', type='TIMESTAMP', mode='NULLABLE'),
            bigquery.TableFieldSchema(
                name='dt', type='DATE', mode='NULLABLE'),
            bigquery.TableFieldSchema(
                name='ts', type='TIME', mode='NULLABLE'),
            bigquery.TableFieldSchema(
                name='dt_ts', type='DATETIME', mode='NULLABLE'),
            bigquery.TableFieldSchema(
                name='r', type='RECORD', mode='NULLABLE',
                fields=nested_schema),
            bigquery.TableFieldSchema(
                name='rpr', type='RECORD', mode='REPEATED',
                fields=nested_schema_2)])

    # NOTE: the cells in each TableRow are positional and must appear in
    # the same order as the schema fields above
    # (b, f, i, s, t, dt, ts, dt_ts, r, rpr).
    table_rows = [
        bigquery.TableRow(f=[
            bigquery.TableCell(v=to_json_value('true')),
            bigquery.TableCell(v=to_json_value(str(2.3))),
            bigquery.TableCell(v=to_json_value(str(1))),
            bigquery.TableCell(v=to_json_value('abc')),
            # For timestamps cannot use str() because it will truncate the
            # number representing the timestamp.
            bigquery.TableCell(v=to_json_value('%f' % now)),
            bigquery.TableCell(v=to_json_value('2016-10-31')),
            bigquery.TableCell(v=to_json_value('22:39:12.627498')),
            bigquery.TableCell(v=to_json_value('2008-12-25T07:30:00')),
            # For record we cannot use dict because it doesn't create nested
            # schemas correctly so we have to use this f,v based format
            bigquery.TableCell(v=to_json_value({'f': [{'v': 'b'}]})),
            bigquery.TableCell(v=to_json_value([
                {'v': {'f': [{'v': 'c'}, {'v': [
                    {'v': {'f': [{'v': [{'v': 'd'}, {'v': 'e'}]}, {'v': None}]}}
                ]}]}}]))
        ]),
        bigquery.TableRow(f=[
            bigquery.TableCell(v=to_json_value('false')),
            bigquery.TableCell(v=to_json_value(str(-3.14))),
            bigquery.TableCell(v=to_json_value(str(10))),
            bigquery.TableCell(v=to_json_value('xyz')),
            bigquery.TableCell(v=None),
            bigquery.TableCell(v=None),
            bigquery.TableCell(v=None),
            bigquery.TableCell(v=None),
            bigquery.TableCell(v=None),
            # REPEATED field without any values.
            bigquery.TableCell(v=None)])]
    return table_rows, schema, expected_rows

  def test_read_from_table(self):
    """Reading a table yields decoded dict rows and exposes the schema."""
    client = mock.Mock()
    client.jobs.Insert.return_value = bigquery.Job(
        jobReference=bigquery.JobReference(
            jobId='somejob'))
    table_rows, schema, expected_rows = self.get_test_rows()
    client.jobs.GetQueryResults.return_value = bigquery.GetQueryResultsResponse(
        jobComplete=True, rows=table_rows, schema=schema)
    actual_rows = []
    with beam.io.BigQuerySource('dataset.table').reader(client) as reader:
      for row in reader:
        actual_rows.append(row)
    self.assertEqual(actual_rows, expected_rows)
    self.assertEqual(schema, reader.schema)

  def test_read_from_query(self):
    """Reading a query defaults to legacy SQL with flattened results."""
    client = mock.Mock()
    client.jobs.Insert.return_value = bigquery.Job(
        jobReference=bigquery.JobReference(
            jobId='somejob'))
    table_rows, schema, expected_rows = self.get_test_rows()
    client.jobs.GetQueryResults.return_value = bigquery.GetQueryResultsResponse(
        jobComplete=True, rows=table_rows, schema=schema)
    actual_rows = []
    with beam.io.BigQuerySource(query='query').reader(client) as reader:
      for row in reader:
        actual_rows.append(row)
    self.assertEqual(actual_rows, expected_rows)
    self.assertEqual(schema, reader.schema)
    self.assertTrue(reader.use_legacy_sql)
    self.assertTrue(reader.flatten_results)

  def test_read_from_query_sql_format(self):
    """use_standard_sql=True turns off legacy SQL on the reader."""
    client = mock.Mock()
    client.jobs.Insert.return_value = bigquery.Job(
        jobReference=bigquery.JobReference(
            jobId='somejob'))
    table_rows, schema, expected_rows = self.get_test_rows()
    client.jobs.GetQueryResults.return_value = bigquery.GetQueryResultsResponse(
        jobComplete=True, rows=table_rows, schema=schema)
    actual_rows = []
    with beam.io.BigQuerySource(
        query='query', use_standard_sql=True).reader(client) as reader:
      for row in reader:
        actual_rows.append(row)
    self.assertEqual(actual_rows, expected_rows)
    self.assertEqual(schema, reader.schema)
    self.assertFalse(reader.use_legacy_sql)
    self.assertTrue(reader.flatten_results)

  def test_read_from_query_unflatten_records(self):
    """flatten_results=False is propagated to the reader."""
    client = mock.Mock()
    client.jobs.Insert.return_value = bigquery.Job(
        jobReference=bigquery.JobReference(
            jobId='somejob'))
    table_rows, schema, expected_rows = self.get_test_rows()
    client.jobs.GetQueryResults.return_value = bigquery.GetQueryResultsResponse(
        jobComplete=True, rows=table_rows, schema=schema)
    actual_rows = []
    with beam.io.BigQuerySource(
        query='query', flatten_results=False).reader(client) as reader:
      for row in reader:
        actual_rows.append(row)
    self.assertEqual(actual_rows, expected_rows)
    self.assertEqual(schema, reader.schema)
    self.assertTrue(reader.use_legacy_sql)
    self.assertFalse(reader.flatten_results)

  def test_using_both_query_and_table_fails(self):
    """Specifying both a table and a query is rejected at construction."""
    with self.assertRaisesRegexp(
        ValueError,
        r'Both a BigQuery table and a query were specified\. Please specify '
        r'only one of these'):
      beam.io.BigQuerySource(table='dataset.table', query='query')

  def test_using_neither_query_nor_table_fails(self):
    """Specifying neither a table nor a query is rejected at construction."""
    with self.assertRaisesRegexp(
        ValueError, r'A BigQuery table or a query must be specified'):
      beam.io.BigQuerySource()

  def test_read_from_table_as_tablerows(self):
    """With TableRowJsonCoder, rows come back as raw TableRow messages."""
    client = mock.Mock()
    client.jobs.Insert.return_value = bigquery.Job(
        jobReference=bigquery.JobReference(
            jobId='somejob'))
    table_rows, schema, _ = self.get_test_rows()
    client.jobs.GetQueryResults.return_value = bigquery.GetQueryResultsResponse(
        jobComplete=True, rows=table_rows, schema=schema)
    actual_rows = []
    # We set the coder to TableRowJsonCoder, which is a signal that
    # the caller wants to see the rows as TableRows.
    with beam.io.BigQuerySource(
        'dataset.table', coder=TableRowJsonCoder).reader(client) as reader:
      for row in reader:
        actual_rows.append(row)
    self.assertEqual(actual_rows, table_rows)
    self.assertEqual(schema, reader.schema)

  @mock.patch('time.sleep', return_value=None)
  def test_read_from_table_and_job_complete_retry(self, patched_time_sleep):
    """The reader polls again when the query job is not yet complete."""
    client = mock.Mock()
    client.jobs.Insert.return_value = bigquery.Job(
        jobReference=bigquery.JobReference(
            jobId='somejob'))
    table_rows, schema, expected_rows = self.get_test_rows()
    # Return jobComplete=False on first call to trigger the code path where
    # query needs to handle waiting a bit.
    client.jobs.GetQueryResults.side_effect = [
        bigquery.GetQueryResultsResponse(
            jobComplete=False),
        bigquery.GetQueryResultsResponse(
            jobComplete=True, rows=table_rows, schema=schema)]
    actual_rows = []
    with beam.io.BigQuerySource('dataset.table').reader(client) as reader:
      for row in reader:
        actual_rows.append(row)
    self.assertEqual(actual_rows, expected_rows)

  def test_read_from_table_and_multiple_pages(self):
    """The reader follows pageToken across multiple result pages."""
    client = mock.Mock()
    client.jobs.Insert.return_value = bigquery.Job(
        jobReference=bigquery.JobReference(
            jobId='somejob'))
    table_rows, schema, expected_rows = self.get_test_rows()
    # Return a pageToken on first call to trigger the code path where
    # query needs to handle multiple pages of results.
    client.jobs.GetQueryResults.side_effect = [
        bigquery.GetQueryResultsResponse(
            jobComplete=True, rows=table_rows, schema=schema,
            pageToken='token'),
        bigquery.GetQueryResultsResponse(
            jobComplete=True, rows=table_rows, schema=schema)]
    actual_rows = []
    with beam.io.BigQuerySource('dataset.table').reader(client) as reader:
      for row in reader:
        actual_rows.append(row)
    # We return expected rows for each of the two pages of results so we
    # adjust our expectation below accordingly.
    self.assertEqual(actual_rows, expected_rows * 2)

  def test_table_schema_without_project(self):
    # Reader should pick executing project by default.
    source = beam.io.BigQuerySource(table='mydataset.mytable')
    options = PipelineOptions(flags=['--project', 'myproject'])
    source.pipeline_options = options
    reader = source.reader()
    self.assertEqual('SELECT * FROM [myproject:mydataset.mytable];',
                     reader.query)
| |
| |
@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
class TestBigQueryWriter(unittest.TestCase):
  """Tests for the native BigQuery sink writer against a mocked client."""

  def _make_table(self):
    # A minimal Table message for project:dataset.table with an empty schema.
    return bigquery.Table(
        tableReference=bigquery.TableReference(
            projectId='project', datasetId='dataset', tableId='table'),
        schema=bigquery.TableSchema())

  @mock.patch('time.sleep', return_value=None)
  def test_no_table_and_create_never(self, patched_time_sleep):
    """CREATE_NEVER against a missing table raises RuntimeError."""
    mock_client = mock.Mock()
    mock_client.tables.Get.side_effect = HttpError(
        response={'status': '404'}, url='', content='')
    with self.assertRaisesRegexp(
        RuntimeError,
        r'Table project:dataset\.table not found but create disposition is '
        r'CREATE_NEVER'):
      with beam.io.BigQuerySink(
          'project:dataset.table',
          create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER
      ).writer(mock_client):
        pass

  def test_no_table_and_create_if_needed(self):
    """CREATE_IF_NEEDED creates the table when Get reports a 404."""
    mock_client = mock.Mock()
    mock_client.tables.Get.side_effect = HttpError(
        response={'status': '404'}, url='', content='')
    mock_client.tables.Insert.return_value = self._make_table()
    with beam.io.BigQuerySink(
        'project:dataset.table',
        schema='somefield:INTEGER',
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED
    ).writer(mock_client):
      pass
    self.assertTrue(mock_client.tables.Get.called)
    self.assertTrue(mock_client.tables.Insert.called)

  @mock.patch('time.sleep', return_value=None)
  def test_no_table_and_create_if_needed_and_no_schema(
      self, patched_time_sleep):
    """CREATE_IF_NEEDED without a schema on a missing table raises."""
    mock_client = mock.Mock()
    mock_client.tables.Get.side_effect = HttpError(
        response={'status': '404'}, url='', content='')
    with self.assertRaisesRegexp(
        RuntimeError,
        r'Table project:dataset\.table requires a schema\. None can be '
        r'inferred because the table does not exist'):
      with beam.io.BigQuerySink(
          'project:dataset.table',
          create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED
      ).writer(mock_client):
        pass

  @mock.patch('time.sleep', return_value=None)
  def test_table_not_empty_and_write_disposition_empty(
      self, patched_time_sleep):
    """WRITE_EMPTY against a non-empty table raises RuntimeError."""
    mock_client = mock.Mock()
    mock_client.tables.Get.return_value = self._make_table()
    mock_client.tabledata.List.return_value = bigquery.TableDataList(
        totalRows=1)
    with self.assertRaisesRegexp(
        RuntimeError,
        r'Table project:dataset\.table is not empty but write disposition is '
        r'WRITE_EMPTY'):
      with beam.io.BigQuerySink(
          'project:dataset.table',
          write_disposition=beam.io.BigQueryDisposition.WRITE_EMPTY
      ).writer(mock_client):
        pass

  def test_table_empty_and_write_disposition_empty(self):
    """WRITE_EMPTY on an empty table proceeds without delete/recreate."""
    mock_client = mock.Mock()
    mock_client.tables.Get.return_value = self._make_table()
    mock_client.tabledata.List.return_value = bigquery.TableDataList(
        totalRows=0)
    mock_client.tables.Insert.return_value = self._make_table()
    with beam.io.BigQuerySink(
        'project:dataset.table',
        write_disposition=beam.io.BigQueryDisposition.WRITE_EMPTY
    ).writer(mock_client):
      pass
    self.assertTrue(mock_client.tables.Get.called)
    self.assertTrue(mock_client.tabledata.List.called)
    self.assertFalse(mock_client.tables.Delete.called)
    self.assertFalse(mock_client.tables.Insert.called)

  @mock.patch('time.sleep', return_value=None)
  def test_table_with_write_disposition_truncate(self, _patched_sleep):
    """WRITE_TRUNCATE deletes and recreates an existing table."""
    mock_client = mock.Mock()
    mock_client.tables.Get.return_value = self._make_table()
    mock_client.tables.Insert.return_value = self._make_table()
    with beam.io.BigQuerySink(
        'project:dataset.table',
        write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE
    ).writer(mock_client):
      pass
    self.assertTrue(mock_client.tables.Get.called)
    self.assertTrue(mock_client.tables.Delete.called)
    self.assertTrue(mock_client.tables.Insert.called)

  def test_table_with_write_disposition_append(self):
    """WRITE_APPEND leaves an existing table untouched."""
    mock_client = mock.Mock()
    mock_client.tables.Get.return_value = self._make_table()
    mock_client.tables.Insert.return_value = self._make_table()
    with beam.io.BigQuerySink(
        'project:dataset.table',
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND
    ).writer(mock_client):
      pass
    self.assertTrue(mock_client.tables.Get.called)
    self.assertFalse(mock_client.tables.Delete.called)
    self.assertFalse(mock_client.tables.Insert.called)

  def test_rows_are_written(self):
    """A written dict row is forwarded to tabledata.InsertAll verbatim."""
    mock_client = mock.Mock()
    mock_client.tables.Get.return_value = self._make_table()

    insert_response = mock.Mock()
    insert_response.insertErrors = []
    mock_client.tabledata.InsertAll.return_value = insert_response

    sample_row = {'i': 1, 'b': True, 's': 'abc', 'f': 3.14}
    with beam.io.BigQuerySink(
        'project:dataset.table',
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND
    ).writer(mock_client) as writer:
      writer.Write(sample_row)

    # Rebuild the row as the JsonObject the writer is expected to send.
    json_object = bigquery.JsonObject()
    for key, value in iteritems(sample_row):
      json_object.additionalProperties.append(
          bigquery.JsonObject.AdditionalProperty(
              key=key, value=to_json_value(value)))
    expected_rows = [
        bigquery.TableDataInsertAllRequest.RowsValueListEntry(
            insertId='_1',  # First row ID generated with prefix ''
            json=json_object)]
    mock_client.tabledata.InsertAll.assert_called_with(
        bigquery.BigqueryTabledataInsertAllRequest(
            projectId='project', datasetId='dataset', tableId='table',
            tableDataInsertAllRequest=bigquery.TableDataInsertAllRequest(
                rows=expected_rows, skipInvalidRows=False,)))

  def test_table_schema_without_project(self):
    # Writer should pick executing project by default.
    sink = beam.io.BigQuerySink(table='mydataset.mytable')
    sink.pipeline_options = PipelineOptions(
        flags=['--project', 'myproject'])
    writer = sink.writer()
    self.assertEqual('myproject', writer.project_id)
| |
| |
@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
class TestRowAsDictJsonCoder(unittest.TestCase):
  """Tests for the default dict-per-row JSON coder."""

  def test_row_as_dict(self):
    """A dict of plain scalars survives an encode/decode round trip."""
    coder = RowAsDictJsonCoder()
    row = {'s': 'abc', 'i': 123, 'f': 123.456, 'b': True}
    self.assertEqual(row, coder.decode(coder.encode(row)))

  def test_decimal_in_row_as_dict(self):
    """Decimal values are encoded as strings and decoded as strings."""
    # Bigquery IO uses decimals to represent NUMERIC types.
    # To export to BQ, it's necessary to convert to strings, due to the
    # lower precision of JSON numbers. This means that we can't recognize
    # a NUMERIC when we decode from JSON, thus we match the string here.
    numeric = decimal.Decimal('123456789.987654321')
    coder = RowAsDictJsonCoder()
    encoded = coder.encode({'f': 123.456, 'b': True, 'numerico': numeric})
    self.assertEqual(
        {'f': 123.456, 'b': True, 'numerico': str(numeric)},
        coder.decode(encoded))

  def json_compliance_exception(self, value):
    """Asserts that round-tripping a non-JSON-compliant value raises."""
    with self.assertRaisesRegexp(ValueError, re.escape(JSON_COMPLIANCE_ERROR)):
      coder = RowAsDictJsonCoder()
      coder.decode(coder.encode({'s': value}))

  def test_invalid_json_nan(self):
    self.json_compliance_exception(float('nan'))

  def test_invalid_json_inf(self):
    self.json_compliance_exception(float('inf'))

  def test_invalid_json_neg_inf(self):
    self.json_compliance_exception(float('-inf'))
| |
| |
if __name__ == '__main__':
  # Emit INFO-level logs when this test module is run directly.
  logging.getLogger().setLevel(logging.INFO)
  unittest.main()