# coding=utf-8
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Unit tests for PubSub sources and sinks."""
# pytype: skip-file
import logging
import unittest
import hamcrest as hc
import mock
import apache_beam as beam
from apache_beam.io import Read
from apache_beam.io import Write
from apache_beam.io.gcp.pubsub import MultipleReadFromPubSub
from apache_beam.io.gcp.pubsub import PubsubMessage
from apache_beam.io.gcp.pubsub import PubSubSourceDescriptor
from apache_beam.io.gcp.pubsub import ReadFromPubSub
from apache_beam.io.gcp.pubsub import ReadStringsFromPubSub
from apache_beam.io.gcp.pubsub import WriteStringsToPubSub
from apache_beam.io.gcp.pubsub import WriteToPubSub
from apache_beam.io.gcp.pubsub import _PubSubSink
from apache_beam.io.gcp.pubsub import _PubSubSource
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.portability import common_urns
from apache_beam.portability.api import beam_runner_api_pb2
from apache_beam.runners import pipeline_context
from apache_beam.runners.direct import transform_evaluator
from apache_beam.runners.direct.direct_runner import _DirectReadFromPubSub
from apache_beam.runners.direct.direct_runner import _get_transform_overrides
from apache_beam.runners.direct.transform_evaluator import _PubSubReadEvaluator
from apache_beam.testing import test_utils
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import TestWindowedValue
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to
from apache_beam.transforms import window
from apache_beam.transforms.core import Create
from apache_beam.transforms.display import DisplayData
from apache_beam.transforms.display_test import DisplayDataItemMatcher
from apache_beam.utils import proto_utils
from apache_beam.utils import timestamp
# Protect against environments where the PubSub library is not available.
try:
from google.cloud import pubsub
except ImportError:
pubsub = None
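# When the google-cloud-pubsub dependency is missing, ``pubsub`` stays None
# and the skipIf decorators below disable every test that needs the client.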
class TestPubsubMessage(unittest.TestCase):
def test_payload_valid(self):
_ = PubsubMessage('', None)
_ = PubsubMessage('data', None)
_ = PubsubMessage(None, {'k': 'v'})
def test_payload_invalid(self):
with self.assertRaisesRegex(ValueError, r'data.*attributes.*must be set'):
_ = PubsubMessage(None, None)
with self.assertRaisesRegex(ValueError, r'data.*attributes.*must be set'):
_ = PubsubMessage(None, {})
@unittest.skipIf(pubsub is None, 'GCP dependencies are not installed')
def test_proto_conversion(self):
data = b'data'
attributes = {'k1': 'v1', 'k2': 'v2'}
m = PubsubMessage(data, attributes)
m_converted = PubsubMessage._from_proto_str(m._to_proto_str())
self.assertEqual(m_converted.data, data)
self.assertEqual(m_converted.attributes, attributes)
@unittest.skipIf(pubsub is None, 'GCP dependencies are not installed')
def test_payload_publish_invalid(self):
with self.assertRaisesRegex(ValueError, r'data field.*10MB'):
msg = PubsubMessage(b'0' * 1024 * 1024 * 11, None)
msg._to_proto_str(for_publish=True)
with self.assertRaisesRegex(ValueError, 'attribute key'):
msg = PubsubMessage(b'0', {'0' * 257: '0'})
msg._to_proto_str(for_publish=True)
with self.assertRaisesRegex(ValueError, 'attribute value'):
msg = PubsubMessage(b'0', {'0' * 100: '0' * 1025})
msg._to_proto_str(for_publish=True)
with self.assertRaisesRegex(ValueError, '100 attributes'):
attributes = {}
for i in range(0, 101):
attributes[str(i)] = str(i)
msg = PubsubMessage(b'0', attributes)
msg._to_proto_str(for_publish=True)
with self.assertRaisesRegex(ValueError, 'ordering key'):
msg = PubsubMessage(b'0', None, ordering_key='0' * 1301)
msg._to_proto_str(for_publish=True)
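  # The just-over-the-limit sizes above reflect the publish bounds that
  # _to_proto_str(for_publish=True) appears to enforce: data <= 10 MB,
  # attribute keys <= 256 bytes, attribute values <= 1024 bytes, at most
  # 100 attributes per message, and ordering keys <= 1300 bytes.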
def test_eq(self):
a = PubsubMessage(b'abc', {1: 2, 3: 4})
b = PubsubMessage(b'abc', {1: 2, 3: 4})
c = PubsubMessage(b'abc', {1: 2})
self.assertTrue(a == b)
self.assertTrue(a != c)
self.assertTrue(b != c)
def test_hash(self):
a = PubsubMessage(b'abc', {1: 2, 3: 4})
b = PubsubMessage(b'abc', {1: 2, 3: 4})
c = PubsubMessage(b'abc', {1: 2})
self.assertTrue(hash(a) == hash(b))
self.assertTrue(hash(a) != hash(c))
self.assertTrue(hash(b) != hash(c))
def test_repr(self):
a = PubsubMessage(b'abc', {1: 2, 3: 4})
b = PubsubMessage(b'abc', {1: 2, 3: 4})
c = PubsubMessage(b'abc', {1: 2})
self.assertTrue(repr(a) == repr(b))
self.assertTrue(repr(a) != repr(c))
self.assertTrue(repr(b) != repr(c))
@unittest.skipIf(pubsub is None, 'GCP dependencies are not installed')
class TestReadFromPubSubOverride(unittest.TestCase):
def test_expand_with_topic(self):
options = PipelineOptions([])
options.view_as(StandardOptions).streaming = True
p = TestPipeline(options=options)
pcoll = (
p
| ReadFromPubSub(
'projects/fakeprj/topics/a_topic',
None,
'a_label',
with_attributes=False,
timestamp_attribute=None)
| beam.Map(lambda x: x))
self.assertEqual(bytes, pcoll.element_type)
# Apply the necessary PTransformOverrides.
overrides = _get_transform_overrides(options)
p.replace_all(overrides)
# Note that the direct output of ReadFromPubSub will be replaced
# by a PTransformOverride, so we use a no-op Map.
read_transform = pcoll.producer.inputs[0].producer.transform
    # Ensure that the properties were passed through correctly
source = read_transform._source
self.assertEqual('a_topic', source.topic_name)
self.assertEqual('a_label', source.id_label)
def test_expand_with_subscription(self):
options = PipelineOptions([])
options.view_as(StandardOptions).streaming = True
p = TestPipeline(options=options)
pcoll = (
p
| ReadFromPubSub(
None,
'projects/fakeprj/subscriptions/a_subscription',
'a_label',
with_attributes=False,
timestamp_attribute=None)
| beam.Map(lambda x: x))
self.assertEqual(bytes, pcoll.element_type)
# Apply the necessary PTransformOverrides.
overrides = _get_transform_overrides(options)
p.replace_all(overrides)
# Note that the direct output of ReadFromPubSub will be replaced
# by a PTransformOverride, so we use a no-op Map.
read_transform = pcoll.producer.inputs[0].producer.transform
    # Ensure that the properties were passed through correctly
source = read_transform._source
self.assertEqual('a_subscription', source.subscription_name)
self.assertEqual('a_label', source.id_label)
def test_expand_with_no_topic_or_subscription(self):
with self.assertRaisesRegex(
ValueError, "Either a topic or subscription must be provided."):
ReadFromPubSub(
None,
None,
'a_label',
with_attributes=False,
timestamp_attribute=None)
def test_expand_with_both_topic_and_subscription(self):
with self.assertRaisesRegex(
ValueError, "Only one of topic or subscription should be provided."):
ReadFromPubSub(
'a_topic',
'a_subscription',
'a_label',
with_attributes=False,
timestamp_attribute=None)
def test_expand_with_other_options(self):
options = PipelineOptions([])
options.view_as(StandardOptions).streaming = True
p = TestPipeline(options=options)
pcoll = (
p
| ReadFromPubSub(
'projects/fakeprj/topics/a_topic',
None,
'a_label',
with_attributes=True,
timestamp_attribute='time')
| beam.Map(lambda x: x))
self.assertEqual(PubsubMessage, pcoll.element_type)
# Apply the necessary PTransformOverrides.
overrides = _get_transform_overrides(options)
p.replace_all(overrides)
# Note that the direct output of ReadFromPubSub will be replaced
# by a PTransformOverride, so we use a no-op Map.
read_transform = pcoll.producer.inputs[0].producer.transform
    # Ensure that the properties were passed through correctly
source = read_transform._source
self.assertTrue(source.with_attributes)
self.assertEqual('time', source.timestamp_attribute)
@unittest.skipIf(pubsub is None, 'GCP dependencies are not installed')
class TestMultiReadFromPubSubOverride(unittest.TestCase):
def test_expand_with_multiple_sources(self):
options = PipelineOptions([])
options.view_as(StandardOptions).streaming = True
p = TestPipeline(options=options)
topics = [
'projects/fakeprj/topics/a_topic', 'projects/fakeprj2/topics/b_topic'
]
subscriptions = ['projects/fakeprj/subscriptions/a_subscription']
pubsub_sources = [
PubSubSourceDescriptor(descriptor)
for descriptor in topics + subscriptions
]
pcoll = (p | MultipleReadFromPubSub(pubsub_sources) | beam.Map(lambda x: x))
# Apply the necessary PTransformOverrides.
overrides = _get_transform_overrides(options)
p.replace_all(overrides)
self.assertEqual(bytes, pcoll.element_type)
# Ensure that the sources are passed through correctly
read_transforms = pcoll.producer.inputs[0].producer.inputs
topics_list = []
subscription_list = []
for read_transform in read_transforms:
source = read_transform.producer.transform._source
if source.full_topic:
topics_list.append(source.full_topic)
else:
subscription_list.append(source.full_subscription)
self.assertEqual(topics_list, topics)
self.assertEqual(subscription_list, subscriptions)
def test_expand_with_multiple_sources_and_attributes(self):
options = PipelineOptions([])
options.view_as(StandardOptions).streaming = True
p = TestPipeline(options=options)
topics = [
'projects/fakeprj/topics/a_topic', 'projects/fakeprj2/topics/b_topic'
]
subscriptions = ['projects/fakeprj/subscriptions/a_subscription']
pubsub_sources = [
PubSubSourceDescriptor(descriptor)
for descriptor in topics + subscriptions
]
pcoll = (
p | MultipleReadFromPubSub(pubsub_sources, with_attributes=True)
| beam.Map(lambda x: x))
# Apply the necessary PTransformOverrides.
overrides = _get_transform_overrides(options)
p.replace_all(overrides)
self.assertEqual(PubsubMessage, pcoll.element_type)
# Ensure that the sources are passed through correctly
read_transforms = pcoll.producer.inputs[0].producer.inputs
topics_list = []
subscription_list = []
for read_transform in read_transforms:
source = read_transform.producer.transform._source
if source.full_topic:
topics_list.append(source.full_topic)
else:
subscription_list.append(source.full_subscription)
self.assertEqual(topics_list, topics)
self.assertEqual(subscription_list, subscriptions)
def test_expand_with_multiple_sources_and_other_options(self):
options = PipelineOptions([])
options.view_as(StandardOptions).streaming = True
p = TestPipeline(options=options)
sources = [
'projects/fakeprj/topics/a_topic',
'projects/fakeprj2/topics/b_topic',
'projects/fakeprj/subscriptions/a_subscription'
]
id_labels = ['a_label_topic', 'b_label_topic', 'a_label_subscription']
timestamp_attributes = ['a_ta_topic', 'b_ta_topic', 'a_ta_subscription']
pubsub_sources = [
PubSubSourceDescriptor(
source=source,
id_label=id_label,
timestamp_attribute=timestamp_attribute) for source,
id_label,
timestamp_attribute in zip(sources, id_labels, timestamp_attributes)
]
pcoll = (p | MultipleReadFromPubSub(pubsub_sources) | beam.Map(lambda x: x))
# Apply the necessary PTransformOverrides.
overrides = _get_transform_overrides(options)
p.replace_all(overrides)
self.assertEqual(bytes, pcoll.element_type)
# Ensure that the sources are passed through correctly
read_transforms = pcoll.producer.inputs[0].producer.inputs
for i, read_transform in enumerate(read_transforms):
id_label = id_labels[i]
timestamp_attribute = timestamp_attributes[i]
source = read_transform.producer.transform._source
self.assertEqual(source.id_label, id_label)
self.assertEqual(source.with_attributes, False)
self.assertEqual(source.timestamp_attribute, timestamp_attribute)
def test_expand_with_wrong_source(self):
with self.assertRaisesRegex(
ValueError,
r'PubSub source descriptor must be in the form '
r'"projects/<project>/topics/<topic>"'
' or "projects/<project>/subscription/<subscription>".*'):
MultipleReadFromPubSub([PubSubSourceDescriptor('not_a_proper_source')])
@unittest.skipIf(pubsub is None, 'GCP dependencies are not installed')
class TestWriteStringsToPubSubOverride(unittest.TestCase):
def test_expand_deprecated(self):
options = PipelineOptions([])
options.view_as(StandardOptions).streaming = True
p = TestPipeline(options=options)
pcoll = (
p
| ReadFromPubSub('projects/fakeprj/topics/baz')
| WriteStringsToPubSub('projects/fakeprj/topics/a_topic')
| beam.Map(lambda x: x))
# Apply the necessary PTransformOverrides.
overrides = _get_transform_overrides(options)
p.replace_all(overrides)
    # Note that the direct output of WriteStringsToPubSub will be replaced
    # by a PTransformOverride, so we use a no-op Map.
write_transform = pcoll.producer.inputs[0].producer.transform
    # Ensure that the properties were passed through correctly
self.assertEqual('a_topic', write_transform.dofn.short_topic_name)
def test_expand(self):
options = PipelineOptions([])
options.view_as(StandardOptions).streaming = True
p = TestPipeline(options=options)
pcoll = (
p
| ReadFromPubSub('projects/fakeprj/topics/baz')
| WriteToPubSub(
'projects/fakeprj/topics/a_topic', with_attributes=True)
| beam.Map(lambda x: x))
# Apply the necessary PTransformOverrides.
overrides = _get_transform_overrides(options)
p.replace_all(overrides)
    # Note that the direct output of WriteToPubSub will be replaced
    # by a PTransformOverride, so we use a no-op Map.
write_transform = pcoll.producer.inputs[0].producer.transform
    # Ensure that the properties were passed through correctly
self.assertEqual('a_topic', write_transform.dofn.short_topic_name)
self.assertEqual(True, write_transform.dofn.with_attributes)
# TODO(https://github.com/apache/beam/issues/18939): These properties
# aren't supported yet in direct runner.
self.assertEqual(None, write_transform.dofn.id_label)
self.assertEqual(None, write_transform.dofn.timestamp_attribute)
@unittest.skipIf(pubsub is None, 'GCP dependencies are not installed')
class TestPubSubSource(unittest.TestCase):
def test_display_data_topic(self):
source = _PubSubSource('projects/fakeprj/topics/a_topic', None, 'a_label')
dd = DisplayData.create_from(source)
expected_items = [
DisplayDataItemMatcher('topic', 'projects/fakeprj/topics/a_topic'),
DisplayDataItemMatcher('id_label', 'a_label'),
DisplayDataItemMatcher('with_attributes', False),
]
hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items))
def test_display_data_subscription(self):
source = _PubSubSource(
None, 'projects/fakeprj/subscriptions/a_subscription', 'a_label')
dd = DisplayData.create_from(source)
expected_items = [
DisplayDataItemMatcher(
'subscription', 'projects/fakeprj/subscriptions/a_subscription'),
DisplayDataItemMatcher('id_label', 'a_label'),
DisplayDataItemMatcher('with_attributes', False),
]
hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items))
def test_display_data_no_subscription(self):
source = _PubSubSource('projects/fakeprj/topics/a_topic')
dd = DisplayData.create_from(source)
expected_items = [
DisplayDataItemMatcher('topic', 'projects/fakeprj/topics/a_topic'),
DisplayDataItemMatcher('with_attributes', False),
]
hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items))
@unittest.skipIf(pubsub is None, 'GCP dependencies are not installed')
class TestPubSubSink(unittest.TestCase):
def test_display_data(self):
sink = WriteToPubSub(
'projects/fakeprj/topics/a_topic',
id_label='id',
timestamp_attribute='time')
dd = DisplayData.create_from(sink)
expected_items = [
DisplayDataItemMatcher('topic', 'projects/fakeprj/topics/a_topic'),
DisplayDataItemMatcher('id_label', 'id'),
DisplayDataItemMatcher('with_attributes', True),
DisplayDataItemMatcher('timestamp_attribute', 'time'),
]
hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items))
class TestPubSubReadEvaluator(object):
"""Wrapper of _PubSubReadEvaluator that makes it bounded."""
_pubsub_read_evaluator = _PubSubReadEvaluator
def __init__(self, *args, **kwargs):
self._evaluator = self._pubsub_read_evaluator(*args, **kwargs)
def start_bundle(self):
return self._evaluator.start_bundle()
def process_element(self, element):
return self._evaluator.process_element(element)
def finish_bundle(self):
result = self._evaluator.finish_bundle()
result.unprocessed_bundles = []
result.keyed_watermark_holds = {None: None}
return result
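# Registering this wrapper below swaps it in for every DirectRunner test in
# this module: finish_bundle drops unprocessed bundles and clears the
# watermark hold, so the otherwise-unbounded Pub/Sub read can terminate.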
transform_evaluator.TransformEvaluatorRegistry._test_evaluators_overrides = {
_DirectReadFromPubSub: TestPubSubReadEvaluator, # type: ignore[dict-item]
}
@unittest.skipIf(pubsub is None, 'GCP dependencies are not installed')
@mock.patch('google.cloud.pubsub.SubscriberClient')
class TestReadFromPubSub(unittest.TestCase):
def test_read_messages_success(self, mock_pubsub):
data = b'data'
publish_time_secs = 1520861821
publish_time_nanos = 234567000
attributes = {'key': 'value'}
ack_id = 'ack_id'
pull_response = test_utils.create_pull_response([
test_utils.PullResponseMessage(
data, attributes, publish_time_secs, publish_time_nanos, ack_id)
])
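    # Without a timestamp_attribute, the element timestamp is the publish
    # time, with seconds and nanos combined into one Timestamp value.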
expected_elements = [
TestWindowedValue(
PubsubMessage(data, attributes),
timestamp.Timestamp(1520861821.234567), [window.GlobalWindow()])
]
mock_pubsub.return_value.pull.return_value = pull_response
options = PipelineOptions([])
options.view_as(StandardOptions).streaming = True
with TestPipeline(options=options) as p:
pcoll = (
p
| ReadFromPubSub(
'projects/fakeprj/topics/a_topic',
None,
None,
with_attributes=True))
assert_that(pcoll, equal_to(expected_elements), reify_windows=True)
mock_pubsub.return_value.acknowledge.assert_has_calls(
[mock.call(subscription=mock.ANY, ack_ids=[ack_id])])
mock_pubsub.return_value.close.assert_has_calls([mock.call()])
def test_read_strings_success(self, mock_pubsub):
data = '🤷 ¯\\_(ツ)_/¯'
data_encoded = data.encode('utf-8')
ack_id = 'ack_id'
pull_response = test_utils.create_pull_response(
[test_utils.PullResponseMessage(data_encoded, ack_id=ack_id)])
expected_elements = [data]
mock_pubsub.return_value.pull.return_value = pull_response
options = PipelineOptions([])
options.view_as(StandardOptions).streaming = True
with TestPipeline(options=options) as p:
pcoll = (
p
| ReadStringsFromPubSub(
'projects/fakeprj/topics/a_topic', None, None))
assert_that(pcoll, equal_to(expected_elements))
mock_pubsub.return_value.acknowledge.assert_has_calls(
[mock.call(subscription=mock.ANY, ack_ids=[ack_id])])
mock_pubsub.return_value.close.assert_has_calls([mock.call()])
def test_read_data_success(self, mock_pubsub):
data_encoded = '🤷 ¯\\_(ツ)_/¯'.encode('utf-8')
ack_id = 'ack_id'
pull_response = test_utils.create_pull_response(
[test_utils.PullResponseMessage(data_encoded, ack_id=ack_id)])
expected_elements = [data_encoded]
mock_pubsub.return_value.pull.return_value = pull_response
options = PipelineOptions([])
options.view_as(StandardOptions).streaming = True
with TestPipeline(options=options) as p:
pcoll = (
p
| ReadFromPubSub('projects/fakeprj/topics/a_topic', None, None))
assert_that(pcoll, equal_to(expected_elements))
mock_pubsub.return_value.acknowledge.assert_has_calls(
[mock.call(subscription=mock.ANY, ack_ids=[ack_id])])
mock_pubsub.return_value.close.assert_has_calls([mock.call()])
def test_read_messages_timestamp_attribute_milli_success(self, mock_pubsub):
data = b'data'
attributes = {'time': '1337'}
publish_time_secs = 1520861821
publish_time_nanos = 234567000
ack_id = 'ack_id'
pull_response = test_utils.create_pull_response([
test_utils.PullResponseMessage(
data, attributes, publish_time_secs, publish_time_nanos, ack_id)
])
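    # A bare integer timestamp attribute is interpreted as milliseconds
    # since the epoch, hence the micros=1337 * 1000 expectation below.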
expected_elements = [
TestWindowedValue(
PubsubMessage(data, attributes),
timestamp.Timestamp(micros=int(attributes['time']) * 1000),
[window.GlobalWindow()]),
]
mock_pubsub.return_value.pull.return_value = pull_response
options = PipelineOptions([])
options.view_as(StandardOptions).streaming = True
with TestPipeline(options=options) as p:
pcoll = (
p
| ReadFromPubSub(
'projects/fakeprj/topics/a_topic',
None,
None,
with_attributes=True,
timestamp_attribute='time'))
assert_that(pcoll, equal_to(expected_elements), reify_windows=True)
mock_pubsub.return_value.acknowledge.assert_has_calls(
[mock.call(subscription=mock.ANY, ack_ids=[ack_id])])
mock_pubsub.return_value.close.assert_has_calls([mock.call()])
def test_read_messages_timestamp_attribute_rfc3339_success(self, mock_pubsub):
data = b'data'
attributes = {'time': '2018-03-12T13:37:01.234567Z'}
publish_time_secs = 1337000000
publish_time_nanos = 133700000
ack_id = 'ack_id'
pull_response = test_utils.create_pull_response([
test_utils.PullResponseMessage(
data, attributes, publish_time_secs, publish_time_nanos, ack_id)
])
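    # An RFC 3339 string in the timestamp attribute overrides the publish
    # time entirely.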
expected_elements = [
TestWindowedValue(
PubsubMessage(data, attributes),
timestamp.Timestamp.from_rfc3339(attributes['time']),
[window.GlobalWindow()]),
]
mock_pubsub.return_value.pull.return_value = pull_response
options = PipelineOptions([])
options.view_as(StandardOptions).streaming = True
with TestPipeline(options=options) as p:
pcoll = (
p
| ReadFromPubSub(
'projects/fakeprj/topics/a_topic',
None,
None,
with_attributes=True,
timestamp_attribute='time'))
assert_that(pcoll, equal_to(expected_elements), reify_windows=True)
mock_pubsub.return_value.acknowledge.assert_has_calls(
[mock.call(subscription=mock.ANY, ack_ids=[ack_id])])
mock_pubsub.return_value.close.assert_has_calls([mock.call()])
def test_read_messages_timestamp_attribute_missing(self, mock_pubsub):
data = b'data'
attributes = {}
publish_time_secs = 1520861821
publish_time_nanos = 234567000
publish_time = '2018-03-12T13:37:01.234567Z'
ack_id = 'ack_id'
pull_response = test_utils.create_pull_response([
test_utils.PullResponseMessage(
data, attributes, publish_time_secs, publish_time_nanos, ack_id)
])
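    # With the named attribute missing, the element timestamp falls back to
    # the publish time; publish_time above is its RFC 3339 rendering.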
expected_elements = [
TestWindowedValue(
PubsubMessage(data, attributes),
timestamp.Timestamp.from_rfc3339(publish_time),
[window.GlobalWindow()]),
]
mock_pubsub.return_value.pull.return_value = pull_response
options = PipelineOptions([])
options.view_as(StandardOptions).streaming = True
with TestPipeline(options=options) as p:
pcoll = (
p
| ReadFromPubSub(
'projects/fakeprj/topics/a_topic',
None,
None,
with_attributes=True,
timestamp_attribute='nonexistent'))
assert_that(pcoll, equal_to(expected_elements), reify_windows=True)
mock_pubsub.return_value.acknowledge.assert_has_calls(
[mock.call(subscription=mock.ANY, ack_ids=[ack_id])])
mock_pubsub.return_value.close.assert_has_calls([mock.call()])
def test_read_messages_timestamp_attribute_fail_parse(self, mock_pubsub):
data = b'data'
attributes = {'time': '1337 unparseable'}
publish_time_secs = 1520861821
publish_time_nanos = 234567000
ack_id = 'ack_id'
pull_response = test_utils.create_pull_response([
test_utils.PullResponseMessage(
data, attributes, publish_time_secs, publish_time_nanos, ack_id)
])
mock_pubsub.return_value.pull.return_value = pull_response
options = PipelineOptions([])
options.view_as(StandardOptions).streaming = True
p = TestPipeline(options=options)
_ = (
p
| ReadFromPubSub(
'projects/fakeprj/topics/a_topic',
None,
None,
with_attributes=True,
timestamp_attribute='time'))
with self.assertRaisesRegex(ValueError, r'parse'):
p.run()
mock_pubsub.return_value.acknowledge.assert_not_called()
mock_pubsub.return_value.close.assert_has_calls([mock.call()])
def test_read_message_id_label_unsupported(self, unused_mock_pubsub):
# id_label is unsupported in DirectRunner.
options = PipelineOptions([])
options.view_as(StandardOptions).streaming = True
with self.assertRaisesRegex(NotImplementedError,
r'id_label is not supported'):
with TestPipeline(options=options) as p:
_ = (
p | ReadFromPubSub(
'projects/fakeprj/topics/a_topic', None, 'a_label'))
def test_runner_api_transformation_with_topic(self, unused_mock_pubsub):
source = _PubSubSource(
topic='projects/fakeprj/topics/a_topic',
subscription=None,
id_label='a_label',
timestamp_attribute='b_label',
with_attributes=True)
transform = Read(source)
context = pipeline_context.PipelineContext()
proto_transform_spec = transform.to_runner_api(context)
self.assertEqual(
common_urns.composites.PUBSUB_READ.urn, proto_transform_spec.urn)
pubsub_read_payload = (
proto_utils.parse_Bytes(
proto_transform_spec.payload,
beam_runner_api_pb2.PubSubReadPayload))
self.assertEqual(
'projects/fakeprj/topics/a_topic', pubsub_read_payload.topic)
self.assertEqual('a_label', pubsub_read_payload.id_attribute)
self.assertEqual('b_label', pubsub_read_payload.timestamp_attribute)
self.assertEqual('', pubsub_read_payload.subscription)
self.assertTrue(pubsub_read_payload.with_attributes)
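    # Round-trip the payload back through from_runner_api_parameter and
    # verify that the reconstructed Read keeps the source configuration.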
proto_transform = beam_runner_api_pb2.PTransform(
unique_name="dummy_label", spec=proto_transform_spec)
transform_from_proto = Read.from_runner_api_parameter(
proto_transform, pubsub_read_payload, None)
self.assertTrue(isinstance(transform_from_proto, Read))
self.assertTrue(isinstance(transform_from_proto.source, _PubSubSource))
self.assertEqual(
'projects/fakeprj/topics/a_topic',
transform_from_proto.source.full_topic)
self.assertTrue(transform_from_proto.source.with_attributes)
def test_runner_api_transformation_properties_none(self, unused_mock_pubsub):
# Confirming that properties stay None after a runner API transformation.
source = _PubSubSource(
topic='projects/fakeprj/topics/a_topic', with_attributes=True)
transform = Read(source)
context = pipeline_context.PipelineContext()
proto_transform_spec = transform.to_runner_api(context)
self.assertEqual(
common_urns.composites.PUBSUB_READ.urn, proto_transform_spec.urn)
pubsub_read_payload = (
proto_utils.parse_Bytes(
proto_transform_spec.payload,
beam_runner_api_pb2.PubSubReadPayload))
proto_transform = beam_runner_api_pb2.PTransform(
unique_name="dummy_label", spec=proto_transform_spec)
transform_from_proto = Read.from_runner_api_parameter(
proto_transform, pubsub_read_payload, None)
self.assertIsNone(transform_from_proto.source.full_subscription)
self.assertIsNone(transform_from_proto.source.id_label)
self.assertIsNone(transform_from_proto.source.timestamp_attribute)
def test_runner_api_transformation_with_subscription(
self, unused_mock_pubsub):
source = _PubSubSource(
topic=None,
subscription='projects/fakeprj/subscriptions/a_subscription',
id_label='a_label',
timestamp_attribute='b_label',
with_attributes=True)
transform = Read(source)
context = pipeline_context.PipelineContext()
proto_transform_spec = transform.to_runner_api(context)
self.assertEqual(
common_urns.composites.PUBSUB_READ.urn, proto_transform_spec.urn)
pubsub_read_payload = (
proto_utils.parse_Bytes(
proto_transform_spec.payload,
beam_runner_api_pb2.PubSubReadPayload))
self.assertEqual(
'projects/fakeprj/subscriptions/a_subscription',
pubsub_read_payload.subscription)
self.assertEqual('a_label', pubsub_read_payload.id_attribute)
self.assertEqual('b_label', pubsub_read_payload.timestamp_attribute)
self.assertEqual('', pubsub_read_payload.topic)
self.assertTrue(pubsub_read_payload.with_attributes)
proto_transform = beam_runner_api_pb2.PTransform(
unique_name="dummy_label", spec=proto_transform_spec)
transform_from_proto = Read.from_runner_api_parameter(
proto_transform, pubsub_read_payload, None)
self.assertTrue(isinstance(transform_from_proto, Read))
self.assertTrue(isinstance(transform_from_proto.source, _PubSubSource))
self.assertTrue(transform_from_proto.source.with_attributes)
self.assertEqual(
'projects/fakeprj/subscriptions/a_subscription',
transform_from_proto.source.full_subscription)
@unittest.skipIf(pubsub is None, 'GCP dependencies are not installed')
@mock.patch('google.cloud.pubsub.PublisherClient')
class TestWriteToPubSub(unittest.TestCase):
def test_write_messages_success(self, mock_pubsub):
    data = b'data'
payloads = [data]
options = PipelineOptions([])
options.view_as(StandardOptions).streaming = True
with TestPipeline(options=options) as p:
_ = (
p
| Create(payloads)
| WriteToPubSub(
'projects/fakeprj/topics/a_topic', with_attributes=False))
mock_pubsub.return_value.publish.assert_has_calls(
[mock.call(mock.ANY, data)])
def test_write_messages_deprecated(self, mock_pubsub):
data = 'data'
data_bytes = b'data'
payloads = [data]
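    # WriteStringsToPubSub is expected to UTF-8-encode each string, so the
    # mock should observe the bytes form of the payload.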
options = PipelineOptions([])
options.view_as(StandardOptions).streaming = True
with TestPipeline(options=options) as p:
_ = (
p
| Create(payloads)
| WriteStringsToPubSub('projects/fakeprj/topics/a_topic'))
mock_pubsub.return_value.publish.assert_has_calls(
[mock.call(mock.ANY, data_bytes)])
def test_write_messages_with_attributes_success(self, mock_pubsub):
data = b'data'
attributes = {'key': 'value'}
payloads = [PubsubMessage(data, attributes)]
options = PipelineOptions([])
options.view_as(StandardOptions).streaming = True
with TestPipeline(options=options) as p:
_ = (
p
| Create(payloads)
| WriteToPubSub(
'projects/fakeprj/topics/a_topic', with_attributes=True))
mock_pubsub.return_value.publish.assert_has_calls(
[mock.call(mock.ANY, data, **attributes)])
def test_write_messages_with_attributes_error(self, mock_pubsub):
data = 'data'
# Sending raw data when WriteToPubSub expects a PubsubMessage object.
payloads = [data]
options = PipelineOptions([])
options.view_as(StandardOptions).streaming = True
with self.assertRaisesRegex(AttributeError, r'str.*has no attribute.*data'):
with TestPipeline(options=options) as p:
_ = (
p
| Create(payloads)
| WriteToPubSub(
'projects/fakeprj/topics/a_topic', with_attributes=True))
def test_write_messages_unsupported_features(self, mock_pubsub):
data = b'data'
attributes = {'key': 'value'}
payloads = [PubsubMessage(data, attributes)]
options = PipelineOptions([])
options.view_as(StandardOptions).streaming = True
with self.assertRaisesRegex(NotImplementedError,
r'id_label is not supported'):
with TestPipeline(options=options) as p:
_ = (
p
| Create(payloads)
| WriteToPubSub(
'projects/fakeprj/topics/a_topic', id_label='a_label'))
options = PipelineOptions([])
options.view_as(StandardOptions).streaming = True
with self.assertRaisesRegex(NotImplementedError,
r'timestamp_attribute is not supported'):
with TestPipeline(options=options) as p:
_ = (
p
| Create(payloads)
| WriteToPubSub(
'projects/fakeprj/topics/a_topic',
timestamp_attribute='timestamp'))
def test_runner_api_transformation(self, unused_mock_pubsub):
sink = _PubSubSink(
topic='projects/fakeprj/topics/a_topic',
id_label=None,
        # We expect the encoded PubSub write transform to always return
        # attributes.
timestamp_attribute=None)
transform = Write(sink)
context = pipeline_context.PipelineContext()
proto_transform_spec = transform.to_runner_api(context)
self.assertEqual(
common_urns.composites.PUBSUB_WRITE.urn, proto_transform_spec.urn)
pubsub_write_payload = (
proto_utils.parse_Bytes(
proto_transform_spec.payload,
beam_runner_api_pb2.PubSubWritePayload))
self.assertEqual(
'projects/fakeprj/topics/a_topic', pubsub_write_payload.topic)
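    # Round-trip the payload back through from_runner_api_parameter and
    # verify that the reconstructed Write keeps the sink configuration.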
proto_transform = beam_runner_api_pb2.PTransform(
unique_name="dummy_label", spec=proto_transform_spec)
transform_from_proto = Write.from_runner_api_parameter(
proto_transform, pubsub_write_payload, None)
self.assertTrue(isinstance(transform_from_proto, Write))
self.assertTrue(isinstance(transform_from_proto.sink, _PubSubSink))
self.assertEqual(
'projects/fakeprj/topics/a_topic', transform_from_proto.sink.full_topic)
def test_runner_api_transformation_properties_none(self, unused_mock_pubsub):
# Confirming that properties stay None after a runner API transformation.
sink = _PubSubSink(
topic='projects/fakeprj/topics/a_topic',
id_label=None,
        # We expect the encoded PubSub write transform to always return
        # attributes.
timestamp_attribute=None)
transform = Write(sink)
context = pipeline_context.PipelineContext()
proto_transform_spec = transform.to_runner_api(context)
self.assertEqual(
common_urns.composites.PUBSUB_WRITE.urn, proto_transform_spec.urn)
pubsub_write_payload = (
proto_utils.parse_Bytes(
proto_transform_spec.payload,
beam_runner_api_pb2.PubSubWritePayload))
proto_transform = beam_runner_api_pb2.PTransform(
unique_name="dummy_label", spec=proto_transform_spec)
transform_from_proto = Write.from_runner_api_parameter(
proto_transform, pubsub_write_payload, None)
self.assertTrue(isinstance(transform_from_proto, Write))
self.assertTrue(isinstance(transform_from_proto.sink, _PubSubSink))
self.assertIsNone(transform_from_proto.sink.id_label)
self.assertIsNone(transform_from_proto.sink.timestamp_attribute)
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
unittest.main()