| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| """ |
| Task plugin tests. |
| """ |
| |
| import os |
| |
| from cli import config |
| from cli import http |
| |
| from cli.exceptions import CLIException |
| |
| from cli.plugins.task.main import Task as TaskPlugin |
| |
| from cli.tests import capture_output |
| from cli.tests import exec_command |
| from cli.tests import wait_for_task |
| from cli.tests import CLITestCase |
| from cli.tests import Agent |
| from cli.tests import Master |
| from cli.tests import Task |
| |
| from cli.tests.constants import TEST_DATA_DIRECTORY |
| |
| from cli.util import Table |
| |
| |
# Path of the lorem-ipsum text fixture inside the test data directory.
# The exec tests below read it both as task input and as expected output.
LOREM_IPSUM = os.path.join(
    TEST_DATA_DIRECTORY,
    "lorem-ipsum.txt")
| |
class TestTaskPlugin(CLITestCase):
    """
    Test class for the task plugin.

    Every test launches its own master, agent and task(s). Each launched
    process is registered for cleanup right after it starts, so it gets
    killed even when an assertion or a helper raises in the middle of
    the test.
    """

    def _launch(self, executable):
        """
        Launch `executable` and register its `kill()` as a test cleanup.

        Cleanups run in reverse registration order once the test ends,
        whether it passed or failed, so processes launched later (tasks)
        are killed before the ones launched earlier (agent, master).

        Returns the launched `executable` to allow one-line setup.
        """
        executable.launch()
        self.addCleanup(executable.kill)
        return executable

    @staticmethod
    def _wait_for_task(master, task, state):
        """
        Wait for `task` to reach `state` according to `master`.

        Raises a CLIException naming the task and the expected state
        if the wait fails for any reason.
        """
        try:
            wait_for_task(master, task.name, state)
        except Exception as exception:
            raise CLIException(
                "Error waiting for task '{name}' to"
                " reach state '{state}': {error}"
                .format(name=task.name, state=state, error=exception))

    def _get_tasks(self, master, count):
        """
        Fetch the task list from the '/tasks' endpoint of `master`.

        Asserts that the endpoint returned a list holding exactly
        `count` tasks and returns that list. Raises a CLIException
        if the endpoint could not be queried.
        """
        try:
            tasks = http.get_json(master.addr, "tasks")["tasks"]
        except Exception as exception:
            raise CLIException(
                "Could not get tasks from '/{endpoint}' on master: {error}"
                .format(endpoint="tasks", error=exception))

        self.assertIsInstance(tasks, list)
        self.assertEqual(len(tasks), count)
        return tasks

    def test_exec(self):
        """
        Basic test for the task `exec()` sub-command.
        """
        # Launch a master, agent, and task. The task writes the
        # lorem-ipsum text into `a.txt` and then stays alive.
        master = self._launch(Master())
        self._launch(Agent())

        with open(LOREM_IPSUM) as text:
            content = text.read()
        command = "printf '{data}' > a.txt && sleep 1000".format(data=content)
        task = self._launch(Task({"command": command}))

        self._wait_for_task(master, task, "TASK_RUNNING")
        tasks = self._get_tasks(master, 1)

        # Reading the file back through `exec` must yield the same text.
        returncode, stdout, stderr = exec_command(
            ["mesos", "task", "exec", tasks[0]["id"], "cat", "a.txt"])

        self.assertEqual(returncode, 0)
        self.assertEqual(stdout, content)
        self.assertEqual(stderr, "")

    def test_exec_exit_status(self):
        """
        Basic test for the task `exec()` sub-command exit status.
        """
        # Launch a master, agent, and task.
        master = self._launch(Master())
        self._launch(Agent())
        task = self._launch(Task({"command": "sleep 1000"}))

        self._wait_for_task(master, task, "TASK_RUNNING")
        tasks = self._get_tasks(master, 1)

        # The exit status of the executed command must be propagated
        # as the exit status of the CLI, for success and failure alike.
        returncode, _, _ = exec_command(
            ["mesos", "task", "exec", tasks[0]["id"], "true"])
        self.assertEqual(returncode, 0)

        returncode, _, _ = exec_command(
            ["mesos", "task", "exec", tasks[0]["id"], "bash", "-c", "exit 10"])
        self.assertEqual(returncode, 10)

    def test_exec_interactive(self):
        """
        Test for the task `exec()` sub-command, using `--interactive`.
        """
        # Launch a master, agent, and task.
        master = self._launch(Master())
        self._launch(Agent())
        task = self._launch(Task({"command": "sleep 1000"}))

        self._wait_for_task(master, task, "TASK_RUNNING")
        tasks = self._get_tasks(master, 1)

        # Feed the fixture file to `cat` through stdin.
        with open(LOREM_IPSUM) as text:
            returncode, stdout, stderr = exec_command(
                ["mesos", "task", "exec", "-i", tasks[0]["id"], "cat"],
                stdin=text)

        # `cat` must echo the fixture back unchanged.
        self.assertEqual(returncode, 0)
        with open(LOREM_IPSUM) as text:
            self.assertEqual(stdout, text.read())
        self.assertEqual(stderr, "")

    def test_list(self):
        """
        Basic test for the task `list()` sub-command.
        """
        # Launch a master, agent, and task.
        master = self._launch(Master())
        self._launch(Agent())
        task = self._launch(Task({"command": "sleep 1000"}))

        self._wait_for_task(master, task, "TASK_RUNNING")
        tasks = self._get_tasks(master, 1)

        # Invoke the task plugin `list()` command
        # and parse its output as a table.
        test_config = config.Config(None)
        plugin = TaskPlugin(None, test_config)
        output = capture_output(plugin.list, {"--all": False})
        table = Table.parse(output)

        # Verify there are two rows in the table (the header and the
        # running task) and that they are formatted as expected,
        # with the proper task info in them.
        self.assertEqual(table.dimensions()[0], 2)
        self.assertEqual(table.dimensions()[1], 4)
        self.assertEqual("ID", table[0][0])
        self.assertEqual("State", table[0][1])
        self.assertEqual("Framework ID", table[0][2])
        self.assertEqual("Executor ID", table[0][3])
        self.assertEqual(tasks[0]["id"], table[1][0])
        self.assertEqual(tasks[0]["statuses"][-1]["state"], table[1][1])
        self.assertEqual(tasks[0]["framework_id"], table[1][2])
        self.assertEqual(tasks[0]["executor_id"], table[1][3])

    def test_list_all(self):
        """
        Basic test for the task `list()` sub-command with flag `--all`.
        """
        # Launch a master, agent, and two tasks: one that finishes
        # right away and one that keeps running.
        master = self._launch(Master())
        self._launch(Agent())

        task1 = self._launch(Task({"command": "true"}))
        self._wait_for_task(master, task1, "TASK_FINISHED")

        task2 = self._launch(Task({"command": "sleep 1000"}))
        self._wait_for_task(master, task2, "TASK_RUNNING")

        self._get_tasks(master, 2)

        # Invoke the task plugin `list()` command
        # and parse its output as a table.
        test_config = config.Config(None)
        plugin = TaskPlugin(None, test_config)
        output = capture_output(plugin.list, {"--all": True})
        table = Table.parse(output)

        # Verify that there are three rows in the table: the header, one
        # for the running task and one for the finished task. We do not
        # verify the information in the table as this is already covered
        # in the test `test_list`.
        self.assertEqual(table.dimensions()[0], 3)
        self.assertEqual(table.dimensions()[1], 4)