#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
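
"""Unit tests for converting BigQuery table schemas into Beam user types."""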

import logging
import typing
import unittest

import mock
import numpy as np

import apache_beam as beam
import apache_beam.io.gcp.bigquery
from apache_beam.io.gcp import bigquery_schema_tools
from apache_beam.io.gcp.bigquery_tools import BigQueryWrapper
from apache_beam.io.gcp.internal.clients import bigquery
from apache_beam.options import value_provider

try:
  from apitools.base.py.exceptions import HttpError
except ImportError:
  HttpError = None


@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
class TestBigQueryToSchema(unittest.TestCase):
def test_check_schema_conversions(self):
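    """NULLABLE, REPEATED, and unset modes map to the expected types."""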
fields = [
bigquery.TableFieldSchema(name='stn', type='STRING', mode="NULLABLE"),
bigquery.TableFieldSchema(name='temp', type='FLOAT64', mode="REPEATED"),
bigquery.TableFieldSchema(name='count', type='INTEGER', mode=None)
]
schema = bigquery.TableSchema(fields=fields)
usertype = bigquery_schema_tools.generate_user_type_from_bq_schema(
the_table_schema=schema)
self.assertEqual(
usertype.__annotations__,
{
'stn': typing.Optional[str],
'temp': typing.Sequence[np.float64],
'count': typing.Optional[np.int64]
})

  def test_check_conversion_with_selected_fields(self):
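    """Only the fields named in selected_fields appear in the user type."""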
fields = [
bigquery.TableFieldSchema(name='stn', type='STRING', mode="NULLABLE"),
bigquery.TableFieldSchema(name='temp', type='FLOAT64', mode="REPEATED"),
bigquery.TableFieldSchema(name='count', type='INTEGER', mode=None)
]
schema = bigquery.TableSchema(fields=fields)
usertype = bigquery_schema_tools.generate_user_type_from_bq_schema(
the_table_schema=schema, selected_fields=['stn', 'count'])
self.assertEqual(
usertype.__annotations__, {
'stn': typing.Optional[str], 'count': typing.Optional[np.int64]
})

  def test_check_conversion_with_empty_schema(self):
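    """An empty schema yields a user type with no annotations."""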
fields = []
schema = bigquery.TableSchema(fields=fields)
usertype = bigquery_schema_tools.generate_user_type_from_bq_schema(
the_table_schema=schema)
self.assertEqual(usertype.__annotations__, {})

  def test_check_schema_conversions_with_timestamp(self):
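    """TIMESTAMP fields map to apache_beam.utils.timestamp.Timestamp."""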
fields = [
bigquery.TableFieldSchema(name='stn', type='STRING', mode="NULLABLE"),
bigquery.TableFieldSchema(name='temp', type='FLOAT64', mode="REPEATED"),
bigquery.TableFieldSchema(
name='times', type='TIMESTAMP', mode="NULLABLE")
]
schema = bigquery.TableSchema(fields=fields)
usertype = bigquery_schema_tools.generate_user_type_from_bq_schema(
the_table_schema=schema)
self.assertEqual(
usertype.__annotations__,
{
'stn': typing.Optional[str],
'temp': typing.Sequence[np.float64],
'times': typing.Optional[apache_beam.utils.timestamp.Timestamp]
})

  def test_unsupported_type(self):
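    """An unsupported field type such as DOUBLE raises ValueError."""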
fields = [
bigquery.TableFieldSchema(
name='number', type='DOUBLE', mode="NULLABLE"),
bigquery.TableFieldSchema(name='temp', type='FLOAT64', mode="REPEATED"),
bigquery.TableFieldSchema(name='count', type='INTEGER', mode=None)
]
schema = bigquery.TableSchema(fields=fields)
with self.assertRaisesRegex(ValueError,
"Encountered an unsupported type: 'DOUBLE'"):
bigquery_schema_tools.generate_user_type_from_bq_schema(
the_table_schema=schema)

  def test_unsupported_mode(self):
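    """An unsupported field mode such as NESTED raises ValueError."""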
fields = [
bigquery.TableFieldSchema(name='number', type='INTEGER', mode="NESTED"),
bigquery.TableFieldSchema(name='temp', type='FLOAT64', mode="REPEATED"),
bigquery.TableFieldSchema(name='count', type='INTEGER', mode=None)
]
schema = bigquery.TableSchema(fields=fields)
with self.assertRaisesRegex(ValueError,
"Encountered an unsupported mode: 'NESTED'"):
bigquery_schema_tools.generate_user_type_from_bq_schema(
the_table_schema=schema)

  @mock.patch.object(BigQueryWrapper, 'get_table')
def test_bad_schema_public_api_export(self, get_table):
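    """EXPORT reads with BEAM_ROW output reject unsupported field types."""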
fields = [
bigquery.TableFieldSchema(name='stn', type='DOUBLE', mode="NULLABLE"),
bigquery.TableFieldSchema(name='temp', type='FLOAT64', mode="REPEATED"),
bigquery.TableFieldSchema(name='count', type='INTEGER', mode=None)
]
schema = bigquery.TableSchema(fields=fields)
table = apache_beam.io.gcp.internal.clients.bigquery.\
bigquery_v2_messages.Table(
schema=schema)
get_table.return_value = table
with self.assertRaisesRegex(ValueError,
"Encountered an unsupported type: 'DOUBLE'"):
p = apache_beam.Pipeline()
_ = p | apache_beam.io.gcp.bigquery.ReadFromBigQuery(
table="dataset.sample_table",
method="EXPORT",
project="project",
output_type='BEAM_ROW')

  @mock.patch.object(BigQueryWrapper, 'get_table')
def test_bad_schema_public_api_direct_read(self, get_table):
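    """DIRECT_READ reads with BEAM_ROW output reject unsupported types."""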
fields = [
bigquery.TableFieldSchema(name='stn', type='DOUBLE', mode="NULLABLE"),
bigquery.TableFieldSchema(name='temp', type='FLOAT64', mode="REPEATED"),
bigquery.TableFieldSchema(name='count', type='INTEGER', mode=None)
]
schema = bigquery.TableSchema(fields=fields)
table = apache_beam.io.gcp.internal.clients.bigquery. \
bigquery_v2_messages.Table(
schema=schema)
get_table.return_value = table
with self.assertRaisesRegex(ValueError,
"Encountered an unsupported type: 'DOUBLE'"):
p = apache_beam.Pipeline()
_ = p | apache_beam.io.gcp.bigquery.ReadFromBigQuery(
table="dataset.sample_table",
method="DIRECT_READ",
project="project",
output_type='BEAM_ROW')

  def test_unsupported_value_provider(self):
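    """A ValueProvider table is rejected when output_type is BEAM_ROW."""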
with self.assertRaisesRegex(TypeError,
'ReadFromBigQuery: table must be of type string'
'; got ValueProvider instead'):
p = apache_beam.Pipeline()
_ = p | apache_beam.io.gcp.bigquery.ReadFromBigQuery(
table=value_provider.ValueProvider(), output_type='BEAM_ROW')

  def test_unsupported_callable(self):
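    """A callable table is rejected when output_type is BEAM_ROW."""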
def filterTable(table):
if table is not None:
return table
res = filterTable
with self.assertRaisesRegex(TypeError,
'ReadFromBigQuery: table must be of type string'
'; got a callable instead'):
p = apache_beam.Pipeline()
_ = p | apache_beam.io.gcp.bigquery.ReadFromBigQuery(
table=res, output_type='BEAM_ROW')

  def test_unsupported_query_export(self):
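    """A query with BEAM_ROW output is rejected for EXPORT reads."""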
with self.assertRaisesRegex(
ValueError,
"Both a query and an output type of 'BEAM_ROW' were specified. "
"'BEAM_ROW' is not currently supported with queries."):
p = apache_beam.Pipeline()
_ = p | apache_beam.io.gcp.bigquery.ReadFromBigQuery(
table="project:dataset.sample_table",
method="EXPORT",
query='SELECT name FROM dataset.sample_table',
output_type='BEAM_ROW')

  def test_unsupported_query_direct_read(self):
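    """A query with BEAM_ROW output is rejected for DIRECT_READ reads."""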
with self.assertRaisesRegex(
ValueError,
"Both a query and an output type of 'BEAM_ROW' were specified. "
"'BEAM_ROW' is not currently supported with queries."):
p = apache_beam.Pipeline()
_ = p | apache_beam.io.gcp.bigquery.ReadFromBigQuery(
table="project:dataset.sample_table",
method="DIRECT_READ",
query='SELECT name FROM dataset.sample_table',
output_type='BEAM_ROW')

  def test_geography_type_support(self):
"""Test that GEOGRAPHY type is properly supported in schema conversion."""
fields = [
bigquery.TableFieldSchema(
name='location', type='GEOGRAPHY', mode="NULLABLE"),
bigquery.TableFieldSchema(
name='locations', type='GEOGRAPHY', mode="REPEATED"),
bigquery.TableFieldSchema(
name='required_location', type='GEOGRAPHY', mode="REQUIRED")
]
schema = bigquery.TableSchema(fields=fields)
usertype = bigquery_schema_tools.generate_user_type_from_bq_schema(
the_table_schema=schema)
expected_annotations = {
'location': typing.Optional[str],
'locations': typing.Sequence[str],
'required_location': str
}
self.assertEqual(usertype.__annotations__, expected_annotations)

  def test_geography_in_bq_to_python_types_mapping(self):
"""Test that GEOGRAPHY is included in BIG_QUERY_TO_PYTHON_TYPES mapping."""
from apache_beam.io.gcp.bigquery_schema_tools import BIG_QUERY_TO_PYTHON_TYPES
self.assertIn("GEOGRAPHY", BIG_QUERY_TO_PYTHON_TYPES)
self.assertEqual(BIG_QUERY_TO_PYTHON_TYPES["GEOGRAPHY"], str)

  def test_geography_field_type_conversion(self):
"""Test bq_field_to_type function with GEOGRAPHY fields."""
from apache_beam.io.gcp.bigquery_schema_tools import bq_field_to_type
# Test required GEOGRAPHY field
result = bq_field_to_type("GEOGRAPHY", "REQUIRED")
self.assertEqual(result, str)
# Test nullable GEOGRAPHY field
result = bq_field_to_type("GEOGRAPHY", "NULLABLE")
self.assertEqual(result, typing.Optional[str])
# Test repeated GEOGRAPHY field
result = bq_field_to_type("GEOGRAPHY", "REPEATED")
self.assertEqual(result, typing.Sequence[str])
# Test GEOGRAPHY field with None mode (should default to nullable)
result = bq_field_to_type("GEOGRAPHY", None)
self.assertEqual(result, typing.Optional[str])
# Test GEOGRAPHY field with empty mode (should default to nullable)
result = bq_field_to_type("GEOGRAPHY", "")
self.assertEqual(result, typing.Optional[str])

  def test_convert_to_usertype_with_geography(self):
"""Test convert_to_usertype function with GEOGRAPHY fields."""
schema = bigquery.TableSchema(
fields=[
bigquery.TableFieldSchema(
name='id', type='INTEGER', mode="REQUIRED"),
bigquery.TableFieldSchema(
name='location', type='GEOGRAPHY', mode="NULLABLE"),
bigquery.TableFieldSchema(
name='name', type='STRING', mode="REQUIRED")
])
conversion_transform = bigquery_schema_tools.convert_to_usertype(schema)
# Verify the transform is created successfully
self.assertIsNotNone(conversion_transform)
# The transform should be a ParDo with BeamSchemaConversionDoFn
self.assertIsInstance(conversion_transform, beam.ParDo)

  def test_beam_schema_conversion_dofn_with_geography(self):
"""Test BeamSchemaConversionDoFn with GEOGRAPHY data."""
from apache_beam.io.gcp.bigquery_schema_tools import BeamSchemaConversionDoFn
# Create a user type with GEOGRAPHY field
fields = [
bigquery.TableFieldSchema(name='id', type='INTEGER', mode="REQUIRED"),
bigquery.TableFieldSchema(
name='location', type='GEOGRAPHY', mode="NULLABLE")
]
schema = bigquery.TableSchema(fields=fields)
usertype = bigquery_schema_tools.generate_user_type_from_bq_schema(schema)
# Create the DoFn
dofn = BeamSchemaConversionDoFn(usertype)
# Test processing a dictionary with GEOGRAPHY data
input_dict = {'id': 1, 'location': 'POINT(30 10)'}
results = list(dofn.process(input_dict))
self.assertEqual(len(results), 1)
result = results[0]
self.assertEqual(result.id, 1)
self.assertEqual(result.location, 'POINT(30 10)')

  def test_geography_with_complex_wkt(self):
"""Test GEOGRAPHY type with complex Well-Known Text geometries."""
fields = [
bigquery.TableFieldSchema(
name='simple_point', type='GEOGRAPHY', mode="NULLABLE"),
bigquery.TableFieldSchema(
name='linestring', type='GEOGRAPHY', mode="NULLABLE"),
bigquery.TableFieldSchema(
name='polygon', type='GEOGRAPHY', mode="NULLABLE"),
bigquery.TableFieldSchema(
name='multigeometry', type='GEOGRAPHY', mode="NULLABLE")
]
schema = bigquery.TableSchema(fields=fields)
usertype = bigquery_schema_tools.generate_user_type_from_bq_schema(schema)
# All GEOGRAPHY fields should map to Optional[str]
expected_annotations = {
'simple_point': typing.Optional[str],
'linestring': typing.Optional[str],
'polygon': typing.Optional[str],
'multigeometry': typing.Optional[str]
}
self.assertEqual(usertype.__annotations__, expected_annotations)


if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
unittest.main()