Merge pull request #37 from xujyan/docs

Add a simple docs/README.md.
diff --git a/docs/user-guide.md b/docs/user-guide.md
index 8e6f1e4..da393b8 100644
--- a/docs/user-guide.md
+++ b/docs/user-guide.md
@@ -15,8 +15,8 @@
 
 ### Parameters
 - `cluster_name`: Required. Name of the cluster.
-- `cluster_user`: Required. The user account for all MySQL instances in the cluster which as full admin
-privileges.
+- `cluster_user`: Required. The user account for all MySQL instances in the cluster which has full
+admin privileges.
 - `num_nodes`: Number of nodes in the cluster. [default: 1]
 - `size`: The size of instances in the cluster as a JSON dictionary of `cpus`, `mem` and `disk`.
 `mem` and `disk` are specified with standard data size units such as `mb`, `gb`, `tb`, etc. (no
@@ -24,6 +24,9 @@
 - `backup_id`: An ID for the MySQL backup to restore from when the MySQL instance starts. If not
 specified, Mysos will start an empty MySQL instance. The format and meaning of `backup_id` is
 specific to the implementation of `BackupStore` that the Mysos cluster uses.
+- `cluster_password`: The password used for accessing MySQL instances in the cluster as well as
+deleting the cluster from Mysos. If unspecified then Mysos generates one for the cluster. In either
+case the password is sent back as part of the response.
 
 `cluster_name` is part of the path and the rest of the parameters are specified as form fields.
 
@@ -31,7 +34,8 @@
 ### Response
 A JSON object with the following fields:
 
-- `cluster_password`: The password for accessing the MySQL instance (associated with `cluster_user`).
+- `cluster_password`: The password for accessing the MySQL instance (associated with
+`cluster_user`).
 - `cluster_url`: A URL to the ZooKeeper group for discovering the MySQL instances of this cluster.
 See the *Service Discovery* section below.
 
diff --git a/mysos/executor/files/bin/mysql/scripts/mysos_promote_master.sh b/mysos/executor/files/bin/mysql/scripts/mysos_promote_master.sh
index 722f902..5fc3f38 100755
--- a/mysos/executor/files/bin/mysql/scripts/mysos_promote_master.sh
+++ b/mysos/executor/files/bin/mysql/scripts/mysos_promote_master.sh
@@ -3,7 +3,7 @@
 # Promote the MySQL slave to be a master.
 #
 
-set -uex
+set -ue  # No -x due to passwords in the commands.
 
 host=$1
 port=$2
diff --git a/mysos/executor/files/bin/mysql/scripts/mysos_reparent.sh b/mysos/executor/files/bin/mysql/scripts/mysos_reparent.sh
index c609925..5830940 100755
--- a/mysos/executor/files/bin/mysql/scripts/mysos_reparent.sh
+++ b/mysos/executor/files/bin/mysql/scripts/mysos_reparent.sh
@@ -3,7 +3,7 @@
 # Reparent the slave to a new master.
 #
 
-set -uxe
+set -ue  # No -x due to passwords in the commands.
 
 master_host=$1
 master_port=$2
diff --git a/mysos/executor/mysql_task_control.py b/mysos/executor/mysql_task_control.py
index 3695841..d5ca96f 100644
--- a/mysos/executor/mysql_task_control.py
+++ b/mysos/executor/mysql_task_control.py
@@ -166,33 +166,35 @@
   @synchronized
   def reparent(self, master_host, master_port, env=None):
     command = ("%(cmd)s %(master_host)s %(master_port)s %(slave_host)s %(slave_port)s "
-        "%(admin_user)s %(admin_password)s" % dict(
-            cmd=os.path.join(self._scripts_dir, "mysos_reparent.sh"),
-            master_host=master_host,
-            master_port=master_port,
-            slave_host=self._host,
-            slave_port=self._port,
-            admin_user=self._admin_username,
-            admin_password=self._admin_password))
+               "%(admin_user)s %(admin_password)s")
+    params = dict(
+        cmd=os.path.join(self._scripts_dir, "mysos_reparent.sh"),
+        master_host=master_host,
+        master_port=master_port,
+        slave_host=self._host,
+        slave_port=self._port,
+        admin_user=self._admin_username,
+        admin_password=self._admin_password)
 
-    log.info("Executing command: %s" % command)
-    subprocess.check_call(command, shell=True, env=env)
+    log.info("Executing command: %s" % (command % dict(params, admin_password="<redacted>")))
+    subprocess.check_call(command % params, shell=True, env=env)
 
   @synchronized
   def promote(self, env=None):
     command = ("%(cmd)s %(host)s %(port)s %(cluster_user)s %(password)s %(admin_user)s "
-        "%(admin_password)s" % dict(
-            cmd=os.path.join(self._scripts_dir, "mysos_promote_master.sh"),
-            host=self._host,
-            port=self._port,
-            cluster_user=self._cluster_user,
-            password=self._password,
-            admin_user=self._admin_username,
-            admin_password=self._admin_password))
+               "%(admin_password)s")
+    params = dict(
+        cmd=os.path.join(self._scripts_dir, "mysos_promote_master.sh"),
+        host=self._host,
+        port=self._port,
+        cluster_user=self._cluster_user,
+        password=self._password,
+        admin_user=self._admin_username,
+        admin_password=self._admin_password)
 
-    # TODO(jyx): Scrub the command log line to hide the password.
-    log.info("Executing command: %s" % command)
-    subprocess.check_call(command, shell=True, env=env)
+    log.info("Executing command: %s" % (
+        command % dict(params, password="<redacted>", admin_password="<redacted>")))
+    subprocess.check_call(command % params, shell=True, env=env)
 
   @synchronized
   def get_log_position(self, env=None):
diff --git a/mysos/scheduler/http.py b/mysos/scheduler/http.py
index afe74a5..45804c5 100644
--- a/mysos/scheduler/http.py
+++ b/mysos/scheduler/http.py
@@ -30,6 +30,7 @@
     cluster_user = bottle.request.forms.get('cluster_user', default=None)
     backup_id = bottle.request.forms.get('backup_id', default=None)
     size = bottle.request.forms.get('size', default=None)
+    cluster_password = bottle.request.forms.get('cluster_password', default=None)
 
     try:
       cluster_zk_url, cluster_password = self._scheduler.create_cluster(
@@ -37,7 +38,8 @@
           cluster_user,
           num_nodes,
           size,
-          backup_id=backup_id)
+          backup_id=backup_id,
+          cluster_password=cluster_password)
       return json.dumps(dict(cluster_url=cluster_zk_url, cluster_password=cluster_password))
     except MysosScheduler.ClusterExists as e:
       raise bottle.HTTPResponse(e.message, status=409)
diff --git a/mysos/scheduler/mysos_scheduler.py b/mysos/scheduler/mysos_scheduler.py
index df86660..16ec00a 100644
--- a/mysos/scheduler/mysos_scheduler.py
+++ b/mysos/scheduler/mysos_scheduler.py
@@ -140,16 +140,14 @@
       dest='installer_args',
       default=None,
       help='Arguments for MySQL installer directly passed along to and parsed by the installer. '
-           'e.g., a serialized JSON string'
-  )
+           'e.g., a serialized JSON string')
 
   app.add_option(
       '--backup_store_args',
       dest='backup_store_args',
       default=None,
       help="Arguments for the store for MySQL backups. Its use and format are defined by the "
-           "backup store implementation. e.g., It can be a serialized JSON string"
-  )
+           "backup store implementation. e.g., It can be a serialized JSON string")
 
   app.add_option(
       '--framework_authentication_file',
@@ -157,8 +155,7 @@
       default=None,
       help="Path to the key file for authenticating the framework against Mesos master. Framework "
            "will fail to register with Mesos if authentication is required by Mesos and this "
-           "option is not provided"
-  )
+           "option is not provided")
 
   app.add_option(
       '--executor_source_prefix',
@@ -168,8 +165,13 @@
            "to support metrics tracking by external utilities. The format of ExecutorInfo.source "
            "is '<prefix>.<cluster_name>.<server_id>'. This flag specifies the prefix to use in the "
            "'source' field. e.g., it can be '<availability_zone>.<mesos_cluster>'. There is no "
-           "preceding period if <prefix> is empty"
-  )
+           "preceding period if <prefix> is empty")
+
+  app.add_option(
+      '--verbose',
+      dest='verbose',
+      default=None,
+      help="Turn on verbose logging")
 
   def main(args, options):
     log.info("Options in use: %s", options)
@@ -198,6 +200,9 @@
     if not options.scheduler_keypath:
       app.error('Must specify --scheduler_keypath')
 
+    if options.verbose:
+      LogOptions.set_stderr_log_level('google:DEBUG')
+
     try:
       election_timeout = parse_time(options.election_timeout)
       framework_failover_timeout = parse_time(options.framework_failover_timeout)
diff --git a/mysos/scheduler/scheduler.py b/mysos/scheduler/scheduler.py
index 0c060df..52986ae 100644
--- a/mysos/scheduler/scheduler.py
+++ b/mysos/scheduler/scheduler.py
@@ -128,7 +128,14 @@
         MutatorGauge('total_requested_disk_mb', 0.))
 
   # --- Public interface. ---
-  def create_cluster(self, cluster_name, cluster_user, num_nodes, size=None, backup_id=None):
+  def create_cluster(
+        self,
+        cluster_name,
+        cluster_user,
+        num_nodes,
+        size=None,
+        backup_id=None,
+        cluster_password=None):
     """
       :param cluster_name: Name of the cluster.
       :param cluster_user: The user account on MySQL server.
@@ -138,6 +145,10 @@
                    given 'None' then app defaults are used.
       :param backup_id: The 'backup_id' of the backup to restore from. If None then Mysos starts an
                         empty instance.
+      :param cluster_password: The password used for accessing MySQL instances in the cluster as
+                               well as deleting the cluster from Mysos. If None then Mysos generates
+                               one for the cluster. In either case the password is sent back as
+                               part of the return value.
 
       :return: a tuple of the following:
         - ZooKeeper URL for this Mysos cluster that can be used to resolve MySQL cluster info.
@@ -178,12 +189,15 @@
       self._state.clusters.add(cluster_name)
       self._state_provider.dump_scheduler_state(self._state)
 
+      if not cluster_password:
+        log.info("Generating password for cluster %s" % cluster_name)
+        cluster_password = gen_password()
+
       # Return the plaintext version to the client but store the encrypted version.
-      password = gen_password()
       cluster = MySQLCluster(
           cluster_name,
           cluster_user,
-          self._password_box.encrypt(password),
+          self._password_box.encrypt(cluster_password),
           num_nodes,
           cpus=resources['cpus'],
           mem=resources['mem'],
@@ -212,7 +226,7 @@
 
       self._cluster_count.increment()
 
-      return get_cluster_path(self._discover_zk_url, cluster_name), password
+      return get_cluster_path(self._discover_zk_url, cluster_name), cluster_password
 
   def delete_cluster(self, cluster_name, password):
     """
@@ -238,6 +252,11 @@
       self._total_requested_disk_mb.write(
           self._total_requested_disk_mb.read() - cluster_info.total_disk_mb)
 
+      if launcher.terminated:
+        log.info("Deleting the launcher for cluster %s directly because the cluster has already "
+                 "terminated" % launcher.cluster_name)
+        self._delete_launcher(launcher)
+
       return get_cluster_path(self._discover_zk_url, cluster_name)
 
   @property
@@ -382,10 +401,12 @@
           log.info("Declining offer %s for %s because '%s'" % (
               offer.id.value, INCOMPATIBLE_ROLE_OFFER_REFUSE_DURATION, e))
         else:
-          log.info("Declining offer %s because no launcher can use this offer" % offer.id.value)
+          log.info("Declining offer %s because no launcher accepted this offer" % offer.id.value)
           # Mesos scheduler Python binding doesn't deal with filters='None' properly.
           # See https://issues.apache.org/jira/browse/MESOS-2567.
           filters = mesos_pb2.Filters()
+
+        log.debug(offer)
         self._driver.declineOffer(offer.id, filters)
 
   @logged
@@ -407,9 +428,13 @@
       if launcher.terminated:
         log.info("Deleting the launcher for cluster %s because the cluster has terminated" %
                  launcher.cluster_name)
-        self._state.clusters.discard(launcher.cluster_name)
-        self._state_provider.dump_scheduler_state(self._state)
-        del self._launchers[launcher.cluster_name]
+        self._delete_launcher(launcher)
+
+  def _delete_launcher(self, launcher):
+    assert launcher.terminated
+    self._state.clusters.discard(launcher.cluster_name)
+    self._state_provider.dump_scheduler_state(self._state)
+    del self._launchers[launcher.cluster_name]
 
   @logged
   def frameworkMessage(self, driver, executorId, slaveId, message):
diff --git a/mysos/testing/mysos_test_client.py b/mysos/testing/mysos_test_client.py
index b80a381..9788c75 100755
--- a/mysos/testing/mysos_test_client.py
+++ b/mysos/testing/mysos_test_client.py
@@ -21,33 +21,29 @@
 LogOptions.set_stderr_log_level('google:INFO')
 
 
-app.add_option(
-    '--api_host',
-    dest='api_host',
-    help='Host for the HTTP API server')
-
-
-app.add_option(
-    '--api_port',
-    dest='api_port',
-    type='int',
-    help='Port for the HTTP API server')
-
-
-app.add_option(
-    '--cluster',
-    dest='cluster_name',
-    help='Name of the MySQL cluster to create')
-
-
-app.add_option(
-    '--password_file',
-    dest='password_file',
-    default=os.path.join(tempfile.gettempdir(), 'mysos', 'mysos_test_client', 'password_file'),
-    help="Path to the file for persisting the cluster password for testing purposes")
-
-
 def proxy_main():
+  app.add_option(
+      '--api_host',
+      dest='api_host',
+      help='Host for the HTTP API server')
+
+  app.add_option(
+      '--api_port',
+      dest='api_port',
+      type='int',
+      help='Port for the HTTP API server')
+
+  app.add_option(
+      '--cluster',
+      dest='cluster_name',
+      help='Name of the MySQL cluster to create')
+
+  app.add_option(
+      '--password_file',
+      dest='password_file',
+      default=os.path.join(tempfile.gettempdir(), 'mysos', 'mysos_test_client', 'password_file'),
+      help="Path to the file for persisting the cluster password for testing purposes")
+
   @app.command
   @app.command_option(
       '--num_nodes',
@@ -69,6 +65,11 @@
       help="The size of instances in the cluster as a JSON dictionary of 'cpus', 'mem', 'disk'. "
            "'mem' and 'disk' are specified with data size units: kb, mb, gb, etc. If given 'None'"
            "then app defaults are used.")
+  @app.command_option(
+      '--cluster_password',
+      dest='cluster_password',
+      help="The password used for accessing MySQL instances in the cluster as well as deleting "
+           "the cluster from Mysos.")
   def create(args, options):
     validate_common_options(options)
 
@@ -82,8 +83,9 @@
     values = dict(
         num_nodes=int(options.num_nodes),
         cluster_user=options.cluster_user,
-        size=options.size if options.size else '',
-        backup_id=options.backup_id if options.backup_id else '')
+        size=options.size if options.size else '',  # 'urlencode()' doesn't accept None.
+        backup_id=options.backup_id if options.backup_id else '',
+        cluster_password=options.cluster_password if options.cluster_password else '')
 
     req = urllib2.Request(url, urllib.urlencode(values))
     try:
diff --git a/tests/scheduler/test_http.py b/tests/scheduler/test_http.py
index 90cb678..ef99bad 100644
--- a/tests/scheduler/test_http.py
+++ b/tests/scheduler/test_http.py
@@ -27,7 +27,14 @@
   def set_response(self, response):
     self._response = response
 
-  def create_cluster(self, cluster_name, cluster_user, num_nodes, size, backup_id=None):
+  def create_cluster(
+      self,
+      cluster_name,
+      cluster_user,
+      num_nodes,
+      size,
+      backup_id=None,
+      cluster_password=None):
     if self._exception:
       raise self._exception
     return self._response
diff --git a/tests/scheduler/test_scheduler.py b/tests/scheduler/test_scheduler.py
index 58ea8a3..061f915 100644
--- a/tests/scheduler/test_scheduler.py
+++ b/tests/scheduler/test_scheduler.py
@@ -209,21 +209,22 @@
     scheduler_key = gen_encryption_key()
 
     scheduler = MysosScheduler(
-      self._state,
-      self._state_provider,
-      self._framework_user,
-      "./executor.pex",
-      "cmd.sh",
-      self._zk_client,
-      self._zk_url,
-      Amount(5, Time.SECONDS),
-      "/etc/mysos/admin_keyfile.yml",
-      scheduler_key)
+        self._state,
+        self._state_provider,
+        self._framework_user,
+        "./executor.pex",
+        "cmd.sh",
+        self._zk_client,
+        self._zk_url,
+        Amount(5, Time.SECONDS),
+        "/etc/mysos/admin_keyfile.yml",
+        scheduler_key)
 
     RootMetrics().register_observable('scheduler', scheduler)
 
     scheduler.registered(self._driver, self._framework_id, object())
-    _, password = scheduler.create_cluster("cluster1", "mysql_user", 3)
+    scheduler.create_cluster(
+        "cluster1", "mysql_user", 3, cluster_password='test_password')
 
     sample = RootMetrics().sample()
     assert sample['scheduler.cluster_count'] == 1
@@ -231,10 +232,35 @@
     assert sample['scheduler.total_requested_disk_mb'] == DEFAULT_TASK_DISK.as_(Data.MB) * 3
     assert sample['scheduler.total_requested_cpus'] == DEFAULT_TASK_CPUS * 3
 
-    scheduler.delete_cluster("cluster1", password)
+    scheduler.delete_cluster("cluster1", 'test_password')
 
     sample = RootMetrics().sample()
     assert sample['scheduler.cluster_count'] == 0
     assert sample['scheduler.total_requested_mem_mb'] == 0
     assert sample['scheduler.total_requested_disk_mb'] == 0
     assert sample['scheduler.total_requested_cpus'] == 0
+
+  def test_scheduler_delete_empty_cluster(self):
+    scheduler_key = gen_encryption_key()
+
+    scheduler = MysosScheduler(
+        self._state,
+        self._state_provider,
+        self._framework_user,
+        "./executor.pex",
+        "cmd.sh",
+        self._zk_client,
+        self._zk_url,
+        Amount(5, Time.SECONDS),
+        "/etc/mysos/admin_keyfile.yml",
+        scheduler_key)
+
+    scheduler.registered(self._driver, self._framework_id, object())
+    _, password = scheduler.create_cluster("cluster1", "mysql_user", 3)
+
+    assert len(scheduler._launchers) == 1
+
+    # Deleting the cluster before any offer comes in for launching any task.
+    scheduler.delete_cluster("cluster1", password)
+
+    assert len(scheduler._launchers) == 0
diff --git a/vagrant/test.sh b/vagrant/test.sh
index dd2597c..ae19207 100755
--- a/vagrant/test.sh
+++ b/vagrant/test.sh
@@ -24,7 +24,8 @@
   --cluster_user=${cluster_user} \
   --cluster=${cluster_name} \
   --num_nodes=${num_nodes} \
-  --size='{"mem": "512mb", "disk": "3gb", "cpus": 1.0}'
+  --size='{"mem": "700mb", "disk": "3gb", "cpus": 1.0}' \
+  --cluster_password='testpasswd'
 
 echo "Finished creating the cluster, now deleting it"