| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| from cpython.exc cimport PyErr_CheckSignals, PyErr_SetInterrupt |
| |
| from pyarrow.includes.libarrow cimport CStatus, IsPyError, RestorePyError |
| from pyarrow.includes.common cimport c_string |
| |
| from contextlib import contextmanager |
| import os |
| import signal |
| import threading |
| |
| |
class ArrowException(Exception):
    """
    Base class of all Arrow-specific exceptions.

    check_status() also raises it directly for status codes that have no
    dedicated subclass.
    """
| |
| |
class ArrowInvalid(ValueError, ArrowException):
    """
    Raised for an Arrow status whose IsInvalid() is true.

    Also a ValueError, so generic ``except ValueError`` handlers catch it.
    """
| |
| |
class ArrowMemoryError(MemoryError, ArrowException):
    """
    Raised for an Arrow status whose IsOutOfMemory() is true.

    Also a MemoryError, so generic ``except MemoryError`` handlers catch it.
    """
| |
| |
class ArrowKeyError(KeyError, ArrowException):
    """
    Raised for an Arrow status whose IsKeyError() is true.

    Also a KeyError, so generic ``except KeyError`` handlers catch it.
    """

    def __str__(self):
        # KeyError formats itself with repr() of its argument, which would
        # show the Arrow message wrapped in quotes; use the plain exception
        # formatting instead.
        return ArrowException.__str__(self)
| |
| |
class ArrowTypeError(TypeError, ArrowException):
    """
    Raised for an Arrow status whose IsTypeError() is true.

    Also a TypeError, so generic ``except TypeError`` handlers catch it.
    """
| |
| |
class ArrowNotImplementedError(NotImplementedError, ArrowException):
    """
    Raised for an Arrow status whose IsNotImplemented() is true.

    Also a NotImplementedError, so generic handlers for that type catch it.
    """
| |
| |
class ArrowCapacityError(ArrowException):
    """Raised for an Arrow status whose IsCapacityError() is true."""
| |
| |
class ArrowIndexError(IndexError, ArrowException):
    """
    Raised for an Arrow status whose IsIndexError() is true.

    Also an IndexError, so generic ``except IndexError`` handlers catch it.
    """
| |
| |
class ArrowSerializationError(ArrowException):
    """Raised for an Arrow status whose IsSerializationError() is true."""
| |
| |
class ArrowCancelled(ArrowException):
    """
    Raised for an Arrow status whose IsCancelled() is true.

    Parameters
    ----------
    message : str
        Human-readable error message.
    signum : int, optional
        Number of the signal that triggered the cancellation, if known;
        None otherwise.
    """

    def __init__(self, message, signum=None):
        # Remember the originating signal so SignalStopHandler.__exit__ can
        # re-deliver it to Python.
        self.signum = signum
        super().__init__(message)
| |
| |
# Compatibility alias: ArrowIOError is just the builtin OSError (which is
# the same object as IOError on Python 3), kept so older code that catches
# ArrowIOError keeps working.
ArrowIOError = OSError
| |
| |
# This function could be written directly in C++ if we didn't
# define Arrow-specific subclasses (ArrowInvalid etc.)
cdef int check_status(const CStatus& status) nogil except -1:
    # Convert a C++ Arrow Status into a Python exception.
    #
    # Returns 0 if ``status`` is OK. Otherwise a Python exception is
    # raised (or restored) and -1 is returned; the ``except -1``
    # declaration lets Cython callers propagate it. Declared ``nogil`` so
    # it can be called without holding the GIL; the GIL is acquired only
    # on the error path below.
    if status.ok():
        return 0

    with gil:
        # NOTE(review): presumably this status wraps a Python exception
        # captured earlier; it is re-raised unchanged instead of being
        # mapped to an Arrow exception class.
        if IsPyError(status):
            RestorePyError(status)
            return -1

        # We don't use Status::ToString() as it would redundantly include
        # the C++ class name.
        message = frombytes(status.message(), safe=True)
        detail = status.detail()
        if detail != nullptr:
            message += ". Detail: " + frombytes(detail.get().ToString(),
                                                safe=True)

        # Map each status code to its Python exception class.
        if status.IsInvalid():
            raise ArrowInvalid(message)
        elif status.IsIOError():
            # Note: OSError constructor is
            #   OSError(message)
            # or
            #   OSError(errno, message, filename=None)
            # or (on Windows)
            #   OSError(errno, message, filename, winerror)
            # A non-zero winerror takes precedence, then a non-zero errno;
            # otherwise only the message is passed.
            errno = ErrnoFromStatus(status)
            winerror = WinErrorFromStatus(status)
            if winerror != 0:
                raise IOError(errno, message, None, winerror)
            elif errno != 0:
                raise IOError(errno, message)
            else:
                raise IOError(message)
        elif status.IsOutOfMemory():
            raise ArrowMemoryError(message)
        elif status.IsKeyError():
            raise ArrowKeyError(message)
        elif status.IsNotImplemented():
            raise ArrowNotImplementedError(message)
        elif status.IsTypeError():
            raise ArrowTypeError(message)
        elif status.IsCapacityError():
            raise ArrowCapacityError(message)
        elif status.IsIndexError():
            raise ArrowIndexError(message)
        elif status.IsSerializationError():
            raise ArrowSerializationError(message)
        elif status.IsCancelled():
            # Attach the originating signal number only when a positive
            # one is recorded; otherwise ArrowCancelled.signum stays None.
            signum = SignalFromStatus(status)
            if signum > 0:
                raise ArrowCancelled(message, signum)
            else:
                raise ArrowCancelled(message)
        else:
            # No dedicated subclass for this code: raise the generic base
            # class, using ToString() so the message also carries the C++
            # class name (which is informative here, unlike above).
            message = frombytes(status.ToString(), safe=True)
            raise ArrowException(message)
| |
| |
# This is an API function for C++ PyArrow
cdef api int pyarrow_internal_check_status(const CStatus& status) \
        nogil except -1:
    # Exported via Cython's ``api`` mechanism so C++ code can convert a
    # Status into a pending Python exception. Simply delegates to
    # check_status: returns 0 when the status is OK, otherwise -1 with a
    # Python exception set.
    return check_status(status)
| |
| |
cdef class StopToken:
    # Python-level holder for a C++ CStopToken, filled in through init().
    # NOTE(review): the ``stop_token`` attribute is not declared in this
    # file; presumably it is declared in the matching .pxd.

    cdef void init(self, CStopToken stop_token):
        # Take the given token by moving it rather than copying it.
        self.stop_token = move(stop_token)
| |
| |
# Module-wide switch set by enable_signal_handlers() (enabled by default).
# It is read when a SignalStopHandler is constructed, to decide whether
# that handler may intercept signals.
cdef c_bool signal_handlers_enabled = True
| |
| |
def enable_signal_handlers(c_bool enable):
    """
    Enable or disable interruption of long-running operations.

    By default, certain long running operations will detect user
    interruptions, such as by pressing Ctrl-C. This detection relies
    on setting a signal handler for the duration of the long-running
    operation, and may therefore interfere with other frameworks or
    libraries (such as an event loop).

    The setting only affects operations started after this call;
    operations already in progress keep their current behavior.

    Parameters
    ----------
    enable : bool
        Whether to enable user interruption by setting a temporary
        signal handler.
    """
    global signal_handlers_enabled
    # Only consulted when a SignalStopHandler is constructed, so handlers
    # that are already active are not affected by this change.
    signal_handlers_enabled = enable
| |
| |
# For internal use

cdef class SignalStopHandler:
    """
    Context manager allowing SIGINT/SIGTERM to cancel a long-running
    Arrow operation.

    Construction decides whether signal-based cancellation is possible:

    * signal handlers must be globally enabled (enable_signal_handlers);
    * the current thread must be the main thread;
    * at least one of SIGINT/SIGTERM must have a Python-level handler
      (i.e. not SIG_DFL, SIG_IGN or None).

    If all hold, a signal stop source is set up and its token is exposed
    through ``stop_token``. Otherwise the manager is a no-op, apart from
    the ArrowCancelled re-delivery in ``__exit__``.
    """
    cdef:
        # Token of the stop source created in __cinit__; left as None
        # when signal handling is not enabled for this instance.
        StopToken _stop_token
        # Signals (a subset of SIGINT/SIGTERM) to intercept.
        vector[int] _signals
        # True only if __cinit__ set up a stop source; gates every
        # register/unregister/reset call below.
        c_bool _enabled

    def __cinit__(self):
        self._enabled = False

        # Only the main thread is eligible, and only while the user has not
        # opted out through enable_signal_handlers(False).
        if not (signal_handlers_enabled and
                threading.current_thread() is threading.main_thread()):
            return

        # Intercept a signal only if Python code has installed a real
        # handler for it; default or ignored dispositions are left alone.
        self._signals = [
            sig for sig in (signal.SIGINT, signal.SIGTERM)
            if signal.getsignal(sig) not in (signal.SIG_DFL,
                                             signal.SIG_IGN, None)]

        if not self._signals.empty():
            self._stop_token = StopToken()
            self._stop_token.init(GetResultValue(
                SetSignalStopSource()).token())
            self._enabled = True

    def __enter__(self):
        # Install the cancelling handler for the selected signals; any
        # failure is raised as a Python exception by check_status.
        if self._enabled:
            check_status(RegisterCancellingSignalHandler(self._signals))
        return self

    def __exit__(self, exc_type, exc_value, exc_tb):
        if self._enabled:
            UnregisterCancellingSignalHandler()
        if isinstance(exc_value, ArrowCancelled):
            if exc_value.signum:
                # Re-emit the exact same signal. We restored the Python signal
                # handler above, so it should receive it.
                if os.name == 'nt':
                    SendSignal(exc_value.signum)
                else:
                    SendSignalToThread(exc_value.signum, threading.get_ident())
            else:
                # Simulate Python receiving a SIGINT
                # (see https://bugs.python.org/issue43356 for why we can't
                # simulate the exact signal number)
                PyErr_SetInterrupt()
            # Maximize chances of the Python signal handler being executed now.
            # Otherwise a potential KeyboardInterrupt might be missed by an
            # immediately enclosing try/except block.
            PyErr_CheckSignals()
            # ArrowCancelled will be re-raised if PyErr_CheckSignals()
            # returned successfully (this method returns None, so the
            # original exception is never suppressed).

    def __dealloc__(self):
        # Tear down the stop source created in __cinit__, if any.
        if self._enabled:
            ResetSignalStopSource()

    @property
    def stop_token(self):
        """The StopToken for this handler, or None if not enabled."""
        return self._stop_token