/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * 
 *      http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
*/
package org.apache.uima.ducc.cli;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.lang.management.ManagementFactory;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.uima.ducc.common.IDucc;
import org.apache.uima.ducc.common.crypto.Crypto;
import org.apache.uima.ducc.common.utils.DuccProperties;
import org.apache.uima.ducc.common.utils.DuccPropertiesResolver;
import org.apache.uima.ducc.common.utils.Utils;
import org.apache.uima.ducc.transport.dispatcher.IDuccEventDispatcher;
import org.apache.uima.ducc.transport.event.AbstractDuccOrchestratorEvent;
import org.apache.uima.ducc.transport.event.IDuccContext.DuccContext;

/**
 * Define common methods and data used by all the DUCC API and CLI.
 */
public abstract class CliBase
    implements IUiOptions
{

    private String myClassName = "N/A";
    private boolean init_done = false;
    protected String ducc_home;
    protected IDuccEventDispatcher dispatcher;

    protected CommandLine commandLine;

    protected long friendlyId = -1;
    protected int  returnCode = 0;

    protected DuccProperties cli_props;
    protected ArrayList<String> errors   = new ArrayList<String>();
    protected ArrayList<String> warnings = new ArrayList<String>();
    protected ArrayList<String> messages = new ArrayList<String>();

    protected boolean debug;
    private   boolean load_defaults = true;

    protected ConsoleListener  console_listener = null;
    protected boolean suppress_console_log;
    protected String host_address = "N/A";
    protected boolean console_attach = false;
    protected IDuccCallback consoleCb = null;

    protected MonitorListener monitor_listener = null;
    
    CountDownLatch waiter = null;

    protected Properties userSpecifiedProperties;
    
    /**
     * All extenders must implement execute - this method does whatever processing on the input
     * is needed and passes the CLI request to the internal DUCC processes.
     *
     * @return Return true if execution works, and false otherwise.
     * @throws java.lang.Exception The specific exception is a function of the implementor.
     */
    public abstract boolean execute() throws Exception;

    protected void inhibitDefaults()
    {
        this.load_defaults = false;
    }

    /*
     * Get log directory or employ default log directory if not specified
     * UIMA-4617 Make it relative to the run-time working directory, not HOME
     */
    String getLogDirectory()
    {

        String log_directory = cli_props.getProperty(UiOption.LogDirectory.pname());
        if(log_directory == null) {
            // no log directory was specified - default to user's home + "/ducc/logs"
            log_directory = System.getProperty("user.home") + IDucc.userLogsSubDirectory;
        } 
        
        File f;
        if (log_directory.startsWith(File.separator)) {
            f = new File(log_directory);
        } else {
            // Make the log directory relative to the run-time working directory
            // NOTE: the working-directory is ALWAYS present when the logging-directory is specified
            String working_directory = cli_props.getProperty(UiOption.WorkingDirectory.pname());
            f = new File(working_directory, log_directory);
            try {
                log_directory = f.getCanonicalPath();
            } catch (IOException e) {
                throw new IllegalArgumentException("getLogDirectory: Cannot get full name of log directory " + log_directory);
            }
        }

        cli_props.setProperty(UiOption.LogDirectory.pname(), log_directory);

        /*
         * make sure the logdir is actually legal.
         * JD may also be creating it so to reduce any race or NFS delay blindly create and then test
         */
        f.mkdirs();
        if ( ! f.exists() ) {
            throw new IllegalArgumentException("getLogDirectory: Cannot create log directory " + log_directory);
        }

        if ( ! f.isDirectory() ) {
            throw new IllegalArgumentException("Specified log_directory is not a directory: " + log_directory);
        }

        if ( ! f.canWrite() ) {
            throw new IllegalArgumentException("Log directory exists but cannot be written: " + f);
        }

        if ( ! f.canExecute() ) {
            throw new IllegalArgumentException("Log directory exists but cannot be accessed (must be writable and executable): " + f);
        }

        return log_directory;
    }

    /*
     * If the workong directory has been defined (or defaulted) make sure it is absolute
     */
    void setWorkingDirectory() throws IOException
    {
        String working_directory = cli_props.getProperty(UiOption.WorkingDirectory.pname());
        if (working_directory == null) {
            return;     // Not valid for this request
        }
        File f = new File(working_directory);
        if ( ! f.exists() ) {
            throw new IllegalArgumentException("Working directory " + working_directory + " does not exist.");
        }
        if ( ! f.isAbsolute() ) {
            cli_props.setProperty(UiOption.WorkingDirectory.pname(), f.getCanonicalPath());
        }
    }

    /*
     * Check the syntax & if a service refers to itself -- place-holders already resolved
     * Strip any broker URL decorations
     */
    boolean check_service_dependencies(String endpoint)
    {
        String deps = cli_props.getProperty(UiOption.ServiceDependency.pname());
        try {
            String dependencies = DuccUiUtilities.check_service_dependencies(endpoint, deps);
            if (dependencies != null) {
                cli_props.setProperty(UiOption.ServiceDependency.pname(), dependencies);
            }
            return true;
        } catch ( Throwable t ) {
            message("ERROR:", t.toString());
            return false;
        }
    }
    
    /*
     * Check if -Xmx value is >= memory size ... if both are specified
     */
    void check_heap_size(String argsOption) {
        String jvmArgs = cli_props.getProperty(argsOption);
        String memSize = cli_props.getProperty(UiOption.ProcessMemorySize.pname());
        if (jvmArgs == null || memSize == null) {
            return;
        }

        // The numbers may be terminated by a units factor, white-space, or the end of the string
        // The units factor may be any of kKmMgG ... if omitted is bytes
        // Match -Xmx###[units-flag] and take the last one specified (IBM & Oracle JREs do this)
        String xmxRegex = "-Xmx([0-9]+)($|[\\skKmMgG])";
        Pattern patn = Pattern.compile(xmxRegex);
        Matcher matcher = patn.matcher(jvmArgs);
        Long size = null;
        String unit = null;
        while (matcher.find()) {
            size = Long.valueOf(matcher.group(1));
            unit = matcher.group(2);
        }
        if (size == null) {
            return;
        }
        if (unit.isEmpty()) { // Was last option in list
            unit = " ";
        }
        char factor = unit.toLowerCase().charAt(0);
        int shift = "gmk".indexOf(factor); // Number of 1024's to divide size by to get GB
        if (shift < 0) {    // No explicit unit factor ... white-space => bytes
            shift = 3;
        }
        long sizeGB = size >> (10 * shift); // Shift 10 bits per K
        int memGB = Integer.valueOf(memSize);
        if (sizeGB >= memGB) {
            String text = "WARNING - process_memory_size is " + memSize + "G but the max heap is " + size + unit + " --- swapping may occur";
            message(text);
        }
    }
    
    void setUser()
        throws Exception
    {
        /*
         * marshal user
         */
        String user = DuccUiUtilities.getUser();
        cli_props.setProperty(UiOption.User.pname(), user);
        String property = DuccPropertiesResolver.getInstance().getProperty(DuccPropertiesResolver.ducc_signature_required);
        if(property != null) {
            String signatureRequiredProperty = property.trim().toLowerCase();
            if(signatureRequiredProperty.equals("on")) {
                Crypto crypto = new Crypto(user, true);
                byte[] cypheredMessage = crypto.getSignature();
                cli_props.put(UiOption.Signature.pname(), cypheredMessage);
            }
        }
    }
    
    /**
     * Standard init for all except the Service calls that are sent to the SM
     */

    protected synchronized void init(String myClassName, UiOption[] opts, String[] args, DuccProperties cli_props, 
                    IDuccCallback consoleCb) throws Exception {
        this.init (myClassName, opts, args, null, cli_props, consoleCb, "orchestrator");
    }

    protected synchronized void init(String myClassName, UiOption[] opts, Properties props, DuccProperties cli_props, 
                    IDuccCallback consoleCb) throws Exception {
        this.init (myClassName, opts, null, props, cli_props, consoleCb, "orchestrator");
    }
    
  /**
   * 
   * @param myClassName  Name of the class invoking me, for help string
   * @param uiOpts       Array of IUioptions permitted for this command
   * @param args         Arguments from the command line (or null)
   * @param props        Properties passed in from the API (or null)
   * @param cli_props    (Initially) empty properties file to be filled in 
   * @param consoleCb    Console callback object (optional)
   * @param servlet      The name of the http servlet that will serve this request
   * @throws Exception
   */
    protected synchronized void init(String myClassName, IUiOption[] uiOpts, String[] args, Properties props, 
                    DuccProperties cli_props, IDuccCallback consoleCb, String servlet)
        throws Exception
    {

    	// Either args or props passed in, not both
    	if (args != null) {
    		CliFixups.cleanupArgs(args, myClassName);
    	} else {
    		CliFixups.cleanupProps(props, myClassName);
    	}
        
        if ( init_done ) return;
        
        if ( consoleCb == null ) {
            this.consoleCb =  new DefaultCallback();
        } else {
            this.consoleCb = consoleCb;
        }

        this.myClassName = myClassName;
        ducc_home = Utils.findDuccHome();

        this.cli_props = cli_props;
        commandLine = new CommandLine(args, uiOpts, props);
        try {
			commandLine.parse();
		} catch (Exception e) {
			usage(e.getMessage());
		}

        if ( commandLine.contains(UiOption.Help)) {
        	usage(null);
        }
        
        debug = commandLine.contains(UiOption.Debug);

        // Load the specification file, if given on the command line.  Note that registration
        // bypasses the somewhat redundant --specification option so we check two options.
        // Cannot have both as --specification && --register are never both valid.
        String fname = null;
        for (IUiOption spec : new IUiOption[]{ UiOption.Specification, UiOption.Register }) {
            if ( commandLine.isOption(spec) && commandLine.contains(spec)) {     // legal for this command, and also specified?
            	fname = commandLine.get(spec);
            	if (fname.length() == 0) {		// Check if --register has no value
            		fname = null;
            	}
            	break;
             }
        }
        // If have a specification file re-parse using it for default values
        if ( fname != null ) {
            FileInputStream fis = new FileInputStream(new File(fname));
            Properties defaults = new Properties();
            defaults.load(fis);
            fis.close();
            CliFixups.cleanupProps(defaults, myClassName);     // May correct or drop deprecated options
            
            // If invoked with overriding properties add to or replace the defaults 
            if (props != null) {
                defaults.putAll(props);
            }
            commandLine = new CommandLine(args, uiOpts, defaults);
            commandLine.parse();
        }
        commandLine.verify();  // Insure all the rules specified by the IUiOpts are enforced        
        
        // Copy options into cli_props
        setOptions(uiOpts);
        
        // Save a copy of the user-specified ones by cloning the underlying properties
        userSpecifiedProperties = (Properties)((Properties)cli_props).clone();
        
        // May need to suppress logging in console listener, or in the DUCC process.
        suppress_console_log = cli_props.containsKey(UiOption.SuppressConsoleLog.pname());
        
        // Apply defaults for and fixup the environment if needed
        //   -- unless default loading is inhibited, as it must be for modify operations
        //      What this routine does is fill in all the options that weren't specified
        //      on the command line with their defaults.  For 'modify' we want to bypass
        //      this because ONLY the options from the command line should be set.
        if ( load_defaults ) {
            setDefaults(uiOpts, suppress_console_log);
        }
        
        // This is not used by DUCC ... allows ducc-mon to display the origin of a job
        cli_props.setProperty(UiOption.SubmitPid.pname(), ManagementFactory.getRuntimeMXBean().getName());

        // First set working-directory as the log-directory may be relative to it
        setWorkingDirectory();
        if ( load_defaults && (getLogDirectory() == null) ) {
            throw new IllegalArgumentException("Cannot access log directory.");
        }
        setUser();

        //NodeIdentity ni = new NodeIdentity(); UIMA-3899, use getHostAddress() directly.  jrc
        host_address = InetAddress.getLocalHost().getHostAddress();
        
        initConsoleListener();

        // AllInOne doesn't dispatch requests (and local doesn't need a running DUCC!)
        if (!cli_props.containsKey(UiOption.AllInOne.pname())) {
            dispatcher = DispatcherFactory.create(cli_props, servlet);
        }
        
        init_done = true;
    }

    /*
     * Save options as properties after resolving any ${..} placeholders
     */
    void setOptions(IUiOption[] uiOpts) 
        throws Exception 
    {
        // Find the environment variables that are always propagated
        List<String> envNameList;
        String envNames = DuccPropertiesResolver.get(DuccPropertiesResolver.ducc_environment_propagated);
        if (envNames != null) {
        	envNameList = Arrays.asList(envNames.split("\\s+"));
        } else {
        	envNameList = new ArrayList<String>(0);
        }
    	
    	Map<IUiOption, String> parsed = commandLine.allOptions();
        for (IUiOption opt : parsed.keySet() ) {
        	// If a "flexible" boolean that accepts various true/false values, add it only if true
        	String val;
        	if (opt.optargs() && "true".equals(opt.deflt())) {
        		boolean bval = commandLine.getBoolean(opt);
        		if (bval) {
        			val = "";
        		} else {
        			if (debug) System.out.println("CLI omitted boolean " + opt.pname() + " = '" + parsed.get(opt) + "'");
        			continue;
        		}
			} else {
				val = parsed.get(opt);
				if (val == null) { // Should only happen for no-arg options
					val = "";
				} else {
					if (val.contains("${")) {
						val = resolvePlaceholders(val, envNameList);
					}
				}
			}
            val = val.trim();
            cli_props.put(opt.pname(), val);
            if (debug) System.out.println("CLI set " + opt.pname() + " = '" + val + "'");
        }
    }
    
    /*
     * Check for missing required options, set defaults, and validate where possible
     * Also fixup the environment for all that use it.
     */
    void setDefaults(IUiOption[] uiOpts, boolean suppress_console) throws Exception {
        ArrayList<String> envNameList = new ArrayList<String>(0);   // Why this when are resolving against use caller's environment?
        for (IUiOption uiopt : uiOpts) {
            if (!cli_props.containsKey(uiopt.pname())) {
                //
                // here deal with stuff that wasn't given explicitly in the command
                //
                // our convention - optargs() implies boolean, but it does't have to.
                //                  If the arg is not expllicitly specified, we assume
                //                  it is (boolean,false) for the sake of dealing with defaults.
                //                  -- and then just leave it out --
                // similarly      - noargs() is definitely boolean, same treatement
                //
                if ( (! uiopt.optargs()) && (! uiopt.noargs() ) && uiopt.deflt() != null) {
                    String deflt = uiopt.deflt();
                    if (deflt.startsWith("$$")) {     // Lookup default in ducc.properties
                        deflt = DuccPropertiesResolver.get(deflt.substring(2));
                        if (deflt == null) {
                            throw new IllegalArgumentException("Invalid default (undefined property) for " + uiopt.pname()); 
                        }
                    } else if (deflt.contains("${")) {
                        deflt = resolvePlaceholders(deflt, envNameList);
                    }
                    if (debug) System.out.println("CLI set default: " + uiopt.pname() + " = " + deflt);
                    cli_props.put(uiopt.pname(), deflt);
                }
            } else {
                //
                // here clean up stuff that was specified but we want to validate it
                //
                if (uiopt == UiOption.ProcessMemorySize || uiopt == UiOption.ReservationMemorySize)  {
                    String val = cli_props.getStringProperty(uiopt.pname());
                    if (!val.matches("^\\d+$")) {
                        throw new IllegalArgumentException("Invalid non-numeric value for " + uiopt.pname() + ": " + val);
                    }
                }
            }
            // If this request accepts the --environment option may need to augment it by
            // renaming LD_LIBRARY_PATH & propagating some user values
            if (uiopt == UiOption.Environment) {
              String environment = cli_props.getProperty(uiopt.pname());
              String allInOne = cli_props.getProperty(UiOption.AllInOne.pname());
              environment = DuccUiUtilities.fixupEnvironment(environment, allInOne);
              cli_props.setProperty(uiopt.pname(), environment);
            }
        }
    }
    
    /*
     * Resolve any ${..} placeholders against user's system properties and environment
     * NOTE - this resolves against the caller's sys-props & environment ... the one in DuccUiUtilities 
     *        resolves against the process JVM args to match what is done by Spring in UIMA-AS.
     * 2.0: Leave unresolved entries as is & warn if not one of the always-propagated ones 
     */
    private String resolvePlaceholders(String contents, List<String> envNameList) {
        //  Placeholders syntax ${<placeholder>} 
        Pattern pattern = Pattern.compile("\\$\\{(.*?)\\}");  // Stops on first '}'
        Matcher matcher = pattern.matcher(contents); 

        StringBuffer sb = new StringBuffer();
        while (matcher.find()) {
            final String key = matcher.group(1);
            String value = System.getProperty(key);
            if (value == null) {
                value = System.getenv(key);
            }
            if (value != null) {
            	matcher.appendReplacement(sb, value);
            } else {
            	matcher.appendReplacement(sb, "");   // Can't include the value as it looks like a group specification
            	value = "${" + key + "}";
            	sb.append(value);
            	if (!envNameList.contains(key)) {
            		message("WARN: undefined placeholder", value, "not replaced");
            	}
            }
        }
        matcher.appendTail(sb);
        return sb.toString();
    }
    
    void saveSpec(String name, DuccProperties props) 
        throws Exception
    {
        String directory = props.getProperty("log_directory") + File.separator + friendlyId;
        String fileName = directory + File.separator + name;
        File f = new File(directory);

        f.mkdirs();
        if ( ! f.exists() ) {
            throw new IllegalStateException("saveSpec: Cannot create log directory: " + f.toString());
        }

        // Save the specification (but exclude the 'signature' entry)
        String comments = null;
        OutputStreamWriter out = new OutputStreamWriter(new FileOutputStream(fileName));
        String key = UiOption.Signature.pname();
        if ( props.containsKey(key) ) {
            Object value = props.remove(key);
            props.store(out, comments);
            props.put(key, value);
        } else {
            props.store(out, comments);
        }
        out.close();
        
        // Also save just the values the user provided
        fileName = directory + File.separator + DuccUiConstants.user_specified_properties;
        out = new OutputStreamWriter(new FileOutputStream(fileName));
        userSpecifiedProperties.store(out, comments);
        out.close();
    }

    /**
     * Extract messages and job pid from reply.  This sets messages and errors into the appropriate
     * structures for the API, and extracts the numeric id of the [job, ducclet, reservation, service]
     * returned by the Orchestrator.
     *
     * @return true if the action succeeded and false otherwise.  The action in this case, is whatever
     *               the Orchestrator was asked to do: submit something, cancel something, etc.
     */
    boolean extractReply(AbstractDuccOrchestratorEvent reply)
    {
        /*
         * process reply
         */
        boolean rc = true;
        Properties properties = reply.getProperties();
        @SuppressWarnings("unchecked")
        ArrayList<String> value_submit_warnings = (ArrayList<String>) properties.get(UiOption.SubmitWarnings.pname());
        if(value_submit_warnings != null) {
            message("Job warnings:");
            Iterator<String> reasons = value_submit_warnings.iterator();
            while(reasons.hasNext()) {
                message("WARN:", reasons.next());
            }
        }
        @SuppressWarnings("unchecked")
        ArrayList<String> value_submit_errors = (ArrayList<String>) properties.get(UiOption.SubmitErrors.pname());
        if(value_submit_errors != null) {
            message("Job errors:");
            Iterator<String> reasons = value_submit_errors.iterator();
            while(reasons.hasNext()) {
                message("ERROR:", reasons.next());
            }
            rc = false;
        }

        String pid =  reply.getProperties().getProperty(UiOption.JobId.pname());
        if (pid == null ) {
            message("ERROR: Request ID not found in reply");
            rc = false;
        } else {
            friendlyId = Long.parseLong(pid);
            if ( friendlyId < 0 ) {
                message("ERROR: Invalid Request ID", pid);
                rc = false;
            }
        }

        return rc;
    }

    void usage(String message)
    {
        if ( message != null ) {
            System.out.println(message);
        }
        System.out.println(commandLine.formatHelp(myClassName));
        System.exit(1);
    }

    /**
     * Set a property via the API. This method allows the API user to
     * build up or override properties after the initial API object is constructed.
     *
     * @param key This is the property name.
     * @param value This is the value of the property.
     *
     * @return true if the property is set.  Returns false if the property is not legal for this API.
     */
    public boolean setProperty(String key, String value)
    {

        if ( ! commandLine.isOptionName(key)) {       
            return false;
        }
        cli_props.setProperty(key, value);
        return true;
    }

    protected IDuccCallback getCallback()
    {
        return consoleCb;
    }

    /**
     * NOTE: We do NOT want to be intentionally throwing from the CLI.  Pls pass e.getMessage() or
     *       e.toString() to this instead of throwing.
     */
    synchronized void message(String ... e )
    {
        if ( e.length > 1 ) {
            StringBuffer sb = new StringBuffer();
            int i = 0;
            for (i = 0; i < e.length - 1; i++) {
                sb.append(e[i]);
                sb.append(' ');
            }
            sb.append(e[i]);
            consoleCb.status(sb.toString());
        } else {
            consoleCb.status(e[0]);
        }
    }

    /**
     * This returns the return code from the execution of the requested work.  Return code is only
     * available when the monitor wait completes ... if not waiting then assume success.
     *
     * @return The exit code of the job, process, etc.
     */
    public int getReturnCode()
    {
        waitForCompletion();
        return returnCode;
    }

    /**
     * This returns the unique numeric id for the requested work.  For submissions (job, reservation, etc)
     * this is the newly assigned id.
     * @return The unique numeric id of the job, reservation, etc.
     */
    synchronized public long getDuccId()
    {
        return friendlyId;
    }

    synchronized void consoleExits()
    {
        if ( waiter != null ) waiter.countDown();
    }

    synchronized void monitorExits(int rc)
    {
        this.returnCode = rc;
        if ( waiter != null ) waiter.countDown();
        if ( console_listener != null ) {
            console_listener.shutdown();
        }
    }

    // TODO TODO TODO - do we have to support lots of these for multi-threaded stuff?  Hope not ...
    protected synchronized void startMonitors(boolean start_stdin, DuccContext context)
        throws Exception
    {
        int wait_count = 0;

        if ( console_listener != null ) {
            wait_count++;
        }
        
        boolean monitor_attach = 
                (
                cli_props.containsKey(UiOption.WaitForCompletion.pname()) || 
                cli_props.containsKey(UiOption.CancelOnInterrupt.pname())  
                );
            
        if ( monitor_attach ) {
            wait_count++;
        }

        // Probably over-cautious but create the waiter before starting the threads that use it
        if ( wait_count > 0 ) {
            waiter = new CountDownLatch(wait_count);
            if ( console_listener != null ) {
                startConsoleListener(start_stdin);
            }
            if ( monitor_attach ) {
                startMonitor(context);
            }
        }
    }

    protected synchronized void startMonitor(DuccContext context)
    {
        monitor_listener = new MonitorListener(this, friendlyId, cli_props, context);
        Thread mlt = new Thread(monitor_listener);  //MonitorListenerThread
        mlt.start();
    }

    /**
     * Needs to be done before submitting the job because the job needs the ports.  We'll
     * just define the listener, but not start it until the job monitor starts, in case the
     * submission fails.
     */
    protected void initConsoleListener() throws Exception {
        String value;

        console_attach = cli_props.containsKey(UiOption.AttachConsole.pname());
        if (console_attach) {
            console_listener = new ConsoleListener(this, consoleCb);
            value = console_listener.getConsoleHostAddress();
            if (myClassName.equals(DuccManagedReservationSubmit.class.getName())) {
              value += "?splitstreams";   // Add a query string so APs have separate streams
            }
        } else if (suppress_console_log) {
            value = "suppress";
        } else {
            return;
        }
        // Set the console "suppress" flag or the host:port for the console listener into the env
        String key = UiOption.Environment.pname();
        String env = cli_props.getProperty(key);
        if (env == null) {
            env = "DUCC_CONSOLE_LISTENER=" + value;
        } else {
            env += " DUCC_CONSOLE_LISTENER=" + value;
        }
        cli_props.setProperty(key, env);
    }

    /**
     * Be sure to call this BEFORE submission, to insure the callback address is set in properties.
     */
    protected synchronized void startConsoleListener(boolean start_stdin)
        throws Exception
    {        
        if ( console_attach ) {
            console_listener.startStdin(start_stdin);
            Thread t = new Thread(console_listener);
            t.start();
        } else {
            message("WARN: Attermpt to start console but no console listener is defined.");
        }
    }

    protected synchronized void stopListeners()
    {
        if ( console_listener != null ) {
            console_listener.shutdown();
            console_listener = null;
        }

        if ( monitor_listener != null ) {
            monitor_listener.shutdown();
            monitor_listener = null;
        }
    }

    /**
     * This is used to find if the remote console is redirected to the local process, and if so, is it still
     * active.
     * @return True if the console is still attached to the remote process, false otherwise.
     */
    public boolean isConsoleAttached()
    {
        return ( (console_listener != null ) && ( !console_listener.isShutdown()));
    }

    /**
     * Wait for the listeners - maybe a console listener, maybe a job listener, maybe both.
     *
     * @return true if a monitor wait was done, false otherwise.  A monitor wait
     *          results in a return code from the process.  In all other cases
     *          the return code is spurious.
     */
    public boolean waitForCompletion()
    {
        try {
            if ( waiter != null ) {
                waiter.await();
                return true;
            }
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        return false;
    }

}
