blob: e712791db0757768f1c3407235efc6f3f1bae77d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.brooklyn.entity.messaging.storm;
import static org.apache.brooklyn.core.sensor.DependentConfiguration.attributeWhenReady;
import static org.apache.brooklyn.entity.messaging.storm.Storm.NIMBUS_HOSTNAME;
import static org.apache.brooklyn.entity.messaging.storm.Storm.ZOOKEEPER_ENSEMBLE;
import static org.apache.brooklyn.entity.messaging.storm.Storm.Role.NIMBUS;
import static org.apache.brooklyn.entity.messaging.storm.Storm.Role.SUPERVISOR;
import static org.apache.brooklyn.entity.messaging.storm.Storm.Role.UI;
import java.io.File;
import java.util.Map;
import org.apache.brooklyn.api.entity.EntitySpec;
import org.apache.brooklyn.api.location.Location;
import org.apache.brooklyn.core.entity.Attributes;
import org.apache.brooklyn.core.entity.Entities;
import org.apache.brooklyn.core.entity.EntityAsserts;
import org.apache.brooklyn.core.entity.trait.Startable;
import org.apache.brooklyn.core.test.BrooklynAppLiveTestSupport;
import org.apache.brooklyn.entity.messaging.storm.topologies.ExclamationBolt;
import org.apache.brooklyn.entity.zookeeper.ZooKeeperEnsemble;
import org.apache.brooklyn.util.collections.MutableMap;
import org.apache.brooklyn.util.core.ResourceUtils;
import org.apache.brooklyn.util.core.file.ArchiveBuilder;
import org.apache.brooklyn.util.os.Os;
import org.apache.brooklyn.util.time.Duration;
import org.apache.brooklyn.util.time.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.generated.StormTopology;
import backtype.storm.testing.TestWordSpout;
import backtype.storm.topology.TopologyBuilder;
public abstract class StormAbstractCloudLiveTest extends BrooklynAppLiveTestSupport {
protected static final Logger log = LoggerFactory
.getLogger(StormAbstractCloudLiveTest.class);
private Location location;
private ZooKeeperEnsemble zooKeeperEnsemble;
private Storm nimbus;
private Storm supervisor;
private Storm ui;
@BeforeMethod(alwaysRun = true)
@Override
public void setUp() throws Exception {
super.setUp();
location = mgmt.getLocationRegistry().getLocationManaged(getLocation(), getFlags());
}
public abstract String getLocation();
public Map<String, ?> getFlags() {
return MutableMap.of();
}
@Test(groups = {"Live","WIP"}) // needs repair to avoid hard dependency on Andrea's environment
public void deployStorm() throws Exception {
try {
zooKeeperEnsemble = app.createAndManageChild(EntitySpec.create(
ZooKeeperEnsemble.class).configure(
ZooKeeperEnsemble.INITIAL_SIZE, 3));
nimbus = app.createAndManageChild(EntitySpec
.create(Storm.class)
.configure(Storm.ROLE, NIMBUS)
.configure(NIMBUS_HOSTNAME, "localhost")
.configure(ZOOKEEPER_ENSEMBLE, zooKeeperEnsemble)
);
supervisor = app.createAndManageChild(EntitySpec
.create(Storm.class)
.configure(Storm.ROLE, SUPERVISOR)
.configure(ZOOKEEPER_ENSEMBLE, zooKeeperEnsemble)
.configure(NIMBUS_HOSTNAME,
attributeWhenReady(nimbus, Attributes.HOSTNAME)));
ui = app.createAndManageChild(EntitySpec
.create(Storm.class)
.configure(Storm.ROLE, UI)
.configure(ZOOKEEPER_ENSEMBLE, zooKeeperEnsemble)
.configure(NIMBUS_HOSTNAME,
attributeWhenReady(nimbus, Attributes.HOSTNAME)));
log.info("Started Storm deployment on '" + getLocation() + "'");
app.start(ImmutableList.of(location));
Entities.dumpInfo(app);
EntityAsserts.assertAttributeEqualsEventually(app, Startable.SERVICE_UP, true);
EntityAsserts.assertAttributeEqualsEventually(zooKeeperEnsemble, Startable.SERVICE_UP, true);
EntityAsserts.assertAttributeEqualsEventually(nimbus, Startable.SERVICE_UP, true);
EntityAsserts.assertAttributeEqualsEventually(supervisor, Startable.SERVICE_UP, true);
EntityAsserts.assertAttributeEqualsEventually(ui, Startable.SERVICE_UP, true);
StormTopology stormTopology = createTopology();
submitTopology(stormTopology, "myExclamation", 3, true, 60000);
} catch (Exception e) {
log.error("Failed to deploy Storm", e);
Assert.fail();
throw e;
}
}
private StormTopology createTopology()
throws AlreadyAliveException, InvalidTopologyException {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("word", new TestWordSpout(), 10);
builder.setBolt("exclaim1", new ExclamationBolt(), 3).shuffleGrouping("word");
builder.setBolt("exclaim2", new ExclamationBolt(), 2).shuffleGrouping("exclaim1");
return builder.createTopology();
}
public boolean submitTopology(StormTopology stormTopology, String topologyName, int numOfWorkers, boolean debug, long timeoutMs) {
if (log.isDebugEnabled()) log.debug("Connecting to NimbusClient: {}", nimbus.getConfig(Storm.NIMBUS_HOSTNAME));
Config conf = new Config();
conf.setDebug(debug);
conf.setNumWorkers(numOfWorkers);
// TODO - confirm this creats the JAR correctly
String jar = createJar(
new File(Os.mergePaths(ResourceUtils.create(this).getClassLoaderDir(), "org/apache/brooklyn/entity/messaging/storm/topologies")),
"org/apache/brooklyn/entity/messaging/storm/");
System.setProperty("storm.jar", jar);
long startMs = System.currentTimeMillis();
long endMs = (timeoutMs == -1) ? Long.MAX_VALUE : (startMs + timeoutMs);
long currentTime = startMs;
Throwable lastError = null;
int attempt = 0;
while (currentTime <= endMs) {
currentTime = System.currentTimeMillis();
if (attempt != 0) Time.sleep(Duration.ONE_SECOND);
if (log.isTraceEnabled()) log.trace("trying connection to {} at time {}", nimbus.getConfig(Storm.NIMBUS_HOSTNAME), currentTime);
try {
StormSubmitter.submitTopology(topologyName, conf, stormTopology);
return true;
} catch (Exception e) {
if (shouldRetryOn(e)) {
if (log.isDebugEnabled()) log.debug("Attempt {} failed connecting to {} ({})", new Object[] {attempt + 1, nimbus.getConfig(Storm.NIMBUS_HOSTNAME), e.getMessage()});
lastError = e;
} else {
throw Throwables.propagate(e);
}
}
attempt++;
}
log.warn("unable to connect to Nimbus client: ", lastError);
Assert.fail();
return false;
}
private boolean shouldRetryOn(Exception e) {
if (e.getMessage().equals("org.apache.thrift7.transport.TTransportException: java.net.ConnectException: Connection refused")) return true;
return false;
}
private String createJar(File dir, String parentDirInJar) {
if (dir.isDirectory()) {
File jarFile = ArchiveBuilder.jar().addAt(dir, parentDirInJar).create(Os.newTempDir(getClass())+"/topologies.jar");
return jarFile.getAbsolutePath();
} else {
return dir.getAbsolutePath(); // An existing Jar archive?
}
}
}