blob: 93c73fbc5493b6ee27a447d315e8d1a0fbf8a91d [file] [log] [blame]
package org.apache.helix.integration.spectator;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.helix.AccessOption;
import org.apache.helix.HelixException;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
import org.apache.helix.NotificationContext;
import org.apache.helix.PropertyPathBuilder;
import org.apache.helix.PropertyType;
import org.apache.helix.TestHelper;
import org.apache.helix.api.listeners.RoutingTableChangeListener;
import org.apache.helix.common.ZkTestBase;
import org.apache.helix.integration.manager.ClusterControllerManager;
import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.model.BuiltInStateModelDefinitions;
import org.apache.helix.model.CustomizedView;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.spectator.RoutingTableProvider;
import org.apache.helix.spectator.RoutingTableSnapshot;
import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier;
import org.mockito.internal.util.collections.Sets;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
public class TestRoutingTableProvider extends ZkTestBase {
static final String STATE_MODEL = BuiltInStateModelDefinitions.MasterSlave.name();
static final String TEST_DB = "TestDB";
static final String CLASS_NAME = TestRoutingTableProvider.class.getSimpleName();
static final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + CLASS_NAME;
static final int PARTICIPANT_NUMBER = 3;
static final int PARTICIPANT_START_PORT = 12918;
static final long WAIT_DURATION = 5 * 1000L; // 5 seconds
static final int PARTITION_NUMBER = 20;
static final int REPLICA_NUMBER = 3;
private HelixManager _spectator;
private List<MockParticipantManager> _participants = new ArrayList<>();
private List<String> _instances = new ArrayList<>();
private ClusterControllerManager _controller;
private ZkHelixClusterVerifier _clusterVerifier;
private RoutingTableProvider _routingTableProvider_default;
private RoutingTableProvider _routingTableProvider_ev;
private RoutingTableProvider _routingTableProvider_cs;
private boolean _listenerTestResult = true;
private static final AtomicBoolean customizedViewChangeCalled = new AtomicBoolean(false);
class MockRoutingTableChangeListener implements RoutingTableChangeListener {
boolean routingTableChangeReceived = false;
@Override
public void onRoutingTableChange(RoutingTableSnapshot routingTableSnapshot, Object context) {
Set<String> masterInstances = new HashSet<>();
Set<String> slaveInstances = new HashSet<>();
for (InstanceConfig config : routingTableSnapshot
.getInstancesForResource(TEST_DB, "MASTER")) {
masterInstances.add(config.getInstanceName());
}
for (InstanceConfig config : routingTableSnapshot.getInstancesForResource(TEST_DB, "SLAVE")) {
slaveInstances.add(config.getInstanceName());
}
if (context != null && (!masterInstances.equals(Map.class.cast(context).get("MASTER"))
|| !slaveInstances.equals(Map.class.cast(context).get("SLAVE")))) {
_listenerTestResult = false;
} else {
_listenerTestResult = true;
}
routingTableChangeReceived = true;
}
}
class MockRoutingTableProvider extends RoutingTableProvider {
MockRoutingTableProvider(HelixManager helixManager, Map<PropertyType, List<String>> sourceDataTypes) {
super(helixManager, sourceDataTypes);
}
@Override
public void onCustomizedViewChange(List<CustomizedView> customizedViewList,
NotificationContext changeContext){
customizedViewChangeCalled.getAndSet(true);
super.onCustomizedViewChange(customizedViewList, changeContext);
}
}
@BeforeClass
public void beforeClass() throws Exception {
System.out.println(
"START " + getShortClassName() + " at " + new Date(System.currentTimeMillis()));
// setup storage cluster
_gSetupTool.addCluster(CLUSTER_NAME, true);
for (int i = 0; i < PARTICIPANT_NUMBER; i++) {
String instance = PARTICIPANT_PREFIX + "_" + (PARTICIPANT_START_PORT + i);
_gSetupTool.addInstanceToCluster(CLUSTER_NAME, instance);
_instances.add(instance);
}
// start dummy participants
for (int i = 0; i < PARTICIPANT_NUMBER; i++) {
MockParticipantManager participant =
new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, _instances.get(i));
participant.syncStart();
_participants.add(participant);
}
createDBInSemiAuto(_gSetupTool, CLUSTER_NAME, TEST_DB, _instances,
STATE_MODEL, PARTITION_NUMBER, REPLICA_NUMBER);
// start controller
String controllerName = CONTROLLER_PREFIX + "_0";
_controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
_controller.syncStart();
// start speculator
_spectator = HelixManagerFactory
.getZKHelixManager(CLUSTER_NAME, "spectator", InstanceType.SPECTATOR, ZK_ADDR);
_spectator.connect();
_routingTableProvider_default = new RoutingTableProvider(_spectator);
_spectator.addExternalViewChangeListener(_routingTableProvider_default);
_spectator.addLiveInstanceChangeListener(_routingTableProvider_default);
_spectator.addInstanceConfigChangeListener(_routingTableProvider_default);
_routingTableProvider_ev = new RoutingTableProvider(_spectator);
_routingTableProvider_cs = new RoutingTableProvider(_spectator, PropertyType.CURRENTSTATES);
_clusterVerifier = new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkClient(_gZkClient)
.setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME)
.build();
Assert.assertTrue(_clusterVerifier.verifyByPolling());
}
@AfterClass
public void afterClass() {
// stop participants
for (MockParticipantManager p : _participants) {
p.syncStop();
}
_controller.syncStop();
_routingTableProvider_default.shutdown();
_routingTableProvider_ev.shutdown();
_routingTableProvider_cs.shutdown();
_spectator.disconnect();
deleteCluster(CLUSTER_NAME);
}
@Test
public void testInvocation() throws Exception {
MockRoutingTableChangeListener routingTableChangeListener = new MockRoutingTableChangeListener();
_routingTableProvider_default
.addRoutingTableChangeListener(routingTableChangeListener, null, true);
// Add a routing table provider listener should trigger an execution of the
// listener callbacks
Assert.assertTrue(TestHelper.verify(() -> {
if (!routingTableChangeListener.routingTableChangeReceived) {
return false;
}
return true;
}, TestHelper.WAIT_DURATION));
}
@Test(dependsOnMethods = { "testInvocation" })
public void testRoutingTable() {
Assert.assertEquals(_routingTableProvider_default.getLiveInstances().size(), _instances.size());
Assert.assertEquals(_routingTableProvider_default.getInstanceConfigs().size(), _instances.size());
Assert.assertEquals(_routingTableProvider_ev.getLiveInstances().size(), _instances.size());
Assert.assertEquals(_routingTableProvider_ev.getInstanceConfigs().size(), _instances.size());
Assert.assertEquals(_routingTableProvider_cs.getLiveInstances().size(), _instances.size());
Assert.assertEquals(_routingTableProvider_cs.getInstanceConfigs().size(), _instances.size());
validateRoutingTable(_routingTableProvider_default, Sets.newSet(_instances.get(0)),
Sets.newSet(_instances.get(1), _instances.get(2)));
validateRoutingTable(_routingTableProvider_ev, Sets.newSet(_instances.get(0)),
Sets.newSet(_instances.get(1), _instances.get(2)));
validateRoutingTable(_routingTableProvider_cs, Sets.newSet(_instances.get(0)),
Sets.newSet(_instances.get(1), _instances.get(2)));
Collection<String> databases = _routingTableProvider_default.getResources();
Assert.assertEquals(databases.size(), 1);
}
@Test(dependsOnMethods = { "testRoutingTable" })
public void testDisableInstance() throws InterruptedException {
// disable the master instance
String prevMasterInstance = _instances.get(0);
_gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, prevMasterInstance, false);
Assert.assertTrue(_clusterVerifier.verifyByPolling());
validateRoutingTable(_routingTableProvider_default, Sets.newSet(_instances.get(1)),
Sets.newSet(_instances.get(2)));
validateRoutingTable(_routingTableProvider_ev, Sets.newSet(_instances.get(1)),
Sets.newSet(_instances.get(2)));
validateRoutingTable(_routingTableProvider_cs, Sets.newSet(_instances.get(1)),
Sets.newSet(_instances.get(2)));
}
@Test(dependsOnMethods = { "testDisableInstance" })
public void testRoutingTableListener() throws InterruptedException {
RoutingTableChangeListener routingTableChangeListener = new MockRoutingTableChangeListener();
Map<String, Set<String>> context = new HashMap<>();
context.put("MASTER", Sets.newSet(_instances.get(0)));
context.put("SLAVE", Sets.newSet(_instances.get(1), _instances.get(2)));
_routingTableProvider_default
.addRoutingTableChangeListener(routingTableChangeListener, context, true);
_routingTableProvider_default
.addRoutingTableChangeListener(new MockRoutingTableChangeListener(), null, true);
// reenable the master instance to cause change
String prevMasterInstance = _instances.get(0);
_gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, prevMasterInstance, true);
Assert.assertTrue(_clusterVerifier.verifyByPolling());
Assert.assertTrue(_listenerTestResult);
}
@Test(dependsOnMethods = { "testRoutingTableListener" })
public void testShutdownInstance() throws InterruptedException {
// shutdown second instance.
_participants.get(1).syncStop();
Assert.assertTrue(_clusterVerifier.verifyByPolling());
Assert.assertEquals(_routingTableProvider_default.getLiveInstances().size(), _instances.size() - 1);
Assert.assertEquals(_routingTableProvider_default.getInstanceConfigs().size(), _instances.size());
Assert.assertEquals(_routingTableProvider_ev.getLiveInstances().size(), _instances.size() - 1);
Assert.assertEquals(_routingTableProvider_ev.getInstanceConfigs().size(), _instances.size());
Assert.assertEquals(_routingTableProvider_cs.getLiveInstances().size(), _instances.size() - 1);
Assert.assertEquals(_routingTableProvider_cs.getInstanceConfigs().size(), _instances.size());
validateRoutingTable(_routingTableProvider_default, Sets.newSet(_instances.get(0)),
Sets.newSet(_instances.get(2)));
validateRoutingTable(_routingTableProvider_ev, Sets.newSet(_instances.get(0)),
Sets.newSet(_instances.get(2)));
validateRoutingTable(_routingTableProvider_cs, Sets.newSet(_instances.get(0)),
Sets.newSet(_instances.get(2)));
}
@Test(expectedExceptions = HelixException.class, dependsOnMethods = "testShutdownInstance")
public void testExternalViewWithType() {
Map<PropertyType, List<String>> sourceDataTypes = new HashMap<>();
sourceDataTypes.put(PropertyType.EXTERNALVIEW, Arrays.asList("typeA"));
sourceDataTypes.put(PropertyType.CUSTOMIZEDVIEW, Arrays.asList("typeA", "typeB"));
RoutingTableProvider routingTableProvider;
routingTableProvider = new RoutingTableProvider(_spectator, sourceDataTypes);
}
@Test(dependsOnMethods = "testExternalViewWithType", expectedExceptions = HelixException.class)
public void testCustomizedViewWithoutType() {
RoutingTableProvider routingTableProvider;
routingTableProvider = new RoutingTableProvider(_spectator, PropertyType.CUSTOMIZEDVIEW);
}
@Test(dependsOnMethods = "testCustomizedViewWithoutType")
public void testCustomizedViewCorrectConstructor() throws Exception {
Map<PropertyType, List<String>> sourceDataTypes = new HashMap<>();
sourceDataTypes.put(PropertyType.CUSTOMIZEDVIEW, Arrays.asList("typeA"));
MockRoutingTableProvider routingTableProvider =
new MockRoutingTableProvider(_spectator, sourceDataTypes);
CustomizedView customizedView = new CustomizedView(TEST_DB);
customizedView.setState("p1", "h1", "testState");
// Clear the flag before writing to the Customized View Path
customizedViewChangeCalled.getAndSet(false);
String customizedViewPath = PropertyPathBuilder.customizedView(CLUSTER_NAME, "typeA", TEST_DB);
_spectator.getHelixDataAccessor().getBaseDataAccessor().set(customizedViewPath,
customizedView.getRecord(), AccessOption.PERSISTENT);
boolean onCustomizedViewChangeCalled =
TestHelper.verify(() -> customizedViewChangeCalled.get(), WAIT_DURATION);
Assert.assertTrue(onCustomizedViewChangeCalled);
_spectator.getHelixDataAccessor().getBaseDataAccessor().remove(customizedViewPath,
AccessOption.PERSISTENT);
routingTableProvider.shutdown();
}
@Test(dependsOnMethods = "testCustomizedViewCorrectConstructor")
public void testGetRoutingTableSnapshot() throws Exception {
Map<PropertyType, List<String>> sourceDataTypes = new HashMap<>();
sourceDataTypes.put(PropertyType.CUSTOMIZEDVIEW, Arrays.asList("typeA", "typeB"));
sourceDataTypes.put(PropertyType.EXTERNALVIEW, Collections.emptyList());
RoutingTableProvider routingTableProvider =
new RoutingTableProvider(_spectator, sourceDataTypes);
CustomizedView customizedView1 = new CustomizedView("Resource1");
customizedView1.setState("p1", "h1", "testState1");
customizedView1.setState("p1", "h2", "testState1");
customizedView1.setState("p2", "h1", "testState2");
customizedView1.setState("p3", "h2", "testState3");
String customizedViewPath1 =
PropertyPathBuilder.customizedView(CLUSTER_NAME, "typeA", "Resource1");
CustomizedView customizedView2 = new CustomizedView("Resource2");
customizedView2.setState("p1", "h3", "testState3");
customizedView2.setState("p1", "h4", "testState2");
String customizedViewPath2 =
PropertyPathBuilder.customizedView(CLUSTER_NAME, "typeB", "Resource2");
_spectator.getHelixDataAccessor().getBaseDataAccessor().set(customizedViewPath1,
customizedView1.getRecord(), AccessOption.PERSISTENT);
_spectator.getHelixDataAccessor().getBaseDataAccessor().set(customizedViewPath2,
customizedView2.getRecord(), AccessOption.PERSISTENT);
RoutingTableSnapshot routingTableSnapshot =
routingTableProvider.getRoutingTableSnapshot(PropertyType.CUSTOMIZEDVIEW, "typeA");
Assert.assertEquals(routingTableSnapshot.getPropertyType(), PropertyType.CUSTOMIZEDVIEW);
Assert.assertEquals(routingTableSnapshot.getCustomizedStateType(), "typeA");
routingTableSnapshot =
routingTableProvider.getRoutingTableSnapshot(PropertyType.CUSTOMIZEDVIEW, "typeB");
Assert.assertEquals(routingTableSnapshot.getPropertyType(), PropertyType.CUSTOMIZEDVIEW);
Assert.assertEquals(routingTableSnapshot.getCustomizedStateType(), "typeB");
// Make sure snapshot information is correct
// Check resources are in a correct state
boolean isRoutingTableUpdatedProperly = TestHelper.verify(() -> {
Map<String, Map<String, RoutingTableSnapshot>> routingTableSnapshots =
routingTableProvider.getRoutingTableSnapshots();
RoutingTableSnapshot routingTableSnapshotTypeA =
routingTableSnapshots.get(PropertyType.CUSTOMIZEDVIEW.name()).get("typeA");
RoutingTableSnapshot routingTableSnapshotTypeB =
routingTableSnapshots.get(PropertyType.CUSTOMIZEDVIEW.name()).get("typeB");
String typeAp1h1 = "noState";
String typeAp1h2 = "noState";
String typeAp2h1 = "noState";
String typeAp3h2 = "noState";
String typeBp1h2 = "noState";
String typeBp1h4 = "noState";
try {
typeAp1h1 = routingTableSnapshotTypeA.getCustomizeViews().iterator().next()
.getStateMap("p1").get("h1");
typeAp1h2 = routingTableSnapshotTypeA.getCustomizeViews().iterator().next()
.getStateMap("p1").get("h2");
typeAp2h1 = routingTableSnapshotTypeA.getCustomizeViews().iterator().next()
.getStateMap("p2").get("h1");
typeAp3h2 = routingTableSnapshotTypeA.getCustomizeViews().iterator().next()
.getStateMap("p3").get("h2");
typeBp1h2 = routingTableSnapshotTypeB.getCustomizeViews().iterator().next()
.getStateMap("p1").get("h3");
typeBp1h4 = routingTableSnapshotTypeB.getCustomizeViews().iterator().next()
.getStateMap("p1").get("h4");
} catch (Exception e) {
// ok because RoutingTable has not been updated yet
return false;
}
return (routingTableSnapshots.size() == 2
&& routingTableSnapshots.get(PropertyType.CUSTOMIZEDVIEW.name()).size() == 2
&& typeAp1h1.equals("testState1") && typeAp1h2.equals("testState1")
&& typeAp2h1.equals("testState2") && typeAp3h2.equals("testState3")
&& typeBp1h2.equals("testState3") && typeBp1h4.equals("testState2"));
}, WAIT_DURATION);
Assert.assertTrue(isRoutingTableUpdatedProperly, "RoutingTable has been updated properly");
routingTableProvider.shutdown();
}
private void validateRoutingTable(RoutingTableProvider routingTableProvider,
Set<String> masterNodes, Set<String> slaveNodes) {
IdealState is =
_gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, TEST_DB);
for (String p : is.getPartitionSet()) {
Set<String> masterInstances = new HashSet<>();
for (InstanceConfig config : routingTableProvider.getInstances(TEST_DB, p, "MASTER")) {
masterInstances.add(config.getInstanceName());
}
Set<String> slaveInstances = new HashSet<>();
for (InstanceConfig config : routingTableProvider.getInstances(TEST_DB, p, "SLAVE")) {
slaveInstances.add(config.getInstanceName());
}
Assert.assertEquals(masterInstances, masterNodes);
Assert.assertEquals(slaveInstances, slaveNodes);
}
}
}