#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Unit tests for BigQuery sources and sinks."""
from __future__ import absolute_import
import decimal
import json
import logging
import os
import random
import re
import time
import unittest
import uuid
# patches unittest.TestCase to be python3 compatible
import future.tests.base # pylint: disable=unused-import
import hamcrest as hc
import mock
from nose.plugins.attrib import attr
import apache_beam as beam
from apache_beam.internal.gcp.json_value import to_json_value
from apache_beam.io.filebasedsink_test import _TestCaseWithTempDirCleanUp
from apache_beam.io.gcp import bigquery_tools
from apache_beam.io.gcp.bigquery import TableRowJsonCoder
from apache_beam.io.gcp.bigquery import WriteToBigQuery
from apache_beam.io.gcp.bigquery import _StreamToBigQuery
from apache_beam.io.gcp.bigquery_file_loads_test import _ELEMENTS
from apache_beam.io.gcp.bigquery_tools import JSON_COMPLIANCE_ERROR
from apache_beam.io.gcp.internal.clients import bigquery
from apache_beam.io.gcp.pubsub import ReadFromPubSub
from apache_beam.io.gcp.tests import utils
from apache_beam.io.gcp.tests.bigquery_matcher import BigqueryFullResultMatcher
from apache_beam.io.gcp.tests.bigquery_matcher import BigqueryFullResultStreamingMatcher
from apache_beam.io.gcp.tests.bigquery_matcher import BigQueryTableMatcher
from apache_beam.options import value_provider
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.runners.dataflow.test_dataflow_runner import TestDataflowRunner
from apache_beam.runners.runner import PipelineState
from apache_beam.testing import test_utils
from apache_beam.testing.pipeline_verifiers import PipelineStateMatcher
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.test_stream import TestStream
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to
from apache_beam.transforms.display import DisplayData
from apache_beam.transforms.display_test import DisplayDataItemMatcher
# Protect against environments where bigquery library is not available.
# pylint: disable=wrong-import-order, wrong-import-position
try:
from apitools.base.py.exceptions import HttpError
except ImportError:
HttpError = None
# pylint: enable=wrong-import-order, wrong-import-position
_LOGGER = logging.getLogger(__name__)
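# Tests for TableRowJsonCoder: encoding bigquery.TableRow messages to their
# JSON representation using a supplied TableSchema, and decoding them back.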
@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
class TestTableRowJsonCoder(unittest.TestCase):
def test_row_as_table_row(self):
schema_definition = [
('s', 'STRING'),
('i', 'INTEGER'),
('f', 'FLOAT'),
('b', 'BOOLEAN'),
('n', 'NUMERIC'),
('r', 'RECORD'),
('g', 'GEOGRAPHY')]
data_definition = [
'abc',
123,
123.456,
True,
decimal.Decimal('987654321.987654321'),
{'a': 'b'},
'LINESTRING(1 2, 3 4, 5 6, 7 8)']
str_def = ('{"s": "abc", '
'"i": 123, '
'"f": 123.456, '
'"b": true, '
'"n": "987654321.987654321", '
'"r": {"a": "b"}, '
'"g": "LINESTRING(1 2, 3 4, 5 6, 7 8)"}')
schema = bigquery.TableSchema(
fields=[bigquery.TableFieldSchema(name=k, type=v)
for k, v in schema_definition])
coder = TableRowJsonCoder(table_schema=schema)
def value_or_decimal_to_json(val):
if isinstance(val, decimal.Decimal):
return to_json_value(str(val))
else:
return to_json_value(val)
test_row = bigquery.TableRow(
f=[bigquery.TableCell(v=value_or_decimal_to_json(e))
for e in data_definition])
self.assertEqual(str_def, coder.encode(test_row))
self.assertEqual(test_row, coder.decode(coder.encode(test_row)))
# A coder without schema can still decode.
self.assertEqual(
test_row, TableRowJsonCoder().decode(coder.encode(test_row)))
def test_row_and_no_schema(self):
coder = TableRowJsonCoder()
test_row = bigquery.TableRow(
f=[bigquery.TableCell(v=to_json_value(e))
for e in ['abc', 123, 123.456, True]])
with self.assertRaisesRegex(AttributeError,
r'^The TableRowJsonCoder requires'):
coder.encode(test_row)
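# Helper for the NaN/Infinity tests below: encoding a FLOAT cell holding a
# non-JSON-compliant value is expected to raise ValueError carrying
# JSON_COMPLIANCE_ERROR.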
def json_compliance_exception(self, value):
with self.assertRaisesRegex(ValueError, re.escape(JSON_COMPLIANCE_ERROR)):
schema_definition = [('f', 'FLOAT')]
schema = bigquery.TableSchema(
fields=[bigquery.TableFieldSchema(name=k, type=v)
for k, v in schema_definition])
coder = TableRowJsonCoder(table_schema=schema)
test_row = bigquery.TableRow(
f=[bigquery.TableCell(v=to_json_value(value))])
coder.encode(test_row)
def test_invalid_json_nan(self):
self.json_compliance_exception(float('nan'))
def test_invalid_json_inf(self):
self.json_compliance_exception(float('inf'))
def test_invalid_json_neg_inf(self):
self.json_compliance_exception(float('-inf'))
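# Tests for BigQuerySource: display data and parsing of table specifications
# ('dataset.table', 'project:dataset.table', 'domain:project:dataset.table')
# as well as query-based sources.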
@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
class TestBigQuerySource(unittest.TestCase):
def test_display_data_item_on_validate_true(self):
source = beam.io.BigQuerySource('dataset.table', validate=True)
dd = DisplayData.create_from(source)
expected_items = [
DisplayDataItemMatcher('validation', True),
DisplayDataItemMatcher('table', 'dataset.table')]
hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items))
def test_table_reference_display_data(self):
source = beam.io.BigQuerySource('dataset.table')
dd = DisplayData.create_from(source)
expected_items = [
DisplayDataItemMatcher('validation', False),
DisplayDataItemMatcher('table', 'dataset.table')]
hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items))
source = beam.io.BigQuerySource('project:dataset.table')
dd = DisplayData.create_from(source)
expected_items = [
DisplayDataItemMatcher('validation', False),
DisplayDataItemMatcher('table', 'project:dataset.table')]
hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items))
source = beam.io.BigQuerySource('xyz.com:project:dataset.table')
dd = DisplayData.create_from(source)
expected_items = [
DisplayDataItemMatcher('validation',
False),
DisplayDataItemMatcher('table',
'xyz.com:project:dataset.table')]
hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items))
def test_parse_table_reference(self):
source = beam.io.BigQuerySource('dataset.table')
self.assertEqual(source.table_reference.datasetId, 'dataset')
self.assertEqual(source.table_reference.tableId, 'table')
source = beam.io.BigQuerySource('project:dataset.table')
self.assertEqual(source.table_reference.projectId, 'project')
self.assertEqual(source.table_reference.datasetId, 'dataset')
self.assertEqual(source.table_reference.tableId, 'table')
source = beam.io.BigQuerySource('xyz.com:project:dataset.table')
self.assertEqual(source.table_reference.projectId, 'xyz.com:project')
self.assertEqual(source.table_reference.datasetId, 'dataset')
self.assertEqual(source.table_reference.tableId, 'table')
source = beam.io.BigQuerySource(query='my_query')
self.assertEqual(source.query, 'my_query')
self.assertIsNone(source.table_reference)
self.assertTrue(source.use_legacy_sql)
def test_query_only_display_data(self):
source = beam.io.BigQuerySource(query='my_query')
dd = DisplayData.create_from(source)
expected_items = [
DisplayDataItemMatcher('validation', False),
DisplayDataItemMatcher('query', 'my_query')]
hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items))
def test_specify_query_sql_format(self):
source = beam.io.BigQuerySource(query='my_query', use_standard_sql=True)
self.assertEqual(source.query, 'my_query')
self.assertFalse(source.use_legacy_sql)
def test_specify_query_flattened_records(self):
source = beam.io.BigQuerySource(query='my_query', flatten_results=False)
self.assertFalse(source.flatten_results)
def test_specify_query_unflattened_records(self):
source = beam.io.BigQuerySource(query='my_query', flatten_results=True)
self.assertTrue(source.flatten_results)
def test_specify_query_without_table(self):
source = beam.io.BigQuerySource(query='my_query')
self.assertEqual(source.query, 'my_query')
self.assertIsNone(source.table_reference)
def test_date_partitioned_table_name(self):
source = beam.io.BigQuerySource('dataset.table$20030102', validate=True)
dd = DisplayData.create_from(source)
expected_items = [
DisplayDataItemMatcher('validation', True),
DisplayDataItemMatcher('table', 'dataset.table$20030102')]
hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items))
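# Tests for BigQuerySink: display data, parsing of string schema descriptors,
# and JSON serialization of simple and nested table schemas.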
@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
class TestBigQuerySink(unittest.TestCase):
def test_table_spec_display_data(self):
sink = beam.io.BigQuerySink('dataset.table')
dd = DisplayData.create_from(sink)
expected_items = [
DisplayDataItemMatcher('table', 'dataset.table'),
DisplayDataItemMatcher('validation', False)]
hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items))
def test_parse_schema_descriptor(self):
sink = beam.io.BigQuerySink(
'dataset.table', schema='s:STRING, n:INTEGER')
self.assertEqual(sink.table_reference.datasetId, 'dataset')
self.assertEqual(sink.table_reference.tableId, 'table')
result_schema = {
field.name: field.type for field in sink.table_schema.fields}
self.assertEqual({'n': 'INTEGER', 's': 'STRING'}, result_schema)
def test_project_table_display_data(self):
sinkq = beam.io.BigQuerySink('PROJECT:dataset.table')
dd = DisplayData.create_from(sinkq)
expected_items = [
DisplayDataItemMatcher('table', 'PROJECT:dataset.table'),
DisplayDataItemMatcher('validation', False)]
hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items))
def test_simple_schema_as_json(self):
sink = beam.io.BigQuerySink(
'PROJECT:dataset.table', schema='s:STRING, n:INTEGER')
self.assertEqual(
json.dumps({'fields': [
{'name': 's', 'type': 'STRING', 'mode': 'NULLABLE'},
{'name': 'n', 'type': 'INTEGER', 'mode': 'NULLABLE'}]}),
sink.schema_as_json())
def test_nested_schema_as_json(self):
string_field = bigquery.TableFieldSchema(
name='s', type='STRING', mode='NULLABLE', description='s description')
number_field = bigquery.TableFieldSchema(
name='n', type='INTEGER', mode='REQUIRED', description='n description')
record_field = bigquery.TableFieldSchema(
name='r', type='RECORD', mode='REQUIRED', description='r description',
fields=[string_field, number_field])
schema = bigquery.TableSchema(fields=[record_field])
sink = beam.io.BigQuerySink('dataset.table', schema=schema)
self.assertEqual(
{'fields': [
{'name': 'r', 'type': 'RECORD', 'mode': 'REQUIRED',
'description': 'r description', 'fields': [
{'name': 's', 'type': 'STRING', 'mode': 'NULLABLE',
'description': 's description'},
{'name': 'n', 'type': 'INTEGER', 'mode': 'REQUIRED',
'description': 'n description'}]}]},
json.loads(sink.schema_as_json()))
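# Tests for the schema-conversion helpers used by WriteToBigQuery. A schema
# may be given as None, a comma-separated string, a dict, or a
# bigquery.TableSchema; get_dict_table_schema and get_table_schema convert
# between these forms. A minimal sketch of the string form (values shown are
# illustrative):
#
#   WriteToBigQuery.get_dict_table_schema('s:STRING, n:INTEGER')
#   # -> {'fields': [{'name': 's', 'type': 'STRING', 'mode': 'NULLABLE'},
#   #                {'name': 'n', 'type': 'INTEGER', 'mode': 'NULLABLE'}]}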
@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
class TestWriteToBigQuery(unittest.TestCase):
def _cleanup_files(self):
if os.path.exists('insert_calls1'):
os.remove('insert_calls1')
if os.path.exists('insert_calls2'):
os.remove('insert_calls2')
def setUp(self):
self._cleanup_files()
def tearDown(self):
self._cleanup_files()
def test_noop_schema_parsing(self):
expected_table_schema = None
table_schema = beam.io.gcp.bigquery.BigQueryWriteFn.get_table_schema(
schema=None)
self.assertEqual(expected_table_schema, table_schema)
def test_dict_schema_parsing(self):
schema = {'fields': [
{'name': 's', 'type': 'STRING', 'mode': 'NULLABLE'},
{'name': 'n', 'type': 'INTEGER', 'mode': 'NULLABLE'},
{'name': 'r', 'type': 'RECORD', 'mode': 'NULLABLE', 'fields': [
{'name': 'x', 'type': 'INTEGER', 'mode': 'NULLABLE'}]}]}
table_schema = beam.io.gcp.bigquery.BigQueryWriteFn.get_table_schema(schema)
string_field = bigquery.TableFieldSchema(
name='s', type='STRING', mode='NULLABLE')
nested_field = bigquery.TableFieldSchema(
name='x', type='INTEGER', mode='NULLABLE')
number_field = bigquery.TableFieldSchema(
name='n', type='INTEGER', mode='NULLABLE')
record_field = bigquery.TableFieldSchema(
name='r', type='RECORD', mode='NULLABLE', fields=[nested_field])
expected_table_schema = bigquery.TableSchema(
fields=[string_field, number_field, record_field])
self.assertEqual(expected_table_schema, table_schema)
def test_string_schema_parsing(self):
schema = 's:STRING, n:INTEGER'
expected_dict_schema = {'fields': [
{'name': 's', 'type': 'STRING', 'mode': 'NULLABLE'},
{'name': 'n', 'type': 'INTEGER', 'mode': 'NULLABLE'}]}
dict_schema = (
beam.io.gcp.bigquery.WriteToBigQuery.get_dict_table_schema(schema))
self.assertEqual(expected_dict_schema, dict_schema)
def test_table_schema_parsing(self):
string_field = bigquery.TableFieldSchema(
name='s', type='STRING', mode='NULLABLE')
nested_field = bigquery.TableFieldSchema(
name='x', type='INTEGER', mode='NULLABLE')
number_field = bigquery.TableFieldSchema(
name='n', type='INTEGER', mode='NULLABLE')
record_field = bigquery.TableFieldSchema(
name='r', type='RECORD', mode='NULLABLE', fields=[nested_field])
schema = bigquery.TableSchema(
fields=[string_field, number_field, record_field])
expected_dict_schema = {'fields': [
{'name': 's', 'type': 'STRING', 'mode': 'NULLABLE'},
{'name': 'n', 'type': 'INTEGER', 'mode': 'NULLABLE'},
{'name': 'r', 'type': 'RECORD', 'mode': 'NULLABLE', 'fields': [
{'name': 'x', 'type': 'INTEGER', 'mode': 'NULLABLE'}]}]}
dict_schema = (
beam.io.gcp.bigquery.WriteToBigQuery.get_dict_table_schema(schema))
self.assertEqual(expected_dict_schema, dict_schema)
def test_table_schema_parsing_end_to_end(self):
string_field = bigquery.TableFieldSchema(
name='s', type='STRING', mode='NULLABLE')
nested_field = bigquery.TableFieldSchema(
name='x', type='INTEGER', mode='NULLABLE')
number_field = bigquery.TableFieldSchema(
name='n', type='INTEGER', mode='NULLABLE')
record_field = bigquery.TableFieldSchema(
name='r', type='RECORD', mode='NULLABLE', fields=[nested_field])
schema = bigquery.TableSchema(
fields=[string_field, number_field, record_field])
table_schema = beam.io.gcp.bigquery.BigQueryWriteFn.get_table_schema(
beam.io.gcp.bigquery.WriteToBigQuery.get_dict_table_schema(schema))
self.assertEqual(table_schema, schema)
def test_none_schema_parsing(self):
schema = None
expected_dict_schema = None
dict_schema = (
beam.io.gcp.bigquery.WriteToBigQuery.get_dict_table_schema(schema))
self.assertEqual(expected_dict_schema, dict_schema)
def test_noop_dict_schema_parsing(self):
schema = {'fields': [
{'name': 's', 'type': 'STRING', 'mode': 'NULLABLE'},
{'name': 'n', 'type': 'INTEGER', 'mode': 'NULLABLE'}]}
expected_dict_schema = schema
dict_schema = (
beam.io.gcp.bigquery.WriteToBigQuery.get_dict_table_schema(schema))
self.assertEqual(expected_dict_schema, dict_schema)
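# Unit tests for BigQueryWriteFn against a mocked apitools client: rows are
# buffered, and tabledata.InsertAll is only expected once batch_size rows
# have been processed or the bundle finishes.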
@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
class BigQueryStreamingInsertTransformTests(unittest.TestCase):
def test_dofn_client_process_performs_batching(self):
client = mock.Mock()
client.tables.Get.return_value = bigquery.Table(
tableReference=bigquery.TableReference(
projectId='project_id', datasetId='dataset_id', tableId='table_id'))
client.tabledata.InsertAll.return_value = (
bigquery.TableDataInsertAllResponse(insertErrors=[]))
create_disposition = beam.io.BigQueryDisposition.CREATE_NEVER
write_disposition = beam.io.BigQueryDisposition.WRITE_APPEND
fn = beam.io.gcp.bigquery.BigQueryWriteFn(
batch_size=2,
create_disposition=create_disposition,
write_disposition=write_disposition,
kms_key=None,
test_client=client)
fn.process(('project_id:dataset_id.table_id', {'month': 1}))
# InsertRows not called as batch size is not hit yet
self.assertFalse(client.tabledata.InsertAll.called)
def test_dofn_client_process_flush_called(self):
client = mock.Mock()
client.tables.Get.return_value = bigquery.Table(
tableReference=bigquery.TableReference(
projectId='project_id', datasetId='dataset_id', tableId='table_id'))
client.tabledata.InsertAll.return_value = (
bigquery.TableDataInsertAllResponse(insertErrors=[]))
create_disposition = beam.io.BigQueryDisposition.CREATE_NEVER
write_disposition = beam.io.BigQueryDisposition.WRITE_APPEND
fn = beam.io.gcp.bigquery.BigQueryWriteFn(
batch_size=2,
create_disposition=create_disposition,
write_disposition=write_disposition,
kms_key=None,
test_client=client)
fn.start_bundle()
fn.process(('project_id:dataset_id.table_id', ({'month': 1}, 'insertid1')))
fn.process(('project_id:dataset_id.table_id', ({'month': 2}, 'insertid2')))
# InsertRows called as batch size is hit
self.assertTrue(client.tabledata.InsertAll.called)
def test_dofn_client_finish_bundle_flush_called(self):
client = mock.Mock()
client.tables.Get.return_value = bigquery.Table(
tableReference=bigquery.TableReference(
projectId='project_id', datasetId='dataset_id', tableId='table_id'))
client.tabledata.InsertAll.return_value = (
bigquery.TableDataInsertAllResponse(insertErrors=[]))
create_disposition = beam.io.BigQueryDisposition.CREATE_IF_NEEDED
write_disposition = beam.io.BigQueryDisposition.WRITE_APPEND
fn = beam.io.gcp.bigquery.BigQueryWriteFn(
batch_size=2,
create_disposition=create_disposition,
write_disposition=write_disposition,
kms_key=None,
test_client=client)
fn.start_bundle()
# Each element is a (destination, (row, insert_id)) tuple; with
# CREATE_IF_NEEDED the DoFn is expected to look up the destination table
# (client.tables.Get) before buffering the row.
fn.process(('project_id:dataset_id.table_id', ({'month': 1}, 'insertid3')))
self.assertTrue(client.tables.Get.called)
# InsertRows not called as batch size is not hit
self.assertFalse(client.tabledata.InsertAll.called)
fn.finish_bundle()
# InsertRows called in finish bundle
self.assertTrue(client.tabledata.InsertAll.called)
def test_dofn_client_no_records(self):
client = mock.Mock()
client.tables.Get.return_value = bigquery.Table(
tableReference=bigquery.TableReference(
projectId='project_id', datasetId='dataset_id', tableId='table_id'))
client.tabledata.InsertAll.return_value = (
bigquery.TableDataInsertAllResponse(insertErrors=[]))
create_disposition = beam.io.BigQueryDisposition.CREATE_NEVER
write_disposition = beam.io.BigQueryDisposition.WRITE_APPEND
fn = beam.io.gcp.bigquery.BigQueryWriteFn(
batch_size=2,
create_disposition=create_disposition,
write_disposition=write_disposition,
kms_key=None,
test_client=client)
fn.start_bundle()
# InsertRows not called as batch size is not hit
self.assertFalse(client.tabledata.InsertAll.called)
fn.finish_bundle()
# InsertRows not called in finish bundle as no records
self.assertFalse(client.tabledata.InsertAll.called)
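# Runs a small pipeline against a mocked client to check that the insert ids
# generated for a bundle are reused when a streaming insert is retried: the
# first InsertAll call raises, and the retry must carry the same insertId
# values, so the two recorded files must compare equal.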
@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
class PipelineBasedStreamingInsertTest(_TestCaseWithTempDirCleanUp):
def test_failure_has_same_insert_ids(self):
tempdir = '%s%s' % (self._new_tempdir(), os.sep)
file_name_1 = os.path.join(tempdir, 'file1')
file_name_2 = os.path.join(tempdir, 'file2')
def store_callback(arg):
insert_ids = [r.insertId for r in arg.tableDataInsertAllRequest.rows]
colA_values = [r.json.additionalProperties[0].value.string_value
for r in arg.tableDataInsertAllRequest.rows]
json_output = {'insertIds': insert_ids,
'colA_values': colA_values}
# The first time we try to insert, we save those insertions in
# file insert_calls1.
if not os.path.exists(file_name_1):
with open(file_name_1, 'w') as f:
json.dump(json_output, f)
raise RuntimeError()
else:
with open(file_name_2, 'w') as f:
json.dump(json_output, f)
res = mock.Mock()
res.insertErrors = []
return res
client = mock.Mock()
client.tabledata.InsertAll = mock.Mock(side_effect=store_callback)
# Using the bundle based direct runner to avoid pickling problems
# with mocks.
with beam.Pipeline(runner='BundleBasedDirectRunner') as p:
_ = (p
| beam.Create([{'columnA':'value1', 'columnB':'value2'},
{'columnA':'value3', 'columnB':'value4'},
{'columnA':'value5', 'columnB':'value6'}])
| _StreamToBigQuery(
'project:dataset.table',
[], [],
'anyschema',
None,
'CREATE_NEVER', None,
None, None,
[], test_client=client))
with open(file_name_1) as f1, open(file_name_2) as f2:
self.assertEqual(
json.load(f1),
json.load(f2))
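# Integration tests (attr 'IT') that write to a real BigQuery dataset; setUp
# creates a throw-away dataset and tearDown deletes it.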
class BigQueryStreamingInsertTransformIntegrationTests(unittest.TestCase):
BIG_QUERY_DATASET_ID = 'python_bq_streaming_inserts_'
# Prevent nose from finding and running tests that were not
# specified in the Gradle file.
# See "More tests may be found" in:
# https://nose.readthedocs.io/en/latest/doc_tests/test_multiprocess
# /multiprocess.html#other-differences-in-test-running
_multiprocess_can_split_ = True
def setUp(self):
self.test_pipeline = TestPipeline(is_integration_test=True)
self.runner_name = type(self.test_pipeline.runner).__name__
self.project = self.test_pipeline.get_option('project')
self.dataset_id = '%s%s%d' % (self.BIG_QUERY_DATASET_ID,
str(int(time.time())),
random.randint(0, 10000))
self.bigquery_client = bigquery_tools.BigQueryWrapper()
self.bigquery_client.get_or_create_dataset(self.project, self.dataset_id)
self.output_table = "%s.output_table" % (self.dataset_id)
_LOGGER.info("Created dataset %s in project %s",
self.dataset_id, self.project)
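# Writes to two tables whose names and schemas are passed as ValueProviders,
# one via STREAMING_INSERTS and one via FILE_LOADS with schema autodetection,
# and verifies the resulting table properties (time partitioning, clustering)
# as well as the table contents.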
@attr('IT')
def test_value_provider_transform(self):
output_table_1 = '%s%s' % (self.output_table, 1)
output_table_2 = '%s%s' % (self.output_table, 2)
schema = {'fields': [
{'name': 'name', 'type': 'STRING', 'mode': 'NULLABLE'},
{'name': 'language', 'type': 'STRING', 'mode': 'NULLABLE'}]}
additional_bq_parameters = {
'timePartitioning': {'type': 'DAY'},
'clustering': {'fields': ['language']}}
table_ref = bigquery_tools.parse_table_reference(output_table_1)
table_ref2 = bigquery_tools.parse_table_reference(output_table_2)
pipeline_verifiers = [
BigQueryTableMatcher(
project=self.project,
dataset=table_ref.datasetId,
table=table_ref.tableId,
expected_properties=additional_bq_parameters),
BigQueryTableMatcher(
project=self.project,
dataset=table_ref2.datasetId,
table=table_ref2.tableId,
expected_properties=additional_bq_parameters),
BigqueryFullResultMatcher(
project=self.project,
query="SELECT name, language FROM %s" % output_table_1,
data=[(d['name'], d['language'])
for d in _ELEMENTS
if 'language' in d]),
BigqueryFullResultMatcher(
project=self.project,
query="SELECT name, language FROM %s" % output_table_2,
data=[(d['name'], d['language'])
for d in _ELEMENTS
if 'language' in d])]
args = self.test_pipeline.get_full_options_as_args(
on_success_matcher=hc.all_of(*pipeline_verifiers),
experiments='use_beam_bq_sink')
with beam.Pipeline(argv=args) as p:
input = p | beam.Create([row for row in _ELEMENTS if 'language' in row])
_ = (input
| "WriteWithMultipleDests" >> beam.io.gcp.bigquery.WriteToBigQuery(
table=value_provider.StaticValueProvider(
str, '%s:%s' % (self.project, output_table_1)),
schema=value_provider.StaticValueProvider(dict, schema),
additional_bq_parameters=additional_bq_parameters,
method='STREAMING_INSERTS'))
_ = (input
| "WriteWithMultipleDests2" >> beam.io.gcp.bigquery.WriteToBigQuery(
table=value_provider.StaticValueProvider(
str, '%s:%s' % (self.project, output_table_2)),
schema=beam.io.gcp.bigquery.SCHEMA_AUTODETECT,
additional_bq_parameters=lambda _: additional_bq_parameters,
method='FILE_LOADS'))
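# Routes elements to one of two tables via a callable destination with
# side-input table and schema mappings, and checks that bad_record (which
# routes to the first table but does not match its schema) is emitted on the
# FAILED_ROWS output.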
@attr('IT')
def test_multiple_destinations_transform(self):
streaming = self.test_pipeline.options.view_as(StandardOptions).streaming
if streaming and isinstance(self.test_pipeline.runner, TestDataflowRunner):
self.skipTest("TestStream is not supported on TestDataflowRunner")
output_table_1 = '%s%s' % (self.output_table, 1)
output_table_2 = '%s%s' % (self.output_table, 2)
full_output_table_1 = '%s:%s' % (self.project, output_table_1)
full_output_table_2 = '%s:%s' % (self.project, output_table_2)
schema1 = {'fields': [
{'name': 'name', 'type': 'STRING', 'mode': 'NULLABLE'},
{'name': 'language', 'type': 'STRING', 'mode': 'NULLABLE'}]}
schema2 = {'fields': [
{'name': 'name', 'type': 'STRING', 'mode': 'NULLABLE'},
{'name': 'foundation', 'type': 'STRING', 'mode': 'NULLABLE'}]}
bad_record = {'language': 1, 'manguage': 2}
if streaming:
pipeline_verifiers = [
PipelineStateMatcher(PipelineState.RUNNING),
BigqueryFullResultStreamingMatcher(
project=self.project,
query="SELECT name, language FROM %s" % output_table_1,
data=[(d['name'], d['language'])
for d in _ELEMENTS
if 'language' in d]),
BigqueryFullResultStreamingMatcher(
project=self.project,
query="SELECT name, foundation FROM %s" % output_table_2,
data=[(d['name'], d['foundation'])
for d in _ELEMENTS
if 'foundation' in d])]
else:
pipeline_verifiers = [
BigqueryFullResultMatcher(
project=self.project,
query="SELECT name, language FROM %s" % output_table_1,
data=[(d['name'], d['language'])
for d in _ELEMENTS
if 'language' in d]),
BigqueryFullResultMatcher(
project=self.project,
query="SELECT name, foundation FROM %s" % output_table_2,
data=[(d['name'], d['foundation'])
for d in _ELEMENTS
if 'foundation' in d])]
args = self.test_pipeline.get_full_options_as_args(
on_success_matcher=hc.all_of(*pipeline_verifiers),
experiments='use_beam_bq_sink')
with beam.Pipeline(argv=args) as p:
if streaming:
_SIZE = len(_ELEMENTS)
test_stream = (TestStream()
.advance_watermark_to(0)
.add_elements(_ELEMENTS[:_SIZE//2])
.advance_watermark_to(100)
.add_elements(_ELEMENTS[_SIZE//2:])
.advance_watermark_to_infinity())
input = p | test_stream
else:
input = p | beam.Create(_ELEMENTS)
schema_table_pcv = beam.pvalue.AsDict(
p | "MakeSchemas" >> beam.Create([(full_output_table_1, schema1),
(full_output_table_2, schema2)]))
table_record_pcv = beam.pvalue.AsDict(
p | "MakeTables" >> beam.Create([('table1', full_output_table_1),
('table2', full_output_table_2)]))
input2 = p | "Broken record" >> beam.Create([bad_record])
input = (input, input2) | beam.Flatten()
r = (input
| "WriteWithMultipleDests" >> beam.io.gcp.bigquery.WriteToBigQuery(
table=lambda x, tables: (tables['table1']
if 'language' in x
else tables['table2']),
table_side_inputs=(table_record_pcv,),
schema=lambda dest, table_map: table_map.get(dest, None),
schema_side_inputs=(schema_table_pcv,),
method='STREAMING_INSERTS'))
assert_that(r[beam.io.gcp.bigquery.BigQueryWriteFn.FAILED_ROWS],
equal_to([(full_output_table_1, bad_record)]))
def tearDown(self):
request = bigquery.BigqueryDatasetsDeleteRequest(
projectId=self.project, datasetId=self.dataset_id,
deleteContents=True)
try:
_LOGGER.info("Deleting dataset %s in project %s",
self.dataset_id, self.project)
self.bigquery_client.client.datasets.Delete(request)
except HttpError:
_LOGGER.debug('Failed to clean up dataset %s in project %s',
self.dataset_id, self.project)
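# End-to-end streaming test: messages published to Pub/Sub are read with
# ReadFromPubSub and written to BigQuery using either STREAMING_INSERTS or
# FILE_LOADS.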
@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
class PubSubBigQueryIT(unittest.TestCase):
INPUT_TOPIC = 'psit_topic_output'
INPUT_SUB = 'psit_subscription_input'
BIG_QUERY_DATASET_ID = 'python_pubsub_bq_'
SCHEMA = {'fields': [
{'name': 'number', 'type': 'INTEGER', 'mode': 'NULLABLE'}]}
_SIZE = 4
WAIT_UNTIL_FINISH_DURATION = 15 * 60 * 1000
def setUp(self):
# Set up PubSub
self.test_pipeline = TestPipeline(is_integration_test=True)
self.runner_name = type(self.test_pipeline.runner).__name__
self.project = self.test_pipeline.get_option('project')
self.uuid = str(uuid.uuid4())
from google.cloud import pubsub
self.pub_client = pubsub.PublisherClient()
self.input_topic = self.pub_client.create_topic(
self.pub_client.topic_path(self.project, self.INPUT_TOPIC + self.uuid))
self.sub_client = pubsub.SubscriberClient()
self.input_sub = self.sub_client.create_subscription(
self.sub_client.subscription_path(self.project,
self.INPUT_SUB + self.uuid),
self.input_topic.name)
# Set up BQ
self.dataset_ref = utils.create_bq_dataset(self.project,
self.BIG_QUERY_DATASET_ID)
self.output_table = "%s.output_table" % (self.dataset_ref.dataset_id)
def tearDown(self):
# Tear down PubSub
test_utils.cleanup_topics(self.pub_client,
[self.input_topic])
test_utils.cleanup_subscriptions(self.sub_client,
[self.input_sub])
# Tear down BigQuery
utils.delete_bq_dataset(self.project, self.dataset_ref)
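# Publishes _SIZE integers to the input topic, runs a streaming pipeline that
# reads them back and writes them to BigQuery with the given method, and
# relies on BigqueryFullResultStreamingMatcher to verify the output table.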
def _run_pubsub_bq_pipeline(self, method, triggering_frequency=None):
numbers = list(range(self._SIZE))
matchers = [
PipelineStateMatcher(PipelineState.RUNNING),
BigqueryFullResultStreamingMatcher(
project=self.project,
query="SELECT number FROM %s" % self.output_table,
data=[(i,) for i in numbers])]
args = self.test_pipeline.get_full_options_as_args(
on_success_matcher=hc.all_of(*matchers),
wait_until_finish_duration=self.WAIT_UNTIL_FINISH_DURATION,
experiments='use_beam_bq_sink',
streaming=True)
def add_schema_info(element):
yield {'number': element}
messages = [str(i).encode('utf-8') for i in numbers]
for message in messages:
self.pub_client.publish(self.input_topic.name, message)
with beam.Pipeline(argv=args) as p:
records = (p
| ReadFromPubSub(subscription=self.input_sub.name)
| beam.ParDo(add_schema_info))
_ = records | WriteToBigQuery(
self.output_table,
schema=self.SCHEMA,
method=method,
triggering_frequency=triggering_frequency)
@attr('IT')
def test_streaming_inserts(self):
self._run_pubsub_bq_pipeline(WriteToBigQuery.Method.STREAMING_INSERTS)
@attr('IT')
def test_file_loads(self):
if isinstance(self.test_pipeline.runner, TestDataflowRunner):
self.skipTest('https://issuetracker.google.com/issues/118375066')
self._run_pubsub_bq_pipeline(WriteToBigQuery.Method.FILE_LOADS,
triggering_frequency=20)
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
unittest.main()