#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
Integration test for Google Cloud Pub/Sub.
"""
# pytype: skip-file
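
# Example invocation (illustrative only; the runner, project, region, and
# bucket below are placeholders, not fixed values):
#
#   pytest -m it_postcommit apache_beam/io/gcp/pubsub_integration_test.py \
#     --test-pipeline-options="--runner=TestDataflowRunner \
#         --project=<your-project> --region=<region> \
#         --temp_location=gs://<your-bucket>/tmp"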
import logging
import time
import unittest
import uuid

import pytest
from hamcrest.core.core.allof import all_of

from apache_beam.io.gcp import pubsub_it_pipeline
from apache_beam.io.gcp.pubsub import PubsubMessage
from apache_beam.io.gcp.pubsub import WriteToPubSub
from apache_beam.io.gcp.tests.pubsub_matcher import PubSubMessageMatcher
from apache_beam.runners.runner import PipelineState
from apache_beam.testing import test_utils
from apache_beam.testing.pipeline_verifiers import PipelineStateMatcher
from apache_beam.testing.test_pipeline import TestPipeline

INPUT_TOPIC = 'psit_topic_input'
OUTPUT_TOPIC = 'psit_topic_output'
INPUT_SUB = 'psit_subscription_input'
OUTPUT_SUB = 'psit_subscription_output'
# How long the test runner (TestDirectRunner or TestDataflowRunner) will wait
# for pubsub_it_pipeline to run before cancelling it: 10 minutes.
TEST_PIPELINE_DURATION_MS = 10 * 60 * 1000
# How long PubSubMessageMatcher will wait for the correct set of messages to
# appear.
MESSAGE_MATCHER_TIMEOUT_S = 10 * 60
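
# setUp() appends a per-run uuid to these names so concurrent test runs do not
# collide. Illustrative full resource path (<project> is a placeholder):
#   projects/<project>/topics/psit_topic_input<uuid>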


class PubSubIntegrationTest(unittest.TestCase):

ID_LABEL = 'id'
TIMESTAMP_ATTRIBUTE = 'timestamp'
INPUT_MESSAGES = {
# TODO(https://github.com/apache/beam/issues/18939): DirectRunner doesn't
# support reading or writing label_ids, nor writing timestamp attributes.
# Once these features exist, TestDirectRunner and TestDataflowRunner
# should behave identically.
'TestDirectRunner': [
PubsubMessage(b'data001', {}),
# For those elements that have the TIMESTAMP_ATTRIBUTE attribute, the
# IT pipeline writes back the timestamp of each element (as reported
# by Beam), as a TIMESTAMP_ATTRIBUTE + '_out' attribute.
PubsubMessage(
b'data002', {
TIMESTAMP_ATTRIBUTE: '2018-07-11T02:02:50.149000Z',
}),
PubsubMessage(b'data003\xab\xac', {}),
PubsubMessage(
b'data004\xab\xac', {
TIMESTAMP_ATTRIBUTE: '2018-07-11T02:02:50.149000Z',
})
],
'TestDataflowRunner': [
# Use ID_LABEL attribute to deduplicate messages with the same ID.
PubsubMessage(b'data001', {ID_LABEL: 'foo'}),
PubsubMessage(b'data001', {ID_LABEL: 'foo'}),
PubsubMessage(b'data001', {ID_LABEL: 'foo'}),
# For those elements that have the TIMESTAMP_ATTRIBUTE attribute, the
# IT pipeline writes back the timestamp of each element (as reported
# by Beam), as a TIMESTAMP_ATTRIBUTE + '_out' attribute.
PubsubMessage(
b'data002', {
TIMESTAMP_ATTRIBUTE: '2018-07-11T02:02:50.149000Z',
}),
PubsubMessage(b'data003\xab\xac', {ID_LABEL: 'foo2'}),
PubsubMessage(b'data003\xab\xac', {ID_LABEL: 'foo2'}),
PubsubMessage(b'data003\xab\xac', {ID_LABEL: 'foo2'}),
PubsubMessage(
b'data004\xab\xac', {
TIMESTAMP_ATTRIBUTE: '2018-07-11T02:02:50.149000Z',
})
],
}
EXPECTED_OUTPUT_MESSAGES = {
'TestDirectRunner': [
PubsubMessage(b'data001-seen', {'processed': 'IT'}),
PubsubMessage(
b'data002-seen',
{
TIMESTAMP_ATTRIBUTE: '2018-07-11T02:02:50.149000Z',
TIMESTAMP_ATTRIBUTE + '_out': '2018-07-11T02:02:50.149000Z',
'processed': 'IT',
}),
PubsubMessage(b'data003\xab\xac-seen', {'processed': 'IT'}),
PubsubMessage(
b'data004\xab\xac-seen',
{
TIMESTAMP_ATTRIBUTE: '2018-07-11T02:02:50.149000Z',
TIMESTAMP_ATTRIBUTE + '_out': '2018-07-11T02:02:50.149000Z',
'processed': 'IT',
})
],
'TestDataflowRunner': [
PubsubMessage(b'data001-seen', {'processed': 'IT'}),
PubsubMessage(
b'data002-seen',
{
TIMESTAMP_ATTRIBUTE + '_out': '2018-07-11T02:02:50.149000Z',
'processed': 'IT',
}),
PubsubMessage(b'data003\xab\xac-seen', {'processed': 'IT'}),
PubsubMessage(
b'data004\xab\xac-seen',
{
TIMESTAMP_ATTRIBUTE + '_out': '2018-07-11T02:02:50.149000Z',
'processed': 'IT',
})
],
}
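
  # How EXPECTED_OUTPUT_MESSAGES relates to INPUT_MESSAGES: the IT pipeline
  # appends '-seen' to each payload and adds a 'processed': 'IT' attribute;
  # elements carrying TIMESTAMP_ATTRIBUTE also get their element timestamp
  # echoed back under TIMESTAMP_ATTRIBUTE + '_out'. Illustrative sketch
  # (values taken from the tables above; on TestDirectRunner the original
  # TIMESTAMP_ATTRIBUTE is retained as well):
  #
  #   PubsubMessage(b'data002', {'timestamp': '2018-07-11T02:02:50.149000Z'})
  #     -> PubsubMessage(
  #            b'data002-seen',
  #            {'timestamp_out': '2018-07-11T02:02:50.149000Z',
  #             'processed': 'IT'})
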
def setUp(self):
self.test_pipeline = TestPipeline(is_integration_test=True)
self.runner_name = type(self.test_pipeline.runner).__name__
self.project = self.test_pipeline.get_option('project')
self.uuid = str(uuid.uuid4())
# Set up PubSub environment.
from google.cloud import pubsub
self.pub_client = pubsub.PublisherClient()
self.input_topic = self.pub_client.create_topic(
name=self.pub_client.topic_path(self.project, INPUT_TOPIC + self.uuid))
self.output_topic = self.pub_client.create_topic(
name=self.pub_client.topic_path(self.project, OUTPUT_TOPIC + self.uuid))
self.sub_client = pubsub.SubscriberClient()
self.input_sub = self.sub_client.create_subscription(
name=self.sub_client.subscription_path(
self.project, INPUT_SUB + self.uuid),
topic=self.input_topic.name)
self.output_sub = self.sub_client.create_subscription(
name=self.sub_client.subscription_path(
self.project, OUTPUT_SUB + self.uuid),
topic=self.output_topic.name)
    # Sleep for 30 seconds after resource creation. Pub/Sub only delivers
    # messages published after a subscription exists, and creation can take a
    # moment to propagate, so this reduces flakiness.
    time.sleep(30)

  def tearDown(self):
test_utils.cleanup_subscriptions(
self.sub_client, [self.input_sub, self.output_sub])
test_utils.cleanup_topics(
self.pub_client, [self.input_topic, self.output_topic])

  def _test_streaming(self, with_attributes):
    """Runs the IT pipeline with a message verifier.

    Args:
      with_attributes: False - Reads and writes message data only.
        True - Reads and writes message data and attributes. Also verifies
        id_label and timestamp_attribute features.
    """
    # Set on_success_matcher to verify pipeline state and Pub/Sub output. The
    # matchers are evaluated in the test process once the pipeline run
    # returns. Expect the state to be RUNNING, since a streaming pipeline is
    # usually never DONE; the test runner cancels the pipeline after
    # verification.
state_verifier = PipelineStateMatcher(PipelineState.RUNNING)
expected_messages = self.EXPECTED_OUTPUT_MESSAGES[self.runner_name]
if not with_attributes:
expected_messages = [pubsub_msg.data for pubsub_msg in expected_messages]
if self.runner_name == 'TestDirectRunner':
strip_attributes = None
else:
strip_attributes = [self.ID_LABEL, self.TIMESTAMP_ATTRIBUTE]
pubsub_msg_verifier = PubSubMessageMatcher(
self.project,
self.output_sub.name,
expected_messages,
timeout=MESSAGE_MATCHER_TIMEOUT_S,
with_attributes=with_attributes,
strip_attributes=strip_attributes)
extra_opts = {
'input_subscription': self.input_sub.name,
'output_topic': self.output_topic.name,
'wait_until_finish_duration': TEST_PIPELINE_DURATION_MS,
'on_success_matcher': all_of(state_verifier, pubsub_msg_verifier)
}
    # Generate input data and publish it to Pub/Sub.
for msg in self.INPUT_MESSAGES[self.runner_name]:
self.pub_client.publish(
self.input_topic.name, msg.data, **msg.attributes).result()
    # Get pipeline options from the --test-pipeline-options command-line
    # argument, then start the pipeline job by calling its main function.
pubsub_it_pipeline.run_pipeline(
argv=self.test_pipeline.get_full_options_as_args(**extra_opts),
with_attributes=with_attributes,
id_label=self.ID_LABEL,
timestamp_attribute=self.TIMESTAMP_ATTRIBUTE)

  @pytest.mark.it_postcommit
def test_streaming_data_only(self):
self._test_streaming(with_attributes=False)

  @pytest.mark.it_postcommit
def test_streaming_with_attributes(self):
self._test_streaming(with_attributes=True)

  def _test_batch_write(self, with_attributes):
    """Tests WriteToPubSub in batch (non-streaming) mode.

    Args:
      with_attributes: False - Writes message data only.
        True - Writes message data and attributes.
    """
    from apache_beam.options.pipeline_options import PipelineOptions
    from apache_beam.options.pipeline_options import StandardOptions
    from apache_beam.transforms import Create

    # Create test messages for batch mode.
test_messages = [
PubsubMessage(b'batch_data001', {'batch_attr': 'value1'}),
PubsubMessage(b'batch_data002', {'batch_attr': 'value2'}),
PubsubMessage(b'batch_data003', {'batch_attr': 'value3'})
]
    # Use explicit empty flags so pytest's own argv is not parsed as pipeline
    # options, and explicitly disable streaming for batch mode.
    pipeline_options = PipelineOptions([])
    pipeline_options.view_as(StandardOptions).streaming = False
with TestPipeline(options=pipeline_options) as p:
if with_attributes:
messages = p | 'CreateMessages' >> Create(test_messages)
_ = messages | 'WriteToPubSub' >> WriteToPubSub(
self.output_topic.name, with_attributes=True)
else:
# For data-only mode, extract just the data
message_data = [msg.data for msg in test_messages]
messages = p | 'CreateData' >> Create(message_data)
_ = messages | 'WriteToPubSub' >> WriteToPubSub(
self.output_topic.name, with_attributes=False)
    # Exiting the `with` block above runs the batch pipeline to completion, so
    # the messages should now be published. A single pull may return fewer
    # messages than requested, so poll the output subscription until all
    # expected messages arrive or the deadline passes.
    received_messages = []
    deadline = time.time() + 60
    while (len(received_messages) < len(test_messages) and
           time.time() < deadline):
      response = self.sub_client.pull(
          request={
              'subscription': self.output_sub.name,
              'max_messages': 10,
          })
      if not response.received_messages:
        time.sleep(5)
        continue
      ack_ids = []
      for received_message in response.received_messages:
        if with_attributes:
          # Keep both data and attributes for full verification below.
          received_messages.append(
              PubsubMessage(
                  received_message.message.data,
                  dict(received_message.message.attributes)))
        else:
          received_messages.append(received_message.message.data)
        ack_ids.append(received_message.ack_id)
      # Acknowledge the whole batch at once.
      self.sub_client.acknowledge(
          request={
              'subscription': self.output_sub.name,
              'ack_ids': ack_ids,
          })
    # Verify we received the expected number of messages.
    self.assertEqual(len(received_messages), len(test_messages))
    if with_attributes:
      # Verify both message data and attributes round-tripped intact.
      received_sorted = sorted(received_messages, key=lambda msg: msg.data)
      expected_sorted = sorted(test_messages, key=lambda msg: msg.data)
      for received, expected in zip(received_sorted, expected_sorted):
        self.assertEqual(received.data, expected.data)
        self.assertEqual(received.attributes, expected.attributes)
    else:
      # Verify message data only.
      expected_data = [msg.data for msg in test_messages]
      self.assertEqual(sorted(received_messages), sorted(expected_data))

  @pytest.mark.it_postcommit
def test_batch_write_data_only(self):
"""Test WriteToPubSub in batch mode with data only."""
self._test_batch_write(with_attributes=False)

  @pytest.mark.it_postcommit
def test_batch_write_with_attributes(self):
"""Test WriteToPubSub in batch mode with attributes."""
self._test_batch_write(with_attributes=True)


if __name__ == '__main__':
logging.getLogger().setLevel(logging.DEBUG)
unittest.main()