blob: 14ee88c2824eab0cc1e06a0afbbca916eaeaf281 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.brooklyn.entity.nosql.cassandra;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import java.math.BigInteger;
import java.net.Socket;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.brooklyn.api.entity.Entity;
import org.apache.brooklyn.api.entity.EntitySpec;
import org.apache.brooklyn.api.location.Location;
import org.apache.brooklyn.core.entity.Attributes;
import org.apache.brooklyn.core.entity.Entities;
import org.apache.brooklyn.core.entity.EntityAsserts;
import org.apache.brooklyn.core.entity.trait.Startable;
import org.apache.brooklyn.core.test.BrooklynAppLiveTestSupport;
import org.apache.brooklyn.entity.nosql.cassandra.AstyanaxSupport.AstyanaxSample;
import org.apache.brooklyn.test.Asserts;
import org.apache.brooklyn.util.collections.MutableMap;
import org.apache.brooklyn.util.text.Identifiers;
import org.apache.brooklyn.util.time.Duration;
import org.apache.brooklyn.util.time.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.netflix.astyanax.AstyanaxContext;
import com.netflix.astyanax.Cluster;
import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
/**
* A live test of the {@link CassandraDatacenter} entity.
*
* Tests that a two node cluster can be started on Amazon EC2 and data written on one {@link CassandraNode}
* can be read from another, using the Astyanax API.
*/
public class CassandraDatacenterLiveTest extends BrooklynAppLiveTestSupport {
private static final Logger log = LoggerFactory.getLogger(CassandraDatacenterLiveTest.class);
private String provider =
"aws-ec2:eu-west-1";
// "rackspace-cloudservers-uk";
// "named:hpcloud-compute-at";
// "localhost";
// "jcloudsByon:(provider=\"aws-ec2\",region=\"us-east-1\",user=\"aled\",hosts=\"i-6f374743,i-35324219,i-1135453d\")";
protected Location testLocation;
protected CassandraDatacenter cluster;
@BeforeMethod(alwaysRun = true)
@Override
public void setUp() throws Exception {
super.setUp();
testLocation = mgmt.getLocationRegistry().getLocationManaged(provider);
}
@AfterMethod(alwaysRun=true)
@Override
public void tearDown() throws Exception {
super.tearDown();
}
@Test(groups = "Live")
public void testDatacenter() throws Exception {
EntitySpec<CassandraDatacenter> spec = EntitySpec.create(CassandraDatacenter.class)
.configure("initialSize", 2)
.configure("clusterName", "CassandraClusterLiveTest");
runCluster(spec, false);
}
@Test(groups = "Live")
public void testDatacenterWithVnodes() throws Exception {
EntitySpec<CassandraDatacenter> spec = EntitySpec.create(CassandraDatacenter.class)
.configure("initialSize", 2)
.configure(CassandraDatacenter.USE_VNODES, true)
.configure("clusterName", "CassandraClusterLiveTest");
runCluster(spec, true);
}
/*
* TODO on some distros (e.g. CentOS?), it comes pre-installed with java 6. Installing java 7
* didn't seem to be enough. I also had to set JAVA_HOME:
* .configure("shell.env", MutableMap.of("JAVA_HOME", "/etc/alternatives/java_sdk_1.7.0"))
* However, that would break other deployments such as on Ubuntu where JAVA_HOME would be different.
*
* TODO In AWS eu-west-1, with the default image choice (ami-ad2404da, ubuntu-trusty-14.04-amd64-server-20150908),
* `apt-get install openjdk-8-jdk` failed with the error "Unable to locate package openjdk-8-jdk".
* This meant Cassandra failed to start, because no Java was installed.
*/
@Test(groups = "Live")
public void testDatacenterWithVnodesVersion2() throws Exception {
EntitySpec<CassandraDatacenter> spec = EntitySpec.create(CassandraDatacenter.class)
.configure("initialSize", 2)
.configure(CassandraNode.SUGGESTED_VERSION, "2.0.9")
.configure(CassandraDatacenter.USE_VNODES, true)
.configure("clusterName", "CassandraClusterLiveTest");
runCluster(spec, true);
}
@Test(groups = {"Live", "Acceptance"}, invocationCount=10)
public void testManyTimes() throws Exception {
testDatacenter();
}
/**
* Test a Cassandra Datacenter:
* <ol>
* <li>Create two node datacenter
* <li>Confirm allows access via the Astyanax API through both nodes.
* <li>Confirm can size
* </ol>
*/
protected void runCluster(EntitySpec<CassandraDatacenter> datacenterSpec, boolean usesVnodes) throws Exception {
cluster = app.createAndManageChild(datacenterSpec);
assertEquals(cluster.getCurrentSize().intValue(), 0);
app.start(ImmutableList.of(testLocation));
// Check cluster is up and healthy
EntityAsserts.assertAttributeEqualsEventually(cluster, CassandraDatacenter.GROUP_SIZE, 2);
Entities.dumpInfo(app);
List<CassandraNode> members = castToCassandraNodes(cluster.getMembers());
assertNodesConsistent(members);
if (usesVnodes) {
assertVnodeTokensConsistent(members);
} else {
assertSingleTokenConsistent(members);
}
// Can connect via Astyanax
checkConnectionRepeatedly(2, 5, members);
// Resize
cluster.resize(3);
assertEquals(cluster.getMembers().size(), 3, "members="+cluster.getMembers());
if (usesVnodes) {
assertVnodeTokensConsistent(castToCassandraNodes(cluster.getMembers()));
} else {
assertSingleTokenConsistent(castToCassandraNodes(cluster.getMembers()));
}
checkConnectionRepeatedly(2, 5, cluster.getMembers());
}
protected static List<CassandraNode> castToCassandraNodes(Collection<? extends Entity> rawnodes) {
final List<CassandraNode> nodes = Lists.newArrayList();
for (Entity node : rawnodes) {
nodes.add((CassandraNode) node);
}
return nodes;
}
protected static void assertNodesConsistent(final List<CassandraNode> nodes) {
final Integer expectedLiveNodeCount = nodes.size();
// may take some time to be consistent (with new thrift_latency checks on the node,
// contactability should not be an issue, but consistency still might be)
Asserts.succeedsEventually(MutableMap.of("timeout", Duration.TWO_MINUTES), new Runnable() {
@Override
public void run() {
for (Entity n : nodes) {
CassandraNode node = (CassandraNode) n;
EntityAsserts.assertAttributeEquals(node, Startable.SERVICE_UP, true);
String errmsg = "node="+node+"; hostname="+node.getAttribute(Attributes.HOSTNAME)+"; port="+node.getThriftPort();
assertTrue(isSocketOpen(node), errmsg);
assertTrue(areVersionsConsistent(node), errmsg);
EntityAsserts.assertAttributeEquals(node, CassandraNode.LIVE_NODE_COUNT, expectedLiveNodeCount);
}
}});
}
protected static void assertSingleTokenConsistent(final List<CassandraNode> nodes) {
final int numNodes = nodes.size();
Asserts.succeedsEventually(MutableMap.of("timeout", Duration.TWO_MINUTES), new Runnable() {
@Override
public void run() {
Set<BigInteger> alltokens = Sets.newLinkedHashSet();
for (Entity node : nodes) {
EntityAsserts.assertAttributeEquals(node, Startable.SERVICE_UP, true);
EntityAsserts.assertConfigEquals(node, CassandraNode.NUM_TOKENS_PER_NODE, 1);
EntityAsserts.assertAttributeEquals(node, CassandraNode.PEERS, numNodes);
Set<BigInteger> tokens = node.getAttribute(CassandraNode.TOKENS);
assertNotNull(tokens);
alltokens.addAll(tokens);
}
assertFalse(alltokens.contains(null), "tokens="+alltokens);
assertEquals(alltokens.size(), numNodes);
}});
}
protected static void assertVnodeTokensConsistent(final List<CassandraNode> nodes) {
final int numNodes = nodes.size();
final int tokensPerNode = Iterables.get(nodes, 0).getNumTokensPerNode();
Asserts.succeedsEventually(MutableMap.of("timeout", Duration.TWO_MINUTES), new Runnable() {
@Override
public void run() {
Set<BigInteger> alltokens = Sets.newLinkedHashSet();
for (Entity node : nodes) {
EntityAsserts.assertAttributeEquals(node, Startable.SERVICE_UP, true);
EntityAsserts.assertAttributeEquals(node, CassandraNode.PEERS, tokensPerNode*numNodes);
EntityAsserts.assertConfigEquals(node, CassandraNode.NUM_TOKENS_PER_NODE, 256);
Set<BigInteger> tokens = node.getAttribute(CassandraNode.TOKENS);
assertNotNull(tokens);
assertEquals(tokens.size(), tokensPerNode, "tokens="+tokens);
alltokens.addAll(tokens);
}
assertFalse(alltokens.contains(null));
assertEquals(alltokens.size(), tokensPerNode*numNodes);
}});
}
protected static void checkConnectionRepeatedly(int totalAttemptsAllowed, int numRetriesPerAttempt, Iterable<? extends Entity> nodes) throws Exception {
int attemptNum = 0;
while (true) {
try {
checkConnection(numRetriesPerAttempt, nodes);
return;
} catch (Exception e) {
attemptNum++;
if (attemptNum >= totalAttemptsAllowed) {
log.warn("Cassandra not usable, "+attemptNum+" attempts; failing: "+e, e);
throw e;
}
log.warn("Cassandra not usable (attempt "+attemptNum+" of "+totalAttemptsAllowed+"), trying again after delay: "+e, e);
Time.sleep(Duration.TEN_SECONDS);
}
}
}
protected static void checkConnection(int numRetries, Iterable<? extends Entity> nodes) throws ConnectionException {
CassandraNode first = (CassandraNode) Iterables.get(nodes, 0);
// have been seeing intermittent SchemaDisagreementException errors on AWS, probably due to Astyanax / how we are using it
// (confirmed that clocks are in sync)
String uniqueName = Identifiers.makeRandomId(8);
AstyanaxSample astyanaxFirst = AstyanaxSample.builder().node(first).columnFamilyName(uniqueName).build();
Map<String, List<String>> versions;
AstyanaxContext<Cluster> context = astyanaxFirst.newAstyanaxContextForCluster();
try {
versions = context.getEntity().describeSchemaVersions();
} finally {
context.shutdown();
}
log.info("Cassandra schema versions are: "+versions);
if (versions.size() > 1) {
Assert.fail("Inconsistent versions on Cassandra start: "+versions);
}
String keyspacePrefix = "BrooklynTests_"+Identifiers.makeRandomId(8);
String keyspaceName = astyanaxFirst.writeData(keyspacePrefix, numRetries);
for (Entity node : nodes) {
AstyanaxSample astyanaxSecond = AstyanaxSample.builder().node((CassandraNode)node).columnFamilyName(uniqueName).build();
astyanaxSecond.readData(keyspaceName, numRetries);
}
}
protected static Boolean areVersionsConsistent(CassandraNode node) {
AstyanaxContext<Cluster> context = null;
try {
context = new AstyanaxSample(node).newAstyanaxContextForCluster();
Map<String, List<String>> v = context.getEntity().describeSchemaVersions();
return v.size() == 1;
} catch (Exception e) {
return null;
} finally {
if (context != null) context.shutdown();
}
}
protected static boolean isSocketOpen(CassandraNode node) {
try {
Socket s = new Socket(node.getAttribute(Attributes.HOSTNAME), node.getThriftPort());
s.close();
return true;
} catch (Exception e) {
return false;
}
}
}