| #!/usr/bin/env python |
| # -*- coding: utf-8 -*- |
| # |
| # Licensed to the Apache Software Foundation (ASF) under one or more |
| # contributor license agreements. See the NOTICE file distributed with |
| # this work for additional information regarding copyright ownership. |
| # The ASF licenses this file to You under the Apache License, Version 2.0 |
| # (the "License"); you may not use this file except in compliance with |
| # the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| # |
| |
| """Unit tests for BigQuery sources and sinks.""" |
| # pytype: skip-file |
| |
| import base64 |
| import datetime |
| import logging |
| import random |
| import time |
| import unittest |
| import uuid |
| from decimal import Decimal |
| from functools import wraps |
| |
| import pytest |
| |
| import apache_beam as beam |
| import apache_beam.io.gcp.bigquery |
| from apache_beam.io.gcp import bigquery_schema_tools |
| from apache_beam.io.gcp import bigquery_tools |
| from apache_beam.io.gcp.bigquery_tools import BigQueryWrapper |
| from apache_beam.io.gcp.internal.clients import bigquery |
| from apache_beam.options.value_provider import StaticValueProvider |
| from apache_beam.runners.interactive import interactive_beam |
| from apache_beam.runners.interactive.interactive_runner import InteractiveRunner |
| from apache_beam.testing.test_pipeline import TestPipeline |
| from apache_beam.testing.util import assert_that |
| from apache_beam.testing.util import equal_to |
| |
| # Protect against environments where bigquery library is not available. |
| # pylint: disable=wrong-import-order, wrong-import-position |
| try: |
| from apitools.base.py.exceptions import HttpError |
| except ImportError: |
| HttpError = None |
| # pylint: enable=wrong-import-order, wrong-import-position |
| |
| _LOGGER = logging.getLogger(__name__) |
| |
| |
def skip(runners):
  """Return a decorator that skips a test method on the given runner(s).

  Args:
    runners: a runner class name, or a list of runner class names, on which
      the decorated test should be skipped.
  """
  runner_names = runners if isinstance(runners, list) else [runners]

  def decorator(test_fn):
    @wraps(test_fn)
    def guard(self):
      # Run the test only when the active runner is not on the skip list.
      if self.runner_name not in runner_names:
        return test_fn(self)
      self.skipTest(
          'This test doesn\'t work on these runners: {}'.format(runner_names))

    return guard

  return decorator
| |
| |
def datetime_to_utc(element):
  """Normalize date/time values in a result row to strings, in place.

  time/date values become ``str(value)``; this also covers naive datetimes,
  since ``datetime.datetime`` is a subclass of ``datetime.date``.
  Timezone-aware datetimes are additionally shifted to UTC and rendered as
  ``'YYYY-MM-DD HH:MM:SS.ffffff UTC'``.

  Returns the mutated element so it can be used with ``beam.Map``.
  """
  for key, value in element.items():
    if isinstance(value, (datetime.time, datetime.date)):
      element[key] = str(value)
    if isinstance(value, datetime.datetime) and value.tzinfo:
      # Shift aware datetimes to UTC and attach an explicit suffix so they
      # compare equal to the strings BigQuery returns.
      element[key] = (value - value.utcoffset()).strftime(
          '%Y-%m-%d %H:%M:%S.%f UTC')
  return element
| |
| |
class BigQueryReadIntegrationTests(unittest.TestCase):
  """Base class for BigQuery read tests: owns a per-run scratch dataset."""

  BIG_QUERY_DATASET_ID = 'python_read_table_'

  @classmethod
  def setUpClass(cls):
    cls.test_pipeline = TestPipeline(is_integration_test=True)
    cls.args = cls.test_pipeline.get_full_options_as_args()
    cls.runner_name = type(cls.test_pipeline.runner).__name__
    cls.project = cls.test_pipeline.get_option('project')

    cls.bigquery_client = BigQueryWrapper()
    # Timestamp plus random suffix keeps concurrent test runs from colliding.
    cls.dataset_id = '{}{}{}'.format(
        cls.BIG_QUERY_DATASET_ID, int(time.time()), random.randint(0, 10000))
    cls.bigquery_client.get_or_create_dataset(cls.project, cls.dataset_id)
    _LOGGER.info(
        "Created dataset %s in project %s", cls.dataset_id, cls.project)

  @classmethod
  def tearDownClass(cls):
    request = bigquery.BigqueryDatasetsDeleteRequest(
        projectId=cls.project, datasetId=cls.dataset_id, deleteContents=True)
    try:
      _LOGGER.info(
          "Deleting dataset %s in project %s", cls.dataset_id, cls.project)
      cls.bigquery_client.client.datasets.Delete(request)
    except HttpError:
      # Best effort: log and move on rather than failing the suite during
      # cleanup; stale datasets are reaped out of band.
      _LOGGER.debug(
          'Failed to clean up dataset %s in project %s',
          cls.dataset_id,
          cls.project)
| |
| |
class ReadTests(BigQueryReadIntegrationTests):
  """Reads a small (INTEGER, STRING) table via the native and iobase sources.

  The schema-retrieval tests instead read a pre-existing shared table in the
  apache-beam-testing project and compare against a user type generated from
  the table's schema.
  """

  # Rows inserted into the test table; also the expected read results.
  TABLE_DATA = [{
      'number': 1, 'str': 'abc'
  }, {
      'number': 2, 'str': 'def'
  }, {
      'number': 3, 'str': u'你好'
  }, {
      'number': 4, 'str': u'привет'
  }]

  @classmethod
  def setUpClass(cls):
    super(ReadTests, cls).setUpClass()
    cls.table_name = 'python_read_table'
    cls.create_table(cls.table_name)

    # Standard-SQL table reference (backticked below) scoped to the scratch
    # dataset created by the base class.
    table_id = '{}.{}'.format(cls.dataset_id, cls.table_name)
    cls.query = 'SELECT number, str FROM `%s`' % table_id

  @classmethod
  def create_table(cls, table_name):
    """Creates dataset.table_name with columns (number INTEGER, str STRING)
    and inserts TABLE_DATA."""
    table_schema = bigquery.TableSchema()
    table_field = bigquery.TableFieldSchema()
    table_field.name = 'number'
    table_field.type = 'INTEGER'
    table_schema.fields.append(table_field)
    table_field = bigquery.TableFieldSchema()
    table_field.name = 'str'
    table_field.type = 'STRING'
    table_schema.fields.append(table_field)
    table = bigquery.Table(
        tableReference=bigquery.TableReference(
            projectId=cls.project, datasetId=cls.dataset_id,
            tableId=table_name),
        schema=table_schema)
    request = bigquery.BigqueryTablesInsertRequest(
        projectId=cls.project, datasetId=cls.dataset_id, table=table)
    cls.bigquery_client.client.tables.Insert(request)
    cls.bigquery_client.insert_rows(
        cls.project, cls.dataset_id, table_name, cls.TABLE_DATA)

  @skip(['PortableRunner', 'FlinkRunner'])
  @pytest.mark.it_postcommit
  def test_native_source(self):
    """Reads the table via the legacy native BigQuerySource."""
    with beam.Pipeline(argv=self.args) as p:
      result = (
          p | 'read' >> beam.io.Read(
              beam.io.BigQuerySource(query=self.query, use_standard_sql=True)))
      assert_that(result, equal_to(self.TABLE_DATA))

  @pytest.mark.it_postcommit
  def test_iobase_source(self):
    """Reads via ReadFromBigQuery with the query wrapped in a ValueProvider."""
    query = StaticValueProvider(str, self.query)
    with beam.Pipeline(argv=self.args) as p:
      result = (
          p | 'read with value provider query' >> beam.io.ReadFromBigQuery(
              query=query, use_standard_sql=True, project=self.project))
      assert_that(result, equal_to(self.TABLE_DATA))

  @pytest.mark.it_postcommit
  def test_table_schema_retrieve(self):
    """Reads a shared table as BEAM_ROWs typed by its retrieved schema."""
    the_table = bigquery_tools.BigQueryWrapper().get_table(
        project_id="apache-beam-testing",
        dataset_id="beam_bigquery_io_test",
        table_id="dfsqltable_3c7d6fd5_16e0460dfd0")
    table = the_table.schema
    # Build the expected Beam Row type directly from the BQ schema.
    utype = bigquery_schema_tools.\
        generate_user_type_from_bq_schema(table)
    with beam.Pipeline(argv=self.args) as p:
      result = (
          p | apache_beam.io.gcp.bigquery.ReadFromBigQuery(
              # gcs_location stages the EXPORT-based read — assumes the
              # bucket exists in the test project (TODO confirm).
              gcs_location="gs://bqio_schema_test",
              dataset="beam_bigquery_io_test",
              table="dfsqltable_3c7d6fd5_16e0460dfd0",
              project="apache-beam-testing",
              output_type='BEAM_ROW'))
      assert_that(
          result,
          equal_to([
              utype(id=3, name='customer1', type='test'),
              utype(id=1, name='customer1', type='test'),
              utype(id=2, name='customer2', type='test'),
              utype(id=4, name='customer2', type='test')
          ]))

  @pytest.mark.it_postcommit
  def test_table_schema_retrieve_specifying_only_table(self):
    """Same as above but passes a fully-qualified table string instead of
    separate project/dataset/table arguments."""
    the_table = bigquery_tools.BigQueryWrapper().get_table(
        project_id="apache-beam-testing",
        dataset_id="beam_bigquery_io_test",
        table_id="dfsqltable_3c7d6fd5_16e0460dfd0")
    table = the_table.schema
    utype = bigquery_schema_tools.\
        generate_user_type_from_bq_schema(table)
    with beam.Pipeline(argv=self.args) as p:
      result = (
          p | apache_beam.io.gcp.bigquery.ReadFromBigQuery(
              gcs_location="gs://bqio_schema_test",
              table="apache-beam-testing:"
              "beam_bigquery_io_test."
              "dfsqltable_3c7d6fd5_16e0460dfd0",
              output_type='BEAM_ROW'))
      assert_that(
          result,
          equal_to([
              utype(id=3, name='customer1', type='test'),
              utype(id=1, name='customer1', type='test'),
              utype(id=2, name='customer2', type='test'),
              utype(id=4, name='customer2', type='test')
          ]))

  @pytest.mark.it_postcommit
  def test_table_schema_retrieve_with_direct_read(self):
    """Same schema-derived BEAM_ROW read, but via the Storage Read API
    (DIRECT_READ) instead of an export job."""
    the_table = bigquery_tools.BigQueryWrapper().get_table(
        project_id="apache-beam-testing",
        dataset_id="beam_bigquery_io_test",
        table_id="dfsqltable_3c7d6fd5_16e0460dfd0")
    table = the_table.schema
    utype = bigquery_schema_tools.\
        generate_user_type_from_bq_schema(table)
    with beam.Pipeline(argv=self.args) as p:
      result = (
          p | apache_beam.io.gcp.bigquery.ReadFromBigQuery(
              method=beam.io.ReadFromBigQuery.Method.DIRECT_READ,
              table="apache-beam-testing:"
              "beam_bigquery_io_test."
              "dfsqltable_3c7d6fd5_16e0460dfd0",
              output_type='BEAM_ROW'))
      assert_that(
          result,
          equal_to([
              utype(id=3, name='customer1', type='test'),
              utype(id=1, name='customer1', type='test'),
              utype(id=2, name='customer2', type='test'),
              utype(id=4, name='customer2', type='test')
          ]))
| |
| |
class ReadUsingStorageApiTests(BigQueryReadIntegrationTests):
  """Tests for ReadFromBigQuery with Method.DIRECT_READ (Storage Read API).

  setUpClass materializes the freshly created table into a temporary table
  via a query job, because the Storage API streams from materialized tables.
  """

  # Rows inserted into the test table. 'rec' exercises (doubly) nested
  # RECORD fields; the first row leaves it NULL.
  TABLE_DATA = [{
      'number': 1,
      'string': u'你好',
      'time': '12:44:31',
      'datetime': '2018-12-31 12:44:31',
      'rec': None
  },
                {
                    'number': 4,
                    'string': u'привет',
                    'time': '12:44:31',
                    'datetime': '2018-12-31 12:44:31',
                    'rec': {
                        'rec_datetime': '2018-12-31 12:44:31',
                        'rec_rec': {
                            'rec_rec_datetime': '2018-12-31 12:44:31'
                        }
                    },
                }]

  @classmethod
  def setUpClass(cls):
    super(ReadUsingStorageApiTests, cls).setUpClass()
    cls.table_name = 'python_read_table'
    cls._create_table(cls.table_name)

    table_id = '{}.{}'.format(cls.dataset_id, cls.table_name)
    cls.query = 'SELECT * FROM `%s`' % table_id

    # Materializing the newly created Table to ensure the Read API can stream.
    cls.temp_table_reference = cls._execute_query(cls.project, cls.query)

  @classmethod
  def tearDownClass(cls):
    # Drop the temporary dataset holding the materialized table first, then
    # let the base class delete the main scratch dataset.
    cls.bigquery_client.clean_up_temporary_dataset(cls.project)
    super(ReadUsingStorageApiTests, cls).tearDownClass()

  @classmethod
  def _create_table(cls, table_name):
    """Creates the test table with INTEGER/STRING/TIME/DATETIME columns plus
    a nested RECORD column, then inserts TABLE_DATA."""
    table_schema = bigquery.TableSchema()

    number = bigquery.TableFieldSchema()
    number.name = 'number'
    number.type = 'INTEGER'
    table_schema.fields.append(number)

    string = bigquery.TableFieldSchema()
    string.name = 'string'
    string.type = 'STRING'
    table_schema.fields.append(string)

    # NOTE(review): the locals 'time' and 'datetime' below shadow the
    # imported modules of the same name for the rest of this method.
    time = bigquery.TableFieldSchema()
    time.name = 'time'
    time.type = 'TIME'
    table_schema.fields.append(time)

    datetime = bigquery.TableFieldSchema()
    datetime.name = 'datetime'
    datetime.type = 'DATETIME'
    table_schema.fields.append(datetime)

    # rec RECORD { rec_datetime DATETIME, rec_rec RECORD
    #              { rec_rec_datetime DATETIME } }
    rec = bigquery.TableFieldSchema()
    rec.name = 'rec'
    rec.type = 'RECORD'
    rec_datetime = bigquery.TableFieldSchema()
    rec_datetime.name = 'rec_datetime'
    rec_datetime.type = 'DATETIME'
    rec.fields.append(rec_datetime)
    rec_rec = bigquery.TableFieldSchema()
    rec_rec.name = 'rec_rec'
    rec_rec.type = 'RECORD'
    rec_rec_datetime = bigquery.TableFieldSchema()
    rec_rec_datetime.name = 'rec_rec_datetime'
    rec_rec_datetime.type = 'DATETIME'
    rec_rec.fields.append(rec_rec_datetime)
    rec.fields.append(rec_rec)
    table_schema.fields.append(rec)

    table = bigquery.Table(
        tableReference=bigquery.TableReference(
            projectId=cls.project, datasetId=cls.dataset_id,
            tableId=table_name),
        schema=table_schema)
    request = bigquery.BigqueryTablesInsertRequest(
        projectId=cls.project, datasetId=cls.dataset_id, table=table)
    cls.bigquery_client.client.tables.Insert(request)
    cls.bigquery_client.insert_rows(
        cls.project, cls.dataset_id, table_name, cls.TABLE_DATA)

  @classmethod
  def _setup_temporary_dataset(cls, project, query):
    """Creates a temporary dataset colocated with the query's data."""
    location = cls.bigquery_client.get_query_location(project, query, False)
    cls.bigquery_client.create_temporary_dataset(project, location)

  @classmethod
  def _execute_query(cls, project, query):
    """Runs `query` as a BATCH job and returns a reference to the temporary
    table holding its results."""
    query_job_name = bigquery_tools.generate_bq_job_name(
        'materializing_table_before_reading',
        str(uuid.uuid4())[0:10],
        bigquery_tools.BigQueryJobTypes.QUERY,
        '%s_%s' % (int(time.time()), random.randint(0, 1000)))
    # NOTE(review): uses the class attributes rather than the `project` and
    # `query` parameters here — equivalent today because callers pass the
    # same values, but worth unifying.
    cls._setup_temporary_dataset(cls.project, cls.query)
    job = cls.bigquery_client._start_query_job(
        project,
        query,
        use_legacy_sql=False,
        flatten_results=False,
        job_id=query_job_name,
        priority=beam.io.BigQueryQueryPriority.BATCH)
    job_ref = job.jobReference
    # max_retries=0: fail fast if the materialization job errors.
    cls.bigquery_client.wait_for_bq_job(job_ref, max_retries=0)
    return cls.bigquery_client._get_temp_table(project)

  @pytest.mark.it_postcommit
  def test_iobase_source(self):
    """Default DIRECT_READ: TIME becomes datetime.time, DATETIME stays an
    ISO-formatted string."""
    EXPECTED_TABLE_DATA = [
        {
            'number': 1,
            'string': u'你好',
            'time': datetime.time(12, 44, 31),
            'datetime': '2018-12-31T12:44:31',
            'rec': None,
        },
        {
            'number': 4,
            'string': u'привет',
            'time': datetime.time(12, 44, 31),
            'datetime': '2018-12-31T12:44:31',
            'rec': {
                'rec_datetime': '2018-12-31T12:44:31',
                'rec_rec': {
                    'rec_rec_datetime': '2018-12-31T12:44:31',
                }
            },
        }
    ]
    with beam.Pipeline(argv=self.args) as p:
      result = (
          p | 'Read with BigQuery Storage API' >> beam.io.ReadFromBigQuery(
              method=beam.io.ReadFromBigQuery.Method.DIRECT_READ,
              table=self.temp_table_reference))
      assert_that(result, equal_to(EXPECTED_TABLE_DATA))

  @pytest.mark.it_postcommit
  def test_iobase_source_with_native_datetime(self):
    """use_native_datetime=True yields datetime.datetime values (including
    inside nested records) instead of strings."""
    EXPECTED_TABLE_DATA = [
        {
            'number': 1,
            'string': u'你好',
            'time': datetime.time(12, 44, 31),
            'datetime': datetime.datetime(2018, 12, 31, 12, 44, 31),
            'rec': None,
        },
        {
            'number': 4,
            'string': u'привет',
            'time': datetime.time(12, 44, 31),
            'datetime': datetime.datetime(2018, 12, 31, 12, 44, 31),
            'rec': {
                'rec_datetime': datetime.datetime(2018, 12, 31, 12, 44, 31),
                'rec_rec': {
                    'rec_rec_datetime': datetime.datetime(
                        2018, 12, 31, 12, 44, 31)
                }
            },
        }
    ]
    with beam.Pipeline(argv=self.args) as p:
      result = (
          p | 'Read with BigQuery Storage API' >> beam.io.ReadFromBigQuery(
              method=beam.io.ReadFromBigQuery.Method.DIRECT_READ,
              table=self.temp_table_reference,
              use_native_datetime=True))
      assert_that(result, equal_to(EXPECTED_TABLE_DATA))

  @pytest.mark.it_postcommit
  def test_iobase_source_with_column_selection(self):
    """selected_fields narrows the read to a single column."""
    EXPECTED_TABLE_DATA = [{'number': 1}, {'number': 4}]
    with beam.Pipeline(argv=self.args) as p:
      result = (
          p | 'Read with BigQuery Storage API' >> beam.io.ReadFromBigQuery(
              method=beam.io.ReadFromBigQuery.Method.DIRECT_READ,
              table=self.temp_table_reference,
              selected_fields=['number']))
      assert_that(result, equal_to(EXPECTED_TABLE_DATA))

  @pytest.mark.it_postcommit
  def test_iobase_source_with_row_restriction(self):
    """row_restriction filters rows server-side."""
    EXPECTED_TABLE_DATA = [{
        'number': 1,
        'string': u'你好',
        'time': datetime.time(12, 44, 31),
        'datetime': datetime.datetime(2018, 12, 31, 12, 44, 31),
        'rec': None
    }]
    with beam.Pipeline(argv=self.args) as p:
      result = (
          p | 'Read with BigQuery Storage API' >> beam.io.ReadFromBigQuery(
              method=beam.io.ReadFromBigQuery.Method.DIRECT_READ,
              table=self.temp_table_reference,
              row_restriction='number < 2',
              use_native_datetime=True))
      assert_that(result, equal_to(EXPECTED_TABLE_DATA))

  @pytest.mark.it_postcommit
  def test_iobase_source_with_column_selection_and_row_restriction(self):
    """Combines selected_fields with row_restriction."""
    EXPECTED_TABLE_DATA = [{'string': u'привет'}]
    with beam.Pipeline(argv=self.args) as p:
      result = (
          p | 'Read with BigQuery Storage API' >> beam.io.ReadFromBigQuery(
              method=beam.io.ReadFromBigQuery.Method.DIRECT_READ,
              table=self.temp_table_reference,
              row_restriction='number > 2',
              selected_fields=['string']))
      assert_that(result, equal_to(EXPECTED_TABLE_DATA))

  @pytest.mark.it_postcommit
  def test_iobase_source_with_very_selective_filters(self):
    """A filter matching no rows must yield an empty PCollection."""
    with beam.Pipeline(argv=self.args) as p:
      result = (
          p | 'Read with BigQuery Storage API' >> beam.io.ReadFromBigQuery(
              method=beam.io.ReadFromBigQuery.Method.DIRECT_READ,
              project=self.temp_table_reference.projectId,
              dataset=self.temp_table_reference.datasetId,
              table=self.temp_table_reference.tableId,
              row_restriction='number > 4',
              selected_fields=['string']))
      assert_that(result, equal_to([]))

  @pytest.mark.it_postcommit
  def test_iobase_source_with_query(self):
    """DIRECT_READ driven by a query (via a ValueProvider) rather than a
    table reference."""
    EXPECTED_TABLE_DATA = [
        {
            'number': 1,
            'string': u'你好',
            'time': datetime.time(12, 44, 31),
            'datetime': datetime.datetime(2018, 12, 31, 12, 44, 31),
            'rec': None,
        },
        {
            'number': 4,
            'string': u'привет',
            'time': datetime.time(12, 44, 31),
            'datetime': datetime.datetime(2018, 12, 31, 12, 44, 31),
            'rec': {
                'rec_datetime': datetime.datetime(2018, 12, 31, 12, 44, 31),
                'rec_rec': {
                    'rec_rec_datetime': datetime.datetime(
                        2018, 12, 31, 12, 44, 31)
                }
            },
        }
    ]
    query = StaticValueProvider(str, self.query)
    with beam.Pipeline(argv=self.args) as p:
      result = (
          p | 'Direct read with query' >> beam.io.ReadFromBigQuery(
              method=beam.io.ReadFromBigQuery.Method.DIRECT_READ,
              use_native_datetime=True,
              use_standard_sql=True,
              project=self.project,
              query=query))
      assert_that(result, equal_to(EXPECTED_TABLE_DATA))

  @pytest.mark.it_postcommit
  def test_iobase_source_with_query_and_filters(self):
    """Query-based DIRECT_READ combined with row_restriction and
    selected_fields on the materialized results."""
    EXPECTED_TABLE_DATA = [{'string': u'привет'}]
    query = StaticValueProvider(str, self.query)
    with beam.Pipeline(argv=self.args) as p:
      result = (
          p | 'Direct read with query' >> beam.io.ReadFromBigQuery(
              method=beam.io.ReadFromBigQuery.Method.DIRECT_READ,
              row_restriction='number > 2',
              selected_fields=['string'],
              use_standard_sql=True,
              project=self.project,
              query=query))
      assert_that(result, equal_to(EXPECTED_TABLE_DATA))
| |
| |
class ReadNewTypesTests(BigQueryReadIntegrationTests):
  """Round-trips the newer BigQuery types (NUMERIC, BYTES, DATE, TIME,
  DATETIME, TIMESTAMP, GEOGRAPHY) through the native and iobase sources."""

  @classmethod
  def setUpClass(cls):
    super(ReadNewTypesTests, cls).setUpClass()
    cls.table_name = 'python_new_types'
    cls.create_table(cls.table_name)

    table_id = '{}.{}'.format(cls.dataset_id, cls.table_name)
    cls.query = 'SELECT float, numeric, bytes, date, time, datetime,' \
                'timestamp, geo FROM `%s`' % table_id

  @classmethod
  def create_table(cls, table_name):
    """Creates a table with one column per tested type and inserts one full
    row plus one single-column row per field."""
    table_schema = bigquery.TableSchema()
    table_field = bigquery.TableFieldSchema()
    table_field.name = 'float'
    table_field.type = 'FLOAT'
    table_schema.fields.append(table_field)
    table_field = bigquery.TableFieldSchema()
    table_field.name = 'numeric'
    table_field.type = 'NUMERIC'
    table_schema.fields.append(table_field)
    table_field = bigquery.TableFieldSchema()
    table_field.name = 'bytes'
    table_field.type = 'BYTES'
    table_schema.fields.append(table_field)
    table_field = bigquery.TableFieldSchema()
    table_field.name = 'date'
    table_field.type = 'DATE'
    table_schema.fields.append(table_field)
    table_field = bigquery.TableFieldSchema()
    table_field.name = 'time'
    table_field.type = 'TIME'
    table_schema.fields.append(table_field)
    table_field = bigquery.TableFieldSchema()
    table_field.name = 'datetime'
    table_field.type = 'DATETIME'
    table_schema.fields.append(table_field)
    table_field = bigquery.TableFieldSchema()
    table_field.name = 'timestamp'
    table_field.type = 'TIMESTAMP'
    table_schema.fields.append(table_field)
    table_field = bigquery.TableFieldSchema()
    table_field.name = 'geo'
    table_field.type = 'GEOGRAPHY'
    table_schema.fields.append(table_field)
    table = bigquery.Table(
        tableReference=bigquery.TableReference(
            projectId=cls.project, datasetId=cls.dataset_id,
            tableId=table_name),
        schema=table_schema)
    request = bigquery.BigqueryTablesInsertRequest(
        projectId=cls.project, datasetId=cls.dataset_id, table=table)
    cls.bigquery_client.client.tables.Insert(request)
    # BYTES values are inserted base64-encoded, per the BQ JSON API.
    row_data = {
        'float': 0.33,
        'numeric': Decimal('10'),
        'bytes': base64.b64encode(b'\xab\xac').decode('utf-8'),
        'date': '3000-12-31',
        'time': '23:59:59',
        'datetime': '2018-12-31T12:44:31',
        'timestamp': '2018-12-31 12:44:31.744957 UTC',
        'geo': 'POINT(30 10)'
    }

    table_data = [row_data]
    # add rows with only one key value pair and None values for all other keys
    for key, value in row_data.items():
      table_data.append({key: value})

    cls.bigquery_client.insert_rows(
        cls.project, cls.dataset_id, table_name, table_data)

  def get_expected_data(self, native=True):
    """Returns the expected read results.

    Args:
      native: when True (legacy native source), 'bytes' is expected as a
        base64-encoded bytes object; otherwise as the raw bytes.
    """
    byts = b'\xab\xac'
    expected_row = {
        'float': 0.33,
        'numeric': Decimal('10'),
        'bytes': base64.b64encode(byts) if native else byts,
        'date': '3000-12-31',
        'time': '23:59:59',
        'datetime': '2018-12-31T12:44:31',
        'timestamp': '2018-12-31 12:44:31.744957 UTC',
        'geo': 'POINT(30 10)'
    }

    expected_data = [expected_row]

    # add rows with only one key value pair and None values for all other keys
    # (single-column inserts come back with NULL for every other column)
    for key, value in expected_row.items():
      row = {k: None for k in expected_row}
      row[key] = value
      expected_data.append(row)

    return expected_data

  @skip(['PortableRunner', 'FlinkRunner'])
  @pytest.mark.it_postcommit
  def test_native_source(self):
    """Reads via the legacy native BigQuerySource."""
    with beam.Pipeline(argv=self.args) as p:
      result = (
          p
          | 'read' >> beam.io.Read(
              beam.io.BigQuerySource(query=self.query, use_standard_sql=True)))
      assert_that(result, equal_to(self.get_expected_data()))

  @pytest.mark.it_postcommit
  def test_iobase_source(self):
    """Reads via ReadFromBigQuery; datetime_to_utc normalizes the date/time
    values so they compare equal to the expected strings."""
    with beam.Pipeline(argv=self.args) as p:
      result = (
          p
          | 'read' >> beam.io.ReadFromBigQuery(
              query=self.query,
              use_standard_sql=True,
              project=self.project,
              bigquery_job_labels={'launcher': 'apache_beam_tests'})
          | beam.Map(datetime_to_utc))
      assert_that(result, equal_to(self.get_expected_data(native=False)))
| |
| |
class ReadAllBQTests(BigQueryReadIntegrationTests):
  """Tests ReadAllFromBigQuery over a mix of query- and table-based
  ReadFromBigQueryRequest inputs, including tables with differing schemas."""

  # Expected contents of the three tables created in setUpClass.
  TABLE_DATA_1 = [{
      'number': 1, 'str': 'abc'
  }, {
      'number': 2, 'str': 'def'
  }, {
      'number': 3, 'str': u'你好'
  }, {
      'number': 4, 'str': u'привет'
  }]

  TABLE_DATA_2 = [{
      'number': 10, 'str': 'abcd'
  }, {
      'number': 20, 'str': 'defg'
  }, {
      'number': 30, 'str': u'你好'
  }, {
      'number': 40, 'str': u'привет'
  }]

  # Third table has an extra column to exercise heterogeneous schemas.
  TABLE_DATA_3 = [{'number': 10, 'str': 'abcde', 'extra': 3}]

  @classmethod
  def setUpClass(cls):
    super(ReadAllBQTests, cls).setUpClass()
    cls.SCHEMA_BQ = cls.create_bq_schema()
    cls.SCHEMA_BQ_WITH_EXTRA = cls.create_bq_schema(True)

    cls.table_name1 = 'python_rd_table_1'
    cls.table_schema1 = cls.create_table(
        cls.table_name1, cls.TABLE_DATA_1, cls.SCHEMA_BQ)
    table_id1 = '{}.{}'.format(cls.dataset_id, cls.table_name1)
    cls.query1 = 'SELECT number, str FROM `%s`' % table_id1

    cls.table_name2 = 'python_rd_table_2'
    cls.table_schema2 = cls.create_table(
        cls.table_name2, cls.TABLE_DATA_2, cls.SCHEMA_BQ)
    table_id2 = '{}.{}'.format(cls.dataset_id, cls.table_name2)
    # No backticks: query2 is run as legacy SQL in test_read_queries.
    cls.query2 = 'SELECT number, str FROM %s' % table_id2

    cls.table_name3 = 'python_rd_table_3'
    cls.table_schema3 = cls.create_table(
        cls.table_name3, cls.TABLE_DATA_3, cls.SCHEMA_BQ_WITH_EXTRA)
    table_id3 = '{}.{}'.format(cls.dataset_id, cls.table_name3)
    cls.query3 = 'SELECT number, str, extra FROM `%s`' % table_id3

  @classmethod
  def create_table(cls, table_name, data, table_schema):
    """Creates dataset.table_name with the given schema, inserts `data`, and
    returns the schema for later reference."""
    table = bigquery.Table(
        tableReference=bigquery.TableReference(
            projectId=cls.project, datasetId=cls.dataset_id,
            tableId=table_name),
        schema=table_schema)
    request = bigquery.BigqueryTablesInsertRequest(
        projectId=cls.project, datasetId=cls.dataset_id, table=table)
    cls.bigquery_client.client.tables.Insert(request)
    cls.bigquery_client.insert_rows(
        cls.project, cls.dataset_id, table_name, data)
    return table_schema

  @classmethod
  def create_bq_schema(cls, with_extra=False):
    """Builds the (number INTEGER, str STRING[, extra INTEGER]) schema; all
    fields NULLABLE."""
    table_schema = bigquery.TableSchema()
    table_field = bigquery.TableFieldSchema()
    table_field.name = 'number'
    table_field.type = 'INTEGER'
    table_field.mode = 'NULLABLE'
    table_schema.fields.append(table_field)
    table_field = bigquery.TableFieldSchema()
    table_field.name = 'str'
    table_field.type = 'STRING'
    table_field.mode = 'NULLABLE'
    table_schema.fields.append(table_field)
    if with_extra:
      table_field = bigquery.TableFieldSchema()
      table_field.name = 'extra'
      table_field.type = 'INTEGER'
      table_field.mode = 'NULLABLE'
      table_schema.fields.append(table_field)
    return table_schema

  @skip(['PortableRunner', 'FlinkRunner'])
  @pytest.mark.it_postcommit
  def test_read_queries(self):
    """Reads standard-SQL query, legacy-SQL query, and direct table requests
    in one ReadAllFromBigQuery transform."""
    # TODO(https://github.com/apache/beam/issues/20610): Remove experiment when
    # tests run on r_v2.
    args = self.args + ["--experiments=use_runner_v2"]
    with beam.Pipeline(argv=args) as p:
      result = (
          p
          | beam.Create([
              beam.io.ReadFromBigQueryRequest(query=self.query1),
              beam.io.ReadFromBigQueryRequest(
                  query=self.query2, use_standard_sql=False),
              beam.io.ReadFromBigQueryRequest(
                  table='%s.%s' % (self.dataset_id, self.table_name3))
          ])
          | beam.io.ReadAllFromBigQuery())
      assert_that(
          result,
          equal_to(self.TABLE_DATA_1 + self.TABLE_DATA_2 + self.TABLE_DATA_3))
| |
| |
class ReadInteractiveRunnerTests(BigQueryReadIntegrationTests):
  """Smoke test for ReadFromBigQuery under the interactive runner."""

  @skip(['PortableRunner', 'FlinkRunner'])
  @pytest.mark.it_postcommit
  def test_read_in_interactive_runner(self):
    """collect() should surface the query result as a dataframe."""
    pipeline = beam.Pipeline(InteractiveRunner(), argv=self.args)
    pcoll = pipeline | beam.io.ReadFromBigQuery(query="SELECT 1")
    collected = interactive_beam.collect(pcoll)
    assert collected.iloc[0, 0] == 1
| |
| |
if __name__ == '__main__':
  # INFO logging surfaces dataset create/delete messages when run directly.
  logging.getLogger().setLevel(logging.INFO)
  unittest.main()