| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.brooklyn.entity.nosql.cassandra; |
| |
| import static com.google.common.base.Preconditions.checkArgument; |
| import static com.google.common.base.Preconditions.checkNotNull; |
| |
| import java.io.InputStream; |
| import java.util.ArrayList; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| |
| import org.apache.brooklyn.api.entity.Entity; |
| import org.apache.brooklyn.api.location.Location; |
| import org.apache.brooklyn.api.mgmt.TaskWrapper; |
| import org.apache.brooklyn.core.effector.ssh.SshEffectorTasks; |
| import org.apache.brooklyn.core.entity.Attributes; |
| import org.apache.brooklyn.core.entity.Entities; |
| import org.apache.brooklyn.core.location.Machines; |
| import org.apache.brooklyn.core.location.access.BrooklynAccessUtils; |
| import org.apache.brooklyn.core.sensor.DependentConfiguration; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import com.google.common.collect.ImmutableList; |
| import com.google.common.collect.ImmutableMap; |
| import com.google.common.collect.ImmutableSet; |
| |
| import org.apache.brooklyn.entity.database.DatastoreMixins; |
| import org.apache.brooklyn.entity.java.JavaSoftwareProcessSshDriver; |
| import org.apache.brooklyn.entity.java.UsesJmx; |
| import org.apache.brooklyn.location.ssh.SshMachineLocation; |
| import org.apache.brooklyn.util.collections.MutableMap; |
| import org.apache.brooklyn.util.collections.MutableSet; |
| import org.apache.brooklyn.util.core.task.DynamicTasks; |
| import org.apache.brooklyn.util.core.task.Tasks; |
| import org.apache.brooklyn.util.core.task.system.ProcessTaskWrapper; |
| import org.apache.brooklyn.util.core.text.TemplateProcessor; |
| import org.apache.brooklyn.util.exceptions.Exceptions; |
| import org.apache.brooklyn.util.guava.Maybe; |
| import org.apache.brooklyn.util.net.Networking; |
| import org.apache.brooklyn.util.os.Os; |
| import org.apache.brooklyn.util.ssh.BashCommands; |
| import org.apache.brooklyn.util.stream.Streams; |
| import org.apache.brooklyn.util.text.Identifiers; |
| import org.apache.brooklyn.util.text.Strings; |
| import org.apache.brooklyn.util.time.Duration; |
| import org.apache.brooklyn.util.time.Time; |
| |
| /** |
| * Start a {@link CassandraNode} in a {@link Location} accessible over ssh. |
| */ |
| public class CassandraNodeSshDriver extends JavaSoftwareProcessSshDriver implements CassandraNodeDriver { |
| |
| private static final Logger log = LoggerFactory.getLogger(CassandraNodeSshDriver.class); |
| |
| protected Maybe<String> resolvedAddressCache = Maybe.absent(); |
| |
| public CassandraNodeSshDriver(CassandraNodeImpl entity, SshMachineLocation machine) { |
| super(entity, machine); |
| } |
| |
| @Override |
| protected String getLogFileLocation() { return Os.mergePathsUnix(getRunDir(),"cassandra.log"); } |
| |
| @Override |
| public Integer getGossipPort() { return entity.getAttribute(CassandraNode.GOSSIP_PORT); } |
| |
| @Override |
| public Integer getSslGossipPort() { return entity.getAttribute(CassandraNode.SSL_GOSSIP_PORT); } |
| |
| @Override |
| public Integer getThriftPort() { return entity.getAttribute(CassandraNode.THRIFT_PORT); } |
| |
| @Override |
| public Integer getNativeTransportPort() { return entity.getAttribute(CassandraNode.NATIVE_TRANSPORT_PORT); } |
| |
| @Override |
| public String getClusterName() { return entity.getAttribute(CassandraNode.CLUSTER_NAME); } |
| |
| @Override |
| public String getCassandraConfigTemplateUrl() { |
| String templatedUrl = entity.getConfig(CassandraNode.CASSANDRA_CONFIG_TEMPLATE_URL); |
| return TemplateProcessor.processTemplateContents(templatedUrl, this, ImmutableMap.<String, Object>of()); |
| } |
| |
| @Override |
| public String getCassandraConfigFileName() { return entity.getConfig(CassandraNode.CASSANDRA_CONFIG_FILE_NAME); } |
| |
| public String getEndpointSnitchName() { return entity.getConfig(CassandraNode.ENDPOINT_SNITCH_NAME); } |
| |
| public String getCassandraRackdcConfigTemplateUrl() { return entity.getConfig(CassandraNode.CASSANDRA_RACKDC_CONFIG_TEMPLATE_URL); } |
| |
| public String getCassandraRackdcConfigFileName() { return entity.getConfig(CassandraNode.CASSANDRA_RACKDC_CONFIG_FILE_NAME); } |
| |
| public String getMirrorUrl() { return entity.getConfig(CassandraNode.MIRROR_URL); } |
| |
| protected boolean isV2() { |
| String version = getVersion(); |
| return version.startsWith("2."); |
| } |
| |
| @Override |
| public boolean installJava() { |
| if (isV2()) { |
| return checkForAndInstallJava("1.8"); |
| } else { |
| return super.installJava(); |
| } |
| } |
| |
| @Override |
| public void install() { |
| List<String> urls = resolver.getTargets(); |
| String saveAs = resolver.getFilename(); |
| |
| List<String> commands = ImmutableList.<String>builder() |
| .addAll(BashCommands.commandsToDownloadUrlsAs(urls, saveAs)) |
| .add(BashCommands.INSTALL_TAR) |
| .add("tar xzfv " + saveAs) |
| .build(); |
| |
| newScript(INSTALLING) |
| .body.append(commands) |
| .execute(); |
| } |
| |
| @Override |
| public Set<Integer> getPortsUsed() { |
| return ImmutableSet.<Integer>builder() |
| .addAll(super.getPortsUsed()) |
| .addAll(getPortMap().values()) |
| .build(); |
| } |
| |
| protected Map<String, Integer> getPortMap() { |
| return ImmutableMap.<String, Integer>builder() |
| .put("jmxPort", entity.getAttribute(UsesJmx.JMX_PORT)) |
| .put("rmiPort", entity.getAttribute(UsesJmx.RMI_REGISTRY_PORT)) |
| .put("gossipPort", getGossipPort()) |
| .put("sslGossipPort", getSslGossipPort()) |
| .put("thriftPort", getThriftPort()) |
| .build(); |
| } |
| |
| @Override |
| public void customize() { |
| log.debug("Customizing {} (Cluster {})", entity, getClusterName()); |
| Networking.checkPortsValid(getPortMap()); |
| |
| customizeInitialSeeds(); |
| |
| String logFileEscaped = getLogFileLocation().replace("/", "\\/"); // escape slashes |
| |
| ImmutableList.Builder<String> commands = new ImmutableList.Builder<String>() |
| .add(String.format("cp -R %s/{bin,conf,lib,interface,pylib,tools} .", getExpandedInstallDir())) |
| .add("mkdir -p data") |
| .add("mkdir -p brooklyn_commands") |
| .add(String.format("sed -i.bk 's/log4j.appender.R.File=.*/log4j.appender.R.File=%s/g' %s/conf/log4j-server.properties", logFileEscaped, getRunDir())) |
| .add(String.format("sed -i.bk '/JMX_PORT/d' %s/conf/cassandra-env.sh", getRunDir())) |
| // Script sets 180k on Linux which gives Java error: The stack size specified is too small, Specify at least 228k |
| .add(String.format("sed -i.bk 's/-Xss180k/-Xss280k/g' %s/conf/cassandra-env.sh", getRunDir())); |
| |
| newScript(CUSTOMIZING) |
| .body.append(commands.build()) |
| .failOnNonZeroResultCode() |
| .execute(); |
| |
| // Copy the cassandra.yaml configuration file across |
| String destinationConfigFile = Os.mergePathsUnix(getRunDir(), "conf", getCassandraConfigFileName()); |
| copyTemplate(getCassandraConfigTemplateUrl(), destinationConfigFile); |
| |
| // Copy the cassandra-rackdc.properties configuration file across |
| String rackdcDestinationFile = Os.mergePathsUnix(getRunDir(), "conf", getCassandraRackdcConfigFileName()); |
| copyTemplate(getCassandraRackdcConfigTemplateUrl(), rackdcDestinationFile); |
| |
| customizeCopySnitch(); |
| } |
| |
| protected void customizeCopySnitch() { |
| // Copy the custom snitch jar file across |
| String customSnitchJarUrl = entity.getConfig(CassandraNode.CUSTOM_SNITCH_JAR_URL); |
| if (Strings.isNonBlank(customSnitchJarUrl)) { |
| int lastSlashIndex = customSnitchJarUrl.lastIndexOf("/"); |
| String customSnitchJarName = (lastSlashIndex > 0) ? customSnitchJarUrl.substring(lastSlashIndex+1) : "customBrooklynSnitch.jar"; |
| String jarDestinationFile = Os.mergePathsUnix(getRunDir(), "lib", customSnitchJarName); |
| InputStream customSnitchJarStream = checkNotNull(resource.getResourceFromUrl(customSnitchJarUrl), "%s could not be loaded", customSnitchJarUrl); |
| try { |
| getMachine().copyTo(customSnitchJarStream, jarDestinationFile); |
| } finally { |
| Streams.closeQuietly(customSnitchJarStream); |
| } |
| } |
| } |
| |
| protected void customizeInitialSeeds() { |
| if (entity.getConfig(CassandraNode.INITIAL_SEEDS)==null) { |
| if (isClustered()) { |
| entity.config().set(CassandraNode.INITIAL_SEEDS, |
| DependentConfiguration.attributeWhenReady(entity.getParent(), CassandraDatacenter.CURRENT_SEEDS)); |
| } else { |
| entity.config().set(CassandraNode.INITIAL_SEEDS, MutableSet.<Entity>of(entity)); |
| } |
| } |
| } |
| |
| @Override |
| public boolean isClustered() { |
| return entity.getParent() instanceof CassandraDatacenter; |
| } |
| |
| @Override |
| public void launch() { |
| String subnetHostname = Machines.findSubnetOrPublicHostname(entity).get(); |
| Set<Entity> seeds = getEntity().getConfig(CassandraNode.INITIAL_SEEDS); |
| List<Entity> ancestors = getCassandraAncestors(); |
| log.info("Launching " + entity + ": " + |
| "cluster "+getClusterName()+", " + |
| "hostname (public) " + getEntity().getAttribute(Attributes.HOSTNAME) + ", " + |
| "hostname (subnet) " + subnetHostname + ", " + |
| "seeds "+((CassandraNode)entity).getSeeds()+" (from "+seeds+")"); |
| |
| boolean isFirst = seeds.iterator().next().equals(entity); |
| if (isClustered() && !isFirst && CassandraDatacenter.WAIT_FOR_FIRST) { |
| // wait for the first node |
| long firstStartTime = Entities.submit(entity, DependentConfiguration.attributeWhenReady( |
| ancestors.get(ancestors.size()-1), CassandraDatacenter.FIRST_NODE_STARTED_TIME_UTC)).getUnchecked(); |
| // optionally force a delay before starting subsequent nodes; see comment at CassandraCluster.DELAY_AFTER_FIRST |
| Duration toWait = Duration.millis(firstStartTime + CassandraDatacenter.DELAY_AFTER_FIRST.toMilliseconds() - System.currentTimeMillis()); |
| if (toWait.toMilliseconds()>0) { |
| log.info("Launching " + entity + ": delaying launch of non-first node by "+toWait+" to prevent schema disagreements"); |
| Tasks.setBlockingDetails("Pausing to ensure first node has time to start"); |
| Time.sleep(toWait); |
| Tasks.resetBlockingDetails(); |
| } |
| } |
| |
| List<Entity> queuedStart = null; |
| if (CassandraDatacenter.DELAY_BETWEEN_STARTS!=null && !ancestors.isEmpty()) { |
| Entity root = ancestors.get(ancestors.size()-1); |
| // TODO currently use the class as a semaphore; messy, and obviously will not federate; |
| // should develop a brooklyn framework semaphore (similar to that done on SshMachineLocation) |
| // and use it - note however the synch block is very very short so relatively safe at least |
| synchronized (CassandraNode.class) { |
| queuedStart = root.getAttribute(CassandraDatacenter.QUEUED_START_NODES); |
| if (queuedStart==null) { |
| queuedStart = new ArrayList<Entity>(); |
| root.sensors().set(CassandraDatacenter.QUEUED_START_NODES, queuedStart); |
| } |
| queuedStart.add(getEntity()); |
| root.sensors().set(CassandraDatacenter.QUEUED_START_NODES, queuedStart); |
| } |
| do { |
| // get it again in case it is backed by something external |
| queuedStart = root.getAttribute(CassandraDatacenter.QUEUED_START_NODES); |
| if (queuedStart.get(0).equals(getEntity())) break; |
| synchronized (queuedStart) { |
| try { |
| queuedStart.wait(1000); |
| } catch (InterruptedException e) { |
| Exceptions.propagate(e); |
| } |
| } |
| } while (true); |
| |
| // TODO should look at last start time... but instead we always wait |
| CassandraDatacenter.DELAY_BETWEEN_STARTS.countdownTimer().waitForExpiryUnchecked(); |
| } |
| |
| try { |
| // Relies on `bin/cassandra -p <pidfile>`, rather than us writing pid file ourselves. |
| newScript(MutableMap.of(USE_PID_FILE, false), LAUNCHING) |
| .body.append( |
| // log the date to attempt to debug occasional http://wiki.apache.org/cassandra/FAQ#schema_disagreement |
| // (can be caused by machines out of synch time-wise; but in our case it seems to be caused by other things!) |
| "echo date on cassandra server `hostname` when launching is `date`", |
| launchEssentialCommand(), |
| "echo after essential command") |
| .execute(); |
| if (!isClustered()) { |
| InputStream creationScript = DatastoreMixins.getDatabaseCreationScript(entity); |
| if (creationScript!=null) { |
| Tasks.setBlockingDetails("Pausing to ensure Cassandra (singleton) has started before running creation script"); |
| Time.sleep(Duration.seconds(20)); |
| Tasks.resetBlockingDetails(); |
| executeScriptAsync(Streams.readFullyStringAndClose(creationScript)); |
| } |
| } |
| if (isClustered() && isFirst) { |
| for (Entity ancestor: getCassandraAncestors()) { |
| ancestor.sensors().set(CassandraDatacenter.FIRST_NODE_STARTED_TIME_UTC, System.currentTimeMillis()); |
| } |
| } |
| } finally { |
| if (queuedStart!=null) { |
| Entity head = queuedStart.remove(0); |
| checkArgument(head.equals(getEntity()), "first queued node was "+head+" but we are "+getEntity()); |
| synchronized (queuedStart) { |
| queuedStart.notifyAll(); |
| } |
| } |
| } |
| } |
| |
| /** returns cassandra-related ancestors (datacenter, fabric), with datacenter first and fabric last */ |
| protected List<Entity> getCassandraAncestors() { |
| List<Entity> result = new ArrayList<Entity>(); |
| Entity ancestor = getEntity().getParent(); |
| while (ancestor!=null) { |
| if (ancestor instanceof CassandraDatacenter || ancestor instanceof CassandraFabric) |
| result.add(ancestor); |
| ancestor = ancestor.getParent(); |
| } |
| return result; |
| } |
| |
| protected String launchEssentialCommand() { |
| if (isV2()) { |
| return String.format("./bin/cassandra -p %s > ./cassandra-console.log 2>&1", getPidFile()); |
| } else { |
| // TODO Could probably get rid of the nohup here, as script does equivalent itself |
| // with `exec ... <&- &` |
| return String.format("nohup ./bin/cassandra -p %s > ./cassandra-console.log 2>&1 &", getPidFile()); |
| } |
| } |
| |
| public String getPidFile() { return Os.mergePathsUnix(getRunDir(), "cassandra.pid"); } |
| |
| @Override |
| public boolean isRunning() { |
| return newScript(MutableMap.of(USE_PID_FILE, getPidFile()), CHECK_RUNNING).execute() == 0; |
| } |
| |
| @Override |
| public void stop() { |
| newScript(MutableMap.of(USE_PID_FILE, getPidFile()), STOPPING).execute(); |
| } |
| |
| @SuppressWarnings("unchecked") |
| @Override |
| protected Map<String,String> getCustomJavaSystemProperties() { |
| return MutableMap.<String, String>builder() |
| .putAll(super.getCustomJavaSystemProperties()) |
| .put("cassandra.config", getCassandraConfigFileName()) |
| .build(); |
| } |
| |
| @Override |
| public Map<String, String> getShellEnvironment() { |
| return MutableMap.<String, String>builder() |
| .putAll(super.getShellEnvironment()) |
| .put("CASSANDRA_HOME", getRunDir()) |
| .put("CASSANDRA_CONF", Os.mergePathsUnix(getRunDir(), "conf")) |
| .renameKey("JAVA_OPTS", "JVM_OPTS") |
| .build(); |
| } |
| |
| @Override |
| public ProcessTaskWrapper<Integer> executeScriptAsync(String commands) { |
| String fileToRun = Os.mergePathsUnix("brooklyn_commands", "cassandra-commands-"+Identifiers.makeRandomId(8)); |
| TaskWrapper<Void> task = SshEffectorTasks.put(Os.mergePathsUnix(getRunDir(), fileToRun)) |
| .machine(getMachine()) |
| .contents(commands) |
| .summary("copying cassandra script to execute "+fileToRun) |
| .newTask(); |
| DynamicTasks.queueIfPossible(task).orSubmitAndBlock(getEntity()).andWaitForSuccess(); |
| return executeScriptFromInstalledFileAsync(fileToRun); |
| } |
| |
| public ProcessTaskWrapper<Integer> executeScriptFromInstalledFileAsync(String fileToRun) { |
| ProcessTaskWrapper<Integer> task = SshEffectorTasks.ssh( |
| "cd "+getRunDir(), |
| scriptInvocationCommand(getThriftPort(), fileToRun)) |
| .machine(getMachine()) |
| .summary("executing cassandra script "+fileToRun) |
| .newTask(); |
| DynamicTasks.queueIfPossible(task).orSubmitAndBlock(getEntity()); |
| return task; |
| } |
| |
| protected String scriptInvocationCommand(Integer optionalThriftPort, String fileToRun) { |
| return "bin/cassandra-cli " + |
| (optionalThriftPort != null ? "--port " + optionalThriftPort : "") + |
| " --file "+fileToRun; |
| } |
| |
| @Override |
| public String getResolvedAddress(String hostname) { |
| return resolvedAddressCache.or(BrooklynAccessUtils.resolvedAddressSupplier(getEntity(), getMachine(), hostname)); |
| } |
| |
| } |