| # |
| # Licensed to the Apache Software Foundation (ASF) under one or more |
| # contributor license agreements. See the NOTICE file distributed with |
| # this work for additional information regarding copyright ownership. |
| # The ASF licenses this file to You under the Apache License, Version 2.0 |
| # (the "License"); you may not use this file except in compliance with |
| # the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| # |
| |
| """Unit tests for BigQuery sources and sinks.""" |
| from __future__ import absolute_import |
| |
| import datetime |
| import decimal |
| import json |
| import logging |
| import re |
| import time |
| import unittest |
| |
| import hamcrest as hc |
| import mock |
| from future.utils import iteritems |
| |
| import apache_beam as beam |
| from apache_beam.internal.gcp.json_value import to_json_value |
| from apache_beam.io.gcp.bigquery import JSON_COMPLIANCE_ERROR |
| from apache_beam.io.gcp.bigquery import RowAsDictJsonCoder |
| from apache_beam.io.gcp.bigquery import TableRowJsonCoder |
| from apache_beam.io.gcp.bigquery import parse_table_schema_from_json |
| from apache_beam.io.gcp.internal.clients import bigquery |
| from apache_beam.options.pipeline_options import PipelineOptions |
| from apache_beam.transforms.display import DisplayData |
| from apache_beam.transforms.display_test import DisplayDataItemMatcher |
| |
# Protect against environments where the GCP dependencies are not installed.
| # pylint: disable=wrong-import-order, wrong-import-position |
| try: |
| from apitools.base.py.exceptions import HttpError |
| except ImportError: |
| HttpError = None |
| # pylint: enable=wrong-import-order, wrong-import-position |
| |
| |
| @unittest.skipIf(HttpError is None, 'GCP dependencies are not installed') |
| class TestRowAsDictJsonCoder(unittest.TestCase): |
| |
| def test_row_as_dict(self): |
| coder = RowAsDictJsonCoder() |
| test_value = {'s': 'abc', 'i': 123, 'f': 123.456, 'b': True} |
| self.assertEqual(test_value, coder.decode(coder.encode(test_value))) |
| |
| def test_decimal_in_row_as_dict(self): |
| decimal_value = decimal.Decimal('123456789.987654321') |
| coder = RowAsDictJsonCoder() |
    # BigQuery IO uses decimal.Decimal to represent NUMERIC types.
    # To export to BQ they must be converted to strings, because JSON
    # numbers have lower precision. As a consequence a NUMERIC cannot be
    # recognized when decoding from JSON, so we match the string form here.
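    # (A float64 keeps only ~15-17 significant digits, so a value like
    # 123456789.987654321 cannot survive a round trip through a JSON number.)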
| test_value = {'f': 123.456, |
| 'b': True, |
| 'numerico': decimal_value} |
| output_value = {'f': 123.456, |
| 'b': True, |
| 'numerico': str(decimal_value)} |
| self.assertEqual(output_value, coder.decode(coder.encode(test_value))) |
| |
| def json_compliance_exception(self, value): |
| with self.assertRaisesRegexp(ValueError, re.escape(JSON_COMPLIANCE_ERROR)): |
| coder = RowAsDictJsonCoder() |
| test_value = {'s': value} |
| coder.decode(coder.encode(test_value)) |
| |
| def test_invalid_json_nan(self): |
| self.json_compliance_exception(float('nan')) |
| |
| def test_invalid_json_inf(self): |
| self.json_compliance_exception(float('inf')) |
| |
| def test_invalid_json_neg_inf(self): |
| self.json_compliance_exception(float('-inf')) |
| |
| |
| @unittest.skipIf(HttpError is None, 'GCP dependencies are not installed') |
| class TestTableRowJsonCoder(unittest.TestCase): |
| |
| def test_row_as_table_row(self): |
| schema_definition = [ |
| ('s', 'STRING'), |
| ('i', 'INTEGER'), |
| ('f', 'FLOAT'), |
| ('b', 'BOOLEAN'), |
| ('n', 'NUMERIC'), |
| ('r', 'RECORD')] |
| data_definition = [ |
| 'abc', |
| 123, |
| 123.456, |
| True, |
| decimal.Decimal('987654321.987654321'), |
| {'a': 'b'}] |
| str_def = ('{"s": "abc", ' |
| '"i": 123, ' |
| '"f": 123.456, ' |
| '"b": true, ' |
| '"n": "987654321.987654321", ' |
| '"r": {"a": "b"}}') |
| schema = bigquery.TableSchema( |
| fields=[bigquery.TableFieldSchema(name=k, type=v) |
| for k, v in schema_definition]) |
| coder = TableRowJsonCoder(table_schema=schema) |
| |
| def value_or_decimal_to_json(val): |
| if isinstance(val, decimal.Decimal): |
| return to_json_value(str(val)) |
| else: |
| return to_json_value(val) |
| |
| test_row = bigquery.TableRow( |
| f=[bigquery.TableCell(v=value_or_decimal_to_json(e)) |
| for e in data_definition]) |
| |
| self.assertEqual(str_def, coder.encode(test_row)) |
| self.assertEqual(test_row, coder.decode(coder.encode(test_row))) |
| # A coder without schema can still decode. |
| self.assertEqual( |
| test_row, TableRowJsonCoder().decode(coder.encode(test_row))) |
| |
| def test_row_and_no_schema(self): |
| coder = TableRowJsonCoder() |
| test_row = bigquery.TableRow( |
| f=[bigquery.TableCell(v=to_json_value(e)) |
| for e in ['abc', 123, 123.456, True]]) |
| with self.assertRaisesRegexp(AttributeError, |
| r'^The TableRowJsonCoder requires'): |
| coder.encode(test_row) |
| |
| def json_compliance_exception(self, value): |
| with self.assertRaisesRegexp(ValueError, re.escape(JSON_COMPLIANCE_ERROR)): |
| schema_definition = [('f', 'FLOAT')] |
| schema = bigquery.TableSchema( |
| fields=[bigquery.TableFieldSchema(name=k, type=v) |
| for k, v in schema_definition]) |
| coder = TableRowJsonCoder(table_schema=schema) |
| test_row = bigquery.TableRow( |
| f=[bigquery.TableCell(v=to_json_value(value))]) |
| coder.encode(test_row) |
| |
| def test_invalid_json_nan(self): |
| self.json_compliance_exception(float('nan')) |
| |
| def test_invalid_json_inf(self): |
| self.json_compliance_exception(float('inf')) |
| |
| def test_invalid_json_neg_inf(self): |
| self.json_compliance_exception(float('-inf')) |
| |
| |
| @unittest.skipIf(HttpError is None, 'GCP dependencies are not installed') |
| class TestTableSchemaParser(unittest.TestCase): |
| def test_parse_table_schema_from_json(self): |
| string_field = bigquery.TableFieldSchema( |
| name='s', type='STRING', mode='NULLABLE', description='s description') |
| number_field = bigquery.TableFieldSchema( |
| name='n', type='INTEGER', mode='REQUIRED', description='n description') |
| record_field = bigquery.TableFieldSchema( |
| name='r', type='RECORD', mode='REQUIRED', description='r description', |
| fields=[string_field, number_field]) |
| expected_schema = bigquery.TableSchema(fields=[record_field]) |
| json_str = json.dumps({'fields': [ |
| {'name': 'r', 'type': 'RECORD', 'mode': 'REQUIRED', |
| 'description': 'r description', 'fields': [ |
| {'name': 's', 'type': 'STRING', 'mode': 'NULLABLE', |
| 'description': 's description'}, |
| {'name': 'n', 'type': 'INTEGER', 'mode': 'REQUIRED', |
| 'description': 'n description'}]}]}) |
| self.assertEqual(parse_table_schema_from_json(json_str), |
| expected_schema) |
| |
| |
| @unittest.skipIf(HttpError is None, 'GCP dependencies are not installed') |
| class TestBigQuerySource(unittest.TestCase): |
| |
| def test_display_data_item_on_validate_true(self): |
| source = beam.io.BigQuerySource('dataset.table', validate=True) |
| |
| dd = DisplayData.create_from(source) |
| expected_items = [ |
| DisplayDataItemMatcher('validation', True), |
| DisplayDataItemMatcher('table', 'dataset.table')] |
| hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items)) |
| |
| def test_table_reference_display_data(self): |
| source = beam.io.BigQuerySource('dataset.table') |
| dd = DisplayData.create_from(source) |
| expected_items = [ |
| DisplayDataItemMatcher('validation', False), |
| DisplayDataItemMatcher('table', 'dataset.table')] |
| hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items)) |
| |
| source = beam.io.BigQuerySource('project:dataset.table') |
| dd = DisplayData.create_from(source) |
| expected_items = [ |
| DisplayDataItemMatcher('validation', False), |
| DisplayDataItemMatcher('table', 'project:dataset.table')] |
| hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items)) |
| |
| source = beam.io.BigQuerySource('xyz.com:project:dataset.table') |
| dd = DisplayData.create_from(source) |
| expected_items = [ |
        DisplayDataItemMatcher('validation', False),
        DisplayDataItemMatcher('table', 'xyz.com:project:dataset.table')]
| hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items)) |
| |
| def test_parse_table_reference(self): |
| source = beam.io.BigQuerySource('dataset.table') |
| self.assertEqual(source.table_reference.datasetId, 'dataset') |
| self.assertEqual(source.table_reference.tableId, 'table') |
| |
| source = beam.io.BigQuerySource('project:dataset.table') |
| self.assertEqual(source.table_reference.projectId, 'project') |
| self.assertEqual(source.table_reference.datasetId, 'dataset') |
| self.assertEqual(source.table_reference.tableId, 'table') |
| |
| source = beam.io.BigQuerySource('xyz.com:project:dataset.table') |
| self.assertEqual(source.table_reference.projectId, 'xyz.com:project') |
| self.assertEqual(source.table_reference.datasetId, 'dataset') |
| self.assertEqual(source.table_reference.tableId, 'table') |
| |
| source = beam.io.BigQuerySource(query='my_query') |
| self.assertEqual(source.query, 'my_query') |
| self.assertIsNone(source.table_reference) |
| self.assertTrue(source.use_legacy_sql) |
| |
| def test_query_only_display_data(self): |
| source = beam.io.BigQuerySource(query='my_query') |
| dd = DisplayData.create_from(source) |
| expected_items = [ |
| DisplayDataItemMatcher('validation', False), |
| DisplayDataItemMatcher('query', 'my_query')] |
| hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items)) |
| |
| def test_specify_query_sql_format(self): |
| source = beam.io.BigQuerySource(query='my_query', use_standard_sql=True) |
| self.assertEqual(source.query, 'my_query') |
| self.assertFalse(source.use_legacy_sql) |
| |
| def test_specify_query_flattened_records(self): |
| source = beam.io.BigQuerySource(query='my_query', flatten_results=False) |
| self.assertFalse(source.flatten_results) |
| |
| def test_specify_query_unflattened_records(self): |
| source = beam.io.BigQuerySource(query='my_query', flatten_results=True) |
| self.assertTrue(source.flatten_results) |
| |
| def test_specify_query_without_table(self): |
| source = beam.io.BigQuerySource(query='my_query') |
| self.assertEqual(source.query, 'my_query') |
| self.assertIsNone(source.table_reference) |
| |
| def test_date_partitioned_table_name(self): |
| source = beam.io.BigQuerySource('dataset.table$20030102', validate=True) |
| dd = DisplayData.create_from(source) |
| expected_items = [ |
| DisplayDataItemMatcher('validation', True), |
| DisplayDataItemMatcher('table', 'dataset.table$20030102')] |
| hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items)) |
| |
| |
| @unittest.skipIf(HttpError is None, 'GCP dependencies are not installed') |
| class TestBigQuerySink(unittest.TestCase): |
| |
| def test_table_spec_display_data(self): |
| sink = beam.io.BigQuerySink('dataset.table') |
| dd = DisplayData.create_from(sink) |
| expected_items = [ |
| DisplayDataItemMatcher('table', 'dataset.table'), |
| DisplayDataItemMatcher('validation', False)] |
| hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items)) |
| |
| def test_parse_schema_descriptor(self): |
| sink = beam.io.BigQuerySink( |
| 'dataset.table', schema='s:STRING, n:INTEGER') |
| self.assertEqual(sink.table_reference.datasetId, 'dataset') |
| self.assertEqual(sink.table_reference.tableId, 'table') |
| result_schema = { |
| field.name: field.type for field in sink.table_schema.fields} |
| self.assertEqual({'n': 'INTEGER', 's': 'STRING'}, result_schema) |
| |
| def test_project_table_display_data(self): |
    sink = beam.io.BigQuerySink('PROJECT:dataset.table')
    dd = DisplayData.create_from(sink)
| expected_items = [ |
| DisplayDataItemMatcher('table', 'PROJECT:dataset.table'), |
| DisplayDataItemMatcher('validation', False)] |
| hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items)) |
| |
| def test_simple_schema_as_json(self): |
| sink = beam.io.BigQuerySink( |
| 'PROJECT:dataset.table', schema='s:STRING, n:INTEGER') |
| self.assertEqual( |
| json.dumps({'fields': [ |
| {'name': 's', 'type': 'STRING', 'mode': 'NULLABLE'}, |
| {'name': 'n', 'type': 'INTEGER', 'mode': 'NULLABLE'}]}), |
| sink.schema_as_json()) |
| |
| def test_nested_schema_as_json(self): |
| string_field = bigquery.TableFieldSchema( |
| name='s', type='STRING', mode='NULLABLE', description='s description') |
| number_field = bigquery.TableFieldSchema( |
| name='n', type='INTEGER', mode='REQUIRED', description='n description') |
| record_field = bigquery.TableFieldSchema( |
| name='r', type='RECORD', mode='REQUIRED', description='r description', |
| fields=[string_field, number_field]) |
| schema = bigquery.TableSchema(fields=[record_field]) |
| sink = beam.io.BigQuerySink('dataset.table', schema=schema) |
| self.assertEqual( |
| {'fields': [ |
| {'name': 'r', 'type': 'RECORD', 'mode': 'REQUIRED', |
| 'description': 'r description', 'fields': [ |
| {'name': 's', 'type': 'STRING', 'mode': 'NULLABLE', |
| 'description': 's description'}, |
| {'name': 'n', 'type': 'INTEGER', 'mode': 'REQUIRED', |
| 'description': 'n description'}]}]}, |
| json.loads(sink.schema_as_json())) |
| |
| |
| @unittest.skipIf(HttpError is None, 'GCP dependencies are not installed') |
| class TestBigQueryReader(unittest.TestCase): |
| |
| def get_test_rows(self): |
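    """Builds (table_rows, schema, expected_rows) for the reader tests.

    table_rows use the BigQuery API's f/v cell encoding, while expected_rows
    are the plain Python dicts the reader is expected to produce from them.
    """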
| now = time.time() |
| dt = datetime.datetime.utcfromtimestamp(float(now)) |
| ts = dt.strftime('%Y-%m-%d %H:%M:%S.%f UTC') |
| expected_rows = [ |
| { |
| 'i': 1, |
| 's': 'abc', |
| 'f': 2.3, |
| 'b': True, |
| 't': ts, |
| 'dt': '2016-10-31', |
| 'ts': '22:39:12.627498', |
| 'dt_ts': '2008-12-25T07:30:00', |
| 'r': {'s2': 'b'}, |
| 'rpr': [{'s3': 'c', 'rpr2': [{'rs': ['d', 'e'], 's4': None}]}] |
| }, |
| { |
| 'i': 10, |
| 's': 'xyz', |
| 'f': -3.14, |
| 'b': False, |
| 'rpr': [], |
| 't': None, |
| 'dt': None, |
| 'ts': None, |
| 'dt_ts': None, |
| 'r': None, |
| }] |
| |
| nested_schema = [ |
| bigquery.TableFieldSchema( |
| name='s2', type='STRING', mode='NULLABLE')] |
| nested_schema_2 = [ |
| bigquery.TableFieldSchema( |
| name='s3', type='STRING', mode='NULLABLE'), |
| bigquery.TableFieldSchema( |
| name='rpr2', type='RECORD', mode='REPEATED', fields=[ |
| bigquery.TableFieldSchema( |
| name='rs', type='STRING', mode='REPEATED'), |
| bigquery.TableFieldSchema( |
| name='s4', type='STRING', mode='NULLABLE')])] |
| |
| schema = bigquery.TableSchema( |
| fields=[ |
| bigquery.TableFieldSchema( |
| name='b', type='BOOLEAN', mode='REQUIRED'), |
| bigquery.TableFieldSchema( |
| name='f', type='FLOAT', mode='REQUIRED'), |
| bigquery.TableFieldSchema( |
| name='i', type='INTEGER', mode='REQUIRED'), |
| bigquery.TableFieldSchema( |
| name='s', type='STRING', mode='REQUIRED'), |
| bigquery.TableFieldSchema( |
| name='t', type='TIMESTAMP', mode='NULLABLE'), |
| bigquery.TableFieldSchema( |
| name='dt', type='DATE', mode='NULLABLE'), |
| bigquery.TableFieldSchema( |
| name='ts', type='TIME', mode='NULLABLE'), |
| bigquery.TableFieldSchema( |
| name='dt_ts', type='DATETIME', mode='NULLABLE'), |
| bigquery.TableFieldSchema( |
| name='r', type='RECORD', mode='NULLABLE', |
| fields=nested_schema), |
| bigquery.TableFieldSchema( |
| name='rpr', type='RECORD', mode='REPEATED', |
| fields=nested_schema_2)]) |
| |
| table_rows = [ |
| bigquery.TableRow(f=[ |
| bigquery.TableCell(v=to_json_value('true')), |
| bigquery.TableCell(v=to_json_value(str(2.3))), |
| bigquery.TableCell(v=to_json_value(str(1))), |
| bigquery.TableCell(v=to_json_value('abc')), |
            # For timestamps we cannot use str() because it would truncate
            # the number representing the timestamp.
| bigquery.TableCell(v=to_json_value('%f' % now)), |
| bigquery.TableCell(v=to_json_value('2016-10-31')), |
| bigquery.TableCell(v=to_json_value('22:39:12.627498')), |
| bigquery.TableCell(v=to_json_value('2008-12-25T07:30:00')), |
            # For records we cannot use a dict because it doesn't create
            # nested schemas correctly, so we have to use this f/v-based
            # format.
| bigquery.TableCell(v=to_json_value({'f': [{'v': 'b'}]})), |
            bigquery.TableCell(v=to_json_value(
                [{'v': {'f': [{'v': 'c'}, {'v': [{'v': {'f': [
                    {'v': [{'v': 'd'}, {'v': 'e'}]}, {'v': None}]}}]}]}}]))
| ]), |
| bigquery.TableRow(f=[ |
| bigquery.TableCell(v=to_json_value('false')), |
| bigquery.TableCell(v=to_json_value(str(-3.14))), |
| bigquery.TableCell(v=to_json_value(str(10))), |
| bigquery.TableCell(v=to_json_value('xyz')), |
| bigquery.TableCell(v=None), |
| bigquery.TableCell(v=None), |
| bigquery.TableCell(v=None), |
| bigquery.TableCell(v=None), |
| bigquery.TableCell(v=None), |
| # REPEATED field without any values. |
| bigquery.TableCell(v=None)])] |
| return table_rows, schema, expected_rows |
| |
| def test_read_from_table(self): |
| client = mock.Mock() |
| client.jobs.Insert.return_value = bigquery.Job( |
| jobReference=bigquery.JobReference( |
| jobId='somejob')) |
| table_rows, schema, expected_rows = self.get_test_rows() |
| client.jobs.GetQueryResults.return_value = bigquery.GetQueryResultsResponse( |
| jobComplete=True, rows=table_rows, schema=schema) |
| actual_rows = [] |
| with beam.io.BigQuerySource('dataset.table').reader(client) as reader: |
| for row in reader: |
| actual_rows.append(row) |
| self.assertEqual(actual_rows, expected_rows) |
| self.assertEqual(schema, reader.schema) |
| |
| def test_read_from_query(self): |
| client = mock.Mock() |
| client.jobs.Insert.return_value = bigquery.Job( |
| jobReference=bigquery.JobReference( |
| jobId='somejob')) |
| table_rows, schema, expected_rows = self.get_test_rows() |
| client.jobs.GetQueryResults.return_value = bigquery.GetQueryResultsResponse( |
| jobComplete=True, rows=table_rows, schema=schema) |
| actual_rows = [] |
| with beam.io.BigQuerySource(query='query').reader(client) as reader: |
| for row in reader: |
| actual_rows.append(row) |
| self.assertEqual(actual_rows, expected_rows) |
| self.assertEqual(schema, reader.schema) |
| self.assertTrue(reader.use_legacy_sql) |
| self.assertTrue(reader.flatten_results) |
| |
| def test_read_from_query_sql_format(self): |
| client = mock.Mock() |
| client.jobs.Insert.return_value = bigquery.Job( |
| jobReference=bigquery.JobReference( |
| jobId='somejob')) |
| table_rows, schema, expected_rows = self.get_test_rows() |
| client.jobs.GetQueryResults.return_value = bigquery.GetQueryResultsResponse( |
| jobComplete=True, rows=table_rows, schema=schema) |
| actual_rows = [] |
| with beam.io.BigQuerySource( |
| query='query', use_standard_sql=True).reader(client) as reader: |
| for row in reader: |
| actual_rows.append(row) |
| self.assertEqual(actual_rows, expected_rows) |
| self.assertEqual(schema, reader.schema) |
| self.assertFalse(reader.use_legacy_sql) |
| self.assertTrue(reader.flatten_results) |
| |
| def test_read_from_query_unflatten_records(self): |
| client = mock.Mock() |
| client.jobs.Insert.return_value = bigquery.Job( |
| jobReference=bigquery.JobReference( |
| jobId='somejob')) |
| table_rows, schema, expected_rows = self.get_test_rows() |
| client.jobs.GetQueryResults.return_value = bigquery.GetQueryResultsResponse( |
| jobComplete=True, rows=table_rows, schema=schema) |
| actual_rows = [] |
| with beam.io.BigQuerySource( |
| query='query', flatten_results=False).reader(client) as reader: |
| for row in reader: |
| actual_rows.append(row) |
| self.assertEqual(actual_rows, expected_rows) |
| self.assertEqual(schema, reader.schema) |
| self.assertTrue(reader.use_legacy_sql) |
| self.assertFalse(reader.flatten_results) |
| |
| def test_using_both_query_and_table_fails(self): |
| with self.assertRaisesRegexp( |
| ValueError, |
| r'Both a BigQuery table and a query were specified\. Please specify ' |
| r'only one of these'): |
| beam.io.BigQuerySource(table='dataset.table', query='query') |
| |
| def test_using_neither_query_nor_table_fails(self): |
| with self.assertRaisesRegexp( |
| ValueError, r'A BigQuery table or a query must be specified'): |
| beam.io.BigQuerySource() |
| |
| def test_read_from_table_as_tablerows(self): |
| client = mock.Mock() |
| client.jobs.Insert.return_value = bigquery.Job( |
| jobReference=bigquery.JobReference( |
| jobId='somejob')) |
| table_rows, schema, _ = self.get_test_rows() |
| client.jobs.GetQueryResults.return_value = bigquery.GetQueryResultsResponse( |
| jobComplete=True, rows=table_rows, schema=schema) |
| actual_rows = [] |
| # We set the coder to TableRowJsonCoder, which is a signal that |
| # the caller wants to see the rows as TableRows. |
| with beam.io.BigQuerySource( |
| 'dataset.table', coder=TableRowJsonCoder).reader(client) as reader: |
| for row in reader: |
| actual_rows.append(row) |
| self.assertEqual(actual_rows, table_rows) |
| self.assertEqual(schema, reader.schema) |
| |
| @mock.patch('time.sleep', return_value=None) |
| def test_read_from_table_and_job_complete_retry(self, patched_time_sleep): |
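    # time.sleep is patched out so the jobComplete polling loop doesn't
    # actually wait between retries.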
| client = mock.Mock() |
| client.jobs.Insert.return_value = bigquery.Job( |
| jobReference=bigquery.JobReference( |
| jobId='somejob')) |
| table_rows, schema, expected_rows = self.get_test_rows() |
    # Return jobComplete=False on the first call to trigger the code path
    # where the query has to poll until the job completes.
| client.jobs.GetQueryResults.side_effect = [ |
| bigquery.GetQueryResultsResponse( |
| jobComplete=False), |
| bigquery.GetQueryResultsResponse( |
| jobComplete=True, rows=table_rows, schema=schema)] |
| actual_rows = [] |
| with beam.io.BigQuerySource('dataset.table').reader(client) as reader: |
| for row in reader: |
| actual_rows.append(row) |
| self.assertEqual(actual_rows, expected_rows) |
| |
| def test_read_from_table_and_multiple_pages(self): |
| client = mock.Mock() |
| client.jobs.Insert.return_value = bigquery.Job( |
| jobReference=bigquery.JobReference( |
| jobId='somejob')) |
| table_rows, schema, expected_rows = self.get_test_rows() |
| # Return a pageToken on first call to trigger the code path where |
| # query needs to handle multiple pages of results. |
| client.jobs.GetQueryResults.side_effect = [ |
| bigquery.GetQueryResultsResponse( |
| jobComplete=True, rows=table_rows, schema=schema, |
| pageToken='token'), |
| bigquery.GetQueryResultsResponse( |
| jobComplete=True, rows=table_rows, schema=schema)] |
| actual_rows = [] |
| with beam.io.BigQuerySource('dataset.table').reader(client) as reader: |
| for row in reader: |
| actual_rows.append(row) |
| # We return expected rows for each of the two pages of results so we |
| # adjust our expectation below accordingly. |
| self.assertEqual(actual_rows, expected_rows * 2) |
| |
| def test_table_schema_without_project(self): |
| # Reader should pick executing project by default. |
| source = beam.io.BigQuerySource(table='mydataset.mytable') |
| options = PipelineOptions(flags=['--project', 'myproject']) |
| source.pipeline_options = options |
| reader = source.reader() |
    self.assertEqual('SELECT * FROM [myproject:mydataset.mytable];',
                     reader.query)
| |
| |
| @unittest.skipIf(HttpError is None, 'GCP dependencies are not installed') |
| class TestBigQueryWriter(unittest.TestCase): |
| |
| @mock.patch('time.sleep', return_value=None) |
| def test_no_table_and_create_never(self, patched_time_sleep): |
| client = mock.Mock() |
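    # A 404 from tables.Get simulates a table that does not exist.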
| client.tables.Get.side_effect = HttpError( |
| response={'status': '404'}, url='', content='') |
| create_disposition = beam.io.BigQueryDisposition.CREATE_NEVER |
| with self.assertRaisesRegexp( |
| RuntimeError, r'Table project:dataset\.table not found but create ' |
| r'disposition is CREATE_NEVER'): |
| with beam.io.BigQuerySink( |
| 'project:dataset.table', |
| create_disposition=create_disposition).writer(client): |
| pass |
| |
| def test_no_table_and_create_if_needed(self): |
| client = mock.Mock() |
| table = bigquery.Table( |
| tableReference=bigquery.TableReference( |
| projectId='project', datasetId='dataset', tableId='table'), |
| schema=bigquery.TableSchema()) |
| client.tables.Get.side_effect = HttpError( |
| response={'status': '404'}, url='', content='') |
| client.tables.Insert.return_value = table |
| create_disposition = beam.io.BigQueryDisposition.CREATE_IF_NEEDED |
| with beam.io.BigQuerySink( |
| 'project:dataset.table', |
| schema='somefield:INTEGER', |
| create_disposition=create_disposition).writer(client): |
| pass |
| self.assertTrue(client.tables.Get.called) |
| self.assertTrue(client.tables.Insert.called) |
| |
| @mock.patch('time.sleep', return_value=None) |
| def test_no_table_and_create_if_needed_and_no_schema( |
| self, patched_time_sleep): |
| client = mock.Mock() |
| client.tables.Get.side_effect = HttpError( |
| response={'status': '404'}, url='', content='') |
| create_disposition = beam.io.BigQueryDisposition.CREATE_IF_NEEDED |
| with self.assertRaisesRegexp( |
| RuntimeError, r'Table project:dataset\.table requires a schema\. None ' |
| r'can be inferred because the table does not exist'): |
| with beam.io.BigQuerySink( |
| 'project:dataset.table', |
| create_disposition=create_disposition).writer(client): |
| pass |
| |
| @mock.patch('time.sleep', return_value=None) |
| def test_table_not_empty_and_write_disposition_empty( |
| self, patched_time_sleep): |
| client = mock.Mock() |
| client.tables.Get.return_value = bigquery.Table( |
| tableReference=bigquery.TableReference( |
| projectId='project', datasetId='dataset', tableId='table'), |
| schema=bigquery.TableSchema()) |
| client.tabledata.List.return_value = bigquery.TableDataList(totalRows=1) |
| write_disposition = beam.io.BigQueryDisposition.WRITE_EMPTY |
| with self.assertRaisesRegexp( |
| RuntimeError, r'Table project:dataset\.table is not empty but write ' |
| r'disposition is WRITE_EMPTY'): |
| with beam.io.BigQuerySink( |
| 'project:dataset.table', |
| write_disposition=write_disposition).writer(client): |
| pass |
| |
| def test_table_empty_and_write_disposition_empty(self): |
| client = mock.Mock() |
| table = bigquery.Table( |
| tableReference=bigquery.TableReference( |
| projectId='project', datasetId='dataset', tableId='table'), |
| schema=bigquery.TableSchema()) |
| client.tables.Get.return_value = table |
| client.tabledata.List.return_value = bigquery.TableDataList(totalRows=0) |
| client.tables.Insert.return_value = table |
| write_disposition = beam.io.BigQueryDisposition.WRITE_EMPTY |
| with beam.io.BigQuerySink( |
| 'project:dataset.table', |
| write_disposition=write_disposition).writer(client): |
| pass |
| self.assertTrue(client.tables.Get.called) |
| self.assertTrue(client.tabledata.List.called) |
| self.assertFalse(client.tables.Delete.called) |
| self.assertFalse(client.tables.Insert.called) |
| |
| @mock.patch('time.sleep', return_value=None) |
| def test_table_with_write_disposition_truncate(self, _patched_sleep): |
| client = mock.Mock() |
| table = bigquery.Table( |
| tableReference=bigquery.TableReference( |
| projectId='project', datasetId='dataset', tableId='table'), |
| schema=bigquery.TableSchema()) |
| client.tables.Get.return_value = table |
| client.tables.Insert.return_value = table |
| write_disposition = beam.io.BigQueryDisposition.WRITE_TRUNCATE |
| with beam.io.BigQuerySink( |
| 'project:dataset.table', |
| write_disposition=write_disposition).writer(client): |
| pass |
| self.assertTrue(client.tables.Get.called) |
| self.assertTrue(client.tables.Delete.called) |
| self.assertTrue(client.tables.Insert.called) |
| |
| def test_table_with_write_disposition_append(self): |
| client = mock.Mock() |
| table = bigquery.Table( |
| tableReference=bigquery.TableReference( |
| projectId='project', datasetId='dataset', tableId='table'), |
| schema=bigquery.TableSchema()) |
| client.tables.Get.return_value = table |
| client.tables.Insert.return_value = table |
| write_disposition = beam.io.BigQueryDisposition.WRITE_APPEND |
| with beam.io.BigQuerySink( |
| 'project:dataset.table', |
| write_disposition=write_disposition).writer(client): |
| pass |
| self.assertTrue(client.tables.Get.called) |
| self.assertFalse(client.tables.Delete.called) |
| self.assertFalse(client.tables.Insert.called) |
| |
| def test_rows_are_written(self): |
| client = mock.Mock() |
| table = bigquery.Table( |
| tableReference=bigquery.TableReference( |
| projectId='project', datasetId='dataset', tableId='table'), |
| schema=bigquery.TableSchema()) |
| client.tables.Get.return_value = table |
| write_disposition = beam.io.BigQueryDisposition.WRITE_APPEND |
| |
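    # An empty insertErrors list means BigQuery accepted every row.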
| insert_response = mock.Mock() |
| insert_response.insertErrors = [] |
| client.tabledata.InsertAll.return_value = insert_response |
| |
| with beam.io.BigQuerySink( |
| 'project:dataset.table', |
| write_disposition=write_disposition).writer(client) as writer: |
| writer.Write({'i': 1, 'b': True, 's': 'abc', 'f': 3.14}) |
| |
| sample_row = {'i': 1, 'b': True, 's': 'abc', 'f': 3.14} |
| expected_rows = [] |
| json_object = bigquery.JsonObject() |
| for k, v in iteritems(sample_row): |
| json_object.additionalProperties.append( |
| bigquery.JsonObject.AdditionalProperty( |
| key=k, value=to_json_value(v))) |
| expected_rows.append( |
| bigquery.TableDataInsertAllRequest.RowsValueListEntry( |
| insertId='_1', # First row ID generated with prefix '' |
| json=json_object)) |
| client.tabledata.InsertAll.assert_called_with( |
| bigquery.BigqueryTabledataInsertAllRequest( |
| projectId='project', datasetId='dataset', tableId='table', |
| tableDataInsertAllRequest=bigquery.TableDataInsertAllRequest( |
| rows=expected_rows))) |
| |
| def test_table_schema_without_project(self): |
| # Writer should pick executing project by default. |
| sink = beam.io.BigQuerySink(table='mydataset.mytable') |
| options = PipelineOptions(flags=['--project', 'myproject']) |
| sink.pipeline_options = options |
| writer = sink.writer() |
    self.assertEqual('myproject', writer.project_id)
| |
| |
| @unittest.skipIf(HttpError is None, 'GCP dependencies are not installed') |
| class TestBigQueryWrapper(unittest.TestCase): |
| |
| def test_delete_non_existing_dataset(self): |
| client = mock.Mock() |
| client.datasets.Delete.side_effect = HttpError( |
| response={'status': '404'}, url='', content='') |
| wrapper = beam.io.gcp.bigquery.BigQueryWrapper(client) |
| wrapper._delete_dataset('', '') |
| self.assertTrue(client.datasets.Delete.called) |
| |
| @mock.patch('time.sleep', return_value=None) |
| def test_delete_dataset_retries_fail(self, patched_time_sleep): |
| client = mock.Mock() |
| client.datasets.Delete.side_effect = ValueError("Cannot delete") |
| wrapper = beam.io.gcp.bigquery.BigQueryWrapper(client) |
| with self.assertRaises(ValueError): |
| wrapper._delete_dataset('', '') |
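    # One initial attempt plus MAX_RETRIES retries.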
| self.assertEqual( |
| beam.io.gcp.bigquery.MAX_RETRIES + 1, |
| client.datasets.Delete.call_count) |
| self.assertTrue(client.datasets.Delete.called) |
| |
| def test_delete_non_existing_table(self): |
| client = mock.Mock() |
| client.tables.Delete.side_effect = HttpError( |
| response={'status': '404'}, url='', content='') |
| wrapper = beam.io.gcp.bigquery.BigQueryWrapper(client) |
| wrapper._delete_table('', '', '') |
| self.assertTrue(client.tables.Delete.called) |
| |
| @mock.patch('time.sleep', return_value=None) |
| def test_delete_table_retries_fail(self, patched_time_sleep): |
| client = mock.Mock() |
| client.tables.Delete.side_effect = ValueError("Cannot delete") |
| wrapper = beam.io.gcp.bigquery.BigQueryWrapper(client) |
| with self.assertRaises(ValueError): |
| wrapper._delete_table('', '', '') |
| self.assertTrue(client.tables.Delete.called) |
| |
| @mock.patch('time.sleep', return_value=None) |
| def test_delete_dataset_retries_for_timeouts(self, patched_time_sleep): |
| client = mock.Mock() |
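    # The first Delete call times out (HTTP 408); the retry then succeeds.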
| client.datasets.Delete.side_effect = [ |
| HttpError( |
| response={'status': '408'}, url='', content=''), |
| bigquery.BigqueryDatasetsDeleteResponse() |
| ] |
| wrapper = beam.io.gcp.bigquery.BigQueryWrapper(client) |
| wrapper._delete_dataset('', '') |
| self.assertTrue(client.datasets.Delete.called) |
| |
| @mock.patch('time.sleep', return_value=None) |
| def test_delete_table_retries_for_timeouts(self, patched_time_sleep): |
| client = mock.Mock() |
| client.tables.Delete.side_effect = [ |
| HttpError( |
| response={'status': '408'}, url='', content=''), |
| bigquery.BigqueryTablesDeleteResponse() |
| ] |
| wrapper = beam.io.gcp.bigquery.BigQueryWrapper(client) |
| wrapper._delete_table('', '', '') |
| self.assertTrue(client.tables.Delete.called) |
| |
| @mock.patch('time.sleep', return_value=None) |
| def test_temporary_dataset_is_unique(self, patched_time_sleep): |
| client = mock.Mock() |
| client.datasets.Get.return_value = bigquery.Dataset( |
| datasetReference=bigquery.DatasetReference( |
| projectId='project_id', datasetId='dataset_id')) |
| wrapper = beam.io.gcp.bigquery.BigQueryWrapper(client) |
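    # datasets.Get finds an existing dataset, so creating the temporary
    # dataset must fail instead of silently reusing it.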
| with self.assertRaises(RuntimeError): |
| wrapper.create_temporary_dataset('project_id') |
| self.assertTrue(client.datasets.Get.called) |
| |
| def test_get_or_create_dataset_created(self): |
| client = mock.Mock() |
| client.datasets.Get.side_effect = HttpError( |
| response={'status': '404'}, url='', content='') |
| client.datasets.Insert.return_value = bigquery.Dataset( |
| datasetReference=bigquery.DatasetReference( |
| projectId='project_id', datasetId='dataset_id')) |
| wrapper = beam.io.gcp.bigquery.BigQueryWrapper(client) |
| new_dataset = wrapper.get_or_create_dataset('project_id', 'dataset_id') |
| self.assertEqual(new_dataset.datasetReference.datasetId, 'dataset_id') |
| |
| def test_get_or_create_dataset_fetched(self): |
| client = mock.Mock() |
| client.datasets.Get.return_value = bigquery.Dataset( |
| datasetReference=bigquery.DatasetReference( |
| projectId='project_id', datasetId='dataset_id')) |
| wrapper = beam.io.gcp.bigquery.BigQueryWrapper(client) |
| new_dataset = wrapper.get_or_create_dataset('project_id', 'dataset_id') |
| self.assertEqual(new_dataset.datasetReference.datasetId, 'dataset_id') |
| |
| |
| @unittest.skipIf(HttpError is None, 'GCP dependencies are not installed') |
| class WriteToBigQuery(unittest.TestCase): |
| |
| def test_dofn_client_start_bundle_called(self): |
| client = mock.Mock() |
| client.tables.Get.return_value = bigquery.Table( |
| tableReference=bigquery.TableReference( |
| projectId='project_id', datasetId='dataset_id', tableId='table_id')) |
| create_disposition = beam.io.BigQueryDisposition.CREATE_NEVER |
| write_disposition = beam.io.BigQueryDisposition.WRITE_APPEND |
| schema = {'fields': [ |
| {'name': 'month', 'type': 'INTEGER', 'mode': 'NULLABLE'}]} |
| |
| fn = beam.io.gcp.bigquery.BigQueryWriteFn( |
| table_id='table_id', |
| dataset_id='dataset_id', |
| project_id='project_id', |
| batch_size=2, |
| schema=schema, |
| create_disposition=create_disposition, |
| write_disposition=write_disposition, |
| client=client) |
| |
| fn.start_bundle() |
| self.assertTrue(client.tables.Get.called) |
| |
| def test_dofn_client_start_bundle_create_called(self): |
| client = mock.Mock() |
| client.tables.Get.return_value = None |
| client.tables.Insert.return_value = bigquery.Table( |
| tableReference=bigquery.TableReference( |
| projectId='project_id', datasetId='dataset_id', tableId='table_id')) |
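    # tables.Get is mocked to return None rather than raising a 404, so the
    # lookup reports the table as missing and start_bundle goes on to create
    # it via tables.Insert.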
| create_disposition = beam.io.BigQueryDisposition.CREATE_NEVER |
| write_disposition = beam.io.BigQueryDisposition.WRITE_APPEND |
| schema = {'fields': [ |
| {'name': 'month', 'type': 'INTEGER', 'mode': 'NULLABLE'}]} |
| |
| fn = beam.io.gcp.bigquery.BigQueryWriteFn( |
| table_id='table_id', |
| dataset_id='dataset_id', |
| project_id='project_id', |
| batch_size=2, |
| schema=schema, |
| create_disposition=create_disposition, |
| write_disposition=write_disposition, |
| client=client) |
| |
| fn.start_bundle() |
| self.assertTrue(client.tables.Get.called) |
| self.assertTrue(client.tables.Insert.called) |
| |
| def test_dofn_client_process_performs_batching(self): |
| client = mock.Mock() |
| client.tables.Get.return_value = bigquery.Table( |
| tableReference=bigquery.TableReference( |
| projectId='project_id', datasetId='dataset_id', tableId='table_id')) |
    client.tabledata.InsertAll.return_value = (
        bigquery.TableDataInsertAllResponse(insertErrors=[]))
| create_disposition = beam.io.BigQueryDisposition.CREATE_NEVER |
| write_disposition = beam.io.BigQueryDisposition.WRITE_APPEND |
| schema = {'fields': [ |
| {'name': 'month', 'type': 'INTEGER', 'mode': 'NULLABLE'}]} |
| |
| fn = beam.io.gcp.bigquery.BigQueryWriteFn( |
| table_id='table_id', |
| dataset_id='dataset_id', |
| project_id='project_id', |
| batch_size=2, |
| schema=schema, |
| create_disposition=create_disposition, |
| write_disposition=write_disposition, |
| client=client) |
| |
| fn.start_bundle() |
| fn.process({'month': 1}) |
| |
| self.assertTrue(client.tables.Get.called) |
    # InsertAll not called as batch size is not hit yet
| self.assertFalse(client.tabledata.InsertAll.called) |
| |
| def test_dofn_client_process_flush_called(self): |
| client = mock.Mock() |
| client.tables.Get.return_value = bigquery.Table( |
| tableReference=bigquery.TableReference( |
| projectId='project_id', datasetId='dataset_id', tableId='table_id')) |
| client.tabledata.InsertAll.return_value = ( |
| bigquery.TableDataInsertAllResponse(insertErrors=[])) |
| create_disposition = beam.io.BigQueryDisposition.CREATE_NEVER |
| write_disposition = beam.io.BigQueryDisposition.WRITE_APPEND |
| schema = {'fields': [ |
| {'name': 'month', 'type': 'INTEGER', 'mode': 'NULLABLE'}]} |
| |
| fn = beam.io.gcp.bigquery.BigQueryWriteFn( |
| table_id='table_id', |
| dataset_id='dataset_id', |
| project_id='project_id', |
| batch_size=2, |
| schema=schema, |
| create_disposition=create_disposition, |
| write_disposition=write_disposition, |
| client=client) |
| |
| fn.start_bundle() |
| fn.process({'month': 1}) |
| fn.process({'month': 2}) |
| self.assertTrue(client.tables.Get.called) |
    # InsertAll called as batch size is hit
| self.assertTrue(client.tabledata.InsertAll.called) |
| |
| def test_dofn_client_finish_bundle_flush_called(self): |
| client = mock.Mock() |
| client.tables.Get.return_value = bigquery.Table( |
| tableReference=bigquery.TableReference( |
| projectId='project_id', datasetId='dataset_id', tableId='table_id')) |
    client.tabledata.InsertAll.return_value = (
        bigquery.TableDataInsertAllResponse(insertErrors=[]))
| create_disposition = beam.io.BigQueryDisposition.CREATE_NEVER |
| write_disposition = beam.io.BigQueryDisposition.WRITE_APPEND |
| schema = {'fields': [ |
| {'name': 'month', 'type': 'INTEGER', 'mode': 'NULLABLE'}]} |
| |
| fn = beam.io.gcp.bigquery.BigQueryWriteFn( |
| table_id='table_id', |
| dataset_id='dataset_id', |
| project_id='project_id', |
| batch_size=2, |
| schema=schema, |
| create_disposition=create_disposition, |
| write_disposition=write_disposition, |
| client=client) |
| |
| fn.start_bundle() |
| fn.process({'month': 1}) |
| |
| self.assertTrue(client.tables.Get.called) |
    # InsertAll not called as batch size is not hit
| self.assertFalse(client.tabledata.InsertAll.called) |
| |
| fn.finish_bundle() |
    # InsertAll called in finish bundle
| self.assertTrue(client.tabledata.InsertAll.called) |
| |
| def test_dofn_client_no_records(self): |
| client = mock.Mock() |
| client.tables.Get.return_value = bigquery.Table( |
| tableReference=bigquery.TableReference( |
| projectId='project_id', datasetId='dataset_id', tableId='table_id')) |
    client.tabledata.InsertAll.return_value = (
        bigquery.TableDataInsertAllResponse(insertErrors=[]))
| create_disposition = beam.io.BigQueryDisposition.CREATE_NEVER |
| write_disposition = beam.io.BigQueryDisposition.WRITE_APPEND |
| schema = {'fields': [ |
| {'name': 'month', 'type': 'INTEGER', 'mode': 'NULLABLE'}]} |
| |
| fn = beam.io.gcp.bigquery.BigQueryWriteFn( |
| table_id='table_id', |
| dataset_id='dataset_id', |
| project_id='project_id', |
| batch_size=2, |
| schema=schema, |
| create_disposition=create_disposition, |
| write_disposition=write_disposition, |
| client=client) |
| |
| fn.start_bundle() |
| self.assertTrue(client.tables.Get.called) |
    # InsertAll not called as batch size is not hit
| self.assertFalse(client.tabledata.InsertAll.called) |
| |
| fn.finish_bundle() |
    # InsertAll not called in finish bundle as no records
| self.assertFalse(client.tabledata.InsertAll.called) |
| |
| def test_noop_schema_parsing(self): |
| expected_table_schema = None |
| table_schema = beam.io.gcp.bigquery.BigQueryWriteFn.get_table_schema( |
| schema=None) |
| self.assertEqual(expected_table_schema, table_schema) |
| |
| def test_dict_schema_parsing(self): |
| schema = {'fields': [ |
| {'name': 's', 'type': 'STRING', 'mode': 'NULLABLE'}, |
| {'name': 'n', 'type': 'INTEGER', 'mode': 'NULLABLE'}, |
| {'name': 'r', 'type': 'RECORD', 'mode': 'NULLABLE', 'fields': [ |
| {'name': 'x', 'type': 'INTEGER', 'mode': 'NULLABLE'}]}]} |
| table_schema = beam.io.gcp.bigquery.BigQueryWriteFn.get_table_schema(schema) |
| string_field = bigquery.TableFieldSchema( |
| name='s', type='STRING', mode='NULLABLE') |
| nested_field = bigquery.TableFieldSchema( |
| name='x', type='INTEGER', mode='NULLABLE') |
| number_field = bigquery.TableFieldSchema( |
| name='n', type='INTEGER', mode='NULLABLE') |
| record_field = bigquery.TableFieldSchema( |
| name='r', type='RECORD', mode='NULLABLE', fields=[nested_field]) |
| expected_table_schema = bigquery.TableSchema( |
| fields=[string_field, number_field, record_field]) |
| self.assertEqual(expected_table_schema, table_schema) |
| |
| def test_string_schema_parsing(self): |
| schema = 's:STRING, n:INTEGER' |
| expected_dict_schema = {'fields': [ |
| {'name': 's', 'type': 'STRING', 'mode': 'NULLABLE'}, |
| {'name': 'n', 'type': 'INTEGER', 'mode': 'NULLABLE'}]} |
| dict_schema = ( |
| beam.io.gcp.bigquery.WriteToBigQuery.get_dict_table_schema(schema)) |
| self.assertEqual(expected_dict_schema, dict_schema) |
| |
| def test_table_schema_parsing(self): |
| string_field = bigquery.TableFieldSchema( |
| name='s', type='STRING', mode='NULLABLE') |
| nested_field = bigquery.TableFieldSchema( |
| name='x', type='INTEGER', mode='NULLABLE') |
| number_field = bigquery.TableFieldSchema( |
| name='n', type='INTEGER', mode='NULLABLE') |
| record_field = bigquery.TableFieldSchema( |
| name='r', type='RECORD', mode='NULLABLE', fields=[nested_field]) |
| schema = bigquery.TableSchema( |
| fields=[string_field, number_field, record_field]) |
| expected_dict_schema = {'fields': [ |
| {'name': 's', 'type': 'STRING', 'mode': 'NULLABLE'}, |
| {'name': 'n', 'type': 'INTEGER', 'mode': 'NULLABLE'}, |
| {'name': 'r', 'type': 'RECORD', 'mode': 'NULLABLE', 'fields': [ |
| {'name': 'x', 'type': 'INTEGER', 'mode': 'NULLABLE'}]}]} |
| dict_schema = ( |
| beam.io.gcp.bigquery.WriteToBigQuery.get_dict_table_schema(schema)) |
| self.assertEqual(expected_dict_schema, dict_schema) |
| |
| def test_table_schema_parsing_end_to_end(self): |
| string_field = bigquery.TableFieldSchema( |
| name='s', type='STRING', mode='NULLABLE') |
| nested_field = bigquery.TableFieldSchema( |
| name='x', type='INTEGER', mode='NULLABLE') |
| number_field = bigquery.TableFieldSchema( |
| name='n', type='INTEGER', mode='NULLABLE') |
| record_field = bigquery.TableFieldSchema( |
| name='r', type='RECORD', mode='NULLABLE', fields=[nested_field]) |
| schema = bigquery.TableSchema( |
| fields=[string_field, number_field, record_field]) |
| table_schema = beam.io.gcp.bigquery.BigQueryWriteFn.get_table_schema( |
| beam.io.gcp.bigquery.WriteToBigQuery.get_dict_table_schema(schema)) |
| self.assertEqual(table_schema, schema) |
| |
| def test_none_schema_parsing(self): |
| schema = None |
| expected_dict_schema = None |
| dict_schema = ( |
| beam.io.gcp.bigquery.WriteToBigQuery.get_dict_table_schema(schema)) |
| self.assertEqual(expected_dict_schema, dict_schema) |
| |
| def test_noop_dict_schema_parsing(self): |
| schema = {'fields': [ |
| {'name': 's', 'type': 'STRING', 'mode': 'NULLABLE'}, |
| {'name': 'n', 'type': 'INTEGER', 'mode': 'NULLABLE'}]} |
| expected_dict_schema = schema |
| dict_schema = ( |
| beam.io.gcp.bigquery.WriteToBigQuery.get_dict_table_schema(schema)) |
| self.assertEqual(expected_dict_schema, dict_schema) |
| |
| |
| if __name__ == '__main__': |
| logging.getLogger().setLevel(logging.INFO) |
| unittest.main() |