blob: d1dfee853fcae9d6a327146d9c69f7086f85c83d [file] [log] [blame]
package org.apache.helix.integration;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import java.util.Date;
import org.apache.helix.HelixAdmin;
import org.apache.helix.TestHelper;
import org.apache.helix.common.ZkTestBase;
import org.apache.helix.controller.rebalancer.waged.WagedRebalancer;
import org.apache.helix.integration.manager.ClusterDistributedController;
import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.model.IdealState;
import org.apache.helix.tools.ClusterSetup;
import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
public class TestAddClusterV2 extends ZkTestBase {
private static final int CLUSTER_NR = 10;
protected static final int NODE_NR = 5;
protected static final int START_PORT = 12918;
protected static final String STATE_MODEL = "MasterSlave";
protected final String CLASS_NAME = getShortClassName();
private final String CONTROLLER_CLUSTER = CONTROLLER_CLUSTER_PREFIX + "_" + CLASS_NAME;
protected static final String TEST_DB = "TestDB";
MockParticipantManager[] _participants = new MockParticipantManager[NODE_NR];
private ClusterDistributedController[] _distControllers =
new ClusterDistributedController[NODE_NR];
@BeforeClass
public void beforeClass() throws Exception {
System.out.println("START " + CLASS_NAME + " at " + new Date(System.currentTimeMillis()));
// setup CONTROLLER_CLUSTER
_gSetupTool.addCluster(CONTROLLER_CLUSTER, true);
for (int i = 0; i < NODE_NR; i++) {
String controllerName = CONTROLLER_PREFIX + "_" + i;
_gSetupTool.addInstanceToCluster(CONTROLLER_CLUSTER, controllerName);
}
// setup cluster of clusters
for (int i = 0; i < CLUSTER_NR; i++) {
String clusterName = CLUSTER_PREFIX + "_" + CLASS_NAME + "_" + i;
_gSetupTool.addCluster(clusterName, true);
_gSetupTool.activateCluster(clusterName, CONTROLLER_CLUSTER, true);
}
final String firstCluster = CLUSTER_PREFIX + "_" + CLASS_NAME + "_0";
setupStorageCluster(_gSetupTool, firstCluster, TEST_DB, 20, PARTICIPANT_PREFIX, START_PORT,
"MasterSlave", 3, true);
// start dummy participants for the first cluster
for (int i = 0; i < NODE_NR; i++) {
String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
_participants[i] = new MockParticipantManager(ZK_ADDR, firstCluster, instanceName);
_participants[i].syncStart();
}
// start distributed cluster controllers
for (int i = 0; i < NODE_NR; i++) {
String controllerName = CONTROLLER_PREFIX + "_" + i;
_distControllers[i] =
new ClusterDistributedController(ZK_ADDR, CONTROLLER_CLUSTER, controllerName);
_distControllers[i].syncStart();
}
verifyClusters();
}
@Test
public void Test() {
// Verify the super cluster resources are all rebalanced by the WAGED rebalancer.
HelixAdmin admin = _gSetupTool.getClusterManagementTool();
for (String clusterName : admin.getResourcesInCluster(CONTROLLER_CLUSTER)) {
IdealState is = _gSetupTool.getClusterManagementTool()
.getResourceIdealState(CONTROLLER_CLUSTER, clusterName);
Assert.assertEquals(is.getRebalancerClassName(), WagedRebalancer.class.getName());
}
}
@AfterClass
public void afterClass() throws Exception {
System.out.println("AFTERCLASS " + CLASS_NAME + " at " + new Date(System.currentTimeMillis()));
/**
* shutdown order:
* 1) pause the leader (optional)
* 2) disconnect all controllers
* 3) disconnect leader/disconnect participant
*/
String leader = getCurrentLeader(_gZkClient, CONTROLLER_CLUSTER);
int leaderIdx = -1;
for (int i = 0; i < NODE_NR; i++) {
if (!_distControllers[i].getInstanceName().equals(leader)) {
_distControllers[i].syncStop();
verifyClusters();
} else {
leaderIdx = i;
}
}
Assert.assertNotSame(leaderIdx, -1);
_distControllers[leaderIdx].syncStop();
for (int i = 0; i < NODE_NR; i++) {
_participants[i].syncStop();
}
// delete clusters
for (int i = 0; i < CLUSTER_NR; i++) {
String clusterName = CLUSTER_PREFIX + "_" + CLASS_NAME + "_" + i;
deleteCluster(clusterName);
}
deleteCluster(CONTROLLER_CLUSTER);
System.out.println("END " + CLASS_NAME + " at " + new Date(System.currentTimeMillis()));
}
/**
* verify the external view (against the best possible state)
* in the controller cluster and the first cluster
*/
private void verifyClusters() {
ZkHelixClusterVerifier _clusterVerifier =
new BestPossibleExternalViewVerifier.Builder(CONTROLLER_CLUSTER).setZkClient(_gZkClient)
.setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME)
.build();
Assert.assertTrue(_clusterVerifier.verifyByPolling());
_clusterVerifier =
new BestPossibleExternalViewVerifier.Builder(CLUSTER_PREFIX + "_" + CLASS_NAME + "_0")
.setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME)
.setZkClient(_gZkClient).build();
Assert.assertTrue(_clusterVerifier.verifyByPolling());
}
protected void setupStorageCluster(ClusterSetup setupTool, String clusterName, String dbName,
int partitionNr, String prefix, int startPort, String stateModel, int replica,
boolean rebalance) {
setupTool.addResourceToCluster(clusterName, dbName, partitionNr, stateModel);
for (int i = 0; i < NODE_NR; i++) {
String instanceName = prefix + "_" + (startPort + i);
setupTool.addInstanceToCluster(clusterName, instanceName);
}
if (rebalance) {
setupTool.rebalanceStorageCluster(clusterName, dbName, replica);
}
}
}