This closes #1151
diff --git a/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/entity/trait/Resizable.java b/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/entity/trait/Resizable.java
index 68c9398..36e6ba8 100644
--- a/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/entity/trait/Resizable.java
+++ b/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/entity/trait/Resizable.java
@@ -31,6 +31,21 @@
  */
 public interface Resizable {
 
+    /**
+     * Indicates that resizing up to the desired size is not possible - only resized to the 
+     * {@link Resizable#getCurrentSize()}, because there is insufficient capacity.
+     */
+    public static class InsufficientCapacityException extends RuntimeException {
+        private static final long serialVersionUID = 953230498564942446L;
+        
+        public InsufficientCapacityException(String msg) {
+            super(msg);
+        }
+        public InsufficientCapacityException(String msg, Throwable cause) {
+            super(msg, cause);
+        }
+    }
+    
     MethodEffector<Integer> RESIZE = new MethodEffector<Integer>(Resizable.class, "resize");
 
     /**
@@ -38,6 +53,9 @@
      *
      * @param desiredSize the new size of the entity group.
      * @return the new size of the group.
+     * 
+     * @throws InsufficientCapacityException If the request was to grow, but there is no capacity to grow to
+     *         the desired size.
      */
     @Effector(description="Changes the size of the entity (e.g. the number of nodes in a cluster)")
     Integer resize(@EffectorParam(name="desiredSize", description="The new size of the cluster") Integer desiredSize);
diff --git a/brooklyn-server/core/src/main/java/org/apache/brooklyn/entity/group/DynamicCluster.java b/brooklyn-server/core/src/main/java/org/apache/brooklyn/entity/group/DynamicCluster.java
index f528db7..781cb0c 100644
--- a/brooklyn-server/core/src/main/java/org/apache/brooklyn/entity/group/DynamicCluster.java
+++ b/brooklyn-server/core/src/main/java/org/apache/brooklyn/entity/group/DynamicCluster.java
@@ -48,6 +48,7 @@
 
 import com.google.common.annotations.Beta;
 import com.google.common.base.Function;
+import com.google.common.base.Predicate;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Multimap;
 import com.google.common.reflect.TypeToken;
@@ -101,6 +102,15 @@
     ConfigKey<Boolean> QUARANTINE_FAILED_ENTITIES = ConfigKeys.newBooleanConfigKey(
             "dynamiccluster.quarantineFailedEntities", "If true, will quarantine entities that fail to start; if false, will get rid of them (i.e. delete them)", true);
 
+    @SetFromFlag("quarantineFilter")
+    ConfigKey<Predicate<? super Throwable>> QUARANTINE_FILTER = ConfigKeys.newConfigKey(
+            new TypeToken<Predicate<? super Throwable>>() {},
+            "dynamiccluster.quarantineFilter", 
+            "Quarantine the failed nodes that pass this filter (given the exception thrown by the node). "
+                    + "Default is those that did not fail with NoMachinesAvailableException "
+                    + "(Config ignored if quarantineFailedEntities is false)", 
+            null);
+
     AttributeSensor<Lifecycle> SERVICE_STATE_ACTUAL = Attributes.SERVICE_STATE_ACTUAL;
 
     BasicNotificationSensor<Entity> ENTITY_QUARANTINED = new BasicNotificationSensor<Entity>(Entity.class, "dynamiccluster.entityQuarantined", "Entity failed to start, and has been quarantined");
diff --git a/brooklyn-server/core/src/main/java/org/apache/brooklyn/entity/group/DynamicClusterImpl.java b/brooklyn-server/core/src/main/java/org/apache/brooklyn/entity/group/DynamicClusterImpl.java
index b8e5c63..16a82d4 100644
--- a/brooklyn-server/core/src/main/java/org/apache/brooklyn/entity/group/DynamicClusterImpl.java
+++ b/brooklyn-server/core/src/main/java/org/apache/brooklyn/entity/group/DynamicClusterImpl.java
@@ -38,6 +38,7 @@
 import org.apache.brooklyn.api.entity.Group;
 import org.apache.brooklyn.api.location.Location;
 import org.apache.brooklyn.api.location.MachineProvisioningLocation;
+import org.apache.brooklyn.api.location.NoMachinesAvailableException;
 import org.apache.brooklyn.api.mgmt.Task;
 import org.apache.brooklyn.api.policy.Policy;
 import org.apache.brooklyn.api.sensor.AttributeSensor;
@@ -50,6 +51,7 @@
 import org.apache.brooklyn.core.entity.lifecycle.Lifecycle;
 import org.apache.brooklyn.core.entity.lifecycle.ServiceStateLogic;
 import org.apache.brooklyn.core.entity.lifecycle.ServiceStateLogic.ServiceProblemsLogic;
+import org.apache.brooklyn.core.entity.trait.Resizable;
 import org.apache.brooklyn.core.entity.trait.Startable;
 import org.apache.brooklyn.core.entity.trait.StartableMethods;
 import org.apache.brooklyn.core.location.Locations;
@@ -80,6 +82,7 @@
 import com.google.common.base.Functions;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
 import com.google.common.base.Predicates;
 import com.google.common.base.Supplier;
 import com.google.common.collect.ImmutableList;
@@ -331,6 +334,19 @@
         return getAttribute(QUARANTINE_GROUP);
     }
 
+    protected Predicate<? super Throwable> getQuarantineFilter() {
+        Predicate<? super Throwable> result = getConfig(QUARANTINE_FILTER);
+        if (result != null) {
+            return result;
+        } else {
+            return new Predicate<Throwable>() {
+                @Override public boolean apply(Throwable input) {
+                    return Exceptions.getFirstThrowableOfType(input, NoMachinesAvailableException.class) == null;
+                }
+            };
+        }
+    }
+
     protected int getInitialQuorumSize() {
         int initialSize = getConfig(INITIAL_SIZE).intValue();
         int initialQuorumSize = getConfig(INITIAL_QUORUM_SIZE).intValue();
@@ -518,7 +534,20 @@
             } else {
                 if (LOG.isDebugEnabled()) LOG.debug("Resize no-op {} from {} to {}", new Object[] {this, originalSize, desiredSize});
             }
-            resizeByDelta(delta);
+            // If we managed to grow at all, then expect no exception.
+            // Otherwise, if failed because NoMachinesAvailable, then propagate as InsufficientCapacityException.
+            // This tells things like the AutoScalerPolicy to not keep retrying.
+            try {
+                resizeByDelta(delta);
+            } catch (Exception e) {
+                Exceptions.propagateIfFatal(e);
+                NoMachinesAvailableException nmae = Exceptions.getFirstThrowableOfType(e, NoMachinesAvailableException.class);
+                if (nmae != null) {
+                    throw new Resizable.InsufficientCapacityException("Failed to resize", e);
+                } else {
+                    throw Exceptions.propagate(e);
+                }
+            }
         }
         return getCurrentSize();
     }
@@ -669,7 +698,7 @@
         }
     }
 
-    /** <strong>Note</strong> for sub-clases; this method can be called while synchronized on {@link #mutex}. */
+    /** <strong>Note</strong> for sub-classes; this method can be called while synchronized on {@link #mutex}. */
     protected Collection<Entity> grow(int delta) {
         Preconditions.checkArgument(delta > 0, "Must call grow with positive delta.");
 
@@ -696,8 +725,10 @@
             chosenLocations = Collections.nCopies(delta, getLocation());
         }
 
-        // create and start the entities
-        return addInEachLocation(chosenLocations, ImmutableMap.of()).getWithError();
+        // create and start the entities.
+        // if any fail, then propagate the error.
+        ReferenceWithError<Collection<Entity>> result = addInEachLocation(chosenLocations, ImmutableMap.of());
+        return result.getWithError();
     }
 
     /** <strong>Note</strong> for sub-clases; this method can be called while synchronized on {@link #mutex}. */
@@ -786,7 +817,7 @@
         // quarantine/cleanup as necessary
         if (!errors.isEmpty()) {
             if (isQuarantineEnabled()) {
-                quarantineFailedNodes(errors.keySet());
+                quarantineFailedNodes(errors);
             } else {
                 cleanupFailedNodes(errors.keySet());
             }
@@ -796,11 +827,18 @@
         return ReferenceWithError.newInstanceWithoutError(result);
     }
 
-    protected void quarantineFailedNodes(Collection<Entity> failedEntities) {
-        for (Entity entity : failedEntities) {
-            sensors().emit(ENTITY_QUARANTINED, entity);
-            getQuarantineGroup().addMember(entity);
-            removeMember(entity);
+    protected void quarantineFailedNodes(Map<Entity, Throwable> failedEntities) {
+        for (Map.Entry<Entity, Throwable> entry : failedEntities.entrySet()) {
+            Entity entity = entry.getKey();
+            Throwable cause = entry.getValue();
+            if (cause == null || getQuarantineFilter().apply(cause)) {
+                sensors().emit(ENTITY_QUARANTINED, entity);
+                getQuarantineGroup().addMember(entity);
+                removeMember(entity);
+            } else {
+                LOG.info("Cluster {} discarding failed node {}, rather than quarantining", this, entity);
+                discardNode(entity);
+            }
         }
     }
 
diff --git a/brooklyn-server/core/src/main/java/org/apache/brooklyn/location/ssh/SshMachineLocation.java b/brooklyn-server/core/src/main/java/org/apache/brooklyn/location/ssh/SshMachineLocation.java
index 5e74e7c..b059858 100644
--- a/brooklyn-server/core/src/main/java/org/apache/brooklyn/location/ssh/SshMachineLocation.java
+++ b/brooklyn-server/core/src/main/java/org/apache/brooklyn/location/ssh/SshMachineLocation.java
@@ -89,6 +89,7 @@
 import org.apache.brooklyn.util.exceptions.Exceptions;
 import org.apache.brooklyn.util.exceptions.RuntimeInterruptedException;
 import org.apache.brooklyn.util.guava.KeyTransformingLoadingCache.KeyTransformingSameTypeLoadingCache;
+import org.apache.brooklyn.util.guava.Maybe;
 import org.apache.brooklyn.util.pool.BasicPool;
 import org.apache.brooklyn.util.pool.Pool;
 import org.apache.brooklyn.util.ssh.BashCommands;
@@ -389,7 +390,7 @@
     private BasicPool<SshTool> buildPool(final Map<String, ?> properties) {
         return BasicPool.<SshTool>builder()
                 .name(getDisplayName()+"@"+address+":"+getPort()+
-                        (config().getRaw(SSH_HOST).isPresent() ? "("+getConfig(SSH_HOST)+":"+getConfig(SSH_PORT)+")" : "")+
+                        (config().getRaw(SSH_HOST).isPresent() ? "("+getConfig(SSH_HOST)+":"+getPort()+")" : "")+
                         ":hash"+System.identityHashCode(this))
                 .supplier(new Supplier<SshTool>() {
                         @Override public SshTool get() {
@@ -550,7 +551,21 @@
 
     /** port for SSHing */
     public int getPort() {
-        return getConfig(SshTool.PROP_PORT);
+        // Prefer PROP_PORT (i.e. "port"). However if that is explicitly null or is not set, then see if
+        // SSH_PORT (i.e. "brooklyn.ssh.config.port") has been set and use that.
+        // If neither is set (or is explicitly set to null), then use the default PROP_PORT value.
+        //
+        // Note we don't just rely on config().get(PROP_PORT) returning the default, because we hit a rebind
+        // error where the location's port configuration had been explicitly set to null.
+        // See https://issues.apache.org/jira/browse/BROOKLYN-215
+        
+        Maybe<Object> raw = config().getRaw(SshTool.PROP_PORT);
+        if (raw.orNull() == null && config().getRaw(SSH_PORT).orNull() != null) {
+            return config().get(SSH_PORT);
+        } else {
+            Integer result = config().get(SshTool.PROP_PORT);
+            return (result != null) ? result : SshTool.PROP_PORT.getDefaultValue();
+        }
     }
 
     protected <T> T execSsh(final Map<String, ?> props, final Function<ShellTool, T> task) {
diff --git a/brooklyn-server/core/src/test/java/org/apache/brooklyn/core/entity/trait/FailingEntity.java b/brooklyn-server/core/src/test/java/org/apache/brooklyn/core/entity/trait/FailingEntity.java
index 532eb4d..7845326 100644
--- a/brooklyn-server/core/src/test/java/org/apache/brooklyn/core/entity/trait/FailingEntity.java
+++ b/brooklyn-server/core/src/test/java/org/apache/brooklyn/core/entity/trait/FailingEntity.java
@@ -60,7 +60,7 @@
     ConfigKey<Predicate<? super FailingEntity>> FAIL_ON_RESTART_CONDITION = (ConfigKey) ConfigKeys.newConfigKey(Predicate.class, "failOnRestartCondition", "Whether to throw exception on call to restart", null);
     
     @SetFromFlag("exceptionClazz")
-    ConfigKey<Class<? extends RuntimeException>> EXCEPTION_CLAZZ = (ConfigKey) ConfigKeys.newConfigKey(Class.class, "exceptionClazz", "Type of exception to throw", IllegalStateException.class);
+    ConfigKey<Class<? extends Exception>> EXCEPTION_CLAZZ = (ConfigKey) ConfigKeys.newConfigKey(Class.class, "exceptionClazz", "Type of exception to throw", IllegalStateException.class);
     
     @SetFromFlag("execOnFailure")
     ConfigKey<Function<? super FailingEntity,?>> EXEC_ON_FAILURE = (ConfigKey) ConfigKeys.newConfigKey(Function.class, "execOnFailure", "Callback to execute before throwing an exception, on any failure", Functions.identity());
diff --git a/brooklyn-server/core/src/test/java/org/apache/brooklyn/core/entity/trait/FailingEntityImpl.java b/brooklyn-server/core/src/test/java/org/apache/brooklyn/core/entity/trait/FailingEntityImpl.java
index a675c78..423bdd3 100644
--- a/brooklyn-server/core/src/test/java/org/apache/brooklyn/core/entity/trait/FailingEntityImpl.java
+++ b/brooklyn-server/core/src/test/java/org/apache/brooklyn/core/entity/trait/FailingEntityImpl.java
@@ -79,7 +79,12 @@
     
     private RuntimeException newException(String msg) {
         try {
-            return getConfig(EXCEPTION_CLAZZ).getConstructor(String.class).newInstance("Simulating entity stop failure for test");
+            Exception result = getConfig(EXCEPTION_CLAZZ).getConstructor(String.class).newInstance("Simulating entity stop failure for test");
+            if (!(result instanceof RuntimeException)) {
+                return new RuntimeException("wrapping", result);
+            } else {
+                return (RuntimeException)result;
+            }
         } catch (Exception e) {
             throw Exceptions.propagate(e);
         }
diff --git a/brooklyn-server/core/src/test/java/org/apache/brooklyn/core/mgmt/rebind/RebindSshMachineLocationTest.java b/brooklyn-server/core/src/test/java/org/apache/brooklyn/core/mgmt/rebind/RebindSshMachineLocationTest.java
index 92fdb55..ec8cacb 100644
--- a/brooklyn-server/core/src/test/java/org/apache/brooklyn/core/mgmt/rebind/RebindSshMachineLocationTest.java
+++ b/brooklyn-server/core/src/test/java/org/apache/brooklyn/core/mgmt/rebind/RebindSshMachineLocationTest.java
@@ -80,5 +80,23 @@
         
         assertEquals(newChildLoc.execScript(Collections.<String,Object>emptyMap(), "mysummary", ImmutableList.of("true")), 0);
     }
-    
+
+    // See https://issues.apache.org/jira/browse/BROOKLYN-215
+    @Test(groups="Integration")
+    public void testRebindWhenPortNull() throws Exception {
+        SshMachineLocation machine = origManagementContext.getLocationManager().createLocation(LocationSpec.create(SshMachineLocation.class)
+                .configure("address", "localhost")
+                .configure("port", null));
+        FixedListMachineProvisioningLocation<?> byon = origManagementContext.getLocationManager().createLocation(LocationSpec.create(FixedListMachineProvisioningLocation.class)
+                .configure("machines", ImmutableList.of(machine)));
+        origApp.start(ImmutableList.of(byon));
+        LOG.info("Before rebind, machine="+machine.toString());
+
+        newApp = (TestApplication) rebind();
+        
+        FixedListMachineProvisioningLocation<?> newByon = (FixedListMachineProvisioningLocation<?>) Iterables.getOnlyElement(newApp.getLocations(), 0);
+        SshMachineLocation newMachine = (SshMachineLocation) Iterables.get(newByon.getChildren(), 0);
+        
+        LOG.info("After rebind, machine="+newMachine.toString());
+    }
 }
diff --git a/brooklyn-server/core/src/test/java/org/apache/brooklyn/core/test/entity/TestCluster.java b/brooklyn-server/core/src/test/java/org/apache/brooklyn/core/test/entity/TestCluster.java
index 3ff61f0..9593203 100644
--- a/brooklyn-server/core/src/test/java/org/apache/brooklyn/core/test/entity/TestCluster.java
+++ b/brooklyn-server/core/src/test/java/org/apache/brooklyn/core/test/entity/TestCluster.java
@@ -18,8 +18,12 @@
  */
 package org.apache.brooklyn.core.test.entity;
 
+import java.util.List;
+
 import org.apache.brooklyn.api.entity.EntityLocal;
 import org.apache.brooklyn.api.entity.ImplementedBy;
+import org.apache.brooklyn.config.ConfigKey;
+import org.apache.brooklyn.core.config.ConfigKeys;
 import org.apache.brooklyn.entity.group.DynamicCluster;
 
 /**
@@ -27,4 +31,10 @@
 */
 @ImplementedBy(TestClusterImpl.class)
 public interface TestCluster extends DynamicCluster, EntityLocal {
+    
+    ConfigKey<Integer> MAX_SIZE = ConfigKeys.newIntegerConfigKey("testCluster.maxSize", "Size after which it will throw InsufficientCapacityException", Integer.MAX_VALUE);
+    
+    List<Integer> getSizeHistory();
+    
+    List<Integer> getDesiredSizeHistory();
 }
diff --git a/brooklyn-server/core/src/test/java/org/apache/brooklyn/core/test/entity/TestClusterImpl.java b/brooklyn-server/core/src/test/java/org/apache/brooklyn/core/test/entity/TestClusterImpl.java
index bf8d0cb..0edea8f 100644
--- a/brooklyn-server/core/src/test/java/org/apache/brooklyn/core/test/entity/TestClusterImpl.java
+++ b/brooklyn-server/core/src/test/java/org/apache/brooklyn/core/test/entity/TestClusterImpl.java
@@ -18,22 +18,32 @@
  */
 package org.apache.brooklyn.core.test.entity;
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
 import org.apache.brooklyn.core.entity.trait.Startable;
 import org.apache.brooklyn.entity.group.DynamicClusterImpl;
 import org.apache.brooklyn.util.collections.QuorumCheck.QuorumChecks;
 
+import com.google.common.collect.ImmutableList;
+
 /**
 * Mock cluster entity for testing.
 */
 public class TestClusterImpl extends DynamicClusterImpl implements TestCluster {
     private volatile int size;
 
+    private final List<Integer> desiredSizeHistory = Collections.synchronizedList(new ArrayList<Integer>());
+    private final List<Integer> sizeHistory = Collections.synchronizedList(new ArrayList<Integer>());
+    
     public TestClusterImpl() {
     }
     
     @Override
     public void init() {
         super.init();
+        sizeHistory.add(size);
         size = getConfig(INITIAL_SIZE);
         sensors().set(Startable.SERVICE_UP, true);
     }
@@ -48,11 +58,36 @@
     
     @Override
     public Integer resize(Integer desiredSize) {
-        this.size = desiredSize;
+        desiredSizeHistory.add(desiredSize);
+        int achievableSize = Math.min(desiredSize, getConfig(MAX_SIZE));
+        
+        if (achievableSize != size) {
+            this.sizeHistory.add(achievableSize);
+            this.size = achievableSize;
+        }
+        
+        if (desiredSize > achievableSize) {
+            throw new InsufficientCapacityException("Simulating insufficient capacity (desiredSize="+desiredSize+"; maxSize="+getConfig(MAX_SIZE)+"; newSize="+size+")");
+        }
+
         return size;
     }
     
     @Override
+    public List<Integer> getSizeHistory() {
+        synchronized (sizeHistory) {
+            return ImmutableList.copyOf(sizeHistory);
+        }
+    }
+    
+    @Override
+    public List<Integer> getDesiredSizeHistory() {
+        synchronized (desiredSizeHistory) {
+            return ImmutableList.copyOf(desiredSizeHistory);
+        }
+    }
+    
+    @Override
     public void stop() {
         size = 0;
         super.stop();
diff --git a/brooklyn-server/core/src/test/java/org/apache/brooklyn/entity/group/DynamicClusterTest.java b/brooklyn-server/core/src/test/java/org/apache/brooklyn/entity/group/DynamicClusterTest.java
index dd37e58..f58ac90 100644
--- a/brooklyn-server/core/src/test/java/org/apache/brooklyn/entity/group/DynamicClusterTest.java
+++ b/brooklyn-server/core/src/test/java/org/apache/brooklyn/entity/group/DynamicClusterTest.java
@@ -44,6 +44,7 @@
 import org.apache.brooklyn.api.entity.Entity;
 import org.apache.brooklyn.api.entity.EntitySpec;
 import org.apache.brooklyn.api.location.Location;
+import org.apache.brooklyn.api.location.NoMachinesAvailableException;
 import org.apache.brooklyn.api.mgmt.Task;
 import org.apache.brooklyn.api.sensor.SensorEvent;
 import org.apache.brooklyn.core.entity.Attributes;
@@ -54,6 +55,7 @@
 import org.apache.brooklyn.core.entity.lifecycle.ServiceStateLogic;
 import org.apache.brooklyn.core.entity.trait.Changeable;
 import org.apache.brooklyn.core.entity.trait.FailingEntity;
+import org.apache.brooklyn.core.entity.trait.Resizable;
 import org.apache.brooklyn.core.location.SimulatedLocation;
 import org.apache.brooklyn.core.mgmt.BrooklynTaskTags;
 import org.apache.brooklyn.core.test.BrooklynAppUnitTestSupport;
@@ -72,13 +74,13 @@
 import org.testng.annotations.Test;
 
 import com.google.common.base.Function;
+import com.google.common.base.Predicate;
 import com.google.common.base.Predicates;
 import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.Atomics;
 
 
@@ -202,6 +204,65 @@
         assertEquals(entity.getApplication(), app);
     }
 
+    @Test
+    public void testResizeWhereChildThrowsNoMachineAvailableExceptionIsPropagatedAsInsufficientCapacityException() throws Exception {
+        final DynamicCluster cluster = app.createAndManageChild(EntitySpec.create(DynamicCluster.class)
+            .configure(DynamicCluster.MEMBER_SPEC, EntitySpec.create(FailingEntity.class)
+                    .configure(FailingEntity.FAIL_ON_START, true)
+                    .configure(FailingEntity.EXCEPTION_CLAZZ, NoMachinesAvailableException.class))
+            .configure(DynamicCluster.INITIAL_SIZE, 0));
+        cluster.start(ImmutableList.of(loc));
+        
+        try {
+            cluster.resize(1);
+            Asserts.shouldHaveFailedPreviously();
+        } catch (Exception e) {
+            Asserts.expectedFailureOfType(e, Resizable.InsufficientCapacityException.class);
+        }
+    }
+
+    @Test
+    public void testResizeWhereSubsetOfChildrenThrowsNoMachineAvailableExceptionIsPropagatedAsInsuffientCapacityException() throws Exception {
+        final DynamicCluster cluster = app.createAndManageChild(EntitySpec.create(DynamicCluster.class)
+            .configure(DynamicCluster.MEMBER_SPEC, EntitySpec.create(FailingEntity.class)
+                    .configure(FailingEntity.FAIL_ON_START_CONDITION, new Predicate<FailingEntity>() {
+                        final AtomicInteger counter = new AtomicInteger();
+                        @Override public boolean apply(FailingEntity input) {
+                            // Only second and subsequent entities fail
+                            int index = counter.getAndIncrement();
+                            return (index >= 1);
+                        }})
+                    .configure(FailingEntity.EXCEPTION_CLAZZ, NoMachinesAvailableException.class))
+            .configure(DynamicCluster.INITIAL_SIZE, 0));
+        cluster.start(ImmutableList.of(loc));
+
+        // Managed to partially resize, but will still throw exception.
+        // The getCurrentSize will report how big we managed to get.
+        // The children that failed due to NoMachinesAvailableException will have been unmanaged automatically.
+        try {
+            cluster.resize(2);
+            Asserts.shouldHaveFailedPreviously();
+        } catch (Exception e) {
+            Asserts.expectedFailureOfType(e, Resizable.InsufficientCapacityException.class);
+        }
+        assertEquals(cluster.getCurrentSize(), (Integer)1);
+        Iterable<FailingEntity> children1 = Iterables.filter(cluster.getChildren(), FailingEntity.class);
+        assertEquals(Iterables.size(children1), 1);
+        assertEquals(Iterables.getOnlyElement(children1).sensors().get(TestEntity.SERVICE_UP), Boolean.TRUE);
+        
+        // This attempt will also fail, because all new children will fail
+        try {
+            cluster.resize(2);
+            Asserts.shouldHaveFailedPreviously();
+        } catch (Exception e) {
+            Asserts.expectedFailureOfType(e, Resizable.InsufficientCapacityException.class);
+        }
+        assertEquals(cluster.getCurrentSize(), (Integer)1);
+        Iterable<FailingEntity> children2 = Iterables.filter(cluster.getChildren(), FailingEntity.class);
+        assertEquals(Iterables.size(children2), 1);
+        assertEquals(Iterables.getOnlyElement(children2), Iterables.getOnlyElement(children1));
+    }
+
     /** This can be sensitive to order, e.g. if TestEntity set expected RUNNING before setting SERVICE_UP, 
      * there would be a point when TestEntity is ON_FIRE.
      * <p>
@@ -249,6 +310,7 @@
         assertEquals(Iterables.size(Entities.descendants(cluster, TestEntity.class)), 0);
     }
 
+    
     @Test
     public void currentSizePropertyReflectsActualClusterSize() throws Exception {
         DynamicCluster cluster = app.createAndManageChild(EntitySpec.create(DynamicCluster.class)
@@ -583,6 +645,62 @@
     }
 
     @Test
+    public void testQuarantineFailedEntitiesRespectsCustomFilter() throws Exception {
+        Predicate<Throwable> filter = new Predicate<Throwable>() {
+            @Override public boolean apply(Throwable input) {
+                return Exceptions.getFirstThrowableOfType(input, AllowedException.class) != null;
+            }
+        };
+        runQuarantineFailedEntitiesRespectsFilter(AllowedException.class, DisallowedException.class, filter);
+    }
+    @SuppressWarnings("serial")
+    public static class AllowedException extends RuntimeException {
+        public AllowedException(String message) {
+            super(message);
+        }
+    }
+    @SuppressWarnings("serial")
+    public static class DisallowedException extends RuntimeException {
+        public DisallowedException(String message) {
+            super(message);
+        }
+    }
+
+    @Test
+    public void testQuarantineFailedEntitiesRespectsDefaultFilter() throws Exception {
+        Predicate<Throwable> filter = null;
+        runQuarantineFailedEntitiesRespectsFilter(AllowedException.class, NoMachinesAvailableException.class, filter);
+    }
+    
+    protected void runQuarantineFailedEntitiesRespectsFilter(Class<? extends Exception> allowedException, 
+            Class<? extends Exception> disallowedException, Predicate<Throwable> quarantineFilter) throws Exception {
+        final List<Class<? extends Exception>> failureCauses = ImmutableList.<Class<? extends Exception>>of(allowedException, disallowedException);
+        final AtomicInteger counter = new AtomicInteger(0);
+        DynamicCluster cluster = app.createAndManageChild(EntitySpec.create(DynamicCluster.class)
+                .configure("quarantineFailedEntities", true)
+                .configure("initialSize", 0)
+                .configure("quarantineFilter", quarantineFilter)
+                .configure("factory", new EntityFactory() {
+                    @Override public Entity newEntity(Map flags, Entity parent) {
+                        int num = counter.getAndIncrement();
+                        return app.getManagementContext().getEntityManager().createEntity(EntitySpec.create(FailingEntity.class)
+                                .configure(flags)
+                                .configure(FailingEntity.FAIL_ON_START, true)
+                                .configure(FailingEntity.EXCEPTION_CLAZZ, failureCauses.get(num))
+                                .parent(parent));
+                    }}));
+
+        cluster.start(ImmutableList.of(loc));
+        resizeExpectingError(cluster, 2);
+        Iterable<FailingEntity> children = Iterables.filter(cluster.getChildren(), FailingEntity.class);
+        Collection<Entity> quarantineMembers = cluster.sensors().get(DynamicCluster.QUARANTINE_GROUP).getMembers();
+        
+        assertEquals(cluster.getCurrentSize(), (Integer)0);
+        assertEquals(Iterables.getOnlyElement(children).config().get(FailingEntity.EXCEPTION_CLAZZ), allowedException);
+        assertEquals(Iterables.getOnlyElement(quarantineMembers), Iterables.getOnlyElement(children));
+    }
+
+    @Test
     public void defaultRemovalStrategyShutsDownNewestFirstWhenResizing() throws Exception {
         final List<Entity> creationOrder = Lists.newArrayList();
         DynamicCluster cluster = app.createAndManageChild(EntitySpec.create(DynamicCluster.class)
diff --git a/brooklyn-server/policy/src/main/java/org/apache/brooklyn/policy/autoscaling/AutoScalerPolicy.java b/brooklyn-server/policy/src/main/java/org/apache/brooklyn/policy/autoscaling/AutoScalerPolicy.java
index d315260..b484359 100644
--- a/brooklyn-server/policy/src/main/java/org/apache/brooklyn/policy/autoscaling/AutoScalerPolicy.java
+++ b/brooklyn-server/policy/src/main/java/org/apache/brooklyn/policy/autoscaling/AutoScalerPolicy.java
@@ -20,7 +20,6 @@
 
 import static com.google.common.base.Preconditions.checkNotNull;
 import static org.apache.brooklyn.util.JavaGroovyEquivalents.groovyTruth;
-import groovy.lang.Closure;
 
 import java.util.Map;
 import java.util.concurrent.Callable;
@@ -30,8 +29,6 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.brooklyn.api.catalog.Catalog;
 import org.apache.brooklyn.api.entity.Entity;
 import org.apache.brooklyn.api.entity.EntityLocal;
@@ -55,6 +52,8 @@
 import org.apache.brooklyn.util.core.flags.TypeCoercions;
 import org.apache.brooklyn.util.core.task.Tasks;
 import org.apache.brooklyn.util.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
@@ -62,6 +61,8 @@
 import com.google.common.reflect.TypeToken;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
+import groovy.lang.Closure;
+
 
 /**
  * Policy that is attached to a {@link Resizable} entity and dynamically adjusts its size in response to
@@ -290,6 +291,7 @@
             .defaultValue(1)
             .reconfigurable(true)
             .build();
+    
     @SetFromFlag("resizeUpIterationMax")
     public static final ConfigKey<Integer> RESIZE_UP_ITERATION_MAX = BasicConfigKey.builder(Integer.class)
             .name("autoscaler.resizeUpIterationMax")
@@ -297,6 +299,7 @@
             .description("Maximum change to the size on a single iteration when scaling up")
             .reconfigurable(true)
             .build();
+    
     @SetFromFlag("resizeDownIterationIncrement")
     public static final ConfigKey<Integer> RESIZE_DOWN_ITERATION_INCREMENT = BasicConfigKey.builder(Integer.class)
             .name("autoscaler.resizeDownIterationIncrement")
@@ -304,6 +307,7 @@
             .defaultValue(1)
             .reconfigurable(true)
             .build();
+    
     @SetFromFlag("resizeDownIterationMax")
     public static final ConfigKey<Integer> RESIZE_DOWN_ITERATION_MAX = BasicConfigKey.builder(Integer.class)
             .name("autoscaler.resizeDownIterationMax")
@@ -346,6 +350,12 @@
             .reconfigurable(true)
             .build();
 
+    public static final ConfigKey<Integer> INSUFFICIENT_CAPACITY_HIGH_WATER_MARK = BasicConfigKey.builder(Integer.class)
+            .name("autoscaler.insufficientCapacityHighWaterMark")
+            .defaultValue(null)
+            .reconfigurable(true)
+            .build();
+
     @SetFromFlag("resizeOperator")
     public static final ConfigKey<ResizeOperator> RESIZE_OPERATOR = BasicConfigKey.builder(ResizeOperator.class)
             .name("autoscaler.resizeOperator")
@@ -564,6 +574,10 @@
         return getConfig(MAX_POOL_SIZE);
     }
     
+    private Integer getInsufficientCapacityHighWaterMark() {
+        return getConfig(INSUFFICIENT_CAPACITY_HIGH_WATER_MARK);
+    }
+    
     private ResizeOperator getResizeOperator() {
         return getConfig(RESIZE_OPERATOR);
     }
@@ -620,6 +634,14 @@
                 throw new IllegalArgumentException("Min pool size "+val+" must not be greater than max pool size "+getConfig(MAX_POOL_SIZE));
             }
             onPoolSizeLimitsChanged(getConfig(MIN_POOL_SIZE), newMax);
+        } else if (key.equals(INSUFFICIENT_CAPACITY_HIGH_WATER_MARK)) {
+            Integer newVal = (Integer) val;
+            Integer oldVal = config().get(INSUFFICIENT_CAPACITY_HIGH_WATER_MARK);
+            if (oldVal != null && (newVal == null || newVal > oldVal)) {
+                LOG.info("{} resetting {} to {}, which will enable resizing above previous level of {}",
+                        new Object[] {AutoScalerPolicy.this, INSUFFICIENT_CAPACITY_HIGH_WATER_MARK.getName(), newVal, oldVal});
+                // TODO see above about changing metricLowerBound; not triggering resize now
+            }
         } else {
             throw new UnsupportedOperationException("reconfiguring "+key+" unsupported for "+this);
         }
@@ -848,9 +870,17 @@
         onNewUnboundedPoolSize(desiredSizeUnconstrained);
     }
 
+    private int applyMinMaxConstraints(long desiredSize) {
+        return applyMinMaxConstraints(desiredSize > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int)desiredSize);
+    }
+    
     private int applyMinMaxConstraints(int desiredSize) {
-        desiredSize = Math.max(getMinPoolSize(), desiredSize);
-        desiredSize = Math.min(getMaxPoolSize(), desiredSize);
+        int minSize = getMinPoolSize();
+        int maxSize = getMaxPoolSize();
+        Integer insufficientCapacityHighWaterMark = getInsufficientCapacityHighWaterMark();
+        desiredSize = Math.max(minSize, desiredSize);
+        desiredSize = Math.min(maxSize, desiredSize);
+        if (insufficientCapacityHighWaterMark != null) desiredSize = Math.min(insufficientCapacityHighWaterMark, desiredSize);
         return desiredSize;
     }
 
@@ -989,36 +1019,47 @@
     }
 
     private void resizeNow() {
-        long currentPoolSize = getCurrentSizeOperator().apply(poolEntity);
+        final int currentPoolSize = getCurrentSizeOperator().apply(poolEntity);
         CalculatedDesiredPoolSize calculatedDesiredPoolSize = calculateDesiredPoolSize(currentPoolSize);
-        final long desiredPoolSize = calculatedDesiredPoolSize.size;
+        long desiredPoolSize = calculatedDesiredPoolSize.size;
         boolean stable = calculatedDesiredPoolSize.stable;
         
+        final int targetPoolSize = applyMinMaxConstraints(desiredPoolSize);
+        
         if (!stable) {
             // the desired size fluctuations are not stable; ensure we check again later (due to time-window)
             // even if no additional events have been received
             // (note we continue now with as "good" a resize as we can given the instability)
             if (LOG.isTraceEnabled()) LOG.trace("{} re-scheduling resize check for {}, as desired size not stable (current {}, desired {}); continuing with resize...", 
-                    new Object[] {this, poolEntity, currentPoolSize, desiredPoolSize});
+                    new Object[] {this, poolEntity, currentPoolSize, targetPoolSize});
             scheduleResize();
         }
-        if (currentPoolSize == desiredPoolSize) {
+        if (currentPoolSize == targetPoolSize) {
             if (LOG.isTraceEnabled()) LOG.trace("{} not resizing pool {} from {} to {}", 
-                    new Object[] {this, poolEntity, currentPoolSize, desiredPoolSize});
+                    new Object[] {this, poolEntity, currentPoolSize, targetPoolSize});
             return;
         }
         
         if (LOG.isDebugEnabled()) LOG.debug("{} requesting resize to {}; current {}, min {}, max {}", 
-                new Object[] {this, desiredPoolSize, currentPoolSize, getMinPoolSize(), getMaxPoolSize()});
+                new Object[] {this, targetPoolSize, currentPoolSize, getMinPoolSize(), getMaxPoolSize()});
         
         Entities.submit(entity, Tasks.<Void>builder().displayName("Auto-scaler")
-            .description("Auto-scaler recommending resize from "+currentPoolSize+" to "+desiredPoolSize)
+            .description("Auto-scaler recommending resize from "+currentPoolSize+" to "+targetPoolSize)
             .tag(BrooklynTaskTags.NON_TRANSIENT_TASK_TAG)
             .body(new Callable<Void>() {
                 @Override
                 public Void call() throws Exception {
                     // TODO Should we use int throughout, rather than casting here?
-                    getResizeOperator().resize(poolEntity, (int) desiredPoolSize);
+                    try {
+                        getResizeOperator().resize(poolEntity, (int) targetPoolSize);
+                    } catch (Resizable.InsufficientCapacityException e) {
+                        // cannot resize beyond this; set the high-water mark
+                        int insufficientCapacityHighWaterMark = getCurrentSizeOperator().apply(poolEntity);
+                        LOG.warn("{} failed to resize {} due to insufficient capacity; setting high-water mark to {}, "
+                                + "and will not attempt to resize above that level again", 
+                                new Object[] {AutoScalerPolicy.this, poolEntity, insufficientCapacityHighWaterMark});
+                        config().set(INSUFFICIENT_CAPACITY_HIGH_WATER_MARK, insufficientCapacityHighWaterMark);
+                    }
                     return null;
                 }
             }).build())
diff --git a/brooklyn-server/policy/src/test/java/org/apache/brooklyn/policy/autoscaling/AutoScalerPolicyMetricTest.java b/brooklyn-server/policy/src/test/java/org/apache/brooklyn/policy/autoscaling/AutoScalerPolicyMetricTest.java
index e04f714..ad67b75 100644
--- a/brooklyn-server/policy/src/test/java/org/apache/brooklyn/policy/autoscaling/AutoScalerPolicyMetricTest.java
+++ b/brooklyn-server/policy/src/test/java/org/apache/brooklyn/policy/autoscaling/AutoScalerPolicyMetricTest.java
@@ -36,17 +36,18 @@
 import org.apache.brooklyn.core.test.entity.TestCluster;
 import org.apache.brooklyn.core.test.entity.TestEntity;
 import org.apache.brooklyn.test.Asserts;
+import org.apache.brooklyn.util.time.Duration;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 
 public class AutoScalerPolicyMetricTest {
     
-    private static long TIMEOUT_MS = 10000;
-    private static long SHORT_WAIT_MS = 50;
+    private static long SHORT_WAIT_MS = 250;
     
     private static final AttributeSensor<Integer> MY_ATTRIBUTE = Sensors.newIntegerSensor("autoscaler.test.intAttrib");
     TestApplication app;
@@ -75,7 +76,7 @@
         Asserts.succeedsContinually(ImmutableMap.of("timeout", SHORT_WAIT_MS), currentSizeAsserter(tc, 1));
 
         tc.sensors().set(MY_ATTRIBUTE, 101);
-        Asserts.succeedsEventually(ImmutableMap.of("timeout", TIMEOUT_MS), currentSizeAsserter(tc, 2));
+        Asserts.succeedsEventually(currentSizeAsserter(tc, 2));
     }
     
     @Test
@@ -89,7 +90,7 @@
         Asserts.succeedsContinually(ImmutableMap.of("timeout", SHORT_WAIT_MS), currentSizeAsserter(tc, 2));
 
         tc.sensors().set(MY_ATTRIBUTE, 49);
-        Asserts.succeedsEventually(ImmutableMap.of("timeout", TIMEOUT_MS), currentSizeAsserter(tc, 1));
+        Asserts.succeedsEventually(currentSizeAsserter(tc, 1));
     }
     
     @Test(groups="Integration")
@@ -101,11 +102,11 @@
         
         // workload 200 so requires doubling size to 10 to handle: (200*5)/100 = 10
         tc.sensors().set(MY_ATTRIBUTE, 200);
-        Asserts.succeedsEventually(ImmutableMap.of("timeout", TIMEOUT_MS), currentSizeAsserter(tc, 10));
+        Asserts.succeedsEventually(currentSizeAsserter(tc, 10));
         
         // workload 5, requires 1 entity: (10*110)/100 = 11
         tc.sensors().set(MY_ATTRIBUTE, 110);
-        Asserts.succeedsEventually(ImmutableMap.of("timeout", TIMEOUT_MS), currentSizeAsserter(tc, 11));
+        Asserts.succeedsEventually(currentSizeAsserter(tc, 11));
     }
     
     @Test(groups="Integration")
@@ -117,14 +118,14 @@
         
         // workload can be handled by 4 servers, within its valid range: (49*5)/50 = 4.9
         tc.sensors().set(MY_ATTRIBUTE, 49);
-        Asserts.succeedsEventually(ImmutableMap.of("timeout", TIMEOUT_MS), currentSizeAsserter(tc, 4));
+        Asserts.succeedsEventually(currentSizeAsserter(tc, 4));
         
         // workload can be handled by 4 servers, within its valid range: (25*4)/50 = 2
         tc.sensors().set(MY_ATTRIBUTE, 25);
-        Asserts.succeedsEventually(ImmutableMap.of("timeout", TIMEOUT_MS), currentSizeAsserter(tc, 2));
+        Asserts.succeedsEventually(currentSizeAsserter(tc, 2));
         
         tc.sensors().set(MY_ATTRIBUTE, 0);
-        Asserts.succeedsEventually(ImmutableMap.of("timeout", TIMEOUT_MS), currentSizeAsserter(tc, 1));
+        Asserts.succeedsEventually(currentSizeAsserter(tc, 1));
     }
     
     @Test(groups="Integration")
@@ -139,11 +140,11 @@
 
         // Decreases to min-size only
         tc.sensors().set(MY_ATTRIBUTE, 0);
-        Asserts.succeedsEventually(ImmutableMap.of("timeout", TIMEOUT_MS), currentSizeAsserter(tc, 2));
+        Asserts.succeedsEventually(currentSizeAsserter(tc, 2));
         
         // Increases to max-size only
         tc.sensors().set(MY_ATTRIBUTE, 100000);
-        Asserts.succeedsEventually(ImmutableMap.of("timeout", TIMEOUT_MS), currentSizeAsserter(tc, 6));
+        Asserts.succeedsEventually(currentSizeAsserter(tc, 6));
     }
     
     @Test(groups="Integration",invocationCount=20)
@@ -167,14 +168,14 @@
 
         // workload can be handled by 6 servers, so no need to notify: 6 <= (100*6)/50
         tc.sensors().set(MY_ATTRIBUTE, 600);
-        Asserts.succeedsEventually(ImmutableMap.of("timeout", TIMEOUT_MS), currentSizeAsserter(tc, 6));
+        Asserts.succeedsEventually(currentSizeAsserter(tc, 6));
         assertTrue(maxReachedEvents.isEmpty());
         
         // Increases to above max capacity: would require (100000*6)/100 = 6000
         tc.sensors().set(MY_ATTRIBUTE, 100000);
         
         // Assert our listener gets notified (once)
-        Asserts.succeedsEventually(ImmutableMap.of("timeout", TIMEOUT_MS), new Runnable() {
+        Asserts.succeedsEventually(new Runnable() {
             public void run() {
                 assertEquals(maxReachedEvents.size(), 1);
                 assertEquals(maxReachedEvents.get(0).getMaxAllowed(), 6);
@@ -245,7 +246,7 @@
         policy.suspend();
         policy.resume();
         tc.sensors().set(MY_ATTRIBUTE, 101);
-        Asserts.succeedsEventually(ImmutableMap.of("timeout", TIMEOUT_MS), currentSizeAsserter(tc, 2));
+        Asserts.succeedsEventually(currentSizeAsserter(tc, 2));
     }
     
     @Test
@@ -268,6 +269,84 @@
 
         // Then confirm we listen to the correct "entityWithMetric"
         entityWithMetric.sensors().set(TestEntity.SEQUENCE, 101);
-        Asserts.succeedsEventually(ImmutableMap.of("timeout", TIMEOUT_MS), currentSizeAsserter(tc, 2));
+        Asserts.succeedsEventually(currentSizeAsserter(tc, 2));
+    }
+    
+    @Test(groups="Integration")
+    public void testOnFailedGrowWillSetHighwaterMarkAndNotResizeAboveThatAgain() {
+        tc = app.createAndManageChild(EntitySpec.create(TestCluster.class)
+                .configure("initialSize", 0)
+                .configure(TestCluster.MAX_SIZE, 2));
+
+        tc.resize(1);
+        
+        tc.policies().add(AutoScalerPolicy.builder()
+                .metric(MY_ATTRIBUTE)
+                .metricLowerBound(50)
+                .metricUpperBound(100)
+                .buildSpec());
+        
+        // workload 200 so requires doubling size to 2 to handle: (200*1)/100 = 2
+        tc.sensors().set(MY_ATTRIBUTE, 200);
+        Asserts.succeedsEventually(currentSizeAsserter(tc, 2));
+        assertEquals(tc.getDesiredSizeHistory(), ImmutableList.of(1, 2), "desired="+tc.getDesiredSizeHistory());
+        assertEquals(tc.getSizeHistory(), ImmutableList.of(0, 1, 2), "sizes="+tc.getSizeHistory());
+        tc.sensors().set(MY_ATTRIBUTE, 100);
+        
+        // workload 110, requires 1 more entity: (2*110)/100 = 2.1
+        // But max size is 2, so resize will fail.
+        tc.sensors().set(MY_ATTRIBUTE, 110);
+        Asserts.succeedsContinually(ImmutableMap.of("timeout", SHORT_WAIT_MS), currentSizeAsserter(tc, 2));
+        assertEquals(tc.getDesiredSizeHistory(), ImmutableList.of(1, 2, 3), "desired="+tc.getDesiredSizeHistory()); // TODO succeeds eventually?
+        assertEquals(tc.getSizeHistory(), ImmutableList.of(0, 1, 2), "sizes="+tc.getSizeHistory());
+        
+        // another attempt to resize will not cause it to ask
+        tc.sensors().set(MY_ATTRIBUTE, 110);
+        Asserts.succeedsContinually(ImmutableMap.of("timeout", SHORT_WAIT_MS), currentSizeAsserter(tc, 2));
+        assertEquals(tc.getDesiredSizeHistory(), ImmutableList.of(1, 2, 3), "desired="+tc.getDesiredSizeHistory());
+        assertEquals(tc.getSizeHistory(), ImmutableList.of(0, 1, 2), "sizes="+tc.getSizeHistory());
+        
+        // but if we shrink down again, then we'll still be able to go up to the previous level.
+        // We'll only try to go as high as the previous high-water mark though.
+        tc.sensors().set(MY_ATTRIBUTE, 1);
+        Asserts.succeedsEventually(ImmutableMap.of("timeout", SHORT_WAIT_MS), currentSizeAsserter(tc, 1));
+        assertEquals(tc.getDesiredSizeHistory(), ImmutableList.of(1, 2, 3, 1), "desired="+tc.getDesiredSizeHistory());
+        assertEquals(tc.getSizeHistory(), ImmutableList.of(0, 1, 2, 1), "sizes="+tc.getSizeHistory());
+        
+        tc.sensors().set(MY_ATTRIBUTE, 10000);
+        Asserts.succeedsEventually(ImmutableMap.of("timeout", SHORT_WAIT_MS), currentSizeAsserter(tc, 2));
+        assertEquals(tc.getDesiredSizeHistory(), ImmutableList.of(1, 2, 3, 1, 2), "desired="+tc.getDesiredSizeHistory());
+        assertEquals(tc.getSizeHistory(), ImmutableList.of(0, 1, 2, 1, 2), "sizes="+tc.getSizeHistory());
+    }
+    
+    // When there is a resizeUpStabilizationDelay, it remembers all the previously requested sizes (in the recent history)
+    // and then looks at those in the stabilization-delay to determine the sustained desired. This test checks that
+    // we apply the highwater-mark even when the desired size had been recorded prior to the highwater mark being
+    // discovered.
+    @Test(groups="Integration")
+    public void testOnFailedGrowWithStabilizationDelayWillSetHighwaterMarkAndNotResizeAboveThatAgain() throws Exception {
+        tc = app.createAndManageChild(EntitySpec.create(TestCluster.class)
+                .configure("initialSize", 0)
+                .configure(TestCluster.MAX_SIZE, 2));
+
+        tc.resize(1);
+        
+        tc.policies().add(AutoScalerPolicy.builder()
+                .metric(MY_ATTRIBUTE)
+                .metricLowerBound(50)
+                .metricUpperBound(100)
+                .resizeUpStabilizationDelay(Duration.ONE_SECOND)
+                .buildSpec());
+        
+        // workload 200 so requires doubling size to 2 to handle: (200*1)/100 = 2
+        for (int i = 0; i < 10; i++) {
+            tc.sensors().set(MY_ATTRIBUTE, 200 + (i*100));
+            Thread.sleep(100);
+        }
+        
+        Asserts.succeedsEventually(currentSizeAsserter(tc, 2));
+        Asserts.succeedsContinually(currentSizeAsserter(tc, 2));
+        assertEquals(tc.getDesiredSizeHistory(), ImmutableList.of(1, 2, 3), "desired="+tc.getDesiredSizeHistory());
+        assertEquals(tc.getSizeHistory(), ImmutableList.of(0, 1, 2), "sizes="+tc.getSizeHistory());
     }
 }
diff --git a/brooklyn-server/software/base/src/test/java/org/apache/brooklyn/entity/software/base/test/autoscaling/AutoScalerPolicyNoMoreMachinesTest.java b/brooklyn-server/software/base/src/test/java/org/apache/brooklyn/entity/software/base/test/autoscaling/AutoScalerPolicyNoMoreMachinesTest.java
new file mode 100644
index 0000000..77175d2
--- /dev/null
+++ b/brooklyn-server/software/base/src/test/java/org/apache/brooklyn/entity/software/base/test/autoscaling/AutoScalerPolicyNoMoreMachinesTest.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.software.base.test.autoscaling;
+
+import static org.testng.Assert.assertEquals;
+
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.brooklyn.api.entity.Entity;
+import org.apache.brooklyn.api.entity.EntitySpec;
+import org.apache.brooklyn.api.location.Location;
+import org.apache.brooklyn.api.policy.PolicySpec;
+import org.apache.brooklyn.api.sensor.AttributeSensor;
+import org.apache.brooklyn.core.entity.BrooklynConfigKeys;
+import org.apache.brooklyn.core.entity.trait.Resizable;
+import org.apache.brooklyn.core.mgmt.internal.CollectionChangeListener;
+import org.apache.brooklyn.core.sensor.Sensors;
+import org.apache.brooklyn.core.test.BrooklynAppUnitTestSupport;
+import org.apache.brooklyn.core.test.entity.TestCluster;
+import org.apache.brooklyn.entity.group.DynamicCluster;
+import org.apache.brooklyn.entity.software.base.EmptySoftwareProcess;
+import org.apache.brooklyn.policy.autoscaling.AutoScalerPolicy;
+import org.apache.brooklyn.test.Asserts;
+import org.apache.brooklyn.util.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import com.google.common.base.Predicates;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Sets;
+
+public class AutoScalerPolicyNoMoreMachinesTest extends BrooklynAppUnitTestSupport {
+
+    @SuppressWarnings("unused")
+    private static final Logger log = LoggerFactory.getLogger(AutoScalerPolicyNoMoreMachinesTest.class);
+    
+    private static long SHORT_WAIT_MS = 250;
+    
+    DynamicCluster cluster;
+    Location loc;
+    AutoScalerPolicy policy;
+    Set<Entity> entitiesAdded;
+    Set<Entity> entitiesRemoved;
+    
+    @BeforeMethod(alwaysRun=true)
+    public void setUp() throws Exception {
+        super.setUp();
+        cluster = app.createAndManageChild(EntitySpec.create(DynamicCluster.class)
+                .configure(TestCluster.INITIAL_SIZE, 0)
+                .configure(DynamicCluster.MEMBER_SPEC, EntitySpec.create(EmptySoftwareProcess.class)
+                        .configure(BrooklynConfigKeys.SKIP_ON_BOX_BASE_DIR_RESOLUTION, true)));
+        loc = mgmt.getLocationRegistry().resolve("byon(hosts='1.1.1.1,1.1.1.2')");
+        app.start(ImmutableList.of(loc));
+        
+        entitiesAdded = Sets.newLinkedHashSet();
+        entitiesRemoved = Sets.newLinkedHashSet();
+        mgmt.addEntitySetListener(new CollectionChangeListener<Entity>() {
+            @Override public void onItemAdded(Entity item) {
+                entitiesAdded.add(item);
+            }
+            @Override public void onItemRemoved(Entity item) {
+                entitiesRemoved.add(item);
+            }});
+    }
+
+    @Test
+    public void testResizeDirectly() throws Exception {
+        assertSize(0);
+
+        cluster.resize(2);
+        assertSize(2);
+        
+        // Won't get a location to successfully resize (byon location only has 2 machines); 
+        // so still left with 2 members (failed node not quarantined, because exception well understood)
+        try {
+            cluster.resize(3);
+            Asserts.shouldHaveFailedPreviously();
+        } catch (Exception e) {
+            Asserts.expectedFailureOfType(e, Resizable.InsufficientCapacityException.class);
+        }
+        assertSize(2, 0, 1);
+
+        // Resize down; will delete one of our nodes
+        cluster.resize(1);
+        assertSize(1, 0, 2);
+        
+        // Resize back up to 2 should be allowed
+        cluster.resize(2);
+        assertSize(2, 0, 2);
+    }
+    
+    @Test
+    public void testPoolHotSensorResizingBeyondMaxMachines() throws Exception {
+        cluster.resize(1);
+        policy = cluster.policies().add(PolicySpec.create(AutoScalerPolicy.class)
+                .configure(AutoScalerPolicy.MIN_PERIOD_BETWEEN_EXECS, Duration.millis(10)));
+
+        // Single node trying to handle a load of 21; too high, so will add one more node
+        cluster.sensors().emit(AutoScalerPolicy.DEFAULT_POOL_HOT_SENSOR, message(21L, 10L, 20L));
+        assertSizeEventually(2);
+
+        // Two nodes handing an aggregated load of 41; too high for 2 nodes so tries to scale to 3.
+        // But byon location only has 2 nodes so will fail.
+        cluster.sensors().emit(AutoScalerPolicy.DEFAULT_POOL_HOT_SENSOR, message(21L, 10L, 20L));
+        assertSizeEventually(2, 0, 1);
+
+        // Should not repeatedly retry
+        assertSizeContinually(2, 0, 1);
+        
+        // If there is another indication of too much load, should not retry yet again.
+        cluster.sensors().emit(AutoScalerPolicy.DEFAULT_POOL_HOT_SENSOR, message(42L, 10L, 20L));
+        assertSizeContinually(2, 0, 1);
+    }
+    
+    @Test
+    public void testMetricResizingBeyondMaxMachines() throws Exception {
+        AttributeSensor<Integer> metric = Sensors.newIntegerSensor("test.aggregatedLoad");
+        
+        cluster.resize(1);
+        policy = cluster.policies().add(PolicySpec.create(AutoScalerPolicy.class)
+                .configure(AutoScalerPolicy.METRIC, metric)
+                .configure(AutoScalerPolicy.METRIC_LOWER_BOUND, 10)
+                .configure(AutoScalerPolicy.METRIC_UPPER_BOUND, 20)
+                .configure(AutoScalerPolicy.MIN_PERIOD_BETWEEN_EXECS, Duration.millis(10)));
+
+        // Single node trying to handle a load of 21; too high, so will add one more node.
+        // That takes the load back to within acceptable limits
+        cluster.sensors().set(metric, 21);
+        assertSizeEventually(2);
+        cluster.sensors().set(metric, 19);
+
+        // With two nodes, load is now too high, so will try (and fail) to add one more node.
+        // Trigger another attempt to resize.
+        // Any nodes that fail with NoMachinesAvailableException will be immediately deleted.
+        cluster.sensors().set(metric, 22);
+        assertSizeEventually(2, 0, 1);
+        assertSizeContinually(2, 0, 1);
+        
+        // Metric is re-published; should not keep retrying
+        cluster.sensors().set(metric, 21);
+        assertSizeContinually(2, 0, 1);
+    }
+
+    protected Map<String, Object> message(double currentWorkrate, double lowThreshold, double highThreshold) {
+        return message(cluster.getCurrentSize(), currentWorkrate, lowThreshold, highThreshold);
+    }
+    
+    protected Map<String, Object> message(int currentSize, double currentWorkrate, double lowThreshold, double highThreshold) {
+        return ImmutableMap.<String,Object>of(
+            AutoScalerPolicy.POOL_CURRENT_SIZE_KEY, currentSize,
+            AutoScalerPolicy.POOL_CURRENT_WORKRATE_KEY, currentWorkrate,
+            AutoScalerPolicy.POOL_LOW_THRESHOLD_KEY, lowThreshold,
+            AutoScalerPolicy.POOL_HIGH_THRESHOLD_KEY, highThreshold);
+    }
+    
+    protected void assertSize(Integer targetSize) {
+        assertSize(targetSize, 0);
+    }
+
+    protected void assertSize(int targetSize, int quarantineSize, final int deletedSize) {
+        assertSize(targetSize, quarantineSize);
+        assertEquals(entitiesRemoved.size(), deletedSize, "removed="+entitiesRemoved);
+    }
+    
+    protected void assertSize(int targetSize, int quarantineSize) {
+        assertEquals(cluster.getCurrentSize(), (Integer) targetSize, "cluster.currentSize");
+        assertEquals(cluster.getMembers().size(), targetSize, "cluster.members.size");
+        assertEquals(cluster.sensors().get(DynamicCluster.QUARANTINE_GROUP).getMembers().size(), quarantineSize, "cluster.quarantine.size");
+        assertEquals(mgmt.getEntityManager().findEntities(Predicates.instanceOf(EmptySoftwareProcess.class)).size(), targetSize + quarantineSize, "instanceCount(EmptySoftwareProcess)");
+    }
+    
+    protected void assertSizeEventually(int targetSize) {
+        assertSizeEventually(targetSize, 0, 0);
+    }
+    
+    protected void assertSizeEventually(final int targetSize, final int quarantineSize, final int deletedSize) {
+        Asserts.succeedsEventually(new Runnable() {
+            public void run() {
+                assertSize(targetSize, quarantineSize);
+                assertEquals(entitiesRemoved.size(), deletedSize, "removed="+entitiesRemoved);
+            }});
+    }
+    
+    protected void assertSizeContinually(final int targetSize, final int quarantineSize, final int deletedSize) {
+        Asserts.succeedsContinually(ImmutableMap.of("timeout", SHORT_WAIT_MS), new Runnable() {
+            public void run() {
+                assertSize(targetSize, quarantineSize);
+                assertEquals(entitiesRemoved.size(), deletedSize, "removed="+entitiesRemoved);
+            }});
+    }
+}
diff --git a/brooklyn-server/utils/common/src/main/java/org/apache/brooklyn/test/Asserts.java b/brooklyn-server/utils/common/src/main/java/org/apache/brooklyn/test/Asserts.java
index 2bd7b5c..15aa76e 100644
--- a/brooklyn-server/utils/common/src/main/java/org/apache/brooklyn/test/Asserts.java
+++ b/brooklyn-server/utils/common/src/main/java/org/apache/brooklyn/test/Asserts.java
@@ -1115,22 +1115,29 @@
         if (e instanceof ShouldHaveFailedPreviouslyAssertionError) throw (Error)e;
     }
     
-    /** Tests that an exception is not {@link ShouldHaveFailedPreviouslyAssertionError}
-     * and is one of the given types. 
+    /**
+     * Tests that an exception is not {@link ShouldHaveFailedPreviouslyAssertionError}
+     * and is either one of the given types, or has a caused-by of one of the given types.
+     * 
+     * If you want *just* the instanceof (without checking the caused-by), then just catch
+     * those exception types, treat that as success, and let any other exception be propagated.
      * 
      * @return If the test is satisfied, this method returns normally. 
      * The caller can decide whether anything more should be done with the exception.
      * If the test fails, then either it is propagated, 
      * if the {@link Throwable} is a fatal ({@link Exceptions#propagateIfFatal(Throwable)}) other than an {@link AssertionError}, 
      * or more usually the test failure of this method is thrown, 
-     * with detail of the original {@link Throwable} logged. */
+     * with detail of the original {@link Throwable} logged and included in the caused-by.
+     */
     public static void expectedFailureOfType(Throwable e, Class<?> ...permittedSupertypes) {
         if (e instanceof ShouldHaveFailedPreviouslyAssertionError) throw (Error)e;
-        for (Class<?> t: permittedSupertypes) {
-            if (t.isInstance(e)) return;
+        for (Class<?> clazz: permittedSupertypes) {
+            @SuppressWarnings("unchecked")
+            Throwable match = Exceptions.getFirstThrowableOfType(e, (Class<? extends Throwable>)clazz);
+            if (match != null) return;
         }
         rethrowPreferredException(e, 
-            new AssertionError("Error "+JavaClassNames.simpleClassName(e)+" is not any of the expected types: " + Arrays.asList(permittedSupertypes)));
+            new AssertionError("Error "+JavaClassNames.simpleClassName(e)+" is not any of the expected types: " + Arrays.asList(permittedSupertypes), e));
     }
     
     /** Tests {@link #expectedFailure(Throwable)} and that the <code>toString</code>