Merge pull request #8719 from tvalentyn/cp_pr_8621

[BEAM-7439] Cherry-pick PR/8621 to the release branch.
diff --git a/sdks/python/apache_beam/io/gcp/big_query_query_to_table_it_test.py b/sdks/python/apache_beam/io/gcp/big_query_query_to_table_it_test.py
index caf8101..3dd3912 100644
--- a/sdks/python/apache_beam/io/gcp/big_query_query_to_table_it_test.py
+++ b/sdks/python/apache_beam/io/gcp/big_query_query_to_table_it_test.py
@@ -23,9 +23,7 @@
 import base64
 import datetime
 import logging
-import os
 import random
-import sys
 import time
 import unittest
 
@@ -194,10 +192,6 @@
         'No encryption configuration found: %s' % table)
     self.assertEqual(kms_key, table.encryptionConfiguration.kmsKeyName)
 
-  @unittest.skipIf(sys.version_info[0] == 3 and
-                   os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
-                   'This test still needs to be fixed on Python 3'
-                   'TODO: BEAM-6769')
   @attr('IT')
   def test_big_query_new_types(self):
     expected_checksum = test_utils.compute_hash(NEW_TYPES_OUTPUT_EXPECTED)
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py b/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py
index fb272f7..bfcbb6b 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py
@@ -33,6 +33,7 @@
 from nose.plugins.attrib import attr
 
 import apache_beam as beam
+from apache_beam import coders
 from apache_beam.io.filebasedsink_test import _TestCaseWithTempDirCleanUp
 from apache_beam.io.gcp import bigquery_file_loads as bqfl
 from apache_beam.io.gcp import bigquery
@@ -83,6 +84,23 @@
 _ELEMENTS = list([json.loads(elm[1]) for elm in _DESTINATION_ELEMENT_PAIRS])
 
 
+class CustomRowCoder(coders.Coder):
+  """
+  Custom row coder that also accepts JSON strings as input data when encoding.
+  """
+
+  def __init__(self):
+    self.coder = bigquery_tools.RowAsDictJsonCoder()
+
+  def encode(self, table_row):
+    if type(table_row) == str:
+      table_row = json.loads(table_row)
+    return self.coder.encode(table_row)
+
+  def decode(self, encoded_table_row):
+    return self.coder.decode(encoded_table_row)
+
+
 @unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
 class TestWriteRecordsToFile(_TestCaseWithTempDirCleanUp):
   maxDiff = None
@@ -104,7 +122,7 @@
   def test_files_created(self):
     """Test that the files are created and written."""
 
-    fn = bqfl.WriteRecordsToFile()
+    fn = bqfl.WriteRecordsToFile(coder=CustomRowCoder())
     self.tmpdir = self._new_tempdir()
 
     def check_files_created(output_pcs):
@@ -133,7 +151,7 @@
     file length is very small, so only a couple records fit in each file.
     """
 
-    fn = bqfl.WriteRecordsToFile(max_file_size=50)
+    fn = bqfl.WriteRecordsToFile(max_file_size=50, coder=CustomRowCoder())
     self.tmpdir = self._new_tempdir()
 
     def check_many_files(output_pcs):
@@ -163,12 +181,13 @@
   def test_records_are_spilled(self):
     """Forces records to be written to many files.
 
-    For each destination multiple files are necessary, and at most two files can
-    be created. This forces records to be spilled to the next stage of
+    For each destination multiple files are necessary, and at most two files
+    can be created. This forces records to be spilled to the next stage of
     processing.
     """
 
-    fn = bqfl.WriteRecordsToFile(max_files_per_bundle=2)
+    fn = bqfl.WriteRecordsToFile(max_files_per_bundle=2,
+                                 coder=CustomRowCoder())
     self.tmpdir = self._new_tempdir()
 
     def check_many_files(output_pcs):
@@ -222,7 +241,7 @@
   def test_files_are_created(self):
     """Test that the files are created and written."""
 
-    fn = bqfl.WriteGroupedRecordsToFile()
+    fn = bqfl.WriteGroupedRecordsToFile(coder=CustomRowCoder())
     self.tmpdir = self._new_tempdir()
 
     def check_files_created(output_pc):
@@ -249,7 +268,8 @@
     For each destination multiple files are necessary. This is because the max
     file length is very small, so only a couple records fit in each file.
     """
-    fn = bqfl.WriteGroupedRecordsToFile(max_file_size=50)
+    fn = bqfl.WriteGroupedRecordsToFile(max_file_size=50,
+                                        coder=CustomRowCoder())
     self.tmpdir = self._new_tempdir()
 
     def check_multiple_files(output_pc):
@@ -296,7 +316,8 @@
         destination,
         custom_gcs_temp_location=self._new_tempdir(),
         test_client=bq_client,
-        validate=False)
+        validate=False,
+        coder=CustomRowCoder())
 
     # Need to test this with the DirectRunner to avoid serializing mocks
     with TestPipeline('DirectRunner') as p:
@@ -379,25 +400,25 @@
     pipeline_verifiers = [
         BigqueryFullResultMatcher(
             project=self.project,
-            query="SELECT * FROM %s" % output_table_1,
+            query="SELECT name, language FROM %s" % output_table_1,
             data=[(d['name'], d['language'])
                   for d in _ELEMENTS
                   if 'language' in d]),
         BigqueryFullResultMatcher(
             project=self.project,
-            query="SELECT * FROM %s" % output_table_2,
+            query="SELECT name, foundation FROM %s" % output_table_2,
             data=[(d['name'], d['foundation'])
                   for d in _ELEMENTS
                   if 'foundation' in d]),
         BigqueryFullResultMatcher(
             project=self.project,
-            query="SELECT * FROM %s" % output_table_3,
+            query="SELECT name, language FROM %s" % output_table_3,
             data=[(d['name'], d['language'])
                   for d in _ELEMENTS
                   if 'language' in d]),
         BigqueryFullResultMatcher(
             project=self.project,
-            query="SELECT * FROM %s" % output_table_4,
+            query="SELECT name, foundation FROM %s" % output_table_4,
             data=[(d['name'], d['foundation'])
                   for d in _ELEMENTS
                   if 'foundation' in d])]
@@ -466,11 +487,11 @@
     pipeline_verifiers = [
         BigqueryFullResultMatcher(
             project=self.project,
-            query="SELECT * FROM %s" % output_table_1,
+            query="SELECT name, language FROM %s" % output_table_1,
             data=[]),
         BigqueryFullResultMatcher(
             project=self.project,
-            query="SELECT * FROM %s" % output_table_2,
+            query="SELECT name, foundation FROM %s" % output_table_2,
             data=[])]
 
     args = self.test_pipeline.get_full_options_as_args(
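
The CustomRowCoder introduced above lets these tests hand rows to the
WriteRecordsToFile / WriteGroupedRecordsToFile DoFns either as dicts or as
pre-serialized JSON strings. A minimal, illustrative sketch of that dual
handling, assuming only the RowAsDictJsonCoder the test already delegates to:

    import json

    from apache_beam.io.gcp import bigquery_tools

    coder = bigquery_tools.RowAsDictJsonCoder()

    def encode_row(table_row):
        # JSON strings are parsed into dicts before delegating to the real coder.
        if isinstance(table_row, str):
            table_row = json.loads(table_row)
        return coder.encode(table_row)

    # Both spellings of the same row encode to identical bytes.
    assert encode_row('{"name": "beam"}') == encode_row({'name': 'beam'})
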
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py b/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py
new file mode 100644
index 0000000..39e0c36
--- /dev/null
+++ b/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py
@@ -0,0 +1,178 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Unit tests for BigQuery sources and sinks."""
+from __future__ import absolute_import
+
+import base64
+import logging
+import random
+import time
+import unittest
+
+from nose.plugins.attrib import attr
+
+import apache_beam as beam
+from apache_beam.io.gcp.bigquery_tools import BigQueryWrapper
+from apache_beam.io.gcp.internal.clients import bigquery
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
+
+# Protect against environments where bigquery library is not available.
+# pylint: disable=wrong-import-order, wrong-import-position
+try:
+  from apitools.base.py.exceptions import HttpError
+except ImportError:
+  HttpError = None
+# pylint: enable=wrong-import-order, wrong-import-position
+
+
+class BigQueryReadIntegrationTests(unittest.TestCase):
+  BIG_QUERY_DATASET_ID = 'python_read_table_'
+
+  def setUp(self):
+    self.test_pipeline = TestPipeline(is_integration_test=True)
+    self.runner_name = type(self.test_pipeline.runner).__name__
+    self.project = self.test_pipeline.get_option('project')
+
+    self.bigquery_client = BigQueryWrapper()
+    self.dataset_id = '%s%s%d' % (self.BIG_QUERY_DATASET_ID,
+                                  str(int(time.time())),
+                                  random.randint(0, 10000))
+    self.bigquery_client.get_or_create_dataset(self.project, self.dataset_id)
+    logging.info("Created dataset %s in project %s",
+                 self.dataset_id, self.project)
+
+  def tearDown(self):
+    request = bigquery.BigqueryDatasetsDeleteRequest(
+        projectId=self.project, datasetId=self.dataset_id,
+        deleteContents=True)
+    try:
+      logging.info("Deleting dataset %s in project %s",
+                   self.dataset_id, self.project)
+      self.bigquery_client.client.datasets.Delete(request)
+    except HttpError:
+      logging.debug('Failed to clean up dataset %s in project %s',
+                    self.dataset_id, self.project)
+
+  def create_table(self, tablename):
+    table_schema = bigquery.TableSchema()
+    table_field = bigquery.TableFieldSchema()
+    table_field.name = 'number'
+    table_field.type = 'INTEGER'
+    table_schema.fields.append(table_field)
+    table_field = bigquery.TableFieldSchema()
+    table_field.name = 'str'
+    table_field.type = 'STRING'
+    table_schema.fields.append(table_field)
+    table = bigquery.Table(
+        tableReference=bigquery.TableReference(
+            projectId=self.project,
+            datasetId=self.dataset_id,
+            tableId=tablename),
+        schema=table_schema)
+    request = bigquery.BigqueryTablesInsertRequest(
+        projectId=self.project, datasetId=self.dataset_id, table=table)
+    self.bigquery_client.client.tables.Insert(request)
+    table_data = [
+        {'number': 1, 'str': 'abc'},
+        {'number': 2, 'str': 'def'}
+    ]
+    self.bigquery_client.insert_rows(
+        self.project, self.dataset_id, tablename, table_data)
+
+  def create_table_new_types(self, table_name):
+    table_schema = bigquery.TableSchema()
+    table_field = bigquery.TableFieldSchema()
+    table_field.name = 'bytes'
+    table_field.type = 'BYTES'
+    table_schema.fields.append(table_field)
+    table_field = bigquery.TableFieldSchema()
+    table_field.name = 'date'
+    table_field.type = 'DATE'
+    table_schema.fields.append(table_field)
+    table_field = bigquery.TableFieldSchema()
+    table_field.name = 'time'
+    table_field.type = 'TIME'
+    table_schema.fields.append(table_field)
+    table = bigquery.Table(
+        tableReference=bigquery.TableReference(
+            projectId=self.project,
+            datasetId=self.dataset_id,
+            tableId=table_name),
+        schema=table_schema)
+    request = bigquery.BigqueryTablesInsertRequest(
+        projectId=self.project, datasetId=self.dataset_id, table=table)
+    self.bigquery_client.client.tables.Insert(request)
+    table_data = [
+        {'bytes': b'xyw', 'date': '2011-01-01', 'time': '23:59:59.999999'},
+        {'bytes': b'abc', 'date': '2000-01-01', 'time': '00:00:00'},
+        {'bytes': b'\xe4\xbd\xa0\xe5\xa5\xbd', 'date': '3000-12-31',
+         'time': '23:59:59'},
+        {'bytes': b'\xab\xac\xad', 'date': '2000-01-01', 'time': '00:00:00'}
+    ]
+    # The BigQuery client expects bytes values as base64-encoded strings.
+    for row in table_data:
+      row['bytes'] = base64.b64encode(row['bytes']).decode('utf-8')
+    self.bigquery_client.insert_rows(
+        self.project, self.dataset_id, table_name, table_data)
+
+  @attr('IT')
+  def test_big_query_read(self):
+    table_name = 'python_write_table'
+    self.create_table(table_name)
+    table_id = '{}.{}'.format(self.dataset_id, table_name)
+
+    args = self.test_pipeline.get_full_options_as_args()
+
+    with beam.Pipeline(argv=args) as p:
+      result = (p | 'read' >> beam.io.Read(beam.io.BigQuerySource(
+          query='SELECT number, str FROM `%s`' % table_id,
+          use_standard_sql=True)))
+      assert_that(result, equal_to([{'number': 1, 'str': 'abc'},
+                                    {'number': 2, 'str': 'def'}]))
+
+  @attr('IT')
+  def test_big_query_read_new_types(self):
+    table_name = 'python_new_types_table'
+    self.create_table_new_types(table_name)
+    table_id = '{}.{}'.format(self.dataset_id, table_name)
+
+    args = self.test_pipeline.get_full_options_as_args()
+
+    expected_data = [
+        {'bytes': b'xyw', 'date': '2011-01-01', 'time': '23:59:59.999999'},
+        {'bytes': b'abc', 'date': '2000-01-01', 'time': '00:00:00'},
+        {'bytes': b'\xe4\xbd\xa0\xe5\xa5\xbd', 'date': '3000-12-31',
+         'time': '23:59:59'},
+        {'bytes': b'\xab\xac\xad', 'date': '2000-01-01', 'time': '00:00:00'}
+    ]
+    # BigQuery IO returns BYTES column values as base64-encoded bytes.
+    for row in expected_data:
+      row['bytes'] = base64.b64encode(row['bytes'])
+
+    with beam.Pipeline(argv=args) as p:
+      result = (p | 'read' >> beam.io.Read(beam.io.BigQuerySource(
+          query='SELECT bytes, date, time FROM `%s`' % table_id,
+          use_standard_sql=True)))
+      assert_that(result, equal_to(expected_data))
+
+
+if __name__ == '__main__':
+  logging.getLogger().setLevel(logging.INFO)
+  unittest.main()
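
The base64 handling in the new read test mirrors how BYTES values travel through
the BigQuery JSON API: rows are inserted with base64-encoded string values, and
the Python source yields them back still base64-encoded, as bytes. A small
illustrative round trip using one of the test's own values:

    import base64

    raw = b'\xe4\xbd\xa0\xe5\xa5\xbd'
    # What create_table_new_types sends to the insert API (a base64 string).
    inserted = base64.b64encode(raw).decode('utf-8')
    # What test_big_query_read_new_types expects back from BigQuerySource.
    expected = base64.b64encode(raw)
    assert inserted.encode('utf-8') == expected
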
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_test.py b/sdks/python/apache_beam/io/gcp/bigquery_test.py
index 40c5f67..380a4aa 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_test.py
@@ -521,13 +521,13 @@
             expected_properties=additional_bq_parameters),
         BigqueryFullResultMatcher(
             project=self.project,
-            query="SELECT * FROM %s" % output_table_1,
+            query="SELECT name, language FROM %s" % output_table_1,
             data=[(d['name'], d['language'])
                   for d in _ELEMENTS
                   if 'language' in d]),
         BigqueryFullResultMatcher(
             project=self.project,
-            query="SELECT * FROM %s" % output_table_2,
+            query="SELECT name, language FROM %s" % output_table_2,
             data=[(d['name'], d['language'])
                   for d in _ELEMENTS
                   if 'language' in d])]
@@ -574,13 +574,13 @@
     pipeline_verifiers = [
         BigqueryFullResultMatcher(
             project=self.project,
-            query="SELECT * FROM %s" % output_table_1,
+            query="SELECT name, language FROM %s" % output_table_1,
             data=[(d['name'], d['language'])
                   for d in _ELEMENTS
                   if 'language' in d]),
         BigqueryFullResultMatcher(
             project=self.project,
-            query="SELECT * FROM %s" % output_table_2,
+            query="SELECT name, foundation FROM %s" % output_table_2,
             data=[(d['name'], d['foundation'])
                   for d in _ELEMENTS
                   if 'foundation' in d])]
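
The switch from SELECT * to explicit column lists here (and in
bigquery_file_loads_test.py above) goes hand in hand with the matcher change
further down: expected rows are plain tuples, so the query has to return columns
in the same order the tuples use. A hypothetical illustration of the difference:

    # Expected tuples are ordered as (name, language).
    expected = [('beam', 'py')]
    # Hypothetical SELECT * result if the table schema stores language first.
    row_from_select_star = ('py', 'beam')
    # SELECT name, language pins the column order to match the expected tuples.
    row_from_named_columns = ('beam', 'py')
    assert [row_from_named_columns] == expected
    assert [row_from_select_star] != expected
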
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools.py b/sdks/python/apache_beam/io/gcp/bigquery_tools.py
index 1999838..0887329 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_tools.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_tools.py
@@ -925,6 +925,12 @@
       if self.schema is None:
         self.schema = schema
       for row in rows:
+        # Return base64-encoded bytes as the bytes type on Python 3,
+        # which matches the behavior of the Beam Java SDK.
+        for i in range(len(row.f)):
+          if self.schema.fields[i].type == 'BYTES':
+            row.f[i].v.string_value = row.f[i].v.string_value.encode('utf-8')
+
         if self.row_as_dict:
           yield self.client.convert_row_to_dict(row, schema)
         else:
@@ -998,6 +1004,12 @@
     # This code will catch this error to emit an error that explains
     # to the programmer that they have used NAN/INF values.
     try:
+      # On Python 3, base64-encoded bytes are decoded to strings
+      # before being sent to BigQuery.
+      if sys.version_info[0] > 2:
+        for field, value in iteritems(table_row):
+          if type(value) == bytes:
+            table_row[field] = value.decode('utf-8')
       return json.dumps(
           table_row, allow_nan=False, default=default_encoder).encode('utf-8')
     except ValueError as e:
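
To make the Python 3 branch added above concrete: json.dumps cannot serialize
bytes, so base64 strings that arrive as bytes values have to be decoded before
the row is serialized. A minimal sketch with a hypothetical row:

    import json

    # b'eHl3' is the base64 encoding of b'xyw', still held as bytes by the caller.
    table_row = {'bytes': b'eHl3', 'date': '2011-01-01'}
    for field, value in list(table_row.items()):
        if isinstance(value, bytes):
            table_row[field] = value.decode('utf-8')
    # Would raise "TypeError: Object of type bytes is not JSON serializable"
    # if the bytes value had been left in place.
    json.dumps(table_row)
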
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py b/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py
new file mode 100644
index 0000000..43c81f5
--- /dev/null
+++ b/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py
@@ -0,0 +1,251 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Unit tests for BigQuery sources and sinks."""
+from __future__ import absolute_import
+
+import base64
+import datetime
+import logging
+import random
+import time
+import unittest
+
+import hamcrest as hc
+from nose.plugins.attrib import attr
+
+import apache_beam as beam
+from apache_beam.io.gcp.bigquery_tools import BigQueryWrapper
+from apache_beam.io.gcp.internal.clients import bigquery
+from apache_beam.io.gcp.tests.bigquery_matcher import BigqueryFullResultMatcher
+from apache_beam.testing.test_pipeline import TestPipeline
+
+# Protect against environments where bigquery library is not available.
+# pylint: disable=wrong-import-order, wrong-import-position
+try:
+  from apitools.base.py.exceptions import HttpError
+except ImportError:
+  HttpError = None
+# pylint: enable=wrong-import-order, wrong-import-position
+
+
+class BigQueryWriteIntegrationTests(unittest.TestCase):
+  BIG_QUERY_DATASET_ID = 'python_write_to_table_'
+
+  def setUp(self):
+    self.test_pipeline = TestPipeline(is_integration_test=True)
+    self.runner_name = type(self.test_pipeline.runner).__name__
+    self.project = self.test_pipeline.get_option('project')
+
+    self.bigquery_client = BigQueryWrapper()
+    self.dataset_id = '%s%s%d' % (self.BIG_QUERY_DATASET_ID,
+                                  str(int(time.time())),
+                                  random.randint(0, 10000))
+    self.bigquery_client.get_or_create_dataset(self.project, self.dataset_id)
+    logging.info("Created dataset %s in project %s",
+                 self.dataset_id, self.project)
+
+  def tearDown(self):
+    request = bigquery.BigqueryDatasetsDeleteRequest(
+        projectId=self.project, datasetId=self.dataset_id,
+        deleteContents=True)
+    try:
+      logging.info("Deleting dataset %s in project %s",
+                   self.dataset_id, self.project)
+      self.bigquery_client.client.datasets.Delete(request)
+    except HttpError:
+      logging.debug('Failed to clean up dataset %s in project %s',
+                    self.dataset_id, self.project)
+
+  def create_table(self, table_name):
+    table_schema = bigquery.TableSchema()
+    table_field = bigquery.TableFieldSchema()
+    table_field.name = 'bytes'
+    table_field.type = 'BYTES'
+    table_schema.fields.append(table_field)
+    table_field = bigquery.TableFieldSchema()
+    table_field.name = 'date'
+    table_field.type = 'DATE'
+    table_schema.fields.append(table_field)
+    table_field = bigquery.TableFieldSchema()
+    table_field.name = 'time'
+    table_field.type = 'TIME'
+    table_schema.fields.append(table_field)
+    table = bigquery.Table(
+        tableReference=bigquery.TableReference(
+            projectId=self.project,
+            datasetId=self.dataset_id,
+            tableId=table_name),
+        schema=table_schema)
+    request = bigquery.BigqueryTablesInsertRequest(
+        projectId=self.project, datasetId=self.dataset_id, table=table)
+    self.bigquery_client.client.tables.Insert(request)
+
+  @attr('IT')
+  def test_big_query_write(self):
+    table_name = 'python_write_table'
+    table_id = '{}.{}'.format(self.dataset_id, table_name)
+
+    input_data = [
+        {'number': 1, 'str': 'abc'},
+        {'number': 2, 'str': 'def'},
+    ]
+    table_schema = {"fields": [
+        {"name": "number", "type": "INTEGER"},
+        {"name": "str", "type": "STRING"}]}
+
+    pipeline_verifiers = [
+        BigqueryFullResultMatcher(
+            project=self.project,
+            query="SELECT number, str FROM %s" % table_id,
+            data=[(1, 'abc',), (2, 'def',)])]
+
+    args = self.test_pipeline.get_full_options_as_args(
+        on_success_matcher=hc.all_of(*pipeline_verifiers))
+
+    with beam.Pipeline(argv=args) as p:
+      # pylint: disable=expression-not-assigned
+      (p | 'create' >> beam.Create(input_data)
+       | 'write' >> beam.io.WriteToBigQuery(
+           table_id,
+           schema=table_schema,
+           create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
+           write_disposition=beam.io.BigQueryDisposition.WRITE_EMPTY))
+
+  @attr('IT')
+  def test_big_query_write_schema_autodetect(self):
+    if self.runner_name == 'TestDataflowRunner':
+      self.skipTest('DataflowRunner does not support schema autodetection')
+
+    table_name = 'python_write_table'
+    table_id = '{}.{}'.format(self.dataset_id, table_name)
+
+    input_data = [
+        {'number': 1, 'str': 'abc'},
+        {'number': 2, 'str': 'def'},
+    ]
+
+    pipeline_verifiers = [
+        BigqueryFullResultMatcher(
+            project=self.project,
+            query="SELECT number, str FROM %s" % table_id,
+            data=[(1, 'abc',), (2, 'def',)])]
+
+    args = self.test_pipeline.get_full_options_as_args(
+        on_success_matcher=hc.all_of(*pipeline_verifiers),
+        experiments='use_beam_bq_sink')
+
+    with beam.Pipeline(argv=args) as p:
+      # pylint: disable=expression-not-assigned
+      (p | 'create' >> beam.Create(input_data)
+       | 'write' >> beam.io.WriteToBigQuery(
+           table_id,
+           method=beam.io.WriteToBigQuery.Method.FILE_LOADS,
+           schema=beam.io.gcp.bigquery.SCHEMA_AUTODETECT,
+           create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
+           write_disposition=beam.io.BigQueryDisposition.WRITE_EMPTY))
+
+  @attr('IT')
+  def test_big_query_write_new_types(self):
+    table_name = 'python_new_types_table'
+    table_id = '{}.{}'.format(self.dataset_id, table_name)
+
+    input_data = [
+        {'bytes': b'xyw', 'date': '2011-01-01', 'time': '23:59:59.999999'},
+        {'bytes': b'abc', 'date': '2000-01-01', 'time': '00:00:00'},
+        {'bytes': b'\xe4\xbd\xa0\xe5\xa5\xbd', 'date': '3000-12-31',
+         'time': '23:59:59'},
+        {'bytes': b'\xab\xac\xad', 'date': '2000-01-01', 'time': '00:00:00'}
+    ]
+    # BigQuery IO expects BYTES values to be base64-encoded.
+    for row in input_data:
+      row['bytes'] = base64.b64encode(row['bytes'])
+
+    table_schema = {"fields": [
+        {"name": "bytes", "type": "BYTES"},
+        {"name": "date", "type": "DATE"},
+        {"name": "time", "type": "TIME"}]}
+
+    pipeline_verifiers = [
+        BigqueryFullResultMatcher(
+            project=self.project,
+            query="SELECT bytes, date, time FROM %s" % table_id,
+            data=[(b'xyw', datetime.date(2011, 1, 1),
+                   datetime.time(23, 59, 59, 999999), ),
+                  (b'abc', datetime.date(2000, 1, 1),
+                   datetime.time(0, 0, 0), ),
+                  (b'\xe4\xbd\xa0\xe5\xa5\xbd', datetime.date(3000, 12, 31),
+                   datetime.time(23, 59, 59), ),
+                  (b'\xab\xac\xad', datetime.date(2000, 1, 1),
+                   datetime.time(0, 0, 0), )])]
+
+    args = self.test_pipeline.get_full_options_as_args(
+        on_success_matcher=hc.all_of(*pipeline_verifiers))
+
+    with beam.Pipeline(argv=args) as p:
+      # pylint: disable=expression-not-assigned
+      (p | 'create' >> beam.Create(input_data)
+       | 'write' >> beam.io.WriteToBigQuery(
+           table_id,
+           schema=table_schema,
+           create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
+           write_disposition=beam.io.BigQueryDisposition.WRITE_EMPTY))
+
+  @attr('IT')
+  def test_big_query_write_without_schema(self):
+    table_name = 'python_no_schema_table'
+    self.create_table(table_name)
+    table_id = '{}.{}'.format(self.dataset_id, table_name)
+
+    input_data = [
+        {'bytes': b'xyw', 'date': '2011-01-01', 'time': '23:59:59.999999'},
+        {'bytes': b'abc', 'date': '2000-01-01', 'time': '00:00:00'},
+        {'bytes': b'\xe4\xbd\xa0\xe5\xa5\xbd', 'date': '3000-12-31',
+         'time': '23:59:59'},
+        {'bytes': b'\xab\xac\xad', 'date': '2000-01-01', 'time': '00:00:00'}
+    ]
+    # BigQuery IO expects BYTES values to be base64-encoded.
+    for row in input_data:
+      row['bytes'] = base64.b64encode(row['bytes'])
+
+    pipeline_verifiers = [
+        BigqueryFullResultMatcher(
+            project=self.project,
+            query="SELECT bytes, date, time FROM %s" % table_id,
+            data=[(b'xyw', datetime.date(2011, 1, 1),
+                   datetime.time(23, 59, 59, 999999), ),
+                  (b'abc', datetime.date(2000, 1, 1),
+                   datetime.time(0, 0, 0), ),
+                  (b'\xe4\xbd\xa0\xe5\xa5\xbd', datetime.date(3000, 12, 31),
+                   datetime.time(23, 59, 59), ),
+                  (b'\xab\xac\xad', datetime.date(2000, 1, 1),
+                   datetime.time(0, 0, 0), )])]
+
+    args = self.test_pipeline.get_full_options_as_args(
+        on_success_matcher=hc.all_of(*pipeline_verifiers))
+
+    with beam.Pipeline(argv=args) as p:
+      # pylint: disable=expression-not-assigned
+      (p | 'create' >> beam.Create(input_data)
+       | 'write' >> beam.io.WriteToBigQuery(
+           table_id,
+           write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))
+
+
+if __name__ == '__main__':
+  logging.getLogger().setLevel(logging.INFO)
+  unittest.main()
diff --git a/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py b/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py
index a7c6230..ac22e5a 100644
--- a/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py
+++ b/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py
@@ -131,7 +131,7 @@
 
     self.project = project
     self.query = query
-    self.expected_data = [sorted(i) for i in data]
+    self.expected_data = data
 
   def _matches(self, _):
     logging.info('Start verify Bigquery data.')
@@ -141,7 +141,7 @@
     logging.info('Read from given query (%s), total rows %d',
                  self.query, len(response))
 
-    self.actual_data = [sorted(i) for i in response]
+    self.actual_data = response
 
     # Verify result
     return sorted(self.expected_data) == sorted(self.actual_data)
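
With the per-row sorted() calls removed above, the matcher keeps each row's field
order intact and only sorts across rows before comparing, since BigQuery does not
guarantee row order. A small illustrative example of the comparison that remains:

    expected_data = [(1, 'abc'), (2, 'def')]
    actual_data = [(2, 'def'), (1, 'abc')]   # same rows, different order
    assert sorted(expected_data) == sorted(actual_data)
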
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index ff058fc..7430afe 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -655,6 +655,9 @@
     if (not isinstance(transform, beam.io.WriteToBigQuery)
         or 'use_beam_bq_sink' in experiments):
       return self.apply_PTransform(transform, pcoll, options)
+    if transform.schema == beam.io.gcp.bigquery.SCHEMA_AUTODETECT:
+      raise RuntimeError(
+          'Schema auto-detection is not supported on the native sink')
     standard_options = options.view_as(StandardOptions)
     if standard_options.streaming:
       if (transform.write_disposition ==
@@ -663,12 +666,15 @@
       return self.apply_PTransform(transform, pcoll, options)
     else:
       from apache_beam.io.gcp.bigquery_tools import parse_table_schema_from_json
+      schema = None
+      if transform.schema:
+        schema = parse_table_schema_from_json(json.dumps(transform.schema))
       return pcoll  | 'WriteToBigQuery' >> beam.io.Write(
           beam.io.BigQuerySink(
               transform.table_reference.tableId,
               transform.table_reference.datasetId,
               transform.table_reference.projectId,
-              parse_table_schema_from_json(json.dumps(transform.schema)),
+              schema,
               transform.create_disposition,
               transform.write_disposition,
               kms_key=transform.kms_key))
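
The native-sink branch above now tolerates a missing schema and rejects
SCHEMA_AUTODETECT explicitly. A hedged sketch of that resolution logic, with
resolve_schema as a hypothetical helper rather than anything that exists in the
runner:

    import json

    from apache_beam.io.gcp import bigquery as beam_bigquery
    from apache_beam.io.gcp.bigquery_tools import parse_table_schema_from_json

    def resolve_schema(transform_schema):
        # Auto-detection only works with the new (use_beam_bq_sink) sink.
        if transform_schema == beam_bigquery.SCHEMA_AUTODETECT:
            raise RuntimeError(
                'Schema auto-detection is not supported on the native sink')
        if transform_schema:
            # The native BigQuerySink wants a TableSchema, not a dict.
            return parse_table_schema_from_json(json.dumps(transform_schema))
        # BigQuerySink accepts schema=None and relies on the existing table.
        return None
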
diff --git a/sdks/python/test-suites/direct/py35/build.gradle b/sdks/python/test-suites/direct/py35/build.gradle
index 56b77a3..f4dbb38 100644
--- a/sdks/python/test-suites/direct/py35/build.gradle
+++ b/sdks/python/test-suites/direct/py35/build.gradle
@@ -32,6 +32,8 @@
         "apache_beam.io.gcp.pubsub_integration_test:PubSubIntegrationTest",
         "apache_beam.io.gcp.big_query_query_to_table_it_test:BigQueryQueryToTableIT",
         "apache_beam.io.gcp.bigquery_io_read_it_test",
+        "apache_beam.io.gcp.bigquery_read_it_test",
+        "apache_beam.io.gcp.bigquery_write_it_test",
         "apache_beam.io.gcp.datastore.v1new.datastore_write_it_test",
     ]
     def testOpts = [
diff --git a/sdks/python/test-suites/direct/py36/build.gradle b/sdks/python/test-suites/direct/py36/build.gradle
index cc55c5a..2be9d00 100644
--- a/sdks/python/test-suites/direct/py36/build.gradle
+++ b/sdks/python/test-suites/direct/py36/build.gradle
@@ -32,6 +32,8 @@
         "apache_beam.io.gcp.pubsub_integration_test:PubSubIntegrationTest",
         "apache_beam.io.gcp.big_query_query_to_table_it_test:BigQueryQueryToTableIT",
         "apache_beam.io.gcp.bigquery_io_read_it_test",
+        "apache_beam.io.gcp.bigquery_read_it_test",
+        "apache_beam.io.gcp.bigquery_write_it_test",
     ]
     def testOpts = [
         "--tests=${batchTests.join(',')}",