Allow users to specify the 'size' of MySQL instances.

- Use 'disk' resources to run MySQL tasks.
diff --git a/docs/user-guide.md b/docs/user-guide.md
index 682cc90..8e6f1e4 100644
--- a/docs/user-guide.md
+++ b/docs/user-guide.md
@@ -14,10 +14,13 @@
 
 
 ### Parameters
-- `cluster_name`: Name of the cluster.
-- `cluster_user`: The user account for all MySQL instances in the cluster which as full admin
+- `cluster_name`: Required. Name of the cluster.
+- `cluster_user`: Required. The user account for all MySQL instances in the cluster which as full admin
 privileges.
-- `num_nodes`: Number of nodes in the cluster. [default: 3]
+- `num_nodes`: Number of nodes in the cluster. [default: 1]
+- `size`: The size of instances in the cluster as a JSON dictionary of `cpus`, `mem` and `disk`.
+`mem` and `disk` are specified with standard data size units such as `mb`, `gb`, `tb`, etc. (no
+spaces, see the default for an example) [default: `{"mem": "512mb", "disk": "2gb", "cpus": 1.0}`]
 - `backup_id`: An ID for the MySQL backup to restore from when the MySQL instance starts. If not
 specified, Mysos will start an empty MySQL instance. The format and meaning of `backup_id` is
 specific to the implementation of `BackupStore` that the Mysos cluster uses.
@@ -34,12 +37,15 @@
 
 
 ### Example
-```
-# Create a cluster named 'test_cluster3' and restore from the backup 'foo/bar:201503122000'.
-curl -X POST 192.168.33.7/clusters/test_cluster3 --form "cluster_user=mysos" --form "num_nodes=2" --form "num_nodes=2" --form "backup_id=foo/bar:201503122000"
-# Response
-{"cluster_password": "w9gMCkecsMh6sWsRdxNTa", "cluster_url": "zk://mysos:mysos@192.168.33.7:2181/mysos/discover/test_cluster3"}
-```
+
+    # Create a cluster named 'test_cluster3' and restore from the backup 'foo/bar:201503122000'.
+    curl -X POST 192.168.33.7/clusters/test_cluster3 --form "cluster_user=mysos" \
+      --form "num_nodes=2" --form "backup_id=foo/bar:201503122000" \
+      --form 'size={"mem": "512mb", "disk": "3gb", "cpus": 1.0}'
+      
+    # Response
+    {"cluster_password": "w9gMCkecsMh6sWsRdxNTa", "cluster_url": "zk://192.168.33.7:2181/mysos/discover/test_cluster3"}
+
 
 ### Notes
 - Cluster creation is asynchronous. The API call returns (with status 200) as soon as the Mysos
diff --git a/mysos/scheduler/http.py b/mysos/scheduler/http.py
index 1a80b94..c38ae1c 100644
--- a/mysos/scheduler/http.py
+++ b/mysos/scheduler/http.py
@@ -24,15 +24,17 @@
   def create(self, clustername):
     """Create a db cluster."""
     cluster_name = clustername  # For naming consistency.
-    num_nodes = bottle.request.forms.get('num_nodes', default=3)
+    num_nodes = bottle.request.forms.get('num_nodes', default=1)
     cluster_user = bottle.request.forms.get('cluster_user', default=None)
     backup_id = bottle.request.forms.get('backup_id', default=None)
+    size = bottle.request.forms.get('size', default=None)
 
     try:
       cluster_zk_url, cluster_password = self._scheduler.create_cluster(
           cluster_name,
           cluster_user,
           num_nodes,
+          size,
           backup_id=backup_id)
       return json.dumps(dict(cluster_url=cluster_zk_url, cluster_password=cluster_password))
     except MysosScheduler.ClusterExists as e:
diff --git a/mysos/scheduler/launcher.py b/mysos/scheduler/launcher.py
index beb99ca..6167546 100644
--- a/mysos/scheduler/launcher.py
+++ b/mysos/scheduler/launcher.py
@@ -12,15 +12,10 @@
 
 import mesos.interface.mesos_pb2 as mesos_pb2
 from twitter.common import log
-from twitter.common.quantity import Amount, Time
+from twitter.common.quantity import Amount, Data, Time
 from twitter.common.zookeeper.serverset.endpoint import Endpoint, ServiceInstance
 
 
-# TODO(jyx): Replace this when we start taking tasks from an HTTP API.
-TASK_CPUS = 1
-TASK_MEM = 512
-
-
 class MySQLClusterLauncher(object):
   """
     Responsible for launching and maintaining a MySQL cluster.
@@ -160,13 +155,13 @@
       if self._terminating:
         return None, offer.resources
 
-      cpus, mem, ports = self._get_resources(offer.resources)
+      cpus, mem, disk, ports = self._get_resources(offer.resources)
 
-      # TODO(jyx): Replace the static resource requirements with what the user requests.
-      task_cpus = TASK_CPUS
-      task_mem = TASK_MEM
+      task_cpus = self._cluster.cpus
+      task_mem = self._cluster.mem
+      task_disk = self._cluster.disk
 
-      if cpus < task_cpus or mem < task_mem or len(ports) == 0:
+      if cpus < task_cpus or mem < task_mem or disk < task_disk or len(ports) == 0:
         # Offer doesn't fit.
         return None, offer.resources
 
@@ -175,7 +170,7 @@
 
       task_port = random.choice(list(ports))  # Randomly pick a port in the offer.
 
-      task_info = self._new_task(offer, task_cpus, task_mem, task_port)
+      task_info = self._new_task(offer, task_cpus, task_mem, task_disk, task_port)
       self._cluster.tasks[task_info.task_id.value] = MySQLTask(
           self._cluster.name,
           task_info.task_id.value,
@@ -198,7 +193,11 @@
 
       # Update the offer's resources and return them for other clusters to use.
       remaining = create_resources(
-          cpus - task_cpus, mem - task_mem, ports - set([task_port]), role=self._framework_role)
+          cpus - task_cpus,
+          mem - task_mem,
+          disk - task_disk,
+          ports - set([task_port]),
+          role=self._framework_role)
       return task_info.task_id.value, remaining
 
   def kill(self, password):
@@ -225,8 +224,8 @@
     return self._terminating and len(self._cluster.active_tasks) == 0
 
   def _get_resources(self, resources):
-    """Return a tuple of the resources: cpus, mem, set of ports."""
-    cpus, mem, ports = 0.0, 0, set()
+    """Return a tuple of the resources: cpus, mem, disk, set of ports."""
+    cpus, mem, disk, ports = 0.0, Amount(0, Data.MB), Amount(0, Data.MB), set()
     for resource in resources:
       # We do the following check:
       # 1. We only care about the role of the resources we are going to use.
@@ -237,21 +236,26 @@
       #    cases where Mysos tasks run side-by-side with tasks from other frameworks. This also
       #    simplifies the launcher's role filtering logic.
       #    TODO(jyx): Revisit this when the above assumption changes.
-      if resource.name in ('cpus', 'mem', 'ports') and resource.role != self._framework_role:
+      if (resource.name in ('cpus', 'mem', 'disk', 'ports') and
+          resource.role != self._framework_role):
         raise self.IncompatibleRoleError("Offered resource %s has role %s, expecting %s" % (
             resource.name, resource.role, self._framework_role))
 
       if resource.name == 'cpus':
         cpus = resource.scalar.value
       elif resource.name == 'mem':
-        mem = resource.scalar.value
+        # 'Amount' requires an integer while 'value' is double. We convert it bytes to minimize
+        # precision loss.
+        mem = Amount(int(resource.scalar.value * 1024 * 1024), Data.BYTES)
+      elif resource.name == 'disk':
+        disk = Amount(int(resource.scalar.value * 1024 * 1024), Data.BYTES)
       elif resource.name == 'ports' and resource.ranges.range:
         for r in resource.ranges.range:
           ports |= set(range(r.begin, r.end + 1))
 
-    return cpus, mem, ports
+    return cpus, mem, disk, ports
 
-  def _new_task(self, offer, task_cpus, task_mem, task_port):
+  def _new_task(self, offer, task_cpus, task_mem, task_disk, task_port):
     """Return a new task with the requested resources."""
     server_id = self._cluster.next_id
     task_id = "mysos-" + self.cluster_name + "-" + str(server_id)
@@ -294,7 +298,7 @@
     })
 
     resources = create_resources(
-        task_cpus, task_mem, set([task_port]), role=self._framework_role)
+        task_cpus, task_mem, task_disk, set([task_port]), role=self._framework_role)
     task.resources.extend(resources)
 
     return task
@@ -510,7 +514,7 @@
 
 
 # --- Utility methods. ---
-def create_resources(cpus, mem, ports, role='*'):
+def create_resources(cpus, mem, disk, ports, role='*'):
   """Return a list of 'Resource' protobuf for the provided resources."""
   cpus_resources = mesos_pb2.Resource()
   cpus_resources.name = 'cpus'
@@ -522,7 +526,13 @@
   mem_resources.name = 'mem'
   mem_resources.type = mesos_pb2.Value.SCALAR
   mem_resources.role = role
-  mem_resources.scalar.value = mem
+  mem_resources.scalar.value = mem.as_(Data.MB)
+
+  disk_resources = mesos_pb2.Resource()
+  disk_resources.name = 'disk'
+  disk_resources.type = mesos_pb2.Value.SCALAR
+  disk_resources.role = role
+  disk_resources.scalar.value = disk.as_(Data.MB)
 
   ports_resources = mesos_pb2.Resource()
   ports_resources.name = 'ports'
@@ -533,7 +543,7 @@
     port_range.begin = port
     port_range.end = port
 
-  return [cpus_resources, mem_resources, ports_resources]
+  return [cpus_resources, mem_resources, disk_resources, ports_resources]
 
 
 def is_terminal(state):
diff --git a/mysos/scheduler/scheduler.py b/mysos/scheduler/scheduler.py
index 9e5d17f..b3f0139 100644
--- a/mysos/scheduler/scheduler.py
+++ b/mysos/scheduler/scheduler.py
@@ -1,4 +1,5 @@
 from collections import OrderedDict
+import json
 import posixpath
 import random
 import threading
@@ -15,9 +16,14 @@
 import mesos.interface.mesos_pb2 as mesos_pb2
 from twitter.common import log
 from twitter.common.collections.orderedset import OrderedSet
-from twitter.common.quantity import Amount, Time
+from twitter.common.quantity import Amount, Data, Time
+from twitter.common.quantity.parse_simple import InvalidData, parse_data
 
 
+DEFAULT_TASK_CPUS = 1.0
+DEFAULT_TASK_DISK = Amount(2, Data.GB)
+DEFAULT_TASK_MEM = Amount(512, Data.MB)
+
 # Refuse the offer for "eternity".
 # NOTE: Using sys.maxint / 2 because sys.maxint causes rounding and precision loss when converted to
 # double 'refuse_seconds' in the ProtoBuf and results in a negative duration on Mesos Master.
@@ -106,11 +112,14 @@
                                         # Mesos. The scheduler tolerates later disconnections.
 
   # --- Public interface. ---
-  def create_cluster(self, cluster_name, cluster_user, num_nodes, backup_id=None):
+  def create_cluster(self, cluster_name, cluster_user, num_nodes, size=None, backup_id=None):
     """
       :param cluster_name: Name of the cluster.
       :param cluster_user: The user account on MySQL server.
       :param num_nodes: Number of nodes in the cluster.
+      :param size: The size of instances in the cluster as a JSON dictionary of 'cpus', 'mem',
+                   'disk'. 'mem' and 'disk' are specified with data size units: kb, mb, gb, etc. If
+                   given 'None' then app defaults are used.
       :param backup_id: The 'backup_id' of the backup to restore from. If None then Mysos starts an
                         empty instance.
 
@@ -139,6 +148,9 @@
       if int(num_nodes) <= 0:
         raise ValueError("Invalid number of cluster nodes: %s" % num_nodes)
 
+      resources = parse_size(size)
+      log.info("Requested resources per instance for cluster %s: %s" % (resources, cluster_name))
+
       self._state.clusters.add(cluster_name)
       self._state_provider.dump_scheduler_state(self._state)
 
@@ -149,6 +161,9 @@
           cluster_user,
           self._password_box.encrypt(password),
           int(num_nodes),
+          cpus=resources['cpus'],
+          mem=resources['mem'],
+          disk=resources['disk'],
           backup_id=backup_id)
       self._state_provider.dump_cluster_state(cluster)
 
@@ -380,3 +395,22 @@
   copy = li[:]
   random.shuffle(copy)
   return copy
+
+
+def parse_size(size):
+  """Return the resources specified in 'size' as a dictionary."""
+  if not size:
+    resources = dict(cpus=DEFAULT_TASK_CPUS, mem=DEFAULT_TASK_MEM, disk=DEFAULT_TASK_DISK)
+  else:
+    # TODO(jyx): Simplify this using T-shirt sizing
+    # (https://github.com/twitter/mysos/issues/14).
+    try:
+      resources_ = json.loads(size)
+      resources = dict(
+        cpus=resources_['cpus'],
+        mem=parse_data(resources_['mem']),
+        disk=parse_data(resources_['disk']))
+    except (TypeError, KeyError, ValueError, InvalidData):
+      raise ValueError("'size' should be a JSON dictionary with keys 'cpus', 'mem' and 'disk'")
+
+  return resources
diff --git a/mysos/scheduler/state.py b/mysos/scheduler/state.py
index 62763fc..4bf888f 100644
--- a/mysos/scheduler/state.py
+++ b/mysos/scheduler/state.py
@@ -85,7 +85,7 @@
     It includes tasks (MySQLTask) for members of the cluster.
   """
 
-  def __init__(self, name, user, encrypted_password, num_nodes, backup_id=None):
+  def __init__(self, name, user, encrypted_password, num_nodes, cpus, mem, disk, backup_id=None):
     if not isinstance(num_nodes, int):
       raise TypeError("'num_nodes' should be an int")
 
@@ -93,6 +93,9 @@
     self.user = user
     self.encrypted_password = encrypted_password
     self.num_nodes = num_nodes
+    self.cpus = cpus
+    self.mem = mem
+    self.disk = disk
     self.backup_id = backup_id
 
     self.members = {}  # {TaskID : MemberID} mappings. MemberIDs are assigned by ZooKeeper. A task
diff --git a/mysos/testing/mysos_test_client.py b/mysos/testing/mysos_test_client.py
index 42f3bc7..b80a381 100755
--- a/mysos/testing/mysos_test_client.py
+++ b/mysos/testing/mysos_test_client.py
@@ -63,6 +63,12 @@
       '--cluster_user',
       dest='cluster_user',
       help='MySQL user name the of cluster')
+  @app.command_option(
+      '--size',
+      dest='size',
+      help="The size of instances in the cluster as a JSON dictionary of 'cpus', 'mem', 'disk'. "
+           "'mem' and 'disk' are specified with data size units: kb, mb, gb, etc. If given 'None'"
+           "then app defaults are used.")
   def create(args, options):
     validate_common_options(options)
 
@@ -76,6 +82,7 @@
     values = dict(
         num_nodes=int(options.num_nodes),
         cluster_user=options.cluster_user,
+        size=options.size if options.size else '',
         backup_id=options.backup_id if options.backup_id else '')
 
     req = urllib2.Request(url, urllib.urlencode(values))
diff --git a/tests/scheduler/test_http.py b/tests/scheduler/test_http.py
index 75347d8..9c814a8 100644
--- a/tests/scheduler/test_http.py
+++ b/tests/scheduler/test_http.py
@@ -26,7 +26,7 @@
   def set_response(self, response):
     self._response = response
 
-  def create_cluster(self, cluster_name, cluster_user, num_nodes, backup_id=None):
+  def create_cluster(self, cluster_name, cluster_user, num_nodes, size, backup_id=None):
     if self._exception:
       raise self._exception
     return self._response
diff --git a/tests/scheduler/test_launcher.py b/tests/scheduler/test_launcher.py
index e6ea5d4..b40b23d 100644
--- a/tests/scheduler/test_launcher.py
+++ b/tests/scheduler/test_launcher.py
@@ -8,6 +8,7 @@
 from mysos.common.testing import Fake
 from mysos.scheduler.launcher import create_resources, MySQLClusterLauncher
 from mysos.scheduler.password import gen_encryption_key, PasswordBox
+from mysos.scheduler.scheduler import DEFAULT_TASK_CPUS, DEFAULT_TASK_MEM, DEFAULT_TASK_DISK
 from mysos.scheduler.state import LocalStateProvider, MySQLCluster
 from mysos.scheduler.zk_state import ZooKeeperStateProvider
 
@@ -15,7 +16,7 @@
 import mesos.interface.mesos_pb2 as mesos_pb2
 from twitter.common import log
 from twitter.common.concurrent import deadline
-from twitter.common.quantity import Amount, Time
+from twitter.common.quantity import Amount, Data, Time
 from zake.fake_client import FakeClient
 from zake.fake_storage import FakeStorage
 
@@ -46,8 +47,12 @@
     self._offer.slave_id.value = "slave_id_0"
     self._offer.hostname = "localhost"
 
-    # Enough memory and ports to fit three tasks.
-    resources = create_resources(cpus=4, mem=512 * 3, ports=set([10000, 10001, 10002]))
+    # Enough resources to fit three tasks.
+    resources = create_resources(
+        cpus=DEFAULT_TASK_CPUS * 3,
+        mem=DEFAULT_TASK_MEM * 3,
+        disk=DEFAULT_TASK_DISK * 3,
+        ports=set([10000, 10001, 10002]))
     self._offer.resources.extend(resources)
 
     self._framework_user = "framework_user"
@@ -58,7 +63,14 @@
     self._scheduler_key = gen_encryption_key()
     self._password_box = PasswordBox(self._scheduler_key)
 
-    self._cluster = MySQLCluster("cluster0", "user", self._password_box.encrypt("pass"), 3)
+    self._cluster = MySQLCluster(
+        "cluster0",
+        "user",
+        self._password_box.encrypt("pass"),
+        3,
+        DEFAULT_TASK_CPUS,
+        DEFAULT_TASK_MEM,
+        DEFAULT_TASK_DISK)
 
     # Construct the state provider based on the test parameter.
     if request.param == LocalStateProvider:
@@ -130,7 +142,11 @@
   def test_launch_cluster_insufficient_resources(self):
     """All but one slave in the slave are launched successfully."""
     del self._offer.resources[:]
-    resources = create_resources(cpus=4, mem=512 * 3, ports=set([10000, 10001]))
+    resources = create_resources(
+        cpus=DEFAULT_TASK_CPUS * 3,
+        mem=DEFAULT_TASK_MEM * 3,
+        disk=DEFAULT_TASK_DISK * 3 - Amount(1, Data.MB),  # 1mb less than required disk space.
+        ports=set([10000, 10001, 10002]))
     self._offer.resources.extend(resources)
 
     # There is one fewer port than required to launch the entire cluster.
@@ -172,7 +188,14 @@
     launchers = [
       MySQLClusterLauncher(
           self._driver,
-          MySQLCluster("cluster0", "user0", self._password_box.encrypt("pass0"), 1),
+          MySQLCluster(
+              "cluster0",
+              "user0",
+              self._password_box.encrypt("pass0"),
+              1,
+              DEFAULT_TASK_CPUS,
+              DEFAULT_TASK_MEM,
+              DEFAULT_TASK_DISK),
           self._state_provider,
           self._zk_url,
           self._zk_client,
@@ -184,7 +207,14 @@
           self._scheduler_key),
       MySQLClusterLauncher(
           self._driver,
-          MySQLCluster("cluster1", "user1", self._password_box.encrypt("pass1"), 2),
+          MySQLCluster(
+              "cluster1",
+              "user1",
+              self._password_box.encrypt("pass1"),
+              2,
+              DEFAULT_TASK_CPUS,
+              DEFAULT_TASK_MEM,
+              DEFAULT_TASK_DISK),
           self._state_provider,
           self._zk_url,
           self._zk_client,
@@ -196,7 +226,11 @@
           self._scheduler_key)]
     self._launchers.extend(launchers)
 
-    resources = create_resources(cpus=4, mem=512 * 3, ports=set([10000, 10001, 10002]))
+    resources = create_resources(
+        cpus=DEFAULT_TASK_CPUS * 3,
+        mem=DEFAULT_TASK_MEM * 3,
+        disk=DEFAULT_TASK_DISK * 3,
+        ports=set([10000, 10001, 10002]))
     self._offer.resources.extend(resources)
 
     # Three nodes in total across two clusters.
@@ -230,7 +264,11 @@
         self._scheduler_key)
     self._launchers.append(launcher)
 
-    resources = create_resources(cpus=4, mem=512 * 3, ports=set([10000]))
+    resources = create_resources(
+        cpus=DEFAULT_TASK_CPUS,
+        mem=DEFAULT_TASK_MEM,
+        disk=DEFAULT_TASK_DISK,
+        ports=set([10000]))
     self._offer.resources.extend(resources)
 
     task_id, _ = launcher.launch(self._offer)
@@ -266,7 +304,11 @@
         self._scheduler_key)
     self._launchers.append(launcher)
 
-    resources = create_resources(cpus=4, mem=512 * 3, ports=set([10000]))
+    resources = create_resources(
+        cpus=DEFAULT_TASK_CPUS,
+        mem=DEFAULT_TASK_MEM,
+        disk=DEFAULT_TASK_DISK,
+        ports=set([10000]))
     self._offer.resources.extend(resources)
 
     task_id, _ = launcher.launch(self._offer)
@@ -358,7 +400,11 @@
 
     # When a new offer comes in, a new task is launched.
     del self._offer.resources[:]
-    resources = create_resources(cpus=1, mem=512, ports=set([10000]))
+    resources = create_resources(
+        cpus=DEFAULT_TASK_CPUS,
+        mem=DEFAULT_TASK_MEM,
+        disk=DEFAULT_TASK_DISK,
+        ports=set([10000]))
     self._offer.resources.extend(resources)
     task_id, _ = self._launcher.launch(self._offer)
     assert task_id == "mysos-cluster0-3"
diff --git a/tests/scheduler/test_scheduler.py b/tests/scheduler/test_scheduler.py
index 7ef9910..0beb182 100644
--- a/tests/scheduler/test_scheduler.py
+++ b/tests/scheduler/test_scheduler.py
@@ -6,6 +6,9 @@
 
 from mysos.common.testing import Fake
 from mysos.scheduler.scheduler import (
+    DEFAULT_TASK_CPUS,
+    DEFAULT_TASK_DISK,
+    DEFAULT_TASK_MEM,
     INCOMPATIBLE_ROLE_OFFER_REFUSE_DURATION,
     MysosScheduler)
 from mysos.scheduler.launcher import create_resources
@@ -48,13 +51,18 @@
     self._offer.slave_id.value = "slave_id_0"
     self._offer.hostname = "localhost"
 
-    resources = create_resources(cpus=4, mem=512 * 3, ports=set([10000, 10001, 10002]))
+    resources = create_resources(
+        cpus=DEFAULT_TASK_CPUS * 3,
+        mem=DEFAULT_TASK_MEM * 3,
+        disk=DEFAULT_TASK_DISK * 3,
+        ports=set([10000, 10001, 10002]))
     self._offer.resources.extend(resources)
 
     self._framework_user = "framework_user"
 
     self._zk_url = "zk://host/mysos/test"
-    self._cluster = MySQLCluster("cluster0", "user", "pass", 3)
+    self._cluster = MySQLCluster(
+        "cluster0", "user", "pass", 3, DEFAULT_TASK_CPUS, DEFAULT_TASK_MEM, DEFAULT_TASK_DISK)
 
     self._tmpdir = tempfile.mkdtemp()
     self._state_provider = LocalStateProvider(self._tmpdir)
diff --git a/tests/scheduler/test_state.py b/tests/scheduler/test_state.py
index 43bed59..d244985 100644
--- a/tests/scheduler/test_state.py
+++ b/tests/scheduler/test_state.py
@@ -3,6 +3,7 @@
 import tempfile
 import unittest
 
+from mysos.scheduler.scheduler import DEFAULT_TASK_CPUS, DEFAULT_TASK_MEM, DEFAULT_TASK_DISK
 from mysos.scheduler.state import (
     LocalStateProvider,
     MySQLCluster,
@@ -47,7 +48,14 @@
   def test_cluster_state(self):
     password_box = PasswordBox(gen_encryption_key())
 
-    expected = MySQLCluster('cluster1', 'cluster_user', password_box.encrypt('cluster_password'), 3)
+    expected = MySQLCluster(
+        'cluster1',
+        'cluster_user',
+        password_box.encrypt('cluster_password'),
+        3,
+        DEFAULT_TASK_CPUS,
+        DEFAULT_TASK_MEM,
+        DEFAULT_TASK_DISK)
 
     expected.tasks['task1'] = MySQLTask(
         'cluster1', 'task1', 'slave1', 'host1', 10000)
diff --git a/tests/scheduler/test_zk_state.py b/tests/scheduler/test_zk_state.py
index 69d899b..9584727 100644
--- a/tests/scheduler/test_zk_state.py
+++ b/tests/scheduler/test_zk_state.py
@@ -2,6 +2,7 @@
 import os
 import unittest
 
+from mysos.scheduler.scheduler import DEFAULT_TASK_CPUS, DEFAULT_TASK_MEM, DEFAULT_TASK_DISK
 from mysos.scheduler.state import (
     MySQLCluster,
     MySQLTask,
@@ -58,7 +59,14 @@
       self._state_provider.load_scheduler_state()
 
   def test_cluster_state(self):
-    expected = MySQLCluster('cluster1', 'cluster_user', 'cluster_password', 3)
+    expected = MySQLCluster(
+        'cluster1',
+        'cluster_user',
+        'cluster_password',
+        3,
+        DEFAULT_TASK_CPUS,
+        DEFAULT_TASK_MEM,
+        DEFAULT_TASK_DISK)
 
     expected.tasks['task1'] = MySQLTask(
         'cluster1', 'task1', 'slave1', 'host1', 10000)
diff --git a/vagrant/test.sh b/vagrant/test.sh
index bbfe21f..dd2597c 100755
--- a/vagrant/test.sh
+++ b/vagrant/test.sh
@@ -23,7 +23,8 @@
   --api_port=${port} \
   --cluster_user=${cluster_user} \
   --cluster=${cluster_name} \
-  --num_nodes=${num_nodes}
+  --num_nodes=${num_nodes} \
+  --size='{"mem": "512mb", "disk": "3gb", "cpus": 1.0}'
 
 echo "Finished creating the cluster, now deleting it"