Merge pull request #6564 from udim/pubsub-0-35-4

[BEAM-5513] Upgrade Python SDK to PubSub 0.35.4
diff --git a/sdks/python/apache_beam/examples/complete/game/game_stats_it_test.py b/sdks/python/apache_beam/examples/complete/game/game_stats_it_test.py
index 6dc60d0..2fc19da 100644
--- a/sdks/python/apache_beam/examples/complete/game/game_stats_it_test.py
+++ b/sdks/python/apache_beam/examples/complete/game/game_stats_it_test.py
@@ -72,15 +72,15 @@
 
     # Set up PubSub environment.
     from google.cloud import pubsub
-    self.pubsub_client = pubsub.Client(project=self.project)
-    unique_topic_name = self.INPUT_TOPIC + _unique_id
-    unique_subscrition_name = self.INPUT_SUB + _unique_id
-    self.input_topic = self.pubsub_client.topic(unique_topic_name)
-    self.input_sub = self.input_topic.subscription(unique_subscrition_name)
+    self.pub_client = pubsub.PublisherClient()
+    self.input_topic = self.pub_client.create_topic(
+        self.pub_client.topic_path(self.project, self.INPUT_TOPIC + _unique_id))
 
-    self.input_topic.create()
-    test_utils.wait_for_topics_created([self.input_topic])
-    self.input_sub.create()
+    self.sub_client = pubsub.SubscriberClient()
+    self.input_sub = self.sub_client.create_subscription(
+        self.sub_client.subscription_path(self.project,
+                                          self.INPUT_SUB + _unique_id),
+        self.input_topic.name)
 
     # Set up BigQuery environment
     from google.cloud import bigquery
@@ -95,14 +95,15 @@
     """Inject game events as test data to PubSub."""
 
     logging.debug('Injecting %d game events to topic %s',
-                  message_count, topic.full_name)
+                  message_count, topic.name)
 
     for _ in range(message_count):
-      topic.publish(self.INPUT_EVENT % self._test_timestamp)
+      self.pub_client.publish(topic.name,
+                              self.INPUT_EVENT % self._test_timestamp)
 
   def _cleanup_pubsub(self):
-    test_utils.cleanup_subscriptions([self.input_sub])
-    test_utils.cleanup_topics([self.input_topic])
+    test_utils.cleanup_subscriptions(self.sub_client, [self.input_sub])
+    test_utils.cleanup_topics(self.pub_client, [self.input_topic])
 
   def _cleanup_dataset(self):
     self.dataset.delete()
@@ -123,9 +124,9 @@
 
     # TODO(mariagh): Add teams table verifier once game_stats.py is fixed.
 
-    extra_opts = {'subscription': self.input_sub.full_name,
+    extra_opts = {'subscription': self.input_sub.name,
                   'dataset': self.dataset.name,
-                  'topic': self.input_topic.full_name,
+                  'topic': self.input_topic.name,
                   'fixed_window_duration': 1,
                   'user_activity_window_duration': 1,
                   'wait_until_finish_duration':
@@ -143,8 +144,6 @@
                     self.dataset.name, self.OUTPUT_TABLE_TEAMS)
 
     # Generate input data and inject to PubSub.
-    test_utils.wait_for_subscriptions_created([self.input_topic,
-                                               self.input_sub])
     self._inject_pubsub_game_events(self.input_topic, self.DEFAULT_INPUT_COUNT)
 
     # Get pipeline options from command argument: --test-pipeline-options,
diff --git a/sdks/python/apache_beam/examples/complete/game/leader_board_it_test.py b/sdks/python/apache_beam/examples/complete/game/leader_board_it_test.py
index ab10942..e0e309b 100644
--- a/sdks/python/apache_beam/examples/complete/game/leader_board_it_test.py
+++ b/sdks/python/apache_beam/examples/complete/game/leader_board_it_test.py
@@ -73,15 +73,16 @@
 
     # Set up PubSub environment.
     from google.cloud import pubsub
-    self.pubsub_client = pubsub.Client(project=self.project)
-    unique_topic_name = self.INPUT_TOPIC + _unique_id
-    unique_subscrition_name = self.INPUT_SUB + _unique_id
-    self.input_topic = self.pubsub_client.topic(unique_topic_name)
-    self.input_sub = self.input_topic.subscription(unique_subscrition_name)
 
-    self.input_topic.create()
-    test_utils.wait_for_topics_created([self.input_topic])
-    self.input_sub.create()
+    self.pub_client = pubsub.PublisherClient()
+    self.input_topic = self.pub_client.create_topic(
+        self.pub_client.topic_path(self.project, self.INPUT_TOPIC + _unique_id))
+
+    self.sub_client = pubsub.SubscriberClient()
+    self.input_sub = self.sub_client.create_subscription(
+        self.sub_client.subscription_path(self.project,
+                                          self.INPUT_SUB + _unique_id),
+        self.input_topic.name)
 
     # Set up BigQuery environment
     from google.cloud import bigquery
@@ -96,14 +97,15 @@
     """Inject game events as test data to PubSub."""
 
     logging.debug('Injecting %d game events to topic %s',
-                  message_count, topic.full_name)
+                  message_count, topic.name)
 
     for _ in range(message_count):
-      topic.publish(self.INPUT_EVENT % self._test_timestamp)
+      self.pub_client.publish(topic.name,
+                              self.INPUT_EVENT % self._test_timestamp)
 
   def _cleanup_pubsub(self):
-    test_utils.cleanup_subscriptions([self.input_sub])
-    test_utils.cleanup_topics([self.input_topic])
+    test_utils.cleanup_subscriptions(self.sub_client, [self.input_sub])
+    test_utils.cleanup_topics(self.pub_client, [self.input_topic])
 
   def _cleanup_dataset(self):
     self.dataset.delete()
@@ -131,9 +133,9 @@
                                         teams_query,
                                         self.DEFAULT_EXPECTED_CHECKSUM)
 
-    extra_opts = {'subscription': self.input_sub.full_name,
+    extra_opts = {'subscription': self.input_sub.name,
                   'dataset': self.dataset.name,
-                  'topic': self.input_topic.full_name,
+                  'topic': self.input_topic.name,
                   'team_window_duration': 1,
                   'wait_until_finish_duration':
                       self.WAIT_UNTIL_FINISH_DURATION,
@@ -151,8 +153,6 @@
                     self.dataset.name, self.OUTPUT_TABLE_TEAMS)
 
     # Generate input data and inject to PubSub.
-    test_utils.wait_for_subscriptions_created([self.input_topic,
-                                               self.input_sub])
     self._inject_pubsub_game_events(self.input_topic, self.DEFAULT_INPUT_COUNT)
 
     # Get pipeline options from command argument: --test-pipeline-options,
diff --git a/sdks/python/apache_beam/examples/streaming_wordcount_it_test.py b/sdks/python/apache_beam/examples/streaming_wordcount_it_test.py
index 3c0cfa9..78e89a1 100644
--- a/sdks/python/apache_beam/examples/streaming_wordcount_it_test.py
+++ b/sdks/python/apache_beam/examples/streaming_wordcount_it_test.py
@@ -52,31 +52,31 @@
 
     # Set up PubSub environment.
     from google.cloud import pubsub
-    self.pubsub_client = pubsub.Client(project=self.project)
-    self.input_topic = self.pubsub_client.topic(INPUT_TOPIC + self.uuid)
-    self.output_topic = self.pubsub_client.topic(OUTPUT_TOPIC + self.uuid)
-    self.input_sub = self.input_topic.subscription(INPUT_SUB + self.uuid)
-    self.output_sub = self.output_topic.subscription(OUTPUT_SUB + self.uuid)
+    self.pub_client = pubsub.PublisherClient()
+    self.input_topic = self.pub_client.create_topic(
+        self.pub_client.topic_path(self.project, INPUT_TOPIC + self.uuid))
+    self.output_topic = self.pub_client.create_topic(
+        self.pub_client.topic_path(self.project, OUTPUT_TOPIC + self.uuid))
 
-    self.input_topic.create()
-    self.output_topic.create()
-    test_utils.wait_for_topics_created([self.input_topic, self.output_topic])
-    self.input_sub.create()
-    self.output_sub.create()
+    self.sub_client = pubsub.SubscriberClient()
+    self.input_sub = self.sub_client.create_subscription(
+        self.sub_client.subscription_path(self.project, INPUT_SUB + self.uuid),
+        self.input_topic.name)
+    self.output_sub = self.sub_client.create_subscription(
+        self.sub_client.subscription_path(self.project, OUTPUT_SUB + self.uuid),
+        self.output_topic.name)
 
   def _inject_numbers(self, topic, num_messages):
     """Inject numbers as test data to PubSub."""
-    logging.debug('Injecting %d numbers to topic %s',
-                  num_messages, topic.full_name)
+    logging.debug('Injecting %d numbers to topic %s', num_messages, topic.name)
     for n in range(num_messages):
-      topic.publish(str(n))
-
-  def _cleanup_pubsub(self):
-    test_utils.cleanup_subscriptions([self.input_sub, self.output_sub])
-    test_utils.cleanup_topics([self.input_topic, self.output_topic])
+      self.pub_client.publish(self.input_topic.name, str(n))
 
   def tearDown(self):
-    self._cleanup_pubsub()
+    test_utils.cleanup_subscriptions(self.sub_client,
+                                     [self.input_sub, self.output_sub])
+    test_utils.cleanup_topics(self.pub_client,
+                              [self.input_topic, self.output_topic])
 
   @attr('IT')
   def test_streaming_wordcount_it(self):
@@ -86,17 +86,16 @@
     # Set extra options to the pipeline for test purpose
     state_verifier = PipelineStateMatcher(PipelineState.RUNNING)
     pubsub_msg_verifier = PubSubMessageMatcher(self.project,
-                                               OUTPUT_SUB + self.uuid,
+                                               self.output_sub.name,
                                                expected_msg,
                                                timeout=400)
-    extra_opts = {'input_subscription': self.input_sub.full_name,
-                  'output_topic': self.output_topic.full_name,
+    extra_opts = {'input_subscription': self.input_sub.name,
+                  'output_topic': self.output_topic.name,
                   'wait_until_finish_duration': WAIT_UNTIL_FINISH_DURATION,
                   'on_success_matcher': all_of(state_verifier,
                                                pubsub_msg_verifier)}
 
     # Generate input data and inject to PubSub.
-    test_utils.wait_for_subscriptions_created([self.input_sub])
     self._inject_numbers(self.input_topic, DEFAULT_INPUT_NUMBERS)
 
     # Get pipeline options from command argument: --test-pipeline-options,
diff --git a/sdks/python/apache_beam/io/gcp/pubsub.py b/sdks/python/apache_beam/io/gcp/pubsub.py
index 2414194..a1644ab 100644
--- a/sdks/python/apache_beam/io/gcp/pubsub.py
+++ b/sdks/python/apache_beam/io/gcp/pubsub.py
@@ -38,11 +38,10 @@
 from apache_beam.transforms.display import DisplayDataItem
 from apache_beam.utils.annotations import deprecated
 
-# The protobuf library is only used for running on Dataflow.
 try:
-  from google.cloud.proto.pubsub.v1 import pubsub_pb2
+  from google.cloud import pubsub
 except ImportError:
-  pubsub_pb2 = None
+  pubsub = None
 
 __all__ = ['PubsubMessage', 'ReadFromPubSub', 'ReadStringsFromPubSub',
            'WriteStringsToPubSub', 'WriteToPubSub']
@@ -92,7 +91,7 @@
     Returns:
       A new PubsubMessage object.
     """
-    msg = pubsub_pb2.PubsubMessage()
+    msg = pubsub.types.pubsub_pb2.PubsubMessage()
     msg.ParseFromString(proto_msg)
     # Convert ScalarMapContainer to dict.
     attributes = dict((key, msg.attributes[key]) for key in msg.attributes)
@@ -109,7 +108,7 @@
       https://cloud.google.com/pubsub/docs/reference/rpc/google.pubsub.v1#google.pubsub.v1.PubsubMessage
       containing the payload of this object.
     """
-    msg = pubsub_pb2.PubsubMessage()
+    msg = pubsub.types.pubsub_pb2.PubsubMessage()
     msg.data = self.data
     for key, value in self.attributes.iteritems():
       msg.attributes[key] = value
@@ -117,9 +116,9 @@
 
   @staticmethod
   def _from_message(msg):
-    """Construct from ``google.cloud.pubsub.message.Message``.
+    """Construct from ``google.cloud.pubsub_v1.subscriber.message.Message``.
 
-    https://google-cloud-python.readthedocs.io/en/latest/pubsub/subscriber/api/message.html
+    https://googleapis.github.io/google-cloud-python/latest/pubsub/subscriber/api/message.html
     """
     # Convert ScalarMapContainer to dict.
     attributes = dict((key, msg.attributes[key]) for key in msg.attributes)
diff --git a/sdks/python/apache_beam/io/gcp/pubsub_integration_test.py b/sdks/python/apache_beam/io/gcp/pubsub_integration_test.py
index 9bb81fc..5b060e5 100644
--- a/sdks/python/apache_beam/io/gcp/pubsub_integration_test.py
+++ b/sdks/python/apache_beam/io/gcp/pubsub_integration_test.py
@@ -100,21 +100,25 @@
 
     # Set up PubSub environment.
     from google.cloud import pubsub
-    self.pubsub_client = pubsub.Client(project=self.project)
-    self.input_topic = self.pubsub_client.topic(INPUT_TOPIC + self.uuid)
-    self.output_topic = self.pubsub_client.topic(OUTPUT_TOPIC + self.uuid)
-    self.input_sub = self.input_topic.subscription(INPUT_SUB + self.uuid)
-    self.output_sub = self.output_topic.subscription(OUTPUT_SUB + self.uuid)
+    self.pub_client = pubsub.PublisherClient()
+    self.input_topic = self.pub_client.create_topic(
+        self.pub_client.topic_path(self.project, INPUT_TOPIC + self.uuid))
+    self.output_topic = self.pub_client.create_topic(
+        self.pub_client.topic_path(self.project, OUTPUT_TOPIC + self.uuid))
 
-    self.input_topic.create()
-    self.output_topic.create()
-    test_utils.wait_for_topics_created([self.input_topic, self.output_topic])
-    self.input_sub.create()
-    self.output_sub.create()
+    self.sub_client = pubsub.SubscriberClient()
+    self.input_sub = self.sub_client.create_subscription(
+        self.sub_client.subscription_path(self.project, INPUT_SUB + self.uuid),
+        self.input_topic.name)
+    self.output_sub = self.sub_client.create_subscription(
+        self.sub_client.subscription_path(self.project, OUTPUT_SUB + self.uuid),
+        self.output_topic.name)
 
   def tearDown(self):
-    test_utils.cleanup_subscriptions([self.input_sub, self.output_sub])
-    test_utils.cleanup_topics([self.input_topic, self.output_topic])
+    test_utils.cleanup_subscriptions(self.sub_client,
+                                     [self.input_sub, self.output_sub])
+    test_utils.cleanup_topics(self.pub_client,
+                              [self.input_topic, self.output_topic])
 
   def _test_streaming(self, with_attributes):
     """Runs IT pipeline with message verifier.
@@ -139,21 +143,20 @@
       strip_attributes = [self.ID_LABEL, self.TIMESTAMP_ATTRIBUTE]
     pubsub_msg_verifier = PubSubMessageMatcher(
         self.project,
-        OUTPUT_SUB + self.uuid,
+        self.output_sub.name,
         expected_messages,
         timeout=MESSAGE_MATCHER_TIMEOUT_S,
         with_attributes=with_attributes,
         strip_attributes=strip_attributes)
-    extra_opts = {'input_subscription': self.input_sub.full_name,
-                  'output_topic': self.output_topic.full_name,
+    extra_opts = {'input_subscription': self.input_sub.name,
+                  'output_topic': self.output_topic.name,
                   'wait_until_finish_duration': TEST_PIPELINE_DURATION_MS,
                   'on_success_matcher': all_of(state_verifier,
                                                pubsub_msg_verifier)}
 
     # Generate input data and inject to PubSub.
-    test_utils.wait_for_subscriptions_created([self.input_sub])
     for msg in self.INPUT_MESSAGES[self.runner_name]:
-      self.input_topic.publish(msg.data, **msg.attributes)
+      self.pub_client.publish(self.input_topic.name, msg.data, **msg.attributes)
 
     # Get pipeline options from command argument: --test-pipeline-options,
     # and start pipeline job by calling pipeline main function.
diff --git a/sdks/python/apache_beam/io/gcp/pubsub_test.py b/sdks/python/apache_beam/io/gcp/pubsub_test.py
index 6e19950..a95ffc6 100644
--- a/sdks/python/apache_beam/io/gcp/pubsub_test.py
+++ b/sdks/python/apache_beam/io/gcp/pubsub_test.py
@@ -20,12 +20,9 @@
 
 from __future__ import absolute_import
 
-import functools
 import logging
 import unittest
 from builtins import object
-from builtins import range
-from builtins import zip
 
 import hamcrest as hc
 import mock
@@ -43,6 +40,7 @@
 from apache_beam.runners.direct.direct_runner import _DirectReadFromPubSub
 from apache_beam.runners.direct.direct_runner import _get_transform_overrides
 from apache_beam.runners.direct.transform_evaluator import _PubSubReadEvaluator
+from apache_beam.testing import test_utils
 from apache_beam.testing.test_pipeline import TestPipeline
 from apache_beam.testing.util import TestWindowedValue
 from apache_beam.testing.util import assert_that
@@ -54,18 +52,10 @@
 from apache_beam.utils import timestamp
 
 # Protect against environments where the PubSub library is not available.
-# pylint: disable=wrong-import-order, wrong-import-position
 try:
   from google.cloud import pubsub
 except ImportError:
   pubsub = None
-# pylint: enable=wrong-import-order, wrong-import-position
-
-# The protobuf library is only used for running on Dataflow.
-try:
-  from google.cloud.proto.pubsub.v1 import pubsub_pb2
-except ImportError:
-  pubsub_pb2 = None
 
 
 class TestPubsubMessage(unittest.TestCase):
@@ -81,8 +71,7 @@
     with self.assertRaisesRegexp(ValueError, r'data.*attributes.*must be set'):
       _ = PubsubMessage(None, {})
 
-  @unittest.skipIf(pubsub_pb2 is None,
-                   'PubSub proto dependencies are not installed')
+  @unittest.skipIf(pubsub is None, 'GCP dependencies are not installed')
   def test_proto_conversion(self):
     data = 'data'
     attributes = {'k1': 'v1', 'k2': 'v2'}
@@ -220,7 +209,7 @@
     write_transform = pcoll.producer.inputs[0].producer.transform
 
     # Ensure that the properties passed through correctly
-    self.assertEqual('a_topic', write_transform.dofn.topic_name)
+    self.assertEqual('a_topic', write_transform.dofn.short_topic_name)
 
   def test_expand(self):
     p = TestPipeline()
@@ -240,7 +229,7 @@
     write_transform = pcoll.producer.inputs[0].producer.transform
 
     # Ensure that the properties passed through correctly
-    self.assertEqual('a_topic', write_transform.dofn.topic_name)
+    self.assertEqual('a_topic', write_transform.dofn.short_topic_name)
     self.assertEqual(True, write_transform.dofn.with_attributes)
     # TODO(BEAM-4275): These properties aren't supported yet in direct runner.
     self.assertEqual(None, write_transform.dofn.id_label)
@@ -333,118 +322,25 @@
 }
 
 
-class FakePubsubTopic(object):
-
-  def __init__(self, name, client):
-    self.name = name
-    self.client = client
-
-  def subscription(self, name):
-    return FakePubsubSubscription(name, self.name, self.client)
-
-  def batch(self):
-    if self.client.batch is None:
-      self.client.batch = FakeBatch(self.client)
-    return self.client.batch
-
-
-class FakePubsubSubscription(object):
-
-  def __init__(self, name, topic, client):
-    self.name = name
-    self.topic = topic
-    self.client = client
-
-  def create(self):
-    pass
-
-
-class FakeAutoAck(object):
-
-  def __init__(self, sub, **unused_kwargs):
-    self.sub = sub
-
-  def __enter__(self):
-    messages = self.sub.client.messages_read
-    self.ack_id_to_msg = dict(zip(range(len(messages)), messages))
-    return self.ack_id_to_msg
-
-  def __exit__(self, exc_type, exc_val, exc_tb):
-    pass
-
-
-class FakeBatch(object):
-  """Context manager that accept Pubsub client writes via publish().
-
-  Verifies writes on exit.
-  """
-
-  def __init__(self, client):
-    self.client = client
-    self.published = []
-
-  def __enter__(self):
-    return self
-
-  def __exit__(self, exc_type, exc_val, exc_tb):
-    if exc_type is not None:
-      return  # Exception will be raised.
-    hc.assert_that(self.published,
-                   hc.only_contains(*self.client.messages_write))
-
-  def publish(self, message, **attrs):
-    self.published.append([message, attrs])
-
-
-class FakePubsubClient(object):
-
-  def __init__(self, messages_read=None, messages_write=None, project=None,
-               **unused_kwargs):
-    """Creates a Pubsub client fake.
-
-    Args:
-      messages_read: List of PubsubMessage objects to return.
-      messages_write: List of [data, attributes] pairs, corresponding to
-        messages expected to be written to the client.
-      project: Name of GCP project.
-    """
-    self.messages_read = messages_read
-    self.messages_write = messages_write
-    self.project = project
-    self.batch = None
-
-  def topic(self, name):
-    return FakePubsubTopic(name, self)
-
-
-def create_client_message(data, message_id, attributes, publish_time):
-  """Returns a message as it would be returned from Cloud Pub/Sub client.
-
-  This is what the reader sees.
-  """
-  msg = pubsub.message.Message(data, message_id, attributes)
-  msg._service_timestamp = publish_time
-  return msg
-
-
 @unittest.skipIf(pubsub is None, 'GCP dependencies are not installed')
+@mock.patch('google.cloud.pubsub.SubscriberClient')
 class TestReadFromPubSub(unittest.TestCase):
 
-  @mock.patch('google.cloud.pubsub')
   def test_read_messages_success(self, mock_pubsub):
     data = 'data'
-    message_id = 'message_id'
-    publish_time = '2018-03-12T13:37:01.234567Z'
+    publish_time_secs = 1520861821
+    publish_time_nanos = 234567000
     attributes = {'key': 'value'}
-    payloads = [create_client_message(
-        data, message_id, attributes, publish_time)]
+    ack_id = 'ack_id'
+    pull_response = test_utils.create_pull_response([
+        test_utils.PullResponseMessage(
+            data, attributes, publish_time_secs, publish_time_nanos, ack_id)
+    ])
     expected_elements = [
         TestWindowedValue(PubsubMessage(data, attributes),
                           timestamp.Timestamp(1520861821.234567),
                           [window.GlobalWindow()])]
-
-    mock_pubsub.Client = functools.partial(FakePubsubClient, payloads)
-    mock_pubsub.subscription.AutoAck = FakeAutoAck
+    mock_pubsub.return_value.pull.return_value = pull_response
 
     p = TestPipeline()
     p.options.view_as(StandardOptions).streaming = True
@@ -453,17 +349,18 @@
                               None, None, with_attributes=True))
     assert_that(pcoll, equal_to(expected_elements), reify_windows=True)
     p.run()
+    mock_pubsub.return_value.acknowledge.assert_has_calls([
+        mock.call(mock.ANY, [ack_id])])
 
-  @mock.patch('google.cloud.pubsub')
   def test_read_strings_success(self, mock_pubsub):
     data = u'🤷 ¯\\_(ツ)_/¯'
     data_encoded = data.encode('utf-8')
-    publish_time = '2018-03-12T13:37:01.234567Z'
-    payloads = [create_client_message(data_encoded, None, None, publish_time)]
+    ack_id = 'ack_id'
+    pull_response = test_utils.create_pull_response([
+        test_utils.PullResponseMessage(data_encoded, ack_id=ack_id)
+    ])
     expected_elements = [data]
-
-    mock_pubsub.Client = functools.partial(FakePubsubClient, payloads)
-    mock_pubsub.subscription.AutoAck = FakeAutoAck
+    mock_pubsub.return_value.pull.return_value = pull_response
 
     p = TestPipeline()
     p.options.view_as(StandardOptions).streaming = True
@@ -472,16 +369,16 @@
                                      None, None))
     assert_that(pcoll, equal_to(expected_elements))
     p.run()
+    mock_pubsub.return_value.acknowledge.assert_has_calls([
+        mock.call(mock.ANY, [ack_id])])
 
-  @mock.patch('google.cloud.pubsub')
   def test_read_data_success(self, mock_pubsub):
     data_encoded = u'🤷 ¯\\_(ツ)_/¯'.encode('utf-8')
-    publish_time = '2018-03-12T13:37:01.234567Z'
-    payloads = [create_client_message(data_encoded, None, None, publish_time)]
+    ack_id = 'ack_id'
+    pull_response = test_utils.create_pull_response([
+        test_utils.PullResponseMessage(data_encoded, ack_id=ack_id)])
     expected_elements = [data_encoded]
-
-    mock_pubsub.Client = functools.partial(FakePubsubClient, payloads)
-    mock_pubsub.subscription.AutoAck = FakeAutoAck
+    mock_pubsub.return_value.pull.return_value = pull_response
 
     p = TestPipeline()
     p.options.view_as(StandardOptions).streaming = True
@@ -489,24 +386,26 @@
              | ReadFromPubSub('projects/fakeprj/topics/a_topic', None, None))
     assert_that(pcoll, equal_to(expected_elements))
     p.run()
+    mock_pubsub.return_value.acknowledge.assert_has_calls([
+        mock.call(mock.ANY, [ack_id])])
 
-  @mock.patch('google.cloud.pubsub')
   def test_read_messages_timestamp_attribute_milli_success(self, mock_pubsub):
     data = 'data'
-    message_id = 'message_id'
     attributes = {'time': '1337'}
-    publish_time = '2018-03-12T13:37:01.234567Z'
-    payloads = [
-        create_client_message(data, message_id, attributes, publish_time)]
+    publish_time_secs = 1520861821
+    publish_time_nanos = 234567000
+    ack_id = 'ack_id'
+    pull_response = test_utils.create_pull_response([
+        test_utils.PullResponseMessage(
+            data, attributes, publish_time_secs, publish_time_nanos, ack_id)
+    ])
     expected_elements = [
         TestWindowedValue(
             PubsubMessage(data, attributes),
             timestamp.Timestamp(micros=int(attributes['time']) * 1000),
             [window.GlobalWindow()]),
     ]
-
-    mock_pubsub.Client = functools.partial(FakePubsubClient, payloads)
-    mock_pubsub.subscription.AutoAck = FakeAutoAck
+    mock_pubsub.return_value.pull.return_value = pull_response
 
     p = TestPipeline()
     p.options.view_as(StandardOptions).streaming = True
@@ -516,24 +415,26 @@
                  with_attributes=True, timestamp_attribute='time'))
     assert_that(pcoll, equal_to(expected_elements), reify_windows=True)
     p.run()
+    mock_pubsub.return_value.acknowledge.assert_has_calls([
+        mock.call(mock.ANY, [ack_id])])
 
-  @mock.patch('google.cloud.pubsub')
   def test_read_messages_timestamp_attribute_rfc3339_success(self, mock_pubsub):
     data = 'data'
-    message_id = 'message_id'
     attributes = {'time': '2018-03-12T13:37:01.234567Z'}
-    publish_time = '2018-03-12T13:37:01.234567Z'
-    payloads = [
-        create_client_message(data, message_id, attributes, publish_time)]
+    publish_time_secs = 1337000000
+    publish_time_nanos = 133700000
+    ack_id = 'ack_id'
+    pull_response = test_utils.create_pull_response([
+        test_utils.PullResponseMessage(
+            data, attributes, publish_time_secs, publish_time_nanos, ack_id)
+    ])
     expected_elements = [
         TestWindowedValue(
             PubsubMessage(data, attributes),
             timestamp.Timestamp.from_rfc3339(attributes['time']),
             [window.GlobalWindow()]),
     ]
-
-    mock_pubsub.Client = functools.partial(FakePubsubClient, payloads)
-    mock_pubsub.subscription.AutoAck = FakeAutoAck
+    mock_pubsub.return_value.pull.return_value = pull_response
 
     p = TestPipeline()
     p.options.view_as(StandardOptions).streaming = True
@@ -543,24 +444,27 @@
                  with_attributes=True, timestamp_attribute='time'))
     assert_that(pcoll, equal_to(expected_elements), reify_windows=True)
     p.run()
+    mock_pubsub.return_value.acknowledge.assert_has_calls([
+        mock.call(mock.ANY, [ack_id])])
 
-  @mock.patch('google.cloud.pubsub')
   def test_read_messages_timestamp_attribute_missing(self, mock_pubsub):
     data = 'data'
-    message_id = 'message_id'
     attributes = {}
+    publish_time_secs = 1520861821
+    publish_time_nanos = 234567000
     publish_time = '2018-03-12T13:37:01.234567Z'
-    payloads = [
-        create_client_message(data, message_id, attributes, publish_time)]
+    ack_id = 'ack_id'
+    pull_response = test_utils.create_pull_response([
+        test_utils.PullResponseMessage(
+            data, attributes, publish_time_secs, publish_time_nanos, ack_id)
+    ])
     expected_elements = [
         TestWindowedValue(
             PubsubMessage(data, attributes),
             timestamp.Timestamp.from_rfc3339(publish_time),
             [window.GlobalWindow()]),
     ]
-
-    mock_pubsub.Client = functools.partial(FakePubsubClient, payloads)
-    mock_pubsub.subscription.AutoAck = FakeAutoAck
+    mock_pubsub.return_value.pull.return_value = pull_response
 
     p = TestPipeline()
     p.options.view_as(StandardOptions).streaming = True
@@ -570,18 +474,20 @@
                  with_attributes=True, timestamp_attribute='nonexistent'))
     assert_that(pcoll, equal_to(expected_elements), reify_windows=True)
     p.run()
+    mock_pubsub.return_value.acknowledge.assert_has_calls([
+        mock.call(mock.ANY, [ack_id])])
 
-  @mock.patch('google.cloud.pubsub')
   def test_read_messages_timestamp_attribute_fail_parse(self, mock_pubsub):
     data = 'data'
-    message_id = 'message_id'
     attributes = {'time': '1337 unparseable'}
-    publish_time = '2018-03-12T13:37:01.234567Z'
-    payloads = [
-        create_client_message(data, message_id, attributes, publish_time)]
-
-    mock_pubsub.Client = functools.partial(FakePubsubClient, payloads)
-    mock_pubsub.subscription.AutoAck = FakeAutoAck
+    publish_time_secs = 1520861821
+    publish_time_nanos = 234567000
+    ack_id = 'ack_id'
+    pull_response = test_utils.create_pull_response([
+        test_utils.PullResponseMessage(
+            data, attributes, publish_time_secs, publish_time_nanos, ack_id)
+    ])
+    mock_pubsub.return_value.pull.return_value = pull_response
 
     p = TestPipeline()
     p.options.view_as(StandardOptions).streaming = True
@@ -591,20 +497,10 @@
              with_attributes=True, timestamp_attribute='time'))
     with self.assertRaisesRegexp(ValueError, r'parse'):
       p.run()
+    mock_pubsub.return_value.acknowledge.assert_not_called()
 
-  @mock.patch('google.cloud.pubsub')
-  def test_read_message_id_label_unsupported(self, mock_pubsub):
+  def test_read_message_id_label_unsupported(self, unused_mock_pubsub):
     # id_label is unsupported in DirectRunner.
-    data = 'data'
-    message_id = 'message_id'
-    attributes = {'time': '1337 unparseable'}
-    publish_time = '2018-03-12T13:37:01.234567Z'
-    payloads = [
-        create_client_message(data, message_id, attributes, publish_time)]
-
-    mock_pubsub.Client = functools.partial(FakePubsubClient, payloads)
-    mock_pubsub.subscription.AutoAck = FakeAutoAck
-
     p = TestPipeline()
     p.options.view_as(StandardOptions).streaming = True
     _ = (p | ReadFromPubSub('projects/fakeprj/topics/a_topic', None, 'a_label'))
@@ -614,16 +510,12 @@
 
 
 @unittest.skipIf(pubsub is None, 'GCP dependencies are not installed')
+@mock.patch('google.cloud.pubsub.PublisherClient')
 class TestWriteToPubSub(unittest.TestCase):
 
-  @mock.patch('google.cloud.pubsub')
   def test_write_messages_success(self, mock_pubsub):
     data = 'data'
     payloads = [data]
-    expected_payloads = [[data, {}]]
-
-    mock_pubsub.Client = functools.partial(FakePubsubClient,
-                                           messages_write=expected_payloads)
 
     p = TestPipeline()
     p.options.view_as(StandardOptions).streaming = True
@@ -632,15 +524,12 @@
          | WriteToPubSub('projects/fakeprj/topics/a_topic',
                          with_attributes=False))
     p.run()
+    mock_pubsub.return_value.publish.assert_has_calls([
+        mock.call(mock.ANY, data)])
 
-  @mock.patch('google.cloud.pubsub')
   def test_write_messages_deprecated(self, mock_pubsub):
     data = 'data'
     payloads = [data]
-    expected_payloads = [[data, {}]]
-
-    mock_pubsub.Client = functools.partial(FakePubsubClient,
-                                           messages_write=expected_payloads)
 
     p = TestPipeline()
     p.options.view_as(StandardOptions).streaming = True
@@ -648,16 +537,13 @@
          | Create(payloads)
          | WriteStringsToPubSub('projects/fakeprj/topics/a_topic'))
     p.run()
+    mock_pubsub.return_value.publish.assert_has_calls([
+        mock.call(mock.ANY, data)])
 
-  @mock.patch('google.cloud.pubsub')
   def test_write_messages_with_attributes_success(self, mock_pubsub):
     data = 'data'
     attributes = {'key': 'value'}
     payloads = [PubsubMessage(data, attributes)]
-    expected_payloads = [[data, attributes]]
-
-    mock_pubsub.Client = functools.partial(FakePubsubClient,
-                                           messages_write=expected_payloads)
 
     p = TestPipeline()
     p.options.view_as(StandardOptions).streaming = True
@@ -666,15 +552,14 @@
          | WriteToPubSub('projects/fakeprj/topics/a_topic',
                          with_attributes=True))
     p.run()
+    mock_pubsub.return_value.publish.assert_has_calls([
+        mock.call(mock.ANY, data, **attributes)])
 
-  @mock.patch('google.cloud.pubsub')
   def test_write_messages_with_attributes_error(self, mock_pubsub):
     data = 'data'
     # Sending raw data when WriteToPubSub expects a PubsubMessage object.
     payloads = [data]
 
-    mock_pubsub.Client = functools.partial(FakePubsubClient)
-
     p = TestPipeline()
     p.options.view_as(StandardOptions).streaming = True
     _ = (p
@@ -685,15 +570,10 @@
                                  r'str.*has no attribute.*data'):
       p.run()
 
-  @mock.patch('google.cloud.pubsub')
   def test_write_messages_unsupported_features(self, mock_pubsub):
     data = 'data'
     attributes = {'key': 'value'}
     payloads = [PubsubMessage(data, attributes)]
-    expected_payloads = [[data, attributes]]
-
-    mock_pubsub.Client = functools.partial(FakePubsubClient,
-                                           messages_write=expected_payloads)
 
     p = TestPipeline()
     p.options.view_as(StandardOptions).streaming = True
diff --git a/sdks/python/apache_beam/io/gcp/tests/pubsub_matcher.py b/sdks/python/apache_beam/io/gcp/tests/pubsub_matcher.py
index 6217faf..7a0b5c8 100644
--- a/sdks/python/apache_beam/io/gcp/tests/pubsub_matcher.py
+++ b/sdks/python/apache_beam/io/gcp/tests/pubsub_matcher.py
@@ -31,12 +31,10 @@
 
 
 # Protect against environments where pubsub library is not available.
-# pylint: disable=wrong-import-order, wrong-import-position
 try:
   from google.cloud import pubsub
 except ImportError:
   pubsub = None
-# pylint: enable=wrong-import-order, wrong-import-position
 
 DEFAULT_TIMEOUT = 5 * 60
 MAX_MESSAGES_IN_ONE_PULL = 50
@@ -49,8 +47,9 @@
   subscription until all expected messages are shown or timeout.
   """
 
-  def __init__(self, project, sub_name, expected_msg, timeout=DEFAULT_TIMEOUT,
-               with_attributes=False, strip_attributes=None):
+  def __init__(self, project, sub_name, expected_msg,
+               timeout=DEFAULT_TIMEOUT, with_attributes=False,
+               strip_attributes=None):
     """Initialize PubSubMessageMatcher object.
 
     Args:
@@ -59,8 +58,9 @@
       expected_msg: A string list that contains expected message data pulled
         from the subscription. See also: with_attributes.
       timeout: Timeout in seconds to wait for all expected messages appears.
-      with_attributes: Whether expected_msg is a list of
-        ``PubsubMessage`` objects.
+      with_attributes: If True, will match against both message data and
+        attributes; expected_msg should then be a list of ``PubsubMessage``
+        objects. Otherwise, it should be a list of ``bytes``.
       strip_attributes: List of strings. If with_attributes==True, strip the
         attributes keyed by these values from incoming messages.
         If a key is missing, will add an attribute with an error message as
@@ -86,28 +86,26 @@
 
   def _matches(self, _):
     if self.messages is None:
-      self.messages = self._wait_for_messages(self._get_subscription(),
-                                              len(self.expected_msg),
+      self.messages = self._wait_for_messages(len(self.expected_msg),
                                               self.timeout)
     return Counter(self.messages) == Counter(self.expected_msg)
 
-  def _get_subscription(self):
-    return pubsub.Client(project=self.project).subscription(self.sub_name)
-
-  def _wait_for_messages(self, subscription, expected_num, timeout):
+  def _wait_for_messages(self, expected_num, timeout):
     """Wait for messages from given subscription."""
-    logging.debug('Start pulling messages from %s', subscription.full_name)
     total_messages = []
+
+    sub_client = pubsub.SubscriberClient()
     start_time = time.time()
     while time.time() - start_time <= timeout:
-      pulled = subscription.pull(max_messages=MAX_MESSAGES_IN_ONE_PULL)
-      for ack_id, message in pulled:
-        subscription.acknowledge([ack_id])
+      response = sub_client.pull(self.sub_name,
+                                 max_messages=MAX_MESSAGES_IN_ONE_PULL,
+                                 return_immediately=True)
+      for rm in response.received_messages:
+        msg = PubsubMessage._from_message(rm.message)
         if not self.with_attributes:
-          total_messages.append(message.data)
+          total_messages.append(msg.data)
           continue
 
-        msg = PubsubMessage._from_message(message)
         if self.strip_attributes:
           for attr in self.strip_attributes:
             try:
@@ -117,12 +115,16 @@
                                       'expected attribute not found.')
         total_messages.append(msg)
 
+      ack_ids = [rm.ack_id for rm in response.received_messages]
+      if ack_ids:
+        sub_client.acknowledge(self.sub_name, ack_ids)
       if len(total_messages) >= expected_num:
-        return total_messages
+        break
       time.sleep(1)
 
-    logging.error('Timeout after %d sec. Received %d messages from %s.',
-                  timeout, len(total_messages), subscription.full_name)
+    if time.time() - start_time > timeout:
+      logging.error('Timeout after %d sec. Received %d messages from %s.',
+                    timeout, len(total_messages), self.sub_name)
     return total_messages
 
   def describe_to(self, description):
diff --git a/sdks/python/apache_beam/io/gcp/tests/pubsub_matcher_test.py b/sdks/python/apache_beam/io/gcp/tests/pubsub_matcher_test.py
index 0e59481..0f9351d 100644
--- a/sdks/python/apache_beam/io/gcp/tests/pubsub_matcher_test.py
+++ b/sdks/python/apache_beam/io/gcp/tests/pubsub_matcher_test.py
@@ -27,17 +27,19 @@
 
 from apache_beam.io.gcp.pubsub import PubsubMessage
 from apache_beam.io.gcp.tests.pubsub_matcher import PubSubMessageMatcher
+from apache_beam.testing.test_utils import PullResponseMessage
+from apache_beam.testing.test_utils import create_pull_response
 
 # Protect against environments where pubsub library is not available.
-# pylint: disable=wrong-import-order, wrong-import-position
 try:
   from google.cloud import pubsub
 except ImportError:
   pubsub = None
-# pylint: enable=wrong-import-order, wrong-import-position
 
 
 @unittest.skipIf(pubsub is None, 'PubSub dependencies are not installed.')
+@mock.patch('time.sleep', return_value=None)
+@mock.patch('google.cloud.pubsub.SubscriberClient')
 class PubSubMatcherTest(unittest.TestCase):
 
   def setUp(self):
@@ -48,90 +50,75 @@
         'mock_project', 'mock_sub_name', ['mock_expected_msg'],
         with_attributes=with_attributes, strip_attributes=strip_attributes)
 
-  @mock.patch('time.sleep', return_value=None)
-  @mock.patch('apache_beam.io.gcp.tests.pubsub_matcher.'
-              'PubSubMessageMatcher._get_subscription')
   def test_message_matcher_success(self, mock_get_sub, unsued_mock):
     self.init_matcher()
     self.pubsub_matcher.expected_msg = ['a', 'b']
     mock_sub = mock_get_sub.return_value
     mock_sub.pull.side_effect = [
-        [(1, pubsub.message.Message(b'a', 'unused_id'))],
-        [(2, pubsub.message.Message(b'b', 'unused_id'))],
+        create_pull_response([PullResponseMessage(b'a', {})]),
+        create_pull_response([PullResponseMessage(b'b', {})]),
     ]
     hc_assert_that(self.mock_presult, self.pubsub_matcher)
     self.assertEqual(mock_sub.pull.call_count, 2)
+    self.assertEqual(mock_sub.acknowledge.call_count, 2)
 
-  @mock.patch('time.sleep', return_value=None)
-  @mock.patch('apache_beam.io.gcp.tests.pubsub_matcher.'
-              'PubSubMessageMatcher._get_subscription')
   def test_message_matcher_attributes_success(self, mock_get_sub, unsued_mock):
     self.init_matcher(with_attributes=True)
     self.pubsub_matcher.expected_msg = [PubsubMessage('a', {'k': 'v'})]
     mock_sub = mock_get_sub.return_value
-    msg_a = pubsub.message.Message(b'a', 'unused_id')
-    msg_a.attributes['k'] = 'v'
-    mock_sub.pull.side_effect = [[(1, msg_a)]]
+    mock_sub.pull.side_effect = [
+        create_pull_response([PullResponseMessage(b'a', {'k': 'v'})])
+    ]
     hc_assert_that(self.mock_presult, self.pubsub_matcher)
     self.assertEqual(mock_sub.pull.call_count, 1)
+    self.assertEqual(mock_sub.acknowledge.call_count, 1)
 
-  @mock.patch('time.sleep', return_value=None)
-  @mock.patch('apache_beam.io.gcp.tests.pubsub_matcher.'
-              'PubSubMessageMatcher._get_subscription')
   def test_message_matcher_attributes_fail(self, mock_get_sub, unsued_mock):
     self.init_matcher(with_attributes=True)
     self.pubsub_matcher.expected_msg = [PubsubMessage('a', {})]
     mock_sub = mock_get_sub.return_value
-    msg_a = pubsub.message.Message(b'a', 'unused_id')
-    msg_a.attributes['k'] = 'v'  # Unexpected.
-    mock_sub.pull.side_effect = [[(1, msg_a)]]
+    # Unexpected attribute 'k'.
+    mock_sub.pull.side_effect = [
+        create_pull_response([PullResponseMessage(b'a', {'k': 'v'})])
+    ]
     with self.assertRaisesRegexp(AssertionError, r'Unexpected'):
       hc_assert_that(self.mock_presult, self.pubsub_matcher)
     self.assertEqual(mock_sub.pull.call_count, 1)
+    self.assertEqual(mock_sub.acknowledge.call_count, 1)
 
-  @mock.patch('time.sleep', return_value=None)
-  @mock.patch('apache_beam.io.gcp.tests.pubsub_matcher.'
-              'PubSubMessageMatcher._get_subscription')
   def test_message_matcher_strip_success(self, mock_get_sub, unsued_mock):
     self.init_matcher(with_attributes=True,
                       strip_attributes=['id', 'timestamp'])
     self.pubsub_matcher.expected_msg = [PubsubMessage('a', {'k': 'v'})]
     mock_sub = mock_get_sub.return_value
-    msg_a = pubsub.message.Message(b'a', 'unused_id')
-    msg_a.attributes['id'] = 'foo'
-    msg_a.attributes['timestamp'] = 'bar'
-    msg_a.attributes['k'] = 'v'
-    mock_sub.pull.side_effect = [[(1, msg_a)]]
+    mock_sub.pull.side_effect = [create_pull_response([
+        PullResponseMessage(b'a', {'id': 'foo', 'timestamp': 'bar', 'k': 'v'})
+    ])]
     hc_assert_that(self.mock_presult, self.pubsub_matcher)
     self.assertEqual(mock_sub.pull.call_count, 1)
+    self.assertEqual(mock_sub.acknowledge.call_count, 1)
 
-  @mock.patch('time.sleep', return_value=None)
-  @mock.patch('apache_beam.io.gcp.tests.pubsub_matcher.'
-              'PubSubMessageMatcher._get_subscription')
   def test_message_matcher_strip_fail(self, mock_get_sub, unsued_mock):
     self.init_matcher(with_attributes=True,
                       strip_attributes=['id', 'timestamp'])
     self.pubsub_matcher.expected_msg = [PubsubMessage('a', {'k': 'v'})]
     mock_sub = mock_get_sub.return_value
-    # msg_a is missing attribute 'timestamp'.
-    msg_a = pubsub.message.Message(b'a', 'unused_id')
-    msg_a.attributes['id'] = 'foo'
-    msg_a.attributes['k'] = 'v'
-    mock_sub.pull.side_effect = [[(1, msg_a)]]
+    # Message is missing attribute 'timestamp'.
+    mock_sub.pull.side_effect = [create_pull_response([
+        PullResponseMessage(b'a', {'id': 'foo', 'k': 'v'})
+    ])]
     with self.assertRaisesRegexp(AssertionError, r'Stripped attributes'):
       hc_assert_that(self.mock_presult, self.pubsub_matcher)
     self.assertEqual(mock_sub.pull.call_count, 1)
+    self.assertEqual(mock_sub.acknowledge.call_count, 1)
 
-  @mock.patch('time.sleep', return_value=None)
-  @mock.patch('apache_beam.io.gcp.tests.pubsub_matcher.'
-              'PubSubMessageMatcher._get_subscription')
   def test_message_matcher_mismatch(self, mock_get_sub, unused_mock):
     self.init_matcher()
     self.pubsub_matcher.expected_msg = ['a']
     mock_sub = mock_get_sub.return_value
-    mock_sub.pull.return_value = [
-        (1, pubsub.message.Message(b'c', 'unused_id')),
-        (1, pubsub.message.Message(b'd', 'unused_id')),
+    mock_sub.pull.side_effect = [
+        create_pull_response([PullResponseMessage(b'c', {}),
+                              PullResponseMessage(b'd', {})]),
     ]
     with self.assertRaises(AssertionError) as error:
       hc_assert_that(self.mock_presult, self.pubsub_matcher)
@@ -140,10 +127,9 @@
     self.assertTrue(
         '\nExpected: Expected 1 messages.\n     but: Got 2 messages.'
         in str(error.exception.args[0]))
+    self.assertEqual(mock_sub.pull.call_count, 1)
+    self.assertEqual(mock_sub.acknowledge.call_count, 1)
 
-  @mock.patch('time.sleep', return_value=None)
-  @mock.patch('apache_beam.io.gcp.tests.pubsub_matcher.'
-              'PubSubMessageMatcher._get_subscription')
   def test_message_matcher_timeout(self, mock_get_sub, unused_mock):
     self.init_matcher()
     mock_sub = mock_get_sub.return_value
@@ -152,6 +138,7 @@
     with self.assertRaisesRegexp(AssertionError, r'Expected 1.*\n.*Got 0'):
       hc_assert_that(self.mock_presult, self.pubsub_matcher)
     self.assertTrue(mock_sub.pull.called)
+    self.assertEqual(mock_sub.acknowledge.call_count, 0)
 
 
 if __name__ == '__main__':
diff --git a/sdks/python/apache_beam/runners/direct/direct_runner.py b/sdks/python/apache_beam/runners/direct/direct_runner.py
index 00e37f3..d410992 100644
--- a/sdks/python/apache_beam/runners/direct/direct_runner.py
+++ b/sdks/python/apache_beam/runners/direct/direct_runner.py
@@ -25,6 +25,7 @@
 
 import itertools
 import logging
+import time
 
 from google.protobuf import wrappers_pb2
 
@@ -264,11 +265,12 @@
 
 
 class _DirectWriteToPubSubFn(DoFn):
-  _topic = None
+  BUFFER_SIZE_ELEMENTS = 100
+  FLUSH_TIMEOUT_SECS = BUFFER_SIZE_ELEMENTS * 0.5
 
   def __init__(self, sink):
     self.project = sink.project
-    self.topic_name = sink.topic_name
+    self.short_topic_name = sink.topic_name
     self.id_label = sink.id_label
     self.timestamp_attribute = sink.timestamp_attribute
     self.with_attributes = sink.with_attributes
@@ -282,30 +284,33 @@
                                 'supported for PubSub writes')
 
   def start_bundle(self):
-    from google.cloud import pubsub
-
-    if self._topic is None:
-      self._topic = pubsub.Client(project=self.project).topic(
-          self.topic_name)
     self._buffer = []
 
   def process(self, elem):
     self._buffer.append(elem)
-    if len(self._buffer) >= 100:
+    if len(self._buffer) >= self.BUFFER_SIZE_ELEMENTS:
       self._flush()
 
   def finish_bundle(self):
     self._flush()
 
   def _flush(self):
-    if self._buffer:
-      with self._topic.batch() as batch:
-        for elem in self._buffer:
-          if self.with_attributes:
-            batch.publish(elem.data, **elem.attributes)
-          else:
-            batch.publish(elem)
-      self._buffer = []
+    from google.cloud import pubsub
+    pub_client = pubsub.PublisherClient()
+    topic = pub_client.topic_path(self.project, self.short_topic_name)
+
+    if self.with_attributes:
+      futures = [pub_client.publish(topic, elem.data, **elem.attributes)
+                 for elem in self._buffer]
+    else:
+      futures = [pub_client.publish(topic, elem)
+                 for elem in self._buffer]
+
+    timer_start = time.time()
+    for future in futures:
+      remaining = self.FLUSH_TIMEOUT_SECS - (time.time() - timer_start)
+      future.result(remaining)
+    self._buffer = []
 
 
 def _get_pubsub_transform_overrides(pipeline_options):
diff --git a/sdks/python/apache_beam/runners/direct/test_direct_runner.py b/sdks/python/apache_beam/runners/direct/test_direct_runner.py
index 8facca8..23dfeab 100644
--- a/sdks/python/apache_beam/runners/direct/test_direct_runner.py
+++ b/sdks/python/apache_beam/runners/direct/test_direct_runner.py
@@ -52,5 +52,6 @@
     finally:
       if not PipelineState.is_terminal(self.result.state):
         self.result.cancel()
+        self.result.wait_until_finish()
 
     return self.result
diff --git a/sdks/python/apache_beam/runners/direct/transform_evaluator.py b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
index ef12e2c..fad0704 100644
--- a/sdks/python/apache_beam/runners/direct/transform_evaluator.py
+++ b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
@@ -377,20 +377,44 @@
 
 
 class _PubSubSubscriptionWrapper(object):
-  """Wrapper for garbage-collecting temporary PubSub subscriptions."""
+  """Wrapper for managing temporary PubSub subscriptions."""
 
-  def __init__(self, subscription, should_cleanup):
-    self.subscription = subscription
-    self.should_cleanup = should_cleanup
+  def __init__(self, project, short_topic_name, short_sub_name):
+    """Initialize subscription wrapper.
+
+    If short_sub_name is None, will create a temporary subscription to
+    short_topic_name.
+
+    Args:
+      project: GCP project name for topic and subscription. May be None.
+        Required if short_sub_name is None.
+      short_topic_name: Valid topic name without
+        'projects/{project}/topics/' prefix. May be None.
+        Required if short_sub_name is None.
+      short_sub_name: Valid subscription name without
+        'projects/{project}/subscriptions/' prefix. May be None.
+    """
+    from google.cloud import pubsub
+    self.sub_client = pubsub.SubscriberClient()
+
+    if short_sub_name is None:
+      self.sub_name = self.sub_client.subscription_path(
+          project, 'beam_%d_%x' % (int(time.time()), random.randrange(1 << 32)))
+      topic_name = self.sub_client.topic_path(project, short_topic_name)
+      self.sub_client.create_subscription(self.sub_name, topic_name)
+      self._should_cleanup = True
+    else:
+      self.sub_name = self.sub_client.subscription_path(project, short_sub_name)
+      self._should_cleanup = False
 
   def __del__(self):
-    if self.should_cleanup:
-      self.subscription.delete()
+    if self._should_cleanup:
+      self.sub_client.delete_subscription(self.sub_name)
 
 
 class _PubSubReadEvaluator(_TransformEvaluator):
   """TransformEvaluator for PubSub read."""
 
+  # A mapping of transform to _PubSubSubscriptionWrapper.
   _subscription_cache = {}
 
   def __init__(self, evaluation_context, applied_ptransform,
@@ -404,26 +428,16 @@
     if self.source.id_label:
       raise NotImplementedError(
           'DirectRunner: id_label is not supported for PubSub reads')
-    self._subscription = _PubSubReadEvaluator.get_subscription(
+    self._sub_name = _PubSubReadEvaluator.get_subscription(
         self._applied_ptransform, self.source.project, self.source.topic_name,
         self.source.subscription_name)
 
   @classmethod
-  def get_subscription(cls, transform, project, topic, subscription_name):
+  def get_subscription(cls, transform, project, topic, short_sub_name):
     if transform not in cls._subscription_cache:
-      from google.cloud import pubsub
-      should_create = not subscription_name
-      if should_create:
-        subscription_name = 'beam_%d_%x' % (
-            int(time.time()), random.randrange(1 << 32))
-      wrapper = _PubSubSubscriptionWrapper(
-          pubsub.Client(project=project).topic(topic).subscription(
-              subscription_name),
-          should_create)
-      if should_create:
-        wrapper.subscription.create()
+      wrapper = _PubSubSubscriptionWrapper(project, topic, short_sub_name)
       cls._subscription_cache[transform] = wrapper
-    return cls._subscription_cache[transform].subscription
+    return cls._subscription_cache[transform].sub_name
 
   def start_bundle(self):
     pass
@@ -438,28 +452,34 @@
     # evaluator fails with an exception before emitting a bundle. However,
     # the DirectRunner currently doesn't retry work items anyway, so the
     # pipeline would enter an inconsistent state on any error.
-    with pubsub.subscription.AutoAck(
-        self._subscription, return_immediately=True,
-        max_messages=10) as results:
-      def _get_element(message):
-        parsed_message = PubsubMessage._from_message(message)
-        if (timestamp_attribute and
-            timestamp_attribute in parsed_message.attributes):
-          rfc3339_or_milli = parsed_message.attributes[timestamp_attribute]
+    sub_client = pubsub.SubscriberClient()
+    response = sub_client.pull(self._sub_name, max_messages=10,
+                               return_immediately=True)
+
+    def _get_element(message):
+      parsed_message = PubsubMessage._from_message(message)
+      if (timestamp_attribute and
+          timestamp_attribute in parsed_message.attributes):
+        rfc3339_or_milli = parsed_message.attributes[timestamp_attribute]
+        try:
+          timestamp = Timestamp.from_rfc3339(rfc3339_or_milli)
+        except ValueError:
           try:
-            timestamp = Timestamp.from_rfc3339(rfc3339_or_milli)
-          except ValueError:
-            try:
-              timestamp = Timestamp(micros=int(rfc3339_or_milli) * 1000)
-            except ValueError as e:
-              raise ValueError('Bad timestamp value: %s' % e)
-        else:
-          timestamp = Timestamp.from_rfc3339(message.service_timestamp)
+            timestamp = Timestamp(micros=int(rfc3339_or_milli) * 1000)
+          except ValueError as e:
+            raise ValueError('Bad timestamp value: %s' % e)
+      else:
+        timestamp = Timestamp(message.publish_time.seconds,
+                              message.publish_time.nanos // 1000)
 
-        return timestamp, parsed_message
+      return timestamp, parsed_message
 
-      return [_get_element(message)
-              for unused_ack_id, message in iteritems(results)]
+    results = [_get_element(rm.message) for rm in response.received_messages]
+    ack_ids = [rm.ack_id for rm in response.received_messages]
+    if ack_ids:
+      sub_client.acknowledge(self._sub_name, ack_ids)
+
+    return results
 
   def finish_bundle(self):
     data = self._read_from_pubsub(self.source.timestamp_attribute)
diff --git a/sdks/python/apache_beam/testing/test_utils.py b/sdks/python/apache_beam/testing/test_utils.py
index 490d079..1f0e99e 100644
--- a/sdks/python/apache_beam/testing/test_utils.py
+++ b/sdks/python/apache_beam/testing/test_utils.py
@@ -24,11 +24,9 @@
 
 import hashlib
 import imp
-import logging
 import os
 import shutil
 import tempfile
-import time
 from builtins import object
 
 from mock import Mock
@@ -136,46 +134,61 @@
   FileSystems.delete(file_paths)
 
 
-def wait_for_subscriptions_created(subs, timeout=60):
-  """Wait for all PubSub subscriptions are created."""
-  return _wait_until_all_exist(subs, timeout)
-
-
-def wait_for_topics_created(topics, timeout=60):
-  """Wait for all PubSub topics are created."""
-  return _wait_until_all_exist(topics, timeout)
-
-
-def _wait_until_all_exist(components, timeout):
-  unchecked_components = set(components)
-  start_time = time.time()
-  while time.time() - start_time <= timeout:
-    unchecked_components = set(
-        [c for c in unchecked_components if not c.exists()])
-    if len(unchecked_components) == 0:
-      return True
-    time.sleep(2)
-
-  raise RuntimeError(
-      'Timeout after %d seconds. %d of %d topics/subscriptions not exist. '
-      'They are %s.' % (timeout, len(unchecked_components),
-                        len(components), list(unchecked_components)))
-
-
-def cleanup_subscriptions(subs):
+def cleanup_subscriptions(sub_client, subs):
   """Cleanup PubSub subscriptions if exist."""
-  _cleanup_pubsub(subs)
+  for sub in subs:
+    sub_client.delete_subscription(sub.name)
 
 
-def cleanup_topics(topics):
+def cleanup_topics(pub_client, topics):
   """Cleanup PubSub topics if exist."""
-  _cleanup_pubsub(topics)
+  for topic in topics:
+    pub_client.delete_topic(topic.name)
 
 
-def _cleanup_pubsub(components):
-  for c in components:
-    if c.exists():
-      c.delete()
-    else:
-      logging.debug('Cannot delete topic/subscription. %s does not exist.',
-                    c.full_name)
+class PullResponseMessage(object):
+  """Data representing a single message from a pull response.
+
+  Utility class for ``create_pull_response``.
+  """
+  def __init__(self, data, attributes=None,
+               publish_time_secs=None, publish_time_nanos=None, ack_id=None):
+    self.data = data
+    self.attributes = attributes
+    self.publish_time_secs = publish_time_secs
+    self.publish_time_nanos = publish_time_nanos
+    self.ack_id = ack_id
+
+
+def create_pull_response(responses):
+  """Create an instance of ``google.cloud.pubsub.types.PullResponse``.
+
+  Used to simulate the response from pubsub.SubscriberClient().pull().
+
+  Args:
+    responses: list of ``PullResponseMessage``
+
+  Returns:
+    An instance of ``google.cloud.pubsub.types.PullResponse`` populated with
+    responses.
+  """
+  from google.cloud import pubsub
+
+  res = pubsub.types.PullResponse()
+  for response in responses:
+    received_message = res.received_messages.add()
+
+    message = received_message.message
+    message.data = response.data
+    if response.attributes is not None:
+      for k, v in response.attributes.items():
+        message.attributes[k] = v
+    if response.publish_time_secs is not None:
+      message.publish_time.seconds = response.publish_time_secs
+    if response.publish_time_nanos is not None:
+      message.publish_time.nanos = response.publish_time_nanos
+
+    if response.ack_id is not None:
+      received_message.ack_id = response.ack_id
+
+  return res
diff --git a/sdks/python/apache_beam/testing/test_utils_test.py b/sdks/python/apache_beam/testing/test_utils_test.py
index bef4078..2b16c30c 100644
--- a/sdks/python/apache_beam/testing/test_utils_test.py
+++ b/sdks/python/apache_beam/testing/test_utils_test.py
@@ -82,56 +82,19 @@
         self.assertEqual(f.readline(), b'line2\n')
         self.assertEqual(f.readline(), b'line3\n')
 
-  @mock.patch('time.sleep', return_value=None)
-  def test_wait_for_subscriptions_created_fails(self, patched_time_sleep):
-    sub1 = mock.MagicMock()
-    sub1.exists.return_value = True
-    sub2 = mock.MagicMock()
-    sub2.exists.return_value = False
-    with self.assertRaises(RuntimeError) as error:
-      utils.wait_for_subscriptions_created([sub1, sub2], timeout=0.1)
-    self.assertTrue(sub1.exists.called)
-    self.assertTrue(sub2.exists.called)
-    self.assertTrue(error.exception.args[0].startswith('Timeout after'))
-
-  @mock.patch('time.sleep', return_value=None)
-  def test_wait_for_topics_created_fails(self, patched_time_sleep):
-    topic1 = mock.MagicMock()
-    topic1.exists.return_value = True
-    topic2 = mock.MagicMock()
-    topic2.exists.return_value = False
-    with self.assertRaises(RuntimeError) as error:
-      utils.wait_for_subscriptions_created([topic1, topic2], timeout=0.1)
-    self.assertTrue(topic1.exists.called)
-    self.assertTrue(topic2.exists.called)
-    self.assertTrue(error.exception.args[0].startswith('Timeout after'))
-
-  @mock.patch('time.sleep', return_value=None)
-  def test_wait_for_subscriptions_created_succeeds(self, patched_time_sleep):
-    sub1 = mock.MagicMock()
-    sub1.exists.return_value = True
-    self.assertTrue(
-        utils.wait_for_subscriptions_created([sub1], timeout=0.1))
-
-  @mock.patch('time.sleep', return_value=None)
-  def test_wait_for_topics_created_succeeds(self, patched_time_sleep):
-    topic1 = mock.MagicMock()
-    topic1.exists.return_value = True
-    self.assertTrue(
-        utils.wait_for_subscriptions_created([topic1], timeout=0.1))
-    self.assertTrue(topic1.exists.called)
-
   def test_cleanup_subscriptions(self):
-    mock_sub = mock.MagicMock()
-    mock_sub.exist.return_value = True
-    utils.cleanup_subscriptions([mock_sub])
-    self.assertTrue(mock_sub.delete.called)
+    sub_client = mock.Mock()
+    sub = mock.Mock()
+    sub.name = 'test_sub'
+    utils.cleanup_subscriptions(sub_client, [sub])
+    sub_client.delete_subscription.assert_called_with(sub.name)
 
   def test_cleanup_topics(self):
-    mock_topics = mock.MagicMock()
-    mock_topics.exist.return_value = True
-    utils.cleanup_subscriptions([mock_topics])
-    self.assertTrue(mock_topics.delete.called)
+    pub_client = mock.Mock()
+    topic = mock.Mock()
+    topic.name = 'test_topic'
+    utils.cleanup_topics(pub_client, [topic])
+    pub_client.delete_topic.assert_called_with(topic.name)
 
 
 if __name__ == '__main__':
diff --git a/sdks/python/container/base_image_requirements.txt b/sdks/python/container/base_image_requirements.txt
index a32bc44..8e352a6 100644
--- a/sdks/python/container/base_image_requirements.txt
+++ b/sdks/python/container/base_image_requirements.txt
@@ -47,10 +47,9 @@
 # GCP extra features
 google-apitools==0.5.20
 googledatastore==7.0.1
-google-cloud-pubsub==0.26.0
+google-cloud-pubsub==0.35.4
 google-cloud-bigquery==0.25.0
 proto-google-cloud-datastore-v1==0.90.4
-proto-google-cloud-pubsub-v1==0.15.4
 
 # Optional packages
 cython==0.28.1
diff --git a/sdks/python/setup.py b/sdks/python/setup.py
index 9ee4bdf..ce94945 100644
--- a/sdks/python/setup.py
+++ b/sdks/python/setup.py
@@ -141,8 +141,7 @@
     'google-apitools>=0.5.18,<=0.5.20',
     'proto-google-cloud-datastore-v1>=0.90.0,<=0.90.4',
     'googledatastore==7.0.1',
-    'google-cloud-pubsub==0.26.0',
-    'proto-google-cloud-pubsub-v1==0.15.4',
+    'google-cloud-pubsub==0.35.4',
     # GCP packages required by tests
     'google-cloud-bigquery==0.25.0',
 ]