|  | # | 
|  | # Licensed to the Apache Software Foundation (ASF) under one or more | 
|  | # contributor license agreements.  See the NOTICE file distributed with | 
|  | # this work for additional information regarding copyright ownership. | 
|  | # The ASF licenses this file to You under the Apache License, Version 2.0 | 
|  | # (the "License"); you may not use this file except in compliance with | 
|  | # the License.  You may obtain a copy of the License at | 
|  | # | 
|  | #    http://www.apache.org/licenses/LICENSE-2.0 | 
|  | # | 
|  | # Unless required by applicable law or agreed to in writing, software | 
|  | # distributed under the License is distributed on an "AS IS" BASIS, | 
|  | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | 
|  | # See the License for the specific language governing permissions and | 
|  | # limitations under the License. | 
|  | # | 
|  |  | 
|  | """Unit tests for the cloudpickle_pickler module.""" | 
|  |  | 
|  | # pytype: skip-file | 
|  |  | 
|  | import threading | 
|  | import types | 
|  | import unittest | 
|  |  | 
|  | from apache_beam.coders import proto2_coder_test_messages_pb2 | 
|  | from apache_beam.internal import module_test | 
|  | from apache_beam.internal.cloudpickle_pickler import dumps | 
|  | from apache_beam.internal.cloudpickle_pickler import loads | 
|  | from apache_beam.utils import shared | 
|  |  | 
|  | GLOBAL_DICT_REF = module_test.GLOBAL_DICT | 
|  |  | 
|  |  | 
|  | # Allow weakref to dict | 
|  | class DictWrapper(dict): | 
|  | pass | 
|  |  | 
|  |  | 
|  | MAIN_MODULE_DICT = DictWrapper() | 
|  |  | 
|  |  | 
|  | def acquire_dict(): | 
|  | return DictWrapper() | 
|  |  | 
|  |  | 
|  | class PicklerTest(unittest.TestCase): | 
|  |  | 
|  | NO_MAPPINGPROXYTYPE = not hasattr(types, "MappingProxyType") | 
|  |  | 
|  | def test_globals_main_are_pickled_by_value(self): | 
|  | self.assertIsNot(MAIN_MODULE_DICT, loads(dumps(lambda: MAIN_MODULE_DICT))()) | 
|  |  | 
|  | def test_globals_shared_are_pickled_by_reference(self): | 
|  | shared_handler = shared.Shared() | 
|  | original_dict = shared_handler.acquire(acquire_dict) | 
|  |  | 
|  | unpickled_dict = loads( | 
|  | dumps(lambda: shared_handler.acquire(acquire_dict)))() | 
|  |  | 
|  | self.assertIs(original_dict, unpickled_dict) | 
|  |  | 
|  | def test_module_globals_are_pickled_by_value_when_directly_referenced(self): | 
|  | global_dict = loads(dumps(module_test.GLOBAL_DICT)) | 
|  |  | 
|  | self.assertIsNot(module_test.GLOBAL_DICT, global_dict) | 
|  |  | 
|  | def test_function_main_with_explicit_module_reference_pickles_by_reference( | 
|  | self): | 
|  | def returns_global_dict(): | 
|  | return module_test.GLOBAL_DICT | 
|  |  | 
|  | self.assertIs(module_test.GLOBAL_DICT, loads(dumps(returns_global_dict))()) | 
|  |  | 
|  | def test_function_main_with_indirect_module_reference_pickles_by_value(self): | 
|  | def returns_global_dict(): | 
|  | return GLOBAL_DICT_REF | 
|  |  | 
|  | self.assertIsNot( | 
|  | module_test.GLOBAL_DICT, loads(dumps(returns_global_dict))()) | 
|  |  | 
|  | def test_function_referencing_unpicklable_object_works_when_imported(self): | 
|  | self.assertEqual( | 
|  | module_test.UNPICKLABLE_INSTANCE, | 
|  | loads(dumps(module_test.fn_returns_unpicklable))()) | 
|  |  | 
|  | def test_closure_with_unpicklable_object_fails_when_imported(self): | 
|  | # The entire closure is pickled by value, and therefore module_test is | 
|  | # not imported. Requires the module global to be pickled by value. | 
|  | with self.assertRaises(Exception): | 
|  | loads(dumps(module_test.closure_contains_unpicklable())) | 
|  |  | 
|  | def test_closure_with_explicit_self_import_can_reference_unpicklable_objects( | 
|  | self): | 
|  | # The closure imports module_test within the function definition | 
|  | # and returns self.UNPICKLABLE_INSTANCE. This allows cloudpickle | 
|  | # to use submimort to reference module_test.UNPICKLABLE_INSTANCE | 
|  | self.assertIs( | 
|  | module_test.UNPICKLABLE_INSTANCE, | 
|  | loads(dumps(module_test.closure_contains_unpicklable_imports_self()))()) | 
|  |  | 
|  | def test_closure_main_can_reference_unpicklable_module_objects(self): | 
|  | def outer(): | 
|  | def inner(): | 
|  | return module_test.UNPICKLABLE_INSTANCE | 
|  |  | 
|  | return inner | 
|  |  | 
|  | # Uses subimport to reference module_test.UNPICKLABLE_INSTANCE rather than | 
|  | # recreate. | 
|  | self.assertIs(module_test.UNPICKLABLE_INSTANCE, loads(dumps(outer()))()) | 
|  |  | 
|  | def test_pickle_nested_enum_descriptor(self): | 
|  | NestedEnum = proto2_coder_test_messages_pb2.MessageD.NestedEnum | 
|  |  | 
|  | def fn(): | 
|  | return NestedEnum.TWO | 
|  |  | 
|  | self.assertEqual(fn(), loads(dumps(fn))()) | 
|  |  | 
|  | def test_pickle_top_level_enum_descriptor(self): | 
|  | TopLevelEnum = proto2_coder_test_messages_pb2.TopLevelEnum | 
|  |  | 
|  | def fn(): | 
|  | return TopLevelEnum.ONE | 
|  |  | 
|  | self.assertEqual(fn(), loads(dumps(fn))()) | 
|  |  | 
|  | def test_basics(self): | 
|  | self.assertEqual([1, 'a', ('z', )], loads(dumps([1, 'a', ('z', )]))) | 
|  | fun = lambda x: 'xyz-%s' % x | 
|  | self.assertEqual('xyz-abc', loads(dumps(fun))('abc')) | 
|  |  | 
|  | def test_lambda_with_globals(self): | 
|  | """Tests that the globals of a function are preserved.""" | 
|  |  | 
|  | # The point of the test is that the lambda being called after unpickling | 
|  | # relies on having the re module being loaded. | 
|  | self.assertEqual(['abc', 'def'], | 
|  | loads(dumps( | 
|  | module_test.get_lambda_with_globals()))('abc def')) | 
|  |  | 
|  | def test_lambda_with_main_globals(self): | 
|  | self.assertEqual(unittest, loads(dumps(lambda: unittest))()) | 
|  |  | 
|  | def test_lambda_with_closure(self): | 
|  | """Tests that the closure of a function is preserved.""" | 
|  | self.assertEqual( | 
|  | 'closure: abc', | 
|  | loads(dumps(module_test.get_lambda_with_closure('abc')))()) | 
|  |  | 
|  | def test_class_object_pickled(self): | 
|  | self.assertEqual(['abc', 'def'], | 
|  | loads(dumps(module_test.Xyz))().foo('abc def')) | 
|  |  | 
|  | def test_class_instance_pickled(self): | 
|  | self.assertEqual(['abc', 'def'], | 
|  | loads(dumps(module_test.XYZ_OBJECT)).foo('abc def')) | 
|  |  | 
|  | def test_pickling_preserves_closure_of_a_function(self): | 
|  | self.assertEqual( | 
|  | 'X:abc', loads(dumps(module_test.TopClass.NestedClass('abc'))).datum) | 
|  | self.assertEqual( | 
|  | 'Y:abc', | 
|  | loads(dumps(module_test.TopClass.MiddleClass.NestedClass('abc'))).datum) | 
|  |  | 
|  | def test_pickle_dynamic_class(self): | 
|  | self.assertEqual( | 
|  | 'Z:abc', loads(dumps(module_test.create_class('abc'))).get()) | 
|  |  | 
|  | def test_generators(self): | 
|  | with self.assertRaises(TypeError): | 
|  | dumps((_ for _ in range(10))) | 
|  |  | 
|  | def test_recursive_class(self): | 
|  | self.assertEqual( | 
|  | 'RecursiveClass:abc', | 
|  | loads(dumps(module_test.RecursiveClass('abc').datum))) | 
|  |  | 
|  | def test_function_with_external_reference(self): | 
|  | out_of_scope_var = 'expected_value' | 
|  |  | 
|  | def foo(): | 
|  | return out_of_scope_var | 
|  |  | 
|  | self.assertEqual('expected_value', loads(dumps(foo))()) | 
|  |  | 
|  | def test_pickle_rlock(self): | 
|  | rlock_instance = threading.RLock() | 
|  | rlock_type = type(rlock_instance) | 
|  |  | 
|  | self.assertIsInstance(loads(dumps(rlock_instance)), rlock_type) | 
|  |  | 
|  | def test_pickle_lock(self): | 
|  | lock_instance = threading.Lock() | 
|  | lock_type = type(lock_instance) | 
|  |  | 
|  | self.assertIsInstance(loads(dumps(lock_instance)), lock_type) | 
|  |  | 
|  | @unittest.skipIf(NO_MAPPINGPROXYTYPE, 'test if MappingProxyType introduced') | 
|  | def test_dump_and_load_mapping_proxy(self): | 
|  | self.assertEqual( | 
|  | 'def', loads(dumps(types.MappingProxyType({'abc': 'def'})))['abc']) | 
|  | self.assertEqual( | 
|  | types.MappingProxyType, type(loads(dumps(types.MappingProxyType({}))))) | 
|  |  | 
|  | # pylint: disable=exec-used | 
|  | def test_dataclass(self): | 
|  | exec( | 
|  | ''' | 
|  | from apache_beam.internal.module_test import DataClass | 
|  | self.assertEqual(DataClass(datum='abc'), loads(dumps(DataClass(datum='abc')))) | 
|  | ''') | 
|  |  | 
|  | def test_best_effort_determinism_not_implemented(self): | 
|  | with self.assertLogs('apache_beam.internal.cloudpickle_pickler', | 
|  | "WARNING") as l: | 
|  | dumps(123, enable_best_effort_determinism=True) | 
|  | self.assertIn( | 
|  | 'Ignoring unsupported option: enable_best_effort_determinism', | 
|  | '\n'.join(l.output)) | 
|  |  | 
|  |  | 
|  | if __name__ == '__main__': | 
|  | unittest.main() |