blob: 03f335eefa0ea8ea85c9a20cee5c4aa9e19d2d59 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.brooklyn.entity.messaging.storm;
import static java.lang.String.format;
import java.util.List;
import java.util.Map;
import org.apache.brooklyn.api.entity.Entity;
import org.apache.brooklyn.api.entity.EntityLocal;
import org.apache.brooklyn.core.entity.Attributes;
import org.apache.brooklyn.core.entity.Entities;
import org.apache.brooklyn.core.location.Machines;
import org.apache.brooklyn.core.sensor.DependentConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.brooklyn.entity.java.JavaSoftwareProcessSshDriver;
import org.apache.brooklyn.entity.software.base.SoftwareProcess;
import org.apache.brooklyn.entity.zookeeper.ZooKeeperEnsemble;
import org.apache.brooklyn.location.ssh.SshMachineLocation;
import org.apache.brooklyn.util.collections.MutableMap;
import org.apache.brooklyn.util.net.Networking;
import org.apache.brooklyn.util.os.Os;
import org.apache.brooklyn.util.ssh.BashCommands;
import org.apache.brooklyn.util.time.Duration;
import org.apache.brooklyn.util.time.Time;
import com.google.common.base.Optional;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
public class StormSshDriver extends JavaSoftwareProcessSshDriver implements StormDriver {
private static final Logger log = LoggerFactory.getLogger(StormSshDriver.class);
public StormSshDriver(EntityLocal entity, SshMachineLocation machine) {
super(entity, machine);
}
public String getRoleName() {
return entity.getConfig(Storm.ROLE).name().toLowerCase();
}
public String getZeromqVersion() {
return entity.getConfig(Storm.ZEROMQ_VERSION);
}
public String getLocalDir() {
return Optional.fromNullable(entity.getConfig(Storm.LOCAL_DIR)).or(Os.mergePathsUnix(getRunDir(), "storm"));
}
public String getNimbusHostname() {
String result = entity.getConfig(Storm.NIMBUS_HOSTNAME);
if (result != null) return result;
Entity nimbus = entity.getConfig(Storm.NIMBUS_ENTITY);
if (nimbus == null) {
log.warn("No nimbus hostname available; using 'localhost'");
return "localhost";
}
return Entities.submit(entity, DependentConfiguration.attributeWhenReady(nimbus, Attributes.HOSTNAME)).getUnchecked();
}
public Integer getUiPort() {
return entity.getAttribute(Storm.UI_PORT);
}
public Map<String, Integer> getPortMap() {
return MutableMap.of("uiPort", getUiPort());
}
@Override
protected List<String> getCustomJavaConfigOptions() {
List<String> result = super.getCustomJavaConfigOptions();
if ("nimbus".equals(getRoleName()) || "supervisor".equals(getRoleName())) {
result.add("-verbose:gc");
result.add("-XX:+PrintGCTimeStamps");
result.add("-XX:+PrintGCDetails");
}
if ("ui".equals(getRoleName())) {
result.add("-Xmx768m");
}
return result;
}
@Override
public String getJvmOptsLine() {
return Optional.fromNullable(getShellEnvironment().get("JAVA_OPTS")).or("");
}
public List<String> getZookeeperServers() {
ZooKeeperEnsemble zooKeeperEnsemble = entity.getConfig(Storm.ZOOKEEPER_ENSEMBLE);
Supplier<List<String>> supplier = Entities.attributeSupplierWhenReady(zooKeeperEnsemble, ZooKeeperEnsemble.ZOOKEEPER_SERVERS);
return supplier.get();
}
public String getStormConfigTemplateUrl() {
return entity.getConfig(Storm.STORM_CONFIG_TEMPLATE_URL);
}
@Override
public void install() {
List<String> urls = resolver.getTargets();
String saveAs = resolver.getFilename();
ImmutableList.Builder<String> commands= ImmutableList.<String> builder();
if (!getLocation().getOsDetails().isMac()) {
commands.add(BashCommands.installPackage(ImmutableMap.of(
"yum", "libuuid-devel",
"apt", "build-essential uuid-dev pkg-config libtool automake"),
"libuuid-devel"));
commands.add(BashCommands.ifExecutableElse0("yum", BashCommands.sudo("yum -y groupinstall 'Development Tools'")));
}
commands.add(BashCommands.installPackage(ImmutableMap.of("yum", "git"), "git"))
.add(BashCommands.INSTALL_UNZIP)
.addAll(installNativeDependencies())
.addAll(BashCommands.commandsToDownloadUrlsAs(urls, saveAs))
.add("unzip " + saveAs)
.add("mkdir -p " + getLocalDir())
.add("chmod 777 " + getLocalDir()); // FIXME
newScript(INSTALLING)
.body.append(commands.build())
.gatherOutput()
.execute();
}
public String getPidFile() {
return Os.mergePathsUnix(getRunDir(), format("%s.pid", getRoleName()));
}
@Override
protected String getLogFileLocation() {
return Os.mergePathsUnix(getRunDir(), "logs", format("%s.log", getRoleName()));
}
@Override
public void launch() {
boolean needsSleep = false;
if (getRoleName().equals("supervisor")) {
Entity nimbus = entity.getConfig(Storm.NIMBUS_ENTITY);
if (nimbus == null) {
log.warn("No nimbus entity available; not blocking before starting supervisors");
} else {
Entities.waitForServiceUp(nimbus, entity.getConfig(SoftwareProcess.START_TIMEOUT));
needsSleep = true;
}
}
String subnetHostname = Machines.findSubnetOrPublicHostname(entity).get();
log.info("Launching " + entity + " with role " + getRoleName() + " and " + "hostname (public) "
+ getEntity().getAttribute(Attributes.HOSTNAME) + ", " + "hostname (subnet) " + subnetHostname + ")");
// ensure only one node at a time tries to start
// attempting to eliminate the causes of:
// 2013-12-12 09:21:45 supervisor [ERROR] Error on initialization of server mk-supervisor
// org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /assignments
// TODO use SoftwareProcess#START_LATCH instead here?
Object startMutex = Optional.fromNullable(entity.getConfig(Storm.START_MUTEX)).or(new Object());
synchronized (startMutex) {
if (needsSleep) {
// give 10s extra to make sure nimbus is ready; we see weird zookeeper no /assignments node error otherwise
// (this could be optimized by recording nimbus service_up time)
Time.sleep(Duration.TEN_SECONDS);
}
newScript(MutableMap.of(USE_PID_FILE, getPidFile()), LAUNCHING)
.body.append(format("nohup ./bin/storm %s > %s 2>&1 &", getRoleName(), getLogFileLocation()))
.execute();
}
}
@Override
public boolean isRunning() {
return newScript(MutableMap.of(USE_PID_FILE, getPidFile()), CHECK_RUNNING).execute() == 0;
}
@Override
public void stop() {
newScript(MutableMap.of(USE_PID_FILE, getPidFile()), STOPPING).execute();
}
@Override
public void customize() {
log.debug("Customizing {}", entity);
Networking.checkPortsValid(getPortMap());
newScript(CUSTOMIZING)
.body.append(format("cp -R %s/* .", getExpandedInstallDir()))
.execute();
String destinationConfigFile = Os.mergePathsUnix(getRunDir(), "conf/storm.yaml");
copyTemplate(getStormConfigTemplateUrl(), destinationConfigFile);
}
protected List<String> installNativeDependencies() {
String zeromqUrl = format("http://download.zeromq.org/zeromq-%s.tar.gz", getZeromqVersion());
String targz = format("zeromq-%s.tar.gz", getZeromqVersion());
String jzmq = "https://github.com/nathanmarz/jzmq.git";
ImmutableList.Builder<String> commands = ImmutableList.<String>builder();
if (getLocation().getOsDetails().isMac()) {
commands.add("export PATH=$PATH:/usr/local/bin")
.add("export JAVA_HOME=$(/usr/libexec/java_home)")
.add("cd " + getInstallDir())
.add(BashCommands.installPackage(ImmutableMap.of("brew", "automake"), "make"))
.add(BashCommands.installPackage(ImmutableMap.of("brew", "libtool"), "libtool"))
.add(BashCommands.installPackage(ImmutableMap.of("brew", "pkg-config"), "pkg-config"))
.add(BashCommands.installPackage(ImmutableMap.of("brew", "zeromq"), "zeromq"))
.add("git clone https://github.com/asmaier/jzmq")
.add("cd jzmq")
.add("./autogen.sh")
.add("./configure")
.add("make")
.add((BashCommands.sudo("make install")))
.add("cd " + getInstallDir());
} else {
commands.add("export JAVA_HOME=$(dirname $(readlink -m `which java`))/../../ || export JAVA_HOME=/usr/lib/jvm/java")
.add("cd " + getInstallDir())
.add(BashCommands.commandToDownloadUrlAs(zeromqUrl, targz))
.add("tar xzf " + targz)
.add(format("cd zeromq-%s", getZeromqVersion()))
.add("./configure")
.add("make")
.add((BashCommands.sudo("make install")))
// install jzmq
.add("cd " + getInstallDir())
.add("git clone " + jzmq)
.add("cd jzmq")
.add("./autogen.sh")
.add("./configure")
// hack needed on ubuntu 12.04; ignore if it fails
// see https://github.com/zeromq/jzmq/issues/114
.add(BashCommands.ok(
"pushd src ; touch classdist_noinst.stamp ; CLASSPATH=.:./.:$CLASSPATH "
+ "javac -d . org/zeromq/ZMQ.java org/zeromq/App.java org/zeromq/ZMQForwarder.java org/zeromq/EmbeddedLibraryTools.java org/zeromq/ZMQQueue.java org/zeromq/ZMQStreamer.java org/zeromq/ZMQException.java"))
.add(BashCommands.ok("popd"))
.add("make")
.add((BashCommands.sudo("make install")))
.add("cd " + getInstallDir());
}
return commands.build();
}
}