Merge pull request #6564 from udim/pubsub-0-35-4
[BEAM-5513] Upgrade Python SDK to PubSub 0.35.4
diff --git a/sdks/python/apache_beam/examples/complete/game/game_stats_it_test.py b/sdks/python/apache_beam/examples/complete/game/game_stats_it_test.py
index 6dc60d0..2fc19da 100644
--- a/sdks/python/apache_beam/examples/complete/game/game_stats_it_test.py
+++ b/sdks/python/apache_beam/examples/complete/game/game_stats_it_test.py
@@ -72,15 +72,15 @@
# Set up PubSub environment.
from google.cloud import pubsub
- self.pubsub_client = pubsub.Client(project=self.project)
- unique_topic_name = self.INPUT_TOPIC + _unique_id
- unique_subscrition_name = self.INPUT_SUB + _unique_id
- self.input_topic = self.pubsub_client.topic(unique_topic_name)
- self.input_sub = self.input_topic.subscription(unique_subscrition_name)
+ self.pub_client = pubsub.PublisherClient()
+ self.input_topic = self.pub_client.create_topic(
+ self.pub_client.topic_path(self.project, self.INPUT_TOPIC + _unique_id))
- self.input_topic.create()
- test_utils.wait_for_topics_created([self.input_topic])
- self.input_sub.create()
+ self.sub_client = pubsub.SubscriberClient()
+ self.input_sub = self.sub_client.create_subscription(
+ self.sub_client.subscription_path(self.project,
+ self.INPUT_SUB + _unique_id),
+ self.input_topic.name)
# Set up BigQuery environment
from google.cloud import bigquery
@@ -95,14 +95,15 @@
"""Inject game events as test data to PubSub."""
logging.debug('Injecting %d game events to topic %s',
- message_count, topic.full_name)
+ message_count, topic.name)
for _ in range(message_count):
- topic.publish(self.INPUT_EVENT % self._test_timestamp)
+ self.pub_client.publish(topic.name,
+ self.INPUT_EVENT % self._test_timestamp)
def _cleanup_pubsub(self):
- test_utils.cleanup_subscriptions([self.input_sub])
- test_utils.cleanup_topics([self.input_topic])
+ test_utils.cleanup_subscriptions(self.sub_client, [self.input_sub])
+ test_utils.cleanup_topics(self.pub_client, [self.input_topic])
def _cleanup_dataset(self):
self.dataset.delete()
@@ -123,9 +124,9 @@
# TODO(mariagh): Add teams table verifier once game_stats.py is fixed.
- extra_opts = {'subscription': self.input_sub.full_name,
+ extra_opts = {'subscription': self.input_sub.name,
'dataset': self.dataset.name,
- 'topic': self.input_topic.full_name,
+ 'topic': self.input_topic.name,
'fixed_window_duration': 1,
'user_activity_window_duration': 1,
'wait_until_finish_duration':
@@ -143,8 +144,6 @@
self.dataset.name, self.OUTPUT_TABLE_TEAMS)
# Generate input data and inject to PubSub.
- test_utils.wait_for_subscriptions_created([self.input_topic,
- self.input_sub])
self._inject_pubsub_game_events(self.input_topic, self.DEFAULT_INPUT_COUNT)
# Get pipeline options from command argument: --test-pipeline-options,
diff --git a/sdks/python/apache_beam/examples/complete/game/leader_board_it_test.py b/sdks/python/apache_beam/examples/complete/game/leader_board_it_test.py
index ab10942..e0e309b 100644
--- a/sdks/python/apache_beam/examples/complete/game/leader_board_it_test.py
+++ b/sdks/python/apache_beam/examples/complete/game/leader_board_it_test.py
@@ -73,15 +73,16 @@
# Set up PubSub environment.
from google.cloud import pubsub
- self.pubsub_client = pubsub.Client(project=self.project)
- unique_topic_name = self.INPUT_TOPIC + _unique_id
- unique_subscrition_name = self.INPUT_SUB + _unique_id
- self.input_topic = self.pubsub_client.topic(unique_topic_name)
- self.input_sub = self.input_topic.subscription(unique_subscrition_name)
- self.input_topic.create()
- test_utils.wait_for_topics_created([self.input_topic])
- self.input_sub.create()
+ self.pub_client = pubsub.PublisherClient()
+ self.input_topic = self.pub_client.create_topic(
+ self.pub_client.topic_path(self.project, self.INPUT_TOPIC + _unique_id))
+
+ self.sub_client = pubsub.SubscriberClient()
+ self.input_sub = self.sub_client.create_subscription(
+ self.sub_client.subscription_path(self.project,
+ self.INPUT_SUB + _unique_id),
+ self.input_topic.name)
# Set up BigQuery environment
from google.cloud import bigquery
@@ -96,14 +97,15 @@
"""Inject game events as test data to PubSub."""
logging.debug('Injecting %d game events to topic %s',
- message_count, topic.full_name)
+ message_count, topic.name)
for _ in range(message_count):
- topic.publish(self.INPUT_EVENT % self._test_timestamp)
+ self.pub_client.publish(topic.name,
+ self.INPUT_EVENT % self._test_timestamp)
def _cleanup_pubsub(self):
- test_utils.cleanup_subscriptions([self.input_sub])
- test_utils.cleanup_topics([self.input_topic])
+ test_utils.cleanup_subscriptions(self.sub_client, [self.input_sub])
+ test_utils.cleanup_topics(self.pub_client, [self.input_topic])
def _cleanup_dataset(self):
self.dataset.delete()
@@ -131,9 +133,9 @@
teams_query,
self.DEFAULT_EXPECTED_CHECKSUM)
- extra_opts = {'subscription': self.input_sub.full_name,
+ extra_opts = {'subscription': self.input_sub.name,
'dataset': self.dataset.name,
- 'topic': self.input_topic.full_name,
+ 'topic': self.input_topic.name,
'team_window_duration': 1,
'wait_until_finish_duration':
self.WAIT_UNTIL_FINISH_DURATION,
@@ -151,8 +153,6 @@
self.dataset.name, self.OUTPUT_TABLE_TEAMS)
# Generate input data and inject to PubSub.
- test_utils.wait_for_subscriptions_created([self.input_topic,
- self.input_sub])
self._inject_pubsub_game_events(self.input_topic, self.DEFAULT_INPUT_COUNT)
# Get pipeline options from command argument: --test-pipeline-options,
diff --git a/sdks/python/apache_beam/examples/streaming_wordcount_it_test.py b/sdks/python/apache_beam/examples/streaming_wordcount_it_test.py
index 3c0cfa9..78e89a1 100644
--- a/sdks/python/apache_beam/examples/streaming_wordcount_it_test.py
+++ b/sdks/python/apache_beam/examples/streaming_wordcount_it_test.py
@@ -52,31 +52,31 @@
# Set up PubSub environment.
from google.cloud import pubsub
- self.pubsub_client = pubsub.Client(project=self.project)
- self.input_topic = self.pubsub_client.topic(INPUT_TOPIC + self.uuid)
- self.output_topic = self.pubsub_client.topic(OUTPUT_TOPIC + self.uuid)
- self.input_sub = self.input_topic.subscription(INPUT_SUB + self.uuid)
- self.output_sub = self.output_topic.subscription(OUTPUT_SUB + self.uuid)
+ self.pub_client = pubsub.PublisherClient()
+ self.input_topic = self.pub_client.create_topic(
+ self.pub_client.topic_path(self.project, INPUT_TOPIC + self.uuid))
+ self.output_topic = self.pub_client.create_topic(
+ self.pub_client.topic_path(self.project, OUTPUT_TOPIC + self.uuid))
- self.input_topic.create()
- self.output_topic.create()
- test_utils.wait_for_topics_created([self.input_topic, self.output_topic])
- self.input_sub.create()
- self.output_sub.create()
+ self.sub_client = pubsub.SubscriberClient()
+ self.input_sub = self.sub_client.create_subscription(
+ self.sub_client.subscription_path(self.project, INPUT_SUB + self.uuid),
+ self.input_topic.name)
+ self.output_sub = self.sub_client.create_subscription(
+ self.sub_client.subscription_path(self.project, OUTPUT_SUB + self.uuid),
+ self.output_topic.name)
def _inject_numbers(self, topic, num_messages):
"""Inject numbers as test data to PubSub."""
- logging.debug('Injecting %d numbers to topic %s',
- num_messages, topic.full_name)
+ logging.debug('Injecting %d numbers to topic %s', num_messages, topic.name)
for n in range(num_messages):
- topic.publish(str(n))
-
- def _cleanup_pubsub(self):
- test_utils.cleanup_subscriptions([self.input_sub, self.output_sub])
- test_utils.cleanup_topics([self.input_topic, self.output_topic])
+ self.pub_client.publish(self.input_topic.name, str(n))
def tearDown(self):
- self._cleanup_pubsub()
+ test_utils.cleanup_subscriptions(self.sub_client,
+ [self.input_sub, self.output_sub])
+ test_utils.cleanup_topics(self.pub_client,
+ [self.input_topic, self.output_topic])
@attr('IT')
def test_streaming_wordcount_it(self):
@@ -86,17 +86,16 @@
# Set extra options to the pipeline for test purpose
state_verifier = PipelineStateMatcher(PipelineState.RUNNING)
pubsub_msg_verifier = PubSubMessageMatcher(self.project,
- OUTPUT_SUB + self.uuid,
+ self.output_sub.name,
expected_msg,
timeout=400)
- extra_opts = {'input_subscription': self.input_sub.full_name,
- 'output_topic': self.output_topic.full_name,
+ extra_opts = {'input_subscription': self.input_sub.name,
+ 'output_topic': self.output_topic.name,
'wait_until_finish_duration': WAIT_UNTIL_FINISH_DURATION,
'on_success_matcher': all_of(state_verifier,
pubsub_msg_verifier)}
# Generate input data and inject to PubSub.
- test_utils.wait_for_subscriptions_created([self.input_sub])
self._inject_numbers(self.input_topic, DEFAULT_INPUT_NUMBERS)
# Get pipeline options from command argument: --test-pipeline-options,
diff --git a/sdks/python/apache_beam/io/gcp/pubsub.py b/sdks/python/apache_beam/io/gcp/pubsub.py
index 2414194..a1644ab 100644
--- a/sdks/python/apache_beam/io/gcp/pubsub.py
+++ b/sdks/python/apache_beam/io/gcp/pubsub.py
@@ -38,11 +38,10 @@
from apache_beam.transforms.display import DisplayDataItem
from apache_beam.utils.annotations import deprecated
-# The protobuf library is only used for running on Dataflow.
try:
- from google.cloud.proto.pubsub.v1 import pubsub_pb2
+ from google.cloud import pubsub
except ImportError:
- pubsub_pb2 = None
+ pubsub = None
__all__ = ['PubsubMessage', 'ReadFromPubSub', 'ReadStringsFromPubSub',
'WriteStringsToPubSub', 'WriteToPubSub']
@@ -92,7 +91,7 @@
Returns:
A new PubsubMessage object.
"""
- msg = pubsub_pb2.PubsubMessage()
+ msg = pubsub.types.pubsub_pb2.PubsubMessage()
msg.ParseFromString(proto_msg)
# Convert ScalarMapContainer to dict.
attributes = dict((key, msg.attributes[key]) for key in msg.attributes)
@@ -109,7 +108,7 @@
https://cloud.google.com/pubsub/docs/reference/rpc/google.pubsub.v1#google.pubsub.v1.PubsubMessage
containing the payload of this object.
"""
- msg = pubsub_pb2.PubsubMessage()
+ msg = pubsub.types.pubsub_pb2.PubsubMessage()
msg.data = self.data
for key, value in self.attributes.iteritems():
msg.attributes[key] = value
@@ -117,9 +116,9 @@
@staticmethod
def _from_message(msg):
- """Construct from ``google.cloud.pubsub.message.Message``.
+ """Construct from ``google.cloud.pubsub_v1.subscriber.message.Message``.
- https://google-cloud-python.readthedocs.io/en/latest/pubsub/subscriber/api/message.html
+ https://googleapis.github.io/google-cloud-python/latest/pubsub/subscriber/api/message.html
"""
# Convert ScalarMapContainer to dict.
attributes = dict((key, msg.attributes[key]) for key in msg.attributes)
diff --git a/sdks/python/apache_beam/io/gcp/pubsub_integration_test.py b/sdks/python/apache_beam/io/gcp/pubsub_integration_test.py
index 9bb81fc..5b060e5 100644
--- a/sdks/python/apache_beam/io/gcp/pubsub_integration_test.py
+++ b/sdks/python/apache_beam/io/gcp/pubsub_integration_test.py
@@ -100,21 +100,25 @@
# Set up PubSub environment.
from google.cloud import pubsub
- self.pubsub_client = pubsub.Client(project=self.project)
- self.input_topic = self.pubsub_client.topic(INPUT_TOPIC + self.uuid)
- self.output_topic = self.pubsub_client.topic(OUTPUT_TOPIC + self.uuid)
- self.input_sub = self.input_topic.subscription(INPUT_SUB + self.uuid)
- self.output_sub = self.output_topic.subscription(OUTPUT_SUB + self.uuid)
+ self.pub_client = pubsub.PublisherClient()
+ self.input_topic = self.pub_client.create_topic(
+ self.pub_client.topic_path(self.project, INPUT_TOPIC + self.uuid))
+ self.output_topic = self.pub_client.create_topic(
+ self.pub_client.topic_path(self.project, OUTPUT_TOPIC + self.uuid))
- self.input_topic.create()
- self.output_topic.create()
- test_utils.wait_for_topics_created([self.input_topic, self.output_topic])
- self.input_sub.create()
- self.output_sub.create()
+ self.sub_client = pubsub.SubscriberClient()
+ self.input_sub = self.sub_client.create_subscription(
+ self.sub_client.subscription_path(self.project, INPUT_SUB + self.uuid),
+ self.input_topic.name)
+ self.output_sub = self.sub_client.create_subscription(
+ self.sub_client.subscription_path(self.project, OUTPUT_SUB + self.uuid),
+ self.output_topic.name)
def tearDown(self):
- test_utils.cleanup_subscriptions([self.input_sub, self.output_sub])
- test_utils.cleanup_topics([self.input_topic, self.output_topic])
+ test_utils.cleanup_subscriptions(self.sub_client,
+ [self.input_sub, self.output_sub])
+ test_utils.cleanup_topics(self.pub_client,
+ [self.input_topic, self.output_topic])
def _test_streaming(self, with_attributes):
"""Runs IT pipeline with message verifier.
@@ -139,21 +143,20 @@
strip_attributes = [self.ID_LABEL, self.TIMESTAMP_ATTRIBUTE]
pubsub_msg_verifier = PubSubMessageMatcher(
self.project,
- OUTPUT_SUB + self.uuid,
+ self.output_sub.name,
expected_messages,
timeout=MESSAGE_MATCHER_TIMEOUT_S,
with_attributes=with_attributes,
strip_attributes=strip_attributes)
- extra_opts = {'input_subscription': self.input_sub.full_name,
- 'output_topic': self.output_topic.full_name,
+ extra_opts = {'input_subscription': self.input_sub.name,
+ 'output_topic': self.output_topic.name,
'wait_until_finish_duration': TEST_PIPELINE_DURATION_MS,
'on_success_matcher': all_of(state_verifier,
pubsub_msg_verifier)}
# Generate input data and inject to PubSub.
- test_utils.wait_for_subscriptions_created([self.input_sub])
for msg in self.INPUT_MESSAGES[self.runner_name]:
- self.input_topic.publish(msg.data, **msg.attributes)
+ self.pub_client.publish(self.input_topic.name, msg.data, **msg.attributes)
# Get pipeline options from command argument: --test-pipeline-options,
# and start pipeline job by calling pipeline main function.
diff --git a/sdks/python/apache_beam/io/gcp/pubsub_test.py b/sdks/python/apache_beam/io/gcp/pubsub_test.py
index 6e19950..a95ffc6 100644
--- a/sdks/python/apache_beam/io/gcp/pubsub_test.py
+++ b/sdks/python/apache_beam/io/gcp/pubsub_test.py
@@ -20,12 +20,9 @@
from __future__ import absolute_import
-import functools
import logging
import unittest
from builtins import object
-from builtins import range
-from builtins import zip
import hamcrest as hc
import mock
@@ -43,6 +40,7 @@
from apache_beam.runners.direct.direct_runner import _DirectReadFromPubSub
from apache_beam.runners.direct.direct_runner import _get_transform_overrides
from apache_beam.runners.direct.transform_evaluator import _PubSubReadEvaluator
+from apache_beam.testing import test_utils
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import TestWindowedValue
from apache_beam.testing.util import assert_that
@@ -54,18 +52,10 @@
from apache_beam.utils import timestamp
# Protect against environments where the PubSub library is not available.
-# pylint: disable=wrong-import-order, wrong-import-position
try:
from google.cloud import pubsub
except ImportError:
pubsub = None
-# pylint: enable=wrong-import-order, wrong-import-position
-
-# The protobuf library is only used for running on Dataflow.
-try:
- from google.cloud.proto.pubsub.v1 import pubsub_pb2
-except ImportError:
- pubsub_pb2 = None
class TestPubsubMessage(unittest.TestCase):
@@ -81,8 +71,7 @@
with self.assertRaisesRegexp(ValueError, r'data.*attributes.*must be set'):
_ = PubsubMessage(None, {})
- @unittest.skipIf(pubsub_pb2 is None,
- 'PubSub proto dependencies are not installed')
+ @unittest.skipIf(pubsub is None, 'GCP dependencies are not installed')
def test_proto_conversion(self):
data = 'data'
attributes = {'k1': 'v1', 'k2': 'v2'}
@@ -220,7 +209,7 @@
write_transform = pcoll.producer.inputs[0].producer.transform
# Ensure that the properties passed through correctly
- self.assertEqual('a_topic', write_transform.dofn.topic_name)
+ self.assertEqual('a_topic', write_transform.dofn.short_topic_name)
def test_expand(self):
p = TestPipeline()
@@ -240,7 +229,7 @@
write_transform = pcoll.producer.inputs[0].producer.transform
# Ensure that the properties passed through correctly
- self.assertEqual('a_topic', write_transform.dofn.topic_name)
+ self.assertEqual('a_topic', write_transform.dofn.short_topic_name)
self.assertEqual(True, write_transform.dofn.with_attributes)
# TODO(BEAM-4275): These properties aren't supported yet in direct runner.
self.assertEqual(None, write_transform.dofn.id_label)
@@ -333,118 +322,25 @@
}
-class FakePubsubTopic(object):
-
- def __init__(self, name, client):
- self.name = name
- self.client = client
-
- def subscription(self, name):
- return FakePubsubSubscription(name, self.name, self.client)
-
- def batch(self):
- if self.client.batch is None:
- self.client.batch = FakeBatch(self.client)
- return self.client.batch
-
-
-class FakePubsubSubscription(object):
-
- def __init__(self, name, topic, client):
- self.name = name
- self.topic = topic
- self.client = client
-
- def create(self):
- pass
-
-
-class FakeAutoAck(object):
-
- def __init__(self, sub, **unused_kwargs):
- self.sub = sub
-
- def __enter__(self):
- messages = self.sub.client.messages_read
- self.ack_id_to_msg = dict(zip(range(len(messages)), messages))
- return self.ack_id_to_msg
-
- def __exit__(self, exc_type, exc_val, exc_tb):
- pass
-
-
-class FakeBatch(object):
- """Context manager that accept Pubsub client writes via publish().
-
- Verifies writes on exit.
- """
-
- def __init__(self, client):
- self.client = client
- self.published = []
-
- def __enter__(self):
- return self
-
- def __exit__(self, exc_type, exc_val, exc_tb):
- if exc_type is not None:
- return # Exception will be raised.
- hc.assert_that(self.published,
- hc.only_contains(*self.client.messages_write))
-
- def publish(self, message, **attrs):
- self.published.append([message, attrs])
-
-
-class FakePubsubClient(object):
-
- def __init__(self, messages_read=None, messages_write=None, project=None,
- **unused_kwargs):
- """Creates a Pubsub client fake.
-
- Args:
- messages_read: List of PubsubMessage objects to return.
- messages_write: List of [data, attributes] pairs, corresponding to
- messages expected to be written to the client.
- project: Name of GCP project.
- """
- self.messages_read = messages_read
- self.messages_write = messages_write
- self.project = project
- self.batch = None
-
- def topic(self, name):
- return FakePubsubTopic(name, self)
-
-
-def create_client_message(data, message_id, attributes, publish_time):
- """Returns a message as it would be returned from Cloud Pub/Sub client.
-
- This is what the reader sees.
- """
- msg = pubsub.message.Message(data, message_id, attributes)
- msg._service_timestamp = publish_time
- return msg
-
-
@unittest.skipIf(pubsub is None, 'GCP dependencies are not installed')
+@mock.patch('google.cloud.pubsub.SubscriberClient')
class TestReadFromPubSub(unittest.TestCase):
- @mock.patch('google.cloud.pubsub')
def test_read_messages_success(self, mock_pubsub):
data = 'data'
- message_id = 'message_id'
- publish_time = '2018-03-12T13:37:01.234567Z'
+ publish_time_secs = 1520861821
+ publish_time_nanos = 234567000
attributes = {'key': 'value'}
- payloads = [create_client_message(
- data, message_id, attributes, publish_time)]
+ ack_id = 'ack_id'
+ pull_response = test_utils.create_pull_response([
+ test_utils.PullResponseMessage(
+ data, attributes, publish_time_secs, publish_time_nanos, ack_id)
+ ])
expected_elements = [
TestWindowedValue(PubsubMessage(data, attributes),
timestamp.Timestamp(1520861821.234567),
[window.GlobalWindow()])]
-
- mock_pubsub.Client = functools.partial(FakePubsubClient, payloads)
- mock_pubsub.subscription.AutoAck = FakeAutoAck
+ mock_pubsub.return_value.pull.return_value = pull_response
p = TestPipeline()
p.options.view_as(StandardOptions).streaming = True
@@ -453,17 +349,18 @@
None, None, with_attributes=True))
assert_that(pcoll, equal_to(expected_elements), reify_windows=True)
p.run()
+ mock_pubsub.return_value.acknowledge.assert_has_calls([
+ mock.call(mock.ANY, [ack_id])])
- @mock.patch('google.cloud.pubsub')
def test_read_strings_success(self, mock_pubsub):
data = u'🤷 ¯\\_(ツ)_/¯'
data_encoded = data.encode('utf-8')
- publish_time = '2018-03-12T13:37:01.234567Z'
- payloads = [create_client_message(data_encoded, None, None, publish_time)]
+ ack_id = 'ack_id'
+ pull_response = test_utils.create_pull_response([
+ test_utils.PullResponseMessage(data_encoded, ack_id=ack_id)
+ ])
expected_elements = [data]
-
- mock_pubsub.Client = functools.partial(FakePubsubClient, payloads)
- mock_pubsub.subscription.AutoAck = FakeAutoAck
+ mock_pubsub.return_value.pull.return_value = pull_response
p = TestPipeline()
p.options.view_as(StandardOptions).streaming = True
@@ -472,16 +369,16 @@
None, None))
assert_that(pcoll, equal_to(expected_elements))
p.run()
+ mock_pubsub.return_value.acknowledge.assert_has_calls([
+ mock.call(mock.ANY, [ack_id])])
- @mock.patch('google.cloud.pubsub')
def test_read_data_success(self, mock_pubsub):
data_encoded = u'🤷 ¯\\_(ツ)_/¯'.encode('utf-8')
- publish_time = '2018-03-12T13:37:01.234567Z'
- payloads = [create_client_message(data_encoded, None, None, publish_time)]
+ ack_id = 'ack_id'
+ pull_response = test_utils.create_pull_response([
+ test_utils.PullResponseMessage(data_encoded, ack_id=ack_id)])
expected_elements = [data_encoded]
-
- mock_pubsub.Client = functools.partial(FakePubsubClient, payloads)
- mock_pubsub.subscription.AutoAck = FakeAutoAck
+ mock_pubsub.return_value.pull.return_value = pull_response
p = TestPipeline()
p.options.view_as(StandardOptions).streaming = True
@@ -489,24 +386,26 @@
| ReadFromPubSub('projects/fakeprj/topics/a_topic', None, None))
assert_that(pcoll, equal_to(expected_elements))
p.run()
+ mock_pubsub.return_value.acknowledge.assert_has_calls([
+ mock.call(mock.ANY, [ack_id])])
- @mock.patch('google.cloud.pubsub')
def test_read_messages_timestamp_attribute_milli_success(self, mock_pubsub):
data = 'data'
- message_id = 'message_id'
attributes = {'time': '1337'}
- publish_time = '2018-03-12T13:37:01.234567Z'
- payloads = [
- create_client_message(data, message_id, attributes, publish_time)]
+ publish_time_secs = 1520861821
+ publish_time_nanos = 234567000
+ ack_id = 'ack_id'
+ pull_response = test_utils.create_pull_response([
+ test_utils.PullResponseMessage(
+ data, attributes, publish_time_secs, publish_time_nanos, ack_id)
+ ])
expected_elements = [
TestWindowedValue(
PubsubMessage(data, attributes),
timestamp.Timestamp(micros=int(attributes['time']) * 1000),
[window.GlobalWindow()]),
]
-
- mock_pubsub.Client = functools.partial(FakePubsubClient, payloads)
- mock_pubsub.subscription.AutoAck = FakeAutoAck
+ mock_pubsub.return_value.pull.return_value = pull_response
p = TestPipeline()
p.options.view_as(StandardOptions).streaming = True
@@ -516,24 +415,26 @@
with_attributes=True, timestamp_attribute='time'))
assert_that(pcoll, equal_to(expected_elements), reify_windows=True)
p.run()
+ mock_pubsub.return_value.acknowledge.assert_has_calls([
+ mock.call(mock.ANY, [ack_id])])
- @mock.patch('google.cloud.pubsub')
def test_read_messages_timestamp_attribute_rfc3339_success(self, mock_pubsub):
data = 'data'
- message_id = 'message_id'
attributes = {'time': '2018-03-12T13:37:01.234567Z'}
- publish_time = '2018-03-12T13:37:01.234567Z'
- payloads = [
- create_client_message(data, message_id, attributes, publish_time)]
+ publish_time_secs = 1337000000
+ publish_time_nanos = 133700000
+ ack_id = 'ack_id'
+ pull_response = test_utils.create_pull_response([
+ test_utils.PullResponseMessage(
+ data, attributes, publish_time_secs, publish_time_nanos, ack_id)
+ ])
expected_elements = [
TestWindowedValue(
PubsubMessage(data, attributes),
timestamp.Timestamp.from_rfc3339(attributes['time']),
[window.GlobalWindow()]),
]
-
- mock_pubsub.Client = functools.partial(FakePubsubClient, payloads)
- mock_pubsub.subscription.AutoAck = FakeAutoAck
+ mock_pubsub.return_value.pull.return_value = pull_response
p = TestPipeline()
p.options.view_as(StandardOptions).streaming = True
@@ -543,24 +444,27 @@
with_attributes=True, timestamp_attribute='time'))
assert_that(pcoll, equal_to(expected_elements), reify_windows=True)
p.run()
+ mock_pubsub.return_value.acknowledge.assert_has_calls([
+ mock.call(mock.ANY, [ack_id])])
- @mock.patch('google.cloud.pubsub')
def test_read_messages_timestamp_attribute_missing(self, mock_pubsub):
data = 'data'
- message_id = 'message_id'
attributes = {}
+ publish_time_secs = 1520861821
+ publish_time_nanos = 234567000
publish_time = '2018-03-12T13:37:01.234567Z'
- payloads = [
- create_client_message(data, message_id, attributes, publish_time)]
+ ack_id = 'ack_id'
+ pull_response = test_utils.create_pull_response([
+ test_utils.PullResponseMessage(
+ data, attributes, publish_time_secs, publish_time_nanos, ack_id)
+ ])
expected_elements = [
TestWindowedValue(
PubsubMessage(data, attributes),
timestamp.Timestamp.from_rfc3339(publish_time),
[window.GlobalWindow()]),
]
-
- mock_pubsub.Client = functools.partial(FakePubsubClient, payloads)
- mock_pubsub.subscription.AutoAck = FakeAutoAck
+ mock_pubsub.return_value.pull.return_value = pull_response
p = TestPipeline()
p.options.view_as(StandardOptions).streaming = True
@@ -570,18 +474,20 @@
with_attributes=True, timestamp_attribute='nonexistent'))
assert_that(pcoll, equal_to(expected_elements), reify_windows=True)
p.run()
+ mock_pubsub.return_value.acknowledge.assert_has_calls([
+ mock.call(mock.ANY, [ack_id])])
- @mock.patch('google.cloud.pubsub')
def test_read_messages_timestamp_attribute_fail_parse(self, mock_pubsub):
data = 'data'
- message_id = 'message_id'
attributes = {'time': '1337 unparseable'}
- publish_time = '2018-03-12T13:37:01.234567Z'
- payloads = [
- create_client_message(data, message_id, attributes, publish_time)]
-
- mock_pubsub.Client = functools.partial(FakePubsubClient, payloads)
- mock_pubsub.subscription.AutoAck = FakeAutoAck
+ publish_time_secs = 1520861821
+ publish_time_nanos = 234567000
+ ack_id = 'ack_id'
+ pull_response = test_utils.create_pull_response([
+ test_utils.PullResponseMessage(
+ data, attributes, publish_time_secs, publish_time_nanos, ack_id)
+ ])
+ mock_pubsub.return_value.pull.return_value = pull_response
p = TestPipeline()
p.options.view_as(StandardOptions).streaming = True
@@ -591,20 +497,10 @@
with_attributes=True, timestamp_attribute='time'))
with self.assertRaisesRegexp(ValueError, r'parse'):
p.run()
+ mock_pubsub.return_value.acknowledge.assert_not_called()
- @mock.patch('google.cloud.pubsub')
- def test_read_message_id_label_unsupported(self, mock_pubsub):
+ def test_read_message_id_label_unsupported(self, unused_mock_pubsub):
# id_label is unsupported in DirectRunner.
- data = 'data'
- message_id = 'message_id'
- attributes = {'time': '1337 unparseable'}
- publish_time = '2018-03-12T13:37:01.234567Z'
- payloads = [
- create_client_message(data, message_id, attributes, publish_time)]
-
- mock_pubsub.Client = functools.partial(FakePubsubClient, payloads)
- mock_pubsub.subscription.AutoAck = FakeAutoAck
-
p = TestPipeline()
p.options.view_as(StandardOptions).streaming = True
_ = (p | ReadFromPubSub('projects/fakeprj/topics/a_topic', None, 'a_label'))
@@ -614,16 +510,12 @@
@unittest.skipIf(pubsub is None, 'GCP dependencies are not installed')
+@mock.patch('google.cloud.pubsub.PublisherClient')
class TestWriteToPubSub(unittest.TestCase):
- @mock.patch('google.cloud.pubsub')
def test_write_messages_success(self, mock_pubsub):
data = 'data'
payloads = [data]
- expected_payloads = [[data, {}]]
-
- mock_pubsub.Client = functools.partial(FakePubsubClient,
- messages_write=expected_payloads)
p = TestPipeline()
p.options.view_as(StandardOptions).streaming = True
@@ -632,15 +524,12 @@
| WriteToPubSub('projects/fakeprj/topics/a_topic',
with_attributes=False))
p.run()
+ mock_pubsub.return_value.publish.assert_has_calls([
+ mock.call(mock.ANY, data)])
- @mock.patch('google.cloud.pubsub')
def test_write_messages_deprecated(self, mock_pubsub):
data = 'data'
payloads = [data]
- expected_payloads = [[data, {}]]
-
- mock_pubsub.Client = functools.partial(FakePubsubClient,
- messages_write=expected_payloads)
p = TestPipeline()
p.options.view_as(StandardOptions).streaming = True
@@ -648,16 +537,13 @@
| Create(payloads)
| WriteStringsToPubSub('projects/fakeprj/topics/a_topic'))
p.run()
+ mock_pubsub.return_value.publish.assert_has_calls([
+ mock.call(mock.ANY, data)])
- @mock.patch('google.cloud.pubsub')
def test_write_messages_with_attributes_success(self, mock_pubsub):
data = 'data'
attributes = {'key': 'value'}
payloads = [PubsubMessage(data, attributes)]
- expected_payloads = [[data, attributes]]
-
- mock_pubsub.Client = functools.partial(FakePubsubClient,
- messages_write=expected_payloads)
p = TestPipeline()
p.options.view_as(StandardOptions).streaming = True
@@ -666,15 +552,14 @@
| WriteToPubSub('projects/fakeprj/topics/a_topic',
with_attributes=True))
p.run()
+ mock_pubsub.return_value.publish.assert_has_calls([
+ mock.call(mock.ANY, data, **attributes)])
- @mock.patch('google.cloud.pubsub')
def test_write_messages_with_attributes_error(self, mock_pubsub):
data = 'data'
# Sending raw data when WriteToPubSub expects a PubsubMessage object.
payloads = [data]
- mock_pubsub.Client = functools.partial(FakePubsubClient)
-
p = TestPipeline()
p.options.view_as(StandardOptions).streaming = True
_ = (p
@@ -685,15 +570,10 @@
r'str.*has no attribute.*data'):
p.run()
- @mock.patch('google.cloud.pubsub')
def test_write_messages_unsupported_features(self, mock_pubsub):
data = 'data'
attributes = {'key': 'value'}
payloads = [PubsubMessage(data, attributes)]
- expected_payloads = [[data, attributes]]
-
- mock_pubsub.Client = functools.partial(FakePubsubClient,
- messages_write=expected_payloads)
p = TestPipeline()
p.options.view_as(StandardOptions).streaming = True
diff --git a/sdks/python/apache_beam/io/gcp/tests/pubsub_matcher.py b/sdks/python/apache_beam/io/gcp/tests/pubsub_matcher.py
index 6217faf..7a0b5c8 100644
--- a/sdks/python/apache_beam/io/gcp/tests/pubsub_matcher.py
+++ b/sdks/python/apache_beam/io/gcp/tests/pubsub_matcher.py
@@ -31,12 +31,10 @@
# Protect against environments where pubsub library is not available.
-# pylint: disable=wrong-import-order, wrong-import-position
try:
from google.cloud import pubsub
except ImportError:
pubsub = None
-# pylint: enable=wrong-import-order, wrong-import-position
DEFAULT_TIMEOUT = 5 * 60
MAX_MESSAGES_IN_ONE_PULL = 50
@@ -49,8 +47,9 @@
subscription until all expected messages are shown or timeout.
"""
- def __init__(self, project, sub_name, expected_msg, timeout=DEFAULT_TIMEOUT,
- with_attributes=False, strip_attributes=None):
+ def __init__(self, project, sub_name, expected_msg,
+ timeout=DEFAULT_TIMEOUT, with_attributes=False,
+ strip_attributes=None):
"""Initialize PubSubMessageMatcher object.
Args:
@@ -59,8 +58,9 @@
expected_msg: A string list that contains expected message data pulled
from the subscription. See also: with_attributes.
timeout: Timeout in seconds to wait for all expected messages appears.
- with_attributes: Whether expected_msg is a list of
- ``PubsubMessage`` objects.
+ with_attributes: If True, match against both message data and attributes;
+ expected_msg should then be a list of ``PubsubMessage`` objects.
+ Otherwise, only data is matched and it should be a list of ``bytes``.
strip_attributes: List of strings. If with_attributes==True, strip the
attributes keyed by these values from incoming messages.
If a key is missing, will add an attribute with an error message as
@@ -86,28 +86,26 @@
def _matches(self, _):
if self.messages is None:
- self.messages = self._wait_for_messages(self._get_subscription(),
- len(self.expected_msg),
+ self.messages = self._wait_for_messages(len(self.expected_msg),
self.timeout)
return Counter(self.messages) == Counter(self.expected_msg)
- def _get_subscription(self):
- return pubsub.Client(project=self.project).subscription(self.sub_name)
-
- def _wait_for_messages(self, subscription, expected_num, timeout):
+ def _wait_for_messages(self, expected_num, timeout):
"""Wait for messages from given subscription."""
- logging.debug('Start pulling messages from %s', subscription.full_name)
total_messages = []
+
+ sub_client = pubsub.SubscriberClient()
start_time = time.time()
while time.time() - start_time <= timeout:
- pulled = subscription.pull(max_messages=MAX_MESSAGES_IN_ONE_PULL)
- for ack_id, message in pulled:
- subscription.acknowledge([ack_id])
+ response = sub_client.pull(self.sub_name,
+ max_messages=MAX_MESSAGES_IN_ONE_PULL,
+ return_immediately=True)
+ for rm in response.received_messages:
+ msg = PubsubMessage._from_message(rm.message)
if not self.with_attributes:
- total_messages.append(message.data)
+ total_messages.append(msg.data)
continue
- msg = PubsubMessage._from_message(message)
if self.strip_attributes:
for attr in self.strip_attributes:
try:
@@ -117,12 +115,16 @@
'expected attribute not found.')
total_messages.append(msg)
+ ack_ids = [rm.ack_id for rm in response.received_messages]
+ if ack_ids:
+ sub_client.acknowledge(self.sub_name, ack_ids)
if len(total_messages) >= expected_num:
- return total_messages
+ break
time.sleep(1)
- logging.error('Timeout after %d sec. Received %d messages from %s.',
- timeout, len(total_messages), subscription.full_name)
+ if time.time() - start_time > timeout:
+ logging.error('Timeout after %d sec. Received %d messages from %s.',
+ timeout, len(total_messages), self.sub_name)
return total_messages
def describe_to(self, description):
diff --git a/sdks/python/apache_beam/io/gcp/tests/pubsub_matcher_test.py b/sdks/python/apache_beam/io/gcp/tests/pubsub_matcher_test.py
index 0e59481..0f9351d 100644
--- a/sdks/python/apache_beam/io/gcp/tests/pubsub_matcher_test.py
+++ b/sdks/python/apache_beam/io/gcp/tests/pubsub_matcher_test.py
@@ -27,17 +27,19 @@
from apache_beam.io.gcp.pubsub import PubsubMessage
from apache_beam.io.gcp.tests.pubsub_matcher import PubSubMessageMatcher
+from apache_beam.testing.test_utils import PullResponseMessage
+from apache_beam.testing.test_utils import create_pull_response
# Protect against environments where pubsub library is not available.
-# pylint: disable=wrong-import-order, wrong-import-position
try:
from google.cloud import pubsub
except ImportError:
pubsub = None
-# pylint: enable=wrong-import-order, wrong-import-position
@unittest.skipIf(pubsub is None, 'PubSub dependencies are not installed.')
+@mock.patch('time.sleep', return_value=None)
+@mock.patch('google.cloud.pubsub.SubscriberClient')
class PubSubMatcherTest(unittest.TestCase):
def setUp(self):
@@ -48,90 +50,75 @@
'mock_project', 'mock_sub_name', ['mock_expected_msg'],
with_attributes=with_attributes, strip_attributes=strip_attributes)
- @mock.patch('time.sleep', return_value=None)
- @mock.patch('apache_beam.io.gcp.tests.pubsub_matcher.'
- 'PubSubMessageMatcher._get_subscription')
def test_message_matcher_success(self, mock_get_sub, unsued_mock):
self.init_matcher()
self.pubsub_matcher.expected_msg = ['a', 'b']
mock_sub = mock_get_sub.return_value
mock_sub.pull.side_effect = [
- [(1, pubsub.message.Message(b'a', 'unused_id'))],
- [(2, pubsub.message.Message(b'b', 'unused_id'))],
+ create_pull_response([PullResponseMessage(b'a', {})]),
+ create_pull_response([PullResponseMessage(b'b', {})]),
]
hc_assert_that(self.mock_presult, self.pubsub_matcher)
self.assertEqual(mock_sub.pull.call_count, 2)
+ self.assertEqual(mock_sub.acknowledge.call_count, 2)
- @mock.patch('time.sleep', return_value=None)
- @mock.patch('apache_beam.io.gcp.tests.pubsub_matcher.'
- 'PubSubMessageMatcher._get_subscription')
def test_message_matcher_attributes_success(self, mock_get_sub, unsued_mock):
self.init_matcher(with_attributes=True)
self.pubsub_matcher.expected_msg = [PubsubMessage('a', {'k': 'v'})]
mock_sub = mock_get_sub.return_value
- msg_a = pubsub.message.Message(b'a', 'unused_id')
- msg_a.attributes['k'] = 'v'
- mock_sub.pull.side_effect = [[(1, msg_a)]]
+ mock_sub.pull.side_effect = [
+ create_pull_response([PullResponseMessage(b'a', {'k': 'v'})])
+ ]
hc_assert_that(self.mock_presult, self.pubsub_matcher)
self.assertEqual(mock_sub.pull.call_count, 1)
+ self.assertEqual(mock_sub.acknowledge.call_count, 1)
- @mock.patch('time.sleep', return_value=None)
- @mock.patch('apache_beam.io.gcp.tests.pubsub_matcher.'
- 'PubSubMessageMatcher._get_subscription')
def test_message_matcher_attributes_fail(self, mock_get_sub, unsued_mock):
self.init_matcher(with_attributes=True)
self.pubsub_matcher.expected_msg = [PubsubMessage('a', {})]
mock_sub = mock_get_sub.return_value
- msg_a = pubsub.message.Message(b'a', 'unused_id')
- msg_a.attributes['k'] = 'v' # Unexpected.
- mock_sub.pull.side_effect = [[(1, msg_a)]]
+ # Unexpected attribute 'k'.
+ mock_sub.pull.side_effect = [
+ create_pull_response([PullResponseMessage(b'a', {'k': 'v'})])
+ ]
with self.assertRaisesRegexp(AssertionError, r'Unexpected'):
hc_assert_that(self.mock_presult, self.pubsub_matcher)
self.assertEqual(mock_sub.pull.call_count, 1)
+ self.assertEqual(mock_sub.acknowledge.call_count, 1)
- @mock.patch('time.sleep', return_value=None)
- @mock.patch('apache_beam.io.gcp.tests.pubsub_matcher.'
- 'PubSubMessageMatcher._get_subscription')
def test_message_matcher_strip_success(self, mock_get_sub, unsued_mock):
self.init_matcher(with_attributes=True,
strip_attributes=['id', 'timestamp'])
self.pubsub_matcher.expected_msg = [PubsubMessage('a', {'k': 'v'})]
mock_sub = mock_get_sub.return_value
- msg_a = pubsub.message.Message(b'a', 'unused_id')
- msg_a.attributes['id'] = 'foo'
- msg_a.attributes['timestamp'] = 'bar'
- msg_a.attributes['k'] = 'v'
- mock_sub.pull.side_effect = [[(1, msg_a)]]
+ mock_sub.pull.side_effect = [create_pull_response([
+ PullResponseMessage(b'a', {'id': 'foo', 'timestamp': 'bar', 'k': 'v'})
+ ])]
hc_assert_that(self.mock_presult, self.pubsub_matcher)
self.assertEqual(mock_sub.pull.call_count, 1)
+ self.assertEqual(mock_sub.acknowledge.call_count, 1)
- @mock.patch('time.sleep', return_value=None)
- @mock.patch('apache_beam.io.gcp.tests.pubsub_matcher.'
- 'PubSubMessageMatcher._get_subscription')
def test_message_matcher_strip_fail(self, mock_get_sub, unsued_mock):
self.init_matcher(with_attributes=True,
strip_attributes=['id', 'timestamp'])
self.pubsub_matcher.expected_msg = [PubsubMessage('a', {'k': 'v'})]
mock_sub = mock_get_sub.return_value
- # msg_a is missing attribute 'timestamp'.
- msg_a = pubsub.message.Message(b'a', 'unused_id')
- msg_a.attributes['id'] = 'foo'
- msg_a.attributes['k'] = 'v'
- mock_sub.pull.side_effect = [[(1, msg_a)]]
+ # Message is missing attribute 'timestamp'.
+ mock_sub.pull.side_effect = [create_pull_response([
+ PullResponseMessage(b'a', {'id': 'foo', 'k': 'v'})
+ ])]
with self.assertRaisesRegexp(AssertionError, r'Stripped attributes'):
hc_assert_that(self.mock_presult, self.pubsub_matcher)
self.assertEqual(mock_sub.pull.call_count, 1)
+ self.assertEqual(mock_sub.acknowledge.call_count, 1)
- @mock.patch('time.sleep', return_value=None)
- @mock.patch('apache_beam.io.gcp.tests.pubsub_matcher.'
- 'PubSubMessageMatcher._get_subscription')
def test_message_matcher_mismatch(self, mock_get_sub, unused_mock):
self.init_matcher()
self.pubsub_matcher.expected_msg = ['a']
mock_sub = mock_get_sub.return_value
- mock_sub.pull.return_value = [
- (1, pubsub.message.Message(b'c', 'unused_id')),
- (1, pubsub.message.Message(b'd', 'unused_id')),
+ mock_sub.pull.side_effect = [
+ create_pull_response([PullResponseMessage(b'c', {}),
+ PullResponseMessage(b'd', {})]),
]
with self.assertRaises(AssertionError) as error:
hc_assert_that(self.mock_presult, self.pubsub_matcher)
@@ -140,10 +127,9 @@
self.assertTrue(
'\nExpected: Expected 1 messages.\n but: Got 2 messages.'
in str(error.exception.args[0]))
+ self.assertEqual(mock_sub.pull.call_count, 1)
+ self.assertEqual(mock_sub.acknowledge.call_count, 1)
- @mock.patch('time.sleep', return_value=None)
- @mock.patch('apache_beam.io.gcp.tests.pubsub_matcher.'
- 'PubSubMessageMatcher._get_subscription')
def test_message_matcher_timeout(self, mock_get_sub, unused_mock):
self.init_matcher()
mock_sub = mock_get_sub.return_value
@@ -152,6 +138,7 @@
with self.assertRaisesRegexp(AssertionError, r'Expected 1.*\n.*Got 0'):
hc_assert_that(self.mock_presult, self.pubsub_matcher)
self.assertTrue(mock_sub.pull.called)
+ self.assertEqual(mock_sub.acknowledge.call_count, 0)
if __name__ == '__main__':
diff --git a/sdks/python/apache_beam/runners/direct/direct_runner.py b/sdks/python/apache_beam/runners/direct/direct_runner.py
index 00e37f3..d410992 100644
--- a/sdks/python/apache_beam/runners/direct/direct_runner.py
+++ b/sdks/python/apache_beam/runners/direct/direct_runner.py
@@ -25,6 +25,7 @@
import itertools
import logging
+import time
from google.protobuf import wrappers_pb2
@@ -264,11 +265,12 @@
class _DirectWriteToPubSubFn(DoFn):
- _topic = None
+ BUFFER_SIZE_ELEMENTS = 100
+ FLUSH_TIMEOUT_SECS = BUFFER_SIZE_ELEMENTS * 0.5
def __init__(self, sink):
self.project = sink.project
- self.topic_name = sink.topic_name
+ self.short_topic_name = sink.topic_name
self.id_label = sink.id_label
self.timestamp_attribute = sink.timestamp_attribute
self.with_attributes = sink.with_attributes
@@ -282,30 +284,33 @@
'supported for PubSub writes')
def start_bundle(self):
- from google.cloud import pubsub
-
- if self._topic is None:
- self._topic = pubsub.Client(project=self.project).topic(
- self.topic_name)
self._buffer = []
def process(self, elem):
self._buffer.append(elem)
- if len(self._buffer) >= 100:
+ if len(self._buffer) >= self.BUFFER_SIZE_ELEMENTS:
self._flush()
def finish_bundle(self):
self._flush()
def _flush(self):
- if self._buffer:
- with self._topic.batch() as batch:
- for elem in self._buffer:
- if self.with_attributes:
- batch.publish(elem.data, **elem.attributes)
- else:
- batch.publish(elem)
- self._buffer = []
+ from google.cloud import pubsub
+ pub_client = pubsub.PublisherClient()
+ topic = pub_client.topic_path(self.project, self.short_topic_name)
+
+ if self.with_attributes:
+ futures = [pub_client.publish(topic, elem.data, **elem.attributes)
+ for elem in self._buffer]
+ else:
+ futures = [pub_client.publish(topic, elem)
+ for elem in self._buffer]
+
+ timer_start = time.time()
+ for future in futures:
+ remaining = self.FLUSH_TIMEOUT_SECS - (time.time() - timer_start)
+ future.result(remaining)
+ self._buffer = []
def _get_pubsub_transform_overrides(pipeline_options):
diff --git a/sdks/python/apache_beam/runners/direct/test_direct_runner.py b/sdks/python/apache_beam/runners/direct/test_direct_runner.py
index 8facca8..23dfeab 100644
--- a/sdks/python/apache_beam/runners/direct/test_direct_runner.py
+++ b/sdks/python/apache_beam/runners/direct/test_direct_runner.py
@@ -52,5 +52,6 @@
finally:
if not PipelineState.is_terminal(self.result.state):
self.result.cancel()
+ self.result.wait_until_finish()
return self.result
diff --git a/sdks/python/apache_beam/runners/direct/transform_evaluator.py b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
index ef12e2c..fad0704 100644
--- a/sdks/python/apache_beam/runners/direct/transform_evaluator.py
+++ b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
@@ -377,20 +377,44 @@
class _PubSubSubscriptionWrapper(object):
- """Wrapper for garbage-collecting temporary PubSub subscriptions."""
+ """Wrapper for managing temporary PubSub subscriptions."""
- def __init__(self, subscription, should_cleanup):
- self.subscription = subscription
- self.should_cleanup = should_cleanup
+ def __init__(self, project, short_topic_name, short_sub_name):
+ """Initialize subscription wrapper.
+
+ If short_sub_name is None, creates a temporary subscription to the topic.
+
+ Args:
+ project: GCP project name for topic and subscription. May be None.
+ Required if short_sub_name is None.
+ short_topic_name: Valid topic name without
+ 'projects/{project}/topics/' prefix. May be None.
+ Required if short_sub_name is None.
+ short_sub_name: Valid subscription name without
+ 'projects/{project}/subscriptions/' prefix. May be None.
+ """
+ from google.cloud import pubsub
+ self.sub_client = pubsub.SubscriberClient()
+
+ if short_sub_name is None:
+ self.sub_name = self.sub_client.subscription_path(
+ project, 'beam_%d_%x' % (int(time.time()), random.randrange(1 << 32)))
+ topic_name = self.sub_client.topic_path(project, short_topic_name)
+ self.sub_client.create_subscription(self.sub_name, topic_name)
+ self._should_cleanup = True
+ else:
+ self.sub_name = self.sub_client.subscription_path(project, short_sub_name)
+ self._should_cleanup = False
def __del__(self):
- if self.should_cleanup:
- self.subscription.delete()
+ if self._should_cleanup:
+ self.sub_client.delete_subscription(self.sub_name)
class _PubSubReadEvaluator(_TransformEvaluator):
"""TransformEvaluator for PubSub read."""
+ # A mapping of transform to _PubSubSubscriptionWrapper.
_subscription_cache = {}
def __init__(self, evaluation_context, applied_ptransform,
@@ -404,26 +428,16 @@
if self.source.id_label:
raise NotImplementedError(
'DirectRunner: id_label is not supported for PubSub reads')
- self._subscription = _PubSubReadEvaluator.get_subscription(
+ self._sub_name = _PubSubReadEvaluator.get_subscription(
self._applied_ptransform, self.source.project, self.source.topic_name,
self.source.subscription_name)
@classmethod
- def get_subscription(cls, transform, project, topic, subscription_name):
+ def get_subscription(cls, transform, project, topic, short_sub_name):
if transform not in cls._subscription_cache:
- from google.cloud import pubsub
- should_create = not subscription_name
- if should_create:
- subscription_name = 'beam_%d_%x' % (
- int(time.time()), random.randrange(1 << 32))
- wrapper = _PubSubSubscriptionWrapper(
- pubsub.Client(project=project).topic(topic).subscription(
- subscription_name),
- should_create)
- if should_create:
- wrapper.subscription.create()
+ wrapper = _PubSubSubscriptionWrapper(project, topic, short_sub_name)
cls._subscription_cache[transform] = wrapper
- return cls._subscription_cache[transform].subscription
+ return cls._subscription_cache[transform].sub_name
def start_bundle(self):
pass
@@ -438,28 +452,34 @@
# evaluator fails with an exception before emitting a bundle. However,
# the DirectRunner currently doesn't retry work items anyway, so the
# pipeline would enter an inconsistent state on any error.
- with pubsub.subscription.AutoAck(
- self._subscription, return_immediately=True,
- max_messages=10) as results:
- def _get_element(message):
- parsed_message = PubsubMessage._from_message(message)
- if (timestamp_attribute and
- timestamp_attribute in parsed_message.attributes):
- rfc3339_or_milli = parsed_message.attributes[timestamp_attribute]
+ sub_client = pubsub.SubscriberClient()
+ response = sub_client.pull(self._sub_name, max_messages=10,
+ return_immediately=True)
+
+ def _get_element(message):
+ parsed_message = PubsubMessage._from_message(message)
+ if (timestamp_attribute and
+ timestamp_attribute in parsed_message.attributes):
+ rfc3339_or_milli = parsed_message.attributes[timestamp_attribute]
+ try:
+ timestamp = Timestamp.from_rfc3339(rfc3339_or_milli)
+ except ValueError:
try:
- timestamp = Timestamp.from_rfc3339(rfc3339_or_milli)
- except ValueError:
- try:
- timestamp = Timestamp(micros=int(rfc3339_or_milli) * 1000)
- except ValueError as e:
- raise ValueError('Bad timestamp value: %s' % e)
- else:
- timestamp = Timestamp.from_rfc3339(message.service_timestamp)
+ timestamp = Timestamp(micros=int(rfc3339_or_milli) * 1000)
+ except ValueError as e:
+ raise ValueError('Bad timestamp value: %s' % e)
+ else:
+ timestamp = Timestamp(message.publish_time.seconds,
+ message.publish_time.nanos // 1000)
- return timestamp, parsed_message
+ return timestamp, parsed_message
- return [_get_element(message)
- for unused_ack_id, message in iteritems(results)]
+ results = [_get_element(rm.message) for rm in response.received_messages]
+ ack_ids = [rm.ack_id for rm in response.received_messages]
+ if ack_ids:
+ sub_client.acknowledge(self._sub_name, ack_ids)
+
+ return results
def finish_bundle(self):
data = self._read_from_pubsub(self.source.timestamp_attribute)
diff --git a/sdks/python/apache_beam/testing/test_utils.py b/sdks/python/apache_beam/testing/test_utils.py
index 490d079..1f0e99e 100644
--- a/sdks/python/apache_beam/testing/test_utils.py
+++ b/sdks/python/apache_beam/testing/test_utils.py
@@ -24,11 +24,9 @@
import hashlib
import imp
-import logging
import os
import shutil
import tempfile
-import time
from builtins import object
from mock import Mock
@@ -136,46 +134,61 @@
FileSystems.delete(file_paths)
-def wait_for_subscriptions_created(subs, timeout=60):
- """Wait for all PubSub subscriptions are created."""
- return _wait_until_all_exist(subs, timeout)
-
-
-def wait_for_topics_created(topics, timeout=60):
- """Wait for all PubSub topics are created."""
- return _wait_until_all_exist(topics, timeout)
-
-
-def _wait_until_all_exist(components, timeout):
- unchecked_components = set(components)
- start_time = time.time()
- while time.time() - start_time <= timeout:
- unchecked_components = set(
- [c for c in unchecked_components if not c.exists()])
- if len(unchecked_components) == 0:
- return True
- time.sleep(2)
-
- raise RuntimeError(
- 'Timeout after %d seconds. %d of %d topics/subscriptions not exist. '
- 'They are %s.' % (timeout, len(unchecked_components),
- len(components), list(unchecked_components)))
-
-
-def cleanup_subscriptions(subs):
+def cleanup_subscriptions(sub_client, subs):
"""Cleanup PubSub subscriptions if exist."""
- _cleanup_pubsub(subs)
+ for sub in subs:
+ sub_client.delete_subscription(sub.name)
-def cleanup_topics(topics):
+def cleanup_topics(pub_client, topics):
"""Cleanup PubSub topics if exist."""
- _cleanup_pubsub(topics)
+ for topic in topics:
+ pub_client.delete_topic(topic.name)
-def _cleanup_pubsub(components):
- for c in components:
- if c.exists():
- c.delete()
- else:
- logging.debug('Cannot delete topic/subscription. %s does not exist.',
- c.full_name)
+class PullResponseMessage(object):
+ """Data representing a pull request response.
+
+ Utility class for ``create_pull_response``.
+ """
+ def __init__(self, data, attributes=None,
+ publish_time_secs=None, publish_time_nanos=None, ack_id=None):
+ self.data = data
+ self.attributes = attributes
+ self.publish_time_secs = publish_time_secs
+ self.publish_time_nanos = publish_time_nanos
+ self.ack_id = ack_id
+
+
+def create_pull_response(responses):
+ """Create an instance of ``google.cloud.pubsub.types.PullResponse``.
+
+ Used to simulate the response from pubsub.SubscriberClient().pull().
+
+ Args:
+ responses: list of ``PullResponseMessage``
+
+ Returns:
+ An instance of ``google.cloud.pubsub.types.PullResponse`` populated with
+ responses.
+ """
+ from google.cloud import pubsub
+
+ res = pubsub.types.PullResponse()
+ for response in responses:
+ received_message = res.received_messages.add()
+
+ message = received_message.message
+ message.data = response.data
+ if response.attributes is not None:
+ for k, v in response.attributes.items():
+ message.attributes[k] = v
+ if response.publish_time_secs is not None:
+ message.publish_time.seconds = response.publish_time_secs
+ if response.publish_time_nanos is not None:
+ message.publish_time.nanos = response.publish_time_nanos
+
+ if response.ack_id is not None:
+ received_message.ack_id = response.ack_id
+
+ return res
diff --git a/sdks/python/apache_beam/testing/test_utils_test.py b/sdks/python/apache_beam/testing/test_utils_test.py
index bef4078..2b16c30c 100644
--- a/sdks/python/apache_beam/testing/test_utils_test.py
+++ b/sdks/python/apache_beam/testing/test_utils_test.py
@@ -82,56 +82,19 @@
self.assertEqual(f.readline(), b'line2\n')
self.assertEqual(f.readline(), b'line3\n')
- @mock.patch('time.sleep', return_value=None)
- def test_wait_for_subscriptions_created_fails(self, patched_time_sleep):
- sub1 = mock.MagicMock()
- sub1.exists.return_value = True
- sub2 = mock.MagicMock()
- sub2.exists.return_value = False
- with self.assertRaises(RuntimeError) as error:
- utils.wait_for_subscriptions_created([sub1, sub2], timeout=0.1)
- self.assertTrue(sub1.exists.called)
- self.assertTrue(sub2.exists.called)
- self.assertTrue(error.exception.args[0].startswith('Timeout after'))
-
- @mock.patch('time.sleep', return_value=None)
- def test_wait_for_topics_created_fails(self, patched_time_sleep):
- topic1 = mock.MagicMock()
- topic1.exists.return_value = True
- topic2 = mock.MagicMock()
- topic2.exists.return_value = False
- with self.assertRaises(RuntimeError) as error:
- utils.wait_for_subscriptions_created([topic1, topic2], timeout=0.1)
- self.assertTrue(topic1.exists.called)
- self.assertTrue(topic2.exists.called)
- self.assertTrue(error.exception.args[0].startswith('Timeout after'))
-
- @mock.patch('time.sleep', return_value=None)
- def test_wait_for_subscriptions_created_succeeds(self, patched_time_sleep):
- sub1 = mock.MagicMock()
- sub1.exists.return_value = True
- self.assertTrue(
- utils.wait_for_subscriptions_created([sub1], timeout=0.1))
-
- @mock.patch('time.sleep', return_value=None)
- def test_wait_for_topics_created_succeeds(self, patched_time_sleep):
- topic1 = mock.MagicMock()
- topic1.exists.return_value = True
- self.assertTrue(
- utils.wait_for_subscriptions_created([topic1], timeout=0.1))
- self.assertTrue(topic1.exists.called)
-
def test_cleanup_subscriptions(self):
- mock_sub = mock.MagicMock()
- mock_sub.exist.return_value = True
- utils.cleanup_subscriptions([mock_sub])
- self.assertTrue(mock_sub.delete.called)
+ sub_client = mock.Mock()
+ sub = mock.Mock()
+ sub.name = 'test_sub'
+ utils.cleanup_subscriptions(sub_client, [sub])
+ sub_client.delete_subscription.assert_called_with(sub.name)
def test_cleanup_topics(self):
- mock_topics = mock.MagicMock()
- mock_topics.exist.return_value = True
- utils.cleanup_subscriptions([mock_topics])
- self.assertTrue(mock_topics.delete.called)
+ pub_client = mock.Mock()
+ topic = mock.Mock()
+ topic.name = 'test_topic'
+ utils.cleanup_topics(pub_client, [topic])
+ pub_client.delete_topic.assert_called_with(topic.name)
if __name__ == '__main__':
diff --git a/sdks/python/container/base_image_requirements.txt b/sdks/python/container/base_image_requirements.txt
index a32bc44..8e352a6 100644
--- a/sdks/python/container/base_image_requirements.txt
+++ b/sdks/python/container/base_image_requirements.txt
@@ -47,10 +47,9 @@
# GCP extra features
google-apitools==0.5.20
googledatastore==7.0.1
-google-cloud-pubsub==0.26.0
+google-cloud-pubsub==0.35.4
google-cloud-bigquery==0.25.0
proto-google-cloud-datastore-v1==0.90.4
-proto-google-cloud-pubsub-v1==0.15.4
# Optional packages
cython==0.28.1
diff --git a/sdks/python/setup.py b/sdks/python/setup.py
index 9ee4bdf..ce94945 100644
--- a/sdks/python/setup.py
+++ b/sdks/python/setup.py
@@ -141,8 +141,7 @@
'google-apitools>=0.5.18,<=0.5.20',
'proto-google-cloud-datastore-v1>=0.90.0,<=0.90.4',
'googledatastore==7.0.1',
- 'google-cloud-pubsub==0.26.0',
- 'proto-google-cloud-pubsub-v1==0.15.4',
+ 'google-cloud-pubsub==0.35.4',
# GCP packages required by tests
'google-cloud-bigquery==0.25.0',
]