blob: 1fbaccded9cd5418773798538dc9b83f97d49e29 [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
"""Utilities for running or stopping processes"""
import errno
import logging
import os
import pty
import select
import shlex
import signal
import subprocess
import sys
import termios
import tty
from contextlib import contextmanager
from typing import Dict, List
import psutil
from lockfile.pidlockfile import PIDLockFile
from airflow.configuration import conf
from airflow.exceptions import AirflowException
# Module-level logger shared by the helpers in this file.
log = logging.getLogger(__name__)
# Default grace period between issuing a SIGTERM and escalating to a SIGKILL
# when killing processes. Read from the [core] KILLED_TASK_CLEANUP_TIME
# configuration option when this module is imported (presumably expressed in
# seconds -- confirm against the configuration reference).
DEFAULT_TIME_TO_WAIT_AFTER_SIGTERM = conf.getint('core', 'KILLED_TASK_CLEANUP_TIME')
def reap_process_group(
    pgid: int,
    logger,
    sig: 'signal.Signals' = signal.SIGTERM,
    timeout: int = DEFAULT_TIME_TO_WAIT_AFTER_SIGTERM,
) -> Dict[int, int]:
    """
    Terminate every process of a process group, escalating to SIGKILL when needed.

    ``sig`` (SIGTERM by default) is delivered to the whole group ``pgid``. Group members
    (including grandchildren) that are still alive after ``timeout`` are sent SIGKILL and
    waited on for another ``timeout``. Processes that survive even that are only logged.

    :param pgid: process group id to kill
    :param logger: log handler
    :param sig: signal type
    :param timeout: how much time a process has to terminate
    :return: mapping of pid to exit code for every process that was seen terminating
    :raises RuntimeError: if ``pgid`` is the process group of the calling process
    """
    exit_codes = {}

    def record_exit(process):
        # wait_procs callback: remember the exit code of each process that went away.
        logger.info("Process %s (%s) terminated with exit code %s", process, process.pid, process.returncode)
        exit_codes[process.pid] = process.returncode

    def send_to_group(signum):
        try:
            os.killpg(pgid, signum)
        except OSError as kill_err:
            if kill_err.errno != errno.EPERM:
                raise
            # "Operation not permitted" can be caused by run_as_user; fall back to
            # sudo -n (--non-interactive) and signal every known member by pid.
            member_pids = [str(member.pid) for member in group_members]
            subprocess.check_call(["sudo", "-n", "kill", "-" + str(int(signum))] + member_pids)

    def belongs_to_group(process):
        try:
            return os.getpgid(process.pid) == pgid and process.pid != 0
        except OSError:
            return False

    if os.getpgid(0) == pgid:
        raise RuntimeError("I refuse to kill myself")

    try:
        leader = psutil.Process(pgid)
        group_members = leader.children(recursive=True)
        group_members.append(leader)
    except psutil.NoSuchProcess:
        # The group leader already exited, but its children may still be around:
        # scan all processes for members of the group instead.
        group_members = [candidate for candidate in psutil.process_iter() if belongs_to_group(candidate)]

    logger.info("Sending %s to GPID %s", sig, pgid)
    try:
        send_to_group(sig)
    except OSError as signal_err:
        # ESRCH means there is no such process group, so there is nothing left to do.
        if signal_err.errno == errno.ESRCH:
            return exit_codes

    _, survivors = psutil.wait_procs(group_members, timeout=timeout, callback=record_exit)
    if not survivors:
        return exit_codes

    for straggler in survivors:
        logger.warning("process %s did not respond to SIGTERM. Trying SIGKILL", straggler)

    try:
        send_to_group(signal.SIGKILL)
    except OSError as signal_err:
        if signal_err.errno != errno.ESRCH:
            raise

    _, survivors = psutil.wait_procs(survivors, timeout=timeout, callback=record_exit)
    for unkillable in survivors:
        logger.error("Process %s (%s) could not be killed. Giving up.", unkillable, unkillable.pid)
    return exit_codes
def execute_in_subprocess(cmd: List[str]):
    """
    Execute a process and stream its output to the module logger.

    stderr is merged into stdout and every line is logged as soon as it is read.
    Bytes that cannot be decoded are replaced instead of aborting the stream, so
    a command that prints non-UTF-8 data no longer fails with ``UnicodeDecodeError``.

    :param cmd: command and arguments to run
    :type cmd: List[str]
    :raises subprocess.CalledProcessError: if the command exits with a non-zero code
    """
    log.info("Executing cmd: %s", " ".join(shlex.quote(c) for c in cmd))
    with subprocess.Popen(
        cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, bufsize=0, close_fds=True
    ) as proc:
        log.info("Output:")
        if proc.stdout:
            with proc.stdout:
                # readline returns b'' at EOF, which ends the iteration.
                for line in iter(proc.stdout.readline, b''):
                    # errors="replace": arbitrary program output is not guaranteed
                    # to be valid UTF-8 and logging it must never raise.
                    log.info("%s", line.decode(errors="replace").rstrip())
        exit_code = proc.wait()
    if exit_code != 0:
        raise subprocess.CalledProcessError(exit_code, cmd)
def execute_interactive(cmd: List[str], **kwargs):
    """
    Run a command as a subprocess attached to a pseudo-terminal.

    Data is relayed between this process's stdin/stdout and the subprocess. The terminal is
    switched to raw mode while the command runs and its original state is restored once the
    process is completed, e.g. if the subprocess hides the cursor, it will be restored after
    the process is completed. The pseudo-terminal file descriptors are closed on exit.

    :param cmd: command and arguments to run
    :param kwargs: additional keyword arguments forwarded to ``subprocess.Popen``
    """
    log.info("Executing cmd: %s", " ".join(shlex.quote(c) for c in cmd))

    old_tty = termios.tcgetattr(sys.stdin)
    pty_fds = ()
    try:  # pylint: disable=too-many-nested-blocks
        # Both calls live inside the try block so that a failure in either of them
        # still restores the terminal settings in the finally clause.
        tty.setraw(sys.stdin.fileno())
        # open pseudo-terminal to interact with subprocess
        pty_fds = pty.openpty()
        master_fd, slave_fd = pty_fds
        # use os.setsid() make it run in a new process group, or bash job control will not be enabled
        # NOTE(review): presumably callers request this through kwargs -- confirm.
        with subprocess.Popen(
            cmd, stdin=slave_fd, stdout=slave_fd, stderr=slave_fd, universal_newlines=True, **kwargs
        ) as proc:
            while proc.poll() is None:
                # A timeout is required here: this process keeps the slave end open, so the
                # master never becomes readable when the child exits, and without a timeout
                # select() would block until the user presses a key.
                readable_fds, _, _ = select.select([sys.stdin, master_fd], [], [], 0.1)
                if sys.stdin in readable_fds:
                    input_data = os.read(sys.stdin.fileno(), 10240)
                    os.write(master_fd, input_data)
                if master_fd in readable_fds:
                    output_data = os.read(master_fd, 10240)
                    if output_data:
                        os.write(sys.stdout.fileno(), output_data)
    finally:
        try:
            # release both ends of the pseudo-terminal
            for pty_fd in pty_fds:
                os.close(pty_fd)
        finally:
            # restore tty settings back
            termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_tty)
def kill_child_processes_by_pids(pids_to_kill: List[int], timeout: int = 5) -> None:
    """
    Kills child processes for the current process.

    First, it sends the SIGTERM signal, and after the time specified by the `timeout` parameter, sends
    the SIGKILL signal, if the process is still alive. A child that exits on its own between being
    listed and being signalled is skipped instead of raising ``psutil.NoSuchProcess``.

    :param pids_to_kill: List of PID to be killed.
    :type pids_to_kill: List[int]
    :param timeout: The time to wait before sending the SIGKILL signal.
    :type timeout: int
    """
    this_process = psutil.Process(os.getpid())
    target_pids = set(pids_to_kill)

    def running_targets():
        # Only check child processes to ensure that we don't have a case
        # where we kill the wrong process because a child process died
        # but the PID got reused.
        return [
            child
            for child in this_process.children(recursive=True)
            if child.is_running() and child.pid in target_pids
        ]

    # First try SIGTERM
    child_processes = running_targets()
    for child in child_processes:
        log.info("Terminating child PID: %s", child.pid)
        try:
            child.terminate()
        except psutil.NoSuchProcess:
            # The child exited after it was listed; nothing left to terminate.
            log.debug("Child PID %s exited before it could be terminated", child.pid)

    log.info("Waiting up to %s seconds for processes to exit...", timeout)
    try:
        psutil.wait_procs(
            child_processes, timeout=timeout, callback=lambda x: log.info("Terminated PID %s", x.pid)
        )
    except psutil.TimeoutExpired:
        log.debug("Ran out of time while waiting for processes to exit")

    # Then SIGKILL whatever is still among our children
    child_processes = running_targets()
    if child_processes:
        log.info("SIGKILL processes that did not terminate gracefully")
        for child in child_processes:
            log.info("Killing child PID: %s", child.pid)
            try:
                child.kill()
                child.wait()
            except psutil.NoSuchProcess:
                # The child exited after it was listed; nothing left to kill.
                log.debug("Child PID %s exited before it could be killed", child.pid)
@contextmanager
def patch_environ(new_env_variables: Dict[str, str]):
    """
    Temporarily apply environment variables for the duration of a ``with`` block.

    On exit, every variable named in ``new_env_variables`` gets back the value it had on
    entry; variables that did not exist on entry are removed again.

    :param new_env_variables: Environment variables to set
    """
    saved_values = {}
    for name in new_env_variables:
        saved_values[name] = os.environ.get(name)
    os.environ.update(new_env_variables)
    try:  # pylint: disable=too-many-nested-blocks
        yield
    finally:
        for name, previous in saved_values.items():
            if previous is not None:
                os.environ[name] = previous
            else:
                # The variable was not set before entering the context; drop it if present.
                os.environ.pop(name, None)
def check_if_pidfile_process_is_running(pid_file: str, process_name: str):
    """
    Refuse to continue when the process recorded in a pidfile is still running.

    If the recorded process no longer exists, the stale pidfile lock is broken instead.

    :param pid_file: path to the pidfile
    :param process_name: name used in exception if process is up and
        running
    :raises AirflowException: if the pid stored in the pidfile belongs to a running process
    """
    lock = PIDLockFile(path=pid_file)
    # Nothing to check when no pidfile lock is held.
    if not lock.is_locked():
        return

    recorded_pid = lock.read_pid()
    if recorded_pid is None:
        return

    try:
        still_running = psutil.Process(recorded_pid).is_running()
    except psutil.NoSuchProcess:
        # The process is dead, so remove the pidfile.
        lock.break_lock()
        return

    if still_running:
        raise AirflowException(f"The {process_name} is already running under PID {recorded_pid}.")