#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Unit tests for BigQuery file loads utilities."""
from __future__ import absolute_import
import json
import logging
import os
import random
import sys
import time
import unittest
import mock
from hamcrest.core import assert_that as hamcrest_assert
from hamcrest.core.core.allof import all_of
from hamcrest.core.core.is_ import is_
from nose.plugins.attrib import attr
import apache_beam as beam
from apache_beam import coders
from apache_beam.io.filebasedsink_test import _TestCaseWithTempDirCleanUp
from apache_beam.io.gcp import bigquery
from apache_beam.io.gcp import bigquery_file_loads as bqfl
from apache_beam.io.gcp import bigquery_tools
from apache_beam.io.gcp.internal.clients import bigquery as bigquery_api
from apache_beam.io.gcp.tests.bigquery_matcher import BigqueryFullResultMatcher
from apache_beam.io.gcp.tests.bigquery_matcher import BigqueryFullResultStreamingMatcher
from apache_beam.runners.dataflow.test_dataflow_runner import TestDataflowRunner
from apache_beam.runners.runner import PipelineState
from apache_beam.testing.pipeline_verifiers import PipelineStateMatcher
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.test_stream import TestStream
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to
from apache_beam.transforms import combiners
from apache_beam.typehints.typehints import Tuple
try:
from apitools.base.py.exceptions import HttpError
except ImportError:
HttpError = None
_LOGGER = logging.getLogger(__name__)
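# Shared fixtures: (destination table, JSON-encoded row) pairs that the tests
# below route to three different BigQuery tables.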
_DESTINATION_ELEMENT_PAIRS = [
# DESTINATION 1
('project1:dataset1.table1', '{"name":"beam", "language":"py"}'),
('project1:dataset1.table1', '{"name":"beam", "language":"java"}'),
('project1:dataset1.table1', '{"name":"beam", "language":"go"}'),
('project1:dataset1.table1', '{"name":"flink", "language":"java"}'),
('project1:dataset1.table1', '{"name":"flink", "language":"scala"}'),
# DESTINATION 3
('project1:dataset1.table3', '{"name":"spark", "language":"scala"}'),
# DESTINATION 1
('project1:dataset1.table1', '{"name":"spark", "language":"py"}'),
('project1:dataset1.table1', '{"name":"spark", "language":"scala"}'),
# DESTINATION 2
('project1:dataset1.table2', '{"name":"beam", "foundation":"apache"}'),
('project1:dataset1.table2', '{"name":"flink", "foundation":"apache"}'),
('project1:dataset1.table2', '{"name":"spark", "foundation":"apache"}'),
]
_NAME_LANGUAGE_ELEMENTS = [
json.loads(elm[1])
for elm in _DESTINATION_ELEMENT_PAIRS if "language" in elm[1]
]
_DISTINCT_DESTINATIONS = list(
set([elm[0] for elm in _DESTINATION_ELEMENT_PAIRS]))
_ELEMENTS = [json.loads(elm[1]) for elm in _DESTINATION_ELEMENT_PAIRS]
class CustomRowCoder(coders.Coder):
"""
Custom row coder that also expects strings as input data when encoding
"""
def __init__(self):
self.coder = bigquery_tools.RowAsDictJsonCoder()
def encode(self, table_row):
    if isinstance(table_row, str):
      table_row = json.loads(table_row)
return self.coder.encode(table_row)
def decode(self, encoded_table_row):
return self.coder.decode(encoded_table_row)
@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
class TestWriteRecordsToFile(_TestCaseWithTempDirCleanUp):
maxDiff = None
def _consume_input(self, fn, checks=None):
if checks is None:
return
with TestPipeline() as p:
output_pcs = (
p
| beam.Create(_DESTINATION_ELEMENT_PAIRS)
| beam.ParDo(fn, self.tmpdir)
.with_outputs(fn.WRITTEN_FILE_TAG, fn.UNWRITTEN_RECORD_TAG))
checks(output_pcs)
return output_pcs
def test_files_created(self):
"""Test that the files are created and written."""
fn = bqfl.WriteRecordsToFile(coder=CustomRowCoder())
self.tmpdir = self._new_tempdir()
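    # WriteRecordsToFile tags written files as (destination, (file path, file
    # size)) pairs; the checks below pull out the paths, verify the files
    # exist, and check the expected file count and destinations.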
def check_files_created(output_pcs):
dest_file_pc = output_pcs[bqfl.WriteRecordsToFile.WRITTEN_FILE_TAG]
files = dest_file_pc | "GetFiles" >> beam.Map(lambda x: x[1][0])
file_count = files | "CountFiles" >> combiners.Count.Globally()
_ = files | "FilesExist" >> beam.Map(
lambda x: hamcrest_assert(os.path.exists(x), is_(True)))
assert_that(file_count, equal_to([3]), label='check file count')
destinations = (
dest_file_pc
| "GetDests" >> beam.Map(
lambda x: bigquery_tools.get_hashable_destination(x[0])))
      assert_that(destinations, equal_to(list(_DISTINCT_DESTINATIONS)),
                  label='check destinations')
self._consume_input(fn, check_files_created)
def test_many_files(self):
"""Forces records to be written to many files.
For each destination multiple files are necessary. This is because the max
file length is very small, so only a couple records fit in each file.
"""
fn = bqfl.WriteRecordsToFile(max_file_size=50, coder=CustomRowCoder())
self.tmpdir = self._new_tempdir()
def check_many_files(output_pcs):
dest_file_pc = output_pcs[bqfl.WriteRecordsToFile.WRITTEN_FILE_TAG]
files_per_dest = (dest_file_pc
| beam.Map(lambda x: x).with_output_types(
beam.typehints.KV[str, Tuple[str, int]])
| combiners.Count.PerKey())
files_per_dest = (
files_per_dest
| "GetDests" >> beam.Map(
lambda x: (bigquery_tools.get_hashable_destination(x[0]),
x[1]))
)
assert_that(files_per_dest,
equal_to([('project1:dataset1.table1', 4),
('project1:dataset1.table2', 2),
('project1:dataset1.table3', 1)]))
# Check that the files exist
_ = dest_file_pc | beam.Map(lambda x: x[1][0]) | beam.Map(
lambda x: hamcrest_assert(os.path.exists(x), is_(True)))
self._consume_input(fn, check_many_files)
def test_records_are_spilled(self):
"""Forces records to be written to many files.
For each destination multiple files are necessary, and at most two files
can be created. This forces records to be spilled to the next stage of
processing.
"""
fn = bqfl.WriteRecordsToFile(max_files_per_bundle=2,
coder=CustomRowCoder())
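    # With at most two files open per bundle, records for one of the three
    # destinations cannot be written here; they are emitted on the
    # UNWRITTEN_RECORD_TAG output and counted as spilled below.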
self.tmpdir = self._new_tempdir()
def check_many_files(output_pcs):
dest_file_pc = output_pcs[bqfl.WriteRecordsToFile.WRITTEN_FILE_TAG]
spilled_records_pc = output_pcs[
bqfl.WriteRecordsToFile.UNWRITTEN_RECORD_TAG]
spilled_records_count = (spilled_records_pc |
combiners.Count.Globally())
assert_that(spilled_records_count, equal_to([3]), label='spilled count')
files_per_dest = (dest_file_pc
| beam.Map(lambda x: x).with_output_types(
beam.typehints.KV[str, Tuple[str, int]])
| combiners.Count.PerKey())
files_per_dest = (
files_per_dest
| "GetDests" >> beam.Map(
lambda x: (bigquery_tools.get_hashable_destination(x[0]),
x[1])))
# Only table1 and table3 get files. table2 records get spilled.
assert_that(files_per_dest,
equal_to([('project1:dataset1.table1', 1),
('project1:dataset1.table3', 1)]),
label='file count')
# Check that the files exist
_ = dest_file_pc | beam.Map(lambda x: x[1][0]) | beam.Map(
lambda x: hamcrest_assert(os.path.exists(x), is_(True)))
self._consume_input(fn, check_many_files)
@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
class TestWriteGroupedRecordsToFile(_TestCaseWithTempDirCleanUp):
def _consume_input(self, fn, input, checks):
if checks is None:
return
with TestPipeline() as p:
res = (p
| beam.Create(input)
| beam.GroupByKey()
| beam.ParDo(fn, self.tmpdir))
checks(res)
return res
def test_files_are_created(self):
"""Test that the files are created and written."""
fn = bqfl.WriteGroupedRecordsToFile(coder=CustomRowCoder())
self.tmpdir = self._new_tempdir()
def check_files_created(output_pc):
files = output_pc | "GetFiles" >> beam.Map(lambda x: x[1][0])
file_count = files | "CountFiles" >> combiners.Count.Globally()
_ = files | "FilesExist" >> beam.Map(
lambda x: hamcrest_assert(os.path.exists(x), is_(True)))
assert_that(file_count, equal_to([3]), label='check file count')
destinations = (
output_pc
| "GetDests" >> beam.Map(
lambda x: bigquery_tools.get_hashable_destination(x[0])))
      assert_that(destinations, equal_to(list(_DISTINCT_DESTINATIONS)),
                  label='check destinations')
self._consume_input(
fn, _DESTINATION_ELEMENT_PAIRS, check_files_created)
def test_multiple_files(self):
"""Forces records to be written to many files.
For each destination multiple files are necessary. This is because the max
file length is very small, so only a couple records fit in each file.
"""
fn = bqfl.WriteGroupedRecordsToFile(max_file_size=50,
coder=CustomRowCoder())
self.tmpdir = self._new_tempdir()
def check_multiple_files(output_pc):
files_per_dest = output_pc | combiners.Count.PerKey()
files_per_dest = (
files_per_dest
| "GetDests" >> beam.Map(
lambda x: (bigquery_tools.get_hashable_destination(x[0]),
x[1])))
      assert_that(files_per_dest,
                  equal_to([('project1:dataset1.table1', 4),
                            ('project1:dataset1.table2', 2),
                            ('project1:dataset1.table3', 1)]))
      # Check that the files exist
      _ = output_pc | beam.Map(lambda x: x[1][0]) | beam.Map(
          lambda x: hamcrest_assert(os.path.exists(x), is_(True)))
self._consume_input(fn, _DESTINATION_ELEMENT_PAIRS, check_multiple_files)
@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
class TestPartitionFiles(unittest.TestCase):
_ELEMENTS = [('destination0', [('file0', 50), ('file1', 50),
('file2', 50), ('file3', 50)]),
('destination1', [('file0', 50), ('file1', 50)])]
def test_partition(self):
partition = bqfl.PartitionFiles.Partition(1000, 1)
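    # A partition that holds at most 1000 bytes and a single file; the
    # assertions below check what it can still accept before and after a file
    # is added.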
    self.assertTrue(partition.can_accept(50))
    self.assertFalse(partition.can_accept(2000))
    self.assertTrue(partition.can_accept(1000))

    partition.add('file1', 50)
    self.assertEqual(partition.files, ['file1'])
    self.assertEqual(partition.size, 50)

    self.assertFalse(partition.can_accept(50))
    self.assertFalse(partition.can_accept(0))
def test_partition_files_dofn_file_split(self):
"""Force partitions to split based on max_files"""
multiple_partitions_result = [('destination0', ['file0', 'file1']),
('destination0', ['file2', 'file3'])]
single_partition_result = [('destination1', ['file0', 'file1'])]
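    # PartitionFiles(1000, 2) allows at most two files per partition, so
    # destination0's four files must be split into two partitions, while
    # destination1's two files fit in a single one.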
with TestPipeline() as p:
destination_file_pairs = p | beam.Create(self._ELEMENTS)
partitioned_files = (
destination_file_pairs
| beam.ParDo(bqfl.PartitionFiles(1000, 2))
.with_outputs(bqfl.PartitionFiles.MULTIPLE_PARTITIONS_TAG,
bqfl.PartitionFiles.SINGLE_PARTITION_TAG))
      multiple_partitions = partitioned_files[
          bqfl.PartitionFiles.MULTIPLE_PARTITIONS_TAG]
      single_partition = partitioned_files[
          bqfl.PartitionFiles.SINGLE_PARTITION_TAG]
assert_that(multiple_partitions, equal_to(multiple_partitions_result),
label='CheckMultiplePartitions')
assert_that(single_partition, equal_to(single_partition_result),
label='CheckSinglePartition')
def test_partition_files_dofn_size_split(self):
"""Force partitions to split based on max_partition_size"""
multiple_partitions_result = [('destination0', ['file0', 'file1', 'file2']),
('destination0', ['file3'])]
single_partition_result = [('destination1', ['file0', 'file1'])]
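    # PartitionFiles(150, 10) caps each partition at 150 bytes; with 50-byte
    # files, only three fit per partition, so destination0's fourth file ends
    # up in a partition of its own.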
with TestPipeline() as p:
destination_file_pairs = p | beam.Create(self._ELEMENTS)
partitioned_files = (
destination_file_pairs
| beam.ParDo(bqfl.PartitionFiles(150, 10))
.with_outputs(bqfl.PartitionFiles.MULTIPLE_PARTITIONS_TAG,
bqfl.PartitionFiles.SINGLE_PARTITION_TAG))
      multiple_partitions = partitioned_files[
          bqfl.PartitionFiles.MULTIPLE_PARTITIONS_TAG]
      single_partition = partitioned_files[
          bqfl.PartitionFiles.SINGLE_PARTITION_TAG]
assert_that(multiple_partitions, equal_to(multiple_partitions_result),
label='CheckMultiplePartitions')
assert_that(single_partition, equal_to(single_partition_result),
label='CheckSinglePartition')
@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
class TestBigQueryFileLoads(_TestCaseWithTempDirCleanUp):
def test_records_traverse_transform_with_mocks(self):
destination = 'project1:dataset1.table1'
job_reference = bigquery_api.JobReference()
job_reference.projectId = 'project1'
job_reference.jobId = 'job_name1'
result_job = bigquery_api.Job()
result_job.jobReference = job_reference
mock_job = mock.Mock()
mock_job.status.state = 'DONE'
mock_job.status.errorResult = None
mock_job.jobReference = job_reference
bq_client = mock.Mock()
bq_client.jobs.Get.return_value = mock_job
bq_client.jobs.Insert.return_value = result_job
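    # The mocked BigQuery client reports every job as DONE with no errors, so
    # the transform can run end to end without contacting the real service.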
transform = bqfl.BigQueryBatchFileLoads(
destination,
custom_gcs_temp_location=self._new_tempdir(),
test_client=bq_client,
validate=False,
coder=CustomRowCoder())
# Need to test this with the DirectRunner to avoid serializing mocks
with TestPipeline('DirectRunner') as p:
outputs = p | beam.Create(_ELEMENTS) | transform
dest_files = outputs[bqfl.BigQueryBatchFileLoads.DESTINATION_FILE_PAIRS]
dest_job = outputs[bqfl.BigQueryBatchFileLoads.DESTINATION_JOBID_PAIRS]
jobs = dest_job | "GetJobs" >> beam.Map(lambda x: x[1])
files = dest_files | "GetFiles" >> beam.Map(lambda x: x[1][0])
destinations = (
dest_files
| "GetDests" >> beam.Map(
lambda x: (
bigquery_tools.get_hashable_destination(x[0]), x[1]))
| "GetUniques" >> combiners.Count.PerKey()
| "GetFinalDests" >>beam.Keys())
# All files exist
_ = (files | beam.Map(
lambda x: hamcrest_assert(os.path.exists(x), is_(True))))
# One file per destination
assert_that(files | combiners.Count.Globally(),
equal_to([1]),
label='CountFiles')
assert_that(destinations,
equal_to([destination]),
label='CheckDestinations')
assert_that(jobs,
equal_to([job_reference]), label='CheckJobs')
@unittest.skipIf(sys.version_info[0] == 2,
'Mock pickling problems in Py 2')
@mock.patch('time.sleep')
def test_wait_for_job_completion(self, sleep_mock):
job_references = [bigquery_api.JobReference(),
bigquery_api.JobReference()]
job_references[0].projectId = 'project1'
job_references[0].jobId = 'jobId1'
job_references[1].projectId = 'project1'
job_references[1].jobId = 'jobId2'
job_1_waiting = mock.Mock()
job_1_waiting.status.state = 'RUNNING'
job_2_done = mock.Mock()
job_2_done.status.state = 'DONE'
job_2_done.status.errorResult = None
job_1_done = mock.Mock()
job_1_done.status.state = 'DONE'
job_1_done.status.errorResult = None
bq_client = mock.Mock()
bq_client.jobs.Get.side_effect = [
job_1_waiting,
job_2_done,
job_1_done,
job_2_done]
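    # The first polling round sees job 1 still RUNNING, so WaitForBQJobs has
    # to sleep and poll again; the second round reports both jobs DONE.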
waiting_dofn = bqfl.WaitForBQJobs(bq_client)
dest_list = [(i, job) for i, job in enumerate(job_references)]
with TestPipeline('DirectRunner') as p:
references = beam.pvalue.AsList(p | 'job_ref' >> beam.Create(dest_list))
outputs = (p
| beam.Create([''])
| beam.ParDo(waiting_dofn, references))
assert_that(outputs,
equal_to(dest_list))
sleep_mock.assert_called_once()
@unittest.skipIf(sys.version_info[0] == 2,
'Mock pickling problems in Py 2')
@mock.patch('time.sleep')
def test_one_job_failed_after_waiting(self, sleep_mock):
job_references = [bigquery_api.JobReference(),
bigquery_api.JobReference()]
job_references[0].projectId = 'project1'
job_references[0].jobId = 'jobId1'
job_references[1].projectId = 'project1'
job_references[1].jobId = 'jobId2'
job_1_waiting = mock.Mock()
job_1_waiting.status.state = 'RUNNING'
job_2_done = mock.Mock()
job_2_done.status.state = 'DONE'
job_2_done.status.errorResult = None
job_1_error = mock.Mock()
job_1_error.status.state = 'DONE'
job_1_error.status.errorResult = 'Some problems happened'
bq_client = mock.Mock()
bq_client.jobs.Get.side_effect = [
job_1_waiting,
job_2_done,
job_1_error,
job_2_done]
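    # Same polling sequence as above, except that job 1 finishes with an
    # errorResult, which should make WaitForBQJobs raise and fail the
    # pipeline.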
waiting_dofn = bqfl.WaitForBQJobs(bq_client)
dest_list = [(i, job) for i, job in enumerate(job_references)]
with self.assertRaises(Exception):
with TestPipeline('DirectRunner') as p:
references = beam.pvalue.AsList(p | 'job_ref' >> beam.Create(dest_list))
_ = (p
| beam.Create([''])
| beam.ParDo(waiting_dofn, references))
sleep_mock.assert_called_once()
def test_multiple_partition_files(self):
destination = 'project1:dataset1.table1'
job_reference = bigquery_api.JobReference()
job_reference.projectId = 'project1'
job_reference.jobId = 'job_name1'
result_job = mock.Mock()
result_job.jobReference = job_reference
mock_job = mock.Mock()
mock_job.status.state = 'DONE'
mock_job.status.errorResult = None
mock_job.jobReference = job_reference
bq_client = mock.Mock()
bq_client.jobs.Get.return_value = mock_job
bq_client.jobs.Insert.return_value = result_job
bq_client.tables.Delete.return_value = None
with TestPipeline('DirectRunner') as p:
outputs = (p
| beam.Create(_ELEMENTS)
| bqfl.BigQueryBatchFileLoads(
destination,
custom_gcs_temp_location=self._new_tempdir(),
test_client=bq_client,
validate=False,
coder=CustomRowCoder(),
max_file_size=45,
max_partition_size=80,
max_files_per_partition=2))
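      # The tight file and partition size limits force the input into several
      # files and several load jobs, which are then copied into the final
      # destination; the exact counts are asserted below.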
dest_files = outputs[
bqfl.BigQueryBatchFileLoads.DESTINATION_FILE_PAIRS]
dest_load_jobs = outputs[
bqfl.BigQueryBatchFileLoads.DESTINATION_JOBID_PAIRS]
dest_copy_jobs = outputs[
bqfl.BigQueryBatchFileLoads.DESTINATION_COPY_JOBID_PAIRS]
load_jobs = dest_load_jobs | "GetLoadJobs" >> beam.Map(lambda x: x[1])
copy_jobs = dest_copy_jobs | "GetCopyJobs" >> beam.Map(lambda x: x[1])
files = dest_files | "GetFiles" >> beam.Map(lambda x: x[1][0])
destinations = (
dest_files
| "GetDests" >> beam.Map(
lambda x: (
bigquery_tools.get_hashable_destination(x[0]), x[1]))
| "GetUniques" >> combiners.Count.PerKey()
| "GetFinalDests" >>beam.Keys())
# All files exist
_ = (files | beam.Map(
lambda x: hamcrest_assert(os.path.exists(x), is_(True))))
# One file per destination
assert_that(files | "CountFiles" >> combiners.Count.Globally(),
equal_to([6]),
label='CheckFileCount')
assert_that(destinations,
equal_to([destination]),
label='CheckDestinations')
assert_that(load_jobs | "CountLoadJobs" >> combiners.Count.Globally(),
equal_to([6]), label='CheckLoadJobCount')
assert_that(copy_jobs | "CountCopyJobs" >> combiners.Count.Globally(),
equal_to([6]), label='CheckCopyJobCount')
@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
class BigQueryFileLoadsIT(unittest.TestCase):
BIG_QUERY_DATASET_ID = 'python_bq_file_loads_'
BIG_QUERY_SCHEMA = (
'{"fields": [{"name": "name","type": "STRING"},'
'{"name": "language","type": "STRING"}]}'
)
BIG_QUERY_SCHEMA_2 = (
'{"fields": [{"name": "name","type": "STRING"},'
'{"name": "foundation","type": "STRING"}]}'
)
BIG_QUERY_STREAMING_SCHEMA = (
{'fields': [{'name': 'Integr', 'type': 'INTEGER', 'mode': 'NULLABLE'}]}
)
def setUp(self):
self.test_pipeline = TestPipeline(is_integration_test=True)
self.runner_name = type(self.test_pipeline.runner).__name__
self.project = self.test_pipeline.get_option('project')
self.dataset_id = '%s%s%d' % (self.BIG_QUERY_DATASET_ID,
str(int(time.time())),
random.randint(0, 10000))
self.bigquery_client = bigquery_tools.BigQueryWrapper()
self.bigquery_client.get_or_create_dataset(self.project, self.dataset_id)
self.output_table = "%s.output_table" % (self.dataset_id)
_LOGGER.info("Created dataset %s in project %s",
self.dataset_id, self.project)
@attr('IT')
def test_multiple_destinations_transform(self):
output_table_1 = '%s%s' % (self.output_table, 1)
output_table_2 = '%s%s' % (self.output_table, 2)
output_table_3 = '%s%s' % (self.output_table, 3)
output_table_4 = '%s%s' % (self.output_table, 4)
schema1 = bigquery.WriteToBigQuery.get_dict_table_schema(
bigquery_tools.parse_table_schema_from_json(self.BIG_QUERY_SCHEMA))
schema2 = bigquery.WriteToBigQuery.get_dict_table_schema(
bigquery_tools.parse_table_schema_from_json(self.BIG_QUERY_SCHEMA_2))
schema_kv_pairs = [(output_table_1, schema1),
(output_table_2, schema2),
(output_table_3, schema1),
(output_table_4, schema2)]
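    # schema_kv_pairs is fed in as a side input below so that the schema
    # callable can look up the schema for whichever destination a row is
    # routed to.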
pipeline_verifiers = [
BigqueryFullResultMatcher(
project=self.project,
query="SELECT name, language FROM %s" % output_table_1,
data=[(d['name'], d['language'])
for d in _ELEMENTS
if 'language' in d]),
BigqueryFullResultMatcher(
project=self.project,
query="SELECT name, foundation FROM %s" % output_table_2,
data=[(d['name'], d['foundation'])
for d in _ELEMENTS
if 'foundation' in d]),
BigqueryFullResultMatcher(
project=self.project,
query="SELECT name, language FROM %s" % output_table_3,
data=[(d['name'], d['language'])
for d in _ELEMENTS
if 'language' in d]),
BigqueryFullResultMatcher(
project=self.project,
query="SELECT name, foundation FROM %s" % output_table_4,
data=[(d['name'], d['foundation'])
for d in _ELEMENTS
if 'foundation' in d])]
args = self.test_pipeline.get_full_options_as_args(
on_success_matcher=all_of(*pipeline_verifiers),
experiments='use_beam_bq_sink')
with beam.Pipeline(argv=args) as p:
input = p | beam.Create(_ELEMENTS)
schema_map_pcv = beam.pvalue.AsDict(
p | "MakeSchemas" >> beam.Create(schema_kv_pairs))
table_record_pcv = beam.pvalue.AsDict(
p | "MakeTables" >> beam.Create([('table1', output_table_1),
('table2', output_table_2)]))
      # Gather all of the input on the same machine.
input = (input
| beam.Map(lambda x: (None, x))
| beam.GroupByKey()
| beam.FlatMap(lambda elm: elm[1]))
_ = (input |
"WriteWithMultipleDestsFreely" >> bigquery.WriteToBigQuery(
table=lambda x, tables: (tables['table1']
if 'language' in x
else tables['table2']),
table_side_inputs=(table_record_pcv,),
schema=lambda dest, schema_map: schema_map.get(dest, None),
schema_side_inputs=(schema_map_pcv,),
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=beam.io.BigQueryDisposition.WRITE_EMPTY))
_ = (input |
"WriteWithMultipleDests" >> bigquery.WriteToBigQuery(
table=lambda x: (output_table_3
if 'language' in x
else output_table_4),
schema=lambda dest, schema_map: schema_map.get(dest, None),
schema_side_inputs=(schema_map_pcv,),
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=beam.io.BigQueryDisposition.WRITE_EMPTY,
max_file_size=20,
max_files_per_bundle=-1))
@attr('IT')
def test_bqfl_streaming(self):
if isinstance(self.test_pipeline.runner, TestDataflowRunner):
self.skipTest("TestStream is not supported on TestDataflowRunner")
output_table = '%s_%s' % (self.output_table, 'ints')
_SIZE = 100
schema = self.BIG_QUERY_STREAMING_SCHEMA
    elements = [{'Integr': i} for i in range(_SIZE)]
state_matcher = PipelineStateMatcher(PipelineState.RUNNING)
bq_matcher = BigqueryFullResultStreamingMatcher(
project=self.project,
query="SELECT Integr FROM %s"
% output_table,
data=[(i,) for i in range(100)])
args = self.test_pipeline.get_full_options_as_args(
on_success_matcher=all_of(state_matcher, bq_matcher),
experiments='use_beam_bq_sink',
streaming=True)
with beam.Pipeline(argv=args) as p:
      stream_source = (TestStream()
                       .advance_watermark_to(0)
                       .advance_processing_time(100)
                       .add_elements(elements[:_SIZE // 4])
                       .advance_processing_time(100)
                       .advance_watermark_to(100)
                       .add_elements(elements[_SIZE // 4:2 * _SIZE // 4])
                       .advance_processing_time(100)
                       .advance_watermark_to(200)
                       .add_elements(elements[2 * _SIZE // 4:3 * _SIZE // 4])
                       .advance_processing_time(100)
                       .advance_watermark_to(300)
                       .add_elements(elements[3 * _SIZE // 4:])
                       .advance_processing_time(100)
                       .advance_watermark_to_infinity())
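      # The 100 elements arrive in four batches as processing time and the
      # watermark advance, so with triggering_frequency=100 the file-loads
      # sink should issue load jobs while the pipeline is still running (see
      # the PipelineStateMatcher above).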
_ = (p
| stream_source
           | bigquery.WriteToBigQuery(
               output_table,
               schema=schema,
               method=bigquery.WriteToBigQuery.Method.FILE_LOADS,
               triggering_frequency=100))
@attr('IT')
def test_one_job_fails_all_jobs_fail(self):
# If one of the import jobs fails, then other jobs must not be performed.
# This is to avoid reinsertion of some records when a pipeline fails and
# is rerun.
output_table_1 = '%s%s' % (self.output_table, 1)
output_table_2 = '%s%s' % (self.output_table, 2)
self.bigquery_client.get_or_create_table(
self.project, self.dataset_id, output_table_1.split('.')[1],
bigquery_tools.parse_table_schema_from_json(self.BIG_QUERY_SCHEMA),
None, None)
self.bigquery_client.get_or_create_table(
self.project, self.dataset_id, output_table_2.split('.')[1],
bigquery_tools.parse_table_schema_from_json(self.BIG_QUERY_SCHEMA_2),
None, None)
pipeline_verifiers = [
BigqueryFullResultMatcher(
project=self.project,
query="SELECT name, language FROM %s" % output_table_1,
data=[]),
BigqueryFullResultMatcher(
project=self.project,
query="SELECT name, foundation FROM %s" % output_table_2,
data=[])]
args = self.test_pipeline.get_full_options_as_args(
experiments='use_beam_bq_sink')
with self.assertRaises(Exception):
with beam.Pipeline(argv=args) as p:
input = p | beam.Create(_ELEMENTS)
input2 = p | "Broken record" >> beam.Create(['language_broken_record'])
input = (input, input2) | beam.Flatten()
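        # The broken record cannot be loaded into output_table_1, so that
        # load job fails; per the comment above, the other job must then not
        # run either, which is why both result matchers expect empty tables.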
_ = (input |
"WriteWithMultipleDests" >> bigquery.WriteToBigQuery(
table=lambda x: (output_table_1
if 'language' in x
else output_table_2),
create_disposition=(
beam.io.BigQueryDisposition.CREATE_IF_NEEDED),
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))
hamcrest_assert(p, all_of(*pipeline_verifiers))
def tearDown(self):
request = bigquery_api.BigqueryDatasetsDeleteRequest(
projectId=self.project, datasetId=self.dataset_id,
deleteContents=True)
try:
_LOGGER.info("Deleting dataset %s in project %s",
self.dataset_id, self.project)
self.bigquery_client.client.datasets.Delete(request)
except HttpError:
_LOGGER.debug('Failed to clean up dataset %s in project %s',
self.dataset_id, self.project)
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
unittest.main()