# blob: 6716aa1bc10f643378160762ae5eff065fce85de (repository-viewer residue; kept as a comment)
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
Integration tests for BigQuery's JSON data type
"""
import argparse
import json
import logging
import time
import unittest
from random import randint
import pytest
import apache_beam as beam
from apache_beam.io.gcp.bigquery import ReadFromBigQuery
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to
# pylint: disable=wrong-import-order, wrong-import-position
try:
from google.api_core import exceptions as gexc
from google.cloud import bigquery
except ImportError:
gexc = None
bigquery = None
# pylint: enable=wrong-import-order, wrong-import-position
# Module-level logger, following the file's getLogger(__name__) convention.
_LOGGER = logging.getLogger(__name__)

# GCP project and the pre-populated, persistent dataset/table that the
# read-path tests validate against.
PROJECT = 'apache-beam-testing'
DATASET_ID = 'bq_jsontype_test_nodelete'
JSON_TABLE_NAME = 'json_data'
JSON_TABLE_DESTINATION = f"{PROJECT}:{DATASET_ID}.{JSON_TABLE_NAME}"

# Per-run table names for the write-path tests: a microsecond timestamp plus
# a random suffix to avoid collisions between concurrent test runs.  These
# tables are deleted in BigQueryJsonIT.tearDownClass.
STREAMING_TEST_TABLE = "py_streaming_test" \
    f"{time.time_ns() // 1000}_{randint(0,32)}"
FILE_LOAD_TABLE = "py_fileload_test" \
    f"{time.time_ns() // 1000}_{randint(0,32)}"
class BigQueryJsonIT(unittest.TestCase):
  """Integration tests for BigQuery's JSON data type.

  Read tests (direct read, export, query) validate rows from the persistent
  table named by ``JSON_TABLE_DESTINATION``.  Write tests (streaming inserts,
  file loads) write generated data to throwaway tables and then read them
  back for validation; those tables are deleted in ``tearDownClass``.
  """

  # "project:dataset.table" strings for tables created by the write tests,
  # recorded so tearDownClass can delete them.
  created_tables = set()

  @classmethod
  def setUpClass(cls):
    cls.test_pipeline = TestPipeline(is_integration_test=True)

  @classmethod
  def tearDownClass(cls):
    # Best-effort cleanup of tables created by the write tests.  Guard on
    # `bigquery` because the import at the top of the file may have failed.
    if cls.created_tables and bigquery is not None:
      client = bigquery.Client(project=PROJECT)
      for ref in cls.created_tables:
        try:
          # `ref` is "project:dataset.table"; strip the "project:" prefix
          # because the client expects "dataset.table".
          client.delete_table(ref[len(PROJECT) + 1:])
        except gexc.NotFound:
          pass  # table never materialized or was already removed

  def run_test_write(self, options):
    """Writes generated JSON rows to BigQuery, then reads back to validate.

    Args:
      options: pipeline-option argv. Must contain --write_method and
        --output; may contain --unescape, which is forwarded to the
        validating read.
    """
    json_table_schema = self.generate_schema()
    rows_to_write = []
    json_data = self.generate_data()
    # Flatten the nested per-country data into BigQuery rows matching the
    # schema from generate_schema().
    for country_code, country in json_data.items():
      cities_to_write = []
      for city_name, city in country["cities"].items():
        cities_to_write.append({'city_name': city_name, 'city': city})
      rows_to_write.append({
          'country_code': country_code,
          'country': country["country"],
          'stats': country["stats"],
          'cities': cities_to_write,
          'landmarks': country["landmarks"]
      })

    parser = argparse.ArgumentParser()
    parser.add_argument('--write_method')
    parser.add_argument('--output')
    parser.add_argument('--unescape', required=False)
    known_args, pipeline_args = parser.parse_known_args(options)

    # Track the destination table so tearDownClass can delete it.
    self.created_tables.add(known_args.output)

    with beam.Pipeline(argv=pipeline_args) as p:
      _ = (
          p
          | "Create rows with JSON data" >> beam.Create(rows_to_write)
          | "Write to BigQuery" >> beam.io.WriteToBigQuery(
              method=known_args.write_method,
              table=known_args.output,
              schema=json_table_schema,
              create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
          ))

    # Read the freshly-written table back and validate its contents.
    extra_opts = {
        'read_method': "EXPORT",
        'input': known_args.output,
        'unescape': known_args.unescape
    }
    read_options = self.test_pipeline.get_full_options_as_args(**extra_opts)
    self.read_and_validate_rows(read_options)

  def read_and_validate_rows(self, options):
    """Reads rows with ReadFromBigQuery and validates their JSON payloads.

    Args:
      options: pipeline-option argv. --query selects the query-based path
        (validated with equal_to against generate_query_data()); otherwise
        --input names the table to read (validated per-row by CompareJson).
        --unescape requests unicode-unescaping of JSON strings (see TODO).
    """
    json_data = self.generate_data()

    parser = argparse.ArgumentParser()
    parser.add_argument('--read_method')
    parser.add_argument('--query')
    parser.add_argument('--input')
    parser.add_argument('--unescape', required=False)
    known_args, pipeline_args = parser.parse_known_args(options)

    # TODO(yathu) remove this conversion when FILE_LOAD produces unescaped
    # JSON string
    def maybe_unescape(value):
      if known_args.unescape:
        # Undo escape sequences and drop the surrounding quote characters.
        value = bytes(value, "utf-8").decode("unicode_escape")[1:-1]
      return json.loads(value)

    class CompareJson(beam.DoFn, unittest.TestCase):
      """Per-row DoFn asserting every JSON field matches the expected data.

      Inherits unittest.TestCase only to get the assert* helpers inside a
      pipeline-executed DoFn.
      """
      def process(self, row):
        country_code = row["country_code"]
        expected = json_data[country_code]

        # Test country (JSON String)
        country_actual = maybe_unescape(row["country"])
        country_expected = json.loads(expected["country"])
        self.assertEqual(country_expected, country_actual)

        # Test stats (JSON String in BigQuery struct)
        for stat, value in row["stats"].items():
          stats_actual = maybe_unescape(value)
          stats_expected = json.loads(expected["stats"][stat])
          self.assertEqual(stats_expected, stats_actual)

        # Test cities (JSON String in BigQuery array of structs)
        for city_row in row["cities"]:
          city = city_row["city"]
          city_name = city_row["city_name"]
          city_actual = maybe_unescape(city)
          city_expected = json.loads(expected["cities"][city_name])
          self.assertEqual(city_expected, city_actual)

        # Test landmarks (JSON String in BigQuery array).  Check lengths
        # first so a truncated result cannot pass silently.
        landmarks_actual = row["landmarks"]
        landmarks_expected = expected["landmarks"]
        self.assertEqual(len(landmarks_expected), len(landmarks_actual))
        for l_actual, l_expected in zip(landmarks_actual, landmarks_expected):
          self.assertEqual(json.loads(l_expected), maybe_unescape(l_actual))

    method = ReadFromBigQuery.Method.DIRECT_READ if \
        known_args.read_method == "DIRECT_READ" else \
        ReadFromBigQuery.Method.EXPORT

    if known_args.query:
      json_query_data = self.generate_query_data()
      with beam.Pipeline(argv=pipeline_args) as p:
        data = p | 'Read rows' >> ReadFromBigQuery(
            query=known_args.query, method=method, use_standard_sql=True)
        assert_that(data, equal_to(json_query_data))
    else:
      with beam.Pipeline(argv=pipeline_args) as p:
        _ = p | 'Read rows' >> ReadFromBigQuery(
            table=known_args.input,
            method=method,
        ) | 'Validate rows' >> beam.ParDo(CompareJson())

  @pytest.mark.it_postcommit
  def test_direct_read(self):
    """Reads the persistent JSON table with the Storage API (DIRECT_READ)."""
    extra_opts = {
        'read_method': "DIRECT_READ",
        'input': JSON_TABLE_DESTINATION,
    }
    options = self.test_pipeline.get_full_options_as_args(**extra_opts)
    self.read_and_validate_rows(options)

  @pytest.mark.it_postcommit
  def test_export_read(self):
    """Reads the persistent JSON table via an export job (EXPORT)."""
    extra_opts = {
        'read_method': "EXPORT",
        'input': JSON_TABLE_DESTINATION,
    }
    options = self.test_pipeline.get_full_options_as_args(**extra_opts)
    self.read_and_validate_rows(options)

  @pytest.mark.it_postcommit
  def test_query_read(self):
    """Reads with a SQL query exercising JSON field/subscript access."""
    extra_opts = {
        'query': "SELECT "
        "country_code, "
        "country.past_leaders[2] AS past_leader, "
        "stats.gdp_per_capita[\"gdp_per_capita\"] AS gdp, "
        "cities[OFFSET(1)].city.name AS city_name, "
        "landmarks[OFFSET(1)][\"name\"] AS landmark_name "
        f"FROM `{PROJECT}.{DATASET_ID}.{JSON_TABLE_NAME}`",
    }
    options = self.test_pipeline.get_full_options_as_args(**extra_opts)
    self.read_and_validate_rows(options)

  @pytest.mark.it_postcommit
  def test_streaming_inserts(self):
    """Round-trips JSON data through the STREAMING_INSERTS write method."""
    extra_opts = {
        'output': f"{PROJECT}:{DATASET_ID}.{STREAMING_TEST_TABLE}",
        'write_method': "STREAMING_INSERTS"
    }
    options = self.test_pipeline.get_full_options_as_args(**extra_opts)
    self.run_test_write(options)

  @pytest.mark.it_postcommit
  def test_file_loads_write(self):
    """Round-trips JSON data through the FILE_LOADS write method."""
    extra_opts = {
        'output': f"{PROJECT}:{DATASET_ID}.{FILE_LOAD_TABLE}",
        'write_method': "FILE_LOADS",
        # FILE_LOADS currently returns escaped JSON strings; ask the
        # validator to unescape them (see TODO in read_and_validate_rows).
        "unescape": "True"
    }
    options = self.test_pipeline.get_full_options_as_args(**extra_opts)
    self.run_test_write(options)

  # Schema for writing to BigQuery
  def generate_schema(self):
    """Returns the TableSchema used by the write tests.

    JSON appears as a top-level column, inside a STRUCT, in a REPEATED
    STRUCT, and as a REPEATED column, covering each nesting shape.
    """
    from apache_beam.io.gcp.internal.clients.bigquery import TableFieldSchema
    from apache_beam.io.gcp.internal.clients.bigquery import TableSchema
    json_fields = [
        TableFieldSchema(name='country_code', type='STRING', mode='NULLABLE'),
        TableFieldSchema(name='country', type='JSON', mode='NULLABLE'),
        TableFieldSchema(
            name='stats',
            type='STRUCT',
            mode='NULLABLE',
            fields=[
                TableFieldSchema(
                    name="gdp_per_capita", type='JSON', mode='NULLABLE'),
                TableFieldSchema(
                    name="co2_emissions", type='JSON', mode='NULLABLE'),
            ]),
        TableFieldSchema(
            name='cities',
            type='STRUCT',
            mode='REPEATED',
            fields=[
                TableFieldSchema(
                    name="city_name", type='STRING', mode='NULLABLE'),
                TableFieldSchema(name="city", type='JSON', mode='NULLABLE'),
            ]),
        TableFieldSchema(name='landmarks', type='JSON', mode='REPEATED'),
    ]
    schema = TableSchema(fields=json_fields)
    return schema

  # Expected data for query test
  def generate_query_data(self):
    """Returns the rows expected from the SQL query in test_query_read."""
    query_data = [{
        'country_code': 'usa',
        'past_leader': '\"George W. Bush\"',
        'gdp': '58559.675',
        'city_name': '\"Los Angeles\"',
        'landmark_name': '\"Golden Gate Bridge\"'
    },
                  {
                      'country_code': 'aus',
                      'past_leader': '\"Kevin Rudd\"',
                      'gdp': '58043.581',
                      'city_name': '\"Melbourne\"',
                      'landmark_name': '\"Great Barrier Reef\"'
                  },
                  {
                      'country_code': 'special',
                      'past_leader': '\"!@#$%^&*()_+\"',
                      'gdp': '421.7',
                      'city_name': '\"Bikini Bottom\"',
                      'landmark_name': "\"Willy Wonka's Factory\""
                  }]
    return query_data

  def generate_data(self):
    """Returns the test dataset keyed by country code.

    Each entry holds JSON-encoded strings (via json.dumps) for the country,
    its cities, landmarks, and stats, mirroring the table schema.  The
    "special" country exercises escaping edge cases (control characters,
    quotes, backslashes, negative numbers).
    """
    # Raw country data
    usa = {
        "name": "United States of America",
        "population": 329484123,
        "cities": {
            "nyc": {
                "name": "New York City", "state": "NY", "population": 8622357
            },
            "la": {
                "name": "Los Angeles", "state": "CA", "population": 4085014
            },
            "chicago": {
                "name": "Chicago", "state": "IL", "population": 2670406
            },
        },
        "past_leaders": [
            "Donald Trump", "Barack Obama", "George W. Bush", "Bill Clinton"
        ],
        "in_northern_hemisphere": True
    }

    aus = {
        "name": "Australia",
        "population": 25687041,
        "cities": {
            "sydney": {
                "name": "Sydney",
                "state": "New South Wales",
                "population": 5367206
            },
            "melbourne": {
                "name": "Melbourne", "state": "Victoria", "population": 5159211
            },
            "brisbane": {
                "name": "Brisbane",
                "state": "Queensland",
                "population": 2560720
            }
        },
        "past_leaders": [
            "Malcolm Turnbull",
            "Tony Abbot",
            "Kevin Rudd",
        ],
        "in_northern_hemisphere": False
    }

    special = {
        "name": "newline\n, form\f, tab\t, \"quotes\", "
        "\\backslash\\, backspace\b, \u0000_hex_\u0f0f",
        "population": -123456789,
        "cities": {
            "basingse": {
                "name": "Ba Sing Se",
                "state": "The Earth Kingdom",
                "population": 200000
            },
            "bikinibottom": {
                "name": "Bikini Bottom",
                "state": "The Pacific Ocean",
                "population": 50000
            }
        },
        "past_leaders": [
            "1",
            "2",
            "!@#$%^&*()_+",
        ],
        "in_northern_hemisphere": True
    }

    landmarks = {
        "usa_0": {
            "name": "Statue of Liberty", "cool rating": None
        },
        "usa_1": {
            "name": "Golden Gate Bridge", "cool rating": "very cool"
        },
        "usa_2": {
            "name": "Grand Canyon", "cool rating": "very very cool"
        },
        "aus_0": {
            "name": "Sydney Opera House", "cool rating": "amazing"
        },
        "aus_1": {
            "name": "Great Barrier Reef", "cool rating": None
        },
        "special_0": {
            "name": "Hogwarts School of WitchCraft and Wizardry",
            "cool rating": "magical"
        },
        "special_1": {
            "name": "Willy Wonka's Factory", "cool rating": None
        },
        "special_2": {
            "name": "Rivendell", "cool rating": "precious"
        },
    }

    stats = {
        "usa_gdp_per_capita": {
            "gdp_per_capita": 58559.675, "currency": "constant 2015 US$"
        },
        "usa_co2_emissions": {
            "co2 emissions": 15.241,
            "measurement": "metric tons per capita",
            "year": 2018
        },
        "aus_gdp_per_capita": {
            "gdp_per_capita": 58043.581, "currency": "constant 2015 US$"
        },
        "aus_co2_emissions": {
            "co2 emissions": 15.476,
            "measurement": "metric tons per capita",
            "year": 2018
        },
        "special_gdp_per_capita": {
            "gdp_per_capita": 421.70, "currency": "constant 200 BC gold"
        },
        "special_co2_emissions": {
            "co2 emissions": -10.79,
            "measurement": "metric tons per capita",
            "year": 2018
        }
    }

    data = {
        "usa": {
            "country": json.dumps(usa),
            "cities": {
                "nyc": json.dumps(usa["cities"]["nyc"]),
                "la": json.dumps(usa["cities"]["la"]),
                "chicago": json.dumps(usa["cities"]["chicago"])
            },
            "landmarks": [
                json.dumps(landmarks["usa_0"]),
                json.dumps(landmarks["usa_1"]),
                json.dumps(landmarks["usa_2"])
            ],
            "stats": {
                "gdp_per_capita": json.dumps(stats["usa_gdp_per_capita"]),
                "co2_emissions": json.dumps(stats["usa_co2_emissions"])
            }
        },
        "aus": {
            "country": json.dumps(aus),
            "cities": {
                "sydney": json.dumps(aus["cities"]["sydney"]),
                "melbourne": json.dumps(aus["cities"]["melbourne"]),
                "brisbane": json.dumps(aus["cities"]["brisbane"])
            },
            "landmarks": [
                json.dumps(landmarks["aus_0"]), json.dumps(landmarks["aus_1"])
            ],
            "stats": {
                "gdp_per_capita": json.dumps(stats["aus_gdp_per_capita"]),
                "co2_emissions": json.dumps(stats["aus_co2_emissions"])
            }
        },
        "special": {
            "country": json.dumps(special),
            "cities": {
                "basingse": json.dumps(special["cities"]["basingse"]),
                "bikinibottom": json.dumps(special["cities"]["bikinibottom"])
            },
            "landmarks": [
                json.dumps(landmarks["special_0"]),
                json.dumps(landmarks["special_1"]),
                json.dumps(landmarks["special_2"])
            ],
            "stats": {
                "gdp_per_capita": json.dumps(stats["special_gdp_per_capita"]),
                "co2_emissions": json.dumps(stats["special_co2_emissions"])
            }
        }
    }

    return data
# Allow running the integration tests directly (outside pytest collection).
if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  unittest.main()