| # |
| # Licensed to the Apache Software Foundation (ASF) under one or more |
| # contributor license agreements. See the NOTICE file distributed with |
| # this work for additional information regarding copyright ownership. |
| # The ASF licenses this file to You under the Apache License, Version 2.0 |
| # (the "License"); you may not use this file except in compliance with |
| # the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| # |
| |
| """Implements a shared object that spans processes. |
| |
| This object will be instanciated once per VM and methods will be invoked |
| on it via rpc. |
| """ |
| # pytype: skip-file |
| |
| import atexit |
| import logging |
| import multiprocessing.managers |
| import os |
| import tempfile |
| import threading |
| import time |
| import traceback |
| from typing import Any |
| from typing import Callable |
| from typing import Dict |
| from typing import Generic |
| from typing import Optional |
| from typing import TypeVar |
| |
| import fasteners |
| |
# In some Python versions, there is a bug where AutoProxy doesn't handle
# the kwarg 'manager_owned'. We install our own patched version here to make
# sure we avoid this problem. More info here:
# https://stackoverflow.com/questions/46779860/multiprocessing-managers-and-custom-classes
| autoproxy = multiprocessing.managers.AutoProxy # type: ignore[attr-defined] |
| |
| |
| def patched_autoproxy( |
| token, |
| serializer, |
| manager=None, |
| authkey=None, |
| exposed=None, |
| incref=True, |
| manager_owned=True): |
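  # Deliberately drop 'manager_owned', which the unpatched AutoProxy in
  # affected Python versions does not accept.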
| return autoproxy(token, serializer, manager, authkey, exposed, incref) |
| |
| |
| multiprocessing.managers.AutoProxy = patched_autoproxy # type: ignore[attr-defined] |
| |
| T = TypeVar('T') |
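# Auth key shared by the serving manager and every connecting process.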
| AUTH_KEY = b'mps' |
| |
| |
| class _SingletonProxy: |
| """Proxies the shared object so we can release it with better errors and no |
| risk of dangling references in the multiprocessing manager infrastructure. |
| """ |
| def __init__(self, entry): |
    # Guard names so as not to conflict with names of the underlying object.
| self._SingletonProxy_entry = entry |
| self._SingletonProxy_valid = True |
| |
| # Used to make the shared object callable (see _AutoProxyWrapper below) |
| def singletonProxy_call__(self, *args, **kwargs): |
| if not self._SingletonProxy_valid: |
| raise RuntimeError('Entry was released.') |
| return self._SingletonProxy_entry.obj.__call__(*args, **kwargs) |
| |
| def singletonProxy_release(self): |
| assert self._SingletonProxy_valid |
| self._SingletonProxy_valid = False |
| |
| def singletonProxy_unsafe_hard_delete(self): |
| assert self._SingletonProxy_valid |
| self._SingletonProxy_entry.unsafe_hard_delete() |
| |
| def __getattr__(self, name): |
| if not self._SingletonProxy_valid: |
| raise RuntimeError('Entry was released.') |
| try: |
| return getattr(self._SingletonProxy_entry.obj, name) |
| except AttributeError as e: |
      # Swallow AttributeError exceptions so that they are ignored when
      # calculating public functions. These can occur if __getattr__ is
      # overridden, for example to only support some platforms. Such
      # functions will be silently unavailable on the MultiProcessShared
      # object, leading to worse errors when someone tries to use them, but
      # this keeps them from breaking the whole object for functions which
      # are unusable anyway.
      logging.info(
          'Attribute %s is unavailable as a public function because '
          'its __getattr__ function raised the following exception: '
          '%s',
          name,
          e)
| return None |
| |
| def __dir__(self): |
| # Needed for multiprocessing.managers's proxying. |
    entries = self._SingletonProxy_entry.obj.__dir__()
    entries.append('singletonProxy_call__')
    entries.append('singletonProxy_release')
    entries.append('singletonProxy_unsafe_hard_delete')
    return entries
| |
| |
| class _SingletonEntry: |
| """Represents a single, refcounted entry in this process.""" |
| def __init__( |
| self, constructor, initialize_eagerly=True, hard_delete_callback=None): |
| self.constructor = constructor |
| self._hard_delete_callback = hard_delete_callback |
| self.refcount = 0 |
| self.lock = threading.Lock() |
    if initialize_eagerly:
      self.obj = constructor()
      self.initialized = True
    else:
      self.initialized = False
| |
| def acquire(self): |
| with self.lock: |
      if not self.initialized:
        self.obj = self.constructor()
        self.initialized = True
| self.refcount += 1 |
| return _SingletonProxy(self) |
| |
| def release(self, proxy): |
| proxy.singletonProxy_release() |
| with self.lock: |
| self.refcount -= 1 |
| if self.refcount == 0: |
| del self.obj |
        self.initialized = False
| |
| def unsafe_hard_delete(self): |
| with self.lock: |
      if self.initialized:
        del self.obj
        self.initialized = False
| if self._hard_delete_callback: |
| self._hard_delete_callback() |
| |
| |
| class _SingletonManager: |
| entries: Dict[Any, Any] = {} |
| |
| def __init__(self): |
| self._hard_delete_callback = None |
| |
| def set_hard_delete_callback(self, callback): |
| self._hard_delete_callback = callback |
| |
| def register_singleton( |
| self, |
| constructor, |
| tag, |
| initialize_eagerly=True, |
| hard_delete_callback=None): |
| assert tag not in self.entries, tag |
| self.entries[tag] = _SingletonEntry( |
| constructor, initialize_eagerly, hard_delete_callback) |
| |
| def has_singleton(self, tag): |
| return tag in self.entries |
| |
| def acquire_singleton(self, tag): |
| return self.entries[tag].acquire() |
| |
| def release_singleton(self, tag, obj): |
| return self.entries[tag].release(obj) |
| |
| def unsafe_hard_delete_singleton(self, tag): |
| self.entries[tag].unsafe_hard_delete() |
| |
| |
| _process_level_singleton_manager = _SingletonManager() |
| |
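# Guards server creation within this process; the per-tag
# fasteners.InterProcessLock below guards across processes.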
| _process_local_lock = threading.Lock() |
| |
| |
| class _SingletonRegistrar(multiprocessing.managers.BaseManager): |
| pass |
| |
| |
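# Expose the process-level singleton manager's methods over RPC so that
# other processes can acquire, release, and delete shared objects remotely.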
| _SingletonRegistrar.register( |
| 'acquire_singleton', |
| callable=_process_level_singleton_manager.acquire_singleton) |
| _SingletonRegistrar.register( |
| 'release_singleton', |
| callable=_process_level_singleton_manager.release_singleton) |
| _SingletonRegistrar.register( |
| 'unsafe_hard_delete_singleton', |
| callable=_process_level_singleton_manager.unsafe_hard_delete_singleton) |
| |
| |
| # By default, objects registered with BaseManager.register will have only |
| # public methods available (excluding __call__). If you know the functions |
| # you would like to expose, you can do so at register time with the `exposed` |
# attribute. Since we don't know them ahead of time, we add a wrapper around
# the returned AutoProxy object that handles __call__ invocations and turns
# them into singletonProxy_call__ calls (a wrapper around the underlying
# object's __call__ function).
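#
# Usage sketch (illustrative only; `proxy` stands for the AutoProxy returned
# by a manager's acquire_singleton call):
#
#   wrapped = _AutoProxyWrapper(proxy)
#   wrapped.some_method(x)  # forwarded to the shared object via __getattr__
#   wrapped(x)  # __call__ is routed through singletonProxy_call__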
| class _AutoProxyWrapper: |
| def __init__(self, proxyObject: multiprocessing.managers.BaseProxy): |
| self._proxyObject = proxyObject |
| |
| def __call__(self, *args, **kwargs): |
| return self._proxyObject.singletonProxy_call__(*args, **kwargs) |
| |
| def __getattr__(self, name): |
| return getattr(self._proxyObject, name) |
| |
| def get_auto_proxy_object(self): |
| return self._proxyObject |
| |
| def unsafe_hard_delete(self): |
| self._proxyObject.unsafe_hard_delete() |
| |
| |
| def _run_server_process(address_file, tag, constructor, authkey, life_line): |
| """ |
| Runs in a separate process. |
| Includes a 'Suicide Pact' monitor: If parent dies, I die. |
| """ |
| parent_pid = os.getppid() |
| |
| def cleanup_files(): |
| logging.info("Server process exiting. Deleting files for %s", tag) |
| try: |
| if os.path.exists(address_file): |
| os.remove(address_file) |
| if os.path.exists(address_file + ".error"): |
| os.remove(address_file + ".error") |
| except Exception as e: |
      logging.warning('Failed to clean up files for tag %s: %s', tag, e)
| |
| def handle_unsafe_hard_delete(): |
| cleanup_files() |
| os._exit(0) |
| |
| def _monitor_parent(): |
| """Checks if parent is alive every second.""" |
| while True: |
| try: |
        # Blocks until the parent's end of the pipe closes or breaks.
| life_line.recv_bytes() |
| except (EOFError, OSError, BrokenPipeError): |
        logging.warning(
            "Process %s detected that parent %s died. Shutting down.",
            os.getpid(),
            parent_pid)
| cleanup_files() |
| os._exit(0) |
| time.sleep(0.5) |
| |
| atexit.register(cleanup_files) |
| |
| try: |
| t = threading.Thread(target=_monitor_parent, daemon=True) |
| |
| logging.getLogger().setLevel(logging.INFO) |
| multiprocessing.current_process().authkey = authkey |
| |
| serving_manager = _SingletonRegistrar( |
| address=('localhost', 0), authkey=authkey) |
| _process_level_singleton_manager.set_hard_delete_callback( |
| handle_unsafe_hard_delete) |
| _process_level_singleton_manager.register_singleton( |
| constructor, |
| tag, |
| initialize_eagerly=True, |
| hard_delete_callback=handle_unsafe_hard_delete) |
    # Start monitoring the parent only after initialization is done, to avoid
    # potential race conditions.
| t.start() |
| |
| server = serving_manager.get_server() |
| logging.info( |
| 'Process %s: Proxy serving %s at %s', os.getpid(), tag, server.address) |
| |
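    # Write to a temp file and atomically rename it so that readers never
    # observe a partially written address file.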
| with open(address_file + '.tmp', 'w') as fout: |
| fout.write('%s:%d' % server.address) |
| os.rename(address_file + '.tmp', address_file) |
| |
| server.serve_forever() |
| |
| except Exception: |
| tb = traceback.format_exc() |
| try: |
| with open(address_file + ".error.tmp", 'w') as fout: |
| fout.write(tb) |
| os.rename(address_file + ".error.tmp", address_file + ".error") |
| except Exception: |
| logging.error("CRITICAL ERROR IN SHARED SERVER:\n%s", tb) |
| os._exit(1) |
| |
| |
| class MultiProcessShared(Generic[T]): |
| """MultiProcessShared is used to share a single object across processes. |
| |
| For example, one could have the class:: |
| |
| class MyExpensiveObject(object): |
| def __init__(self, args): |
| [expensive initialization and memory allocation] |
| |
| def method(self, arg): |
| ... |
| |
| One could share a single instance of this class by wrapping it as:: |
| |
| shared_ptr = MultiProcessShared(lambda: MyExpensiveObject(...)) |
| my_expensive_object = shared_ptr.acquire() |
| |
| which could then be invoked as:: |
| |
| my_expensive_object.method(arg) |
| |
| This can then be released with:: |
| |
| shared_ptr.release(my_expensive_object) |
| |
| but care should be taken to avoid releasing the object too soon or |
| expensive re-initialization may be required, defeating the point of |
| using a shared object. |
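
  Since the proxy also forwards __call__, the shared object may be invoked
  directly if the underlying object is callable::

      my_expensive_object(arg)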
| |
| |
| Args: |
    constructor: function that initializes / constructs the object if not
      present in the cache. This function should take no arguments. It should
      return an initialized object, or raise an exception if the object could
      not be initialized / constructed.
    tag: an identifier to store with the cached object. If multiple
      MultiProcessShared instances are created with the same tag, they will all
      share the same proxied object.
| path: a temporary path in which to create the inter-process lock |
| always_proxy: whether to direct all calls through the proxy, rather than |
| call the object directly for the process that created it |
| """ |
| def __init__( |
| self, |
| constructor: Callable[[], T], |
| tag: Any, |
| *, |
| path: str = tempfile.gettempdir(), |
| always_proxy: Optional[bool] = None, |
| spawn_process: bool = False): |
| self._constructor = constructor |
| self._tag = tag |
| self._path = path |
| self._always_proxy = False if always_proxy is None else always_proxy |
| self._proxy = None |
| self._manager = None |
| self._rpc_address = None |
| self._cross_process_lock = fasteners.InterProcessLock( |
| os.path.join(self._path, self._tag) + '.lock') |
| self._spawn_process = spawn_process |
| |
| def _get_manager(self): |
| if self._manager is None: |
| address_file = os.path.join(self._path, self._tag) + ".address" |
| while self._manager is None: |
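        # Loop until we either host the object in this process or connect to
        # an existing server; a stale address file is removed below and the
        # loop retries.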
| with _process_local_lock: |
| with self._cross_process_lock: |
| if not os.path.exists(address_file): |
| self._create_server(address_file) |
| |
| if _process_level_singleton_manager.has_singleton( |
| self._tag) and not self._always_proxy: |
| self._manager = _process_level_singleton_manager |
| else: |
| with open(address_file) as fin: |
| address = fin.read() |
| logging.info('Connecting to remote proxy at %s', address) |
| host, port = address.split(':') |
| # We need to be able to authenticate with both the manager and |
| # the process. |
| manager = _SingletonRegistrar( |
| address=(host, int(port)), authkey=AUTH_KEY) |
| multiprocessing.current_process().authkey = AUTH_KEY |
| try: |
| manager.connect() |
| self._manager = manager |
| except ConnectionError: |
| # The server is no longer good, assume it died. |
| os.unlink(address_file) |
| |
| return self._manager |
| |
| def acquire(self): |
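    """Returns a proxy to the shared object, creating it if necessary."""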
| # TODO: Allow passing/parameterizing the callable here, in case they are |
| # not available at MultiProcessShared construction time (e.g. from side |
| # inputs) |
| # Caveat: They must always agree, as they will be ignored if the object |
| # is already constructed. |
| singleton = self._get_manager().acquire_singleton(self._tag) |
    # Trigger a sweep of zombie processes: calling active_children() has the
    # side effect of joining any finished processes, effectively reaping
    # zombies left by previous unsafe_hard_deletes.
| if self._spawn_process: |
| multiprocessing.active_children() |
| return _AutoProxyWrapper(singleton) |
| |
| def release(self, obj): |
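    """Releases a proxy obtained via acquire, decrementing its refcount."""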
| self._manager.release_singleton(self._tag, obj.get_auto_proxy_object()) |
| |
| def unsafe_hard_delete(self): |
| """Force deletes the underlying object |
| |
| This function should be used with great care since any other references |
| to this object will now be invalid and may lead to strange errors. Only |
| call unsafe_hard_delete if either (a) you are sure no other references |
| to this object exist, or (b) you are ok with all existing references to |
| this object throwing strange errors when derefrenced. |
| """ |
| self._get_manager().unsafe_hard_delete_singleton(self._tag) |
| |
| def _create_server(self, address_file): |
| if self._spawn_process: |
| error_file = address_file + ".error" |
| |
| if os.path.exists(error_file): |
| try: |
| os.remove(error_file) |
| except OSError: |
| pass |
| |
      # Create a pipe to the child process; the child monitors it and exits
      # if the parent dies and the pipe breaks.
| reader, writer = multiprocessing.Pipe(duplex=False) |
| self._life_line = writer |
| |
| ctx = multiprocessing.get_context('spawn') |
| p = ctx.Process( |
| target=_run_server_process, |
| args=(address_file, self._tag, self._constructor, AUTH_KEY, reader), |
| daemon=False # Must be False for nested proxies |
| ) |
| p.start() |
| logging.info("Parent: Waiting for %s to write address file...", self._tag) |
| |
| def cleanup_process(): |
| if self._life_line: |
| self._life_line.close() |
| if p.is_alive(): |
| logging.info( |
| "Parent: Terminating server process %s for %s", p.pid, self._tag) |
| p.terminate() |
| p.join() |
| try: |
| if os.path.exists(address_file): |
| os.remove(address_file) |
| if os.path.exists(error_file): |
| os.remove(error_file) |
| except Exception as e: |
          logging.warning(
              'Failed to clean up files for tag %s in atexit handler: %s',
              self._tag,
              e)
| |
| atexit.register(cleanup_process) |
| |
| start_time = time.time() |
| last_log = start_time |
| while True: |
| if os.path.exists(address_file): |
| break |
| |
| if os.path.exists(error_file): |
| with open(error_file, 'r') as f: |
| error_msg = f.read() |
| try: |
| os.remove(error_file) |
| except OSError: |
| pass |
| |
          if p.is_alive():
            p.terminate()
| raise RuntimeError(f"Shared Server Process crashed:\n{error_msg}") |
| |
| if not p.is_alive(): |
| exit_code = p.exitcode |
| raise RuntimeError( |
| "Shared Server Process died unexpectedly" |
| f" with exit code {exit_code}") |
| |
| if time.time() - last_log > 300: |
| logging.warning( |
| "Still waiting for %s to initialize... %ss elapsed)", |
| self._tag, |
| int(time.time() - start_time)) |
| last_log = time.time() |
| |
| time.sleep(0.05) |
| |
| logging.info('External process successfully started for %s', self._tag) |
| else: |
| # We need to be able to authenticate with both the manager |
| # and the process. |
| self._serving_manager = _SingletonRegistrar( |
| address=('localhost', 0), authkey=AUTH_KEY) |
| multiprocessing.current_process().authkey = AUTH_KEY |
| # Initialize eagerly to avoid acting as the server if there are issues. |
| # Note, however, that _create_server itself is called lazily. |
| _process_level_singleton_manager.register_singleton( |
| self._constructor, self._tag, initialize_eagerly=True) |
| self._server = self._serving_manager.get_server() |
| logging.info( |
| 'Starting proxy server at %s for shared %s', |
| self._server.address, |
| self._tag) |
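      # Write to a temp file and atomically rename it so that readers never
      # observe a partially written address file.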
| with open(address_file + '.tmp', 'w') as fout: |
| fout.write('%s:%d' % self._server.address) |
| os.rename(address_file + '.tmp', address_file) |
| t = threading.Thread(target=self._server.serve_forever, daemon=True) |
| t.start() |
| logging.info('Done starting server') |