blob: a61ffea9acea027953bda38444ae7d6d6d86c918 [file] [log] [blame]
package org.apache.helix.integration.messaging;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.helix.ControllerChangeListener;
import org.apache.helix.ExternalViewChangeListener;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.IdealStateChangeListener;
import org.apache.helix.InstanceConfigChangeListener;
import org.apache.helix.InstanceType;
import org.apache.helix.LiveInstanceChangeListener;
import org.apache.helix.NotificationContext;
import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.TestHelper;
import org.apache.helix.constants.InstanceConstants;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.common.ZkTestBase;
import org.apache.helix.controller.HelixControllerMain;
import org.apache.helix.manager.zk.ZKHelixAdmin;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.manager.zk.ZKHelixManager;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
import org.apache.helix.model.ClusterConstraints;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.Message;
import org.apache.helix.model.StateModelDefinition;
import org.apache.helix.model.builder.ConstraintItemBuilder;
import org.apache.helix.participant.StateMachineEngine;
import org.apache.helix.participant.statemachine.StateModel;
import org.apache.helix.participant.statemachine.StateModelFactory;
import org.apache.helix.participant.statemachine.StateModelInfo;
import org.apache.helix.participant.statemachine.Transition;
import org.apache.helix.tools.ClusterStateVerifier;
import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.Test;
public class TestMessageThrottle2 extends ZkTestBase {
private final static String _clusterName = "TestMessageThrottle2";
private final static String _resourceName = "MyResource";
private HelixManager _helixController;
@Test
public void test() throws Exception {
System.out.println("START " + _clusterName + " at " + new Date(System.currentTimeMillis()));
// Keep mock participant references so that they could be shut down after testing
Set<MyProcess> participants = new HashSet<>();
startAdmin();
startController();
// start node2 first
participants.add(Node.main(new String[] {
"2"
}));
// wait for node2 becoming MASTER
final Builder keyBuilder = new Builder(_clusterName);
final HelixDataAccessor accessor =
new ZKHelixDataAccessor(_clusterName, new ZkBaseDataAccessor<>(_gZkClient));
TestHelper.verify(() -> {
ExternalView view = accessor.getProperty(keyBuilder.externalView(_resourceName));
String state = null;
if (view != null) {
Map<String, String> map = view.getStateMap(_resourceName);
if (map != null) {
state = map.get("node2");
}
}
return state != null && state.equals("MASTER");
}, 10 * 1000);
// start node 1
participants.add(Node.main(new String[] {
"1"
}));
boolean result = ClusterStateVerifier
.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR, _clusterName));
Assert.assertTrue(result);
// Clean up after testing
_helixController.disconnect();
participants.forEach(MyProcess::stop);
deleteCluster(_clusterName);
System.out.println("END " + _clusterName + " at " + new Date(System.currentTimeMillis()));
}
private void startController() throws Exception {
// start _helixController
System.out.println(String.format("Starting Controller{Cluster:%s, Port:%s, Zookeeper:%s}",
_clusterName, 12000, ZK_ADDR));
_helixController = HelixControllerMain.startHelixController(ZK_ADDR, _clusterName,
"localhost_" + 12000, HelixControllerMain.STANDALONE);
// StatusPrinter statusPrinter = new StatusPrinter();
// statusPrinter.registerWith(_helixController);
}
private void startAdmin() {
HelixAdmin admin = new ZKHelixAdmin(ZK_ADDR);
// create cluster
System.out.println("Creating cluster: " + _clusterName);
admin.addCluster(_clusterName, true);
// add MasterSlave state mode definition
admin.addStateModelDef(_clusterName, "MasterSlave",
new StateModelDefinition(generateConfigForMasterSlave()));
// ideal-state znrecord
ZNRecord record = new ZNRecord(_resourceName);
record.setSimpleField("IDEAL_STATE_MODE", "AUTO");
record.setSimpleField("NUM_PARTITIONS", "1");
record.setSimpleField("REPLICAS", "2");
record.setSimpleField("STATE_MODEL_DEF_REF", "MasterSlave");
record.setListField(_resourceName, Arrays.asList("node1", "node2"));
admin.setResourceIdealState(_clusterName, _resourceName, new IdealState(record));
ConstraintItemBuilder builder = new ConstraintItemBuilder();
// limit one transition message at a time across the entire cluster
builder.addConstraintAttribute("MESSAGE_TYPE", "STATE_TRANSITION")
// .addConstraintAttribute("INSTANCE", ".*") // un-comment this line if using instance-level
// constraint
.addConstraintAttribute("CONSTRAINT_VALUE", "1");
admin.setConstraint(_clusterName, ClusterConstraints.ConstraintType.MESSAGE_CONSTRAINT,
"constraint1", builder.build());
}
private ZNRecord generateConfigForMasterSlave() {
ZNRecord record = new ZNRecord("MasterSlave");
record.setSimpleField(
StateModelDefinition.StateModelDefinitionProperty.INITIAL_STATE.toString(), "OFFLINE");
List<String> statePriorityList = new ArrayList<>();
statePriorityList.add("MASTER");
statePriorityList.add("SLAVE");
statePriorityList.add("OFFLINE");
statePriorityList.add("DROPPED");
statePriorityList.add("ERROR");
record.setListField(
StateModelDefinition.StateModelDefinitionProperty.STATE_PRIORITY_LIST.toString(),
statePriorityList);
for (String state : statePriorityList) {
String key = state + ".meta";
Map<String, String> metadata = new HashMap<>();
switch (state) {
case "MASTER":
metadata.put("count", "1");
record.setMapField(key, metadata);
break;
case "SLAVE":
metadata.put("count", StateModelDefinition.STATE_REPLICA_COUNT_ALL_REPLICAS);
record.setMapField(key, metadata);
break;
case "OFFLINE":
case "DROPPED":
case "ERROR":
metadata.put("count", "-1");
record.setMapField(key, metadata);
break;
}
}
for (String state : statePriorityList) {
String key = state + ".next";
switch (state) {
case "MASTER": {
Map<String, String> metadata = new HashMap<>();
metadata.put("SLAVE", "SLAVE");
metadata.put("OFFLINE", "SLAVE");
metadata.put("DROPPED", "SLAVE");
record.setMapField(key, metadata);
break;
}
case "SLAVE": {
Map<String, String> metadata = new HashMap<>();
metadata.put("MASTER", "MASTER");
metadata.put("OFFLINE", "OFFLINE");
metadata.put("DROPPED", "OFFLINE");
record.setMapField(key, metadata);
break;
}
case "OFFLINE": {
Map<String, String> metadata = new HashMap<>();
metadata.put("SLAVE", "SLAVE");
metadata.put("MASTER", "SLAVE");
metadata.put("DROPPED", "DROPPED");
record.setMapField(key, metadata);
break;
}
case "ERROR": {
Map<String, String> metadata = new HashMap<>();
metadata.put("OFFLINE", "OFFLINE");
record.setMapField(key, metadata);
break;
}
}
}
// change the transition priority list
List<String> stateTransitionPriorityList = new ArrayList<>();
stateTransitionPriorityList.add("SLAVE-MASTER");
stateTransitionPriorityList.add("OFFLINE-SLAVE");
stateTransitionPriorityList.add("MASTER-SLAVE");
stateTransitionPriorityList.add("SLAVE-OFFLINE");
stateTransitionPriorityList.add("OFFLINE-DROPPED");
record.setListField(
StateModelDefinition.StateModelDefinitionProperty.STATE_TRANSITION_PRIORITYLIST.toString(),
stateTransitionPriorityList);
return record;
}
static final class MyProcess {
private final String _instanceName;
private HelixManager _helixManager;
public MyProcess(String instanceName) {
this._instanceName = instanceName;
}
public void start() throws Exception {
_helixManager =
new ZKHelixManager(_clusterName, _instanceName, InstanceType.PARTICIPANT, ZK_ADDR);
{
// hack to set sessionTimeout
Field sessionTimeout = ZKHelixManager.class.getDeclaredField("_sessionTimeout");
sessionTimeout.setAccessible(true);
sessionTimeout.setInt(_helixManager, 1000);
}
StateMachineEngine stateMach = _helixManager.getStateMachineEngine();
stateMach.registerStateModelFactory("MasterSlave", new MyStateModelFactory(_helixManager));
_helixManager.connect();
// StatusPrinter statusPrinter = new StatusPrinter();
// statusPrinter.registerWith(_helixManager);
}
public void stop() {
_helixManager.disconnect();
}
}
@StateModelInfo(initialState = "OFFLINE", states = {
"MASTER", "SLAVE", "ERROR"
})
public static class MyStateModel extends StateModel {
private static final Logger LOGGER = LoggerFactory.getLogger(MyStateModel.class);
private final HelixManager helixManager;
public MyStateModel(HelixManager helixManager) {
this.helixManager = helixManager;
}
@Transition(to = "SLAVE", from = "OFFLINE")
public void onBecomeSlaveFromOffline(Message message, NotificationContext context) {
String partitionName = message.getPartitionName();
String instanceName = message.getTgtName();
LOGGER.info(instanceName + " becomes SLAVE from OFFLINE for " + partitionName);
}
@Transition(to = "SLAVE", from = "MASTER")
public void onBecomeSlaveFromMaster(Message message, NotificationContext context) {
String partitionName = message.getPartitionName();
String instanceName = message.getTgtName();
LOGGER.info(instanceName + " becomes SLAVE from MASTER for " + partitionName);
}
@Transition(to = "MASTER", from = "SLAVE")
public void onBecomeMasterFromSlave(Message message, NotificationContext context) {
String partitionName = message.getPartitionName();
String instanceName = message.getTgtName();
LOGGER.info(instanceName + " becomes MASTER from SLAVE for " + partitionName);
}
@Transition(to = "OFFLINE", from = "SLAVE")
public void onBecomeOfflineFromSlave(Message message, NotificationContext context) {
String partitionName = message.getPartitionName();
String instanceName = message.getTgtName();
LOGGER.info(instanceName + " becomes OFFLINE from SLAVE for " + partitionName);
}
@Transition(to = "DROPPED", from = "OFFLINE")
public void onBecomeDroppedFromOffline(Message message, NotificationContext context) {
String partitionName = message.getPartitionName();
String instanceName = message.getTgtName();
LOGGER.info(instanceName + " becomes DROPPED from OFFLINE for " + partitionName);
}
@Transition(to = "OFFLINE", from = "ERROR")
public void onBecomeOfflineFromError(Message message, NotificationContext context) {
String partitionName = message.getPartitionName();
String instanceName = message.getTgtName();
LOGGER.info(instanceName + " becomes OFFLINE from ERROR for " + partitionName);
}
}
static class MyStateModelFactory extends StateModelFactory<MyStateModel> {
private final HelixManager helixManager;
public MyStateModelFactory(HelixManager helixManager) {
this.helixManager = helixManager;
}
@Override
public MyStateModel createNewStateModel(String resource, String partitionName) {
return new MyStateModel(helixManager);
}
}
static class Node {
// ------------------------------ FIELDS ------------------------------
private static final Logger LOGGER = LoggerFactory.getLogger(Node.class);
// -------------------------- INNER CLASSES --------------------------
// --------------------------- main() method ---------------------------
public static MyProcess main(String[] args) throws Exception {
if (args.length < 1) {
LOGGER.info("usage: id");
System.exit(0);
}
int id = Integer.parseInt(args[0]);
String instanceName = "node" + id;
addInstanceConfig(instanceName);
// Return the thread so that it could be interrupted after testing
return startProcess(instanceName);
}
private static void addInstanceConfig(String instanceName) {
// add node to cluster if not already added
ZKHelixAdmin admin = new ZKHelixAdmin(ZK_ADDR);
InstanceConfig instanceConfig = null;
try {
instanceConfig = admin.getInstanceConfig(_clusterName, instanceName);
} catch (Exception ignored) {
}
if (instanceConfig == null) {
InstanceConfig config = new InstanceConfig(instanceName);
config.setHostName("localhost");
config.setInstanceOperation(InstanceConstants.InstanceOperation.ENABLE);
echo("Adding InstanceConfig:" + config);
admin.addInstance(_clusterName, config);
}
}
public static void echo(Object obj) {
LOGGER.info(obj.toString());
}
private static MyProcess startProcess(String instanceName) throws Exception {
MyProcess process = new MyProcess(instanceName);
process.start();
return process;
}
}
static class StatusPrinter implements IdealStateChangeListener, InstanceConfigChangeListener,
ExternalViewChangeListener, LiveInstanceChangeListener, ControllerChangeListener {
// ------------------------------ FIELDS ------------------------------
// ------------------------ INTERFACE METHODS ------------------------
// --------------------- Interface ControllerChangeListener
// ---------------------
@Override
public void onControllerChange(NotificationContext changeContext) {
System.out.println("StatusPrinter.onControllerChange:" + changeContext);
}
// --------------------- Interface ExternalViewChangeListener
// ---------------------
@Override
public void onExternalViewChange(List<ExternalView> externalViewList,
NotificationContext changeContext) {
for (ExternalView externalView : externalViewList) {
System.out
.println("StatusPrinter.onExternalViewChange:" + "externalView = " + externalView);
}
}
// --------------------- Interface IdealStateChangeListener
// ---------------------
@Override
public void onIdealStateChange(List<IdealState> idealState, NotificationContext changeContext) {
for (IdealState state : idealState) {
System.out.println("StatusPrinter.onIdealStateChange:" + "state = " + state);
}
}
// --------------------- Interface InstanceConfigChangeListener
// ---------------------
@Override
public void onInstanceConfigChange(List<InstanceConfig> instanceConfigs,
NotificationContext context) {
for (InstanceConfig instanceConfig : instanceConfigs) {
System.out.println(
"StatusPrinter.onInstanceConfigChange:" + "instanceConfig = " + instanceConfig);
}
}
// --------------------- Interface LiveInstanceChangeListener
// ---------------------
@Override
public void onLiveInstanceChange(List<LiveInstance> liveInstances,
NotificationContext changeContext) {
for (LiveInstance liveInstance : liveInstances) {
System.out
.println("StatusPrinter.onLiveInstanceChange:" + "liveInstance = " + liveInstance);
}
}
// -------------------------- OTHER METHODS --------------------------
void registerWith(HelixManager helixManager) throws Exception {
helixManager.addIdealStateChangeListener(this);
helixManager.addInstanceConfigChangeListener(this);
helixManager.addExternalViewChangeListener(this);
helixManager.addLiveInstanceChangeListener(this);
helixManager.addControllerListener(this);
}
}
}