blob: 0556945d3013cd6e92c626d67b046e7c3921b01c [file] [log] [blame]
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Authors:
# Tristan Van Berkom <tristan.vanberkom@codethink.co.uk>
import os
import signal
import sys
import threading
import traceback
from contextlib import contextmanager, ExitStack
from collections import deque
from typing import Callable, Deque
# Global per process state for handling of sigterm/sigtstp/sigcont,
# note that it is expected that this only ever be used by new processes
# the scheduler starts, not the main process.
#
terminator_stack: Deque[Callable] = deque()
suspendable_stack: Deque[Callable] = deque()
terminator_lock = threading.Lock()
suspendable_lock = threading.Lock()
# This event is used to block all the threads while we wait for user
# interaction. This is because we can't stop all the pythin threads but the
# one easily when waiting for user input. However, most performance intensive
# tasks will pass through a subprocess or a multiprocess.Process and all of
# those are guarded by the signal handling. Thus, by setting and unsetting this
# event in the scheduler, we can enable and disable the launching of processes
# and ensure we don't do anything resource intensive while being interrupted.
is_not_suspended = threading.Event()
is_not_suspended.set()
class TerminateException(BaseException):
pass
# Per process SIGTERM handler
def terminator_handler(signal_, frame):
exit_code = -1
while terminator_stack:
terminator_ = terminator_stack.pop()
try:
terminator_()
except SystemExit as e:
exit_code = e.code or 0
except: # noqa pylint: disable=bare-except
# Ensure we print something if there's an exception raised when
# processing the handlers. Note that the default exception
# handler won't be called because we os._exit next, so we must
# catch all possible exceptions with the unqualified 'except'
# clause.
traceback.print_exc(file=sys.stderr)
print(
"Error encountered in BuildStream while processing custom SIGTERM handler:",
terminator_,
file=sys.stderr,
)
# Use special exit here, terminate immediately, recommended
# for precisely this situation where child processes are teminated.
os._exit(exit_code)
# terminator()
#
# A context manager for interruptable tasks, this guarantees
# that while the code block is running, the supplied function
# will be called upon process termination.
#
# /!\ The callbacks passed must only contain code that does not acces thread
# local variables. Those will run in the main thread.
#
# Note that after handlers are called, the termination will be handled by
# terminating immediately with os._exit(). This means that SystemExit will not
# be raised and 'finally' clauses will not be executed.
#
# Args:
# terminate_func (callable): A function to call when aborting
# the nested code block.
#
@contextmanager
def terminator(terminate_func):
global terminator_stack # pylint: disable=global-statement,global-variable-not-assigned
outermost = bool(not terminator_stack)
assert threading.current_thread() == threading.main_thread() or not outermost
with terminator_lock:
terminator_stack.append(terminate_func)
if outermost:
original_handler = signal.signal(signal.SIGTERM, terminator_handler)
try:
yield
except TerminateException:
terminate_func()
raise
finally:
if outermost:
signal.signal(signal.SIGTERM, original_handler)
with terminator_lock:
terminator_stack.remove(terminate_func)
# Just a simple object for holding on to two callbacks
class Suspender:
def __init__(self, suspend_callback, resume_callback):
self.suspend = suspend_callback
self.resume = resume_callback
# Per process SIGTSTP handler
def suspend_handler(sig, frame):
is_not_suspended.clear()
# Suspend callbacks from innermost frame first
with suspendable_lock:
for suspender in reversed(suspendable_stack):
suspender.suspend()
# Use SIGSTOP directly now on self, dont introduce more SIGTSTP
#
# Here the process sleeps until SIGCONT, which we simply
# dont handle. We know we'll pickup execution right here
# when we wake up.
os.kill(os.getpid(), signal.SIGSTOP)
# Resume callbacks from outermost frame inwards
for suspender in suspendable_stack:
suspender.resume()
is_not_suspended.set()
# suspendable()
#
# A context manager for handling process suspending and resumeing
#
# Args:
# suspend_callback (callable): A function to call as process suspend time.
# resume_callback (callable): A function to call as process resume time.
#
# /!\ The callbacks passed must only contain code that does not acces thread
# local variables. Those will run in the main thread.
#
# This must be used in code blocks which start processes that become
# their own session leader. In these cases, SIGSTOP and SIGCONT need
# to be propagated to the child process group.
#
# This context manager can also be used recursively, so multiple
# things can happen at suspend/resume time (such as tracking timers
# and ensuring durations do not count suspended time).
#
@contextmanager
def suspendable(suspend_callback, resume_callback):
global suspendable_stack # pylint: disable=global-statement,global-variable-not-assigned
outermost = bool(not suspendable_stack)
assert threading.current_thread() == threading.main_thread() or not outermost
# If we are not in the main thread, ensure that we are not suspended
# before running.
# If we are in the main thread, never block on this, to ensure we
# don't deadlock.
if threading.current_thread() != threading.main_thread():
is_not_suspended.wait()
suspender = Suspender(suspend_callback, resume_callback)
with suspendable_lock:
suspendable_stack.append(suspender)
if outermost:
original_stop = signal.signal(signal.SIGTSTP, suspend_handler)
try:
yield
finally:
if outermost:
signal.signal(signal.SIGTSTP, original_stop)
with suspendable_lock:
suspendable_stack.remove(suspender)
# blocked()
#
# A context manager for running a code block with blocked signals
#
# Args:
# signals (list): A list of unix signals to block
# ignore (bool): Whether to ignore entirely the signals which were
# received and pending while the process had blocked them
#
@contextmanager
def blocked(signal_list, ignore=True):
with ExitStack() as stack:
# Optionally add the ignored() context manager to this context
if ignore:
stack.enter_context(ignored(signal_list))
# Set and save the sigprocmask
blocked_signals = signal.pthread_sigmask(signal.SIG_BLOCK, signal_list)
try:
yield
finally:
# If we have discarded the signals completely, this line will cause
# the discard_handler() to trigger for each signal in the list
signal.pthread_sigmask(signal.SIG_SETMASK, blocked_signals)
# ignored()
#
# A context manager for running a code block with ignored signals
#
# Args:
# signals (list): A list of unix signals to ignore
#
@contextmanager
def ignored(signal_list):
orig_handlers = {}
for sig in signal_list:
orig_handlers[sig] = signal.signal(sig, signal.SIG_IGN)
try:
yield
finally:
for sig in signal_list:
signal.signal(sig, orig_handlers[sig])