| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.uima.ducc.cli; |
| |
| import java.io.File; |
| import java.io.FileInputStream; |
| import java.io.FileOutputStream; |
| import java.io.IOException; |
| import java.io.OutputStreamWriter; |
| import java.lang.management.ManagementFactory; |
| import java.net.InetAddress; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Properties; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.regex.Matcher; |
| import java.util.regex.Pattern; |
| |
| import org.apache.uima.ducc.common.IDucc; |
| import org.apache.uima.ducc.common.crypto.Crypto; |
| import org.apache.uima.ducc.common.utils.DuccProperties; |
| import org.apache.uima.ducc.common.utils.DuccPropertiesResolver; |
| import org.apache.uima.ducc.common.utils.Utils; |
| import org.apache.uima.ducc.transport.dispatcher.IDuccEventDispatcher; |
| import org.apache.uima.ducc.transport.event.AbstractDuccOrchestratorEvent; |
| import org.apache.uima.ducc.transport.event.IDuccContext.DuccContext; |
| |
| /** |
| * Define common methods and data used by all the DUCC API and CLI. |
| */ |
| public abstract class CliBase |
| implements IUiOptions |
| { |
| |
| private String myClassName = "N/A"; |
| private boolean init_done = false; |
| protected String ducc_home; |
| protected IDuccEventDispatcher dispatcher; |
| |
| protected CommandLine commandLine; |
| |
| protected long friendlyId = -1; |
| protected int returnCode = 0; |
| |
| protected DuccProperties cli_props; |
| protected ArrayList<String> errors = new ArrayList<String>(); |
| protected ArrayList<String> warnings = new ArrayList<String>(); |
| protected ArrayList<String> messages = new ArrayList<String>(); |
| |
| protected boolean debug; |
| private boolean load_defaults = true; |
| |
| protected ConsoleListener console_listener = null; |
| protected boolean suppress_console_log; |
| protected String host_address = "N/A"; |
| protected boolean console_attach = false; |
| protected IDuccCallback consoleCb = null; |
| |
| protected MonitorListener monitor_listener = null; |
| |
| CountDownLatch waiter = null; |
| |
| protected Properties userSpecifiedProperties; |
| |
| /** |
| * All extenders must implement execute - this method does whatever processing on the input |
| * is needed and passes the CLI request to the internal DUCC processes. |
| * |
| * @return Return true if execution works, and false otherwise. |
| * @throws java.lang.Exception The specific exception is a function of the implementor. |
| */ |
| public abstract boolean execute() throws Exception; |
| |
| protected void inhibitDefaults() |
| { |
| this.load_defaults = false; |
| } |
| |
| /* |
| * Get log directory or employ default log directory if not specified |
| * UIMA-4617 Make it relative to the run-time working directory, not HOME |
| */ |
| String getLogDirectory() |
| { |
| |
| String log_directory = cli_props.getProperty(UiOption.LogDirectory.pname()); |
| if(log_directory == null) { |
| // no log directory was specified - default to user's home + "/ducc/logs" |
| log_directory = System.getProperty("user.home") + IDucc.userLogsSubDirectory; |
| } |
| |
| File f; |
| if (log_directory.startsWith(File.separator)) { |
| f = new File(log_directory); |
| } else { |
| // Make the log directory relative to the run-time working directory |
| // NOTE: the working-directory is ALWAYS present when the logging-directory is specified |
| String working_directory = cli_props.getProperty(UiOption.WorkingDirectory.pname()); |
| f = new File(working_directory, log_directory); |
| try { |
| log_directory = f.getCanonicalPath(); |
| } catch (IOException e) { |
| throw new IllegalArgumentException("getLogDirectory: Cannot get full name of log directory " + log_directory); |
| } |
| } |
| |
| cli_props.setProperty(UiOption.LogDirectory.pname(), log_directory); |
| |
| /* |
| * make sure the logdir is actually legal. |
| * JD may also be creating it so to reduce any race or NFS delay blindly create and then test |
| */ |
| f.mkdirs(); |
| if ( ! f.exists() ) { |
| throw new IllegalArgumentException("getLogDirectory: Cannot create log directory " + log_directory); |
| } |
| |
| if ( ! f.isDirectory() ) { |
| throw new IllegalArgumentException("Specified log_directory is not a directory: " + log_directory); |
| } |
| |
| if ( ! f.canWrite() ) { |
| throw new IllegalArgumentException("Log directory exists but cannot be written: " + f); |
| } |
| |
| if ( ! f.canExecute() ) { |
| throw new IllegalArgumentException("Log directory exists but cannot be accessed (must be writable and executable): " + f); |
| } |
| |
| return log_directory; |
| } |
| |
| /* |
| * If the workong directory has been defined (or defaulted) make sure it is absolute |
| */ |
| void setWorkingDirectory() throws IOException |
| { |
| String working_directory = cli_props.getProperty(UiOption.WorkingDirectory.pname()); |
| if (working_directory == null) { |
| return; // Not valid for this request |
| } |
| File f = new File(working_directory); |
| if ( ! f.exists() ) { |
| throw new IllegalArgumentException("Working directory " + working_directory + " does not exist."); |
| } |
| if ( ! f.isAbsolute() ) { |
| cli_props.setProperty(UiOption.WorkingDirectory.pname(), f.getCanonicalPath()); |
| } |
| } |
| |
| /* |
| * Check the syntax & if a service refers to itself -- place-holders already resolved |
| * Strip any broker URL decorations |
| */ |
| boolean check_service_dependencies(String endpoint) |
| { |
| String deps = cli_props.getProperty(UiOption.ServiceDependency.pname()); |
| try { |
| String dependencies = DuccUiUtilities.check_service_dependencies(endpoint, deps); |
| if (dependencies != null) { |
| cli_props.setProperty(UiOption.ServiceDependency.pname(), dependencies); |
| } |
| return true; |
| } catch ( Throwable t ) { |
| message("ERROR:", t.toString()); |
| return false; |
| } |
| } |
| |
| /* |
| * Check if -Xmx value is >= memory size ... if both are specified |
| */ |
| void check_heap_size(String argsOption) { |
| String jvmArgs = cli_props.getProperty(argsOption); |
| String memSize = cli_props.getProperty(UiOption.ProcessMemorySize.pname()); |
| if (jvmArgs == null || memSize == null) { |
| return; |
| } |
| |
| // The numbers may be terminated by a units factor, white-space, or the end of the string |
| // The units factor may be any of kKmMgG ... if omitted is bytes |
| // Match -Xmx###[units-flag] and take the last one specified (IBM & Oracle JREs do this) |
| String xmxRegex = "-Xmx([0-9]+)($|[\\skKmMgG])"; |
| Pattern patn = Pattern.compile(xmxRegex); |
| Matcher matcher = patn.matcher(jvmArgs); |
| Long size = null; |
| String unit = null; |
| while (matcher.find()) { |
| size = Long.valueOf(matcher.group(1)); |
| unit = matcher.group(2); |
| } |
| if (size == null) { |
| return; |
| } |
| if (unit.isEmpty()) { // Was last option in list |
| unit = " "; |
| } |
| char factor = unit.toLowerCase().charAt(0); |
| int shift = "gmk".indexOf(factor); // Number of 1024's to divide size by to get GB |
| if (shift < 0) { // No explicit unit factor ... white-space => bytes |
| shift = 3; |
| } |
| long sizeGB = size >> (10 * shift); // Shift 10 bits per K |
| int memGB = Integer.valueOf(memSize); |
| if (sizeGB >= memGB) { |
| String text = "WARNING - process_memory_size is " + memSize + "G but the max heap is " + size + unit + " --- swapping may occur"; |
| message(text); |
| } |
| } |
| |
| void setUser() |
| throws Exception |
| { |
| /* |
| * marshal user |
| */ |
| String user = DuccUiUtilities.getUser(); |
| cli_props.setProperty(UiOption.User.pname(), user); |
| String property = DuccPropertiesResolver.getInstance().getProperty(DuccPropertiesResolver.ducc_signature_required); |
| if(property != null) { |
| String signatureRequiredProperty = property.trim().toLowerCase(); |
| if(signatureRequiredProperty.equals("on")) { |
| Crypto crypto = new Crypto(user, true); |
| byte[] cypheredMessage = crypto.getSignature(); |
| cli_props.put(UiOption.Signature.pname(), cypheredMessage); |
| } |
| } |
| } |
| |
| /** |
| * Standard init for all except the Service calls that are sent to the SM |
| */ |
| |
| protected synchronized void init(String myClassName, UiOption[] opts, String[] args, DuccProperties cli_props, |
| IDuccCallback consoleCb) throws Exception { |
| this.init (myClassName, opts, args, null, cli_props, consoleCb, "orchestrator"); |
| } |
| |
| protected synchronized void init(String myClassName, UiOption[] opts, Properties props, DuccProperties cli_props, |
| IDuccCallback consoleCb) throws Exception { |
| this.init (myClassName, opts, null, props, cli_props, consoleCb, "orchestrator"); |
| } |
| |
| /** |
| * |
| * @param myClassName Name of the class invoking me, for help string |
| * @param uiOpts Array of IUioptions permitted for this command |
| * @param args Arguments from the command line (or null) |
| * @param props Properties passed in from the API (or null) |
| * @param cli_props (Initially) empty properties file to be filled in |
| * @param consoleCb Console callback object (optional) |
| * @param servlet The name of the http servlet that will serve this request |
| * @throws Exception |
| */ |
| protected synchronized void init(String myClassName, IUiOption[] uiOpts, String[] args, Properties props, |
| DuccProperties cli_props, IDuccCallback consoleCb, String servlet) |
| throws Exception |
| { |
| |
| // Either args or props passed in, not both |
| if (args != null) { |
| CliFixups.cleanupArgs(args, myClassName); |
| } else { |
| CliFixups.cleanupProps(props, myClassName); |
| } |
| |
| if ( init_done ) return; |
| |
| if ( consoleCb == null ) { |
| this.consoleCb = new DefaultCallback(); |
| } else { |
| this.consoleCb = consoleCb; |
| } |
| |
| this.myClassName = myClassName; |
| ducc_home = Utils.findDuccHome(); |
| |
| this.cli_props = cli_props; |
| commandLine = new CommandLine(args, uiOpts, props); |
| try { |
| commandLine.parse(); |
| } catch (Exception e) { |
| usage(e.getMessage()); |
| } |
| |
| if ( commandLine.contains(UiOption.Help)) { |
| usage(null); |
| } |
| |
| debug = commandLine.contains(UiOption.Debug); |
| |
| // Load the specification file, if given on the command line. Note that registration |
| // bypasses the somewhat redundant --specification option so we check two options. |
| // Cannot have both as --specification && --register are never both valid. |
| String fname = null; |
| for (IUiOption spec : new IUiOption[]{ UiOption.Specification, UiOption.Register }) { |
| if ( commandLine.isOption(spec) && commandLine.contains(spec)) { // legal for this command, and also specified? |
| fname = commandLine.get(spec); |
| if (fname.length() == 0) { // Check if --register has no value |
| fname = null; |
| } |
| break; |
| } |
| } |
| // If have a specification file re-parse using it for default values |
| if ( fname != null ) { |
| FileInputStream fis = new FileInputStream(new File(fname)); |
| Properties defaults = new Properties(); |
| defaults.load(fis); |
| fis.close(); |
| CliFixups.cleanupProps(defaults, myClassName); // May correct or drop deprecated options |
| |
| // If invoked with overriding properties add to or replace the defaults |
| if (props != null) { |
| defaults.putAll(props); |
| } |
| commandLine = new CommandLine(args, uiOpts, defaults); |
| commandLine.parse(); |
| } |
| commandLine.verify(); // Insure all the rules specified by the IUiOpts are enforced |
| |
| // Copy options into cli_props |
| setOptions(uiOpts); |
| |
| // Save a copy of the user-specified ones by cloning the underlying properties |
| userSpecifiedProperties = (Properties)((Properties)cli_props).clone(); |
| |
| // May need to suppress logging in console listener, or in the DUCC process. |
| suppress_console_log = cli_props.containsKey(UiOption.SuppressConsoleLog.pname()); |
| |
| // Apply defaults for and fixup the environment if needed |
| // -- unless default loading is inhibited, as it must be for modify operations |
| // What this routine does is fill in all the options that weren't specified |
| // on the command line with their defaults. For 'modify' we want to bypass |
| // this because ONLY the options from the command line should be set. |
| if ( load_defaults ) { |
| setDefaults(uiOpts, suppress_console_log); |
| } |
| |
| // This is not used by DUCC ... allows ducc-mon to display the origin of a job |
| cli_props.setProperty(UiOption.SubmitPid.pname(), ManagementFactory.getRuntimeMXBean().getName()); |
| |
| // First set working-directory as the log-directory may be relative to it |
| setWorkingDirectory(); |
| if ( load_defaults && (getLogDirectory() == null) ) { |
| throw new IllegalArgumentException("Cannot access log directory."); |
| } |
| setUser(); |
| |
| //NodeIdentity ni = new NodeIdentity(); UIMA-3899, use getHostAddress() directly. jrc |
| host_address = InetAddress.getLocalHost().getHostAddress(); |
| |
| initConsoleListener(); |
| |
| // AllInOne doesn't dispatch requests (and local doesn't need a running DUCC!) |
| if (!cli_props.containsKey(UiOption.AllInOne.pname())) { |
| dispatcher = DispatcherFactory.create(cli_props, servlet); |
| } |
| |
| init_done = true; |
| } |
| |
| /* |
| * Save options as properties after resolving any ${..} placeholders |
| */ |
| void setOptions(IUiOption[] uiOpts) |
| throws Exception |
| { |
| // Find the environment variables that are always propagated |
| List<String> envNameList; |
| String envNames = DuccPropertiesResolver.get(DuccPropertiesResolver.ducc_environment_propagated); |
| if (envNames != null) { |
| envNameList = Arrays.asList(envNames.split("\\s+")); |
| } else { |
| envNameList = new ArrayList<String>(0); |
| } |
| |
| Map<IUiOption, String> parsed = commandLine.allOptions(); |
| for (IUiOption opt : parsed.keySet() ) { |
| // If a "flexible" boolean that accepts various true/false values, add it only if true |
| String val; |
| if (opt.optargs() && "true".equals(opt.deflt())) { |
| boolean bval = commandLine.getBoolean(opt); |
| if (bval) { |
| val = ""; |
| } else { |
| if (debug) System.out.println("CLI omitted boolean " + opt.pname() + " = '" + parsed.get(opt) + "'"); |
| continue; |
| } |
| } else { |
| val = parsed.get(opt); |
| if (val == null) { // Should only happen for no-arg options |
| val = ""; |
| } else { |
| if (val.contains("${")) { |
| val = resolvePlaceholders(val, envNameList); |
| } |
| } |
| } |
| val = val.trim(); |
| cli_props.put(opt.pname(), val); |
| if (debug) System.out.println("CLI set " + opt.pname() + " = '" + val + "'"); |
| } |
| } |
| |
| /* |
| * Check for missing required options, set defaults, and validate where possible |
| * Also fixup the environment for all that use it. |
| */ |
| void setDefaults(IUiOption[] uiOpts, boolean suppress_console) throws Exception { |
| ArrayList<String> envNameList = new ArrayList<String>(0); // Why this when are resolving against use caller's environment? |
| for (IUiOption uiopt : uiOpts) { |
| if (!cli_props.containsKey(uiopt.pname())) { |
| // |
| // here deal with stuff that wasn't given explicitly in the command |
| // |
| // our convention - optargs() implies boolean, but it does't have to. |
| // If the arg is not expllicitly specified, we assume |
| // it is (boolean,false) for the sake of dealing with defaults. |
| // -- and then just leave it out -- |
| // similarly - noargs() is definitely boolean, same treatement |
| // |
| if ( (! uiopt.optargs()) && (! uiopt.noargs() ) && uiopt.deflt() != null) { |
| String deflt = uiopt.deflt(); |
| if (deflt.startsWith("$$")) { // Lookup default in ducc.properties |
| deflt = DuccPropertiesResolver.get(deflt.substring(2)); |
| if (deflt == null) { |
| throw new IllegalArgumentException("Invalid default (undefined property) for " + uiopt.pname()); |
| } |
| } else if (deflt.contains("${")) { |
| deflt = resolvePlaceholders(deflt, envNameList); |
| } |
| if (debug) System.out.println("CLI set default: " + uiopt.pname() + " = " + deflt); |
| cli_props.put(uiopt.pname(), deflt); |
| } |
| } else { |
| // |
| // here clean up stuff that was specified but we want to validate it |
| // |
| if (uiopt == UiOption.ProcessMemorySize || uiopt == UiOption.ReservationMemorySize) { |
| String val = cli_props.getStringProperty(uiopt.pname()); |
| if (!val.matches("^\\d+$")) { |
| throw new IllegalArgumentException("Invalid non-numeric value for " + uiopt.pname() + ": " + val); |
| } |
| } |
| } |
| // If this request accepts the --environment option may need to augment it by |
| // renaming LD_LIBRARY_PATH & propagating some user values |
| if (uiopt == UiOption.Environment) { |
| String environment = cli_props.getProperty(uiopt.pname()); |
| String allInOne = cli_props.getProperty(UiOption.AllInOne.pname()); |
| environment = DuccUiUtilities.fixupEnvironment(environment, allInOne); |
| cli_props.setProperty(uiopt.pname(), environment); |
| } |
| } |
| } |
| |
| /* |
| * Resolve any ${..} placeholders against user's system properties and environment |
| * NOTE - this resolves against the caller's sys-props & environment ... the one in DuccUiUtilities |
| * resolves against the process JVM args to match what is done by Spring in UIMA-AS. |
| * 2.0: Leave unresolved entries as is & warn if not one of the always-propagated ones |
| */ |
| private String resolvePlaceholders(String contents, List<String> envNameList) { |
| // Placeholders syntax ${<placeholder>} |
| Pattern pattern = Pattern.compile("\\$\\{(.*?)\\}"); // Stops on first '}' |
| Matcher matcher = pattern.matcher(contents); |
| |
| StringBuffer sb = new StringBuffer(); |
| while (matcher.find()) { |
| final String key = matcher.group(1); |
| String value = System.getProperty(key); |
| if (value == null) { |
| value = System.getenv(key); |
| } |
| if (value != null) { |
| matcher.appendReplacement(sb, value); |
| } else { |
| matcher.appendReplacement(sb, ""); // Can't include the value as it looks like a group specification |
| value = "${" + key + "}"; |
| sb.append(value); |
| if (!envNameList.contains(key)) { |
| message("WARN: undefined placeholder", value, "not replaced"); |
| } |
| } |
| } |
| matcher.appendTail(sb); |
| return sb.toString(); |
| } |
| |
| void saveSpec(String name, DuccProperties props) |
| throws Exception |
| { |
| String directory = props.getProperty("log_directory") + File.separator + friendlyId; |
| String fileName = directory + File.separator + name; |
| File f = new File(directory); |
| |
| f.mkdirs(); |
| if ( ! f.exists() ) { |
| throw new IllegalStateException("saveSpec: Cannot create log directory: " + f.toString()); |
| } |
| |
| // Save the specification (but exclude the 'signature' entry) |
| String comments = null; |
| OutputStreamWriter out = new OutputStreamWriter(new FileOutputStream(fileName)); |
| String key = UiOption.Signature.pname(); |
| if ( props.containsKey(key) ) { |
| Object value = props.remove(key); |
| props.store(out, comments); |
| props.put(key, value); |
| } else { |
| props.store(out, comments); |
| } |
| out.close(); |
| |
| // Also save just the values the user provided |
| fileName = directory + File.separator + DuccUiConstants.user_specified_properties; |
| out = new OutputStreamWriter(new FileOutputStream(fileName)); |
| userSpecifiedProperties.store(out, comments); |
| out.close(); |
| } |
| |
| /** |
| * Extract messages and job pid from reply. This sets messages and errors into the appropriate |
| * structures for the API, and extracts the numeric id of the [job, ducclet, reservation, service] |
| * returned by the Orchestrator. |
| * |
| * @return true if the action succeeded and false otherwise. The action in this case, is whatever |
| * the Orchestrator was asked to do: submit something, cancel something, etc. |
| */ |
| boolean extractReply(AbstractDuccOrchestratorEvent reply) |
| { |
| /* |
| * process reply |
| */ |
| boolean rc = true; |
| Properties properties = reply.getProperties(); |
| @SuppressWarnings("unchecked") |
| ArrayList<String> value_submit_warnings = (ArrayList<String>) properties.get(UiOption.SubmitWarnings.pname()); |
| if(value_submit_warnings != null) { |
| message("Job warnings:"); |
| Iterator<String> reasons = value_submit_warnings.iterator(); |
| while(reasons.hasNext()) { |
| message("WARN:", reasons.next()); |
| } |
| } |
| @SuppressWarnings("unchecked") |
| ArrayList<String> value_submit_errors = (ArrayList<String>) properties.get(UiOption.SubmitErrors.pname()); |
| if(value_submit_errors != null) { |
| message("Job errors:"); |
| Iterator<String> reasons = value_submit_errors.iterator(); |
| while(reasons.hasNext()) { |
| message("ERROR:", reasons.next()); |
| } |
| rc = false; |
| } |
| |
| String pid = reply.getProperties().getProperty(UiOption.JobId.pname()); |
| if (pid == null ) { |
| message("ERROR: Request ID not found in reply"); |
| rc = false; |
| } else { |
| friendlyId = Long.parseLong(pid); |
| if ( friendlyId < 0 ) { |
| message("ERROR: Invalid Request ID", pid); |
| rc = false; |
| } |
| } |
| |
| return rc; |
| } |
| |
| void usage(String message) |
| { |
| if ( message != null ) { |
| System.out.println(message); |
| } |
| System.out.println(commandLine.formatHelp(myClassName)); |
| System.exit(1); |
| } |
| |
| /** |
| * Set a property via the API. This method allows the API user to |
| * build up or override properties after the initial API object is constructed. |
| * |
| * @param key This is the property name. |
| * @param value This is the value of the property. |
| * |
| * @return true if the property is set. Returns false if the property is not legal for this API. |
| */ |
| public boolean setProperty(String key, String value) |
| { |
| |
| if ( ! commandLine.isOptionName(key)) { |
| return false; |
| } |
| cli_props.setProperty(key, value); |
| return true; |
| } |
| |
| protected IDuccCallback getCallback() |
| { |
| return consoleCb; |
| } |
| |
| /** |
| * NOTE: We do NOT want to be intentionally throwing from the CLI. Pls pass e.getMessage() or |
| * e.toString() to this instead of throwing. |
| */ |
| synchronized void message(String ... e ) |
| { |
| if ( e.length > 1 ) { |
| StringBuffer sb = new StringBuffer(); |
| int i = 0; |
| for (i = 0; i < e.length - 1; i++) { |
| sb.append(e[i]); |
| sb.append(' '); |
| } |
| sb.append(e[i]); |
| consoleCb.status(sb.toString()); |
| } else { |
| consoleCb.status(e[0]); |
| } |
| } |
| |
| /** |
| * This returns the return code from the execution of the requested work. Return code is only |
| * available when the monitor wait completes ... if not waiting then assume success. |
| * |
| * @return The exit code of the job, process, etc. |
| */ |
| public int getReturnCode() |
| { |
| waitForCompletion(); |
| return returnCode; |
| } |
| |
| /** |
| * This returns the unique numeric id for the requested work. For submissions (job, reservation, etc) |
| * this is the newly assigned id. |
| * @return The unique numeric id of the job, reservation, etc. |
| */ |
| synchronized public long getDuccId() |
| { |
| return friendlyId; |
| } |
| |
| synchronized void consoleExits() |
| { |
| if ( waiter != null ) waiter.countDown(); |
| } |
| |
| synchronized void monitorExits(int rc) |
| { |
| this.returnCode = rc; |
| if ( waiter != null ) waiter.countDown(); |
| if ( console_listener != null ) { |
| console_listener.shutdown(); |
| } |
| } |
| |
| // TODO TODO TODO - do we have to support lots of these for multi-threaded stuff? Hope not ... |
| protected synchronized void startMonitors(boolean start_stdin, DuccContext context) |
| throws Exception |
| { |
| int wait_count = 0; |
| |
| if ( console_listener != null ) { |
| wait_count++; |
| } |
| |
| boolean monitor_attach = |
| ( |
| cli_props.containsKey(UiOption.WaitForCompletion.pname()) || |
| cli_props.containsKey(UiOption.CancelOnInterrupt.pname()) |
| ); |
| |
| if ( monitor_attach ) { |
| wait_count++; |
| } |
| |
| // Probably over-cautious but create the waiter before starting the threads that use it |
| if ( wait_count > 0 ) { |
| waiter = new CountDownLatch(wait_count); |
| if ( console_listener != null ) { |
| startConsoleListener(start_stdin); |
| } |
| if ( monitor_attach ) { |
| startMonitor(context); |
| } |
| } |
| } |
| |
| protected synchronized void startMonitor(DuccContext context) |
| { |
| monitor_listener = new MonitorListener(this, friendlyId, cli_props, context); |
| Thread mlt = new Thread(monitor_listener); //MonitorListenerThread |
| mlt.start(); |
| } |
| |
| /** |
| * Needs to be done before submitting the job because the job needs the ports. We'll |
| * just define the listener, but not start it until the job monitor starts, in case the |
| * submission fails. |
| */ |
| protected void initConsoleListener() throws Exception { |
| String value; |
| |
| console_attach = cli_props.containsKey(UiOption.AttachConsole.pname()); |
| if (console_attach) { |
| console_listener = new ConsoleListener(this, consoleCb); |
| value = console_listener.getConsoleHostAddress(); |
| if (myClassName.equals(DuccManagedReservationSubmit.class.getName())) { |
| value += "?splitstreams"; // Add a query string so APs have separate streams |
| } |
| } else if (suppress_console_log) { |
| value = "suppress"; |
| } else { |
| return; |
| } |
| // Set the console "suppress" flag or the host:port for the console listener into the env |
| String key = UiOption.Environment.pname(); |
| String env = cli_props.getProperty(key); |
| if (env == null) { |
| env = "DUCC_CONSOLE_LISTENER=" + value; |
| } else { |
| env += " DUCC_CONSOLE_LISTENER=" + value; |
| } |
| cli_props.setProperty(key, env); |
| } |
| |
| /** |
| * Be sure to call this BEFORE submission, to insure the callback address is set in properties. |
| */ |
| protected synchronized void startConsoleListener(boolean start_stdin) |
| throws Exception |
| { |
| if ( console_attach ) { |
| console_listener.startStdin(start_stdin); |
| Thread t = new Thread(console_listener); |
| t.start(); |
| } else { |
| message("WARN: Attermpt to start console but no console listener is defined."); |
| } |
| } |
| |
| protected synchronized void stopListeners() |
| { |
| if ( console_listener != null ) { |
| console_listener.shutdown(); |
| console_listener = null; |
| } |
| |
| if ( monitor_listener != null ) { |
| monitor_listener.shutdown(); |
| monitor_listener = null; |
| } |
| } |
| |
| /** |
| * This is used to find if the remote console is redirected to the local process, and if so, is it still |
| * active. |
| * @return True if the console is still attached to the remote process, false otherwise. |
| */ |
| public boolean isConsoleAttached() |
| { |
| return ( (console_listener != null ) && ( !console_listener.isShutdown())); |
| } |
| |
| /** |
| * Wait for the listeners - maybe a console listener, maybe a job listener, maybe both. |
| * |
| * @return true if a monitor wait was done, false otherwise. A monitor wait |
| * results in a return code from the process. In all other cases |
| * the return code is spurious. |
| */ |
| public boolean waitForCompletion() |
| { |
| try { |
| if ( waiter != null ) { |
| waiter.await(); |
| return true; |
| } |
| } catch (InterruptedException e) { |
| // TODO Auto-generated catch block |
| e.printStackTrace(); |
| } |
| return false; |
| } |
| |
| } |