blob: 0b490962b8f2f20a96099e6df67433b9d388d806 [file] [log] [blame]
package org.apache.helix.webapp;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringReader;
import java.io.StringWriter;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.ZNRecord;
import org.apache.helix.integration.manager.ClusterControllerManager;
import org.apache.helix.integration.manager.ClusterDistributedController;
import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.manager.zk.ZKUtil;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.IdealState.IdealStateProperty;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.tools.ClusterSetup;
import org.apache.helix.tools.ClusterStateVerifier;
import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
import org.apache.helix.tools.ClusterStateVerifier.MasterNbInExtViewVerifier;
import org.apache.helix.webapp.resources.ClusterRepresentationUtil;
import org.apache.helix.webapp.resources.InstancesResource.ListInstancesWrapper;
import org.apache.helix.webapp.resources.JsonParameters;
import org.codehaus.jackson.JsonGenerationException;
import org.codehaus.jackson.JsonParseException;
import org.codehaus.jackson.map.JsonMappingException;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.map.SerializationConfig;
import org.restlet.Component;
import org.restlet.Request;
import org.restlet.Response;
import org.restlet.data.MediaType;
import org.restlet.data.Method;
import org.restlet.data.Reference;
import org.restlet.data.Status;
import org.restlet.representation.Representation;
import org.testng.Assert;
import org.testng.annotations.Test;
/**
* Simulate all the admin tasks needed by using command line tool
*/
public class TestHelixAdminScenariosRest extends AdminTestBase {
private static final int MAX_RETRIES = 5;
RestAdminApplication _adminApp;
Component _component;
String _tag1 = "tag1123";
String _tag2 = "tag212334";
public static String ObjectToJson(Object object) throws JsonGenerationException,
JsonMappingException, IOException {
ObjectMapper mapper = new ObjectMapper();
SerializationConfig serializationConfig = mapper.getSerializationConfig();
serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true);
StringWriter sw = new StringWriter();
mapper.writeValue(sw, object);
return sw.toString();
}
public static <T extends Object> T JsonToObject(Class<T> clazz, String jsonString)
throws JsonParseException, JsonMappingException, IOException {
StringReader sr = new StringReader(jsonString);
ObjectMapper mapper = new ObjectMapper();
return mapper.readValue(sr, clazz);
}
static String assertSuccessPostOperation(String url, Map<String, String> jsonParameters,
boolean hasException) throws IOException {
return assertSuccessPostOperation(url, jsonParameters, null, hasException);
}
static String assertSuccessPostOperation(String url, Map<String, String> jsonParameters,
Map<String, String> extraForm, boolean hasException) throws IOException {
Reference resourceRef = new Reference(url);
int numRetries = 0;
while (numRetries <= MAX_RETRIES) {
Request request = new Request(Method.POST, resourceRef);
if (extraForm != null) {
String entity =
JsonParameters.JSON_PARAMETERS + "="
+ ClusterRepresentationUtil.ObjectToJson(jsonParameters);
for (String key : extraForm.keySet()) {
entity = entity + "&" + (key + "=" + extraForm.get(key));
}
request.setEntity(entity, MediaType.APPLICATION_ALL);
} else {
request
.setEntity(
JsonParameters.JSON_PARAMETERS + "="
+ ClusterRepresentationUtil.ObjectToJson(jsonParameters),
MediaType.APPLICATION_ALL);
}
Response response = _gClient.handle(request);
Representation result = response.getEntity();
StringWriter sw = new StringWriter();
if (result != null) {
result.write(sw);
}
int code = response.getStatus().getCode();
boolean successCode =
code == Status.SUCCESS_NO_CONTENT.getCode() || code == Status.SUCCESS_OK.getCode();
if (successCode || numRetries == MAX_RETRIES) {
Assert.assertTrue(successCode);
Assert.assertTrue(hasException == sw.toString().toLowerCase().contains("exception"));
return sw.toString();
}
numRetries++;
}
Assert.fail("Request failed after all retries");
return null;
}
void deleteUrl(String url, boolean hasException) throws IOException {
Reference resourceRef = new Reference(url);
Request request = new Request(Method.DELETE, resourceRef);
Response response = _gClient.handle(request);
Representation result = response.getEntity();
StringWriter sw = new StringWriter();
result.write(sw);
Assert.assertTrue(hasException == sw.toString().toLowerCase().contains("exception"));
}
String getUrl(String url) throws IOException {
Reference resourceRef = new Reference(url);
Request request = new Request(Method.GET, resourceRef);
Response response = _gClient.handle(request);
Representation result = response.getEntity();
StringWriter sw = new StringWriter();
result.write(sw);
return sw.toString();
}
String getClusterUrl(String cluster) {
return "http://localhost:" + ADMIN_PORT + "/clusters" + "/" + cluster;
}
String getInstanceUrl(String cluster, String instance) {
return "http://localhost:" + ADMIN_PORT + "/clusters/" + cluster + "/instances/" + instance;
}
String getResourceUrl(String cluster, String resourceGroup) {
return "http://localhost:" + ADMIN_PORT + "/clusters/" + cluster + "/resourceGroups/"
+ resourceGroup;
}
void assertClusterSetupException(String command) {
boolean exceptionThrown = false;
try {
ClusterSetup.processCommandLineArgs(command.split(" "));
} catch (Exception e) {
exceptionThrown = true;
}
Assert.assertTrue(exceptionThrown);
}
private Map<String, String> addClusterCmd(String clusterName) {
Map<String, String> parameters = new HashMap<String, String>();
parameters.put(JsonParameters.CLUSTER_NAME, clusterName);
parameters.put(JsonParameters.MANAGEMENT_COMMAND, ClusterSetup.addCluster);
return parameters;
}
private void addCluster(String clusterName) throws IOException {
String url = "http://localhost:" + ADMIN_PORT + "/clusters";
String response = assertSuccessPostOperation(url, addClusterCmd(clusterName), false);
Assert.assertTrue(response.contains(clusterName));
}
@Test
public void testAddCluster() throws Exception {
String url = "http://localhost:" + ADMIN_PORT + "/clusters";
// Normal add
String response = assertSuccessPostOperation(url, addClusterCmd("clusterTest"), false);
Assert.assertTrue(response.contains("clusterTest"));
// malformed cluster name
response = assertSuccessPostOperation(url, addClusterCmd("/ClusterTest"), true);
// Add the grand cluster
response = assertSuccessPostOperation(url, addClusterCmd("Klazt3rz"), false);
Assert.assertTrue(response.contains("Klazt3rz"));
response = assertSuccessPostOperation(url, addClusterCmd("\\ClusterTest"), false);
Assert.assertTrue(response.contains("\\ClusterTest"));
// Add already exist cluster
response = assertSuccessPostOperation(url, addClusterCmd("clusterTest"), false);
// delete cluster without resource and instance
Assert.assertTrue(ZKUtil.isClusterSetup("Klazt3rz", _gZkClient));
Assert.assertTrue(ZKUtil.isClusterSetup("clusterTest", _gZkClient));
Assert.assertTrue(ZKUtil.isClusterSetup("\\ClusterTest", _gZkClient));
String clusterUrl = getClusterUrl("\\ClusterTest");
deleteUrl(clusterUrl, false);
String clustersUrl = "http://localhost:" + ADMIN_PORT + "/clusters";
response = getUrl(clustersUrl);
clusterUrl = getClusterUrl("clusterTest1");
deleteUrl(clusterUrl, false);
response = getUrl(clustersUrl);
Assert.assertFalse(response.contains("clusterTest1"));
clusterUrl = getClusterUrl("clusterTest");
deleteUrl(clusterUrl, false);
response = getUrl(clustersUrl);
Assert.assertFalse(response.contains("clusterTest"));
clusterUrl = getClusterUrl("clusterTestOK");
deleteUrl(clusterUrl, false);
Assert.assertFalse(_gZkClient.exists("/clusterTest"));
Assert.assertFalse(_gZkClient.exists("/clusterTest1"));
Assert.assertFalse(_gZkClient.exists("/clusterTestOK"));
response = assertSuccessPostOperation(url, addClusterCmd("clusterTest1"), false);
response = getUrl(clustersUrl);
Assert.assertTrue(response.contains("clusterTest1"));
}
private Map<String, String> addResourceCmd(String resourceName, String stateModelDef,
int partition) {
Map<String, String> parameters = new HashMap<String, String>();
parameters.put(JsonParameters.RESOURCE_GROUP_NAME, resourceName);
parameters.put(JsonParameters.STATE_MODEL_DEF_REF, stateModelDef);
parameters.put(JsonParameters.PARTITIONS, "" + partition);
parameters.put(JsonParameters.MANAGEMENT_COMMAND, ClusterSetup.addResource);
return parameters;
}
private void addResource(String clusterName, String resourceName, int partitions)
throws IOException {
final String reourcesUrl =
"http://localhost:" + ADMIN_PORT + "/clusters/" + clusterName + "/resourceGroups";
String response =
assertSuccessPostOperation(reourcesUrl,
addResourceCmd(resourceName, "MasterSlave", partitions), false);
Assert.assertTrue(response.contains(resourceName));
}
@Test
public void testAddResource() throws Exception {
final String clusterName = "clusterTestAddResource";
addCluster(clusterName);
String reourcesUrl =
"http://localhost:" + ADMIN_PORT + "/clusters/" + clusterName + "/resourceGroups";
String response =
assertSuccessPostOperation(reourcesUrl, addResourceCmd("db_22", "MasterSlave", 144), false);
Assert.assertTrue(response.contains("db_22"));
response =
assertSuccessPostOperation(reourcesUrl, addResourceCmd("db_11", "MasterSlave", 44), false);
Assert.assertTrue(response.contains("db_11"));
// Add duplicate resource
response =
assertSuccessPostOperation(reourcesUrl, addResourceCmd("db_22", "OnlineOffline", 55), true);
// drop resource now
String resourceUrl = getResourceUrl(clusterName, "db_11");
deleteUrl(resourceUrl, false);
Assert.assertFalse(_gZkClient.exists("/" + clusterName + "/IDEALSTATES/db_11"));
response =
assertSuccessPostOperation(reourcesUrl, addResourceCmd("db_11", "MasterSlave", 44), false);
Assert.assertTrue(response.contains("db_11"));
Assert.assertTrue(_gZkClient.exists("/" + clusterName + "/IDEALSTATES/db_11"));
response =
assertSuccessPostOperation(reourcesUrl, addResourceCmd("db_33", "MasterSlave", 44), false);
Assert.assertTrue(response.contains("db_33"));
response =
assertSuccessPostOperation(reourcesUrl, addResourceCmd("db_44", "MasterSlave", 44), false);
Assert.assertTrue(response.contains("db_44"));
}
private Map<String, String> activateClusterCmd(String grandClusterName, boolean enabled) {
Map<String, String> parameters = new HashMap<String, String>();
parameters.put(JsonParameters.GRAND_CLUSTER, grandClusterName);
parameters.put(JsonParameters.ENABLED, "" + enabled);
parameters.put(JsonParameters.MANAGEMENT_COMMAND, ClusterSetup.activateCluster);
return parameters;
}
@Test
public void testDeactivateCluster() throws Exception {
final String clusterName = "clusterTestDeactivateCluster";
final String controllerClusterName = "controllerClusterTestDeactivateCluster";
Map<String, MockParticipantManager> participants =
new HashMap<String, MockParticipantManager>();
Map<String, ClusterDistributedController> distControllers =
new HashMap<String, ClusterDistributedController>();
// setup cluster
addCluster(clusterName);
addInstancesToCluster(clusterName, "localhost:123", 6, null);
addResource(clusterName, "db_11", 16);
rebalanceResource(clusterName, "db_11");
addCluster(controllerClusterName);
addInstancesToCluster(controllerClusterName, "controller_900", 2, null);
// start mock nodes
for (int i = 0; i < 6; i++) {
String instanceName = "localhost_123" + i;
MockParticipantManager participant =
new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
participant.syncStart();
participants.put(instanceName, participant);
}
// start controller nodes
for (int i = 0; i < 2; i++) {
String controllerName = "controller_900" + i;
ClusterDistributedController distController =
new ClusterDistributedController(ZK_ADDR, controllerClusterName, controllerName);
distController.syncStart();
distControllers.put(controllerName, distController);
}
String clusterUrl = getClusterUrl(clusterName);
// activate cluster
assertSuccessPostOperation(clusterUrl, activateClusterCmd(controllerClusterName, true), false);
boolean verifyResult =
ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
controllerClusterName));
Assert.assertTrue(verifyResult);
verifyResult =
ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
clusterName));
Assert.assertTrue(verifyResult);
// deactivate cluster
assertSuccessPostOperation(clusterUrl, activateClusterCmd(controllerClusterName, false), false);
Thread.sleep(6000);
Assert.assertFalse(_gZkClient.exists("/" + controllerClusterName + "/IDEALSTATES/"
+ clusterName));
HelixDataAccessor accessor = participants.get("localhost_1231").getHelixDataAccessor();
String path = accessor.keyBuilder().controllerLeader().getPath();
Assert.assertFalse(_gZkClient.exists(path));
deleteUrl(clusterUrl, true);
Assert.assertTrue(_gZkClient.exists("/" + clusterName));
// leader node should be gone
for (MockParticipantManager participant : participants.values()) {
participant.syncStop();
}
deleteUrl(clusterUrl, false);
Assert.assertFalse(_gZkClient.exists("/" + clusterName));
// clean up
for (ClusterDistributedController controller : distControllers.values()) {
controller.syncStop();
}
for (MockParticipantManager participant : participants.values()) {
participant.syncStop();
}
}
private Map<String, String> addIdealStateCmd() {
Map<String, String> parameters = new HashMap<String, String>();
parameters.put(JsonParameters.MANAGEMENT_COMMAND, ClusterSetup.addIdealState);
return parameters;
}
@Test
public void testDropAddResource() throws Exception {
final String clusterName = "clusterTestDropAddResource";
// setup cluster
addCluster(clusterName);
addResource(clusterName, "db_11", 22);
addInstancesToCluster(clusterName, "localhost_123", 6, null);
rebalanceResource(clusterName, "db_11");
ZNRecord record =
_gSetupTool.getClusterManagementTool().getResourceIdealState(clusterName, "db_11")
.getRecord();
String x = ObjectToJson(record);
FileWriter fos = new FileWriter("/tmp/temp.log");
PrintWriter pw = new PrintWriter(fos);
pw.write(x);
pw.close();
ClusterControllerManager controller =
new ClusterControllerManager(ZK_ADDR, clusterName, "controller_9900");
controller.syncStart();
// start mock nodes
Map<String, MockParticipantManager> participants =
new HashMap<String, MockParticipantManager>();
for (int i = 0; i < 6; i++) {
String instanceName = "localhost_123" + i;
MockParticipantManager participant =
new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
participant.syncStart();
participants.put(instanceName, participant);
}
boolean verifyResult =
ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
clusterName));
Assert.assertTrue(verifyResult);
String resourceUrl = getResourceUrl(clusterName, "db_11");
deleteUrl(resourceUrl, false);
verifyResult =
ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
clusterName));
Assert.assertTrue(verifyResult);
addResource(clusterName, "db_11", 22);
String idealStateUrl = getResourceUrl(clusterName, "db_11") + "/idealState";
Map<String, String> extraform = new HashMap<String, String>();
extraform.put(JsonParameters.NEW_IDEAL_STATE, x);
assertSuccessPostOperation(idealStateUrl, addIdealStateCmd(), extraform, false);
verifyResult =
ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
clusterName));
Assert.assertTrue(verifyResult);
ZNRecord record2 =
_gSetupTool.getClusterManagementTool().getResourceIdealState(clusterName, "db_11")
.getRecord();
Assert.assertTrue(record2.equals(record));
// clean up
controller.syncStop();
for (MockParticipantManager participant : participants.values()) {
participant.syncStop();
}
}
private Map<String, String> addInstanceCmd(String instances) {
Map<String, String> parameters = new HashMap<String, String>();
parameters.put(JsonParameters.INSTANCE_NAMES, instances);
parameters.put(JsonParameters.MANAGEMENT_COMMAND, ClusterSetup.addInstance);
return parameters;
}
private Map<String, String> expandClusterCmd() {
Map<String, String> parameters = new HashMap<String, String>();
parameters.put(JsonParameters.MANAGEMENT_COMMAND, ClusterSetup.expandCluster);
return parameters;
}
@Test
public void testExpandCluster() throws Exception {
final String clusterName = "clusterTestExpandCluster";
// setup cluster
addCluster(clusterName);
addInstancesToCluster(clusterName, "localhost:123", 6, null);
addResource(clusterName, "db_11", 22);
rebalanceResource(clusterName, "db_11");
ClusterControllerManager controller =
new ClusterControllerManager(ZK_ADDR, clusterName, "controller_9900");
controller.syncStart();
// start mock nodes
Map<String, MockParticipantManager> participants =
new HashMap<String, MockParticipantManager>();
for (int i = 0; i < 6; i++) {
String instanceName = "localhost_123" + i;
MockParticipantManager participant =
new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
participant.syncStart();
participants.put(instanceName, participant);
}
boolean verifyResult =
ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
clusterName));
Assert.assertTrue(verifyResult);
String clusterUrl = getClusterUrl(clusterName);
String instancesUrl = clusterUrl + "/instances";
String instances = "localhost:12331;localhost:12341;localhost:12351;localhost:12361";
String response = assertSuccessPostOperation(instancesUrl, addInstanceCmd(instances), false);
String[] hosts = instances.split(";");
for (String host : hosts) {
Assert.assertTrue(response.contains(host.replace(':', '_')));
}
response = assertSuccessPostOperation(clusterUrl, expandClusterCmd(), false);
for (int i = 3; i <= 6; i++) {
String instanceName = "localhost_123" + i + "1";
MockParticipantManager participant =
new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
participant.syncStart();
participants.put(instanceName, participant);
}
verifyResult =
ClusterStateVerifier
.verifyByZkCallback(new MasterNbInExtViewVerifier(ZK_ADDR, clusterName));
Assert.assertTrue(verifyResult);
verifyResult =
ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
clusterName));
Assert.assertTrue(verifyResult);
// clean up
controller.syncStop();
for (MockParticipantManager participant : participants.values()) {
participant.syncStop();
}
}
private Map<String, String> enablePartitionCmd(String resourceName, String partitions,
boolean enabled) {
Map<String, String> parameters = new HashMap<String, String>();
parameters.put(JsonParameters.MANAGEMENT_COMMAND, ClusterSetup.enablePartition);
parameters.put(JsonParameters.ENABLED, "" + enabled);
parameters.put(JsonParameters.PARTITION, partitions);
parameters.put(JsonParameters.RESOURCE, resourceName);
return parameters;
}
@Test
public void testEnablePartitions() throws IOException, InterruptedException {
final String clusterName = "clusterTestEnablePartitions";
// setup cluster
addCluster(clusterName);
addInstancesToCluster(clusterName, "localhost:123", 6, null);
addResource(clusterName, "db_11", 22);
rebalanceResource(clusterName, "db_11");
ClusterControllerManager controller =
new ClusterControllerManager(ZK_ADDR, clusterName, "controller_9900");
controller.syncStart();
// start mock nodes
Map<String, MockParticipantManager> participants =
new HashMap<String, MockParticipantManager>();
for (int i = 0; i < 6; i++) {
String instanceName = "localhost_123" + i;
MockParticipantManager participant =
new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
participant.syncStart();
participants.put(instanceName, participant);
}
HelixDataAccessor accessor = participants.get("localhost_1231").getHelixDataAccessor();
// drop node should fail as not disabled
String hostName = "localhost_1231";
String instanceUrl = getInstanceUrl(clusterName, hostName);
ExternalView ev = accessor.getProperty(accessor.keyBuilder().externalView("db_11"));
String response =
assertSuccessPostOperation(instanceUrl,
enablePartitionCmd("db_11", "db_11_0;db_11_11", false), false);
Assert.assertTrue(response.contains("DISABLED_PARTITION"));
Assert.assertTrue(response.contains("db_11_0"));
Assert.assertTrue(response.contains("db_11_11"));
boolean verifyResult =
ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
clusterName));
Assert.assertTrue(verifyResult);
ev = accessor.getProperty(accessor.keyBuilder().externalView("db_11"));
Assert.assertEquals(ev.getStateMap("db_11_0").get(hostName), "OFFLINE");
Assert.assertEquals(ev.getStateMap("db_11_11").get(hostName), "OFFLINE");
response =
assertSuccessPostOperation(instanceUrl,
enablePartitionCmd("db_11", "db_11_0;db_11_11", true), false);
Assert.assertFalse(response.contains("db_11_0"));
Assert.assertFalse(response.contains("db_11_11"));
verifyResult =
ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
clusterName));
Assert.assertTrue(verifyResult);
ev = accessor.getProperty(accessor.keyBuilder().externalView("db_11"));
Assert.assertEquals(ev.getStateMap("db_11_0").get(hostName), "MASTER");
Assert.assertEquals(ev.getStateMap("db_11_11").get(hostName), "SLAVE");
// clean up
controller.syncStop();
for (MockParticipantManager participant : participants.values()) {
participant.syncStop();
}
}
private Map<String, String> enableInstanceCmd(boolean enabled) {
Map<String, String> parameters = new HashMap<String, String>();
parameters.put(JsonParameters.MANAGEMENT_COMMAND, ClusterSetup.enableInstance);
parameters.put(JsonParameters.ENABLED, "" + enabled);
return parameters;
}
private Map<String, String> swapInstanceCmd(String oldInstance, String newInstance) {
Map<String, String> parameters = new HashMap<String, String>();
parameters.put(JsonParameters.MANAGEMENT_COMMAND, ClusterSetup.swapInstance);
parameters.put(JsonParameters.OLD_INSTANCE, oldInstance);
parameters.put(JsonParameters.NEW_INSTANCE, newInstance);
return parameters;
}
@Test
public void testInstanceOperations() throws Exception {
final String clusterName = "clusterTestInstanceOperations";
// setup cluster
addCluster(clusterName);
addInstancesToCluster(clusterName, "localhost:123", 6, null);
addResource(clusterName, "db_11", 8);
rebalanceResource(clusterName, "db_11");
ClusterControllerManager controller =
new ClusterControllerManager(ZK_ADDR, clusterName, "controller_9900");
controller.syncStart();
// start mock nodes
Map<String, MockParticipantManager> participants =
new HashMap<String, MockParticipantManager>();
for (int i = 0; i < 6; i++) {
String instanceName = "localhost_123" + i;
MockParticipantManager participant =
new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
participant.syncStart();
participants.put(instanceName, participant);
}
HelixDataAccessor accessor;
// drop node should fail as not disabled
String instanceUrl = getInstanceUrl(clusterName, "localhost_1232");
deleteUrl(instanceUrl, true);
// disabled node
String response = assertSuccessPostOperation(instanceUrl, enableInstanceCmd(false), false);
Assert.assertTrue(response.contains("false"));
// Cannot drop / swap
deleteUrl(instanceUrl, true);
String instancesUrl = getClusterUrl(clusterName) + "/instances";
response =
assertSuccessPostOperation(instancesUrl,
swapInstanceCmd("localhost_1232", "localhost_12320"), true);
// disconnect the node
participants.get("localhost_1232").syncStop();
// add new node then swap instance
response = assertSuccessPostOperation(instancesUrl, addInstanceCmd("localhost_12320"), false);
Assert.assertTrue(response.contains("localhost_12320"));
// swap instance. The instance get swapped out should not exist anymore
response =
assertSuccessPostOperation(instancesUrl,
swapInstanceCmd("localhost_1232", "localhost_12320"), false);
Assert.assertTrue(response.contains("localhost_12320"));
Assert.assertFalse(response.contains("localhost_1232\""));
accessor = participants.get("localhost_1231").getHelixDataAccessor();
String path = accessor.keyBuilder().instanceConfig("localhost_1232").getPath();
Assert.assertFalse(_gZkClient.exists(path));
MockParticipantManager newParticipant =
new MockParticipantManager(ZK_ADDR, clusterName, "localhost_12320");
newParticipant.syncStart();
participants.put("localhost_12320", newParticipant);
boolean verifyResult =
ClusterStateVerifier
.verifyByZkCallback(new MasterNbInExtViewVerifier(ZK_ADDR, clusterName));
Assert.assertTrue(verifyResult);
// clean up
controller.syncStop();
for (MockParticipantManager participant : participants.values()) {
participant.syncStop();
}
}
@Test
public void testStartCluster() throws Exception {
final String clusterName = "clusterTestStartCluster";
final String controllerClusterName = "controllerClusterTestStartCluster";
Map<String, MockParticipantManager> participants =
new HashMap<String, MockParticipantManager>();
Map<String, ClusterDistributedController> distControllers =
new HashMap<String, ClusterDistributedController>();
// setup cluster
addCluster(clusterName);
addInstancesToCluster(clusterName, "localhost:123", 6, null);
addResource(clusterName, "db_11", 8);
rebalanceResource(clusterName, "db_11");
addCluster(controllerClusterName);
addInstancesToCluster(controllerClusterName, "controller_900", 2, null);
// start mock nodes
for (int i = 0; i < 6; i++) {
String instanceName = "localhost_123" + i;
MockParticipantManager participant =
new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
participant.syncStart();
participants.put(instanceName, participant);
}
// start controller nodes
for (int i = 0; i < 2; i++) {
String controllerName = "controller_900" + i;
ClusterDistributedController distController =
new ClusterDistributedController(ZK_ADDR, controllerClusterName, controllerName);
distController.syncStart();
distControllers.put(controllerName, distController);
}
Thread.sleep(100);
// activate clusters
// wrong grand clustername
String clusterUrl = getClusterUrl(clusterName);
assertSuccessPostOperation(clusterUrl, activateClusterCmd("nonExistCluster", true), true);
// wrong cluster name
clusterUrl = getClusterUrl("nonExistCluster");
assertSuccessPostOperation(clusterUrl, activateClusterCmd(controllerClusterName, true), true);
clusterUrl = getClusterUrl(clusterName);
assertSuccessPostOperation(clusterUrl, activateClusterCmd(controllerClusterName, true), false);
Thread.sleep(500);
deleteUrl(clusterUrl, true);
// verify leader node
HelixDataAccessor accessor = distControllers.get("controller_9001").getHelixDataAccessor();
LiveInstance controllerLeader = accessor.getProperty(accessor.keyBuilder().controllerLeader());
Assert.assertTrue(controllerLeader.getInstanceName().startsWith("controller_900"));
accessor = participants.get("localhost_1232").getHelixDataAccessor();
LiveInstance leader = accessor.getProperty(accessor.keyBuilder().controllerLeader());
for (int i = 0; i < 5; i++) {
if (leader != null) {
break;
}
Thread.sleep(1000);
leader = accessor.getProperty(accessor.keyBuilder().controllerLeader());
}
Assert.assertTrue(leader.getInstanceName().startsWith("controller_900"));
boolean verifyResult =
ClusterStateVerifier
.verifyByZkCallback(new MasterNbInExtViewVerifier(ZK_ADDR, clusterName));
Assert.assertTrue(verifyResult);
verifyResult =
ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
clusterName));
Assert.assertTrue(verifyResult);
Thread.sleep(1000);
// clean up
for (ClusterDistributedController controller : distControllers.values()) {
controller.syncStop();
}
for (MockParticipantManager participant : participants.values()) {
participant.syncStop();
}
}
private Map<String, String> rebalanceCmd(int replicas, String prefix, String tag) {
Map<String, String> parameters = new HashMap<String, String>();
parameters.put(JsonParameters.REPLICAS, "" + replicas);
if (prefix != null) {
parameters.put(JsonParameters.RESOURCE_KEY_PREFIX, prefix);
}
if (tag != null) {
parameters.put(ClusterSetup.instanceGroupTag, tag);
}
parameters.put(JsonParameters.MANAGEMENT_COMMAND, ClusterSetup.rebalance);
return parameters;
}
private void rebalanceResource(String clusterName, String resourceName) throws IOException {
String resourceUrl = getResourceUrl(clusterName, resourceName);
String idealStateUrl = resourceUrl + "/idealState";
assertSuccessPostOperation(idealStateUrl, rebalanceCmd(3, null, null), false);
}
@Test
public void testRebalanceResource() throws Exception {
// add a normal cluster
final String clusterName = "clusterTestRebalanceResource";
addCluster(clusterName);
addInstancesToCluster(clusterName, "localhost:123", 3, _tag1);
addResource(clusterName, "db_11", 44);
String resourceUrl = getResourceUrl(clusterName, "db_11");
String idealStateUrl = resourceUrl + "/idealState";
String response = assertSuccessPostOperation(idealStateUrl, rebalanceCmd(3, null, null), false);
ZNRecord record = JsonToObject(ZNRecord.class, response);
Assert.assertTrue(record.getId().equalsIgnoreCase("db_11"));
Assert.assertEquals(record.getListField("db_11_0").size(), 3);
Assert.assertEquals(record.getMapField("db_11_0").size(), 3);
deleteUrl(resourceUrl, false);
// re-add and rebalance
final String reourcesUrl =
"http://localhost:" + ADMIN_PORT + "/clusters/" + clusterName + "/resourceGroups";
response = getUrl(reourcesUrl);
Assert.assertFalse(response.contains("db_11"));
addResource(clusterName, "db_11", 48);
idealStateUrl = resourceUrl + "/idealState";
response = assertSuccessPostOperation(idealStateUrl, rebalanceCmd(3, null, null), false);
record = JsonToObject(ZNRecord.class, response);
Assert.assertTrue(record.getId().equalsIgnoreCase("db_11"));
Assert.assertEquals(record.getListField("db_11_0").size(), 3);
Assert.assertEquals(record.getMapField("db_11_0").size(), 3);
// rebalance with key prefix
addResource(clusterName, "db_22", 55);
resourceUrl = getResourceUrl(clusterName, "db_22");
idealStateUrl = resourceUrl + "/idealState";
response = assertSuccessPostOperation(idealStateUrl, rebalanceCmd(2, "alias", null), false);
record = JsonToObject(ZNRecord.class, response);
Assert.assertTrue(record.getId().equalsIgnoreCase("db_22"));
Assert.assertEquals(record.getListField("alias_0").size(), 2);
Assert.assertEquals(record.getMapField("alias_0").size(), 2);
Assert.assertTrue((((String) (record.getMapFields().keySet().toArray()[0])))
.startsWith("alias_"));
Assert.assertFalse(response.contains(IdealStateProperty.INSTANCE_GROUP_TAG.toString()));
addResource(clusterName, "db_33", 44);
resourceUrl = getResourceUrl(clusterName, "db_33");
idealStateUrl = resourceUrl + "/idealState";
response = assertSuccessPostOperation(idealStateUrl, rebalanceCmd(2, null, _tag1), false);
Assert.assertTrue(response.contains(IdealStateProperty.INSTANCE_GROUP_TAG.toString()));
Assert.assertTrue(response.contains(_tag1));
for (int i = 0; i < 6; i++) {
String instance = "localhost_123" + i;
if (i < 3) {
Assert.assertTrue(response.contains(instance));
} else {
Assert.assertFalse(response.contains(instance));
}
}
addResource(clusterName, "db_44", 44);
resourceUrl = getResourceUrl(clusterName, "db_44");
idealStateUrl = resourceUrl + "/idealState";
response = assertSuccessPostOperation(idealStateUrl, rebalanceCmd(2, "alias", _tag1), false);
Assert.assertTrue(response.contains(IdealStateProperty.INSTANCE_GROUP_TAG.toString()));
Assert.assertTrue(response.contains(_tag1));
record = JsonToObject(ZNRecord.class, response);
Assert.assertTrue((((String) (record.getMapFields().keySet().toArray()[0])))
.startsWith("alias_"));
for (int i = 0; i < 6; i++) {
String instance = "localhost_123" + i;
if (i < 3) {
Assert.assertTrue(response.contains(instance));
} else {
Assert.assertFalse(response.contains(instance));
}
}
}
private void addInstancesToCluster(String clusterName, String instanceNamePrefix, int n,
String tag) throws IOException {
Map<String, String> parameters = new HashMap<String, String>();
final String clusterUrl = getClusterUrl(clusterName);
parameters.put(JsonParameters.MANAGEMENT_COMMAND, ClusterSetup.addInstance);
// add instances to cluster
String instancesUrl = clusterUrl + "/instances";
for (int i = 0; i < n; i++) {
parameters.put(JsonParameters.INSTANCE_NAME, instanceNamePrefix + i);
String response = assertSuccessPostOperation(instancesUrl, parameters, false);
Assert.assertTrue(response.contains((instanceNamePrefix + i).replace(':', '_')));
}
// add tag to instance
if (tag != null && !tag.isEmpty()) {
parameters.clear();
parameters.put(JsonParameters.MANAGEMENT_COMMAND, ClusterSetup.addInstanceTag);
parameters.put(ClusterSetup.instanceGroupTag, tag);
for (int i = 0; i < n; i++) {
String instanceUrl = instancesUrl + "/" + (instanceNamePrefix + i).replace(':', '_');
String response = assertSuccessPostOperation(instanceUrl, parameters, false);
Assert.assertTrue(response.contains(_tag1));
}
}
}
private Map<String, String> addInstanceTagCmd(String tag) {
Map<String, String> parameters = new HashMap<String, String>();
parameters.put(JsonParameters.MANAGEMENT_COMMAND, ClusterSetup.addInstanceTag);
parameters.put(ClusterSetup.instanceGroupTag, tag);
return parameters;
}
private Map<String, String> removeInstanceTagCmd(String tag) {
Map<String, String> parameters = new HashMap<String, String>();
parameters.put(JsonParameters.MANAGEMENT_COMMAND, ClusterSetup.removeInstanceTag);
parameters.put(ClusterSetup.instanceGroupTag, tag);
return parameters;
}
@Test
public void testAddInstance() throws Exception {
final String clusterName = "clusterTestAddInstance";
// add normal cluster
addCluster(clusterName);
String clusterUrl = getClusterUrl(clusterName);
// Add instances to cluster
String instancesUrl = clusterUrl + "/instances";
addInstancesToCluster(clusterName, "localhost:123", 3, null);
String instances = "localhost:1233;localhost:1234;localhost:1235;localhost:1236";
String response = assertSuccessPostOperation(instancesUrl, addInstanceCmd(instances), false);
for (int i = 3; i <= 6; i++) {
Assert.assertTrue(response.contains("localhost_123" + i));
}
// delete one node without disable
String instanceUrl = instancesUrl + "/localhost_1236";
deleteUrl(instanceUrl, true);
response = getUrl(instancesUrl);
Assert.assertTrue(response.contains("localhost_1236"));
// delete non-exist node
instanceUrl = instancesUrl + "/localhost_12367";
deleteUrl(instanceUrl, true);
response = getUrl(instancesUrl);
Assert.assertFalse(response.contains("localhost_12367"));
// disable node
instanceUrl = instancesUrl + "/localhost_1236";
response = assertSuccessPostOperation(instanceUrl, enableInstanceCmd(false), false);
Assert.assertTrue(response.contains("false"));
deleteUrl(instanceUrl, false);
// add controller cluster
final String controllerClusterName = "controllerClusterTestAddInstance";
addCluster(controllerClusterName);
// add node to controller cluster
String controllers = "controller:9000;controller:9001";
String controllerUrl = getClusterUrl(controllerClusterName) + "/instances";
response = assertSuccessPostOperation(controllerUrl, addInstanceCmd(controllers), false);
Assert.assertTrue(response.contains("controller_9000"));
Assert.assertTrue(response.contains("controller_9001"));
// add a duplicated host
response = assertSuccessPostOperation(instancesUrl, addInstanceCmd("localhost:1234"), true);
// add/remove tags
for (int i = 0; i < 4; i++) {
instanceUrl = instancesUrl + "/localhost_123" + i;
response = assertSuccessPostOperation(instanceUrl, addInstanceTagCmd(_tag1), false);
Assert.assertTrue(response.contains(_tag1));
}
instanceUrl = instancesUrl + "/localhost_1233";
response = assertSuccessPostOperation(instanceUrl, removeInstanceTagCmd(_tag1), false);
Assert.assertFalse(response.contains(_tag1));
}
@Test
public void testGetResources() throws IOException {
final String clusterName = "TestTagAwareness_testGetResources";
final String TAG = "tag";
final String URL_BASE =
"http://localhost:" + ADMIN_PORT + "/clusters/" + clusterName + "/resourceGroups";
_gSetupTool.addCluster(clusterName, true);
HelixAdmin admin = _gSetupTool.getClusterManagementTool();
// Add a tagged resource
IdealState taggedResource = new IdealState("taggedResource");
taggedResource.setInstanceGroupTag(TAG);
taggedResource.setStateModelDefRef("OnlineOffline");
admin.addResource(clusterName, taggedResource.getId(), taggedResource);
// Add an untagged resource
IdealState untaggedResource = new IdealState("untaggedResource");
untaggedResource.setStateModelDefRef("OnlineOffline");
admin.addResource(clusterName, untaggedResource.getId(), untaggedResource);
// Now make a REST call for all resources
Reference resourceRef = new Reference(URL_BASE);
Request request = new Request(Method.GET, resourceRef);
Response response = _gClient.handle(request);
ZNRecord responseRecord =
ClusterRepresentationUtil.JsonToObject(ZNRecord.class, response.getEntityAsText());
// Ensure that the tagged resource has information and the untagged one doesn't
Assert.assertNotNull(responseRecord.getMapField("ResourceTags"));
Assert
.assertEquals(TAG, responseRecord.getMapField("ResourceTags").get(taggedResource.getId()));
Assert.assertFalse(responseRecord.getMapField("ResourceTags").containsKey(
untaggedResource.getId()));
}
@Test
public void testGetInstances() throws IOException {
final String clusterName = "TestTagAwareness_testGetResources";
final String[] TAGS = {
"tag1", "tag2"
};
final String URL_BASE =
"http://localhost:" + ADMIN_PORT + "/clusters/" + clusterName + "/instances";
_gSetupTool.addCluster(clusterName, true);
HelixAdmin admin = _gSetupTool.getClusterManagementTool();
// Add 4 participants, each with differint tag characteristics
InstanceConfig instance1 = new InstanceConfig("localhost_1");
instance1.addTag(TAGS[0]);
admin.addInstance(clusterName, instance1);
InstanceConfig instance2 = new InstanceConfig("localhost_2");
instance2.addTag(TAGS[1]);
admin.addInstance(clusterName, instance2);
InstanceConfig instance3 = new InstanceConfig("localhost_3");
instance3.addTag(TAGS[0]);
instance3.addTag(TAGS[1]);
admin.addInstance(clusterName, instance3);
InstanceConfig instance4 = new InstanceConfig("localhost_4");
admin.addInstance(clusterName, instance4);
// Now make a REST call for all resources
Reference resourceRef = new Reference(URL_BASE);
Request request = new Request(Method.GET, resourceRef);
Response response = _gClient.handle(request);
ListInstancesWrapper responseWrapper =
ClusterRepresentationUtil.JsonToObject(ListInstancesWrapper.class,
response.getEntityAsText());
Map<String, List<String>> tagInfo = responseWrapper.tagInfo;
// Ensure tag ownership is reported correctly
Assert.assertTrue(tagInfo.containsKey(TAGS[0]));
Assert.assertTrue(tagInfo.containsKey(TAGS[1]));
Assert.assertTrue(tagInfo.get(TAGS[0]).contains("localhost_1"));
Assert.assertFalse(tagInfo.get(TAGS[0]).contains("localhost_2"));
Assert.assertTrue(tagInfo.get(TAGS[0]).contains("localhost_3"));
Assert.assertFalse(tagInfo.get(TAGS[0]).contains("localhost_4"));
Assert.assertFalse(tagInfo.get(TAGS[1]).contains("localhost_1"));
Assert.assertTrue(tagInfo.get(TAGS[1]).contains("localhost_2"));
Assert.assertTrue(tagInfo.get(TAGS[1]).contains("localhost_3"));
Assert.assertFalse(tagInfo.get(TAGS[1]).contains("localhost_4"));
}
}