blob: 20b0b83516357c9ab3face6acd5dc40d1b764aee [file] [log] [blame]
package brooklyn.extras.whirr
import groovy.transform.InheritConstructors
import java.io.File
import java.net.InetAddress
import java.util.List
import org.apache.log4j.lf5.util.StreamUtils;
import org.apache.whirr.service.hadoop.HadoopCluster
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import brooklyn.config.BrooklynProperties
import brooklyn.entity.Entity
import brooklyn.entity.basic.AbstractApplication
import brooklyn.entity.basic.DynamicGroup
import brooklyn.entity.basic.Entities
import brooklyn.entity.trait.Startable
import brooklyn.entity.webapp.ControlledDynamicWebAppCluster;
import brooklyn.entity.webapp.DynamicWebAppCluster
import brooklyn.entity.webapp.jboss.JBoss7Server
import brooklyn.event.SensorEvent
import brooklyn.event.SensorEventListener
import brooklyn.event.adapter.HttpResponseContext
import brooklyn.extras.whirr.hadoop.WhirrHadoopCluster
import brooklyn.launcher.BrooklynLauncher
import brooklyn.location.Location
import brooklyn.location.basic.LocationRegistry
import brooklyn.location.basic.SshMachineLocation
import brooklyn.management.Task
import brooklyn.policy.ResizerPolicy
import brooklyn.policy.basic.AbstractPolicy
import brooklyn.util.CommandLineUtil
import brooklyn.util.ResourceUtils;
import com.google.common.base.Charsets
import com.google.common.collect.Iterables
import com.google.common.io.Files
/**
* Starts hadoop and a webapp using hadoop in the location supplied (just one location),
* configuring the webapp to connect to hadoop
*/
@InheritConstructors
public class WebClusterWithHadoopExample extends AbstractApplication {
private static final Logger log = LoggerFactory.getLogger(WebClusterWithHadoopExample.class);
static final List<String> DEFAULT_LOCATIONS = [
// note the 1d: we need our machines to be in the same availability zone
// so they can see each other on the internal IPs (where hadoop binds)
// (cross-site magic is shown in the hadoop fabric example)
// ((also note some availability zones don't work, that's amazon for you...))
// most other clouds "just work" :)
"aws-ec2:us-east-1d",
];
public static final String WAR_PATH = "classpath://hello-world-hadoop-webapp.war";
static BrooklynProperties config = BrooklynProperties.Factory.newDefault()
WhirrHadoopCluster hadoopCluster = new WhirrHadoopCluster(this, size: 2, memory: 2048, name: "Whirr Hadoop Cluster");
{
// specify hadoop version (1.0.2 has a nice, smaller hadoop client jar)
hadoopCluster.addRecipeLine("whirr.hadoop.version=1.0.2");
// for this example we'll allow access from anywhere
hadoopCluster.addRecipeLine("whirr.client-cidrs=0.0.0.0/0");
hadoopCluster.addRecipeLine("whirr.firewall-rules=8020,8021,50010");
}
ControlledDynamicWebAppCluster webCluster = new ControlledDynamicWebAppCluster(this, war: WAR_PATH);
{
webCluster.addPolicy(new ResizerPolicy(DynamicWebAppCluster.AVERAGE_REQUESTS_PER_SECOND).
setSizeRange(1, 5).
setMetricRange(10, 100));
}
DynamicGroup webVms = new DynamicGroup(this, name: "Web VMs", { it in JBoss7Server });
void start(Collection locations) {
Iterables.getOnlyElement(locations);
log.debug("starting "+this);
super.start(locations);
// start the web cluster, and hadoop cluster, in the single location (above)
// then register a policy to configure the appservers to use hadoop
log.debug("started "+this+", now starting policy");
PrepVmsForHadoop.newPolicyFromGroupToHadoop(webVms, hadoopCluster);
}
public static class PrepVmsForHadoop extends AbstractPolicy {
private static final Logger log = LoggerFactory.getLogger(WebClusterWithHadoopExample.class);
WhirrHadoopCluster hadoopCluster;
String hadoopSiteXmlContents;
Set<String> configuredIds = [];
public PrepVmsForHadoop(WhirrHadoopCluster hadoopCluster) {
this.hadoopCluster = hadoopCluster;
}
public void start() {
//read the contents, and disable socks proxy since we're in the same cluster
hadoopSiteXmlContents = new File("${System.getProperty('user.home')}/.whirr/"+
hadoopCluster.clusterSpec.clusterName+"/hadoop-site.xml").text
hadoopSiteXmlContents = hadoopSiteXmlContents.replaceAll("<\\?xml.*\\?>\\s*", "");
hadoopSiteXmlContents = hadoopSiteXmlContents.replaceAll("hadoop.socks.server", "ignore.hadoop.socks.server");
subscriptionTracker.subscribeToMembers(entity, Startable.SERVICE_UP,
{ SensorEvent evt ->
log.debug "hadoop set up policy recieved {}", evt
if (evt.value && !configuredIds.contains(evt.source.id))
setupMachine(evt.source)
} as SensorEventListener);
}
public void setupMachine(Entity e) {
try {
if (!e.getAttribute(Startable.SERVICE_UP)) return;
if (!configuredIds.add(e.id)) return;
if (log.isDebugEnabled()) log.debug "setting up machine for hadoop at {}", e
URL updateConfig = new URL(e.getAttribute(JBoss7Server.ROOT_URL)+
"configure.jsp?"+
"key=brooklyn.example.hadoop.site.xml.contents"+"&"+
"value="+URLEncoder.encode(hadoopSiteXmlContents));
def result = new HttpResponseContext(updateConfig.openConnection());
if (log.isDebugEnabled()) log.debug "http config update for {} got: {}", e, result.content
} catch (Exception err) {
log.warn "unable to configure {} for hadoop: {}", e, err
configuredIds.remove(e.id);
}
}
public static PrepVmsForHadoop newPolicyFromGroupToHadoop(DynamicGroup target, WhirrHadoopCluster hadoopCluster) {
log.debug "creating policy for hadoop clusters target {} hadoop ", target, hadoopCluster
PrepVmsForHadoop prepVmsForHadoop = new PrepVmsForHadoop(hadoopCluster);
target.addPolicy(prepVmsForHadoop);
prepVmsForHadoop.start();
log.debug "running policy over existing members {}", target.members
target.members.each { prepVmsForHadoop.setupMachine(it) }
return prepVmsForHadoop;
}
}
public static void main(String[] argv) {
ArrayList args = new ArrayList(Arrays.asList(argv));
int port = CommandLineUtil.getCommandLineOptionInt(args, "--port", 8081);
List<Location> locations = new LocationRegistry().getLocationsById(args ?: DEFAULT_LOCATIONS)
log.info("starting WebClusterWithHadoop, locations {}, mgmt on port {}", locations, port)
WebClusterWithHadoopExample app = new WebClusterWithHadoopExample(name: 'Brooklyn Global Web Fabric with Hadoop Example');
BrooklynLauncher.manage(app, port)
app.start(locations)
Entities.dumpInfo(app)
}
}