blob: 3beed833ce42bca856a1b249002afb9d8b7d2adb [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.uima.ducc.cli;
import java.util.ArrayList;
import java.util.Properties;
import org.apache.uima.ducc.cli.aio.AllInOneLauncher;
import org.apache.uima.ducc.common.utils.DuccSchedulerClasses;
import org.apache.uima.ducc.common.utils.IllegalConfigurationException;
import org.apache.uima.ducc.transport.event.IDuccContext.DuccContext;
import org.apache.uima.ducc.transport.event.SubmitJobDuccEvent;
import org.apache.uima.ducc.transport.event.SubmitJobReplyDuccEvent;
import org.apache.uima.ducc.transport.event.cli.JobRequestProperties;
/**
* Submit a DUCC job
*/
public class DuccJobSubmit
extends CliBase
{
private JobRequestProperties jobRequestProperties = new JobRequestProperties();
public static UiOption[] opts = new UiOption[] {
UiOption.Help,
UiOption.Debug,
UiOption.Timestamp,
UiOption.AllInOne,
UiOption.AttachConsole,
UiOption.ProcessDebug,
UiOption.DriverDebug,
UiOption.Description,
UiOption.SchedulingClass,
UiOption.WorkingDirectory, // Must precede LogDirecory
UiOption.LogDirectory, // Must precede Environment
UiOption.Jvm,
UiOption.Classpath,
UiOption.Environment,
UiOption.DriverJvmArgs,
UiOption.DriverDescriptorCR,
UiOption.DriverDescriptorCROverrides,
UiOption.DriverExceptionHandler,
UiOption.DriverExceptionHandlerArguments,
UiOption.ProcessJvmArgs,
UiOption.ProcessMemorySize,
UiOption.ProcessDD,
UiOption.ProcessDescriptorCM,
UiOption.ProcessDescriptorCMOverrides,
UiOption.ProcessDescriptorAE,
UiOption.ProcessDescriptorAEOverrides,
UiOption.ProcessDescriptorCC,
UiOption.ProcessDescriptorCCOverrides,
UiOption.ProcessDeploymentsMax,
UiOption.ProcessInitializationFailuresCap,
UiOption.ProcessFailuresLimit,
UiOption.ProcessPipelineCount,
UiOption.ProcessPerItemTimeMax,
UiOption.ProcessInitializationTimeMax,
UiOption.Specification,
UiOption.SuppressConsoleLog,
UiOption.WaitForCompletion,
UiOption.CancelOnInterrupt,
UiOption.ServiceDependency,
};
private AllInOneLauncher allInOneLauncher = null;
/**
* @param args Array of string arguments as described in the
* <a href="/doc/duccbook.html#DUCC_CLI_SUBMIT">DUCC CLI reference.</a>
*/
public DuccJobSubmit(String[] args)
throws Exception
{
this(args, null);
}
/**
* @param args List of string arguments as described in the
* <a href="/doc/duccbook.html#DUCC_CLI_SUBMIT">DUCC CLI reference.</a>
*/
public DuccJobSubmit(ArrayList<String> args)
throws Exception
{
this(args, null);
}
/**
* @param props Properties file of arguments, as described in the
* <a href="/doc/duccbook.html#DUCC_CLI_SUBMIT">DUCC CLI reference.</a>
*/
public DuccJobSubmit(Properties props)
throws Exception
{
this(props, null);
}
/**
* This form of the constructor allows the API user to capture
* messages, rather than directing them to stdout.
*
* @param args List of string arguments as described in the
* <a href="/doc/duccbook.html#DUCC_CLI_SUBMIT">DUCC CLI reference.</a>
* @param consoleCb If provided, messages are directed to it instead of
* stdout.
*/
public DuccJobSubmit(ArrayList<String> args, IDuccCallback consoleCb)
throws Exception
{
this(args.toArray(new String[args.size()]), consoleCb);
}
/**
* This form of the constructor allows the API user to capture
* messages, rather than directing them to stdout.
*
* @param args Array of string arguments as described in the
* <a href="/doc/duccbook.html#DUCC_CLI_SUBMIT">DUCC CLI reference.</a>
* @param consoleCb If provided, messages are directed to it instead of
* stdout.
*/
public DuccJobSubmit(String[] args, IDuccCallback consoleCb)
throws Exception
{
init (this.getClass().getName(), opts, args, jobRequestProperties, consoleCb);
check_descriptor_options();
if (isAllInOne()) {
allInOneLauncher = new AllInOneLauncher(userSpecifiedProperties, consoleCb); // Pass the already fixed-up user properties
}
}
/**
* This form of the constructor allows the API user to capture
* messages, rather than directing them to stdout.
*
* @param props Properties file containing string arguments as described in the
* <a href="/doc/duccbook.html#DUCC_CLI_SUBMIT">DUCC CLI reference.</a>
* @param consoleCb If provided, messages are directed to it instead of
* stdout.
*/
public DuccJobSubmit(Properties props, IDuccCallback consoleCb)
throws Exception
{
init (this.getClass().getName(), opts, props, jobRequestProperties, consoleCb);
check_descriptor_options();
if (isAllInOne()) {
allInOneLauncher = new AllInOneLauncher(userSpecifiedProperties, consoleCb); // Pass the already fixed-up user properties
}
}
/*
* If preemptable change to a non-preemptable scheduling class.
* If none provided use the default fixed class
*/
protected void transform_scheduling_class(CliBase base, Properties props)
throws Exception
{
String scheduling_class = null;
String user_scheduling_class = null;
String pname = UiOption.SchedulingClass.pname();
try {
DuccSchedulerClasses duccSchedulerClasses = DuccSchedulerClasses.getInstance();
if (props.containsKey(pname)) {
user_scheduling_class = props.getProperty(pname);
if (duccSchedulerClasses.isPreemptable(user_scheduling_class)) {
scheduling_class = duccSchedulerClasses.getDebugClassSpecificName(user_scheduling_class);
}
} else {
scheduling_class = duccSchedulerClasses.getDebugClassDefaultName();
}
} catch (Exception e) {
throw new IllegalConfigurationException("Error in DUCC configuration files - see administrator", e);
}
if (scheduling_class != null) {
props.setProperty(pname, scheduling_class);
String text = pname+"="+scheduling_class+" -- was "+user_scheduling_class;
base.message(text);
}
}
private void check_descriptor_options() {
boolean isDDjob = jobRequestProperties.containsKey(UiOption.ProcessDD.pname());
boolean isPPjob = jobRequestProperties.containsKey(UiOption.ProcessDescriptorCM.pname())
|| jobRequestProperties.containsKey(UiOption.ProcessDescriptorAE.pname())
|| jobRequestProperties.containsKey(UiOption.ProcessDescriptorCC.pname());
if (isDDjob && isPPjob) {
throw new IllegalArgumentException("--process_descriptor_DD is mutually exclusive with the AE, CC, & CM descriptor options");
}
if (!isDDjob && !isPPjob) {
throw new IllegalArgumentException("Missing --process_descriptor_xx option .. DD or at least one of AE, CC, CM required");
}
}
private void set_debug_parms(Properties props, String key, int port)
{
String debug_jvmargs = "-Xdebug -Xrunjdwp:transport=dt_socket,address=" + host_address + ":" + port;
String jvmargs = props.getProperty(key);
if (jvmargs == null) {
jvmargs = debug_jvmargs;
} else {
jvmargs += " " + debug_jvmargs;
}
props.put(key, jvmargs);
}
protected void enrich_parameters_for_debug(Properties props)
throws Exception
{
try {
int jp_debug_port = -1;
int jd_debug_port = -2; // a trick, must be different from jp_debug_port; see below
// we allow both jd and jp to debug, but the ports have to differ
String do_debug = UiOption.ProcessDebug.pname();
if ( props.containsKey(do_debug) ) {
String jp_port_s = props.getProperty(do_debug);
if ( jp_port_s == null ) {
throw new IllegalArgumentException("Missing port for " + do_debug);
}
jp_debug_port = Integer.parseInt(jp_port_s);
set_debug_parms(props, UiOption.ProcessJvmArgs.pname(), jp_debug_port);
// For debugging, if the JP is being debugged, we have to force max processes to 1 & no restarts
props.setProperty(UiOption.ProcessDeploymentsMax.pname(), "1");
props.setProperty(UiOption.ProcessFailuresLimit.pname(), "1");
// Alter scheduling class?
transform_scheduling_class(this, props);
}
do_debug = UiOption.DriverDebug.pname();
if ( props.containsKey(do_debug) ) {
String jd_port_s = props.getProperty(do_debug);
if ( jd_port_s == null ) {
throw new IllegalArgumentException("Missing port for " + do_debug);
}
jd_debug_port = Integer.parseInt(jd_port_s);
set_debug_parms(props, UiOption.DriverJvmArgs.pname(), jd_debug_port);
}
if ( jp_debug_port == jd_debug_port ) {
throw new IllegalArgumentException("Process and Driver debug ports must differ.");
}
} catch (NumberFormatException e) {
throw new IllegalArgumentException("Invalid debug port (not numeric)");
}
}
//**********
/**
* Execute collects the job parameters, does basic error and correctness checking, and sends
* the job properties to the DUCC orchestrator for execution.
*
* @return True if the orchestrator accepts the job; false otherwise.
*/
public boolean execute() throws Exception {
if(allInOneLauncher != null) {
return allInOneLauncher.execute();
}
// Defaults now provided in init()
try {
enrich_parameters_for_debug(jobRequestProperties);
} catch (Exception e1) {
message(e1.toString());
Throwable t = e1;
while ((t = t.getCause()) != null) {
message(" ... " + t);
}
if (jobRequestProperties.containsKey(UiOption.Debug.pname())) {
e1.printStackTrace();
}
return false;
}
/*
* Set default classpath if not specified
*/
String key_cp = UiOption.Classpath.pname();
if (!jobRequestProperties.containsKey(key_cp)) {
jobRequestProperties.setProperty(key_cp, System.getProperty("java.class.path"));
}
if (jobRequestProperties.containsKey(UiOption.Debug.pname())) {
jobRequestProperties.dump();
}
/*
* resolve ${defaultBrokerURL} in service dependencies - must fail if resolution needed but can't resolve
*/
if ( ! check_service_dependencies(null) ) {
return false;
}
// Warn if Xmx value is too large and may cause swapping
check_heap_size(UiOption.ProcessJvmArgs.pname());
SubmitJobDuccEvent submitJobDuccEvent = new SubmitJobDuccEvent(jobRequestProperties, CliVersion.getVersion());
SubmitJobReplyDuccEvent submitJobReplyDuccEvent = null;
try {
submitJobReplyDuccEvent = (SubmitJobReplyDuccEvent) dispatcher.dispatchAndWaitForDuccReply(submitJobDuccEvent);
} catch (Exception e) {
message("Job not submitted: " + e.getMessage());
stopListeners();
return false;
} finally {
dispatcher.close();
}
/*
* process reply
*/
boolean rc = extractReply(submitJobReplyDuccEvent);
if ( rc ) {
saveSpec(DuccUiConstants.job_specification_properties, jobRequestProperties);
startMonitors(false, DuccContext.Job); // starts conditionally, based on job spec and console listener present
}
return rc;
}
/**
* Return appropriate rc when job has completed.
* @return The exit code from the job.
*/
public int getReturnCode() {
if (allInOneLauncher != null) {
return allInOneLauncher.getReturnCode();
}
return super.getReturnCode();
}
public long getDuccId() {
if (allInOneLauncher != null) {
return allInOneLauncher.getDuccId();
}
return super.getDuccId();
}
private boolean isAllInOne() {
return jobRequestProperties.containsKey(UiOption.AllInOne.pname());
}
/**
* Main method, as used by the executable jar or direct java invocation.
* @param args arguments as described in the <a href="/doc/duccbook.html#DUCC_CLI_SUBMIT">DUCC CLI reference.</a>
*/
public static void main(String[] args) {
try {
DuccJobSubmit ds = new DuccJobSubmit(args, null);
boolean rc = ds.execute();
// If the return is 'true' then as best the API can tell, the submit worked
if ( rc ) {
System.out.println("Job " + ds.getDuccId() + " submitted");
int exit_code = ds.getReturnCode(); // after waiting if requested
System.exit(exit_code);
} else {
System.out.println("Could not submit job");
System.exit(1);
}
}
catch(Throwable e) {
System.out.println("Cannot initialize: " + e);
Throwable t = e;
while ((t = t.getCause()) != null) {
System.out.println(" ... " + t);
}
for (String arg : args) {
if (arg.equals("--debug")) {
e.printStackTrace();
break;
}
}
System.exit(1);
}
}
}