blob: df86660ee9571d7a520d7d260a6d8a368dbe6965 [file] [log] [blame]
import os
import tempfile
from mysos.common import pkgutil, zookeeper
from .http import MysosServer
from .scheduler import MysosScheduler
from .state import LocalStateProvider, Scheduler, StateProvider
from .zk_state import ZooKeeperStateProvider
from kazoo.client import KazooClient
import mesos.interface
from mesos.interface.mesos_pb2 import Credential, FrameworkInfo
import mesos.native
from twitter.common import app, log
from twitter.common.exceptions import ExceptionalThread
from twitter.common.http import HttpServer
from twitter.common.log.options import LogOptions
from twitter.common.metrics import MetricSampler, RootMetrics
from twitter.common.quantity import Time
from twitter.common.quantity.parse_simple import InvalidTime, parse_time
import yaml
FRAMEWORK_NAME = 'mysos'
MYSOS_MODULE = 'mysos.scheduler'
ASSET_RELPATH = 'assets'
LogOptions.disable_disk_logging()
LogOptions.set_stderr_log_level('google:INFO')
def proxy_main():
app.add_option(
'--port',
dest='api_port',
type='int',
default=None,
help='Port for the HTTP API server')
app.add_option(
'--mesos_master',
dest='mesos_master',
default=None,
help='Mesos master address. It can be a ZooKeeper URL through which the master can be '
'detected')
app.add_option(
'--framework_user',
dest='framework_user',
help='The Unix user that Mysos executor runs as')
app.add_option(
'--framework_role',
dest='framework_role',
default='*',
help="The role that Mysos framework runs as. If set, Mysos only uses Mesos pool resources "
"with that role. The default value '*' is what Mesos considers as the default role.\n"
"NOTE: Mesos master needs to be configured to allow the specified role. See its --roles "
"flag")
app.add_option(
'--executor_uri',
dest='executor_uri',
default=None,
help='URI for the Mysos executor package')
app.add_option(
'--executor_cmd',
dest='executor_cmd',
default=None,
help='Command to execute the executor package')
app.add_option(
'--executor_environ',
dest='executor_environ',
default=None,
help="Environment variables for the executors (and the tasks) as a list of dicts keyed by "
"{name, value} in JSON. Note that these variables don't affect Mesos slave components "
"such as the fetcher")
app.add_option(
'--zk_url',
dest='zk_url',
default=None,
help='ZooKeeper URL for various Mysos operations, in the form of '
'"zk://username:password@servers/path". The sub-directory <zk_url>/discover is used for '
'communicating MySQL cluster information between Mysos scheduler and executors')
# TODO(jyx): This could also be made a per-cluster configuration.
app.add_option(
'--election_timeout',
dest='election_timeout',
default='60s',
help='The amount of time the scheduler waits for all slaves to respond during a MySQL master '
'election, e.g., 60s. After the timeout the master is elected from only the slaves that '
'have responded')
app.add_option(
'--admin_keypath',
dest='admin_keypath',
default=None,
help='The path to the key file with MySQL admin credentials on Mesos slaves')
app.add_option(
'--work_dir',
dest='work_dir',
default=os.path.join(tempfile.gettempdir(), 'mysos'),
help="Directory path to place Mysos work directories, e.g., web assets, state files if "
"--state_storage=local. Default to a system temp directory.")
app.add_option(
'--state_storage',
dest='state_storage',
default='zk',
help="Mechanism to persist scheduler state. Available options are 'zk' and 'local'. If 'zk' "
"is chosen, the scheduler state is stored under <zk_url>/state; see --zk_url. Otherwise "
"'local' is chosen and the state is persisted under <work_dir>/state; see --work_dir")
app.add_option(
'--scheduler_keypath',
dest='scheduler_keypath',
help="Path to the key file that the scheduler uses to store secrets such as MySQL "
"cluster passwords. This key must be exactly 32 bytes long")
app.add_option(
'--framework_failover_timeout',
dest='framework_failover_timeout',
default='14d',
help='Time after which Mysos framework is considered deleted. This implies losing all tasks. '
'SHOULD BE VERY HIGH')
# TODO(jyx): Flags like this are generally optional but specific executor implementations may
# require them. Consider adding validators that can be plugged in so configuration errors can be
# caught in the scheduler.
app.add_option(
'--installer_args',
dest='installer_args',
default=None,
help='Arguments for MySQL installer directly passed along to and parsed by the installer. '
'e.g., a serialized JSON string'
)
app.add_option(
'--backup_store_args',
dest='backup_store_args',
default=None,
help="Arguments for the store for MySQL backups. Its use and format are defined by the "
"backup store implementation. e.g., It can be a serialized JSON string"
)
app.add_option(
'--framework_authentication_file',
dest='framework_authentication_file',
default=None,
help="Path to the key file for authenticating the framework against Mesos master. Framework "
"will fail to register with Mesos if authentication is required by Mesos and this "
"option is not provided"
)
app.add_option(
'--executor_source_prefix',
dest='executor_source_prefix',
default=None,
help="Mysos uses the 'source' field in ExecutorInfo (See Mesos documentation) to group tasks "
"to support metrics tracking by external utilities. The format of ExecutorInfo.source "
"is '<prefix>.<cluster_name>.<server_id>'. This flag specifies the prefix to use in the "
"'source' field. e.g., it can be '<availability_zone>.<mesos_cluster>'. There is no "
"preceding period if <prefix> is empty"
)
def main(args, options):
log.info("Options in use: %s", options)
if not options.api_port:
app.error('Must specify --port')
if not options.mesos_master:
app.error('Must specify --mesos_master')
if not options.framework_user:
app.error('Must specify --framework_user')
if not options.executor_uri:
app.error('Must specify --executor_uri')
if not options.executor_cmd:
app.error('Must specify --executor_cmd')
if not options.zk_url:
app.error('Must specify --zk_url')
if not options.admin_keypath:
app.error('Must specify --admin_keypath')
if not options.scheduler_keypath:
app.error('Must specify --scheduler_keypath')
try:
election_timeout = parse_time(options.election_timeout)
framework_failover_timeout = parse_time(options.framework_failover_timeout)
except InvalidTime as e:
app.error(e.message)
try:
_, zk_servers, zk_root = zookeeper.parse(options.zk_url)
except Exception as e:
app.error("Invalid --zk_url: %s" % e.message)
web_assets_dir = os.path.join(options.work_dir, "web")
pkgutil.unpack_assets(web_assets_dir, MYSOS_MODULE, ASSET_RELPATH)
log.info("Extracted web assets into %s" % options.work_dir)
fw_principal = None
fw_secret = None
if options.framework_authentication_file:
try:
with open(options.framework_authentication_file, "r") as f:
cred = yaml.load(f)
fw_principal = cred["principal"]
fw_secret = cred["secret"]
log.info("Loaded credential (principal=%s) for framework authentication" % fw_principal)
except IOError as e:
app.error("Unable to read the framework authentication key file: %s" % e)
except (KeyError, yaml.YAMLError) as e:
app.error("Invalid framework authentication key file format %s" % e)
scheduler_key = None
try:
with open(options.scheduler_keypath, 'rb') as f:
scheduler_key = f.read().strip()
if not scheduler_key:
raise ValueError("The key file is empty")
except Exception as e:
app.error("Cannot read --scheduler_keypath: %s" % e)
log.info("Starting Mysos scheduler")
kazoo = KazooClient(zk_servers)
kazoo.start()
if options.state_storage == 'zk':
log.info("Using ZooKeeper (path: %s) for state storage" % zk_root)
state_provider = ZooKeeperStateProvider(kazoo, zk_root)
else:
log.info("Using local disk for state storage")
state_provider = LocalStateProvider(options.work_dir)
try:
state = state_provider.load_scheduler_state()
except StateProvider.Error as e:
app.error(e.message)
if state:
log.info("Successfully restored scheduler state")
framework_info = state.framework_info
if framework_info.HasField('id'):
log.info("Recovered scheduler's FrameworkID is %s" % framework_info.id.value)
else:
log.info("No scheduler state to restore")
framework_info = FrameworkInfo(
user=options.framework_user,
name=FRAMEWORK_NAME,
checkpoint=True,
failover_timeout=framework_failover_timeout.as_(Time.SECONDS),
role=options.framework_role)
if fw_principal:
framework_info.principal = fw_principal
state = Scheduler(framework_info)
state_provider.dump_scheduler_state(state)
scheduler = MysosScheduler(
state,
state_provider,
options.framework_user,
options.executor_uri,
options.executor_cmd,
kazoo,
options.zk_url,
election_timeout,
options.admin_keypath,
scheduler_key,
installer_args=options.installer_args,
backup_store_args=options.backup_store_args,
executor_environ=options.executor_environ,
executor_source_prefix=options.executor_source_prefix,
framework_role=options.framework_role)
RootMetrics().register_observable('scheduler', scheduler)
if fw_principal and fw_secret:
cred = Credential(principal=fw_principal, secret=fw_secret)
scheduler_driver = mesos.native.MesosSchedulerDriver(
scheduler,
framework_info,
options.mesos_master,
cred)
else:
scheduler_driver = mesos.native.MesosSchedulerDriver(
scheduler,
framework_info,
options.mesos_master)
scheduler_driver.start()
metric_sampler = MetricSampler(RootMetrics())
metric_sampler.start()
server = HttpServer()
server.mount_routes(MysosServer(scheduler, web_assets_dir, metric_sampler))
et = ExceptionalThread(
target=server.run, args=('0.0.0.0', options.api_port, 'cherrypy'))
et.daemon = True
et.start()
try:
# Wait for the scheduler to stop.
# The use of 'stopped' event instead of scheduler_driver.join() is necessary to stop the
# process with SIGINT.
while not scheduler.stopped.wait(timeout=0.5):
pass
except KeyboardInterrupt:
log.info('Interrupted, exiting.')
else:
log.info('Scheduler exited.')
app.shutdown(1) # Mysos scheduler is supposed to be long-running thus the use of exit status 1.
app.main()