# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Functions and classes to handle agents and tasks.
"""
import base64
import json
import os
import platform
import signal
import sys
import threading
import time
import tty
import uuid
from functools import partial
from queue import Queue
import termios
from cli import http
from cli import util
from cli.exceptions import CLIException
import mesos.http
from mesos import recordio
from mesos.exceptions import MesosException
from mesos.exceptions import MesosHTTPException
def get_agent_address(agent_id, master):
"""
Given a master and an agent id, return the agent address
by checking the /slaves endpoint of the master.
"""
try:
agents = http.get_json(master, "slaves")["slaves"]
except Exception as exception:
raise CLIException("Could not open '/slaves'"
" endpoint at '{addr}': {error}"
.format(addr=master,
error=exception))
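# Each agent entry reported by the master carries a "pid" of the form
# "slave(1)@127.0.0.1:5051" (values illustrative), so splitting on "@"
# below yields the "<host>:<port>" address of the agent.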
for agent in agents:
if agent["id"] == agent_id:
return agent["pid"].split("@")[1]
raise CLIException("Unable to find agent '{id}'".format(id=agent_id))
def get_agents(master):
"""
Get the agents in a Mesos cluster.
"""
endpoint = "slaves"
key = "slaves"
try:
data = http.get_json(master, endpoint)
except Exception as exception:
raise CLIException(
"Could not open '/{endpoint}' on master: {error}"
.format(endpoint=endpoint, error=exception))
if key not in data:
raise CLIException(
"Missing '{key}' key in data retrieved"
" from master on '/{endpoint}'"
.format(key=key, endpoint=endpoint))
return data[key]
def get_container_id(task):
"""
Get the container ID of a task.
"""
if 'statuses' not in task:
raise CLIException("Unable to obtain status information for task")
statuses = task['statuses']
if not statuses:
raise CLIException("No status updates available for task")
# It doesn't matter which status we use to get the `container_id`: if the
# `container_id` has been set for the task, all statuses will contain it.
if 'container_status' not in statuses[0]:
raise CLIException("Task status does not contain container information")
container_status = statuses[0]['container_status']
if 'container_id' in container_status:
container_id = container_status['container_id']
if 'value' in container_id:
return container_id
raise CLIException(
"No container found for the specified task."
" It might still be spinning up."
" Please try again.")
def get_tasks(master, query=None):
"""
Get the tasks in a Mesos cluster.
"""
endpoint = "tasks"
key = "tasks"
try:
data = http.get_json(master, endpoint, query=query)
except Exception as exception:
raise CLIException(
"Could not open '/{endpoint}' with query parameters: {query}"
"on master: {error}"
.format(endpoint=endpoint, query=query, error=exception))
if key not in data:
raise CLIException(
"Missing '{key}' key in data retrieved"
" from master on '/{endpoint}' with query parameters: {query}"
.format(key=key, endpoint=endpoint, query=query))
return data[key]
class TaskIO():
"""
Object used to stream I/O between a
running Mesos task and the local terminal.
:param task: task ID
:type task: str
:param cmd: a command to launch inside the task's container
:type cmd: str
:param args: Additional arguments for the command
:type args: list
:param interactive: whether to attach STDIN of the current
terminal to the new command being launched
:type interactive: bool
:param tty: whether to allocate a tty for this command and attach
the local terminal to it
:type tty: bool
"""
# pylint: disable=too-many-instance-attributes
# The interval to send heartbeat messages to
# keep persistent connections alive.
HEARTBEAT_INTERVAL = 30
HEARTBEAT_INTERVAL_NANOSECONDS = HEARTBEAT_INTERVAL * 1000000000
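# A minimal usage sketch (the master address and task ID are illustrative;
# `exec()` and `attach()` are the public entry points):
#
#   task_io = TaskIO("127.0.0.1:5050", "my-task-id")
#   exit_status = task_io.exec("ls", ["-al"])
#
# or, to attach to the TTY of an already running task:
#
#   exit_status = TaskIO("127.0.0.1:5050", "my-task-id").attach()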
def __init__(self, master, task_id):
# Get the task and make sure its container was launched by the UCR.
# Since tasks' containers are launched by the UCR by default, we want
# to allow most tasks to pass through unchecked. The only exception is
# when a task has an explicit container specified and it is not of type
# "MESOS". Having a type of "MESOS" implies that it was launched by the
# UCR -- all other types imply it was not.
try:
tasks = get_tasks(master, query={'task_id': task_id})
except Exception as exception:
raise CLIException("Unable to get task with ID {task_id}"
" from leading master '{master}': {error}"
.format(task_id=task_id, master=master,
error=exception))
running_tasks = [t for t in tasks if t["state"] == "TASK_RUNNING"]
matching_tasks = [t for t in running_tasks if t["id"] == task_id]
if not matching_tasks:
raise CLIException("Unable to find running task '{task_id}'"
" from leading master '{master}'"
.format(task_id=task_id, master=master))
if len(matching_tasks) > 1:
raise CLIException("More than one task matching id '{id}'"
.format(id=task_id))
task_obj = matching_tasks[0]
if "container" in task_obj:
if "type" in task_obj["container"]:
if task_obj["container"]["type"] != "MESOS":
raise CLIException(
"This command is only supported for tasks"
" launched by the Universal Container Runtime (UCR).")
# Get the URL to the agent running the task.
agent_addr = util.sanitize_address(
get_agent_address(task_obj["slave_id"], master))
self.agent_url = mesos.http.simple_urljoin(agent_addr, "api/v1")
# Get the ID of the task's container.
try:
self.container_id = get_container_id(task_obj)
except CLIException as exception:
raise CLIException("Could not get container ID of task '{id}'"
" from agent '{addr}': {error}"
.format(id=task_id, addr=agent_addr,
error=exception))
# Set up a recordio encoder and decoder
# for any incoming and outgoing messages.
self.encoder = recordio.Encoder(
lambda s: bytes(json.dumps(s, ensure_ascii=False), "UTF-8"))
self.decoder = recordio.Decoder(
lambda s: json.loads(s.decode("UTF-8")))
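# (Each RecordIO record is framed as "<length>\n<bytes>", which is the
# encoding the agent's streaming v1 API expects on these connections.)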
# Set up queues to send messages between threads used for
# reading/writing to STDIN/STDOUT/STDERR and threads
# sending/receiving data over the network.
self.input_queue = Queue()
self.output_queue = Queue()
# Set up an event to block attaching
# input until attaching output is complete.
self.attach_input_event = threading.Event()
self.attach_input_event.clear()
# Set up an event to block printing the output
# until an attach input event has successfully
# been established.
self.print_output_event = threading.Event()
self.print_output_event.clear()
# Set up an event to block the main thread
# from exiting until signaled to do so.
self.exit_event = threading.Event()
self.exit_event.clear()
# Use a class variable to store exceptions thrown on
# other threads and raise them on the main thread before
# exiting.
self.exception = None
# Default values for the TaskIO.
self.cmd = None
self.args = None
self.interactive = False
self.tty = False
self.output_thread_entry_point = None
# Allow an exit sequence to be used to break the CLI's attachment to
# the remote task. Depending on the call, this may be disabled, or
# the exit sequence to be used may be overridden.
self.supports_exit_sequence = False
self.exit_sequence = b'\x10\x11' # Ctrl-p, Ctrl-q
self.exit_sequence_detected = False
def attach(self, _no_stdin=False):
"""
Attach the stdin/stdout/stderr of the CLI to the
STDIN/STDOUT/STDERR of a running task.
As of now, we can only attach to tasks launched with a remote TTY
already set up for them. If we try to attach to a task that was
launched without a remote TTY attached, this command will fail.
:param no_stdin: True if we should *not* attach stdin,
False if we should
:type no_stdin: bool
"""
# Store relevant parameters of the call for later.
self.interactive = not _no_stdin
self.tty = True
# Set the entry point of the output thread to be a call to
# _attach_container_output.
self.output_thread_entry_point = self._attach_container_output
self._run()
if not self.exit_sequence_detected:
# We are only able to get the 'exit_status' of tasks launched via
# the default executor (i.e. as pods rather than via the command
# executor). In the future, Mesos will deprecate the command
# executor in favor of the default executor, so this check will
# be able to go away. In the meantime, we will always return '0'
# for tasks launched via the command executor.
if "parent" in self.container_id:
return self._wait()
return 0
def exec(self, _cmd, _args=None, _interactive=False, _tty=False):
"""
Execute a new process inside of a given task by redirecting
STDIN/STDOUT/STDERR between the CLI and the Mesos Agent API.
If a tty is requested, we take over the current terminal and
put it into raw mode. We make sure to reset the terminal back
to its original settings before exiting.
:param cmd: The command to launch inside the task's container
:type cmd: str
:param args: Additional arguments for the command
:type args: list
:param interactive: attach stdin
:type interactive: bool
:param tty: attach a tty
:type tty: bool
"""
# Store relevant parameters of the call for later.
self.cmd = _cmd
self.args = _args
self.interactive = _interactive
self.tty = _tty
# Override the container ID with the current container ID as the
# parent, and generate a new UUID for the nested container used to
# run commands passed to `task exec`.
self.container_id = {
'parent': self.container_id,
'value': str(uuid.uuid4())
}
# Set the entry point of the output thread to be a call to
# _launch_nested_container_session.
self.output_thread_entry_point = self._launch_nested_container_session
self._run()
return self._wait()
def _run(self):
"""
Run the helper threads in this class which enable streaming
of STDIN/STDOUT/STDERR between the CLI and the Mesos Agent API.
If a tty is requested, we take over the current terminal and
put it into raw mode. We make sure to reset the terminal back
to its original settings before exiting.
"""
# Without a TTY.
if not self.tty:
try:
self._start_threads()
self.exit_event.wait()
except Exception as e:
self.exception = e
if self.exception:
# Known Pylint issue: https://github.com/PyCQA/pylint/issues/157
# pylint: disable=raising-bad-type
raise self.exception
return
# With a TTY.
if platform.system() == "Windows":
raise CLIException(
"Running with the '--tty' flag is not supported on windows")
if not sys.stdin.isatty():
raise CLIException(
"Must be running in a tty to pass the '--tty flag'")
fd = sys.stdin.fileno()
oldtermios = termios.tcgetattr(fd)
try:
if self.interactive:
self.supports_exit_sequence = True
tty.setraw(fd, when=termios.TCSANOW)
# To force a redraw of the remote terminal, we first resize it
# to 0 before setting it to the actual size of our local
# terminal. After that, all terminal resizing is handled in our
# SIGWINCH handler.
self._window_resize(signal.SIGWINCH, dimensions=[0, 0])
self._window_resize(signal.SIGWINCH)
signal.signal(signal.SIGWINCH, self._window_resize)
self._start_threads()
self.exit_event.wait()
except Exception as e:
self.exception = e
termios.tcsetattr(
sys.stdin.fileno(),
termios.TCSAFLUSH,
oldtermios)
if self.exception:
# Known Pylint issue: https://github.com/PyCQA/pylint/issues/157
# pylint: disable=raising-bad-type
raise self.exception
def _wait(self):
"""
Wait for the container associated with this class (through
'container_id') to exit and return its exit status.
"""
message = {
'type': 'WAIT_CONTAINER',
'wait_container': {
'container_id': self.container_id}}
req_extra_args = {
'additional_headers': {
'Content-Type': 'application/json',
'Accept': 'application/json'}}
try:
resource = mesos.http.Resource(self.agent_url)
response = resource.request(
mesos.http.METHOD_POST,
data=json.dumps(message),
retry=False,
timeout=None,
**req_extra_args)
except MesosException as exception:
raise CLIException(
"Error waiting for command to complete: {error}"
.format(error=exception))
exit_status = response.json()["wait_container"]["exit_status"]
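# Mirror the usual shell convention: a command terminated by signal N is
# reported as 128 + N, otherwise its plain exit code is returned.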
if os.WIFSIGNALED(exit_status):
return os.WTERMSIG(exit_status) + 128
return os.WEXITSTATUS(exit_status)
def _thread_wrapper(self, func):
"""
A wrapper around all threads used in this class.
If a thread throws an exception, it will unblock the main
thread and save the exception in a class variable. The main
thread will then rethrow the exception before exiting.
:param func: The start function for the thread
:type func: function
"""
try:
func()
except Exception as e:
self.exception = e
self.exit_event.set()
def _start_threads(self):
"""
Start all threads associated with this class.
"""
if self.interactive:
# Collects input from STDIN and puts
# it in the input_queue as data messages.
thread = threading.Thread(
target=self._thread_wrapper,
args=(self._input_thread,))
thread.daemon = True
thread.start()
# Prepares heartbeat control messages and
# puts them in the input queue at a specific
# heartbeat interval.
thread = threading.Thread(
target=self._thread_wrapper,
args=(self._heartbeat_thread,))
thread.daemon = True
thread.start()
# Opens a persistent connection with the mesos agent and
# feeds it both control and data messages from the input
# queue via ATTACH_CONTAINER_INPUT messages.
thread = threading.Thread(
target=self._thread_wrapper,
args=(self._attach_container_input,))
thread.daemon = True
thread.start()
# Opens a persistent connection with a mesos agent, reads
# data messages from it and feeds them to an output_queue.
thread = threading.Thread(
target=self._thread_wrapper,
args=(self.output_thread_entry_point,))
thread.daemon = True
thread.start()
# Collects data messages from the output queue and writes
# their content to STDOUT and STDERR.
thread = threading.Thread(
target=self._thread_wrapper,
args=(self._output_thread,))
thread.daemon = True
thread.start()
def _attach_container_output(self):
"""
Streams all output data (e.g. STDOUT/STDERR) to the
client from the agent.
"""
message = {
'type': 'ATTACH_CONTAINER_OUTPUT',
'attach_container_output': {
'container_id': self.container_id}}
req_extra_args = {
'stream': True,
'additional_headers': {
'Content-Type': 'application/json',
'Accept': 'application/recordio',
'Message-Accept': 'application/json'}}
try:
resource = mesos.http.Resource(self.agent_url)
response = resource.request(
mesos.http.METHOD_POST,
data=json.dumps(message),
retry=False,
timeout=None,
**req_extra_args)
except MesosHTTPException as e:
text = "I/O switchboard server was disabled for this container"
if e.response.status_code == 500 and e.response.text == text:
raise CLIException("Unable to attach to a task"
" launched without a TTY")
raise e
self._process_output_stream(response)
def _launch_nested_container_session(self):
"""
Sends a request to the Mesos Agent to launch a new
nested container and attach to its output stream.
The output stream is then sent back in the response.
"""
message = {
'type': "LAUNCH_NESTED_CONTAINER_SESSION",
'launch_nested_container_session': {
'container_id': self.container_id,
'command': {
'value': self.cmd,
'arguments': [self.cmd] + self.args,
'shell': False}}}
if self.tty:
message[
'launch_nested_container_session'][
'container'] = {
'type': 'MESOS',
'tty_info': {}}
req_extra_args = {
'stream': True,
'additional_headers': {
'Content-Type': 'application/json',
'Accept': 'application/recordio',
'Message-Accept': 'application/json'}}
resource = mesos.http.Resource(self.agent_url)
try:
response = resource.request(
mesos.http.METHOD_POST,
data=json.dumps(message),
retry=False,
timeout=None,
**req_extra_args)
except MesosException as exception:
raise CLIException("{error}".format(error=exception))
self._process_output_stream(response)
def _process_output_stream(self, response):
"""
Gets data streamed over the given response and places the
returned messages into our output_queue. Only expects to
receive data messages.
:param response: Response from an http post
:type response: requests.models.Response
"""
# Now that we are ready to process the output stream (meaning
# our output connection has been established), allow the input
# stream to be attached by setting an event.
self.attach_input_event.set()
# If we are running in interactive mode, wait to make sure that
# our input connection succeeds before pushing any output to the
# output queue.
if self.interactive:
self.print_output_event.wait()
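# Each decoded record is expected to be a DATA message shaped like
# (illustrative): {'type': 'DATA', 'data': {'type': 'STDOUT', 'data': '<base64>'}}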
try:
for chunk in response.iter_content(chunk_size=None):
records = self.decoder.decode(chunk)
for r in records:
if r.get('type') and r['type'] == 'DATA':
self.output_queue.put(r['data'])
except Exception as e:
raise CLIException(
"Error parsing output stream: {error}".format(error=e))
self.output_queue.join()
self.exit_event.set()
def _attach_container_input(self):
"""
Streams all input data (e.g. STDIN) from the client to the agent.
"""
def _initial_input_streamer():
"""
Generator function yielding the initial ATTACH_CONTAINER_INPUT
message for streaming. We have a separate generator for this so
that we can attempt the connection once before committing to a
persistent connection where we stream the rest of the input.
:returns: A RecordIO encoded message
"""
message = {
'type': 'ATTACH_CONTAINER_INPUT',
'attach_container_input': {
'type': 'CONTAINER_ID',
'container_id': self.container_id}}
yield self.encoder.encode(message)
def _input_streamer():
"""
Generator function yielding ATTACH_CONTAINER_INPUT messages for
streaming. It yields the _initial_input_streamer() message,
followed by messages from the input_queue on each subsequent call.
:returns: A RecordIO encoded message
"""
yield next(_initial_input_streamer(), None)
while True:
record = self.input_queue.get()
if not record:
if self.exit_sequence_detected:
sys.stdout.write("\r\n")
sys.stdout.flush()
self.exit_event.set()
break
yield record
req_extra_args = {
'additional_headers': {
'Content-Type': 'application/recordio',
'Message-Content-Type': 'application/json',
'Accept': 'application/json',
'Connection': 'close',
'Transfer-Encoding': 'chunked'
}
}
# Ensure we don't try to attach our input to a container that isn't
# fully up and running by waiting until the
# `_process_output_stream` function signals us that it's ready.
self.attach_input_event.wait()
# Send an initial test message to ensure that we are able to
# establish a connection with the agent. If we aren't, we will throw
# an exception and break out of this thread. However, in cases where
# we receive a 500 response from the agent, we actually want to
# continue without throwing an exception. A 500 error indicates that
# we can't connect to the container because it has already finished
# running. In that case we continue running to allow the output queue
# to be flushed.
resource = mesos.http.Resource(self.agent_url)
try:
resource.request(
mesos.http.METHOD_POST,
data=_initial_input_streamer(),
retry=False,
**req_extra_args)
except MesosHTTPException as e:
if e.response.status_code != 500:
raise e
# If we succeeded with that connection, unblock _process_output_stream()
# from sending output data to the output thread.
self.print_output_event.set()
# Begin streaming the input.
resource = mesos.http.Resource(self.agent_url)
resource.request(
mesos.http.METHOD_POST,
data=_input_streamer(),
retry=False,
timeout=None,
**req_extra_args)
def _detect_exit_sequence(self, chunk):
"""
Detects if 'self.exit_sequence' is present in 'chunk'.
If a partial exit sequence is detected at the end of 'chunk', then
more characters are read from 'stdin' and appended to 'chunk' in
search of the full sequence. Since python cannot pass variables by
reference, we return a modified 'chunk' with the extra characters
read if necessary.
If the exit sequence is found, the class variable
'exit_sequence_detected' is set to True.
:param chunk: a byte array to search for the exit sequence in
:type chunk: byte array
:returns: a modified byte array containing the original 'chunk' plus
any extra characters read in search of the exit sequence
:rtype: byte array
"""
if not self.supports_exit_sequence:
return chunk
if chunk.find(self.exit_sequence) != -1:
self.exit_sequence_detected = True
return chunk
for i in reversed(range(1, len(self.exit_sequence))):
if self.exit_sequence[:-i] == chunk[len(chunk)-i:]:
chunk += os.read(sys.stdin.fileno(), 1)
return self._detect_exit_sequence(chunk)
return chunk
def _input_thread(self):
"""
Reads from STDIN and places a message
with that data onto the input_queue.
"""
message = {
'type': 'ATTACH_CONTAINER_INPUT',
'attach_container_input': {
'type': 'PROCESS_IO',
'process_io': {
'type': 'DATA',
'data': {
'type': 'STDIN',
'data': ''}}}}
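# Read STDIN in chunks of up to 1024 bytes until EOF; each chunk is
# base64-encoded into the message template above before being queued.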
for chunk in iter(partial(os.read, sys.stdin.fileno(), 1024), b''):
chunk = self._detect_exit_sequence(chunk)
if self.exit_sequence_detected:
break
message[
'attach_container_input'][
'process_io'][
'data'][
'data'] = base64.b64encode(chunk).decode('utf-8')
self.input_queue.put(self.encoder.encode(message))
# Push an empty string to indicate EOF to the server and push
# 'None' to signal that we are done processing input.
message['attach_container_input']['process_io']['data']['data'] = ''
self.input_queue.put(self.encoder.encode(message))
self.input_queue.put(None)
def _output_thread(self):
"""
Reads from the output_queue and writes the data
to the appropriate STDOUT or STDERR.
"""
while True:
# Get a message from the output queue and decode it.
# Then write the data to the appropriate stdout or stderr.
output = self.output_queue.get()
if not output.get('data'):
raise CLIException("Error no 'data' field in output message")
data = output['data']
data = base64.b64decode(data.encode('utf-8'))
if output.get('type') and output['type'] == 'STDOUT':
sys.stdout.buffer.write(data)
sys.stdout.flush()
elif output.get('type') and output['type'] == 'STDERR':
sys.stderr.buffer.write(data)
sys.stderr.flush()
else:
raise CLIException("Unsupported data type in output stream")
self.output_queue.task_done()
def _heartbeat_thread(self):
"""
Generates a heartbeat message to send over the ATTACH_CONTAINER_INPUT
stream every `interval` seconds and inserts it in the input queue.
"""
interval = self.HEARTBEAT_INTERVAL
nanoseconds = self.HEARTBEAT_INTERVAL_NANOSECONDS
message = {
'type': 'ATTACH_CONTAINER_INPUT',
'attach_container_input': {
'type': 'PROCESS_IO',
'process_io': {
'type': 'CONTROL',
'control': {
'type': 'HEARTBEAT',
'heartbeat': {
'interval': {
'nanoseconds': nanoseconds}}}}}}
while True:
self.input_queue.put(self.encoder.encode(message))
time.sleep(interval)
def _window_resize(self, signum=None, frame=None, dimensions=None):
# pylint: disable=unused-argument
"""
Signal handler for SIGWINCH.
Generates a message with the current dimensions of the
terminal and puts it in the input_queue.
:param signum: the signal number being handled
:type signum: int
:param frame: current stack frame
:type frame: frame
:param dimensions: optional [rows, columns] to send instead of the
current terminal's size
:type dimensions: list
"""
# Determine the size of our terminal, and create the message to be sent
if dimensions:
rows, columns = dimensions
else:
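# 'stty size' prints "<rows> <columns>" for the controlling terminal.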
rows, columns = os.popen('stty size', 'r').read().split()
message = {
'type': 'ATTACH_CONTAINER_INPUT',
'attach_container_input': {
'type': 'PROCESS_IO',
'process_io': {
'type': 'CONTROL',
'control': {
'type': 'TTY_INFO',
'tty_info': {
'window_size': {
'rows': int(rows),
'columns': int(columns)}}}}}}
self.input_queue.put(self.encoder.encode(message))