| # |
| # Licensed to the Apache Software Foundation (ASF) under one or more |
| # contributor license agreements. See the NOTICE file distributed with |
| # this work for additional information regarding copyright ownership. |
| # The ASF licenses this file to You under the Apache License, Version 2.0 |
| # (the "License"); you may not use this file except in compliance with |
| # the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| # |
| |
| import io |
| import json |
| import logging |
| import unittest |
| |
| import fastavro |
| import mock |
| |
| import apache_beam as beam |
| from apache_beam.coders.row_coder import RowCoder |
| from apache_beam.io.gcp.pubsub import PubsubMessage |
| from apache_beam.testing.util import AssertThat |
| from apache_beam.testing.util import assert_that |
| from apache_beam.testing.util import equal_to |
| from apache_beam.typehints import schemas as schema_utils |
| from apache_beam.yaml.yaml_transform import YamlTransform |
| |
| |
class FakeReadFromPubSub:
  """Test double standing in for ``apache_beam.io.ReadFromPubSub``.

  Construct it with the arguments the pipeline under test is expected to
  supply.  When the patched pipeline later invokes it as a transform
  factory, it asserts that those expectations hold and then emits the
  canned messages with ``beam.Create``.
  """
  def __init__(
      self,
      topic,
      messages,
      subscription=None,
      id_attribute=None,
      timestamp_attribute=None):
    # Expected construction arguments, checked in __call__.
    self._topic = topic
    self._subscription = subscription
    self._messages = messages
    self._id_attribute = id_attribute
    self._timestamp_attribute = timestamp_attribute

  def __call__(
      self,
      *,
      topic,
      subscription,
      with_attributes,
      id_label,
      timestamp_attribute):
    # Verify the pipeline configured the read exactly as expected.
    assert topic == self._topic
    assert id_label == self._id_attribute
    assert timestamp_attribute == self._timestamp_attribute
    assert subscription == self._subscription
    if with_attributes:
      payload = self._messages
    else:
      # Without attributes the real transform yields raw payload bytes.
      payload = [message.data for message in self._messages]
    return beam.Create(payload)
| |
| |
class FakeWriteToPubSub:
  """Test double standing in for ``apache_beam.io.WriteToPubSub``.

  Asserts that the pipeline constructs the write with the expected
  arguments, then checks that the messages actually published equal the
  expected ones by returning an ``AssertThat`` transform.
  """
  def __init__(
      self, topic, messages, id_attribute=None, timestamp_attribute=None):
    # Expected construction arguments and expected published messages.
    self._topic = topic
    self._messages = messages
    self._id_attribute = id_attribute
    self._timestamp_attribute = timestamp_attribute

  def __call__(self, topic, *, with_attributes, id_label, timestamp_attribute):
    # Verify the pipeline configured the write exactly as expected.
    assert topic == self._topic
    assert with_attributes is True
    assert id_label == self._id_attribute
    assert timestamp_attribute == self._timestamp_attribute
    matcher = equal_to(self._messages)
    return AssertThat(matcher)
| |
| |
class YamlPubSubTest(unittest.TestCase):
  """Tests for the YAML ReadFromPubSub / WriteToPubSub transforms.

  Each test patches the real PubSub IO with the fakes above, runs a small
  YamlTransform pipeline, and checks the rows produced on read (or the
  messages published on write).  All pipelines use cloudpickle so locally
  defined values pickle cleanly.
  """
  def test_simple_read(self):
    # RAW format: each message payload becomes a Row with a single
    # bytes-valued 'payload' field; attributes are dropped.
    with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
        pickle_library='cloudpickle')) as p:
      with mock.patch('apache_beam.io.ReadFromPubSub',
                      FakeReadFromPubSub(
                          topic='my_topic',
                          messages=[PubsubMessage(b'msg1', {'attr': 'value1'}),
                                    PubsubMessage(b'msg2',
                                                  {'attr': 'value2'})])):
        result = p | YamlTransform(
            '''
            type: ReadFromPubSub
            config:
              topic: my_topic
              format: RAW
            ''')
        assert_that(
            result,
            equal_to([beam.Row(payload=b'msg1'), beam.Row(payload=b'msg2')]))

  def test_simple_read_string(self):
    # STRING format: payload bytes are decoded as UTF-8 into str payloads.
    with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
        pickle_library='cloudpickle')) as p:
      with mock.patch('apache_beam.io.ReadFromPubSub',
                      FakeReadFromPubSub(
                          topic='my_topic',
                          messages=[PubsubMessage('äter'.encode('utf-8'),
                                                  {'attr': 'value1'}),
                                    PubsubMessage('köttbullar'.encode('utf-8'),
                                                  {'attr': 'value2'})])):
        result = p | YamlTransform(
            '''
            type: ReadFromPubSub
            config:
              topic: my_topic
              format: STRING
            ''')
        assert_that(
            result,
            equal_to([beam.Row(payload='äter'),
                      beam.Row(payload='köttbullar')]))

  def test_read_with_attribute(self):
    # Listed attributes are lifted out of the message attributes dict and
    # become top-level Row fields alongside the payload.
    with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
        pickle_library='cloudpickle')) as p:
      with mock.patch('apache_beam.io.ReadFromPubSub',
                      FakeReadFromPubSub(
                          topic='my_topic',
                          messages=[PubsubMessage(b'msg1', {'attr': 'value1'}),
                                    PubsubMessage(b'msg2',
                                                  {'attr': 'value2'})])):
        result = p | YamlTransform(
            '''
            type: ReadFromPubSub
            config:
              topic: my_topic
              format: RAW
              attributes: [attr]
            ''')
        assert_that(
            result,
            equal_to([
                beam.Row(payload=b'msg1', attr='value1'),
                beam.Row(payload=b'msg2', attr='value2')
            ]))

  def test_read_with_attribute_map(self):
    # attributes_map stores the full attributes dict under the named field.
    with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
        pickle_library='cloudpickle')) as p:
      with mock.patch('apache_beam.io.ReadFromPubSub',
                      FakeReadFromPubSub(
                          topic='my_topic',
                          messages=[PubsubMessage(b'msg1', {'attr': 'value1'}),
                                    PubsubMessage(b'msg2',
                                                  {'attr': 'value2'})])):
        result = p | YamlTransform(
            '''
            type: ReadFromPubSub
            config:
              topic: my_topic
              format: RAW
              attributes_map: attrMap
            ''')
        assert_that(
            result,
            equal_to([
                beam.Row(payload=b'msg1', attrMap={'attr': 'value1'}),
                beam.Row(payload=b'msg2', attrMap={'attr': 'value2'})
            ]))

  def test_read_with_id_attribute(self):
    # id_attribute must be forwarded to ReadFromPubSub as id_label; the
    # fake asserts this.  The output rows are unaffected.
    with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
        pickle_library='cloudpickle')) as p:
      with mock.patch('apache_beam.io.ReadFromPubSub',
                      FakeReadFromPubSub(
                          topic='my_topic',
                          messages=[PubsubMessage(b'msg1', {'attr': 'value1'}),
                                    PubsubMessage(b'msg2', {'attr': 'value2'})],
                          id_attribute='some_attr')):
        result = p | YamlTransform(
            '''
            type: ReadFromPubSub
            config:
              topic: my_topic
              format: RAW
              id_attribute: some_attr
            ''')
        assert_that(
            result,
            equal_to([beam.Row(payload=b'msg1'), beam.Row(payload=b'msg2')]))

  # Avro record schema shared by the AVRO read/write tests below.
  _avro_schema = {
      'type': 'record',
      'name': 'ec',
      'fields': [{
          'name': 'label', 'type': 'string'
      }, {
          'name': 'rank', 'type': 'int'
      }]
  }

  def _encode_avro(self, data):
    """Serialize a dict as schemaless Avro bytes using _avro_schema."""
    buffer = io.BytesIO()
    fastavro.schemaless_writer(buffer, self._avro_schema, data)
    buffer.seek(0)
    return buffer.read()

  def test_read_avro(self):
    # AVRO format: payloads are schemaless Avro records decoded into Rows
    # according to the schema embedded in the YAML config.
    with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
        pickle_library='cloudpickle')) as p:
      with mock.patch(
          'apache_beam.io.ReadFromPubSub',
          FakeReadFromPubSub(
              topic='my_topic',
              messages=[PubsubMessage(self._encode_avro({'label': '37a',
                                                         'rank': 1}), {}),
                        PubsubMessage(self._encode_avro({'label': '389a',
                                                         'rank': 2}), {})])):
        result = p | YamlTransform(
            '''
            type: ReadFromPubSub
            config:
              topic: my_topic
              format: AVRO
              schema: %s
            ''' % json.dumps(self._avro_schema))
        assert_that(
            result,
            equal_to([
                beam.Row(label='37a', rank=1),  # linebreak
                beam.Row(label='389a', rank=2)
            ]))

  def test_read_json(self):
    # JSON format with a nested schema, combined with both a lifted
    # attribute ('label') and a catch-all attributes_map ('other').  Note
    # the lifted attribute also still appears inside the map.
    with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
        pickle_library='cloudpickle')) as p:
      with mock.patch('apache_beam.io.ReadFromPubSub',
                      FakeReadFromPubSub(
                          topic='my_topic',
                          messages=[PubsubMessage(
                              b'{"generator": {"x": 0, "y": 0}, "rank": 1}',
                              {'weierstrass': 'y^2+y=x^3-x', 'label': '37a'})
                                    ])):
        result = p | YamlTransform(
            '''
            type: ReadFromPubSub
            config:
              topic: my_topic
              format: JSON
              schema:
                type: object
                properties:
                  generator:
                    type: object
                    properties:
                      x: {type: integer}
                      y: {type: integer}
                  rank: {type: integer}
              attributes: [label]
              attributes_map: other
            ''')
        assert_that(
            result,
            equal_to([
                beam.Row(
                    generator=beam.Row(x=0, y=0),
                    rank=1,
                    label='37a',
                    other={
                        'label': '37a', 'weierstrass': 'y^2+y=x^3-x'
                    })
            ]))

  def test_read_json_with_error_handling(self):
    # With error_handling configured, unparsable payloads are routed to
    # the 'errors' output instead of failing the pipeline.
    with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
        pickle_library='cloudpickle')) as p:
      with mock.patch(
          'apache_beam.io.ReadFromPubSub',
          FakeReadFromPubSub(topic='my_topic',
                             messages=[PubsubMessage('{"some_int": 123}',
                                                     attributes={}),
                                       PubsubMessage('unparsable',
                                                     attributes={})])):
        result = p | YamlTransform(
            '''
            type: ReadFromPubSub
            config:
              topic: my_topic
              format: JSON
              schema:
                type: object
                properties:
                  some_int: {type: integer}
              error_handling:
                output: errors
            ''')
        assert_that(
            result['good'],
            equal_to([beam.Row(some_int=123)]),
            label='CheckGood')
        assert_that(
            result['errors'] | beam.Map(lambda error: error.element),
            equal_to(['unparsable']),
            label='CheckErrors')

  def test_read_json_without_error_handling(self):
    # Without error_handling, an unparsable payload must fail the
    # pipeline (surfaced when the Pipeline context manager exits).
    with self.assertRaises(Exception):
      with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
          pickle_library='cloudpickle')) as p:
        with mock.patch(
            'apache_beam.io.ReadFromPubSub',
            FakeReadFromPubSub(topic='my_topic',
                               messages=[PubsubMessage('{"some_int": 123}',
                                                       attributes={}),
                                         PubsubMessage('unparsable',
                                                       attributes={})])):
          _ = p | YamlTransform(
              '''
              type: ReadFromPubSub
              config:
                topic: my_topic
                format: JSON
                schema:
                  type: object
                  properties:
                    some_int: {type: integer}
              ''')

  def test_read_json_with_bad_schema(self):
    # Valid JSON that violates the declared schema (string where an
    # integer is expected) is also routed to the 'errors' output.
    with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
        pickle_library='cloudpickle')) as p:
      with mock.patch('apache_beam.io.ReadFromPubSub',
                      FakeReadFromPubSub(
                          topic='my_topic',
                          messages=[PubsubMessage('{"some_int": 123}',
                                                  attributes={}),
                                    PubsubMessage('{"some_int": "NOT"}',
                                                  attributes={})])):
        result = p | YamlTransform(
            '''
            type: ReadFromPubSub
            config:
              topic: my_topic
              format: JSON
              schema:
                type: object
                properties:
                  some_int: {type: integer}
              error_handling:
                output: errors
            ''')
        assert_that(
            result['good'],
            equal_to([beam.Row(some_int=123)]),
            label='CheckGood')
        assert_that(
            result['errors'] | beam.Map(lambda error: error.element),
            equal_to(['{"some_int": "NOT"}']),
            label='CheckErrors')

  def test_simple_write(self):
    # RAW write: the single bytes field of each Row becomes the message
    # payload, with empty attributes.
    with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
        pickle_library='cloudpickle')) as p:
      with mock.patch('apache_beam.io.WriteToPubSub',
                      FakeWriteToPubSub(topic='my_topic',
                                        messages=[PubsubMessage(b'msg1', {}),
                                                  PubsubMessage(b'msg2', {})])):
        _ = (
            p | beam.Create([beam.Row(a=b'msg1'), beam.Row(a=b'msg2')])
            | YamlTransform(
                '''
                type: WriteToPubSub
                config:
                  topic: my_topic
                  format: RAW
                '''))

  def test_write_with_attribute(self):
    # Listed attributes are taken from Row fields and published as
    # message attributes (not as part of the payload).
    with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
        pickle_library='cloudpickle')) as p:
      with mock.patch('apache_beam.io.WriteToPubSub',
                      FakeWriteToPubSub(
                          topic='my_topic',
                          messages=[PubsubMessage(b'msg1', {'attr': 'value1'}),
                                    PubsubMessage(b'msg2',
                                                  {'attr': 'value2'})])):
        _ = (
            p | beam.Create([
                beam.Row(a=b'msg1', attr='value1'),
                beam.Row(a=b'msg2', attr='value2')
            ]) | YamlTransform(
                '''
                type: WriteToPubSub
                config:
                  topic: my_topic
                  format: RAW
                  attributes: [attr]
                '''))

  def test_write_with_attribute_map(self):
    # attributes_map: the named map-valued Row field supplies the entire
    # attributes dict for each published message.
    with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
        pickle_library='cloudpickle')) as p:
      with mock.patch('apache_beam.io.WriteToPubSub',
                      FakeWriteToPubSub(topic='my_topic',
                                        messages=[PubsubMessage(b'msg1',
                                                                {'a': 'b'}),
                                                  PubsubMessage(b'msg2',
                                                                {'c': 'd'})])):
        _ = (
            p | beam.Create([
                beam.Row(a=b'msg1', attrMap={'a': 'b'}),
                beam.Row(a=b'msg2', attrMap={'c': 'd'})
            ]) | YamlTransform(
                '''
                type: WriteToPubSub
                config:
                  topic: my_topic
                  format: RAW
                  attributes_map: attrMap
                '''))

  def test_write_with_id_attribute(self):
    # id_attribute must be forwarded to WriteToPubSub as id_label; the
    # fake asserts this.
    with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
        pickle_library='cloudpickle')) as p:
      with mock.patch('apache_beam.io.WriteToPubSub',
                      FakeWriteToPubSub(topic='my_topic',
                                        messages=[PubsubMessage(b'msg1', {}),
                                                  PubsubMessage(b'msg2', {})],
                                        id_attribute='some_attr')):
        _ = (
            p | beam.Create([beam.Row(a=b'msg1'), beam.Row(a=b'msg2')])
            | YamlTransform(
                '''
                type: WriteToPubSub
                config:
                  topic: my_topic
                  format: RAW
                  id_attribute: some_attr
                '''))

  def test_write_avro(self):
    # AVRO write: rows are serialized with a schema inferred from the row
    # type; expected bytes are produced with the same _avro_schema.
    with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
        pickle_library='cloudpickle')) as p:
      with mock.patch(
          'apache_beam.io.WriteToPubSub',
          FakeWriteToPubSub(
              topic='my_topic',
              messages=[PubsubMessage(self._encode_avro({'label': '37a',
                                                         'rank': 1}), {}),
                        PubsubMessage(self._encode_avro({'label': '389a',
                                                         'rank': 2}), {})])):
        _ = (
            p | beam.Create(
                [beam.Row(label='37a', rank=1), beam.Row(label='389a', rank=2)])
            | YamlTransform(
                '''
                type: WriteToPubSub
                config:
                  topic: my_topic
                  format: AVRO
                '''))

  def test_write_json(self):
    # JSON write: remaining fields are serialized as the JSON payload,
    # 'label' is lifted into attributes, and 'other' supplies additional
    # attributes via attributes_map.
    with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
        pickle_library='cloudpickle')) as p:
      with mock.patch('apache_beam.io.WriteToPubSub',
                      FakeWriteToPubSub(
                          topic='my_topic',
                          messages=[PubsubMessage(
                              b'{"generator": {"x": 0, "y": 0}, "rank": 1}',
                              {'weierstrass': 'y^2+y=x^3-x', 'label': '37a'})
                                    ])):
        _ = (
            p | beam.Create([
                beam.Row(
                    label='37a',
                    generator=beam.Row(x=0, y=0),
                    rank=1,
                    other={'weierstrass': 'y^2+y=x^3-x'})
            ]) | YamlTransform(
                '''
                type: WriteToPubSub
                config:
                  topic: my_topic
                  format: JSON
                  attributes: [label]
                  attributes_map: other
                '''))

  def test_write_proto(self):
    # PROTO write: rows are encoded with the Beam RowCoder for the
    # corresponding schema; expected bytes are built with the same coder.
    with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
        pickle_library='cloudpickle')) as p:
      data = [beam.Row(label='37a', rank=1), beam.Row(label='389a', rank=2)]
      coder = RowCoder(
          schema_utils.named_fields_to_schema([('label', str), ('rank', int)]))
      expected_messages = [PubsubMessage(coder.encode(r), {}) for r in data]
      with mock.patch('apache_beam.io.WriteToPubSub',
                      FakeWriteToPubSub(topic='my_topic',
                                        messages=expected_messages)):
        _ = (
            p | beam.Create(data) | YamlTransform(
                '''
                type: WriteToPubSub
                config:
                  topic: my_topic
                  format: PROTO
                '''))

  def test_read_proto(self):
    # PROTO read: RowCoder-encoded payloads are decoded back into Rows
    # matching the declared schema.
    with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
        pickle_library='cloudpickle')) as p:
      data = [beam.Row(label='37a', rank=1), beam.Row(label='389a', rank=2)]
      coder = RowCoder(
          schema_utils.named_fields_to_schema([('label', str), ('rank', int)]))
      expected_messages = [PubsubMessage(coder.encode(r), {}) for r in data]
      with mock.patch('apache_beam.io.ReadFromPubSub',
                      FakeReadFromPubSub(topic='my_topic',
                                         messages=expected_messages)):
        result = p | YamlTransform(
            '''
            type: ReadFromPubSub
            config:
              topic: my_topic
              format: PROTO
              schema:
                type: object
                properties:
                  label: {type: string}
                  rank: {type: integer}
            ''')
        assert_that(result, equal_to(data))
| |
| |
if __name__ == '__main__':
  # Surface INFO-level pipeline logs when the tests are run directly.
  logging.getLogger().setLevel(logging.INFO)
  unittest.main()