Refactor LoadTest
- Add AbstractLoadTest (including small test to sanity-check that
test actually works!)
- Change to use yams-based blueprints
- Adds SimulatedVanillaSoftwareProcessImpl
- Adds SimulatedExternalMonitor (for setting sensors on entities)
diff --git a/qa/src/main/java/org/apache/brooklyn/qa/load/SimulatedExternalMonitor.java b/qa/src/main/java/org/apache/brooklyn/qa/load/SimulatedExternalMonitor.java
new file mode 100644
index 0000000..3424842
--- /dev/null
+++ b/qa/src/main/java/org/apache/brooklyn/qa/load/SimulatedExternalMonitor.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.qa.load;
+
+import org.apache.brooklyn.api.entity.Entity;
+import org.apache.brooklyn.api.entity.ImplementedBy;
+import org.apache.brooklyn.api.sensor.AttributeSensor;
+import org.apache.brooklyn.config.ConfigKey;
+import org.apache.brooklyn.core.config.ConfigKeys;
+import org.apache.brooklyn.core.entity.Attributes;
+import org.apache.brooklyn.core.entity.trait.Startable;
+import org.apache.brooklyn.entity.software.base.VanillaSoftwareProcess;
+import org.apache.brooklyn.util.time.Duration;
+
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
+import com.google.common.reflect.TypeToken;
+
+@ImplementedBy(SimulatedExternalMonitorImpl.class)
+public interface SimulatedExternalMonitor extends Entity, Startable {
+
+ @SuppressWarnings("serial")
+ ConfigKey<Predicate<? super Entity>> ENTITY_FILTER = ConfigKeys.newConfigKey(
+ new TypeToken<Predicate<? super Entity>>() {},
+ "entityFilter",
+ "Entities to set the sensors on",
+ Predicates.instanceOf(VanillaSoftwareProcess.class));
+
+ ConfigKey<Integer> NUM_SENSORS = ConfigKeys.newIntegerConfigKey(
+ "numSensors",
+ "Number of attribute sensors to set on each entity",
+ 1);
+
+ ConfigKey<Duration> POLL_PERIOD = ConfigKeys.newConfigKey(
+ Duration.class,
+ "pollPeriod",
+ "Period for polling to get the sensors (delay between polls)",
+ Duration.ONE_SECOND);
+
+ AttributeSensor<Boolean> SERVICE_UP = Attributes.SERVICE_UP;
+}
diff --git a/qa/src/main/java/org/apache/brooklyn/qa/load/SimulatedExternalMonitorImpl.java b/qa/src/main/java/org/apache/brooklyn/qa/load/SimulatedExternalMonitorImpl.java
new file mode 100644
index 0000000..a80c5a5
--- /dev/null
+++ b/qa/src/main/java/org/apache/brooklyn/qa/load/SimulatedExternalMonitorImpl.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.qa.load;
+
+import java.util.Collection;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.brooklyn.api.entity.Entity;
+import org.apache.brooklyn.api.location.Location;
+import org.apache.brooklyn.api.sensor.AttributeSensor;
+import org.apache.brooklyn.core.entity.AbstractEntity;
+import org.apache.brooklyn.core.sensor.Sensors;
+import org.apache.brooklyn.util.time.Duration;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+
+public class SimulatedExternalMonitorImpl extends AbstractEntity implements SimulatedExternalMonitor {
+
+ private ScheduledExecutorService executor;
+ private Future<?> future;
+
+ @Override
+ public void rebind() {
+ super.rebind();
+ if (Boolean.TRUE.equals(sensors().get(SERVICE_UP))) {
+ startPolling();
+ }
+ }
+
+ @Override
+ public void start(Collection<? extends Location> locations) {
+ if (Boolean.TRUE.equals(sensors().get(SERVICE_UP))) {
+ // already up; no-op
+ }
+ sensors().set(SERVICE_UP, true);
+ startPolling();
+ }
+
+ @Override
+ public void stop() {
+ sensors().set(SERVICE_UP, false);
+ stopPolling();
+ }
+
+ @Override
+ public void restart() {
+ stop();
+ start(ImmutableList.<Location>of());
+ }
+
+ protected void startPolling() {
+ Duration pollPeriod = config().get(POLL_PERIOD);
+ executor = Executors.newScheduledThreadPool(1);
+ executor.scheduleWithFixedDelay(new Runnable() {
+ public void run() {
+ simulatePoll();
+ }},
+ 0, pollPeriod.toMilliseconds(), TimeUnit.MILLISECONDS);
+ }
+
+ protected void stopPolling() {
+ if (executor != null) {
+ executor.shutdownNow();
+ executor = null;
+ }
+ }
+
+ protected void simulatePoll() {
+ String val = "val-" + System.currentTimeMillis();
+ Predicate<? super Entity> filter = config().get(ENTITY_FILTER);
+ Integer numSensors = config().get(NUM_SENSORS);
+ Iterable<Entity> entities = Iterables.filter(getManagementContext().getEntityManager().getEntities(), filter);
+ for (Entity entity : entities) {
+ for (int i = 0; i < numSensors; i++) {
+ AttributeSensor<String> sensor = Sensors.newStringSensor("externalSensor"+i);
+ entity.sensors().set(sensor, val);
+ }
+ }
+ }
+}
diff --git a/qa/src/main/java/org/apache/brooklyn/qa/load/SimulatedTheeTierApp.java b/qa/src/main/java/org/apache/brooklyn/qa/load/SimulatedTheeTierApp.java
index 0e41beb..b0178e5 100644
--- a/qa/src/main/java/org/apache/brooklyn/qa/load/SimulatedTheeTierApp.java
+++ b/qa/src/main/java/org/apache/brooklyn/qa/load/SimulatedTheeTierApp.java
@@ -33,6 +33,7 @@
import org.apache.brooklyn.core.entity.StartableApplication;
import org.apache.brooklyn.core.entity.trait.Startable;
import org.apache.brooklyn.core.location.PortRanges;
+import org.apache.brooklyn.core.sensor.DependentConfiguration;
import org.apache.brooklyn.enricher.stock.Enrichers;
import org.apache.brooklyn.entity.database.mysql.MySqlNode;
import org.apache.brooklyn.entity.group.DynamicCluster;
@@ -95,7 +96,8 @@
.configure(JavaWebAppService.ROOT_WAR, WAR_PATH)
.configure(JavaEntityMethods.javaSysProp("brooklyn.example.db.url"),
formatString("jdbc:%s%s?user=%s\\&password=%s",
- attributeWhenReady(mysql, MySqlNode.DATASTORE_URL), DB_TABLE, DB_USERNAME, DB_PASSWORD))
+ DependentConfiguration.builder().attributeWhenReady(mysql, MySqlNode.DATASTORE_URL).build(),
+ DB_TABLE, DB_USERNAME, DB_PASSWORD))
.configure(DynamicCluster.INITIAL_SIZE, 2)
.configure(WebAppService.ENABLED_PROTOCOLS, ImmutableSet.of(USE_HTTPS ? "https" : "http")) );
diff --git a/qa/src/main/java/org/apache/brooklyn/qa/load/SimulatedVanillaSoftwareProcessImpl.java b/qa/src/main/java/org/apache/brooklyn/qa/load/SimulatedVanillaSoftwareProcessImpl.java
new file mode 100644
index 0000000..e5aea80
--- /dev/null
+++ b/qa/src/main/java/org/apache/brooklyn/qa/load/SimulatedVanillaSoftwareProcessImpl.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.qa.load;
+
+import java.net.URI;
+import java.util.concurrent.Callable;
+
+import org.apache.brooklyn.api.sensor.AttributeSensor;
+import org.apache.brooklyn.config.ConfigKey;
+import org.apache.brooklyn.core.config.ConfigKeys;
+import org.apache.brooklyn.core.entity.BrooklynConfigKeys;
+import org.apache.brooklyn.core.entity.lifecycle.ServiceStateLogic.ServiceNotUpLogic;
+import org.apache.brooklyn.core.sensor.Sensors;
+import org.apache.brooklyn.entity.software.base.VanillaSoftwareProcess;
+import org.apache.brooklyn.entity.software.base.VanillaSoftwareProcessImpl;
+import org.apache.brooklyn.entity.software.base.VanillaSoftwareProcessSshDriver;
+import org.apache.brooklyn.feed.function.FunctionFeed;
+import org.apache.brooklyn.feed.function.FunctionPollConfig;
+import org.apache.brooklyn.feed.http.HttpFeed;
+import org.apache.brooklyn.feed.http.HttpPollConfig;
+import org.apache.brooklyn.feed.http.HttpValueFunctions;
+import org.apache.brooklyn.location.ssh.SshMachineLocation;
+import org.apache.brooklyn.util.time.Duration;
+
+/**
+ * For simulating various aspects of the {@link VanillaSoftwareProcess} entity.
+ *
+ * It is assumed that the ssh commands for install, launch, etc will be written with testability in mind.
+ * For example, they might just be {@code echo} statements, because there is insufficient resources to
+ * run 100s of processes.
+ *
+ * It is thus possible to simulate aspects of the behaviour, for performance and load testing purposes.
+ *
+ * There is configuration for:
+ * <ul>
+ * <li>{@code skipSshOnStart}
+ * <ul>
+ * <li>If true, then no ssh commands will be executed at deploy-time.
+ * This is useful for speeding up load testing, to get to the desired number of entities.
+ * <li>If false, the ssh commands will be executed.
+ * </ul>
+ * <li>{@code simulateEntity}
+ * <ul>
+ * <li>if true, no underlying entity will be started. Instead a sleep 100000 job will be run and monitored.
+ * <li>if false, the underlying entity (i.e. a JBoss app-server) will be started as normal.
+ * </ul>
+ * <li>{@code simulateExternalMonitoring}
+ * <ul>
+ * <li>if true, disables the default monitoring mechanism. Instead, a function will periodically execute
+ * to set the entity's sensors (as though the values had been obtained from the external monitoring tool).
+ * <li>if false, then:
+ * <ul>
+ * <li>If {@code simulateEntity==true} it will execute comparable commands (e.g. execute a command of the same
+ * size over ssh or do a comparable number of http GET requests).
+ * <li>If {@code simulateEntity==false} then normal monitoring will be done.
+ * </ul>
+ * </ul>
+ * </ul>
+ */
+public class SimulatedVanillaSoftwareProcessImpl extends VanillaSoftwareProcessImpl {
+
+ public static final ConfigKey<Boolean> EXEC_SSH_ON_START = ConfigKeys.newBooleanConfigKey(
+ "execSshOnStart",
+ "If true, will execute the ssh commands on install/launch; if false, will skip them",
+ true);
+
+ public static final ConfigKey<URI> HTTP_FEED_URI = ConfigKeys.newConfigKey(
+ URI.class,
+ "httpFeed.uri",
+ "If non-null, the URI to poll periodically using a HttpFeed", null);
+
+ public static final ConfigKey<Duration> HTTP_FEED_POLL_PERIOD = ConfigKeys.newConfigKey(
+ Duration.class,
+ "httpFeed.pollPeriod",
+ "The poll priod for the HttpFeed (if 'httpFeed.uri' was non-null)",
+ Duration.ONE_SECOND);
+
+ public static final ConfigKey<Duration> FUNCTION_FEED_POLL_PERIOD = ConfigKeys.newConfigKey(
+ Duration.class,
+ "functionFeed.pollPeriod",
+ "The poll priod for a function that increments 'counter' periodically (if null, then no-op)",
+ Duration.ONE_SECOND);
+
+ // see SERVICE_PROCESS_IS_RUNNING_POLL_PERIOD
+ // Inspired by EmptySoftwareProcess.USE_SSH_MONITORING
+ public static final ConfigKey<Boolean> USE_SSH_MONITORING = ConfigKeys.newConfigKey(
+ "sshMonitoring.enabled",
+ "Whether to poll periodically over ssh, using the driver.isRunning check",
+ Boolean.TRUE);
+
+ private static final AttributeSensor<String> HTTP_STRING_ATTRIBUTE = Sensors.newStringSensor("httpStringAttribute");
+
+ private static final AttributeSensor<Integer> HTTP_INT_ATTRIBUTE = Sensors.newIntegerSensor("httpIntAttribute");
+
+ private static final AttributeSensor<Long> FUNCTION_COUNTER = Sensors.newLongSensor("functionCounter");
+
+ private FunctionFeed functionFeed;
+ private HttpFeed httpFeed;
+
+ @Override
+ public void init() {
+ super.init();
+ if (Boolean.FALSE.equals(config().get(EXEC_SSH_ON_START))) {
+ config().set(BrooklynConfigKeys.SKIP_ON_BOX_BASE_DIR_RESOLUTION, true);
+ }
+ }
+
+ @Override
+ public Class<?> getDriverInterface() {
+ return SimulatedVanillaSoftwareProcessSshDriver.class;
+ }
+
+ @Override
+ protected void connectServiceUpIsRunning() {
+ boolean useSshMonitoring = Boolean.TRUE.equals(config().get(USE_SSH_MONITORING));
+ if (useSshMonitoring) {
+ super.connectServiceUpIsRunning();
+ }
+ }
+
+ @Override
+ protected void initEnrichers() {
+ super.initEnrichers();
+ }
+
+ @Override
+ protected void connectSensors() {
+ super.connectSensors();
+
+ boolean useSshMonitoring = Boolean.TRUE.equals(config().get(USE_SSH_MONITORING));
+ Duration functionFeedPeriod = config().get(FUNCTION_FEED_POLL_PERIOD);
+ URI httpFeedUri = config().get(HTTP_FEED_URI);
+
+ if (!useSshMonitoring) {
+ ServiceNotUpLogic.clearNotUpIndicator(this, SERVICE_PROCESS_IS_RUNNING);
+ }
+
+ if (functionFeedPeriod != null) {
+ functionFeed = feeds().add(FunctionFeed.builder()
+ .entity(this)
+ .period(functionFeedPeriod)
+ .poll(FunctionPollConfig.forSensor(FUNCTION_COUNTER)
+ .callable(new Callable<Long>() {
+ @Override public Long call() throws Exception {
+ Long oldVal = sensors().get(FUNCTION_COUNTER);
+ return (oldVal == null) ? 1 : oldVal + 1;
+ }
+ }))
+ .build());
+ }
+
+ if (httpFeedUri != null) {
+ httpFeed = feeds().add(HttpFeed.builder()
+ .entity(this)
+ .period(config().get(HTTP_FEED_POLL_PERIOD))
+ .baseUri(httpFeedUri)
+ .poll(new HttpPollConfig<Integer>(HTTP_INT_ATTRIBUTE)
+ .onSuccess(HttpValueFunctions.responseCode()))
+ .poll(new HttpPollConfig<String>(HTTP_STRING_ATTRIBUTE)
+ .onSuccess(HttpValueFunctions.stringContentsFunction()))
+ .build());
+ }
+ }
+
+ @Override
+ protected void disconnectSensors() {
+ super.disconnectSensors();
+ if (functionFeed != null) functionFeed.stop();
+ if (httpFeed != null) httpFeed.stop();
+ }
+
+ public static class SimulatedVanillaSoftwareProcessSshDriver extends VanillaSoftwareProcessSshDriver {
+ public SimulatedVanillaSoftwareProcessSshDriver(SimulatedVanillaSoftwareProcessImpl entity, SshMachineLocation machine) {
+ super(entity, machine);
+ }
+
+ @Override
+ public void install() {
+ if (Boolean.TRUE.equals(entity.getConfig(EXEC_SSH_ON_START))) {
+ super.install();
+ } else {
+ // no-op
+ }
+ }
+
+ @Override
+ public void customize() {
+ if (Boolean.TRUE.equals(entity.getConfig(EXEC_SSH_ON_START))) {
+ super.customize();
+ } else {
+ // no-op
+ }
+ }
+
+ @Override
+ public void launch() {
+ if (Boolean.TRUE.equals(entity.getConfig(EXEC_SSH_ON_START))) {
+ super.launch();
+ } else {
+ // no-op
+ }
+ }
+ }
+}
diff --git a/qa/src/test/java/org/apache/brooklyn/qa/load/AbstractLoadTest.java b/qa/src/test/java/org/apache/brooklyn/qa/load/AbstractLoadTest.java
new file mode 100644
index 0000000..c07f329
--- /dev/null
+++ b/qa/src/test/java/org/apache/brooklyn/qa/load/AbstractLoadTest.java
@@ -0,0 +1,378 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.qa.load;
+
+import static org.testng.Assert.assertEquals;
+
+import java.io.File;
+import java.net.URI;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.brooklyn.api.entity.Entity;
+import org.apache.brooklyn.api.entity.EntitySpec;
+import org.apache.brooklyn.api.location.Location;
+import org.apache.brooklyn.api.mgmt.ManagementContext;
+import org.apache.brooklyn.api.mgmt.ha.HighAvailabilityMode;
+import org.apache.brooklyn.camp.brooklyn.AbstractYamlTest;
+import org.apache.brooklyn.core.entity.StartableApplication;
+import org.apache.brooklyn.core.entity.trait.Startable;
+import org.apache.brooklyn.core.location.PortRanges;
+import org.apache.brooklyn.core.mgmt.internal.LocalManagementContext;
+import org.apache.brooklyn.core.mgmt.persist.PersistMode;
+import org.apache.brooklyn.core.test.HttpService;
+import org.apache.brooklyn.entity.group.DynamicCluster;
+import org.apache.brooklyn.launcher.BrooklynLauncher;
+import org.apache.brooklyn.policy.autoscaling.AutoScalerPolicy;
+import org.apache.brooklyn.test.performance.PerformanceTestUtils;
+import org.apache.brooklyn.util.exceptions.Exceptions;
+import org.apache.brooklyn.util.os.Os;
+import org.apache.brooklyn.util.time.Duration;
+import org.apache.brooklyn.util.time.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Predicate;
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.io.Files;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+
+/**
+ * Customers ask about the scalability of Brooklyn. These load tests investigate how many
+ * concurrent apps can be deployed and managed by a single Brooklyn management node.
+ *
+ * The apps are "simulated" in that they don't create the underlying resources
+ * (we are not checking if the test machine can run 100s of app-servers simultaneously!)
+ *
+ * See the configuration options on {@link SimulatedVanillaSoftwareProcessImpl}.
+ *
+ * The {@link SimulatedExternalMonitor} is used to simulate us not polling the entities directly
+ * (over ssh, http or whatever). Instead we simulate the metrics being retrieved from some external
+ * source, and injected directly into the entities by calling {@code sensors().set()}. For example,
+ * this could be collected from a Graphite server.
+ *
+ * If using {@link TestConfig#simulateExternalMonitor(Predicate, int, Duration)}, it will
+ * automatically turn off {@code useSshMonitoring}, {@code useHttpMonitoring} and
+ * {@code useFunctionMonitoring} for <em>all</em> entities (not just for those that match
+ * the predicate passed to simulateExternalMonitor).
+ */
+public class AbstractLoadTest extends AbstractYamlTest {
+
+ // TODO Could/should issue provisioning request through REST api, rather than programmatically;
+ // and poll to detect completion.
+
+ /*
+ * Useful commands when investigating:
+ * LOG_FILE=usage/qa/brooklyn-camp-tests.log
+ * grep -E "OutOfMemoryError|[P|p]rovisioning time|sleeping before|CPU fraction|LoadTest using" $LOG_FILE | less
+ * grep -E "OutOfMemoryError|[P|p]rovisioning time" $LOG_FILE; grep "CPU fraction" $LOG_FILE | tail -1; grep "LoadTest using" $LOG_FILE | tail -1
+ * grep -E "OutOfMemoryError|LoadTest using" $LOG_FILE
+ */
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractLoadTest.class);
+
+ private File persistenceDir;
+ private BrooklynLauncher launcher;
+ private ListeningExecutorService executor;
+ private Future<?> cpuFuture;
+
+ private Location localhost;
+
+ List<Duration> provisioningTimes;
+
+ private HttpService httpService;
+ private URI httpServiceUri;
+
+ @BeforeMethod(alwaysRun=true)
+ public void setUp() throws Exception {
+ super.setUp();
+
+ localhost = mgmt().getLocationRegistry().getLocationManaged("localhost");
+
+ provisioningTimes = Collections.synchronizedList(Lists.<Duration>newArrayList());
+
+ // Create executors
+ executor = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
+
+ // Monitor utilisation (memory/CPU) while tests run
+ executor.submit(new Callable<Void>() {
+ public Void call() {
+ try {
+ mgmt().getExecutionManager(); // force GC to be instantiated
+ while (true) {
+ String usage = ((LocalManagementContext)mgmt()).getGarbageCollector().getUsageString();
+ LOG.info("LoadTest using "+usage);
+ Thread.sleep(1000);
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt(); // exit gracefully
+ } catch (Exception e) {
+ LOG.error("Error getting usage info", e);
+ }
+ return null;
+ }});
+
+ cpuFuture = PerformanceTestUtils.sampleProcessCpuTime(Duration.ONE_SECOND, "during LoadTest");
+
+ httpService = new HttpService(PortRanges.fromString("9000+"), true).start();
+ httpServiceUri = new URI(httpService.getUrl());
+ }
+
+ @AfterMethod(alwaysRun=true)
+ public void tearDown() throws Exception {
+ try {
+ if (httpService != null) httpService.shutdown();
+ if (cpuFuture != null) cpuFuture.cancel(true);
+ if (executor != null) executor.shutdownNow();
+ } finally {
+ super.tearDown();
+ }
+ }
+
+ @Override
+ protected ManagementContext setUpPlatform() {
+ // Create management node
+ persistenceDir = Files.createTempDir();
+ launcher = BrooklynLauncher.newInstance()
+ .persistMode(PersistMode.CLEAN)
+ .highAvailabilityMode(HighAvailabilityMode.MASTER)
+ .persistenceDir(persistenceDir)
+ .start();
+
+ String webServerUrl = launcher.getServerDetails().getWebServerUrl();
+ LOG.info("Brooklyn web-console running at " + webServerUrl);
+
+ return launcher.getServerDetails().getManagementContext();
+ }
+
+ @Override
+ protected void tearDownPlatform() {
+ if (launcher != null) launcher.terminate();
+ if (persistenceDir != null) Os.deleteRecursively(persistenceDir);
+ }
+
+ public static class TestConfig {
+ public int totalApps = 1;
+ public int numAppsPerBatch = 1;
+ public Duration sleepBetweenBatch = Duration.ZERO;
+
+ int clusterSize = 2;
+
+ boolean simulateExternalMonitor = false;
+ Predicate<? super Entity> externalMonitorFilter;
+ int externalMonitorNumSensors;
+ Duration externalMonitorPollPeriod;
+
+
+ boolean execSshOnStart = SimulatedVanillaSoftwareProcessImpl.EXEC_SSH_ON_START.getDefaultValue();
+ Duration functionFeedPollPeriod = SimulatedVanillaSoftwareProcessImpl.FUNCTION_FEED_POLL_PERIOD.getDefaultValue();
+ boolean useSshMonitoring = SimulatedVanillaSoftwareProcessImpl.USE_SSH_MONITORING.getDefaultValue();
+ Duration httpFeedPollPeriod = SimulatedVanillaSoftwareProcessImpl.HTTP_FEED_POLL_PERIOD.getDefaultValue();
+ URI httpFeedUri;
+
+ public TestConfig(AbstractLoadTest tester) {
+ httpFeedUri = tester.httpServiceUri;
+ }
+ public TestConfig simulateExternalMonitor(Predicate<? super Entity> filter, int numSensors, Duration pollPeriod) {
+ simulateExternalMonitor = true;
+ externalMonitorFilter = filter;
+ externalMonitorNumSensors = numSensors;
+ externalMonitorPollPeriod = pollPeriod;
+ useSshMonitoring(false);
+ useHttpMonitoring(false);
+ useFunctionMonitoring(false);
+ return this;
+ }
+ public TestConfig totalApps(int totalApps) {
+ return totalApps(totalApps, totalApps);
+ }
+ public TestConfig totalApps(int totalApps, int numAppsPerBatch) {
+ this.totalApps = totalApps;
+ this.numAppsPerBatch = numAppsPerBatch;
+ return this;
+ }
+ public TestConfig sleepBetweenBatch(Duration val) {
+ sleepBetweenBatch = val;
+ return this;
+ }
+ public TestConfig clusterSize(int val) {
+ clusterSize = val;
+ return this;
+ }
+ public TestConfig execSshOnStart(boolean val) {
+ execSshOnStart = val;
+ return this;
+ }
+ public TestConfig useSshMonitoring(boolean val) {
+ useSshMonitoring = val;
+ return this;
+ }
+ public TestConfig useHttpMonitoring(boolean val) {
+ if (val) {
+ if (httpFeedUri == null) {
+ throw new IllegalStateException("No HTTP URI; expected to be set by AbstractLoadTest.httpServiceUri");
+ }
+ } else {
+ httpFeedUri = null;
+ }
+ return this;
+ }
+ public TestConfig useFunctionMonitoring(boolean val) {
+ if (val) {
+ functionFeedPollPeriod = SimulatedVanillaSoftwareProcessImpl.FUNCTION_FEED_POLL_PERIOD.getDefaultValue();
+ } else {
+ functionFeedPollPeriod = null;
+ }
+ return this;
+ }
+ }
+
+ protected void runLocalhostManyApps(TestConfig config) throws Exception {
+ final int totalApps = config.totalApps;
+ final int numAppsPerBatch = config.numAppsPerBatch;
+ final int numCycles = (totalApps / numAppsPerBatch);
+ final Duration sleepBetweenBatch = config.sleepBetweenBatch;
+
+ int counter = 0;
+
+ if (config.simulateExternalMonitor) {
+ SimulatedExternalMonitor externalMonitor = mgmt().getEntityManager().createEntity(EntitySpec.create(SimulatedExternalMonitor.class)
+ .configure(SimulatedExternalMonitor.ENTITY_FILTER, config.externalMonitorFilter)
+ .configure(SimulatedExternalMonitor.NUM_SENSORS, config.externalMonitorNumSensors)
+ .configure(SimulatedExternalMonitor.POLL_PERIOD, config.externalMonitorPollPeriod));
+ externalMonitor.start(ImmutableList.<Location>of());
+ }
+ for (int i = 0; i < numCycles; i++) {
+ List<ListenableFuture<? extends Entity>> futures = Lists.newArrayList();
+ for (int j = 0; j < numAppsPerBatch; j++) {
+ String yamlApp = newYamlApp("Simulated App " + i, config);
+
+ ListenableFuture<? extends Entity> future = executor.submit(newProvisionAppTask(yamlApp));
+ futures.add(future);
+ counter++;
+ }
+
+ List<? extends Entity> apps = Futures.allAsList(futures).get();
+
+ for (Entity app : apps) {
+ assertEquals(app.getAttribute(Startable.SERVICE_UP), (Boolean)true);
+ }
+
+ synchronized (provisioningTimes) {
+ LOG.info("cycle="+i+"; numApps="+counter+": provisioning times: "+provisioningTimes);
+ provisioningTimes.clear();
+ }
+
+ LOG.info("cycle="+i+"; numApps="+counter+": sleeping for "+sleepBetweenBatch+" before next batch of apps");
+ Time.sleep(sleepBetweenBatch);
+ }
+ }
+
+ protected String newYamlApp(String appName, TestConfig config) {
+ return Joiner.on("\n").join(
+ "name: " + appName,
+ "location: localhost",
+
+ "services:",
+ "- type: " + DynamicCluster.class.getName(),
+ " id: cluster",
+ " brooklyn.config:",
+ " cluster.initial.size: " + config.clusterSize,
+ " memberSpec:",
+ " $brooklyn:entitySpec:",
+ " type: " + SimulatedVanillaSoftwareProcessImpl.class.getName(),
+ " brooklyn.config:",
+ " shell.env:",
+ " ENV1: val1",
+ " ENV2: val2",
+ " install.command: echo myInstallCommand",
+ " customize.command: echo myCustomizeCommand",
+ " launch.command: echo myLaunchCommand",
+ " checkRunning.command: echo myCheckRunningCommand",
+ " " + SimulatedVanillaSoftwareProcessImpl.EXEC_SSH_ON_START.getName() + ": " + config.execSshOnStart,
+ " " + SimulatedVanillaSoftwareProcessImpl.USE_SSH_MONITORING.getName() + ": " + config.useSshMonitoring,
+ " " + SimulatedVanillaSoftwareProcessImpl.FUNCTION_FEED_POLL_PERIOD.getName() + ": " + config.functionFeedPollPeriod,
+ " " + SimulatedVanillaSoftwareProcessImpl.HTTP_FEED_POLL_PERIOD.getName() + ": " + config.httpFeedPollPeriod,
+ " " + (config.httpFeedUri != null ? SimulatedVanillaSoftwareProcessImpl.HTTP_FEED_URI.getName() + ": " + config.httpFeedUri : ""),
+ " brooklyn.enrichers:",
+ " - type: org.apache.brooklyn.enricher.stock.Aggregator",
+ " brooklyn.config:",
+ " enricher.sourceSensor: counter",
+ " enricher.targetSensor: counter",
+ " transformation: sum",
+ " brooklyn.policies:",
+ " - type: " + AutoScalerPolicy.class.getName(),
+ " brooklyn.config:",
+ " metric: sensorDoesNotExist",
+ " metricLowerBound: 1",
+ " metricUpperBound: 3",
+ " minPoolSize: " + config.clusterSize,
+ " maxPoolSize: " + (config.clusterSize + 3));
+ }
+
+ protected Callable<Entity> newProvisionAppTask(final String yaml) {
+ return new Callable<Entity>() {
+ public Entity call() throws Exception {
+ try {
+ Stopwatch stopwatch = Stopwatch.createStarted();
+ Entity app = createAndStartApplication(yaml);
+ Duration duration = Duration.of(stopwatch.elapsed(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS);
+ LOG.info("Provisioning time: "+duration);
+ provisioningTimes.add(duration);
+
+ return app;
+ } catch (Throwable t) {
+ LOG.error("Error deploying app (rethrowing)", t);
+ throw Exceptions.propagate(t);
+ }
+ }
+ };
+ }
+
+ protected <T extends StartableApplication> Callable<T> newProvisionAppTask(final EntitySpec<T> appSpec) {
+ return new Callable<T>() {
+ public T call() {
+ try {
+ Stopwatch stopwatch = Stopwatch.createStarted();
+ T app = mgmt().getEntityManager().createEntity(appSpec);
+ app.start(ImmutableList.of(localhost));
+ Duration duration = Duration.of(stopwatch.elapsed(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS);
+ LOG.info("Provisioning time: "+duration);
+ provisioningTimes.add(duration);
+
+ return app;
+ } catch (Throwable t) {
+ LOG.error("Error deploying app (rethrowing)", t);
+ throw Exceptions.propagate(t);
+ }
+ }
+ };
+ }
+}
diff --git a/qa/src/test/java/org/apache/brooklyn/qa/load/LoadSanityTest.java b/qa/src/test/java/org/apache/brooklyn/qa/load/LoadSanityTest.java
new file mode 100644
index 0000000..5f082f2
--- /dev/null
+++ b/qa/src/test/java/org/apache/brooklyn/qa/load/LoadSanityTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.qa.load;
+
+import org.apache.brooklyn.entity.software.base.VanillaSoftwareProcess;
+import org.apache.brooklyn.util.time.Duration;
+import org.testng.annotations.Test;
+
+import com.google.common.base.Predicates;
+
+/**
+ * A trivially small "load" test, which just checks that our test is actually working.
+ * It deploys just one app.
+ */
+public class LoadSanityTest extends AbstractLoadTest {
+
+ @Test(groups="Integration")
+ public void testApp() throws Exception {
+ super.runLocalhostManyApps(new TestConfig(this)
+ .execSshOnStart(true) // default is true, but be explicit
+ .useSshMonitoring(true) // default is true, but be explicit
+ .useHttpMonitoring(true) // default is true, but be explicit
+ .useFunctionMonitoring(true) // default is true, but be explicit
+ .totalApps(1));
+ }
+
+ @Test(groups="Integration")
+ public void testAppExternallyMonitored() throws Exception {
+ super.runLocalhostManyApps(new TestConfig(this)
+ .simulateExternalMonitor(Predicates.instanceOf(VanillaSoftwareProcess.class), 5, Duration.ONE_SECOND)
+ .useSshMonitoring(false)
+ .useHttpMonitoring(false)
+ .useFunctionMonitoring(false)
+ .totalApps(1));
+ }
+}
diff --git a/qa/src/test/java/org/apache/brooklyn/qa/load/LoadTest.java b/qa/src/test/java/org/apache/brooklyn/qa/load/LoadTest.java
index cc779a9..50ffb6b 100644
--- a/qa/src/test/java/org/apache/brooklyn/qa/load/LoadTest.java
+++ b/qa/src/test/java/org/apache/brooklyn/qa/load/LoadTest.java
@@ -18,135 +18,15 @@
*/
package org.apache.brooklyn.qa.load;
-import static org.testng.Assert.assertEquals;
-
-import java.io.File;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.brooklyn.api.entity.EntitySpec;
-import org.apache.brooklyn.api.location.Location;
-import org.apache.brooklyn.api.mgmt.ManagementContext;
-import org.apache.brooklyn.api.mgmt.ha.HighAvailabilityMode;
-import org.apache.brooklyn.core.entity.Entities;
-import org.apache.brooklyn.core.entity.StartableApplication;
-import org.apache.brooklyn.core.entity.trait.Startable;
-import org.apache.brooklyn.core.mgmt.internal.LocalManagementContext;
-import org.apache.brooklyn.core.mgmt.persist.PersistMode;
-import org.apache.brooklyn.launcher.BrooklynLauncher;
-import org.apache.brooklyn.test.PerformanceTestUtils;
-import org.apache.brooklyn.util.os.Os;
+import org.apache.brooklyn.entity.software.base.VanillaSoftwareProcess;
+import org.apache.brooklyn.qa.load.AbstractLoadTest.TestConfig;
import org.apache.brooklyn.util.time.Duration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
-import com.google.common.base.Stopwatch;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import com.google.common.io.Files;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.base.Predicates;
-/**
- * Customers ask about the scalability of Brooklyn. These load tests investigate how many
- * concurrent apps can be deployed and managed by a single Brooklyn management node.
- *
- * The apps are "simulated" in that they don't create the underlying resources
- * (we are not checking if the test machine can run 100s of app-servers simultaneously!)
- * The install/customize/launch will instead execute ssh commands of comparable length,
- * but that just echo rather than execute the actual commands.
- *
- * "SIMULATE_EXTERNAL_MONITORING" means that we do not poll the entities directly (over ssh, http or
- * whatever). Instead we simulate the metrics being injected directly to be set on the entity (e.g.
- * having been collected from a Graphite server).
- *
- * "SKIP_SSH_ON_START" means don't do the normal install+customize+launch ssh commands. Instead, just
- * startup the entities so we can monitor their resource usage.
- */
-public class LoadTest {
+public class LoadTest extends AbstractLoadTest {
- // TODO Could/should issue provisioning request through REST api, rather than programmatically;
- // and poll to detect completion.
-
- /*
- * Useful commands when investigating:
- * LOG_FILE=usage/qa/brooklyn-camp-tests.log
- * grep -E "OutOfMemoryError|[P|p]rovisioning time|sleeping before|CPU fraction|LoadTest using" $LOG_FILE | less
- * grep -E "OutOfMemoryError|[P|p]rovisioning time" $LOG_FILE; grep "CPU fraction" $LOG_FILE | tail -1; grep "LoadTest using" $LOG_FILE | tail -1
- * grep -E "OutOfMemoryError|LoadTest using" $LOG_FILE
- */
- private static final Logger LOG = LoggerFactory.getLogger(LoadTest.class);
-
- private File persistenceDir;
- private BrooklynLauncher launcher;
- private String webServerUrl;
- private ManagementContext managementContext;
- private ListeningExecutorService executor;
- private Future<?> cpuFuture;
-
- private Location localhost;
-
- List<Duration> provisioningTimes;
-
-
- @BeforeMethod(alwaysRun=true)
- public void setUp() throws Exception {
- // Create management node
- persistenceDir = Files.createTempDir();
- launcher = BrooklynLauncher.newInstance()
- .persistMode(PersistMode.CLEAN)
- .highAvailabilityMode(HighAvailabilityMode.MASTER)
- .persistenceDir(persistenceDir)
- .start();
- webServerUrl = launcher.getServerDetails().getWebServerUrl();
- managementContext = launcher.getServerDetails().getManagementContext();
-
- localhost = managementContext.getLocationRegistry().getLocationManaged("localhost");
-
- provisioningTimes = Collections.synchronizedList(Lists.<Duration>newArrayList());
-
- // Create executors
- executor = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
-
- // Monitor utilisation (memory/CPU) while tests run
- executor.submit(new Callable<Void>() {
- public Void call() {
- try {
- while (true) {
- managementContext.getExecutionManager(); // force GC to be instantiated
- String usage = ((LocalManagementContext)managementContext).getGarbageCollector().getUsageString();
- LOG.info("LoadTest using "+usage);
- Thread.sleep(1000);
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt(); // exit gracefully
- } catch (Exception e) {
- LOG.error("Error getting usage info", e);
- }
- return null;
- }});
-
- cpuFuture = PerformanceTestUtils.sampleProcessCpuTime(Duration.ONE_SECOND, "during testProvisionAppsConcurrently");
-
- }
-
- @AfterMethod(alwaysRun=true)
- public void tearDown() throws Exception {
- if (cpuFuture != null) cpuFuture.cancel(true);
- if (executor != null) executor.shutdownNow();
- if (launcher != null) launcher.terminate();
- if (persistenceDir != null) Os.deleteRecursively(persistenceDir);
- }
-
/**
* Creates multiple apps simultaneously.
*
@@ -158,84 +38,35 @@
* TODO Does not measure the cost of jclouds for creating all the VMs/containers.
*/
@Test(groups="Acceptance")
- public void testLocalhostProvisioningAppsConcurrently() throws Exception {
- final int NUM_CONCURRENT_APPS_PROVISIONING = 20;
-
- List<ListenableFuture<StartableApplication>> futures = Lists.newArrayList();
- for (int i = 0; i < NUM_CONCURRENT_APPS_PROVISIONING; i++) {
- ListenableFuture<StartableApplication> future = executor.submit(newProvisionAppTask(managementContext,
- EntitySpec.create(StartableApplication.class, SimulatedTheeTierApp.class)
- .configure(SimulatedTheeTierApp.SIMULATE_EXTERNAL_MONITORING, true)
- .displayName("Simulated app "+i)));
- futures.add(future);
- }
-
- List<StartableApplication> apps = Futures.allAsList(futures).get();
-
- for (StartableApplication app : apps) {
- assertEquals(app.getAttribute(Startable.SERVICE_UP), (Boolean)true);
- }
+ public void testProvisioningConcurrently() throws Exception {
+ // TODO Getting ssh error (SocketException: Connection reset) with 10 entities, if don't disable ssh-on-start.
+ // Will still execute checkRunning to wait for process to start (even if execSshOnStart is false).
+ super.runLocalhostManyApps(new TestConfig(this)
+ .useSshMonitoring(false)
+ .execSshOnStart(false) // getting ssh errors otherwise!
+ .totalApps(10));
}
-
+
/**
* Creates many apps, to monitor resource usage etc.
*
- * "SIMULATE_EXTERNAL_MONITORING" means that we do not poll the entities directly (over ssh, http or
- * whatever). Instead we simulate the metrics being injected directly to be set on the entity (e.g.
- * having been collected from a Graphite server).
- *
* Long-term target is 2500 VMs under management.
* Until we reach that point, we can partition the load across multiple (separate) brooklyn management nodes.
*/
@Test(groups="Acceptance")
- public void testLocalhostManyApps() throws Exception {
- final int NUM_APPS = 630; // target is 2500 VMs; each blueprint has 4 (rounding up)
+ public void testManyAppsExternallyMonitored() throws Exception {
+ // TODO Getting ssh error ("Server closed connection during identification exchange")
+ // with only two cycles (i.e. 20 entities).
+ //
+ // The ssh activity is from `SoftwareProcessImpl.waitForEntityStart`, which calls
+ // `VanillaSoftwareProcessSshDriver.isRunning`.
+ final int TOTAL_APPS = 600; // target is 2500 VMs; each blueprint has 2 VanillaSoftwareProcess
final int NUM_APPS_PER_BATCH = 10;
- final int SLEEP_BETWEEN_BATCHES = 10*1000;
- final boolean SKIP_SSH_ON_START = true; // getting ssh errors otherwise!
-
- int counter = 0;
-
- for (int i = 0; i < NUM_APPS / NUM_APPS_PER_BATCH; i++) {
- List<ListenableFuture<StartableApplication>> futures = Lists.newArrayList();
- for (int j = 0; j < NUM_APPS_PER_BATCH; j++) {
- ListenableFuture<StartableApplication> future = executor.submit(newProvisionAppTask(
- managementContext,
- EntitySpec.create(StartableApplication.class, SimulatedTheeTierApp.class)
- .configure(SimulatedTheeTierApp.SIMULATE_EXTERNAL_MONITORING, true)
- .configure(SimulatedTheeTierApp.SKIP_SSH_ON_START, SKIP_SSH_ON_START)
- .displayName("Simulated app "+(++counter))));
- futures.add(future);
- }
-
- List<StartableApplication> apps = Futures.allAsList(futures).get();
-
- for (StartableApplication app : apps) {
- assertEquals(app.getAttribute(Startable.SERVICE_UP), (Boolean)true);
- }
-
- synchronized (provisioningTimes) {
- LOG.info("cycle="+i+"; numApps="+counter+": provisioning times: "+provisioningTimes);
- provisioningTimes.clear();
- }
-
- LOG.info("cycle="+i+"; numApps="+counter+": sleeping before next batch of apps");
- Thread.sleep(SLEEP_BETWEEN_BATCHES);
- }
- }
-
- protected <T extends StartableApplication> Callable<T> newProvisionAppTask(final ManagementContext managementContext, final EntitySpec<T> appSpec) {
- return new Callable<T>() {
- public T call() {
- Stopwatch stopwatch = Stopwatch.createStarted();
- T app = managementContext.getEntityManager().createEntity(appSpec);
- app.start(ImmutableList.of(localhost));
- Duration duration = Duration.of(stopwatch.elapsed(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS);
- LOG.info("Provisioning time: "+duration);
- provisioningTimes.add(duration);
-
- return app;
- }
- };
+ super.runLocalhostManyApps(new TestConfig(this)
+ .execSshOnStart(false) // getting ssh errors otherwise!
+ .simulateExternalMonitor(Predicates.instanceOf(VanillaSoftwareProcess.class), 5, Duration.ONE_SECOND)
+ .clusterSize(2)
+ .totalApps(TOTAL_APPS, NUM_APPS_PER_BATCH)
+ .sleepBetweenBatch(Duration.TEN_SECONDS));
}
}