This closes #1151
diff --git a/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/entity/trait/Resizable.java b/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/entity/trait/Resizable.java
index 68c9398..36e6ba8 100644
--- a/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/entity/trait/Resizable.java
+++ b/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/entity/trait/Resizable.java
@@ -31,6 +31,21 @@
*/
public interface Resizable {
+ /**
+ * Indicates that resizing up to the desired size is not possible - only resized to the
+ * {@link Resizable#getCurrentSize()}, because there is insufficient capacity.
+ */
+ public static class InsufficientCapacityException extends RuntimeException {
+ private static final long serialVersionUID = 953230498564942446L;
+
+ public InsufficientCapacityException(String msg) {
+ super(msg);
+ }
+ public InsufficientCapacityException(String msg, Throwable cause) {
+ super(msg, cause);
+ }
+ }
+
MethodEffector<Integer> RESIZE = new MethodEffector<Integer>(Resizable.class, "resize");
/**
@@ -38,6 +53,9 @@
*
* @param desiredSize the new size of the entity group.
* @return the new size of the group.
+ *
+ * @throws InsufficientCapacityException If the request was to grow, but there is no capacity to grow to
+ * the desired size.
*/
@Effector(description="Changes the size of the entity (e.g. the number of nodes in a cluster)")
Integer resize(@EffectorParam(name="desiredSize", description="The new size of the cluster") Integer desiredSize);
diff --git a/brooklyn-server/core/src/main/java/org/apache/brooklyn/entity/group/DynamicCluster.java b/brooklyn-server/core/src/main/java/org/apache/brooklyn/entity/group/DynamicCluster.java
index f528db7..781cb0c 100644
--- a/brooklyn-server/core/src/main/java/org/apache/brooklyn/entity/group/DynamicCluster.java
+++ b/brooklyn-server/core/src/main/java/org/apache/brooklyn/entity/group/DynamicCluster.java
@@ -48,6 +48,7 @@
import com.google.common.annotations.Beta;
import com.google.common.base.Function;
+import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Multimap;
import com.google.common.reflect.TypeToken;
@@ -101,6 +102,15 @@
ConfigKey<Boolean> QUARANTINE_FAILED_ENTITIES = ConfigKeys.newBooleanConfigKey(
"dynamiccluster.quarantineFailedEntities", "If true, will quarantine entities that fail to start; if false, will get rid of them (i.e. delete them)", true);
+ @SetFromFlag("quarantineFilter")
+ ConfigKey<Predicate<? super Throwable>> QUARANTINE_FILTER = ConfigKeys.newConfigKey(
+ new TypeToken<Predicate<? super Throwable>>() {},
+ "dynamiccluster.quarantineFilter",
+ "Quarantine the failed nodes that pass this filter (given the exception thrown by the node). "
+ + "Default is those that did not fail with NoMachinesAvailableException "
+ + "(Config ignored if quarantineFailedEntities is false)",
+ null);
+
AttributeSensor<Lifecycle> SERVICE_STATE_ACTUAL = Attributes.SERVICE_STATE_ACTUAL;
BasicNotificationSensor<Entity> ENTITY_QUARANTINED = new BasicNotificationSensor<Entity>(Entity.class, "dynamiccluster.entityQuarantined", "Entity failed to start, and has been quarantined");
diff --git a/brooklyn-server/core/src/main/java/org/apache/brooklyn/entity/group/DynamicClusterImpl.java b/brooklyn-server/core/src/main/java/org/apache/brooklyn/entity/group/DynamicClusterImpl.java
index b8e5c63..16a82d4 100644
--- a/brooklyn-server/core/src/main/java/org/apache/brooklyn/entity/group/DynamicClusterImpl.java
+++ b/brooklyn-server/core/src/main/java/org/apache/brooklyn/entity/group/DynamicClusterImpl.java
@@ -38,6 +38,7 @@
import org.apache.brooklyn.api.entity.Group;
import org.apache.brooklyn.api.location.Location;
import org.apache.brooklyn.api.location.MachineProvisioningLocation;
+import org.apache.brooklyn.api.location.NoMachinesAvailableException;
import org.apache.brooklyn.api.mgmt.Task;
import org.apache.brooklyn.api.policy.Policy;
import org.apache.brooklyn.api.sensor.AttributeSensor;
@@ -50,6 +51,7 @@
import org.apache.brooklyn.core.entity.lifecycle.Lifecycle;
import org.apache.brooklyn.core.entity.lifecycle.ServiceStateLogic;
import org.apache.brooklyn.core.entity.lifecycle.ServiceStateLogic.ServiceProblemsLogic;
+import org.apache.brooklyn.core.entity.trait.Resizable;
import org.apache.brooklyn.core.entity.trait.Startable;
import org.apache.brooklyn.core.entity.trait.StartableMethods;
import org.apache.brooklyn.core.location.Locations;
@@ -80,6 +82,7 @@
import com.google.common.base.Functions;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
@@ -331,6 +334,19 @@
return getAttribute(QUARANTINE_GROUP);
}
+ protected Predicate<? super Throwable> getQuarantineFilter() {
+ Predicate<? super Throwable> result = getConfig(QUARANTINE_FILTER);
+ if (result != null) {
+ return result;
+ } else {
+ return new Predicate<Throwable>() {
+ @Override public boolean apply(Throwable input) {
+ return Exceptions.getFirstThrowableOfType(input, NoMachinesAvailableException.class) == null;
+ }
+ };
+ }
+ }
+
protected int getInitialQuorumSize() {
int initialSize = getConfig(INITIAL_SIZE).intValue();
int initialQuorumSize = getConfig(INITIAL_QUORUM_SIZE).intValue();
@@ -518,7 +534,20 @@
} else {
if (LOG.isDebugEnabled()) LOG.debug("Resize no-op {} from {} to {}", new Object[] {this, originalSize, desiredSize});
}
- resizeByDelta(delta);
+ // If we managed to grow at all, then expect no exception.
+ // Otherwise, if failed because NoMachinesAvailable, then propagate as InsufficientCapacityException.
+ // This tells things like the AutoScalerPolicy to not keep retrying.
+ try {
+ resizeByDelta(delta);
+ } catch (Exception e) {
+ Exceptions.propagateIfFatal(e);
+ NoMachinesAvailableException nmae = Exceptions.getFirstThrowableOfType(e, NoMachinesAvailableException.class);
+ if (nmae != null) {
+ throw new Resizable.InsufficientCapacityException("Failed to resize", e);
+ } else {
+ throw Exceptions.propagate(e);
+ }
+ }
}
return getCurrentSize();
}
@@ -669,7 +698,7 @@
}
}
- /** <strong>Note</strong> for sub-clases; this method can be called while synchronized on {@link #mutex}. */
+ /** <strong>Note</strong> for sub-classes; this method can be called while synchronized on {@link #mutex}. */
protected Collection<Entity> grow(int delta) {
Preconditions.checkArgument(delta > 0, "Must call grow with positive delta.");
@@ -696,8 +725,10 @@
chosenLocations = Collections.nCopies(delta, getLocation());
}
- // create and start the entities
- return addInEachLocation(chosenLocations, ImmutableMap.of()).getWithError();
+ // create and start the entities.
+ // if any fail, then propagate the error.
+ ReferenceWithError<Collection<Entity>> result = addInEachLocation(chosenLocations, ImmutableMap.of());
+ return result.getWithError();
}
/** <strong>Note</strong> for sub-clases; this method can be called while synchronized on {@link #mutex}. */
@@ -786,7 +817,7 @@
// quarantine/cleanup as necessary
if (!errors.isEmpty()) {
if (isQuarantineEnabled()) {
- quarantineFailedNodes(errors.keySet());
+ quarantineFailedNodes(errors);
} else {
cleanupFailedNodes(errors.keySet());
}
@@ -796,11 +827,18 @@
return ReferenceWithError.newInstanceWithoutError(result);
}
- protected void quarantineFailedNodes(Collection<Entity> failedEntities) {
- for (Entity entity : failedEntities) {
- sensors().emit(ENTITY_QUARANTINED, entity);
- getQuarantineGroup().addMember(entity);
- removeMember(entity);
+ protected void quarantineFailedNodes(Map<Entity, Throwable> failedEntities) {
+ for (Map.Entry<Entity, Throwable> entry : failedEntities.entrySet()) {
+ Entity entity = entry.getKey();
+ Throwable cause = entry.getValue();
+ if (cause == null || getQuarantineFilter().apply(cause)) {
+ sensors().emit(ENTITY_QUARANTINED, entity);
+ getQuarantineGroup().addMember(entity);
+ removeMember(entity);
+ } else {
+ LOG.info("Cluster {} discarding failed node {}, rather than quarantining", this, entity);
+ discardNode(entity);
+ }
}
}
diff --git a/brooklyn-server/core/src/main/java/org/apache/brooklyn/location/ssh/SshMachineLocation.java b/brooklyn-server/core/src/main/java/org/apache/brooklyn/location/ssh/SshMachineLocation.java
index 5e74e7c..b059858 100644
--- a/brooklyn-server/core/src/main/java/org/apache/brooklyn/location/ssh/SshMachineLocation.java
+++ b/brooklyn-server/core/src/main/java/org/apache/brooklyn/location/ssh/SshMachineLocation.java
@@ -89,6 +89,7 @@
import org.apache.brooklyn.util.exceptions.Exceptions;
import org.apache.brooklyn.util.exceptions.RuntimeInterruptedException;
import org.apache.brooklyn.util.guava.KeyTransformingLoadingCache.KeyTransformingSameTypeLoadingCache;
+import org.apache.brooklyn.util.guava.Maybe;
import org.apache.brooklyn.util.pool.BasicPool;
import org.apache.brooklyn.util.pool.Pool;
import org.apache.brooklyn.util.ssh.BashCommands;
@@ -389,7 +390,7 @@
private BasicPool<SshTool> buildPool(final Map<String, ?> properties) {
return BasicPool.<SshTool>builder()
.name(getDisplayName()+"@"+address+":"+getPort()+
- (config().getRaw(SSH_HOST).isPresent() ? "("+getConfig(SSH_HOST)+":"+getConfig(SSH_PORT)+")" : "")+
+ (config().getRaw(SSH_HOST).isPresent() ? "("+getConfig(SSH_HOST)+":"+getPort()+")" : "")+
":hash"+System.identityHashCode(this))
.supplier(new Supplier<SshTool>() {
@Override public SshTool get() {
@@ -550,7 +551,21 @@
/** port for SSHing */
public int getPort() {
- return getConfig(SshTool.PROP_PORT);
+ // Prefer PROP_PORT (i.e. "port"). However if that is explicitly null or is not set, then see if
+ // SSH_PORT (i.e. "brooklyn.ssh.config.port") has been set and use that.
+ // If neither is set (or is explicitly set to null), then use the default PROP_PORT value.
+ //
+ // Note we don't just rely on config().get(PROP_PORT) returning the default, because we hit a rebind
+ // error where the location's port configuration had been explicitly set to null.
+ // See https://issues.apache.org/jira/browse/BROOKLYN-215
+
+ Maybe<Object> raw = config().getRaw(SshTool.PROP_PORT);
+ if (raw.orNull() == null && config().getRaw(SSH_PORT).orNull() != null) {
+ return config().get(SSH_PORT);
+ } else {
+ Integer result = config().get(SshTool.PROP_PORT);
+ return (result != null) ? result : SshTool.PROP_PORT.getDefaultValue();
+ }
}
protected <T> T execSsh(final Map<String, ?> props, final Function<ShellTool, T> task) {
diff --git a/brooklyn-server/core/src/test/java/org/apache/brooklyn/core/entity/trait/FailingEntity.java b/brooklyn-server/core/src/test/java/org/apache/brooklyn/core/entity/trait/FailingEntity.java
index 532eb4d..7845326 100644
--- a/brooklyn-server/core/src/test/java/org/apache/brooklyn/core/entity/trait/FailingEntity.java
+++ b/brooklyn-server/core/src/test/java/org/apache/brooklyn/core/entity/trait/FailingEntity.java
@@ -60,7 +60,7 @@
ConfigKey<Predicate<? super FailingEntity>> FAIL_ON_RESTART_CONDITION = (ConfigKey) ConfigKeys.newConfigKey(Predicate.class, "failOnRestartCondition", "Whether to throw exception on call to restart", null);
@SetFromFlag("exceptionClazz")
- ConfigKey<Class<? extends RuntimeException>> EXCEPTION_CLAZZ = (ConfigKey) ConfigKeys.newConfigKey(Class.class, "exceptionClazz", "Type of exception to throw", IllegalStateException.class);
+ ConfigKey<Class<? extends Exception>> EXCEPTION_CLAZZ = (ConfigKey) ConfigKeys.newConfigKey(Class.class, "exceptionClazz", "Type of exception to throw", IllegalStateException.class);
@SetFromFlag("execOnFailure")
ConfigKey<Function<? super FailingEntity,?>> EXEC_ON_FAILURE = (ConfigKey) ConfigKeys.newConfigKey(Function.class, "execOnFailure", "Callback to execute before throwing an exception, on any failure", Functions.identity());
diff --git a/brooklyn-server/core/src/test/java/org/apache/brooklyn/core/entity/trait/FailingEntityImpl.java b/brooklyn-server/core/src/test/java/org/apache/brooklyn/core/entity/trait/FailingEntityImpl.java
index a675c78..423bdd3 100644
--- a/brooklyn-server/core/src/test/java/org/apache/brooklyn/core/entity/trait/FailingEntityImpl.java
+++ b/brooklyn-server/core/src/test/java/org/apache/brooklyn/core/entity/trait/FailingEntityImpl.java
@@ -79,7 +79,12 @@
private RuntimeException newException(String msg) {
try {
- return getConfig(EXCEPTION_CLAZZ).getConstructor(String.class).newInstance("Simulating entity stop failure for test");
+ Exception result = getConfig(EXCEPTION_CLAZZ).getConstructor(String.class).newInstance("Simulating entity stop failure for test");
+ if (!(result instanceof RuntimeException)) {
+ return new RuntimeException("wrapping", result);
+ } else {
+ return (RuntimeException)result;
+ }
} catch (Exception e) {
throw Exceptions.propagate(e);
}
diff --git a/brooklyn-server/core/src/test/java/org/apache/brooklyn/core/mgmt/rebind/RebindSshMachineLocationTest.java b/brooklyn-server/core/src/test/java/org/apache/brooklyn/core/mgmt/rebind/RebindSshMachineLocationTest.java
index 92fdb55..ec8cacb 100644
--- a/brooklyn-server/core/src/test/java/org/apache/brooklyn/core/mgmt/rebind/RebindSshMachineLocationTest.java
+++ b/brooklyn-server/core/src/test/java/org/apache/brooklyn/core/mgmt/rebind/RebindSshMachineLocationTest.java
@@ -80,5 +80,23 @@
assertEquals(newChildLoc.execScript(Collections.<String,Object>emptyMap(), "mysummary", ImmutableList.of("true")), 0);
}
-
+
+ // See https://issues.apache.org/jira/browse/BROOKLYN-215
+ @Test(groups="Integration")
+ public void testRebindWhenPortNull() throws Exception {
+ SshMachineLocation machine = origManagementContext.getLocationManager().createLocation(LocationSpec.create(SshMachineLocation.class)
+ .configure("address", "localhost")
+ .configure("port", null));
+ FixedListMachineProvisioningLocation<?> byon = origManagementContext.getLocationManager().createLocation(LocationSpec.create(FixedListMachineProvisioningLocation.class)
+ .configure("machines", ImmutableList.of(machine)));
+ origApp.start(ImmutableList.of(byon));
+ LOG.info("Before rebind, machine="+machine.toString());
+
+ newApp = (TestApplication) rebind();
+
+ FixedListMachineProvisioningLocation<?> newByon = (FixedListMachineProvisioningLocation<?>) Iterables.getOnlyElement(newApp.getLocations(), 0);
+ SshMachineLocation newMachine = (SshMachineLocation) Iterables.get(newByon.getChildren(), 0);
+
+ LOG.info("After rebind, machine="+newMachine.toString());
+ }
}
diff --git a/brooklyn-server/core/src/test/java/org/apache/brooklyn/core/test/entity/TestCluster.java b/brooklyn-server/core/src/test/java/org/apache/brooklyn/core/test/entity/TestCluster.java
index 3ff61f0..9593203 100644
--- a/brooklyn-server/core/src/test/java/org/apache/brooklyn/core/test/entity/TestCluster.java
+++ b/brooklyn-server/core/src/test/java/org/apache/brooklyn/core/test/entity/TestCluster.java
@@ -18,8 +18,12 @@
*/
package org.apache.brooklyn.core.test.entity;
+import java.util.List;
+
import org.apache.brooklyn.api.entity.EntityLocal;
import org.apache.brooklyn.api.entity.ImplementedBy;
+import org.apache.brooklyn.config.ConfigKey;
+import org.apache.brooklyn.core.config.ConfigKeys;
import org.apache.brooklyn.entity.group.DynamicCluster;
/**
@@ -27,4 +31,10 @@
*/
@ImplementedBy(TestClusterImpl.class)
public interface TestCluster extends DynamicCluster, EntityLocal {
+
+ ConfigKey<Integer> MAX_SIZE = ConfigKeys.newIntegerConfigKey("testCluster.maxSize", "Size after which it will throw InsufficientCapacityException", Integer.MAX_VALUE);
+
+ List<Integer> getSizeHistory();
+
+ List<Integer> getDesiredSizeHistory();
}
diff --git a/brooklyn-server/core/src/test/java/org/apache/brooklyn/core/test/entity/TestClusterImpl.java b/brooklyn-server/core/src/test/java/org/apache/brooklyn/core/test/entity/TestClusterImpl.java
index bf8d0cb..0edea8f 100644
--- a/brooklyn-server/core/src/test/java/org/apache/brooklyn/core/test/entity/TestClusterImpl.java
+++ b/brooklyn-server/core/src/test/java/org/apache/brooklyn/core/test/entity/TestClusterImpl.java
@@ -18,22 +18,32 @@
*/
package org.apache.brooklyn.core.test.entity;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
import org.apache.brooklyn.core.entity.trait.Startable;
import org.apache.brooklyn.entity.group.DynamicClusterImpl;
import org.apache.brooklyn.util.collections.QuorumCheck.QuorumChecks;
+import com.google.common.collect.ImmutableList;
+
/**
* Mock cluster entity for testing.
*/
public class TestClusterImpl extends DynamicClusterImpl implements TestCluster {
private volatile int size;
+ private final List<Integer> desiredSizeHistory = Collections.synchronizedList(new ArrayList<Integer>());
+ private final List<Integer> sizeHistory = Collections.synchronizedList(new ArrayList<Integer>());
+
public TestClusterImpl() {
}
@Override
public void init() {
super.init();
+ sizeHistory.add(size);
size = getConfig(INITIAL_SIZE);
sensors().set(Startable.SERVICE_UP, true);
}
@@ -48,11 +58,36 @@
@Override
public Integer resize(Integer desiredSize) {
- this.size = desiredSize;
+ desiredSizeHistory.add(desiredSize);
+ int achievableSize = Math.min(desiredSize, getConfig(MAX_SIZE));
+
+ if (achievableSize != size) {
+ this.sizeHistory.add(achievableSize);
+ this.size = achievableSize;
+ }
+
+ if (desiredSize > achievableSize) {
+ throw new InsufficientCapacityException("Simulating insufficient capacity (desiredSize="+desiredSize+"; maxSize="+getConfig(MAX_SIZE)+"; newSize="+size+")");
+ }
+
return size;
}
@Override
+ public List<Integer> getSizeHistory() {
+ synchronized (sizeHistory) {
+ return ImmutableList.copyOf(sizeHistory);
+ }
+ }
+
+ @Override
+ public List<Integer> getDesiredSizeHistory() {
+ synchronized (desiredSizeHistory) {
+ return ImmutableList.copyOf(desiredSizeHistory);
+ }
+ }
+
+ @Override
public void stop() {
size = 0;
super.stop();
diff --git a/brooklyn-server/core/src/test/java/org/apache/brooklyn/entity/group/DynamicClusterTest.java b/brooklyn-server/core/src/test/java/org/apache/brooklyn/entity/group/DynamicClusterTest.java
index dd37e58..f58ac90 100644
--- a/brooklyn-server/core/src/test/java/org/apache/brooklyn/entity/group/DynamicClusterTest.java
+++ b/brooklyn-server/core/src/test/java/org/apache/brooklyn/entity/group/DynamicClusterTest.java
@@ -44,6 +44,7 @@
import org.apache.brooklyn.api.entity.Entity;
import org.apache.brooklyn.api.entity.EntitySpec;
import org.apache.brooklyn.api.location.Location;
+import org.apache.brooklyn.api.location.NoMachinesAvailableException;
import org.apache.brooklyn.api.mgmt.Task;
import org.apache.brooklyn.api.sensor.SensorEvent;
import org.apache.brooklyn.core.entity.Attributes;
@@ -54,6 +55,7 @@
import org.apache.brooklyn.core.entity.lifecycle.ServiceStateLogic;
import org.apache.brooklyn.core.entity.trait.Changeable;
import org.apache.brooklyn.core.entity.trait.FailingEntity;
+import org.apache.brooklyn.core.entity.trait.Resizable;
import org.apache.brooklyn.core.location.SimulatedLocation;
import org.apache.brooklyn.core.mgmt.BrooklynTaskTags;
import org.apache.brooklyn.core.test.BrooklynAppUnitTestSupport;
@@ -72,13 +74,13 @@
import org.testng.annotations.Test;
import com.google.common.base.Function;
+import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Atomics;
@@ -202,6 +204,65 @@
assertEquals(entity.getApplication(), app);
}
+ @Test
+ public void testResizeWhereChildThrowsNoMachineAvailableExceptionIsPropagatedAsInsufficientCapacityException() throws Exception {
+ final DynamicCluster cluster = app.createAndManageChild(EntitySpec.create(DynamicCluster.class)
+ .configure(DynamicCluster.MEMBER_SPEC, EntitySpec.create(FailingEntity.class)
+ .configure(FailingEntity.FAIL_ON_START, true)
+ .configure(FailingEntity.EXCEPTION_CLAZZ, NoMachinesAvailableException.class))
+ .configure(DynamicCluster.INITIAL_SIZE, 0));
+ cluster.start(ImmutableList.of(loc));
+
+ try {
+ cluster.resize(1);
+ Asserts.shouldHaveFailedPreviously();
+ } catch (Exception e) {
+ Asserts.expectedFailureOfType(e, Resizable.InsufficientCapacityException.class);
+ }
+ }
+
+ @Test
+ public void testResizeWhereSubsetOfChildrenThrowsNoMachineAvailableExceptionIsPropagatedAsInsuffientCapacityException() throws Exception {
+ final DynamicCluster cluster = app.createAndManageChild(EntitySpec.create(DynamicCluster.class)
+ .configure(DynamicCluster.MEMBER_SPEC, EntitySpec.create(FailingEntity.class)
+ .configure(FailingEntity.FAIL_ON_START_CONDITION, new Predicate<FailingEntity>() {
+ final AtomicInteger counter = new AtomicInteger();
+ @Override public boolean apply(FailingEntity input) {
+ // Only second and subsequent entities fail
+ int index = counter.getAndIncrement();
+ return (index >= 1);
+ }})
+ .configure(FailingEntity.EXCEPTION_CLAZZ, NoMachinesAvailableException.class))
+ .configure(DynamicCluster.INITIAL_SIZE, 0));
+ cluster.start(ImmutableList.of(loc));
+
+ // Managed to partially resize, but will still throw exception.
+ // The getCurrentSize will report how big we managed to get.
+ // The children that failed due to NoMachinesAvailableException will have been unmanaged automatically.
+ try {
+ cluster.resize(2);
+ Asserts.shouldHaveFailedPreviously();
+ } catch (Exception e) {
+ Asserts.expectedFailureOfType(e, Resizable.InsufficientCapacityException.class);
+ }
+ assertEquals(cluster.getCurrentSize(), (Integer)1);
+ Iterable<FailingEntity> children1 = Iterables.filter(cluster.getChildren(), FailingEntity.class);
+ assertEquals(Iterables.size(children1), 1);
+ assertEquals(Iterables.getOnlyElement(children1).sensors().get(TestEntity.SERVICE_UP), Boolean.TRUE);
+
+ // This attempt will also fail, because all new children will fail
+ try {
+ cluster.resize(2);
+ Asserts.shouldHaveFailedPreviously();
+ } catch (Exception e) {
+ Asserts.expectedFailureOfType(e, Resizable.InsufficientCapacityException.class);
+ }
+ assertEquals(cluster.getCurrentSize(), (Integer)1);
+ Iterable<FailingEntity> children2 = Iterables.filter(cluster.getChildren(), FailingEntity.class);
+ assertEquals(Iterables.size(children2), 1);
+ assertEquals(Iterables.getOnlyElement(children2), Iterables.getOnlyElement(children1));
+ }
+
/** This can be sensitive to order, e.g. if TestEntity set expected RUNNING before setting SERVICE_UP,
* there would be a point when TestEntity is ON_FIRE.
* <p>
@@ -249,6 +310,7 @@
assertEquals(Iterables.size(Entities.descendants(cluster, TestEntity.class)), 0);
}
+
@Test
public void currentSizePropertyReflectsActualClusterSize() throws Exception {
DynamicCluster cluster = app.createAndManageChild(EntitySpec.create(DynamicCluster.class)
@@ -583,6 +645,62 @@
}
@Test
+ public void testQuarantineFailedEntitiesRespectsCustomFilter() throws Exception {
+ Predicate<Throwable> filter = new Predicate<Throwable>() {
+ @Override public boolean apply(Throwable input) {
+ return Exceptions.getFirstThrowableOfType(input, AllowedException.class) != null;
+ }
+ };
+ runQuarantineFailedEntitiesRespectsFilter(AllowedException.class, DisallowedException.class, filter);
+ }
+ @SuppressWarnings("serial")
+ public static class AllowedException extends RuntimeException {
+ public AllowedException(String message) {
+ super(message);
+ }
+ }
+ @SuppressWarnings("serial")
+ public static class DisallowedException extends RuntimeException {
+ public DisallowedException(String message) {
+ super(message);
+ }
+ }
+
+ @Test
+ public void testQuarantineFailedEntitiesRespectsDefaultFilter() throws Exception {
+ Predicate<Throwable> filter = null;
+ runQuarantineFailedEntitiesRespectsFilter(AllowedException.class, NoMachinesAvailableException.class, filter);
+ }
+
+ protected void runQuarantineFailedEntitiesRespectsFilter(Class<? extends Exception> allowedException,
+ Class<? extends Exception> disallowedException, Predicate<Throwable> quarantineFilter) throws Exception {
+ final List<Class<? extends Exception>> failureCauses = ImmutableList.<Class<? extends Exception>>of(allowedException, disallowedException);
+ final AtomicInteger counter = new AtomicInteger(0);
+ DynamicCluster cluster = app.createAndManageChild(EntitySpec.create(DynamicCluster.class)
+ .configure("quarantineFailedEntities", true)
+ .configure("initialSize", 0)
+ .configure("quarantineFilter", quarantineFilter)
+ .configure("factory", new EntityFactory() {
+ @Override public Entity newEntity(Map flags, Entity parent) {
+ int num = counter.getAndIncrement();
+ return app.getManagementContext().getEntityManager().createEntity(EntitySpec.create(FailingEntity.class)
+ .configure(flags)
+ .configure(FailingEntity.FAIL_ON_START, true)
+ .configure(FailingEntity.EXCEPTION_CLAZZ, failureCauses.get(num))
+ .parent(parent));
+ }}));
+
+ cluster.start(ImmutableList.of(loc));
+ resizeExpectingError(cluster, 2);
+ Iterable<FailingEntity> children = Iterables.filter(cluster.getChildren(), FailingEntity.class);
+ Collection<Entity> quarantineMembers = cluster.sensors().get(DynamicCluster.QUARANTINE_GROUP).getMembers();
+
+ assertEquals(cluster.getCurrentSize(), (Integer)0);
+ assertEquals(Iterables.getOnlyElement(children).config().get(FailingEntity.EXCEPTION_CLAZZ), allowedException);
+ assertEquals(Iterables.getOnlyElement(quarantineMembers), Iterables.getOnlyElement(children));
+ }
+
+ @Test
public void defaultRemovalStrategyShutsDownNewestFirstWhenResizing() throws Exception {
final List<Entity> creationOrder = Lists.newArrayList();
DynamicCluster cluster = app.createAndManageChild(EntitySpec.create(DynamicCluster.class)
diff --git a/brooklyn-server/policy/src/main/java/org/apache/brooklyn/policy/autoscaling/AutoScalerPolicy.java b/brooklyn-server/policy/src/main/java/org/apache/brooklyn/policy/autoscaling/AutoScalerPolicy.java
index d315260..b484359 100644
--- a/brooklyn-server/policy/src/main/java/org/apache/brooklyn/policy/autoscaling/AutoScalerPolicy.java
+++ b/brooklyn-server/policy/src/main/java/org/apache/brooklyn/policy/autoscaling/AutoScalerPolicy.java
@@ -20,7 +20,6 @@
import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.brooklyn.util.JavaGroovyEquivalents.groovyTruth;
-import groovy.lang.Closure;
import java.util.Map;
import java.util.concurrent.Callable;
@@ -30,8 +29,6 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.brooklyn.api.catalog.Catalog;
import org.apache.brooklyn.api.entity.Entity;
import org.apache.brooklyn.api.entity.EntityLocal;
@@ -55,6 +52,8 @@
import org.apache.brooklyn.util.core.flags.TypeCoercions;
import org.apache.brooklyn.util.core.task.Tasks;
import org.apache.brooklyn.util.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
@@ -62,6 +61,8 @@
import com.google.common.reflect.TypeToken;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import groovy.lang.Closure;
+
/**
* Policy that is attached to a {@link Resizable} entity and dynamically adjusts its size in response to
@@ -290,6 +291,7 @@
.defaultValue(1)
.reconfigurable(true)
.build();
+
@SetFromFlag("resizeUpIterationMax")
public static final ConfigKey<Integer> RESIZE_UP_ITERATION_MAX = BasicConfigKey.builder(Integer.class)
.name("autoscaler.resizeUpIterationMax")
@@ -297,6 +299,7 @@
.description("Maximum change to the size on a single iteration when scaling up")
.reconfigurable(true)
.build();
+
@SetFromFlag("resizeDownIterationIncrement")
public static final ConfigKey<Integer> RESIZE_DOWN_ITERATION_INCREMENT = BasicConfigKey.builder(Integer.class)
.name("autoscaler.resizeDownIterationIncrement")
@@ -304,6 +307,7 @@
.defaultValue(1)
.reconfigurable(true)
.build();
+
@SetFromFlag("resizeDownIterationMax")
public static final ConfigKey<Integer> RESIZE_DOWN_ITERATION_MAX = BasicConfigKey.builder(Integer.class)
.name("autoscaler.resizeDownIterationMax")
@@ -346,6 +350,12 @@
.reconfigurable(true)
.build();
+ public static final ConfigKey<Integer> INSUFFICIENT_CAPACITY_HIGH_WATER_MARK = BasicConfigKey.builder(Integer.class)
+ .name("autoscaler.insufficientCapacityHighWaterMark")
+ .defaultValue(null)
+ .reconfigurable(true)
+ .build();
+
@SetFromFlag("resizeOperator")
public static final ConfigKey<ResizeOperator> RESIZE_OPERATOR = BasicConfigKey.builder(ResizeOperator.class)
.name("autoscaler.resizeOperator")
@@ -564,6 +574,10 @@
return getConfig(MAX_POOL_SIZE);
}
+ private Integer getInsufficientCapacityHighWaterMark() {
+ return getConfig(INSUFFICIENT_CAPACITY_HIGH_WATER_MARK);
+ }
+
private ResizeOperator getResizeOperator() {
return getConfig(RESIZE_OPERATOR);
}
@@ -620,6 +634,14 @@
throw new IllegalArgumentException("Min pool size "+val+" must not be greater than max pool size "+getConfig(MAX_POOL_SIZE));
}
onPoolSizeLimitsChanged(getConfig(MIN_POOL_SIZE), newMax);
+ } else if (key.equals(INSUFFICIENT_CAPACITY_HIGH_WATER_MARK)) {
+ Integer newVal = (Integer) val;
+ Integer oldVal = config().get(INSUFFICIENT_CAPACITY_HIGH_WATER_MARK);
+ if (oldVal != null && (newVal == null || newVal > oldVal)) {
+ LOG.info("{} resetting {} to {}, which will enable resizing above previous level of {}",
+ new Object[] {AutoScalerPolicy.this, INSUFFICIENT_CAPACITY_HIGH_WATER_MARK.getName(), newVal, oldVal});
+ // TODO see above about changing metricLowerBound; not triggering resize now
+ }
} else {
throw new UnsupportedOperationException("reconfiguring "+key+" unsupported for "+this);
}
@@ -848,9 +870,17 @@
onNewUnboundedPoolSize(desiredSizeUnconstrained);
}
+ private int applyMinMaxConstraints(long desiredSize) {
+ return applyMinMaxConstraints(desiredSize > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int)desiredSize);
+ }
+
private int applyMinMaxConstraints(int desiredSize) {
- desiredSize = Math.max(getMinPoolSize(), desiredSize);
- desiredSize = Math.min(getMaxPoolSize(), desiredSize);
+ int minSize = getMinPoolSize();
+ int maxSize = getMaxPoolSize();
+ Integer insufficientCapacityHighWaterMark = getInsufficientCapacityHighWaterMark();
+ desiredSize = Math.max(minSize, desiredSize);
+ desiredSize = Math.min(maxSize, desiredSize);
+ if (insufficientCapacityHighWaterMark != null) desiredSize = Math.min(insufficientCapacityHighWaterMark, desiredSize);
return desiredSize;
}
@@ -989,36 +1019,47 @@
}
private void resizeNow() {
- long currentPoolSize = getCurrentSizeOperator().apply(poolEntity);
+ final int currentPoolSize = getCurrentSizeOperator().apply(poolEntity);
CalculatedDesiredPoolSize calculatedDesiredPoolSize = calculateDesiredPoolSize(currentPoolSize);
- final long desiredPoolSize = calculatedDesiredPoolSize.size;
+ long desiredPoolSize = calculatedDesiredPoolSize.size;
boolean stable = calculatedDesiredPoolSize.stable;
+ final int targetPoolSize = applyMinMaxConstraints(desiredPoolSize);
+
if (!stable) {
// the desired size fluctuations are not stable; ensure we check again later (due to time-window)
// even if no additional events have been received
// (note we continue now with as "good" a resize as we can given the instability)
if (LOG.isTraceEnabled()) LOG.trace("{} re-scheduling resize check for {}, as desired size not stable (current {}, desired {}); continuing with resize...",
- new Object[] {this, poolEntity, currentPoolSize, desiredPoolSize});
+ new Object[] {this, poolEntity, currentPoolSize, targetPoolSize});
scheduleResize();
}
- if (currentPoolSize == desiredPoolSize) {
+ if (currentPoolSize == targetPoolSize) {
if (LOG.isTraceEnabled()) LOG.trace("{} not resizing pool {} from {} to {}",
- new Object[] {this, poolEntity, currentPoolSize, desiredPoolSize});
+ new Object[] {this, poolEntity, currentPoolSize, targetPoolSize});
return;
}
if (LOG.isDebugEnabled()) LOG.debug("{} requesting resize to {}; current {}, min {}, max {}",
- new Object[] {this, desiredPoolSize, currentPoolSize, getMinPoolSize(), getMaxPoolSize()});
+ new Object[] {this, targetPoolSize, currentPoolSize, getMinPoolSize(), getMaxPoolSize()});
Entities.submit(entity, Tasks.<Void>builder().displayName("Auto-scaler")
- .description("Auto-scaler recommending resize from "+currentPoolSize+" to "+desiredPoolSize)
+ .description("Auto-scaler recommending resize from "+currentPoolSize+" to "+targetPoolSize)
.tag(BrooklynTaskTags.NON_TRANSIENT_TASK_TAG)
.body(new Callable<Void>() {
@Override
public Void call() throws Exception {
// TODO Should we use int throughout, rather than casting here?
- getResizeOperator().resize(poolEntity, (int) desiredPoolSize);
+ try {
+ getResizeOperator().resize(poolEntity, (int) targetPoolSize);
+ } catch (Resizable.InsufficientCapacityException e) {
+ // cannot resize beyond this; set the high-water mark
+ int insufficientCapacityHighWaterMark = getCurrentSizeOperator().apply(poolEntity);
+ LOG.warn("{} failed to resize {} due to insufficient capacity; setting high-water mark to {}, "
+ + "and will not attempt to resize above that level again",
+ new Object[] {AutoScalerPolicy.this, poolEntity, insufficientCapacityHighWaterMark});
+ config().set(INSUFFICIENT_CAPACITY_HIGH_WATER_MARK, insufficientCapacityHighWaterMark);
+ }
return null;
}
}).build())
diff --git a/brooklyn-server/policy/src/test/java/org/apache/brooklyn/policy/autoscaling/AutoScalerPolicyMetricTest.java b/brooklyn-server/policy/src/test/java/org/apache/brooklyn/policy/autoscaling/AutoScalerPolicyMetricTest.java
index e04f714..ad67b75 100644
--- a/brooklyn-server/policy/src/test/java/org/apache/brooklyn/policy/autoscaling/AutoScalerPolicyMetricTest.java
+++ b/brooklyn-server/policy/src/test/java/org/apache/brooklyn/policy/autoscaling/AutoScalerPolicyMetricTest.java
@@ -36,17 +36,18 @@
import org.apache.brooklyn.core.test.entity.TestCluster;
import org.apache.brooklyn.core.test.entity.TestEntity;
import org.apache.brooklyn.test.Asserts;
+import org.apache.brooklyn.util.time.Duration;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
public class AutoScalerPolicyMetricTest {
- private static long TIMEOUT_MS = 10000;
- private static long SHORT_WAIT_MS = 50;
+ private static long SHORT_WAIT_MS = 250;
private static final AttributeSensor<Integer> MY_ATTRIBUTE = Sensors.newIntegerSensor("autoscaler.test.intAttrib");
TestApplication app;
@@ -75,7 +76,7 @@
Asserts.succeedsContinually(ImmutableMap.of("timeout", SHORT_WAIT_MS), currentSizeAsserter(tc, 1));
tc.sensors().set(MY_ATTRIBUTE, 101);
- Asserts.succeedsEventually(ImmutableMap.of("timeout", TIMEOUT_MS), currentSizeAsserter(tc, 2));
+ Asserts.succeedsEventually(currentSizeAsserter(tc, 2));
}
@Test
@@ -89,7 +90,7 @@
Asserts.succeedsContinually(ImmutableMap.of("timeout", SHORT_WAIT_MS), currentSizeAsserter(tc, 2));
tc.sensors().set(MY_ATTRIBUTE, 49);
- Asserts.succeedsEventually(ImmutableMap.of("timeout", TIMEOUT_MS), currentSizeAsserter(tc, 1));
+ Asserts.succeedsEventually(currentSizeAsserter(tc, 1));
}
@Test(groups="Integration")
@@ -101,11 +102,11 @@
// workload 200 so requires doubling size to 10 to handle: (200*5)/100 = 10
tc.sensors().set(MY_ATTRIBUTE, 200);
- Asserts.succeedsEventually(ImmutableMap.of("timeout", TIMEOUT_MS), currentSizeAsserter(tc, 10));
+ Asserts.succeedsEventually(currentSizeAsserter(tc, 10));
// workload 5, requires 1 entity: (10*110)/100 = 11
tc.sensors().set(MY_ATTRIBUTE, 110);
- Asserts.succeedsEventually(ImmutableMap.of("timeout", TIMEOUT_MS), currentSizeAsserter(tc, 11));
+ Asserts.succeedsEventually(currentSizeAsserter(tc, 11));
}
@Test(groups="Integration")
@@ -117,14 +118,14 @@
// workload can be handled by 4 servers, within its valid range: (49*5)/50 = 4.9
tc.sensors().set(MY_ATTRIBUTE, 49);
- Asserts.succeedsEventually(ImmutableMap.of("timeout", TIMEOUT_MS), currentSizeAsserter(tc, 4));
+ Asserts.succeedsEventually(currentSizeAsserter(tc, 4));
// workload can be handled by 4 servers, within its valid range: (25*4)/50 = 2
tc.sensors().set(MY_ATTRIBUTE, 25);
- Asserts.succeedsEventually(ImmutableMap.of("timeout", TIMEOUT_MS), currentSizeAsserter(tc, 2));
+ Asserts.succeedsEventually(currentSizeAsserter(tc, 2));
tc.sensors().set(MY_ATTRIBUTE, 0);
- Asserts.succeedsEventually(ImmutableMap.of("timeout", TIMEOUT_MS), currentSizeAsserter(tc, 1));
+ Asserts.succeedsEventually(currentSizeAsserter(tc, 1));
}
@Test(groups="Integration")
@@ -139,11 +140,11 @@
// Decreases to min-size only
tc.sensors().set(MY_ATTRIBUTE, 0);
- Asserts.succeedsEventually(ImmutableMap.of("timeout", TIMEOUT_MS), currentSizeAsserter(tc, 2));
+ Asserts.succeedsEventually(currentSizeAsserter(tc, 2));
// Increases to max-size only
tc.sensors().set(MY_ATTRIBUTE, 100000);
- Asserts.succeedsEventually(ImmutableMap.of("timeout", TIMEOUT_MS), currentSizeAsserter(tc, 6));
+ Asserts.succeedsEventually(currentSizeAsserter(tc, 6));
}
@Test(groups="Integration",invocationCount=20)
@@ -167,14 +168,14 @@
// workload can be handled by 6 servers, so no need to notify: 6 <= (100*6)/50
tc.sensors().set(MY_ATTRIBUTE, 600);
- Asserts.succeedsEventually(ImmutableMap.of("timeout", TIMEOUT_MS), currentSizeAsserter(tc, 6));
+ Asserts.succeedsEventually(currentSizeAsserter(tc, 6));
assertTrue(maxReachedEvents.isEmpty());
// Increases to above max capacity: would require (100000*6)/100 = 6000
tc.sensors().set(MY_ATTRIBUTE, 100000);
// Assert our listener gets notified (once)
- Asserts.succeedsEventually(ImmutableMap.of("timeout", TIMEOUT_MS), new Runnable() {
+ Asserts.succeedsEventually(new Runnable() {
public void run() {
assertEquals(maxReachedEvents.size(), 1);
assertEquals(maxReachedEvents.get(0).getMaxAllowed(), 6);
@@ -245,7 +246,7 @@
policy.suspend();
policy.resume();
tc.sensors().set(MY_ATTRIBUTE, 101);
- Asserts.succeedsEventually(ImmutableMap.of("timeout", TIMEOUT_MS), currentSizeAsserter(tc, 2));
+ Asserts.succeedsEventually(currentSizeAsserter(tc, 2));
}
@Test
@@ -268,6 +269,84 @@
// Then confirm we listen to the correct "entityWithMetric"
entityWithMetric.sensors().set(TestEntity.SEQUENCE, 101);
- Asserts.succeedsEventually(ImmutableMap.of("timeout", TIMEOUT_MS), currentSizeAsserter(tc, 2));
+ Asserts.succeedsEventually(currentSizeAsserter(tc, 2));
+ }
+
+ @Test(groups="Integration")
+ public void testOnFailedGrowWillSetHighwaterMarkAndNotResizeAboveThatAgain() {
+ tc = app.createAndManageChild(EntitySpec.create(TestCluster.class)
+ .configure("initialSize", 0)
+ .configure(TestCluster.MAX_SIZE, 2));
+
+ tc.resize(1);
+
+ tc.policies().add(AutoScalerPolicy.builder()
+ .metric(MY_ATTRIBUTE)
+ .metricLowerBound(50)
+ .metricUpperBound(100)
+ .buildSpec());
+
+ // workload 200 so requires doubling size to 2 to handle: (200*1)/100 = 2
+ tc.sensors().set(MY_ATTRIBUTE, 200);
+ Asserts.succeedsEventually(currentSizeAsserter(tc, 2));
+ assertEquals(tc.getDesiredSizeHistory(), ImmutableList.of(1, 2), "desired="+tc.getDesiredSizeHistory());
+ assertEquals(tc.getSizeHistory(), ImmutableList.of(0, 1, 2), "sizes="+tc.getSizeHistory());
+ tc.sensors().set(MY_ATTRIBUTE, 100);
+
+ // workload 110, requires 1 more entity: (2*110)/100 = 2.1
+ // But max size is 2, so resize will fail.
+ tc.sensors().set(MY_ATTRIBUTE, 110);
+ Asserts.succeedsContinually(ImmutableMap.of("timeout", SHORT_WAIT_MS), currentSizeAsserter(tc, 2));
+ assertEquals(tc.getDesiredSizeHistory(), ImmutableList.of(1, 2, 3), "desired="+tc.getDesiredSizeHistory()); // TODO succeeds eventually?
+ assertEquals(tc.getSizeHistory(), ImmutableList.of(0, 1, 2), "sizes="+tc.getSizeHistory());
+
+ // another attempt to resize will not cause it to ask
+ tc.sensors().set(MY_ATTRIBUTE, 110);
+ Asserts.succeedsContinually(ImmutableMap.of("timeout", SHORT_WAIT_MS), currentSizeAsserter(tc, 2));
+ assertEquals(tc.getDesiredSizeHistory(), ImmutableList.of(1, 2, 3), "desired="+tc.getDesiredSizeHistory());
+ assertEquals(tc.getSizeHistory(), ImmutableList.of(0, 1, 2), "sizes="+tc.getSizeHistory());
+
+ // but if we shrink down again, then we'll still be able to go up to the previous level.
+ // We'll only try to go as high as the previous high-water mark though.
+ tc.sensors().set(MY_ATTRIBUTE, 1);
+ Asserts.succeedsEventually(ImmutableMap.of("timeout", SHORT_WAIT_MS), currentSizeAsserter(tc, 1));
+ assertEquals(tc.getDesiredSizeHistory(), ImmutableList.of(1, 2, 3, 1), "desired="+tc.getDesiredSizeHistory());
+ assertEquals(tc.getSizeHistory(), ImmutableList.of(0, 1, 2, 1), "sizes="+tc.getSizeHistory());
+
+ tc.sensors().set(MY_ATTRIBUTE, 10000);
+ Asserts.succeedsEventually(ImmutableMap.of("timeout", SHORT_WAIT_MS), currentSizeAsserter(tc, 2));
+ assertEquals(tc.getDesiredSizeHistory(), ImmutableList.of(1, 2, 3, 1, 2), "desired="+tc.getDesiredSizeHistory());
+ assertEquals(tc.getSizeHistory(), ImmutableList.of(0, 1, 2, 1, 2), "sizes="+tc.getSizeHistory());
+ }
+
+ // When there is a resizeUpStabilizationDelay, it remembers all the previously requested sizes (in the recent history)
+ // and then looks at those in the stabilization-delay to determine the sustained desired. This test checks that
+ // we apply the highwater-mark even when the desired size had been recorded prior to the highwater mark being
+ // discovered.
+ @Test(groups="Integration")
+ public void testOnFailedGrowWithStabilizationDelayWillSetHighwaterMarkAndNotResizeAboveThatAgain() throws Exception {
+ tc = app.createAndManageChild(EntitySpec.create(TestCluster.class)
+ .configure("initialSize", 0)
+ .configure(TestCluster.MAX_SIZE, 2));
+
+ tc.resize(1);
+
+ tc.policies().add(AutoScalerPolicy.builder()
+ .metric(MY_ATTRIBUTE)
+ .metricLowerBound(50)
+ .metricUpperBound(100)
+ .resizeUpStabilizationDelay(Duration.ONE_SECOND)
+ .buildSpec());
+
+ // workload 200 so requires doubling size to 2 to handle: (200*1)/100 = 2
+ for (int i = 0; i < 10; i++) {
+ tc.sensors().set(MY_ATTRIBUTE, 200 + (i*100));
+ Thread.sleep(100);
+ }
+
+ Asserts.succeedsEventually(currentSizeAsserter(tc, 2));
+ Asserts.succeedsContinually(currentSizeAsserter(tc, 2));
+ assertEquals(tc.getDesiredSizeHistory(), ImmutableList.of(1, 2, 3), "desired="+tc.getDesiredSizeHistory());
+ assertEquals(tc.getSizeHistory(), ImmutableList.of(0, 1, 2), "sizes="+tc.getSizeHistory());
}
}
diff --git a/brooklyn-server/software/base/src/test/java/org/apache/brooklyn/entity/software/base/test/autoscaling/AutoScalerPolicyNoMoreMachinesTest.java b/brooklyn-server/software/base/src/test/java/org/apache/brooklyn/entity/software/base/test/autoscaling/AutoScalerPolicyNoMoreMachinesTest.java
new file mode 100644
index 0000000..77175d2
--- /dev/null
+++ b/brooklyn-server/software/base/src/test/java/org/apache/brooklyn/entity/software/base/test/autoscaling/AutoScalerPolicyNoMoreMachinesTest.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.software.base.test.autoscaling;
+
+import static org.testng.Assert.assertEquals;
+
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.brooklyn.api.entity.Entity;
+import org.apache.brooklyn.api.entity.EntitySpec;
+import org.apache.brooklyn.api.location.Location;
+import org.apache.brooklyn.api.policy.PolicySpec;
+import org.apache.brooklyn.api.sensor.AttributeSensor;
+import org.apache.brooklyn.core.entity.BrooklynConfigKeys;
+import org.apache.brooklyn.core.entity.trait.Resizable;
+import org.apache.brooklyn.core.mgmt.internal.CollectionChangeListener;
+import org.apache.brooklyn.core.sensor.Sensors;
+import org.apache.brooklyn.core.test.BrooklynAppUnitTestSupport;
+import org.apache.brooklyn.core.test.entity.TestCluster;
+import org.apache.brooklyn.entity.group.DynamicCluster;
+import org.apache.brooklyn.entity.software.base.EmptySoftwareProcess;
+import org.apache.brooklyn.policy.autoscaling.AutoScalerPolicy;
+import org.apache.brooklyn.test.Asserts;
+import org.apache.brooklyn.util.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import com.google.common.base.Predicates;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Sets;
+
+public class AutoScalerPolicyNoMoreMachinesTest extends BrooklynAppUnitTestSupport {
+
+ @SuppressWarnings("unused")
+ private static final Logger log = LoggerFactory.getLogger(AutoScalerPolicyNoMoreMachinesTest.class);
+
+ private static long SHORT_WAIT_MS = 250;
+
+ DynamicCluster cluster;
+ Location loc;
+ AutoScalerPolicy policy;
+ Set<Entity> entitiesAdded;
+ Set<Entity> entitiesRemoved;
+
+ @BeforeMethod(alwaysRun=true)
+ public void setUp() throws Exception {
+ super.setUp();
+ cluster = app.createAndManageChild(EntitySpec.create(DynamicCluster.class)
+ .configure(TestCluster.INITIAL_SIZE, 0)
+ .configure(DynamicCluster.MEMBER_SPEC, EntitySpec.create(EmptySoftwareProcess.class)
+ .configure(BrooklynConfigKeys.SKIP_ON_BOX_BASE_DIR_RESOLUTION, true)));
+ loc = mgmt.getLocationRegistry().resolve("byon(hosts='1.1.1.1,1.1.1.2')");
+ app.start(ImmutableList.of(loc));
+
+ entitiesAdded = Sets.newLinkedHashSet();
+ entitiesRemoved = Sets.newLinkedHashSet();
+ mgmt.addEntitySetListener(new CollectionChangeListener<Entity>() {
+ @Override public void onItemAdded(Entity item) {
+ entitiesAdded.add(item);
+ }
+ @Override public void onItemRemoved(Entity item) {
+ entitiesRemoved.add(item);
+ }});
+ }
+
+ @Test
+ public void testResizeDirectly() throws Exception {
+ assertSize(0);
+
+ cluster.resize(2);
+ assertSize(2);
+
+ // Won't get a location to successfully resize (byon location only has 2 machines);
+ // so still left with 2 members (failed node not quarantined, because exception well understood)
+ try {
+ cluster.resize(3);
+ Asserts.shouldHaveFailedPreviously();
+ } catch (Exception e) {
+ Asserts.expectedFailureOfType(e, Resizable.InsufficientCapacityException.class);
+ }
+ assertSize(2, 0, 1);
+
+ // Resize down; will delete one of our nodes
+ cluster.resize(1);
+ assertSize(1, 0, 2);
+
+ // Resize back up to 2 should be allowed
+ cluster.resize(2);
+ assertSize(2, 0, 2);
+ }
+
+ @Test
+ public void testPoolHotSensorResizingBeyondMaxMachines() throws Exception {
+ cluster.resize(1);
+ policy = cluster.policies().add(PolicySpec.create(AutoScalerPolicy.class)
+ .configure(AutoScalerPolicy.MIN_PERIOD_BETWEEN_EXECS, Duration.millis(10)));
+
+ // Single node trying to handle a load of 21; too high, so will add one more node
+ cluster.sensors().emit(AutoScalerPolicy.DEFAULT_POOL_HOT_SENSOR, message(21L, 10L, 20L));
+ assertSizeEventually(2);
+
+ // Two nodes handing an aggregated load of 41; too high for 2 nodes so tries to scale to 3.
+ // But byon location only has 2 nodes so will fail.
+ cluster.sensors().emit(AutoScalerPolicy.DEFAULT_POOL_HOT_SENSOR, message(21L, 10L, 20L));
+ assertSizeEventually(2, 0, 1);
+
+ // Should not repeatedly retry
+ assertSizeContinually(2, 0, 1);
+
+ // If there is another indication of too much load, should not retry yet again.
+ cluster.sensors().emit(AutoScalerPolicy.DEFAULT_POOL_HOT_SENSOR, message(42L, 10L, 20L));
+ assertSizeContinually(2, 0, 1);
+ }
+
+ @Test
+ public void testMetricResizingBeyondMaxMachines() throws Exception {
+ AttributeSensor<Integer> metric = Sensors.newIntegerSensor("test.aggregatedLoad");
+
+ cluster.resize(1);
+ policy = cluster.policies().add(PolicySpec.create(AutoScalerPolicy.class)
+ .configure(AutoScalerPolicy.METRIC, metric)
+ .configure(AutoScalerPolicy.METRIC_LOWER_BOUND, 10)
+ .configure(AutoScalerPolicy.METRIC_UPPER_BOUND, 20)
+ .configure(AutoScalerPolicy.MIN_PERIOD_BETWEEN_EXECS, Duration.millis(10)));
+
+ // Single node trying to handle a load of 21; too high, so will add one more node.
+ // That takes the load back to within acceptable limits
+ cluster.sensors().set(metric, 21);
+ assertSizeEventually(2);
+ cluster.sensors().set(metric, 19);
+
+ // With two nodes, load is now too high, so will try (and fail) to add one more node.
+ // Trigger another attempt to resize.
+ // Any nodes that fail with NoMachinesAvailableException will be immediately deleted.
+ cluster.sensors().set(metric, 22);
+ assertSizeEventually(2, 0, 1);
+ assertSizeContinually(2, 0, 1);
+
+ // Metric is re-published; should not keep retrying
+ cluster.sensors().set(metric, 21);
+ assertSizeContinually(2, 0, 1);
+ }
+
+ protected Map<String, Object> message(double currentWorkrate, double lowThreshold, double highThreshold) {
+ return message(cluster.getCurrentSize(), currentWorkrate, lowThreshold, highThreshold);
+ }
+
+ protected Map<String, Object> message(int currentSize, double currentWorkrate, double lowThreshold, double highThreshold) {
+ return ImmutableMap.<String,Object>of(
+ AutoScalerPolicy.POOL_CURRENT_SIZE_KEY, currentSize,
+ AutoScalerPolicy.POOL_CURRENT_WORKRATE_KEY, currentWorkrate,
+ AutoScalerPolicy.POOL_LOW_THRESHOLD_KEY, lowThreshold,
+ AutoScalerPolicy.POOL_HIGH_THRESHOLD_KEY, highThreshold);
+ }
+
+ protected void assertSize(Integer targetSize) {
+ assertSize(targetSize, 0);
+ }
+
+ protected void assertSize(int targetSize, int quarantineSize, final int deletedSize) {
+ assertSize(targetSize, quarantineSize);
+ assertEquals(entitiesRemoved.size(), deletedSize, "removed="+entitiesRemoved);
+ }
+
+ protected void assertSize(int targetSize, int quarantineSize) {
+ assertEquals(cluster.getCurrentSize(), (Integer) targetSize, "cluster.currentSize");
+ assertEquals(cluster.getMembers().size(), targetSize, "cluster.members.size");
+ assertEquals(cluster.sensors().get(DynamicCluster.QUARANTINE_GROUP).getMembers().size(), quarantineSize, "cluster.quarantine.size");
+ assertEquals(mgmt.getEntityManager().findEntities(Predicates.instanceOf(EmptySoftwareProcess.class)).size(), targetSize + quarantineSize, "instanceCount(EmptySoftwareProcess)");
+ }
+
+ protected void assertSizeEventually(int targetSize) {
+ assertSizeEventually(targetSize, 0, 0);
+ }
+
+ protected void assertSizeEventually(final int targetSize, final int quarantineSize, final int deletedSize) {
+ Asserts.succeedsEventually(new Runnable() {
+ public void run() {
+ assertSize(targetSize, quarantineSize);
+ assertEquals(entitiesRemoved.size(), deletedSize, "removed="+entitiesRemoved);
+ }});
+ }
+
+ protected void assertSizeContinually(final int targetSize, final int quarantineSize, final int deletedSize) {
+ Asserts.succeedsContinually(ImmutableMap.of("timeout", SHORT_WAIT_MS), new Runnable() {
+ public void run() {
+ assertSize(targetSize, quarantineSize);
+ assertEquals(entitiesRemoved.size(), deletedSize, "removed="+entitiesRemoved);
+ }});
+ }
+}
diff --git a/brooklyn-server/utils/common/src/main/java/org/apache/brooklyn/test/Asserts.java b/brooklyn-server/utils/common/src/main/java/org/apache/brooklyn/test/Asserts.java
index 2bd7b5c..15aa76e 100644
--- a/brooklyn-server/utils/common/src/main/java/org/apache/brooklyn/test/Asserts.java
+++ b/brooklyn-server/utils/common/src/main/java/org/apache/brooklyn/test/Asserts.java
@@ -1115,22 +1115,29 @@
if (e instanceof ShouldHaveFailedPreviouslyAssertionError) throw (Error)e;
}
- /** Tests that an exception is not {@link ShouldHaveFailedPreviouslyAssertionError}
- * and is one of the given types.
+ /**
+ * Tests that an exception is not {@link ShouldHaveFailedPreviouslyAssertionError}
+ * and is either one of the given types, or has a caused-by of one of the given types.
+ *
+ * If you want *just* the instanceof (without checking the caused-by), then just catch
+ * those exception types, treat that as success, and let any other exception be propagated.
*
* @return If the test is satisfied, this method returns normally.
* The caller can decide whether anything more should be done with the exception.
* If the test fails, then either it is propagated,
* if the {@link Throwable} is a fatal ({@link Exceptions#propagateIfFatal(Throwable)}) other than an {@link AssertionError},
* or more usually the test failure of this method is thrown,
- * with detail of the original {@link Throwable} logged. */
+ * with detail of the original {@link Throwable} logged and included in the caused-by.
+ */
public static void expectedFailureOfType(Throwable e, Class<?> ...permittedSupertypes) {
if (e instanceof ShouldHaveFailedPreviouslyAssertionError) throw (Error)e;
- for (Class<?> t: permittedSupertypes) {
- if (t.isInstance(e)) return;
+ for (Class<?> clazz: permittedSupertypes) {
+ @SuppressWarnings("unchecked")
+ Throwable match = Exceptions.getFirstThrowableOfType(e, (Class<? extends Throwable>)clazz);
+ if (match != null) return;
}
rethrowPreferredException(e,
- new AssertionError("Error "+JavaClassNames.simpleClassName(e)+" is not any of the expected types: " + Arrays.asList(permittedSupertypes)));
+ new AssertionError("Error "+JavaClassNames.simpleClassName(e)+" is not any of the expected types: " + Arrays.asList(permittedSupertypes), e));
}
/** Tests {@link #expectedFailure(Throwable)} and that the <code>toString</code>