// blob: 52a2524a1ef7413de19bf9ad4b8e099c43af6dd3 [file] [log] [blame]
package org.apache.helix.tools.commandtools;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import java.io.File;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import com.google.common.collect.Lists;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.OptionGroup;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
import org.apache.helix.task.JobConfig;
import org.apache.helix.task.JobContext;
import org.apache.helix.task.TaskConfig;
import org.apache.helix.task.TaskDriver;
import org.apache.helix.task.TaskPartitionState;
import org.apache.helix.task.TaskState;
import org.apache.helix.task.Workflow;
import org.apache.helix.task.WorkflowConfig;
import org.apache.helix.task.WorkflowContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * CLI for operating workflows and jobs.
 * This is a wrapper around a {@link TaskDriver} instance that allows workflows and jobs to be
 * changed from the command line.
 * <p>
 * Invocation:
 * {@code <cmd> --zk <zkAddress> --cluster <clusterName> --resource <resourceName> [--file <workflowFile>]}
 * where {@code <cmd>} is the name of a {@link TaskDriver.DriverCommand} constant.
 */
public class TaskAdmin {
  /** For logging */
  private static final Logger LOG = LoggerFactory.getLogger(TaskAdmin.class);

  /** Required option name for the ZK address of the Helix cluster */
  private static final String ZK_ADDRESS = "zk";

  /** Required option name for cluster against which to run task */
  private static final String CLUSTER_NAME_OPTION = "cluster";

  /** Required option name for task resource (workflow or job) within target cluster */
  private static final String RESOURCE_OPTION = "resource";

  /** Option name for specifying a workflow file when starting a workflow */
  private static final String WORKFLOW_FILE_OPTION = "file";

  /** Placeholder shown in the usage text when no valid command name is available */
  private static final String COMMAND_PLACEHOLDER = "[cmd]";

  /**
   * Parses the first argument as a driver command and the remaining arguments as options,
   * connects to the cluster as an administrator and runs the command through a
   * {@link TaskDriver}.
   * <p>
   * The command name and the command-specific preconditions are validated before any
   * connection is opened, and the connection is always closed once it has been opened.
   *
   * @param args the command name followed by the command-line options
   * @throws IllegalArgumentException if the command is missing or unknown, if a required option
   *           value is absent, or if {@code start} is requested without a workflow file
   * @throws Exception if connecting to the cluster or executing the command fails
   */
  public static void main(String[] args) throws Exception {
    if (args == null || args.length == 0) {
      printUsage(constructOptions(), COMMAND_PLACEHOLDER);
      throw new IllegalArgumentException("A command is required as the first argument");
    }

    // Resolve the command first so that a typo never opens a cluster connection.
    String cmdStr = args[0];
    TaskDriver.DriverCommand cmd;
    try {
      cmd = TaskDriver.DriverCommand.valueOf(cmdStr);
    } catch (IllegalArgumentException e) {
      printUsage(constructOptions(), COMMAND_PLACEHOLDER);
      throw new IllegalArgumentException("Unknown command " + cmdStr, e);
    }

    String[] cmdArgs = Arrays.copyOfRange(args, 1, args.length);
    CommandLine cl = parseOptions(cmdArgs, constructOptions(), cmdStr);
    String zkAddr = cl.getOptionValue(ZK_ADDRESS);
    String clusterName = cl.getOptionValue(CLUSTER_NAME_OPTION);
    String workflow = cl.getOptionValue(RESOURCE_OPTION);

    // The parser already enforces these as required; this is a defensive second check.
    if (zkAddr == null || clusterName == null || workflow == null) {
      printUsage(constructOptions(), COMMAND_PLACEHOLDER);
      throw new IllegalArgumentException(
          "zk, cluster, and resource must all be non-null for all commands");
    }

    // Fail fast on a missing workflow file instead of connecting first.
    if (cmd == TaskDriver.DriverCommand.start && !cl.hasOption(WORKFLOW_FILE_OPTION)) {
      throw new IllegalArgumentException("Workflow file is required to start flow.");
    }

    HelixManager helixMgr =
        HelixManagerFactory.getZKHelixManager(clusterName, "Admin", InstanceType.ADMINISTRATOR,
            zkAddr);
    helixMgr.connect();
    try {
      TaskDriver driver = new TaskDriver(helixMgr);
      switch (cmd) {
      case start:
        driver.start(Workflow.parse(new File(cl.getOptionValue(WORKFLOW_FILE_OPTION))));
        break;
      case stop:
        driver.stop(workflow);
        break;
      case resume:
        driver.resume(workflow);
        break;
      case delete:
        driver.delete(workflow);
        break;
      case list:
        list(driver, workflow);
        break;
      case flush:
        driver.flushQueue(workflow);
        break;
      case clean:
        driver.cleanupQueue(workflow);
        break;
      default:
        throw new IllegalArgumentException("Unknown command " + cmdStr);
      }
    } finally {
      // Release the connection even when the command throws.
      helixMgr.disconnect();
    }
  }

  /**
   * Logs the state of a workflow, of each job in its DAG and of each task partition of those
   * jobs. Logs an error and returns if the workflow has no config.
   *
   * @param taskDriver driver used to look up configs and contexts
   * @param workflow name of the workflow to describe
   */
  private static void list(TaskDriver taskDriver, String workflow) {
    WorkflowConfig wCfg = taskDriver.getWorkflowConfig(workflow);
    if (wCfg == null) {
      LOG.error("Workflow {} does not exist!", workflow);
      return;
    }
    WorkflowContext wCtx = taskDriver.getWorkflowContext(workflow);

    LOG.info("Workflow {} consists of the following tasks: {}", workflow,
        wCfg.getJobDag().getAllNodes());
    // A workflow without a context is reported as not started.
    String workflowState =
        (wCtx != null) ? wCtx.getWorkflowState().name() : TaskState.NOT_STARTED.name();
    LOG.info("Current state of workflow is {}", workflowState);
    LOG.info("Job states are: ");
    LOG.info("-------");

    for (String job : wCfg.getJobDag().getAllNodes()) {
      TaskState jobState = (wCtx != null) ? wCtx.getJobState(job) : TaskState.NOT_STARTED;
      LOG.info("Job {} is {}", job, jobState);

      // Without both config and context there is no per-task detail to report.
      JobConfig jCfg = taskDriver.getJobConfig(job);
      JobContext jCtx = taskDriver.getJobContext(job);
      if (jCfg == null || jCtx == null) {
        LOG.info("-------");
        continue;
      }

      // Report partitions in ascending order.
      List<Integer> partitions = Lists.newArrayList(jCtx.getPartitionSet());
      Collections.sort(partitions);

      for (Integer partition : partitions) {
        // Fall back to the partition's target when it has no task id.
        String taskId = jCtx.getTaskIdForPartition(partition);
        taskId = (taskId != null) ? taskId : jCtx.getTargetForPartition(partition);
        LOG.info("Task: {}", taskId);

        TaskConfig taskConfig = jCfg.getTaskConfig(taskId);
        if (taskConfig != null) {
          LOG.info("Configuration: {}", taskConfig.getConfigMap());
        }

        // A partition with no recorded state is reported as INIT.
        TaskPartitionState state = jCtx.getPartitionState(partition);
        state = (state != null) ? state : TaskPartitionState.INIT;
        LOG.info("State: {}", state);

        String assignedParticipant = jCtx.getAssignedParticipant(partition);
        if (assignedParticipant != null) {
          LOG.info("Assigned participant: {}", assignedParticipant);
        }
        LOG.info("-------");
      }
      LOG.info("-------");
    }
  }

  /**
   * Constructs the option set shared by all commands: the required {@code zk}, {@code cluster}
   * and {@code resource} options plus the optional {@code file} option.
   * <p>
   * The required options are registered individually rather than through an
   * {@code OptionGroup}: a group models mutually exclusive alternatives and clears the required
   * flag of its members, which is the opposite of what these three options need.
   */
  private static Options constructOptions() {
    Options options = new Options();
    options.addOption(
        createSingleArgOption(ZK_ADDRESS, "ZK address managing cluster", "zkAddress", true));
    options.addOption(
        createSingleArgOption(CLUSTER_NAME_OPTION, "Cluster name", "clusterName", true));
    options.addOption(
        createSingleArgOption(RESOURCE_OPTION, "Workflow or job name", "resourceName", true));
    options.addOption(createSingleArgOption(WORKFLOW_FILE_OPTION,
        "Local file describing workflow", "workflowFile", false));
    return options;
  }

  /**
   * Creates a long-name-only option that takes exactly one argument.
   *
   * @param longOpt long option name (used as {@code --longOpt})
   * @param description text shown in the usage output
   * @param argName placeholder name of the argument in the usage output
   * @param required whether the parser must reject command lines lacking this option
   */
  @SuppressWarnings("static-access")
  private static Option createSingleArgOption(String longOpt, String description,
      String argName, boolean required) {
    Option option = OptionBuilder.withLongOpt(longOpt).withDescription(description).create();
    option.setArgs(1);
    option.setArgName(argName);
    option.setRequired(required);
    return option;
  }

  /**
   * Attempts to parse options for the given command. On a parse failure or a wrong argument
   * count, prints usage and terminates the JVM with exit status 1.
   *
   * @param args option arguments (the command name already removed)
   * @param options option definitions to parse against
   * @param cmdStr command name, used only in the usage text
   * @return the parsed command line
   */
  private static CommandLine parseOptions(String[] args, Options options, String cmdStr) {
    CommandLineParser cliParser = new GnuParser();
    CommandLine cmd = null;
    try {
      cmd = cliParser.parse(options, args);
    } catch (ParseException pe) {
      LOG.error("CommandLineClient: failed to parse command-line options: {}", pe.toString());
      printUsage(options, cmdStr);
      System.exit(1);
    }
    if (!checkOptionArgsNumber(cmd.getOptions())) {
      printUsage(options, cmdStr);
      System.exit(1);
    }
    return cmd;
  }

  /**
   * Ensures each parsed option carries exactly the number of values it declares. An option
   * declaring zero arguments must have no values; any other option must have exactly
   * {@code getArgs()} values. The first offending option is reported on standard error.
   *
   * @param options the options present on the parsed command line
   * @return {@code true} if every option has the expected number of values
   */
  private static boolean checkOptionArgsNumber(Option[] options) {
    for (Option option : options) {
      int expected = option.getArgs();
      String[] values = option.getValues();
      boolean valid = (expected == 0)
          ? (values == null || values.length == 0)
          : (values != null && values.length == expected);
      if (!valid) {
        System.err.println(option.getArgName() + " shall have " + expected + " arguments (was "
            + Arrays.toString(values) + ")");
        return false;
      }
    }
    return true;
  }

  /** Displays CLI usage for given option set and command name */
  private static void printUsage(Options cliOptions, String cmd) {
    HelpFormatter helpFormatter = new HelpFormatter();
    // Very wide so that each option's usage stays on a single line.
    helpFormatter.setWidth(1000);
    helpFormatter.printHelp("java " + TaskAdmin.class.getName() + " " + cmd, cliOptions);
  }
}