Merge pull request #8719 from tvalentyn/cp_pr_8621
[BEAM-7439] Cherry-pick PR/8621 to the release branch.
diff --git a/sdks/python/apache_beam/io/gcp/big_query_query_to_table_it_test.py b/sdks/python/apache_beam/io/gcp/big_query_query_to_table_it_test.py
index caf8101..3dd3912 100644
--- a/sdks/python/apache_beam/io/gcp/big_query_query_to_table_it_test.py
+++ b/sdks/python/apache_beam/io/gcp/big_query_query_to_table_it_test.py
@@ -23,9 +23,7 @@
import base64
import datetime
import logging
-import os
import random
-import sys
import time
import unittest
@@ -194,10 +192,6 @@
'No encryption configuration found: %s' % table)
self.assertEqual(kms_key, table.encryptionConfiguration.kmsKeyName)
- @unittest.skipIf(sys.version_info[0] == 3 and
- os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
- 'This test still needs to be fixed on Python 3'
- 'TODO: BEAM-6769')
@attr('IT')
def test_big_query_new_types(self):
expected_checksum = test_utils.compute_hash(NEW_TYPES_OUTPUT_EXPECTED)
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py b/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py
index fb272f7..bfcbb6b 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py
@@ -33,6 +33,7 @@
from nose.plugins.attrib import attr
import apache_beam as beam
+from apache_beam import coders
from apache_beam.io.filebasedsink_test import _TestCaseWithTempDirCleanUp
from apache_beam.io.gcp import bigquery_file_loads as bqfl
from apache_beam.io.gcp import bigquery
@@ -83,6 +84,23 @@
_ELEMENTS = list([json.loads(elm[1]) for elm in _DESTINATION_ELEMENT_PAIRS])
+class CustomRowCoder(coders.Coder):
+ """
+ Custom row coder that also accepts JSON strings as input data when encoding
+ """
+
+ def __init__(self):
+ self.coder = bigquery_tools.RowAsDictJsonCoder()
+
+ def encode(self, table_row):
+ if type(table_row) == str:
+ table_row = json.loads(table_row)
+ return self.coder.encode(table_row)
+
+ def decode(self, encoded_table_row):
+ return self.coder.decode(encoded_table_row)
+
+
@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
class TestWriteRecordsToFile(_TestCaseWithTempDirCleanUp):
maxDiff = None
@@ -104,7 +122,7 @@
def test_files_created(self):
"""Test that the files are created and written."""
- fn = bqfl.WriteRecordsToFile()
+ fn = bqfl.WriteRecordsToFile(coder=CustomRowCoder())
self.tmpdir = self._new_tempdir()
def check_files_created(output_pcs):
@@ -133,7 +151,7 @@
file length is very small, so only a couple records fit in each file.
"""
- fn = bqfl.WriteRecordsToFile(max_file_size=50)
+ fn = bqfl.WriteRecordsToFile(max_file_size=50, coder=CustomRowCoder())
self.tmpdir = self._new_tempdir()
def check_many_files(output_pcs):
@@ -163,12 +181,13 @@
def test_records_are_spilled(self):
"""Forces records to be written to many files.
- For each destination multiple files are necessary, and at most two files can
- be created. This forces records to be spilled to the next stage of
+ For each destination multiple files are necessary, and at most two files
+ can be created. This forces records to be spilled to the next stage of
processing.
"""
- fn = bqfl.WriteRecordsToFile(max_files_per_bundle=2)
+ fn = bqfl.WriteRecordsToFile(max_files_per_bundle=2,
+ coder=CustomRowCoder())
self.tmpdir = self._new_tempdir()
def check_many_files(output_pcs):
@@ -222,7 +241,7 @@
def test_files_are_created(self):
"""Test that the files are created and written."""
- fn = bqfl.WriteGroupedRecordsToFile()
+ fn = bqfl.WriteGroupedRecordsToFile(coder=CustomRowCoder())
self.tmpdir = self._new_tempdir()
def check_files_created(output_pc):
@@ -249,7 +268,8 @@
For each destination multiple files are necessary. This is because the max
file length is very small, so only a couple records fit in each file.
"""
- fn = bqfl.WriteGroupedRecordsToFile(max_file_size=50)
+ fn = bqfl.WriteGroupedRecordsToFile(max_file_size=50,
+ coder=CustomRowCoder())
self.tmpdir = self._new_tempdir()
def check_multiple_files(output_pc):
@@ -296,7 +316,8 @@
destination,
custom_gcs_temp_location=self._new_tempdir(),
test_client=bq_client,
- validate=False)
+ validate=False,
+ coder=CustomRowCoder())
# Need to test this with the DirectRunner to avoid serializing mocks
with TestPipeline('DirectRunner') as p:
@@ -379,25 +400,25 @@
pipeline_verifiers = [
BigqueryFullResultMatcher(
project=self.project,
- query="SELECT * FROM %s" % output_table_1,
+ query="SELECT name, language FROM %s" % output_table_1,
data=[(d['name'], d['language'])
for d in _ELEMENTS
if 'language' in d]),
BigqueryFullResultMatcher(
project=self.project,
- query="SELECT * FROM %s" % output_table_2,
+ query="SELECT name, foundation FROM %s" % output_table_2,
data=[(d['name'], d['foundation'])
for d in _ELEMENTS
if 'foundation' in d]),
BigqueryFullResultMatcher(
project=self.project,
- query="SELECT * FROM %s" % output_table_3,
+ query="SELECT name, language FROM %s" % output_table_3,
data=[(d['name'], d['language'])
for d in _ELEMENTS
if 'language' in d]),
BigqueryFullResultMatcher(
project=self.project,
- query="SELECT * FROM %s" % output_table_4,
+ query="SELECT name, foundation FROM %s" % output_table_4,
data=[(d['name'], d['foundation'])
for d in _ELEMENTS
if 'foundation' in d])]
@@ -466,11 +487,11 @@
pipeline_verifiers = [
BigqueryFullResultMatcher(
project=self.project,
- query="SELECT * FROM %s" % output_table_1,
+ query="SELECT name, language FROM %s" % output_table_1,
data=[]),
BigqueryFullResultMatcher(
project=self.project,
- query="SELECT * FROM %s" % output_table_2,
+ query="SELECT name, foundation FROM %s" % output_table_2,
data=[])]
args = self.test_pipeline.get_full_options_as_args(
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py b/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py
new file mode 100644
index 0000000..39e0c36
--- /dev/null
+++ b/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py
@@ -0,0 +1,178 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Unit tests for BigQuery sources and sinks."""
+from __future__ import absolute_import
+
+import base64
+import logging
+import random
+import time
+import unittest
+
+from nose.plugins.attrib import attr
+
+import apache_beam as beam
+from apache_beam.io.gcp.bigquery_tools import BigQueryWrapper
+from apache_beam.io.gcp.internal.clients import bigquery
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
+
+# Protect against environments where bigquery library is not available.
+# pylint: disable=wrong-import-order, wrong-import-position
+try:
+ from apitools.base.py.exceptions import HttpError
+except ImportError:
+ HttpError = None
+# pylint: enable=wrong-import-order, wrong-import-position
+
+
+class BigQueryReadIntegrationTests(unittest.TestCase):
+ BIG_QUERY_DATASET_ID = 'python_read_table_'
+
+ def setUp(self):
+ self.test_pipeline = TestPipeline(is_integration_test=True)
+ self.runner_name = type(self.test_pipeline.runner).__name__
+ self.project = self.test_pipeline.get_option('project')
+
+ self.bigquery_client = BigQueryWrapper()
+ self.dataset_id = '%s%s%d' % (self.BIG_QUERY_DATASET_ID,
+ str(int(time.time())),
+ random.randint(0, 10000))
+ self.bigquery_client.get_or_create_dataset(self.project, self.dataset_id)
+ logging.info("Created dataset %s in project %s",
+ self.dataset_id, self.project)
+
+ def tearDown(self):
+ request = bigquery.BigqueryDatasetsDeleteRequest(
+ projectId=self.project, datasetId=self.dataset_id,
+ deleteContents=True)
+ try:
+ logging.info("Deleting dataset %s in project %s",
+ self.dataset_id, self.project)
+ self.bigquery_client.client.datasets.Delete(request)
+ except HttpError:
+ logging.debug('Failed to clean up dataset %s in project %s',
+ self.dataset_id, self.project)
+
+ def create_table(self, tablename):
+ table_schema = bigquery.TableSchema()
+ table_field = bigquery.TableFieldSchema()
+ table_field.name = 'number'
+ table_field.type = 'INTEGER'
+ table_schema.fields.append(table_field)
+ table_field = bigquery.TableFieldSchema()
+ table_field.name = 'str'
+ table_field.type = 'STRING'
+ table_schema.fields.append(table_field)
+ table = bigquery.Table(
+ tableReference=bigquery.TableReference(
+ projectId=self.project,
+ datasetId=self.dataset_id,
+ tableId=tablename),
+ schema=table_schema)
+ request = bigquery.BigqueryTablesInsertRequest(
+ projectId=self.project, datasetId=self.dataset_id, table=table)
+ self.bigquery_client.client.tables.Insert(request)
+ table_data = [
+ {'number': 1, 'str': 'abc'},
+ {'number': 2, 'str': 'def'}
+ ]
+ self.bigquery_client.insert_rows(
+ self.project, self.dataset_id, tablename, table_data)
+
+ def create_table_new_types(self, table_name):
+ table_schema = bigquery.TableSchema()
+ table_field = bigquery.TableFieldSchema()
+ table_field.name = 'bytes'
+ table_field.type = 'BYTES'
+ table_schema.fields.append(table_field)
+ table_field = bigquery.TableFieldSchema()
+ table_field.name = 'date'
+ table_field.type = 'DATE'
+ table_schema.fields.append(table_field)
+ table_field = bigquery.TableFieldSchema()
+ table_field.name = 'time'
+ table_field.type = 'TIME'
+ table_schema.fields.append(table_field)
+ table = bigquery.Table(
+ tableReference=bigquery.TableReference(
+ projectId=self.project,
+ datasetId=self.dataset_id,
+ tableId=table_name),
+ schema=table_schema)
+ request = bigquery.BigqueryTablesInsertRequest(
+ projectId=self.project, datasetId=self.dataset_id, table=table)
+ self.bigquery_client.client.tables.Insert(request)
+ table_data = [
+ {'bytes': b'xyw', 'date': '2011-01-01', 'time': '23:59:59.999999'},
+ {'bytes': b'abc', 'date': '2000-01-01', 'time': '00:00:00'},
+ {'bytes': b'\xe4\xbd\xa0\xe5\xa5\xbd', 'date': '3000-12-31',
+ 'time': '23:59:59'},
+ {'bytes': b'\xab\xac\xad', 'date': '2000-01-01', 'time': '00:00:00'}
+ ]
+ # bigquery client expects base64 encoded bytes
+ for row in table_data:
+ row['bytes'] = base64.b64encode(row['bytes']).decode('utf-8')
+ self.bigquery_client.insert_rows(
+ self.project, self.dataset_id, table_name, table_data)
+
+ @attr('IT')
+ def test_big_query_read(self):
+ table_name = 'python_write_table'
+ self.create_table(table_name)
+ table_id = '{}.{}'.format(self.dataset_id, table_name)
+
+ args = self.test_pipeline.get_full_options_as_args()
+
+ with beam.Pipeline(argv=args) as p:
+ result = (p | 'read' >> beam.io.Read(beam.io.BigQuerySource(
+ query='SELECT number, str FROM `%s`' % table_id,
+ use_standard_sql=True)))
+ assert_that(result, equal_to([{'number': 1, 'str': 'abc'},
+ {'number': 2, 'str': 'def'}]))
+
+ @attr('IT')
+ def test_big_query_read_new_types(self):
+ table_name = 'python_new_types_table'
+ self.create_table_new_types(table_name)
+ table_id = '{}.{}'.format(self.dataset_id, table_name)
+
+ args = self.test_pipeline.get_full_options_as_args()
+
+ expected_data = [
+ {'bytes': b'xyw', 'date': '2011-01-01', 'time': '23:59:59.999999'},
+ {'bytes': b'abc', 'date': '2000-01-01', 'time': '00:00:00'},
+ {'bytes': b'\xe4\xbd\xa0\xe5\xa5\xbd', 'date': '3000-12-31',
+ 'time': '23:59:59'},
+ {'bytes': b'\xab\xac\xad', 'date': '2000-01-01', 'time': '00:00:00'}
+ ]
+ # bigquery io returns bytes as base64 encoded values
+ for row in expected_data:
+ row['bytes'] = base64.b64encode(row['bytes'])
+
+ with beam.Pipeline(argv=args) as p:
+ result = (p | 'read' >> beam.io.Read(beam.io.BigQuerySource(
+ query='SELECT bytes, date, time FROM `%s`' % table_id,
+ use_standard_sql=True)))
+ assert_that(result, equal_to(expected_data))
+
+
+if __name__ == '__main__':
+ logging.getLogger().setLevel(logging.INFO)
+ unittest.main()
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_test.py b/sdks/python/apache_beam/io/gcp/bigquery_test.py
index 40c5f67..380a4aa 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_test.py
@@ -521,13 +521,13 @@
expected_properties=additional_bq_parameters),
BigqueryFullResultMatcher(
project=self.project,
- query="SELECT * FROM %s" % output_table_1,
+ query="SELECT name, language FROM %s" % output_table_1,
data=[(d['name'], d['language'])
for d in _ELEMENTS
if 'language' in d]),
BigqueryFullResultMatcher(
project=self.project,
- query="SELECT * FROM %s" % output_table_2,
+ query="SELECT name, language FROM %s" % output_table_2,
data=[(d['name'], d['language'])
for d in _ELEMENTS
if 'language' in d])]
@@ -574,13 +574,13 @@
pipeline_verifiers = [
BigqueryFullResultMatcher(
project=self.project,
- query="SELECT * FROM %s" % output_table_1,
+ query="SELECT name, language FROM %s" % output_table_1,
data=[(d['name'], d['language'])
for d in _ELEMENTS
if 'language' in d]),
BigqueryFullResultMatcher(
project=self.project,
- query="SELECT * FROM %s" % output_table_2,
+ query="SELECT name, foundation FROM %s" % output_table_2,
data=[(d['name'], d['foundation'])
for d in _ELEMENTS
if 'foundation' in d])]
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools.py b/sdks/python/apache_beam/io/gcp/bigquery_tools.py
index 1999838..0887329 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_tools.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_tools.py
@@ -925,6 +925,12 @@
if self.schema is None:
self.schema = schema
for row in rows:
+ # Return base64-encoded bytes as the bytes type on Python 3,
+ # which matches the behavior of the Beam Java SDK.
+ for i in range(len(row.f)):
+ if self.schema.fields[i].type == 'BYTES':
+ row.f[i].v.string_value = row.f[i].v.string_value.encode('utf-8')
+
if self.row_as_dict:
yield self.client.convert_row_to_dict(row, schema)
else:
@@ -998,6 +1004,12 @@
# This code will catch this error to emit an error that explains
# to the programmer that they have used NAN/INF values.
try:
+ # On Python 3, base64-encoded bytes are decoded to strings
+ # before being sent to BigQuery.
+ if sys.version_info[0] > 2:
+ for field, value in iteritems(table_row):
+ if type(value) == bytes:
+ table_row[field] = value.decode('utf-8')
return json.dumps(
table_row, allow_nan=False, default=default_encoder).encode('utf-8')
except ValueError as e:
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py b/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py
new file mode 100644
index 0000000..43c81f5
--- /dev/null
+++ b/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py
@@ -0,0 +1,251 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Unit tests for BigQuery sources and sinks."""
+from __future__ import absolute_import
+
+import base64
+import datetime
+import logging
+import random
+import time
+import unittest
+
+import hamcrest as hc
+from nose.plugins.attrib import attr
+
+import apache_beam as beam
+from apache_beam.io.gcp.bigquery_tools import BigQueryWrapper
+from apache_beam.io.gcp.internal.clients import bigquery
+from apache_beam.io.gcp.tests.bigquery_matcher import BigqueryFullResultMatcher
+from apache_beam.testing.test_pipeline import TestPipeline
+
+# Protect against environments where bigquery library is not available.
+# pylint: disable=wrong-import-order, wrong-import-position
+try:
+ from apitools.base.py.exceptions import HttpError
+except ImportError:
+ HttpError = None
+# pylint: enable=wrong-import-order, wrong-import-position
+
+
+class BigQueryWriteIntegrationTests(unittest.TestCase):
+ BIG_QUERY_DATASET_ID = 'python_write_to_table_'
+
+ def setUp(self):
+ self.test_pipeline = TestPipeline(is_integration_test=True)
+ self.runner_name = type(self.test_pipeline.runner).__name__
+ self.project = self.test_pipeline.get_option('project')
+
+ self.bigquery_client = BigQueryWrapper()
+ self.dataset_id = '%s%s%d' % (self.BIG_QUERY_DATASET_ID,
+ str(int(time.time())),
+ random.randint(0, 10000))
+ self.bigquery_client.get_or_create_dataset(self.project, self.dataset_id)
+ logging.info("Created dataset %s in project %s",
+ self.dataset_id, self.project)
+
+ def tearDown(self):
+ request = bigquery.BigqueryDatasetsDeleteRequest(
+ projectId=self.project, datasetId=self.dataset_id,
+ deleteContents=True)
+ try:
+ logging.info("Deleting dataset %s in project %s",
+ self.dataset_id, self.project)
+ self.bigquery_client.client.datasets.Delete(request)
+ except HttpError:
+ logging.debug('Failed to clean up dataset %s in project %s',
+ self.dataset_id, self.project)
+
+ def create_table(self, table_name):
+ table_schema = bigquery.TableSchema()
+ table_field = bigquery.TableFieldSchema()
+ table_field.name = 'bytes'
+ table_field.type = 'BYTES'
+ table_schema.fields.append(table_field)
+ table_field = bigquery.TableFieldSchema()
+ table_field.name = 'date'
+ table_field.type = 'DATE'
+ table_schema.fields.append(table_field)
+ table_field = bigquery.TableFieldSchema()
+ table_field.name = 'time'
+ table_field.type = 'TIME'
+ table_schema.fields.append(table_field)
+ table = bigquery.Table(
+ tableReference=bigquery.TableReference(
+ projectId=self.project,
+ datasetId=self.dataset_id,
+ tableId=table_name),
+ schema=table_schema)
+ request = bigquery.BigqueryTablesInsertRequest(
+ projectId=self.project, datasetId=self.dataset_id, table=table)
+ self.bigquery_client.client.tables.Insert(request)
+
+ @attr('IT')
+ def test_big_query_write(self):
+ table_name = 'python_write_table'
+ table_id = '{}.{}'.format(self.dataset_id, table_name)
+
+ input_data = [
+ {'number': 1, 'str': 'abc'},
+ {'number': 2, 'str': 'def'},
+ ]
+ table_schema = {"fields": [
+ {"name": "number", "type": "INTEGER"},
+ {"name": "str", "type": "STRING"}]}
+
+ pipeline_verifiers = [
+ BigqueryFullResultMatcher(
+ project=self.project,
+ query="SELECT number, str FROM %s" % table_id,
+ data=[(1, 'abc',), (2, 'def',)])]
+
+ args = self.test_pipeline.get_full_options_as_args(
+ on_success_matcher=hc.all_of(*pipeline_verifiers))
+
+ with beam.Pipeline(argv=args) as p:
+ # pylint: disable=expression-not-assigned
+ (p | 'create' >> beam.Create(input_data)
+ | 'write' >> beam.io.WriteToBigQuery(
+ table_id,
+ schema=table_schema,
+ create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
+ write_disposition=beam.io.BigQueryDisposition.WRITE_EMPTY))
+
+ @attr('IT')
+ def test_big_query_write_schema_autodetect(self):
+ if self.runner_name == 'TestDataflowRunner':
+ self.skipTest('DataflowRunner does not support schema autodetection')
+
+ table_name = 'python_write_table'
+ table_id = '{}.{}'.format(self.dataset_id, table_name)
+
+ input_data = [
+ {'number': 1, 'str': 'abc'},
+ {'number': 2, 'str': 'def'},
+ ]
+
+ pipeline_verifiers = [
+ BigqueryFullResultMatcher(
+ project=self.project,
+ query="SELECT number, str FROM %s" % table_id,
+ data=[(1, 'abc',), (2, 'def',)])]
+
+ args = self.test_pipeline.get_full_options_as_args(
+ on_success_matcher=hc.all_of(*pipeline_verifiers),
+ experiments='use_beam_bq_sink')
+
+ with beam.Pipeline(argv=args) as p:
+ # pylint: disable=expression-not-assigned
+ (p | 'create' >> beam.Create(input_data)
+ | 'write' >> beam.io.WriteToBigQuery(
+ table_id,
+ method=beam.io.WriteToBigQuery.Method.FILE_LOADS,
+ schema=beam.io.gcp.bigquery.SCHEMA_AUTODETECT,
+ create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
+ write_disposition=beam.io.BigQueryDisposition.WRITE_EMPTY))
+
+ @attr('IT')
+ def test_big_query_write_new_types(self):
+ table_name = 'python_new_types_table'
+ table_id = '{}.{}'.format(self.dataset_id, table_name)
+
+ input_data = [
+ {'bytes': b'xyw', 'date': '2011-01-01', 'time': '23:59:59.999999'},
+ {'bytes': b'abc', 'date': '2000-01-01', 'time': '00:00:00'},
+ {'bytes': b'\xe4\xbd\xa0\xe5\xa5\xbd', 'date': '3000-12-31',
+ 'time': '23:59:59'},
+ {'bytes': b'\xab\xac\xad', 'date': '2000-01-01', 'time': '00:00:00'}
+ ]
+ # bigquery io expects bytes to be base64 encoded values
+ for row in input_data:
+ row['bytes'] = base64.b64encode(row['bytes'])
+
+ table_schema = {"fields": [
+ {"name": "bytes", "type": "BYTES"},
+ {"name": "date", "type": "DATE"},
+ {"name": "time", "type": "TIME"}]}
+
+ pipeline_verifiers = [
+ BigqueryFullResultMatcher(
+ project=self.project,
+ query="SELECT bytes, date, time FROM %s" % table_id,
+ data=[(b'xyw', datetime.date(2011, 1, 1),
+ datetime.time(23, 59, 59, 999999), ),
+ (b'abc', datetime.date(2000, 1, 1),
+ datetime.time(0, 0, 0), ),
+ (b'\xe4\xbd\xa0\xe5\xa5\xbd', datetime.date(3000, 12, 31),
+ datetime.time(23, 59, 59), ),
+ (b'\xab\xac\xad', datetime.date(2000, 1, 1),
+ datetime.time(0, 0, 0), )])]
+
+ args = self.test_pipeline.get_full_options_as_args(
+ on_success_matcher=hc.all_of(*pipeline_verifiers))
+
+ with beam.Pipeline(argv=args) as p:
+ # pylint: disable=expression-not-assigned
+ (p | 'create' >> beam.Create(input_data)
+ | 'write' >> beam.io.WriteToBigQuery(
+ table_id,
+ schema=table_schema,
+ create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
+ write_disposition=beam.io.BigQueryDisposition.WRITE_EMPTY))
+
+ @attr('IT')
+ def test_big_query_write_without_schema(self):
+ table_name = 'python_no_schema_table'
+ self.create_table(table_name)
+ table_id = '{}.{}'.format(self.dataset_id, table_name)
+
+ input_data = [
+ {'bytes': b'xyw', 'date': '2011-01-01', 'time': '23:59:59.999999'},
+ {'bytes': b'abc', 'date': '2000-01-01', 'time': '00:00:00'},
+ {'bytes': b'\xe4\xbd\xa0\xe5\xa5\xbd', 'date': '3000-12-31',
+ 'time': '23:59:59'},
+ {'bytes': b'\xab\xac\xad', 'date': '2000-01-01', 'time': '00:00:00'}
+ ]
+ # bigquery io expects bytes to be base64 encoded values
+ for row in input_data:
+ row['bytes'] = base64.b64encode(row['bytes'])
+
+ pipeline_verifiers = [
+ BigqueryFullResultMatcher(
+ project=self.project,
+ query="SELECT bytes, date, time FROM %s" % table_id,
+ data=[(b'xyw', datetime.date(2011, 1, 1),
+ datetime.time(23, 59, 59, 999999), ),
+ (b'abc', datetime.date(2000, 1, 1),
+ datetime.time(0, 0, 0), ),
+ (b'\xe4\xbd\xa0\xe5\xa5\xbd', datetime.date(3000, 12, 31),
+ datetime.time(23, 59, 59), ),
+ (b'\xab\xac\xad', datetime.date(2000, 1, 1),
+ datetime.time(0, 0, 0), )])]
+
+ args = self.test_pipeline.get_full_options_as_args(
+ on_success_matcher=hc.all_of(*pipeline_verifiers))
+
+ with beam.Pipeline(argv=args) as p:
+ # pylint: disable=expression-not-assigned
+ (p | 'create' >> beam.Create(input_data)
+ | 'write' >> beam.io.WriteToBigQuery(
+ table_id,
+ write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))
+
+
+if __name__ == '__main__':
+ logging.getLogger().setLevel(logging.INFO)
+ unittest.main()
diff --git a/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py b/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py
index a7c6230..ac22e5a 100644
--- a/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py
+++ b/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py
@@ -131,7 +131,7 @@
self.project = project
self.query = query
- self.expected_data = [sorted(i) for i in data]
+ self.expected_data = data
def _matches(self, _):
logging.info('Start verify Bigquery data.')
@@ -141,7 +141,7 @@
logging.info('Read from given query (%s), total rows %d',
self.query, len(response))
- self.actual_data = [sorted(i) for i in response]
+ self.actual_data = response
# Verify result
return sorted(self.expected_data) == sorted(self.actual_data)
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index ff058fc..7430afe 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -655,6 +655,9 @@
if (not isinstance(transform, beam.io.WriteToBigQuery)
or 'use_beam_bq_sink' in experiments):
return self.apply_PTransform(transform, pcoll, options)
+ if transform.schema == beam.io.gcp.bigquery.SCHEMA_AUTODETECT:
+ raise RuntimeError(
+ 'Schema auto-detection is not supported on the native sink')
standard_options = options.view_as(StandardOptions)
if standard_options.streaming:
if (transform.write_disposition ==
@@ -663,12 +666,15 @@
return self.apply_PTransform(transform, pcoll, options)
else:
from apache_beam.io.gcp.bigquery_tools import parse_table_schema_from_json
+ schema = None
+ if transform.schema:
+ schema = parse_table_schema_from_json(json.dumps(transform.schema))
return pcoll | 'WriteToBigQuery' >> beam.io.Write(
beam.io.BigQuerySink(
transform.table_reference.tableId,
transform.table_reference.datasetId,
transform.table_reference.projectId,
- parse_table_schema_from_json(json.dumps(transform.schema)),
+ schema,
transform.create_disposition,
transform.write_disposition,
kms_key=transform.kms_key))
diff --git a/sdks/python/test-suites/direct/py35/build.gradle b/sdks/python/test-suites/direct/py35/build.gradle
index 56b77a3..f4dbb38 100644
--- a/sdks/python/test-suites/direct/py35/build.gradle
+++ b/sdks/python/test-suites/direct/py35/build.gradle
@@ -32,6 +32,8 @@
"apache_beam.io.gcp.pubsub_integration_test:PubSubIntegrationTest",
"apache_beam.io.gcp.big_query_query_to_table_it_test:BigQueryQueryToTableIT",
"apache_beam.io.gcp.bigquery_io_read_it_test",
+ "apache_beam.io.gcp.bigquery_read_it_test",
+ "apache_beam.io.gcp.bigquery_write_it_test",
"apache_beam.io.gcp.datastore.v1new.datastore_write_it_test",
]
def testOpts = [
diff --git a/sdks/python/test-suites/direct/py36/build.gradle b/sdks/python/test-suites/direct/py36/build.gradle
index cc55c5a..2be9d00 100644
--- a/sdks/python/test-suites/direct/py36/build.gradle
+++ b/sdks/python/test-suites/direct/py36/build.gradle
@@ -32,6 +32,8 @@
"apache_beam.io.gcp.pubsub_integration_test:PubSubIntegrationTest",
"apache_beam.io.gcp.big_query_query_to_table_it_test:BigQueryQueryToTableIT",
"apache_beam.io.gcp.bigquery_io_read_it_test",
+ "apache_beam.io.gcp.bigquery_read_it_test",
+ "apache_beam.io.gcp.bigquery_write_it_test",
]
def testOpts = [
"--tests=${batchTests.join(',')}",