blob: 663f0f496c7bb4a7a01df8111bb00570564536aa [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Utilities for running or stopping processes."""
from __future__ import annotations
import errno
import logging
import os
import select
import shlex
import signal
import subprocess
import sys
from airflow.utils.platform import IS_WINDOWS
if not IS_WINDOWS:
import pty
import termios
import tty
from contextlib import contextmanager
from typing import Generator
import psutil
from lockfile.pidlockfile import PIDLockFile
from airflow.configuration import conf
from airflow.exceptions import AirflowException
log = logging.getLogger(__name__)
# When killing processes, time to wait after issuing a SIGTERM before issuing a
# SIGKILL.
DEFAULT_TIME_TO_WAIT_AFTER_SIGTERM = conf.getint("core", "KILLED_TASK_CLEANUP_TIME")
def reap_process_group(
process_group_id: int,
logger,
sig: signal.Signals = signal.SIGTERM,
timeout: int = DEFAULT_TIME_TO_WAIT_AFTER_SIGTERM,
) -> dict[int, int]:
"""
Send sig (SIGTERM) to the process group of pid.
Tries really hard to terminate all processes in the group (including grandchildren). Will send
sig (SIGTERM) to the process group of pid. If any process is alive after timeout
a SIGKILL will be send.
:param process_group_id: process group id to kill.
The process that wants to create the group should run
`airflow.utils.process_utils.set_new_process_group()` as the first command
it executes which will set group id = process_id. Effectively the process that is the
"root" of the group has pid = gid and all other processes in the group have different
pids but the same gid (equal the pid of the root process)
:param logger: log handler
:param sig: signal type
:param timeout: how much time a process has to terminate
"""
returncodes = {}
def on_terminate(p):
logger.info("Process %s (%s) terminated with exit code %s", p, p.pid, p.returncode)
returncodes[p.pid] = p.returncode
def signal_procs(sig):
if IS_WINDOWS:
return
try:
logger.info("Sending the signal %s to group %s", sig, process_group_id)
os.killpg(process_group_id, sig)
except OSError as err_killpg:
# If operation not permitted error is thrown due to run_as_user,
# use sudo -n(--non-interactive) to kill the process
if err_killpg.errno == errno.EPERM:
subprocess.check_call(
["sudo", "-n", "kill", "-" + str(int(sig))]
+ [str(p.pid) for p in all_processes_in_the_group]
)
elif err_killpg.errno == errno.ESRCH:
# There is a rare condition that the process has not managed yet to change it's process
# group. In this case os.killpg fails with ESRCH error
# So we additionally send a kill signal to the process itself.
logger.info(
"Sending the signal %s to process %s as process group is missing.", sig, process_group_id
)
try:
os.kill(process_group_id, sig)
except OSError as err_kill:
if err_kill.errno == errno.EPERM:
subprocess.check_call(["sudo", "-n", "kill", "-" + str(process_group_id)])
else:
raise
else:
raise
if not IS_WINDOWS and process_group_id == os.getpgid(0):
raise RuntimeError("I refuse to kill myself")
try:
parent = psutil.Process(process_group_id)
all_processes_in_the_group = parent.children(recursive=True)
all_processes_in_the_group.append(parent)
except psutil.NoSuchProcess:
# The process already exited, but maybe it's children haven't.
all_processes_in_the_group = []
for proc in psutil.process_iter():
try:
if os.getpgid(proc.pid) == process_group_id and proc.pid != 0:
all_processes_in_the_group.append(proc)
except OSError:
pass
logger.info(
"Sending %s to group %s. PIDs of all processes in the group: %s",
sig,
process_group_id,
[p.pid for p in all_processes_in_the_group],
)
try:
signal_procs(sig)
except OSError as err:
# No such process, which means there is no such process group - our job
# is done
if err.errno == errno.ESRCH:
return returncodes
_, alive = psutil.wait_procs(all_processes_in_the_group, timeout=timeout, callback=on_terminate)
if alive:
for proc in alive:
logger.warning("process %s did not respond to SIGTERM. Trying SIGKILL", proc)
try:
signal_procs(signal.SIGKILL)
except OSError as err:
if err.errno != errno.ESRCH:
raise
_, alive = psutil.wait_procs(alive, timeout=timeout, callback=on_terminate)
if alive:
for proc in alive:
logger.error("Process %s (%s) could not be killed. Giving up.", proc, proc.pid)
return returncodes
def execute_in_subprocess(cmd: list[str], cwd: str | None = None) -> None:
"""
Execute a process and stream output to logger.
:param cmd: command and arguments to run
:param cwd: Current working directory passed to the Popen constructor
"""
execute_in_subprocess_with_kwargs(cmd, cwd=cwd)
def execute_in_subprocess_with_kwargs(cmd: list[str], **kwargs) -> None:
"""
Execute a process and stream output to logger.
:param cmd: command and arguments to run
All other keyword args will be passed directly to subprocess.Popen
"""
log.info("Executing cmd: %s", " ".join(shlex.quote(c) for c in cmd))
with subprocess.Popen(
cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, bufsize=0, close_fds=True, **kwargs
) as proc:
log.info("Output:")
if proc.stdout:
with proc.stdout:
for line in iter(proc.stdout.readline, b""):
log.info("%s", line.decode().rstrip())
exit_code = proc.wait()
if exit_code != 0:
raise subprocess.CalledProcessError(exit_code, cmd)
def execute_interactive(cmd: list[str], **kwargs) -> None:
"""
Run the new command as a subprocess.
Runs the new command as a subprocess and ensures that the terminal's state is restored to its original
state after the process is completed e.g. if the subprocess hides the cursor, it will be restored after
the process is completed.
"""
log.info("Executing cmd: %s", " ".join(shlex.quote(c) for c in cmd))
old_tty = termios.tcgetattr(sys.stdin)
tty.setraw(sys.stdin.fileno())
# open pseudo-terminal to interact with subprocess
primary_fd, secondary_fd = pty.openpty()
try:
# use os.setsid() make it run in a new process group, or bash job control will not be enabled
with subprocess.Popen(
cmd,
stdin=secondary_fd,
stdout=secondary_fd,
stderr=secondary_fd,
universal_newlines=True,
**kwargs,
) as proc:
while proc.poll() is None:
readable_fbs, _, _ = select.select([sys.stdin, primary_fd], [], [])
if sys.stdin in readable_fbs:
input_data = os.read(sys.stdin.fileno(), 10240)
os.write(primary_fd, input_data)
if primary_fd in readable_fbs:
output_data = os.read(primary_fd, 10240)
if output_data:
os.write(sys.stdout.fileno(), output_data)
finally:
# restore tty settings back
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_tty)
def kill_child_processes_by_pids(pids_to_kill: list[int], timeout: int = 5) -> None:
"""
Kills child processes for the current process.
First, it sends the SIGTERM signal, and after the time specified by the `timeout` parameter, sends
the SIGKILL signal, if the process is still alive.
:param pids_to_kill: List of PID to be killed.
:param timeout: The time to wait before sending the SIGKILL signal.
"""
this_process = psutil.Process(os.getpid())
# Only check child processes to ensure that we don't have a case
# where we kill the wrong process because a child process died
# but the PID got reused.
child_processes = [
x for x in this_process.children(recursive=True) if x.is_running() and x.pid in pids_to_kill
]
# First try SIGTERM
for child in child_processes:
log.info("Terminating child PID: %s", child.pid)
child.terminate()
log.info("Waiting up to %s seconds for processes to exit...", timeout)
try:
psutil.wait_procs(
child_processes, timeout=timeout, callback=lambda x: log.info("Terminated PID %s", x.pid)
)
except psutil.TimeoutExpired:
log.debug("Ran out of time while waiting for processes to exit")
# Then SIGKILL
child_processes = [
x for x in this_process.children(recursive=True) if x.is_running() and x.pid in pids_to_kill
]
if child_processes:
log.info("SIGKILL processes that did not terminate gracefully")
for child in child_processes:
log.info("Killing child PID: %s", child.pid)
child.kill()
child.wait()
@contextmanager
def patch_environ(new_env_variables: dict[str, str]) -> Generator[None, None, None]:
"""
Set environment variables in context.
After leaving the context, it restores its original state.
:param new_env_variables: Environment variables to set
"""
current_env_state = {key: os.environ.get(key) for key in new_env_variables.keys()}
os.environ.update(new_env_variables)
try:
yield
finally:
for key, old_value in current_env_state.items():
if old_value is None:
if key in os.environ:
del os.environ[key]
else:
os.environ[key] = old_value
def check_if_pidfile_process_is_running(pid_file: str, process_name: str):
"""
Checks if a pidfile already exists and process is still running.
If process is dead then pidfile is removed.
:param pid_file: path to the pidfile
:param process_name: name used in exception if process is up and
running
"""
pid_lock_file = PIDLockFile(path=pid_file)
# If file exists
if pid_lock_file.is_locked():
# Read the pid
pid = pid_lock_file.read_pid()
if pid is None:
return
try:
# Check if process is still running
proc = psutil.Process(pid)
if proc.is_running():
raise AirflowException(f"The {process_name} is already running under PID {pid}.")
except psutil.NoSuchProcess:
# If process is dead remove the pidfile
pid_lock_file.break_lock()
def set_new_process_group() -> None:
"""Try to set current process to a new process group.
That makes it easy to kill all sub-process of this at the OS-level,
rather than having to iterate the child processes.
If current process was spawned by system call ``exec()``, the current
process group is kept.
"""
if os.getpid() == os.getsid(0):
# If PID = SID than process a session leader, and it is not possible to change process group
return
os.setpgid(0, 0)