blob: 13b88ca8cefc7949a199e39965246625eaf7d191 [file] [log] [blame]
package org.apache.s4.tools;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.helix.ConfigScope;
import org.apache.helix.ConfigScopeBuilder;
import org.apache.helix.HelixAdmin;
import org.apache.helix.manager.zk.ZKHelixAdmin;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.IdealState.IdealStateModeProperty;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.Parameters;
public class RebalanceTask extends S4ArgsBase {
public static void main(String[] args) {
RebalanceTaskArgs taskArgs = new RebalanceTaskArgs();
Tools.parseArgs(taskArgs, args);
HelixAdmin admin = new ZKHelixAdmin(taskArgs.zkConnectionString);
// This does the assignment of partition to nodes. It uses a modified
// version of consistent hashing to distribute active partitions and standbys
// equally among nodes.
IdealState currentAssignment = admin.getResourceIdealState(taskArgs.clusterName, taskArgs.taskId);
List<String> instancesInGroup = new ArrayList<String>();
List<String> instancesInCluster = admin.getInstancesInCluster(taskArgs.clusterName);
for (String instanceName : instancesInCluster) {
InstanceConfig instanceConfig = admin.getInstanceConfig(taskArgs.clusterName, instanceName);
String nodeGroup = instanceConfig.getRecord().getSimpleField("GROUP");
if (nodeGroup.equals(taskArgs.nodeGroup)) {
instancesInGroup.add(instanceName);
}
}
admin.rebalance(taskArgs.clusterName, currentAssignment, instancesInGroup);
}
@Parameters(commandNames = "newStreamProcessor", separators = "=", commandDescription = "Create a new stream processor")
static class RebalanceTaskArgs extends S4ArgsBase {
@Parameter(names = "-zk", description = "ZooKeeper connection string")
String zkConnectionString = "localhost:2181";
@Parameter(names = { "-c", "-cluster" }, description = "Logical name of the S4 cluster", required = true)
String clusterName;
@Parameter(names = { "-id", "-taskId" }, description = "id of the task that produces/consumes a stream", required = true, arity = 1)
String taskId;
@Parameter(names = { "-ng", "-nodeGroup" }, description = "Node group name where the task needs to be run", required = true, arity = 1)
String nodeGroup = "default";
}
}