# blob: f61a30747a97f0056126270e30b496722c63fdda [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import logging
import unittest
import yaml
from apache_beam.yaml.yaml_utils import SafeLineLoader
class SafeLineLoaderTest(unittest.TestCase):
    """Tests for SafeLineLoader.get_line and SafeLineLoader.strip_metadata."""

    def test_get_line(self):
        """get_line reports the 1-based document line of mappings and strings."""
        # The literal opens with a newline, so `type: composite` sits on line 2
        # of the parsed document; every expected number below counts from there.
        # Do not add or remove lines inside the literal without updating them.
        pipeline_yaml = '''
        type: composite
        input:
            elements: input
        transforms:
          - type: PyMap
            name: Square
            input: elements
            config:
              fn: "lambda x: x * x"
          - type: PyMap
            name: Cube
            input: elements
            config:
              fn: "lambda x: x * x * x"
        output:
            Flatten
        '''
        # An explicit Loader is supplied and the input is the literal above.
        spec = yaml.load(pipeline_yaml, Loader=SafeLineLoader)
        get_line = SafeLineLoader.get_line
        transforms = spec['transforms']

        # Scalars report the line they appear on.
        self.assertEqual(get_line(spec['type']), 2)
        # A nested mapping reports the line of its first entry.
        self.assertEqual(get_line(spec['input']), 4)
        self.assertEqual(get_line(transforms[0]), 6)
        self.assertEqual(get_line(transforms[0]['type']), 6)
        self.assertEqual(get_line(transforms[0]['name']), 7)
        self.assertEqual(get_line(transforms[1]), 11)
        # The value of `output` is written on the line after its key.
        self.assertEqual(get_line(spec['output']), 17)
        # A sequence yields the "unknown" placeholder instead of a number.
        self.assertEqual(get_line(transforms), "unknown")

    def test_strip_metadata(self):
        """strip_metadata leaves no loader bookkeeping on a loaded mapping."""
        spec_yaml = '''
        transforms:
          - type: PyMap
            name: Square
        '''
        spec = yaml.load(spec_yaml, Loader=SafeLineLoader)
        stripped = SafeLineLoader.strip_metadata(spec['transforms'])
        first = stripped[0]

        for marker in ('__line__', '__uuid__'):
            self.assertFalse(hasattr(first, marker))
            # hasattr() never inspects mapping keys, so on its own it cannot
            # notice metadata left behind as entries of the mapping; check
            # key membership as well.
            self.assertNotIn(marker, first)

        # Stripping must not drop or alter the real fields.
        self.assertEqual(first['type'], 'PyMap')
        self.assertEqual(first['name'], 'Square')

    def test_strip_metadata_nothing_to_strip(self):
        """strip_metadata passes through a value that carries no metadata."""
        spec_yaml = 'prop: 123'
        spec = yaml.load(spec_yaml, Loader=SafeLineLoader)
        stripped = SafeLineLoader.strip_metadata(spec['prop'])

        self.assertFalse(hasattr(stripped, '__line__'))
        self.assertFalse(hasattr(stripped, '__uuid__'))
        # The scalar itself must come back unchanged.
        self.assertEqual(stripped, 123)
if __name__ == '__main__':
    # Let INFO-level records through on the root logger, then hand control
    # to the unittest command-line runner.
    root_logger = logging.getLogger()
    root_logger.setLevel(logging.INFO)
    unittest.main()