blob: 6917e621b1c5899e5329b170ee9caab03c17a6c7 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.brooklyn.entity.nosql.cassandra;
import java.io.IOException;
import java.lang.reflect.Method;
import java.math.BigInteger;
import java.net.Socket;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
import javax.management.ObjectName;
import org.apache.brooklyn.api.entity.Entity;
import org.apache.brooklyn.api.location.MachineLocation;
import org.apache.brooklyn.api.location.MachineProvisioningLocation;
import org.apache.brooklyn.api.sensor.EnricherSpec;
import org.apache.brooklyn.core.effector.EffectorBody;
import org.apache.brooklyn.core.entity.Attributes;
import org.apache.brooklyn.core.entity.Entities;
import org.apache.brooklyn.core.location.Machines;
import org.apache.brooklyn.core.location.access.BrooklynAccessUtils;
import org.apache.brooklyn.core.location.cloud.CloudLocationConfig;
import org.apache.brooklyn.core.sensor.DependentConfiguration;
import org.apache.brooklyn.core.sensor.Sensors;
import org.apache.brooklyn.enricher.stock.Enrichers;
import org.apache.brooklyn.entity.java.JavaAppUtils;
import org.apache.brooklyn.entity.java.UsesJmx;
import org.apache.brooklyn.entity.software.base.SoftwareProcessImpl;
import org.apache.brooklyn.feed.function.FunctionFeed;
import org.apache.brooklyn.feed.function.FunctionPollConfig;
import org.apache.brooklyn.feed.jmx.JmxAttributePollConfig;
import org.apache.brooklyn.feed.jmx.JmxFeed;
import org.apache.brooklyn.feed.jmx.JmxHelper;
import org.apache.brooklyn.feed.jmx.JmxOperationPollConfig;
import org.apache.brooklyn.policy.enricher.RollingTimeWindowMeanEnricher;
import org.apache.brooklyn.policy.enricher.TimeWeightedDeltaEnricher;
import org.apache.brooklyn.util.collections.MutableSet;
import org.apache.brooklyn.util.core.config.ConfigBag;
import org.apache.brooklyn.util.core.text.TemplateProcessor;
import org.apache.brooklyn.util.exceptions.Exceptions;
import org.apache.brooklyn.util.guava.Functionals;
import org.apache.brooklyn.util.guava.Maybe;
import org.apache.brooklyn.util.text.Strings;
import org.apache.brooklyn.util.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Function;
import com.google.common.base.Functions;
import com.google.common.base.Joiner;
import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import com.google.common.base.Splitter;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.net.HostAndPort;
/**
* Implementation of {@link CassandraNode}.
*/
public class CassandraNodeImpl extends SoftwareProcessImpl implements CassandraNode {
private static final Logger log = LoggerFactory.getLogger(CassandraNodeImpl.class);
private final AtomicReference<Boolean> detectedCloudSensors = new AtomicReference<Boolean>(false);
public CassandraNodeImpl() {
}
@Override
public void init() {
super.init();
// TODO PERSISTENCE WORKAROUND kept anonymous class in case referenced in persisted state
new EffectorBody<String>() {
@Override
public String call(ConfigBag parameters) {
return executeScript((String)parameters.getStringKey("commands"));
}
};
getMutableEntityType().addEffector(EXECUTE_SCRIPT, new ExecuteScriptEffectorBody(this));
Entities.checkRequiredUrl(this, getCassandraConfigTemplateUrl());
Entities.getRequiredUrlConfig(this, CASSANDRA_RACKDC_CONFIG_TEMPLATE_URL);
connectEnrichers();
}
private static class ExecuteScriptEffectorBody extends EffectorBody<String> {
private final CassandraNode entity;
public ExecuteScriptEffectorBody(CassandraNode entity) {
this.entity = entity;
}
@Override
public String call(ConfigBag parameters) {
return entity.executeScript((String)parameters.getStringKey("commands"));
}
}
/**
* Some clouds (e.g. Rackspace) give us VMs that have two nics: one for private and one for public.
* If the private IP is used then it doesn't work, even for a cluster purely internal to Rackspace!
*
* TODO Ugly. Need to understand more and find a better fix. Perhaps in Cassandra itself if necessary.
* Also need to investigate further:
* - does it still fail if BroadcastAddress is set to private IP?
* - is `openIptables` opening it up for both interfaces?
* - for aws->rackspace comms between nodes (thus using the public IP), will it be listening on an accessible port?
* - ideally do a check, open a server on one port on the machine, see if it is contactable on the public address;
* and set that as a flag on the cloud
*/
protected void setCloudPreferredSensorNames() {
if (detectedCloudSensors.get()) return;
synchronized (detectedCloudSensors) {
if (detectedCloudSensors.get()) return;
MachineProvisioningLocation<?> loc = getProvisioningLocation();
if (loc != null) {
try {
Method method = loc.getClass().getMethod("getProvider");
method.setAccessible(true);
String provider = (String) method.invoke(loc);
String result = "(nothing special)";
if (provider!=null) {
if (provider.contains("rackspace") || provider.contains("cloudservers") || provider.contains("softlayer")) {
/* These clouds have 2 NICs and it has to be consistent, so use public IP here to allow external access;
* (TODO internal access could be configured to improve performance / lower cost,
* if we know all nodes are visible to each other) */
if (getConfig(LISTEN_ADDRESS_SENSOR)==null)
config().set(LISTEN_ADDRESS_SENSOR, CassandraNode.ADDRESS.getName());
if (getConfig(BROADCAST_ADDRESS_SENSOR)==null)
config().set(BROADCAST_ADDRESS_SENSOR, CassandraNode.ADDRESS.getName());
result = "public IP for both listen and broadcast";
} else if (provider.contains("google-compute")) {
/* Google nodes cannot reach themselves/each-other on the public IP,
* and there is no hostname, so use private IP here */
if (getConfig(LISTEN_ADDRESS_SENSOR)==null)
config().set(LISTEN_ADDRESS_SENSOR, CassandraNode.SUBNET_HOSTNAME.getName());
if (getConfig(BROADCAST_ADDRESS_SENSOR)==null)
config().set(BROADCAST_ADDRESS_SENSOR, CassandraNode.SUBNET_HOSTNAME.getName());
result = "private IP for both listen and broadcast";
}
}
log.debug("Cassandra NICs inferred {} for {}; using location {}, based on provider {}", new Object[] {result, this, loc, provider});
} catch (Exception e) {
log.debug("Cassandra NICs auto-detection failed for {} in location {}: {}", new Object[] {this, loc, e});
}
}
detectedCloudSensors.set(true);
}
}
@Override
protected void preStart() {
super.preStart();
setCloudPreferredSensorNames();
}
// Used for freemarker
@Override
public String getMajorMinorVersion() {
String version = getConfig(CassandraNode.SUGGESTED_VERSION);
if (Strings.isBlank(version)) return "";
List<String> versionParts = ImmutableList.copyOf(Splitter.on(".").split(version));
return versionParts.get(0) + (versionParts.size() > 1 ? "."+versionParts.get(1) : "");
}
public String getCassandraConfigTemplateUrl() {
String templatedUrl = getConfig(CassandraNode.CASSANDRA_CONFIG_TEMPLATE_URL);
return TemplateProcessor.processTemplateContents(templatedUrl, this, ImmutableMap.<String, Object>of());
}
@Override public Integer getGossipPort() { return getAttribute(CassandraNode.GOSSIP_PORT); }
@Override public Integer getSslGossipPort() { return getAttribute(CassandraNode.SSL_GOSSIP_PORT); }
@Override public Integer getThriftPort() { return getAttribute(CassandraNode.THRIFT_PORT); }
@Override public Integer getNativeTransportPort() { return getAttribute(CassandraNode.NATIVE_TRANSPORT_PORT); }
@Override public String getClusterName() { return getAttribute(CassandraNode.CLUSTER_NAME); }
@Override public int getNumTokensPerNode() {
return getConfig(CassandraNode.NUM_TOKENS_PER_NODE);
}
@Override public Set<BigInteger> getTokens() {
// Prefer an already-set attribute over the config.
// Prefer TOKENS over TOKEN.
Set<BigInteger> tokens = getAttribute(CassandraNode.TOKENS);
if (tokens == null) {
tokens = getConfig(CassandraNode.TOKENS);
}
return tokens;
}
@Override public String getTokensAsString() {
// TODO check what is required when replacing failed node.
// with vnodes in Cassandra 2.x, don't bother supplying token
Set<BigInteger> tokens = getTokens();
if (tokens == null) return "";
return Joiner.on(",").join(tokens);
}
@Override public String getListenAddress() {
String sensorName = getConfig(LISTEN_ADDRESS_SENSOR);
if (Strings.isNonBlank(sensorName))
return Entities.submit(this, DependentConfiguration.attributeWhenReady(this, Sensors.newStringSensor(sensorName))).getUnchecked();
String subnetAddress = getAttribute(CassandraNode.SUBNET_ADDRESS);
return Strings.isNonBlank(subnetAddress) ? subnetAddress : getAttribute(CassandraNode.ADDRESS);
}
@Override public String getBroadcastAddress() {
String sensorName = getConfig(BROADCAST_ADDRESS_SENSOR);
if (Strings.isNonBlank(sensorName))
return Entities.submit(this, DependentConfiguration.attributeWhenReady(this, Sensors.newStringSensor(sensorName))).getUnchecked();
String snitchName = getConfig(CassandraNode.ENDPOINT_SNITCH_NAME);
if (snitchName.equals("Ec2MultiRegionSnitch") || snitchName.contains("MultiCloudSnitch")) {
// http://www.datastax.com/documentation/cassandra/2.0/mobile/cassandra/architecture/architectureSnitchEC2MultiRegion_c.html
// describes that the listen_address is set to the private IP, and the broadcast_address is set to the public IP.
return getAttribute(CassandraNode.ADDRESS);
} else if (!getDriver().isClustered()) {
return getListenAddress();
} else {
// In other situations, prefer the hostname, so other regions can see it
// *Unless* hostname resolves at the target to a local-only interface which is different to ADDRESS
// (workaround for issue deploying to localhost)
String hostname = getAttribute(CassandraNode.HOSTNAME);
try {
String resolvedAddress = getDriver().getResolvedAddress(hostname);
if (resolvedAddress==null) {
log.debug("Cassandra using broadcast address "+getListenAddress()+" for "+this+" because hostname "+hostname+" could not be resolved at remote machine");
return getListenAddress();
}
if (resolvedAddress.equals("127.0.0.1")) {
log.debug("Cassandra using broadcast address "+getListenAddress()+" for "+this+" because hostname "+hostname+" resolves to 127.0.0.1");
return getListenAddress();
}
return hostname;
} catch (Exception e) {
Exceptions.propagateIfFatal(e);
log.warn("Error resolving hostname "+hostname+" for "+this+": "+e, e);
return hostname;
}
}
}
/** not always the private IP, if public IP has been insisted on for broadcast, e.g. setting up a rack topology */
// have not confirmed this does the right thing in all clouds ... only used for rack topology however
@Override
public String getPrivateIp() {
String sensorName = getConfig(BROADCAST_ADDRESS_SENSOR);
if (Strings.isNonBlank(sensorName)) {
return getAttribute(Sensors.newStringSensor(sensorName));
} else {
String subnetAddress = getAttribute(CassandraNode.SUBNET_ADDRESS);
return Strings.isNonBlank(subnetAddress) ? subnetAddress : getAttribute(CassandraNode.ADDRESS);
}
}
@Override
public String getPublicIp() {
// may need to be something else in google
return getAttribute(CassandraNode.ADDRESS);
}
@Override public String getRpcAddress() {
String sensorName = getConfig(RPC_ADDRESS_SENSOR);
if (Strings.isNonBlank(sensorName))
return Entities.submit(this, DependentConfiguration.attributeWhenReady(this, Sensors.newStringSensor(sensorName))).getUnchecked();
return "0.0.0.0";
}
@Override public String getSeeds() {
Set<Entity> seeds = getConfig(CassandraNode.INITIAL_SEEDS);
if (seeds==null) {
log.warn("No seeds available when requested for "+this, new Throwable("source of no Cassandra seeds when requested"));
return null;
}
String snitchName = getConfig(CassandraNode.ENDPOINT_SNITCH_NAME);
MutableSet<String> seedsHostnames = MutableSet.of();
for (Entity entity : seeds) {
// tried removing ourselves if there are other nodes, but that is a BAD idea!
// blows up with a "java.lang.RuntimeException: No other nodes seen!"
if (snitchName.equals("Ec2MultiRegionSnitch") || snitchName.contains("MultiCloudSnitch")) {
// http://www.datastax.com/documentation/cassandra/2.0/mobile/cassandra/architecture/architectureSnitchEC2MultiRegion_c.html
// says the seeds should be public IPs.
seedsHostnames.add(entity.getAttribute(CassandraNode.ADDRESS));
} else {
String sensorName = getConfig(BROADCAST_ADDRESS_SENSOR);
if (Strings.isNonBlank(sensorName)) {
seedsHostnames.add(entity.getAttribute(Sensors.newStringSensor(sensorName)));
} else {
Maybe<String> optionalSeedHostname = Machines.findSubnetOrPublicHostname(entity);
if (optionalSeedHostname.isPresent()) {
String seedHostname = optionalSeedHostname.get();
seedsHostnames.add(seedHostname);
} else {
log.warn("In node {}, seed hostname missing for {}; not including in seeds list", this, entity);
}
}
}
}
String result = Strings.join(seedsHostnames, ",");
log.info("Seeds for {}: {}", this, result);
return result;
}
// referenced by cassandra-rackdc.properties, read by some of the cassandra snitches
public String getDatacenterName() {
String name = getAttribute(CassandraNode.DATACENTER_NAME);
if (name == null) {
MachineLocation machine = getMachineOrNull();
MachineProvisioningLocation<?> provisioningLocation = getProvisioningLocation();
if (machine != null) {
name = machine.getConfig(CloudLocationConfig.CLOUD_REGION_ID);
}
if (name == null && provisioningLocation != null) {
name = provisioningLocation.getConfig(CloudLocationConfig.CLOUD_REGION_ID);
}
if (name == null) {
name = "UNKNOWN_DATACENTER";
}
sensors().set(DATACENTER_NAME, name);
}
return name;
}
public String getRackName() {
String name = getAttribute(CassandraNode.RACK_NAME);
if (name == null) {
MachineLocation machine = getMachineOrNull();
MachineProvisioningLocation<?> provisioningLocation = getProvisioningLocation();
if (machine != null) {
name = machine.getConfig(CloudLocationConfig.CLOUD_AVAILABILITY_ZONE_ID);
}
if (name == null && provisioningLocation != null) {
name = provisioningLocation.getConfig(CloudLocationConfig.CLOUD_AVAILABILITY_ZONE_ID);
}
if (name == null) {
name = "UNKNOWN_RACK";
}
sensors().set(RACK_NAME, name);
}
return name;
}
@Override
public Class<? extends CassandraNodeDriver> getDriverInterface() {
return CassandraNodeDriver.class;
}
@Override
public CassandraNodeDriver getDriver() {
return (CassandraNodeDriver) super.getDriver();
}
private volatile JmxFeed jmxFeed;
private volatile FunctionFeed functionFeed;
private JmxFeed jmxMxBeanFeed;
private JmxHelper jmxHelper;
private ObjectName storageServiceMBean = JmxHelper.createObjectName("org.apache.cassandra.db:type=StorageService");
private ObjectName readStageMBean = JmxHelper.createObjectName("org.apache.cassandra.request:type=ReadStage");
private ObjectName mutationStageMBean = JmxHelper.createObjectName("org.apache.cassandra.request:type=MutationStage");
private ObjectName snitchMBean = JmxHelper.createObjectName("org.apache.cassandra.db:type=EndpointSnitchInfo");
@SuppressWarnings({ "unchecked", "rawtypes" })
@Override
protected void connectSensors() {
// "cassandra" isn't really a protocol, but okay for now
sensors().set(DATASTORE_URL, "cassandra://"+getAttribute(HOSTNAME)+":"+getAttribute(THRIFT_PORT));
super.connectSensors();
jmxHelper = new JmxHelper(this);
boolean retrieveUsageMetrics = getConfig(RETRIEVE_USAGE_METRICS);
if (getDriver().isJmxEnabled()) {
jmxFeed = JmxFeed.builder()
.entity(this)
.period(3000, TimeUnit.MILLISECONDS)
.helper(jmxHelper)
.pollAttribute(new JmxAttributePollConfig<Boolean>(SERVICE_UP_JMX)
.objectName(storageServiceMBean)
.attributeName("Initialized")
.onSuccess(Functions.forPredicate(Predicates.notNull()))
.onException(Functions.constant(false))
.suppressDuplicates(true))
.pollAttribute(new JmxAttributePollConfig<Set<BigInteger>>(TOKENS)
.objectName(storageServiceMBean)
.attributeName("TokenToEndpointMap")
.onSuccess(new Function<Object, Set<BigInteger>>() {
@Override
public Set<BigInteger> apply(@Nullable Object arg) {
Map input = (Map)arg;
if (input == null || input.isEmpty()) return null;
// FIXME does not work on aws-ec2, uses RFC1918 address
Predicate<String> self = Predicates.in(ImmutableList.of(getAttribute(HOSTNAME), getAttribute(ADDRESS), getAttribute(SUBNET_ADDRESS), getAttribute(SUBNET_HOSTNAME)));
Set<String> tokens = Maps.filterValues(input, self).keySet();
Set<BigInteger> result = Sets.newLinkedHashSet();
for (String token : tokens) {
result.add(new BigInteger(token));
}
return result;
}})
.onException(Functions.<Set<BigInteger>>constant(null))
.suppressDuplicates(true))
.pollOperation(new JmxOperationPollConfig<String>(DATACENTER_NAME)
.period(60, TimeUnit.SECONDS)
.objectName(snitchMBean)
.operationName("getDatacenter")
.operationParams(ImmutableList.of(getBroadcastAddress()))
.onException(Functions.<String>constant(null))
.suppressDuplicates(true))
.pollOperation(new JmxOperationPollConfig<String>(RACK_NAME)
.period(60, TimeUnit.SECONDS)
.objectName(snitchMBean)
.operationName("getRack")
.operationParams(ImmutableList.of(getBroadcastAddress()))
.onException(Functions.<String>constant(null))
.suppressDuplicates(true))
.pollAttribute(new JmxAttributePollConfig<Integer>(PEERS)
.objectName(storageServiceMBean)
.attributeName("TokenToEndpointMap")
.onSuccess(new Function<Object, Integer>() {
@Override
public Integer apply(@Nullable Object arg) {
Map input = (Map)arg;
if (input == null || input.isEmpty()) return 0;
return input.size();
}
})
.onException(Functions.constant(-1)))
.pollAttribute(new JmxAttributePollConfig<Integer>(LIVE_NODE_COUNT)
.objectName(storageServiceMBean)
.attributeName("LiveNodes")
.onSuccess(new Function<Object, Integer>() {
@Override
public Integer apply(@Nullable Object arg) {
List input = (List)arg;
if (input == null || input.isEmpty()) return 0;
return input.size();
}
})
.onException(Functions.constant(-1)))
.pollAttribute(new JmxAttributePollConfig<Integer>(READ_ACTIVE)
.objectName(readStageMBean)
.attributeName("ActiveCount")
.onException(Functions.constant((Integer)null))
.enabled(retrieveUsageMetrics))
.pollAttribute(new JmxAttributePollConfig<Long>(READ_PENDING)
.objectName(readStageMBean)
.attributeName("PendingTasks")
.onException(Functions.constant((Long)null))
.enabled(retrieveUsageMetrics))
.pollAttribute(new JmxAttributePollConfig<Long>(READ_COMPLETED)
.objectName(readStageMBean)
.attributeName("CompletedTasks")
.onException(Functions.constant((Long)null))
.enabled(retrieveUsageMetrics))
.pollAttribute(new JmxAttributePollConfig<Integer>(WRITE_ACTIVE)
.objectName(mutationStageMBean)
.attributeName("ActiveCount")
.onException(Functions.constant((Integer)null))
.enabled(retrieveUsageMetrics))
.pollAttribute(new JmxAttributePollConfig<Long>(WRITE_PENDING)
.objectName(mutationStageMBean)
.attributeName("PendingTasks")
.onException(Functions.constant((Long)null))
.enabled(retrieveUsageMetrics))
.pollAttribute(new JmxAttributePollConfig<Long>(WRITE_COMPLETED)
.objectName(mutationStageMBean)
.attributeName("CompletedTasks")
.onException(Functions.constant((Long)null))
.enabled(retrieveUsageMetrics))
.build();
jmxMxBeanFeed = JavaAppUtils.connectMXBeanSensors(this);
}
if (Boolean.TRUE.equals(getConfig(USE_THRIFT_MONITORING))) {
functionFeed = FunctionFeed.builder()
.entity(this)
.period(3000, TimeUnit.MILLISECONDS)
.poll(new FunctionPollConfig<Long, Long>(THRIFT_PORT_LATENCY)
.onException(Functions.constant(-1L))
.callable(new ThriftLatencyChecker(CassandraNodeImpl.this))
.enabled(retrieveUsageMetrics))
.build();
}
connectServiceUpIsRunning();
}
protected void connectEnrichers() {
connectEnrichers(Duration.TEN_SECONDS);
}
protected void connectEnrichers(Duration windowPeriod) {
JavaAppUtils.connectJavaAppServerPolicies(this);
enrichers().add(EnricherSpec.create(TimeWeightedDeltaEnricher.class)
.configure("producer", this)
.configure("source", READ_COMPLETED)
.configure("target", READS_PER_SECOND_LAST)
.configure("unitMillis", 1000));
if (windowPeriod!=null) {
enrichers().add(EnricherSpec.create(RollingTimeWindowMeanEnricher.class)
.configure("producer", this)
.configure("source", THRIFT_PORT_LATENCY)
.configure("target", THRIFT_PORT_LATENCY_IN_WINDOW)
.configure("timePeriod", windowPeriod));
enrichers().add(EnricherSpec.create(RollingTimeWindowMeanEnricher.class)
.configure("producer", this)
.configure("source", READS_PER_SECOND_LAST)
.configure("target", READS_PER_SECOND_IN_WINDOW)
.configure("timePeriod", windowPeriod));
enrichers().add(EnricherSpec.create(RollingTimeWindowMeanEnricher.class)
.configure("producer", this)
.configure("source", WRITES_PER_SECOND_LAST)
.configure("target", WRITES_PER_SECOND_IN_WINDOW)
.configure("timePeriod", windowPeriod));
}
// service-up checks
if (Boolean.TRUE.equals(getConfig(USE_THRIFT_MONITORING))) {
enrichers().add(Enrichers.builder().updatingMap(Attributes.SERVICE_NOT_UP_INDICATORS)
.from(THRIFT_PORT_LATENCY)
.computing(Functionals.ifEquals(-1L).value("Thrift latency polling failed") )
.build());
}
if (Boolean.TRUE.equals(getConfig(UsesJmx.USE_JMX))) {
enrichers().add(Enrichers.builder().updatingMap(Attributes.SERVICE_NOT_UP_INDICATORS)
.from(SERVICE_UP_JMX)
.computing(Functionals.ifEquals(false).value("JMX reports not up") )
.build());
}
}
@Override
public void disconnectSensors() {
super.disconnectSensors();
disconnectServiceUpIsRunning();
if (jmxFeed != null) jmxFeed.stop();
if (jmxMxBeanFeed != null) jmxMxBeanFeed.stop();
if (jmxHelper != null) jmxHelper.terminate();
if (functionFeed != null) functionFeed.stop();
}
@Override
public void setToken(String token) {
try {
if (!jmxHelper.isConnected()) jmxHelper.connect();;
jmxHelper.operation(storageServiceMBean, "move", token);
log.info("Moved server {} to token {}", getId(), token);
} catch (IOException ioe) {
Throwables.propagate(ioe);
}
}
@Override
public String executeScript(String commands) {
return getDriver().executeScriptAsync(commands).block().getStdout();
}
private static class ThriftLatencyChecker implements Callable<Long> {
private final CassandraNode entity;
public ThriftLatencyChecker(CassandraNode entity) {
this.entity = entity;
}
@Override
public Long call() {
Integer privatePort = entity.getThriftPort();
if (privatePort == null) return -1L;
HostAndPort hp = BrooklynAccessUtils.getBrooklynAccessibleAddress(entity, privatePort);
try {
long start = System.currentTimeMillis();
Socket s = new Socket(hp.getHostText(), hp.getPort());
s.close();
long latency = System.currentTimeMillis() - start;
return latency;
} catch (Exception e) {
Exceptions.propagateIfFatal(e);
if (log.isDebugEnabled())
log.debug("Cassandra thrift port poll failure: "+e);
return -1L;
}
}
}
}