blob: 5d1df0a02c021765c18ac284b49a277f09f78c7c [file] [log] [blame]
package org.apache.helix.examples;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
import org.apache.helix.controller.HelixControllerMain;
import org.apache.helix.manager.zk.ZKHelixAdmin;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.StateModelDefinition;
import org.apache.helix.participant.StateMachineEngine;
import org.apache.helix.zookeeper.zkclient.IDefaultNameSpace;
import org.apache.helix.zookeeper.zkclient.ZkClient;
import org.apache.helix.zookeeper.zkclient.ZkServer;
/**
 * Self-contained Helix quickstart demo.
 *
 * <p>{@link #main(String[])} starts an embedded ZooKeeper server, creates a cluster with one
 * resource and a custom Leader/Standby state model, starts the participants and a standalone
 * controller, and then prints the resource's external view after each of these events:
 * initial start-up, adding one more participant, and stopping that participant again.
 */
public class Quickstart {
  // Single source of truth for the ZooKeeper port: used both for the client address and for the
  // embedded server.
  private static final int ZK_PORT = 2199;
  private static final String ZK_ADDRESS = "localhost:" + ZK_PORT;
  private static final String ZK_BASE_DIR = "/tmp/helix-quickstart";
  private static final String CLUSTER_NAME = "HELIX_QUICKSTART";
  private static final String RESOURCE_NAME = "MyResource";
  private static final int NUM_PARTITIONS = 6;
  private static final int NUM_REPLICAS = 2;
  private static final String STATE_MODEL_NAME = "MyStateModel";
  // Participant i listens on BASE_PORT + i and is named "localhost_<port>".
  private static final int BASE_PORT = 12000;
  private static final int INITIAL_NUM_NODES = 2;
  private static final String SEPARATOR =
      "###################################################################";

  // states
  private static final String STANDBY = "STANDBY";
  private static final String OFFLINE = "OFFLINE";
  private static final String LEADER = "LEADER";
  private static final String DROPPED = "DROPPED";

  private static final List<InstanceConfig> INSTANCE_CONFIG_LIST;
  private static final List<MyProcess> PROCESS_LIST;

  // Number of participants known to the demo; grows when addNode() succeeds. Entries
  // [0, numNodes) of INSTANCE_CONFIG_LIST and PROCESS_LIST are valid once the nodes are started.
  private static int numNodes = INITIAL_NUM_NODES;
  // Assigned by setup(); the private helpers below rely on setup() having run first.
  private static HelixAdmin admin;

  static {
    INSTANCE_CONFIG_LIST = new ArrayList<>();
    PROCESS_LIST = new ArrayList<>();
    for (int i = 0; i < INITIAL_NUM_NODES; i++) {
      INSTANCE_CONFIG_LIST.add(createInstanceConfig(BASE_PORT + i));
    }
  }

  /**
   * Builds the enabled instance configuration for a participant on localhost.
   *
   * @param port port of the participant; also used to derive the instance name
   * @return a config named {@code localhost_<port>}
   */
  private static InstanceConfig createInstanceConfig(int port) {
    InstanceConfig instanceConfig = new InstanceConfig("localhost_" + port);
    instanceConfig.setHostName("localhost");
    instanceConfig.setPort("" + port);
    instanceConfig.setInstanceEnabled(true);
    return instanceConfig;
  }

  /**
   * Creates the cluster, registers the initial participants and the state model, adds the
   * resource and computes its initial assignment.
   */
  public static void setup() {
    admin = new ZKHelixAdmin(ZK_ADDRESS);
    // create cluster
    echo("Creating cluster: " + CLUSTER_NAME);
    admin.addCluster(CLUSTER_NAME, true);
    // Add nodes to the cluster
    echo("Adding " + numNodes + " participants to the cluster");
    for (int i = 0; i < numNodes; i++) {
      InstanceConfig instanceConfig = INSTANCE_CONFIG_LIST.get(i);
      admin.addInstance(CLUSTER_NAME, instanceConfig);
      echo("\t Added participant: " + instanceConfig.getInstanceName());
    }
    // Add a state model
    StateModelDefinition myStateModel = defineStateModel();
    echo("Configuring StateModel: " + STATE_MODEL_NAME + " with 1 Leader and 1 Standby");
    admin.addStateModelDef(CLUSTER_NAME, STATE_MODEL_NAME, myStateModel);
    // Add the resource with the configured number of partitions and replicas
    echo("Adding a resource " + RESOURCE_NAME + ": with " + NUM_PARTITIONS + " partitions and "
        + NUM_REPLICAS + " replicas");
    admin.addResource(CLUSTER_NAME, RESOURCE_NAME, NUM_PARTITIONS, STATE_MODEL_NAME, "AUTO");
    // this will set up the ideal state, it calculates the preference list for
    // each partition similar to consistent hashing
    admin.rebalance(CLUSTER_NAME, RESOURCE_NAME, NUM_REPLICAS);
  }

  /**
   * Defines the Leader/Standby state model used by the demo resource.
   *
   * @return the state model definition with states, transitions and per-state upper bounds
   */
  private static StateModelDefinition defineStateModel() {
    StateModelDefinition.Builder builder = new StateModelDefinition.Builder(STATE_MODEL_NAME);
    // Add states and their rank to indicate priority. The lower the rank, the higher the
    // priority.
    builder.addState(LEADER, 1);
    builder.addState(STANDBY, 2);
    builder.addState(OFFLINE);
    builder.addState(DROPPED);
    // Set the initial state when the node starts
    builder.initialState(OFFLINE);
    // Add transitions between the states.
    builder.addTransition(OFFLINE, STANDBY);
    builder.addTransition(STANDBY, OFFLINE);
    builder.addTransition(STANDBY, LEADER);
    builder.addTransition(LEADER, STANDBY);
    builder.addTransition(OFFLINE, DROPPED);
    // set constraints on states.
    // static constraint
    builder.upperBound(LEADER, 1);
    // dynamic constraint, R means it should be derived based on the replication
    // factor.
    builder.dynamicUpperBound(STANDBY, "R");
    return builder.build();
  }

  /** Starts a standalone Helix controller for the demo cluster. */
  public static void startController() {
    // start controller
    echo("Starting Helix Controller");
    HelixControllerMain.startHelixController(ZK_ADDRESS, CLUSTER_NAME, "localhost_9100",
        HelixControllerMain.STANDALONE);
  }

  /**
   * Creates and starts one {@link MyProcess} per configured participant.
   *
   * @throws Exception if a participant fails to start
   */
  public static void startNodes() throws Exception {
    echo("Starting Participants");
    for (int i = 0; i < numNodes; i++) {
      String instanceName = INSTANCE_CONFIG_LIST.get(i).getInstanceName();
      MyProcess process = new MyProcess(instanceName);
      PROCESS_LIST.add(process);
      process.start();
      echo("\t Started Participant: " + instanceName);
    }
  }

  /** Starts an embedded ZooKeeper server that stores its data under {@code /tmp}. */
  public static void startZookeeper() {
    echo("STARTING Zookeeper at " + ZK_ADDRESS);
    IDefaultNameSpace defaultNameSpace = new IDefaultNameSpace() {
      @Override
      public void createDefaultNameSpace(ZkClient zkClient) {
        // The demo does not need any pre-created znodes.
      }
    };
    File baseDir = new File(ZK_BASE_DIR);
    // mkdirs() also returns false when the directory already exists, so only warn when the
    // directory is really missing afterwards. Best effort: the server start below still runs.
    if (!baseDir.mkdirs() && !baseDir.isDirectory()) {
      echo("WARNING: could not create " + ZK_BASE_DIR);
    }
    // start zookeeper
    ZkServer server =
        new ZkServer(ZK_BASE_DIR + "/dataDir", ZK_BASE_DIR + "/logDir", defaultNameSpace,
            ZK_PORT);
    server.start();
  }

  /**
   * Prints the given object to standard output.
   *
   * @param obj value to print
   */
  public static void echo(Object obj) {
    System.out.println(obj);
  }

  /**
   * Runs the demo end to end and then keeps the process alive.
   *
   * @param args unused
   * @throws Exception if any step of the demo fails or the main thread is interrupted
   */
  public static void main(String[] args) throws Exception {
    startZookeeper();
    setup();
    startNodes();
    startController();
    // Give the controller time to react before printing the external view.
    Thread.sleep(5000);
    printState("After starting " + INITIAL_NUM_NODES + " nodes");
    addNode();
    Thread.sleep(5000);
    printState("After adding a third node");
    stopNode();
    Thread.sleep(5000);
    printState("After the 3rd node stops/crashes");
    // Keep the demo running: the main thread waits on its own termination, which does not
    // return normally, so the exit below is only a fallback.
    Thread.currentThread().join();
    System.exit(0);
  }

  /**
   * Registers one more participant, re-runs the rebalance with 3 replicas and starts the new
   * participant.
   *
   * @throws Exception if the new participant fails to start
   */
  private static void addNode() throws Exception {
    int port = BASE_PORT + numNodes;
    InstanceConfig instanceConfig = createInstanceConfig(port);
    echo("ADDING NEW NODE :" + instanceConfig.getInstanceName()
        + ". Partitions will move from old nodes to the new node.");
    admin.addInstance(CLUSTER_NAME, instanceConfig);
    INSTANCE_CONFIG_LIST.add(instanceConfig);
    MyProcess process = new MyProcess(instanceConfig.getInstanceName());
    PROCESS_LIST.add(process);
    // Only count the node once it is registered in both lists, so that the index-based loops
    // over [0, numNodes) never run past the end of a list if addInstance fails.
    numNodes = numNodes + 1;
    admin.rebalance(CLUSTER_NAME, RESOURCE_NAME, 3);
    process.start();
  }

  /**
   * Stops the most recently added participant. The node stays in the instance list, so it is
   * still shown as a column by {@link #printState(String)}.
   */
  private static void stopNode() {
    int nodeId = numNodes - 1;
    echo("STOPPING " + INSTANCE_CONFIG_LIST.get(nodeId).getInstanceName()
        + ". Leadership will be transferred to the remaining nodes");
    PROCESS_LIST.get(nodeId).stop();
  }

  /**
   * Prints a table of the resource's external view: one row per partition (sorted by name), one
   * column per participant, each cell holding the first letter of the replica's state or
   * {@code -} when the participant holds no replica of that partition.
   *
   * @param msg caption printed above the table
   */
  private static void printState(String msg) {
    System.out.println("CLUSTER STATE: " + msg);
    ExternalView resourceExternalView = admin.getResourceExternalView(CLUSTER_NAME, RESOURCE_NAME);
    if (resourceExternalView == null) {
      // NOTE(review): assumes the admin call returns null while no external view has been
      // written yet; report that instead of failing with a NullPointerException.
      System.out.println("\t\tExternal view of " + RESOURCE_NAME + " is not available yet");
      System.out.println(SEPARATOR);
      return;
    }
    TreeSet<String> sortedSet = new TreeSet<>(resourceExternalView.getPartitionSet());
    StringBuilder sb = new StringBuilder("\t\t");
    for (int i = 0; i < numNodes; i++) {
      sb.append(INSTANCE_CONFIG_LIST.get(i).getInstanceName()).append("\t");
    }
    System.out.println(sb);
    for (String partitionName : sortedSet) {
      // Reset the buffer completely; deleting up to length - 1 would leave a stray tab behind
      // and shift the row out of line with the header.
      sb.setLength(0);
      sb.append(partitionName).append("\t");
      Map<String, String> stateMap = resourceExternalView.getStateMap(partitionName);
      for (int i = 0; i < numNodes; i++) {
        String instanceName = INSTANCE_CONFIG_LIST.get(i).getInstanceName();
        String state = stateMap == null ? null : stateMap.get(instanceName);
        if (state != null && !state.isEmpty()) {
          sb.append(state.charAt(0));
        } else {
          sb.append("-");
        }
        sb.append("\t\t");
      }
      System.out.println(sb);
    }
    System.out.println(SEPARATOR);
  }

  /** A single demo participant backed by a Helix manager connection. */
  static final class MyProcess {
    private final String instanceName;
    // Assigned by start(); null until then.
    private HelixManager manager;

    /**
     * @param instanceName name of the participant instance this process represents
     */
    public MyProcess(String instanceName) {
      this.instanceName = instanceName;
    }

    /**
     * Creates the participant's Helix manager, registers the Leader/Standby state model factory
     * and connects to the cluster.
     *
     * @throws Exception if connecting to the cluster fails
     */
    public void start() throws Exception {
      manager =
          HelixManagerFactory.getZKHelixManager(CLUSTER_NAME, instanceName,
              InstanceType.PARTICIPANT, ZK_ADDRESS);
      LeaderStandbyStateModelFactory stateModelFactory =
          new LeaderStandbyStateModelFactory(instanceName);
      StateMachineEngine stateMach = manager.getStateMachineEngine();
      stateMach.registerStateModelFactory(STATE_MODEL_NAME, stateModelFactory);
      manager.connect();
    }

    /** Disconnects the participant from the cluster; does nothing if it was never started. */
    public void stop() {
      if (manager != null) {
        manager.disconnect();
      }
    }
  }
}