Merge pull request #57 from xujyan/jyx/executor_resources
Take some resources from the Mysos task and assign them to the executor.
diff --git a/mysos/scheduler/launcher.py b/mysos/scheduler/launcher.py
index 87923f5..8da9567 100644
--- a/mysos/scheduler/launcher.py
+++ b/mysos/scheduler/launcher.py
@@ -18,6 +18,10 @@
EXECUTOR_NAME = 'mysos.executor'
+EXECUTOR_CPUS_EPSILON = 0.01
+EXECUTOR_MEM_EPSILON = Amount(32, Data.MB)
+EXECUTOR_DISK_EPSILON = Amount(1, Data.MB)
+
class MySQLClusterLauncher(object):
"""
@@ -287,6 +291,13 @@
task.executor.source = '.'.join(source)
task.executor.command.value = self._executor_cmd
+ task.executor.resources.extend(create_resources(
+ EXECUTOR_CPUS_EPSILON,
+ EXECUTOR_MEM_EPSILON,
+ EXECUTOR_DISK_EPSILON,
+ ports=[],
+ role=self._framework_role))
+
if self._executor_environ: # Could be 'None' since it's an optional argument.
executor_environ_ = json.loads(self._executor_environ)
if executor_environ_:
@@ -316,9 +327,12 @@
'backup_id': self._cluster.backup_id,
})
- resources = create_resources(
- task_cpus, task_mem, task_disk, set([task_port]), role=self._framework_role)
- task.resources.extend(resources)
+ task.resources.extend(create_resources(
+ task_cpus - EXECUTOR_CPUS_EPSILON,
+ task_mem - EXECUTOR_MEM_EPSILON,
+ task_disk - EXECUTOR_DISK_EPSILON,
+ set([task_port]),
+ role=self._framework_role))
return task
@@ -558,16 +572,20 @@
disk_resources.role = role
disk_resources.scalar.value = disk.as_(Data.MB)
- ports_resources = mesos_pb2.Resource()
- ports_resources.name = 'ports'
- ports_resources.type = mesos_pb2.Value.RANGES
- ports_resources.role = role
- for port in ports:
- port_range = ports_resources.ranges.range.add()
- port_range.begin = port
- port_range.end = port
+ resources = [cpus_resources, mem_resources, disk_resources]
- return [cpus_resources, mem_resources, disk_resources, ports_resources]
+ if ports:
+ ports_resources = mesos_pb2.Resource()
+ ports_resources.name = 'ports'
+ ports_resources.type = mesos_pb2.Value.RANGES
+ ports_resources.role = role
+ for port in ports:
+ port_range = ports_resources.ranges.range.add()
+ port_range.begin = port
+ port_range.end = port
+ resources += [ports_resources]
+
+ return resources
def is_terminal(state):
diff --git a/mysos/scheduler/scheduler.py b/mysos/scheduler/scheduler.py
index b1cfe5c..e5d403e 100644
--- a/mysos/scheduler/scheduler.py
+++ b/mysos/scheduler/scheduler.py
@@ -10,7 +10,9 @@
from mysos.common.cluster import get_cluster_path
from mysos.common.decorators import logged
-from .launcher import MySQLClusterLauncher
+from .launcher import (
+ EXECUTOR_CPUS_EPSILON, EXECUTOR_DISK_EPSILON, EXECUTOR_MEM_EPSILON, MySQLClusterLauncher
+)
from .password import gen_password, PasswordBox
from .state import MySQLCluster, Scheduler, StateProvider
@@ -210,6 +212,15 @@
raise ValueError("Invalid number of cluster nodes: %s" % num_nodes)
resources = parse_size(size)
+
+ if (resources['cpus'] <= EXECUTOR_CPUS_EPSILON or
+ resources['mem'] <= EXECUTOR_MEM_EPSILON or
+ resources['disk'] <= EXECUTOR_DISK_EPSILON):
+ raise ValueError(
+ "Instance 'size' too small. It should be larger than what Mysos executor consumes: "
+ "(cpus, mem, disk) = (%s, %s, %s)" % (
+ EXECUTOR_CPUS_EPSILON, EXECUTOR_MEM_EPSILON, EXECUTOR_DISK_EPSILON))
+
log.info("Requested resources per instance for cluster %s: %s" % (resources, cluster_name))
self._metrics.total_requested_cpus.write(