| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.brooklyn.entity.nosql.couchbase; |
| |
| import static java.lang.String.format; |
| import static org.apache.brooklyn.util.ssh.BashCommands.INSTALL_CURL; |
| import static org.apache.brooklyn.util.ssh.BashCommands.alternatives; |
| import static org.apache.brooklyn.util.ssh.BashCommands.chainGroup; |
| import static org.apache.brooklyn.util.ssh.BashCommands.installPackage; |
| import static org.apache.brooklyn.util.ssh.BashCommands.ok; |
| import static org.apache.brooklyn.util.ssh.BashCommands.sudo; |
| |
| import java.net.URI; |
| import java.util.Arrays; |
| import java.util.List; |
| import java.util.concurrent.Callable; |
| |
| import javax.annotation.Nonnull; |
| import javax.annotation.Nullable; |
| |
| import org.apache.http.auth.UsernamePasswordCredentials; |
| |
| import com.google.common.base.Function; |
| import com.google.common.collect.ImmutableList; |
| import com.google.common.collect.ImmutableMap; |
| import com.google.common.collect.Iterables; |
| import com.google.common.collect.Lists; |
| import com.google.common.net.HostAndPort; |
| |
| import org.apache.brooklyn.api.entity.Entity; |
| import org.apache.brooklyn.api.entity.Group; |
| import org.apache.brooklyn.api.location.OsDetails; |
| import org.apache.brooklyn.api.mgmt.Task; |
| import org.apache.brooklyn.core.effector.ssh.SshEffectorTasks; |
| import org.apache.brooklyn.core.entity.Attributes; |
| import org.apache.brooklyn.core.entity.drivers.downloads.BasicDownloadRequirement; |
| import org.apache.brooklyn.core.entity.drivers.downloads.DownloadProducerFromUrlAttribute; |
| import org.apache.brooklyn.core.entity.lifecycle.ServiceStateLogic; |
| import org.apache.brooklyn.core.location.access.BrooklynAccessUtils; |
| import org.apache.brooklyn.core.sensor.DependentConfiguration; |
| import org.apache.brooklyn.entity.software.base.AbstractSoftwareProcessSshDriver; |
| import org.apache.brooklyn.feed.http.HttpValueFunctions; |
| import org.apache.brooklyn.location.ssh.SshMachineLocation; |
| import org.apache.brooklyn.util.collections.MutableMap; |
| import org.apache.brooklyn.util.core.task.DynamicTasks; |
| import org.apache.brooklyn.util.core.task.TaskBuilder; |
| import org.apache.brooklyn.util.core.task.TaskTags; |
| import org.apache.brooklyn.util.core.task.Tasks; |
| import org.apache.brooklyn.util.http.HttpTool; |
| import org.apache.brooklyn.util.http.HttpToolResponse; |
| import org.apache.brooklyn.util.repeat.Repeater; |
| import org.apache.brooklyn.util.ssh.BashCommands; |
| import org.apache.brooklyn.util.text.NaturalOrderComparator; |
| import org.apache.brooklyn.util.text.StringEscapes.BashStringEscapes; |
| import org.apache.brooklyn.util.text.Strings; |
| import org.apache.brooklyn.util.time.Duration; |
| |
| public class CouchbaseNodeSshDriver extends AbstractSoftwareProcessSshDriver implements CouchbaseNodeDriver { |
| |
/**
 * Creates an SSH driver for the given Couchbase node.
 *
 * @param entity  the Couchbase node entity this driver acts on
 * @param machine the SSH machine location handed to the superclass
 */
public CouchbaseNodeSshDriver(final CouchbaseNodeImpl entity, final SshMachineLocation machine) {
    // All state is held by the superclass; nothing is stored locally.
    super(entity, machine);
}
| |
| public static String couchbaseCli(String cmd) { |
| return "/opt/couchbase/bin/couchbase-cli " + cmd + " "; |
| } |
| |
| @Override |
| public void install() { |
| //for reference https://github.com/urbandecoder/couchbase/blob/master/recipes/server.rb |
| //installation instructions (http://docs.couchbase.com/couchbase-manual-2.5/cb-install/#preparing-to-install) |
| |
| List<String> urls = resolver.getTargets(); |
| String saveAs = resolver.getFilename(); |
| |
| OsDetails osDetails = getMachine().getMachineDetails().getOsDetails(); |
| |
| if (osDetails.isLinux()) { |
| List<String> commands = installLinux(urls, saveAs); |
| //FIXME installation return error but the server is up and running. |
| newScript(INSTALLING) |
| .body.append(commands).execute(); |
| } else { |
| Tasks.markInessential(); |
| throw new IllegalStateException("Unsupported OS for installing Couchbase. Will continue but may fail later."); |
| } |
| } |
| |
| private List<String> installLinux(List<String> urls, String saveAs) { |
| |
| log.info("Installing " + getEntity() + " using couchbase-server-{} {}", getCommunityOrEnterprise(), getVersion()); |
| |
| String apt = chainGroup( |
| installPackage(MutableMap.of("apt", "python-httplib2 libssl0.9.8"), null), |
| sudo(format("dpkg -i %s", saveAs))); |
| |
| String yum = chainGroup( |
| "which yum", |
| // The following prevents failure on RHEL AWS nodes: |
| // https://forums.aws.amazon.com/thread.jspa?threadID=100509 |
| ok(sudo("sed -i.bk s/^enabled=1$/enabled=0/ /etc/yum/pluginconf.d/subscription-manager.conf")), |
| ok(sudo("yum check-update")), |
| sudo("yum install -y pkgconfig"), |
| // RHEL requires openssl version 098 |
| sudo("[ -f /etc/redhat-release ] && (grep -i \"red hat\" /etc/redhat-release && sudo yum install -y openssl098e) || :"), |
| sudo(format("rpm --install %s", saveAs))); |
| |
| String link = new DownloadProducerFromUrlAttribute().apply(new BasicDownloadRequirement(this)).getPrimaryLocations().iterator().next(); |
| return ImmutableList.<String>builder() |
| .add(INSTALL_CURL) |
| .addAll(Arrays.asList(INSTALL_CURL, |
| BashCommands.require(BashCommands.alternatives(BashCommands.simpleDownloadUrlAs(urls, saveAs), |
| // Referer link is required for 3.0.0; note mis-spelling is correct, as per http://en.wikipedia.org/wiki/HTTP_referer |
| "curl -f -L -k " + BashStringEscapes.wrapBash(link) |
| + " -H 'Referer: http://www.couchbase.com/downloads'" |
| + " -o " + saveAs), |
| "Could not retrieve " + saveAs + " (from " + urls.size() + " sites)", 9))) |
| .add(alternatives(apt, yum)) |
| .build(); |
| } |
| |
| @Override |
| public void customize() { |
| //TODO: add linux tweaks for couchbase |
| //http://blog.couchbase.com/often-overlooked-linux-os-tweaks |
| //http://blog.couchbase.com/kirk |
| |
| //turn off swappiness |
| //vm.swappiness=0 |
| //sudo echo 0 > /proc/sys/vm/swappiness |
| |
| //os page cache = 20% |
| |
| //disable THP |
| //sudo echo never > /sys/kernel/mm/transparent_hugepage/enabled |
| //sudo echo never > /sys/kernel/mm/transparent_hugepage/defrag |
| |
| //turn off transparent huge pages |
| //limit page cache disty bytes |
| //control the rate page cache is flused ... vm.dirty_* |
| } |
| |
| @Override |
| public void launch() { |
| String clusterPrefix = "--cluster-" + (isPreV3() ? "init-" : ""); |
| // in v30, the cluster arguments were changed, and it became mandatory to supply a url + password (if there is none, these are ignored) |
| newScript(LAUNCHING) |
| .body.append( |
| sudo("/etc/init.d/couchbase-server start"), |
| "for i in {0..120}\n" + |
| "do\n" + |
| " if [ $i -eq 120 ]; then echo REST API unavailable after 120 seconds, failing; exit 1; fi;\n" + |
| " curl -s " + String.format("http://localhost:%s", getWebPort()) + " > /dev/null && echo REST API available after $i seconds && break\n" + |
| " sleep 1\n" + |
| "done\n" + |
| couchbaseCli("cluster-init") + |
| (isPreV3() ? getCouchbaseHostnameAndPort() : getCouchbaseHostnameAndCredentials()) + |
| " " + clusterPrefix + "username=" + getUsername() + |
| " " + clusterPrefix + "password=" + getPassword() + |
| " " + clusterPrefix + "port=" + getWebPort() + |
| " " + clusterPrefix + "ramsize=" + getClusterInitRamSize()) |
| .execute(); |
| } |
| |
| @Override |
| public boolean isRunning() { |
| //TODO add a better way to check if couchbase server is running |
| return (newScript(CHECK_RUNNING) |
| .body.append(format("curl -u %s:%s http://localhost:%s/pools/nodes", getUsername(), getPassword(), getWebPort())) |
| .execute() == 0); |
| } |
| |
| @Override |
| public void stop() { |
| newScript(STOPPING) |
| .body.append(sudo("/etc/init.d/couchbase-server stop")) |
| .execute(); |
| } |
| |
| @Override |
| public String getVersion() { |
| return entity.getConfig(CouchbaseNode.SUGGESTED_VERSION); |
| } |
| |
| @Override |
| public String getOsTag() { |
| return newDownloadLinkSegmentComputer().getOsTag(); |
| } |
| |
| protected DownloadLinkSegmentComputer newDownloadLinkSegmentComputer() { |
| return new DownloadLinkSegmentComputer(getLocation().getOsDetails(), !isPreV3(), Strings.toString(getEntity())); |
| } |
| |
| public static class DownloadLinkSegmentComputer { |
| // links are: |
| // http://packages.couchbase.com/releases/2.2.0/couchbase-server-community_2.2.0_x86_64.rpm |
| // http://packages.couchbase.com/releases/2.2.0/couchbase-server-community_2.2.0_x86_64.deb |
| // ^^^ preV3 is _ everywhere |
| // http://packages.couchbase.com/releases/3.0.0/couchbase-server-community_3.0.0-ubuntu12.04_amd64.deb |
| // ^^^ most V3 is _${version}- |
| // http://packages.couchbase.com/releases/3.0.0/couchbase-server-community-3.0.0-centos6.x86_64.rpm |
| // ^^^ but RHEL is -${version}- |
| |
| @Nullable |
| private final OsDetails os; |
| @Nonnull |
| private final boolean isV3OrLater; |
| @Nonnull |
| private final String context; |
| @Nonnull |
| private final String osName; |
| @Nonnull |
| private final boolean isRpm; |
| @Nonnull |
| private final boolean is64bit; |
| |
| public DownloadLinkSegmentComputer(@Nullable OsDetails os, boolean isV3OrLater, @Nonnull String context) { |
| this.os = os; |
| this.isV3OrLater = isV3OrLater; |
| this.context = context; |
| if (os == null) { |
| // guess centos as RPM is sensible default |
| log.warn("No details known for OS of " + context + "; assuming 64-bit RPM distribution of Couchbase"); |
| osName = "centos"; |
| isRpm = true; |
| is64bit = true; |
| return; |
| } |
| osName = os.getName().toLowerCase(); |
| isRpm = !(osName.contains("deb") || osName.contains("ubuntu")); |
| is64bit = os.is64bit(); |
| } |
| |
| /** |
| * separator after the version number used to be _ but is - in 3.0 and later |
| */ |
| public String getPreVersionSeparator() { |
| if (!isV3OrLater) return "_"; |
| if (isRpm) return "-"; |
| return "_"; |
| } |
| |
| public String getOsTag() { |
| // couchbase only provide certain versions; if on other platforms let's suck-it-and-see |
| String family; |
| if (osName.contains("debian")) family = "debian7_"; |
| else if (osName.contains("ubuntu")) family = "ubuntu12.04_"; |
| else if (osName.contains("centos") || osName.contains("rhel") || (osName.contains("red") && osName.contains("hat"))) |
| family = "centos6."; |
| else { |
| log.warn("Unrecognised OS " + os + " of " + context + "; assuming RPM distribution of Couchbase"); |
| family = "centos6."; |
| } |
| |
| if (!is64bit && !isV3OrLater) { |
| // NB: 32-bit binaries aren't (yet?) available for v30 |
| log.warn("32-bit binaries for Couchbase might not be available, when deploying " + context); |
| } |
| String arch = !is64bit ? "x86" : !isRpm && isV3OrLater ? "amd64" : "x86_64"; |
| String fileExtension = isRpm ? ".rpm" : ".deb"; |
| |
| if (isV3OrLater) |
| return family + arch + fileExtension; |
| else |
| return arch + fileExtension; |
| } |
| |
| public String getOsTagWithPrefix() { |
| return (!isV3OrLater ? "_" : "-") + getOsTag(); |
| } |
| } |
| |
| @Override |
| public String getDownloadLinkOsTagWithPrefix() { |
| return newDownloadLinkSegmentComputer().getOsTagWithPrefix(); |
| } |
| |
| @Override |
| public String getDownloadLinkPreVersionSeparator() { |
| return newDownloadLinkSegmentComputer().getPreVersionSeparator(); |
| } |
| |
| private boolean isPreV3() { |
| return NaturalOrderComparator.INSTANCE.compare(getEntity().getConfig(CouchbaseNode.SUGGESTED_VERSION), "3.0") < 0; |
| } |
| |
| @Override |
| public String getCommunityOrEnterprise() { |
| Boolean isEnterprise = getEntity().getConfig(CouchbaseNode.USE_ENTERPRISE); |
| return isEnterprise ? "enterprise" : "community"; |
| } |
| |
| private String getUsername() { |
| return entity.getConfig(CouchbaseNode.COUCHBASE_ADMIN_USERNAME); |
| } |
| |
| private String getPassword() { |
| return entity.getConfig(CouchbaseNode.COUCHBASE_ADMIN_PASSWORD); |
| } |
| |
| private String getWebPort() { |
| return "" + entity.getAttribute(CouchbaseNode.COUCHBASE_WEB_ADMIN_PORT); |
| } |
| |
| private String getCouchbaseHostnameAndCredentials() { |
| return format("-c %s:%s -u %s -p %s", getSubnetHostname(), getWebPort(), getUsername(), getPassword()); |
| } |
| |
| private String getCouchbaseHostnameAndPort() { |
| return format("-c %s:%s", getSubnetHostname(), getWebPort()); |
| } |
| |
| private String getClusterInitRamSize() { |
| return entity.getConfig(CouchbaseNode.COUCHBASE_CLUSTER_INIT_RAM_SIZE).toString(); |
| } |
| |
| @Override |
| public void rebalance() { |
| entity.sensors().set(CouchbaseNode.REBALANCE_STATUS, "explicitly started"); |
| newScript("rebalance") |
| .body.append( |
| couchbaseCli("rebalance") + getCouchbaseHostnameAndCredentials()) |
| .failOnNonZeroResultCode() |
| .execute(); |
| |
| // wait until the re-balance is started |
| // (if it's quick, this might miss it, but it will only block for 30s if so) |
| Repeater.create() |
| .backoff(Repeater.DEFAULT_REAL_QUICK_PERIOD, 2, Duration.millis(500)) |
| .limitTimeTo(Duration.THIRTY_SECONDS) |
| .until(new Callable<Boolean>() { |
| @Override |
| public Boolean call() throws Exception { |
| for (HostAndPort nodeHostAndPort : getNodesHostAndPort()) { |
| if (isNodeRebalancing(nodeHostAndPort.toString())) { |
| return true; |
| } |
| } |
| return false; |
| } |
| } |
| ).run(); |
| |
| entity.sensors().set(CouchbaseNode.REBALANCE_STATUS, "waiting for completion"); |
| // Wait until the Couchbase node finishes the re-balancing |
| Task<Boolean> reBalance = TaskBuilder.<Boolean>builder() |
| .displayName("Waiting until node is rebalancing") |
| .body(new Callable<Boolean>() { |
| @Override |
| public Boolean call() throws Exception { |
| return Repeater.create() |
| .backoff(Duration.ONE_SECOND, 1.2, Duration.TEN_SECONDS) |
| .limitTimeTo(Duration.FIVE_MINUTES) |
| .until(new Callable<Boolean>() { |
| @Override |
| public Boolean call() throws Exception { |
| for (HostAndPort nodeHostAndPort : getNodesHostAndPort()) { |
| if (isNodeRebalancing(nodeHostAndPort.toString())) { |
| return false; |
| } |
| } |
| return true; |
| } |
| }) |
| .run(); |
| } |
| }) |
| .build(); |
| Boolean completed = DynamicTasks.queueIfPossible(reBalance) |
| .orSubmitAndBlock() |
| .andWaitForSuccess(); |
| if (completed) { |
| entity.sensors().set(CouchbaseNode.REBALANCE_STATUS, "completed"); |
| ServiceStateLogic.ServiceNotUpLogic.clearNotUpIndicator(getEntity(), "rebalancing"); |
| log.info("Rebalanced cluster via primary node {}", getEntity()); |
| } else { |
| entity.sensors().set(CouchbaseNode.REBALANCE_STATUS, "timed out"); |
| ServiceStateLogic.ServiceNotUpLogic.updateNotUpIndicator(getEntity(), "rebalancing", "rebalance did not complete within time limit"); |
| log.warn("Timeout rebalancing cluster via primary node {}", getEntity()); |
| } |
| } |
| |
| private Iterable<HostAndPort> getNodesHostAndPort() { |
| Group group = Iterables.getFirst(getEntity().groups(), null); |
| if (group == null) return Lists.newArrayList(); |
| return Iterables.transform(group.getAttribute(CouchbaseCluster.COUCHBASE_CLUSTER_UP_NODES), |
| new Function<Entity, HostAndPort>() { |
| @Override |
| public HostAndPort apply(Entity input) { |
| return BrooklynAccessUtils.getBrooklynAccessibleAddress(input, input.getAttribute(CouchbaseNode.COUCHBASE_WEB_ADMIN_PORT)); |
| } |
| }); |
| } |
| |
| private boolean isNodeRebalancing(String nodeHostAndPort) { |
| HttpToolResponse response = getApiResponse("http://" + nodeHostAndPort + "/pools/default/rebalanceProgress"); |
| if (response.getResponseCode() != 200) { |
| throw new IllegalStateException("failed retrieving rebalance status: " + response); |
| } |
| return !"none".equals(HttpValueFunctions.jsonContents("status", String.class).apply(response)); |
| } |
| |
| private HttpToolResponse getApiResponse(String uri) { |
| return HttpTool.httpGet(HttpTool.httpClientBuilder() |
| // the uri is required by the HttpClientBuilder in order to set the AuthScope of the credentials |
| .uri(uri) |
| .credentials(new UsernamePasswordCredentials(getUsername(), getPassword())) |
| .build(), |
| URI.create(uri), |
| ImmutableMap.<String, String>of()); |
| } |
| |
| @Override |
| public void serverAdd(String serverToAdd, String username, String password) { |
| newScript("serverAdd").body.append(couchbaseCli("server-add") |
| + getCouchbaseHostnameAndCredentials() + |
| " --server-add=" + BashStringEscapes.wrapBash(serverToAdd) + |
| " --server-add-username=" + BashStringEscapes.wrapBash(username) + |
| " --server-add-password=" + BashStringEscapes.wrapBash(password)) |
| .failOnNonZeroResultCode() |
| .execute(); |
| } |
| |
| @Override |
| public void serverAddAndRebalance(String serverToAdd, String username, String password) { |
| newScript("serverAddAndRebalance").body.append(couchbaseCli("rebalance") |
| + getCouchbaseHostnameAndCredentials() + |
| " --server-add=" + BashStringEscapes.wrapBash(serverToAdd) + |
| " --server-add-username=" + BashStringEscapes.wrapBash(username) + |
| " --server-add-password=" + BashStringEscapes.wrapBash(password)) |
| .failOnNonZeroResultCode() |
| .execute(); |
| entity.sensors().set(CouchbaseNode.REBALANCE_STATUS, "triggered as part of server-add"); |
| } |
| |
| @Override |
| public void bucketCreate(String bucketName, String bucketType, Integer bucketPort, Integer bucketRamSize, Integer bucketReplica) { |
| log.info("Adding bucket: {} to cluster {} primary node: {}", new Object[]{bucketName, CouchbaseClusterImpl.getClusterOrNode(getEntity()), getEntity()}); |
| |
| newScript("bucketCreate").body.append(couchbaseCli("bucket-create") |
| + getCouchbaseHostnameAndCredentials() + |
| " --bucket=" + BashStringEscapes.wrapBash(bucketName) + |
| " --bucket-type=" + BashStringEscapes.wrapBash(bucketType) + |
| " --bucket-port=" + bucketPort + |
| " --bucket-ramsize=" + bucketRamSize + |
| " --bucket-replica=" + bucketReplica) |
| .failOnNonZeroResultCode() |
| .execute(); |
| } |
| |
| @Override |
| public void addReplicationRule(Entity toCluster, String fromBucket, String toBucket) { |
| DynamicTasks.queue(DependentConfiguration.attributeWhenReady(toCluster, Attributes.SERVICE_UP)).getUnchecked(); |
| |
| String destName = CouchbaseClusterImpl.getClusterName(toCluster); |
| |
| log.info("Setting up XDCR for " + fromBucket + " from " + CouchbaseClusterImpl.getClusterName(getEntity()) + " (via " + getEntity() + ") " |
| + "to " + destName + " (" + toCluster + ")"); |
| |
| Entity destPrimaryNode = toCluster.getAttribute(CouchbaseCluster.COUCHBASE_PRIMARY_NODE); |
| String destHostname = destPrimaryNode.getAttribute(Attributes.HOSTNAME); |
| String destUsername = toCluster.getConfig(CouchbaseNode.COUCHBASE_ADMIN_USERNAME); |
| String destPassword = toCluster.getConfig(CouchbaseNode.COUCHBASE_ADMIN_PASSWORD); |
| |
| // on the REST API there is mention of a 'type' 'continuous' but i don't see other refs to this |
| |
| // PROTOCOL Select REST protocol or memcached for replication. xmem indicates memcached while capi indicates REST protocol. |
| // looks like xmem is the default; leave off for now |
| // String replMode = "xmem"; |
| |
| DynamicTasks.queue(TaskTags.markInessential(SshEffectorTasks.ssh( |
| couchbaseCli("xdcr-setup") + |
| getCouchbaseHostnameAndCredentials() + |
| " --create" + |
| " --xdcr-cluster-name=" + BashStringEscapes.wrapBash(destName) + |
| " --xdcr-hostname=" + BashStringEscapes.wrapBash(destHostname) + |
| " --xdcr-username=" + BashStringEscapes.wrapBash(destUsername) + |
| " --xdcr-password=" + BashStringEscapes.wrapBash(destPassword) |
| ).summary("create xdcr destination " + destName).newTask())); |
| |
| // would be nice to auto-create bucket, but we'll need to know the parameters; the port in particular is tedious |
| // ((CouchbaseNode)destPrimaryNode).bucketCreate(toBucket, "couchbase", null, 0, 0); |
| |
| DynamicTasks.queue(SshEffectorTasks.ssh( |
| couchbaseCli("xdcr-replicate") + |
| getCouchbaseHostnameAndCredentials() + |
| " --create" + |
| " --xdcr-cluster-name=" + BashStringEscapes.wrapBash(destName) + |
| " --xdcr-from-bucket=" + BashStringEscapes.wrapBash(fromBucket) + |
| " --xdcr-to-bucket=" + BashStringEscapes.wrapBash(toBucket) |
| // + " --xdcr-replication-mode="+replMode |
| ).summary("configure replication for " + fromBucket + " to " + destName + ":" + toBucket).newTask()); |
| } |
| } |