Added Mesos authentication to the Mesos cli
The following points I have done:
- Add authentication against Mesos master and agent.
- Add option to skip SSL verification of the mesos-agent.
- Change the order of "task list" to get back more running states.
This closes #383
diff --git a/src/python/cli_new/README.md b/src/python/cli_new/README.md
index 0e6c716..7ac22b2 100644
--- a/src/python/cli_new/README.md
+++ b/src/python/cli_new/README.md
@@ -104,6 +104,9 @@
# `address` or `zookeeper` field, but not both. For example:
[master]
address = "10.10.0.30:5050"
+ principal = "username"
+ secret = "password"
+
# The `zookeeper` field has an `addresses` array and a `path` field.
# [master.zookeeper]
# addresses = [
@@ -112,6 +115,13 @@
# "10.10.0.33:5050"
# ]
# path = "/mesos"
+
+[agent]
+ ssl = true
+ ssl_verify = false
+ principal = "username"
+ secret = "password"
+ timeout = 5
```
You can override the location of this configuration file using
diff --git a/src/python/cli_new/lib/cli/config.py b/src/python/cli_new/lib/cli/config.py
index 7f41736..fa6c8ff 100644
--- a/src/python/cli_new/lib/cli/config.py
+++ b/src/python/cli_new/lib/cli/config.py
@@ -21,6 +21,7 @@
import os
import toml
+import requests
import cli
from cli.constants import DEFAULT_MASTER_IP
@@ -119,6 +120,79 @@
return master
+ def principal(self):
+ """
+ Return the principal in the configuration file
+ """
+ return self.data["master"].get("principal")
+
+ def secret(self):
+ """
+ Return the secret in the configuration file
+ """
+ return self.data["master"].get("secret")
+
+ def agent_ssl(self, default=False):
+ """
+ Return if the agent support ssl
+ """
+ if "agent" in self.data:
+ agent_ssl = self.data["agent"].get("ssl", default)
+ if not isinstance(agent_ssl, bool):
+ raise CLIException("The 'agent->ssl' field"
+ " must be True/False")
+
+ return agent_ssl
+
+ return default
+
+ def agent_ssl_verify(self, default=False):
+ """
+ Return if the ssl certificate should be verified
+ """
+ if "agent" in self.data:
+ ssl_verify = self.data["agent"].get("ssl_verify", default)
+ if not isinstance(ssl_verify, bool):
+ raise CLIException("The 'agent->ssl_verify' field"
+ " must be True/False")
+
+ return ssl_verify
+
+ return default
+
+ def agent_timeout(self, default=5):
+ """
+ Return the connection timeout of the agent
+ """
+ if "agent" in self.data:
+ timeout = self.data["agent"].get("timeout", default)
+ if not isinstance(timeout, int):
+ raise CLIException("The 'agent->timeout' field"
+ " must be a number in seconds")
+
+ return timeout
+
+ return default
+
+
+ def agent_principal(self):
+ """
+ Return the principal in the configuration file
+ """
+ if "agent" in self.data:
+ return self.data["agent"].get("principal")
+
+ return None
+
+ def agent_secret(self):
+ """
+ Return the secret in the configuration file
+ """
+ if "agent" in self.data:
+ return self.data["agent"].get("secret")
+
+ return None
+
def plugins(self):
"""
Parse the plugins listed in the configuration file and return them.
@@ -137,3 +211,15 @@
return self.data["plugins"]
return []
+
+ def authentication_header(self):
+ """
+ Return the BasicAuth authentication header
+ """
+ if (self.agent_principal() is not None
+ and self.agent_secret() is not None):
+ return requests.auth.HTTPBasicAuth(
+ self.agent_principal(),
+ self.agent_secret()
+ )
+ return None
diff --git a/src/python/cli_new/lib/cli/http.py b/src/python/cli_new/lib/cli/http.py
index 10fd889..c39935f 100644
--- a/src/python/cli_new/lib/cli/http.py
+++ b/src/python/cli_new/lib/cli/http.py
@@ -19,70 +19,70 @@
"""
import json
-import urllib.request
-import urllib.error
-import urllib.parse
-import time
+from urllib.parse import urlencode
+import urllib3
import cli
from cli.exceptions import CLIException
+# Disable all SSL warnings. These are not necessary, as the user has
+# the option to disable SSL verification.
+urllib3.disable_warnings()
-def read_endpoint(addr, endpoint, query=None):
+def read_endpoint(addr, endpoint, config, query=None):
"""
Read the specified endpoint and return the results.
"""
+
try:
addr = cli.util.sanitize_address(addr)
except Exception as exception:
raise CLIException("Unable to sanitize address '{addr}': {error}"
.format(addr=addr, error=str(exception)))
-
try:
url = "{addr}/{endpoint}".format(addr=addr, endpoint=endpoint)
if query is not None:
- url += "?{query}".format(query=urllib.parse.urlencode(query))
- http_response = urllib.request.urlopen(url).read().decode("utf-8")
+ url += "?{query}".format(query=urlencode(query))
+ if config.principal() is not None and config.secret() is not None:
+ headers = urllib3.make_headers(
+ basic_auth=config.principal() + ":" + config.secret()
+ )
+ else:
+ headers = None
+ http = urllib3.PoolManager()
+ http_response = http.request(
+ 'GET',
+ url,
+ headers=headers,
+ timeout=config.agent_timeout()
+ )
+ return http_response.data.decode('utf-8')
+
except Exception as exception:
raise CLIException("Unable to open url '{url}': {error}"
.format(url=url, error=str(exception)))
- return http_response
-
-def get_json(addr, endpoint, condition=None, timeout=5, query=None):
+def get_json(addr, endpoint, config, condition=None, query=None):
"""
Return the contents of the 'endpoint' at 'addr' as JSON data
subject to the condition specified in 'condition'. If we are
- unable to read the data or unable to meet the condition within
- 'timeout' seconds we throw an error.
+ unable to read the data we throw an error.
"""
- start_time = time.time()
- while True:
- data = None
+ data = read_endpoint(addr, endpoint, config, query)
- try:
- data = read_endpoint(addr, endpoint, query)
- except Exception as exception:
- pass
+ try:
+ data = json.loads(data)
+ except Exception as exception:
+ raise CLIException("Could not load JSON from '{data}': {error}"
+ .format(data=data, error=str(exception)))
- if data:
- try:
- data = json.loads(data)
- except Exception as exception:
- raise CLIException("Could not load JSON from '{data}': {error}"
- .format(data=data, error=str(exception)))
+ if not condition:
+ return data
- if not condition:
- return data
+ if condition(data):
+ return data
- if condition(data):
- return data
-
- if time.time() - start_time > timeout:
- raise CLIException("Failed to get data within {seconds} seconds"
- .format(seconds=str(timeout)))
-
- time.sleep(0.1)
+ return data
diff --git a/src/python/cli_new/lib/cli/mesos.py b/src/python/cli_new/lib/cli/mesos.py
index a6fd95f..44a66db 100644
--- a/src/python/cli_new/lib/cli/mesos.py
+++ b/src/python/cli_new/lib/cli/mesos.py
@@ -44,13 +44,13 @@
from mesos.exceptions import MesosHTTPException
-def get_agent_address(agent_id, master):
+def get_agent_address(agent_id, master, config):
"""
Given a master and an agent id, return the agent address
by checking the /slaves endpoint of the master.
"""
try:
- agents = http.get_json(master, "slaves")["slaves"]
+ agents = http.get_json(master, "slaves", config)["slaves"]
except Exception as exception:
raise CLIException("Could not open '/slaves'"
" endpoint at '{addr}': {error}"
@@ -62,15 +62,14 @@
raise CLIException("Unable to find agent '{id}'".format(id=agent_id))
-def get_agents(master):
+def get_agents(master, config):
"""
Get the agents in a Mesos cluster.
"""
endpoint = "slaves"
key = "slaves"
-
try:
- data = http.get_json(master, endpoint)
+ data = http.get_json(master, endpoint, config)
except Exception as exception:
raise CLIException(
"Could not open '/{endpoint}' on master: {error}"
@@ -114,15 +113,18 @@
" Please try again.")
-def get_tasks(master, query=None):
+def get_tasks(master, config, query=None):
"""
Get the tasks in a Mesos cluster.
"""
endpoint = "tasks"
key = "tasks"
+ if query is None:
+ query = {'order':'asc'}
+
try:
- data = http.get_json(master, endpoint, query=query)
+ data = http.get_json(master, endpoint, config, query=query)
except Exception as exception:
raise CLIException(
"Could not open '/{endpoint}' with query parameters: {query}"
@@ -162,7 +164,7 @@
HEARTBEAT_INTERVAL = 30
HEARTBEAT_INTERVAL_NANOSECONDS = HEARTBEAT_INTERVAL * 1000000000
- def __init__(self, master, task_id):
+ def __init__(self, master, config, task_id):
# Get the task and make sure its container was launched by the UCR.
# Since task's containers are launched by the UCR by default, we want
# to allow most tasks to pass through unchecked. The only exception is
@@ -170,7 +172,7 @@
# "MESOS". Having a type of "MESOS" implies that it was launched by the
# UCR -- all other types imply it was not.
try:
- tasks = get_tasks(master, query={'task_id': task_id})
+ tasks = get_tasks(master, config, query={'task_id': task_id})
except Exception as exception:
raise CLIException("Unable to get task with ID {task_id}"
" from leading master '{master}': {error}"
@@ -199,11 +201,13 @@
"This command is only supported for tasks"
" launched by the Universal Container Runtime (UCR).")
+ # Get the scheme of the agent
+ scheme = "https://" if config.agent_ssl() else "http://"
+
# Get the URL to the agent running the task.
agent_addr = util.sanitize_address(
- get_agent_address(task_obj["slave_id"], master))
+ scheme + get_agent_address(task_obj["slave_id"], master, config))
self.agent_url = mesos.http.simple_urljoin(agent_addr, "api/v1")
-
# Get the agent's task path by checking the `state` endpoint.
try:
self.container_id = get_container_id(task_obj)
@@ -253,6 +257,7 @@
self.interactive = False
self.tty = False
self.output_thread_entry_point = None
+ self.config = config
# Allow an exit sequence to be used to break the CLIs attachment to
# the remote task. Depending on the call, this may be disabled, or
@@ -323,7 +328,6 @@
self.args = _args
self.interactive = _interactive
self.tty = _tty
-
# Override the container ID with the current container ID as the
# parent, and generate a new UUID for the nested container used to
# run commands passed to `task exec`.
@@ -413,6 +417,7 @@
'wait_container': {
'container_id': self.container_id}}
req_extra_args = {
+ 'verify': self.config.agent_ssl_verify(),
'additional_headers': {
'Content-Type': 'application/json',
'Accept': 'application/json'}}
@@ -421,6 +426,7 @@
response = resource.request(
mesos.http.METHOD_POST,
data=json.dumps(message),
+ auth=self.config.authentication_header(),
retry=False,
timeout=None,
**req_extra_args)
@@ -511,6 +517,7 @@
req_extra_args = {
'stream': True,
+ 'verify': self.config.agent_ssl_verify(),
'additional_headers': {
'Content-Type': 'application/json',
'Accept': 'application/recordio',
@@ -523,6 +530,7 @@
data=json.dumps(message),
retry=False,
timeout=None,
+ auth=self.config.authentication_header(),
**req_extra_args)
except MesosHTTPException as e:
text = "I/O switchboard server was disabled for this container"
@@ -539,6 +547,7 @@
nested container and attach to its output stream.
The output stream is then sent back in the response.
"""
+
message = {
'type': "LAUNCH_NESTED_CONTAINER_SESSION",
'launch_nested_container_session': {
@@ -557,11 +566,11 @@
req_extra_args = {
'stream': True,
+ 'verify': self.config.agent_ssl_verify(),
'additional_headers': {
'Content-Type': 'application/json',
'Accept': 'application/recordio',
'Message-Accept': 'application/json'}}
-
resource = mesos.http.Resource(self.agent_url)
try:
response = resource.request(
@@ -569,6 +578,7 @@
data=json.dumps(message),
retry=False,
timeout=None,
+ auth=self.config.authentication_header(),
**req_extra_args)
except MesosException as exception:
raise CLIException("{error}".format(error=exception))
@@ -654,6 +664,7 @@
yield record
req_extra_args = {
+ 'verify': self.config.agent_ssl_verify(),
'additional_headers': {
'Content-Type': 'application/recordio',
'Message-Content-Type': 'application/json',
@@ -682,6 +693,7 @@
mesos.http.METHOD_POST,
data=_initial_input_streamer(),
retry=False,
+ auth=self.config.authentication_header(),
**req_extra_args)
except MesosHTTPException as e:
if not e.response.status_code == 500:
@@ -698,6 +710,7 @@
data=_input_streamer(),
retry=False,
timeout=None,
+ auth=self.config.authentication_header(),
**req_extra_args)
def _detect_exit_sequence(self, chunk):
diff --git a/src/python/cli_new/lib/cli/plugins/agent/main.py b/src/python/cli_new/lib/cli/plugins/agent/main.py
index 4a658f9..158e83a 100644
--- a/src/python/cli_new/lib/cli/plugins/agent/main.py
+++ b/src/python/cli_new/lib/cli/plugins/agent/main.py
@@ -53,12 +53,13 @@
# pylint: disable=unused-argument
try:
master = self.config.master()
+ config = self.config
except Exception as exception:
raise CLIException("Unable to get leading master address: {error}"
.format(error=exception))
try:
- agents = get_agents(master)
+ agents = get_agents(master, config)
except Exception as exception:
raise CLIException("Unable to get agents from leading"
" master '{master}': {error}"
diff --git a/src/python/cli_new/lib/cli/plugins/task/main.py b/src/python/cli_new/lib/cli/plugins/task/main.py
index 00167f8..d223746 100644
--- a/src/python/cli_new/lib/cli/plugins/task/main.py
+++ b/src/python/cli_new/lib/cli/plugins/task/main.py
@@ -75,11 +75,12 @@
"""
try:
master = self.config.master()
+ config = self.config
except Exception as exception:
raise CLIException("Unable to get leading master address: {error}"
.format(error=exception))
- task_io = TaskIO(master, argv["<task-id>"])
+ task_io = TaskIO(master, config, argv["<task-id>"])
return task_io.attach(argv["--no-stdin"])
@@ -89,11 +90,12 @@
"""
try:
master = self.config.master()
+ config = self.config
except Exception as exception:
raise CLIException("Unable to get leading master address: {error}"
.format(error=exception))
- task_io = TaskIO(master, argv["<task-id>"])
+ task_io = TaskIO(master, config, argv["<task-id>"])
return task_io.exec(argv["<command>"],
argv["<args>"],
argv["--interactive"],
@@ -106,12 +108,13 @@
# pylint: disable=unused-argument
try:
master = self.config.master()
+ config = self.config
except Exception as exception:
raise CLIException("Unable to get leading master address: {error}"
.format(error=exception))
try:
- tasks = get_tasks(master)
+ tasks = get_tasks(master, config)
except Exception as exception:
raise CLIException("Unable to get tasks from leading"
" master '{master}': {error}"
diff --git a/src/python/cli_new/lib/cli/tests/agent.py b/src/python/cli_new/lib/cli/tests/agent.py
index 31e3e3f..8ff6842 100644
--- a/src/python/cli_new/lib/cli/tests/agent.py
+++ b/src/python/cli_new/lib/cli/tests/agent.py
@@ -48,7 +48,7 @@
# Open the master's `/slaves` endpoint and read the
# agents' information ourselves.
- agents = http.get_json(master.addr, 'slaves')["slaves"]
+ agents = http.get_json(master.addr, None, 'slaves')["slaves"]
self.assertEqual(type(agents), list)
self.assertEqual(len(agents), 1)
diff --git a/src/python/cli_new/lib/cli/tests/base.py b/src/python/cli_new/lib/cli/tests/base.py
index e3104fe..980c00b 100644
--- a/src/python/cli_new/lib/cli/tests/base.py
+++ b/src/python/cli_new/lib/cli/tests/base.py
@@ -346,7 +346,7 @@
to return data subject to 'condition'.
"""
try:
- data = http.get_json(self.flags["master"], "slaves")
+ data = http.get_json(self.flags["master"], None, "slaves")
except Exception as exception:
raise CLIException("Could not get '/slaves' endpoint"
" as JSON: {error}"
@@ -510,7 +510,7 @@
"""
@retry(wait=wait_fixed(0.2), stop=stop_after_delay(delay))
def _wait_for_task():
- tasks = http.get_json(master.addr, "tasks")["tasks"]
+ tasks = http.get_json(master.addr, None, "tasks")["tasks"]
for task in tasks:
if task["name"] == name and task["state"] == state:
return task
diff --git a/src/python/cli_new/lib/cli/tests/task.py b/src/python/cli_new/lib/cli/tests/task.py
index b846ee8..5511165 100644
--- a/src/python/cli_new/lib/cli/tests/task.py
+++ b/src/python/cli_new/lib/cli/tests/task.py
@@ -72,7 +72,7 @@
.format(name=task.name, state="TASK_RUNNING", error=exception))
try:
- tasks = http.get_json(master.addr, "tasks")["tasks"]
+ tasks = http.get_json(master.addr, None, "tasks")["tasks"]
except Exception as exception:
raise CLIException(
"Could not get tasks from '/{endpoint}' on master: {error}"
@@ -116,7 +116,7 @@
.format(name=task.name, state="TASK_RUNNING", error=exception))
try:
- tasks = http.get_json(master.addr, "tasks")["tasks"]
+ tasks = http.get_json(master.addr, None, "tasks")["tasks"]
except Exception as exception:
raise CLIException(
"Could not get tasks from '/{endpoint}' on master: {error}"
@@ -162,7 +162,7 @@
.format(name=task.name, state="TASK_RUNNING", error=exception))
try:
- tasks = http.get_json(master.addr, "tasks")["tasks"]
+ tasks = http.get_json(master.addr, None, "tasks")["tasks"]
except Exception as exception:
raise CLIException(
"Could not get tasks from '/{endpoint}' on master: {error}"
@@ -210,7 +210,7 @@
.format(name=task.name, state="TASK_RUNNING", error=exception))
try:
- tasks = http.get_json(master.addr, "tasks")["tasks"]
+ tasks = http.get_json(master.addr, None, "tasks")["tasks"]
except Exception as exception:
raise CLIException(
"Could not get tasks from '/{endpoint}' on master: {error}"
@@ -281,7 +281,7 @@
.format(name=task2.name, state=task2_state, error=exception))
try:
- tasks = http.get_json(master.addr, "tasks")["tasks"]
+ tasks = http.get_json(master.addr, None, "tasks")["tasks"]
except Exception as exception:
raise CLIException(
"Could not get tasks from '/{endpoint}' on master: {error}"
diff --git a/src/python/cli_new/pip-requirements.txt b/src/python/cli_new/pip-requirements.txt
index 4f512a4..f0642b9 100644
--- a/src/python/cli_new/pip-requirements.txt
+++ b/src/python/cli_new/pip-requirements.txt
@@ -1,11 +1,9 @@
-astroid==2.0.4
backports.functools-lru-cache==1.2.1
configparser==3.5.0
docopt==0.6.2
isort==4.2.5
kazoo==2.5.0
lazy-object-proxy==1.2.2
-mccabe==0.5.2
parse==1.8.0
Pygments==2.1.3
PyInstaller==3.4
diff --git a/src/python/cli_new/tox.ini b/src/python/cli_new/tox.ini
index 3aa93a6..1d76fe4 100644
--- a/src/python/cli_new/tox.ini
+++ b/src/python/cli_new/tox.ini
@@ -4,7 +4,7 @@
# and then run "tox" from this directory.
[tox]
-envlist = {py3,py36,py37}-{lint,test}
+envlist = {py3,py36,py37,py38}-{lint,test}
skipsdist = true
[testenv]
@@ -12,6 +12,7 @@
py3: python3
py36: python3.6
py37: python3.7
+ py38: python3.8
deps =
-rpip-requirements.txt
test: coverage==4.5.1