/*******************************************************************************
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *******************************************************************************/
package org.apache.ofbiz.service;

import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import javax.transaction.Transaction;

import org.apache.ofbiz.base.config.GenericConfigException;
import org.apache.ofbiz.base.util.Debug;
import org.apache.ofbiz.base.util.GeneralRuntimeException;
import org.apache.ofbiz.base.util.UtilMisc;
import org.apache.ofbiz.base.util.UtilProperties;
import org.apache.ofbiz.base.util.UtilTimer;
import org.apache.ofbiz.base.util.UtilValidate;
import org.apache.ofbiz.entity.Delegator;
import org.apache.ofbiz.entity.DelegatorFactory;
import org.apache.ofbiz.entity.GenericDelegator;
import org.apache.ofbiz.entity.GenericEntityException;
import org.apache.ofbiz.entity.GenericValue;
import org.apache.ofbiz.entity.transaction.DebugXaResource;
import org.apache.ofbiz.entity.transaction.GenericTransactionException;
import org.apache.ofbiz.entity.transaction.TransactionUtil;
import org.apache.ofbiz.security.Security;
import org.apache.ofbiz.security.SecurityConfigurationException;
import org.apache.ofbiz.security.SecurityFactory;
import org.apache.ofbiz.service.config.ServiceConfigUtil;
import org.apache.ofbiz.service.config.model.StartupService;
import org.apache.ofbiz.service.eca.ServiceEcaRule;
import org.apache.ofbiz.service.eca.ServiceEcaUtil;
import org.apache.ofbiz.service.engine.GenericEngine;
import org.apache.ofbiz.service.engine.GenericEngineFactory;
import org.apache.ofbiz.service.group.ServiceGroupReader;
import org.apache.ofbiz.service.jms.JmsListenerFactory;
import org.apache.ofbiz.service.job.JobManager;
import org.apache.ofbiz.service.job.JobManagerException;
import org.apache.ofbiz.service.semaphore.ServiceSemaphore;

import com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap;

/**
 * The global service dispatcher. This is the "engine" part of the
 * Service Engine.
 */
public class ServiceDispatcher {

    /** Fully-qualified class name, passed as the module argument to the Debug logging calls in this class. */
    public static final String module = ServiceDispatcher.class.getName();
    /** Maximum weighted capacity of the {@link #runLog} map. */
    public static final int lruLogSize = 200;
    /** Upper bound on the number of passes runSync makes through its deadlock retry loop. */
    public static final int LOCK_RETRIES = 3;

    /** Capacity-bounded log of running services (bounded by {@link #lruLogSize}); presumably populated by logService - confirm. */
    protected static final Map<RunningService, ServiceDispatcher> runLog = new ConcurrentLinkedHashMap.Builder<RunningService, ServiceDispatcher>().maximumWeightedCapacity(lruLogSize).build();
    /** Cache of dispatchers keyed by delegator name (the literal key "null" is used when no delegator is given). */
    protected static ConcurrentHashMap<String, ServiceDispatcher> dispatchers = new ConcurrentHashMap<String, ServiceDispatcher>();
    // FIXME: These fields are not thread-safe. They are modified by EntityDataLoadContainer.
    // We need a better design - like have this class query EntityDataLoadContainer if data is being loaded.
    /** Default for the job manager flag; handed to the three-argument constructor by the one-argument constructor. */
    protected static boolean enableJM = true;
    /** Default for the JMS listener flag; handed to the three-argument constructor by the one-argument constructor. */
    protected static boolean enableJMS = true;
    /** Presumably gates running of startup services - NOTE(review): not used in the visible part of this file, confirm. */
    protected static boolean enableSvcs = true;

    /** Delegator this dispatcher was created for; may be null. */
    protected Delegator delegator = null;
    /** Engine factory created in the constructor and bound to this dispatcher. */
    protected GenericEngineFactory factory = null;
    /** Security implementation obtained from SecurityFactory; stays null when there is no delegator or the lookup fails. */
    protected Security security = null;
    /** Registered dispatch contexts, keyed by context name. */
    protected Map<String, DispatchContext> localContext = new HashMap<String, DispatchContext>();
    /** Registered service callbacks, keyed by service name. */
    protected Map<String, List<GenericServiceCallback>> callbacks = new HashMap<String, List<GenericServiceCallback>>();
    /** Job manager obtained in the constructor; stays null if JobManager.getInstance throws a GeneralRuntimeException. */
    protected JobManager jm = null;
    /** JMS listener factory; only set when JMS is enabled at construction time. */
    protected JmsListenerFactory jlf = null;

    protected ServiceDispatcher(Delegator delegator, boolean enableJM, boolean enableJMS) {
        factory = new GenericEngineFactory(this);
        ServiceGroupReader.readConfig();
        ServiceEcaUtil.readConfig();

        this.delegator = delegator;

        if (delegator != null) {
            try {
                this.security = SecurityFactory.getInstance(delegator);
            } catch (SecurityConfigurationException e) {
                Debug.logError(e, "[ServiceDispatcher.init] : No instance of security implementation found.", module);
            }
        }

        // clean up the service semaphores of same instance
        if (delegator != null) {
            try {
                int rn = delegator.removeByAnd("ServiceSemaphore", "lockedByInstanceId", JobManager.instanceId);
                if (rn > 0) {
                    Debug.logInfo("[ServiceDispatcher.init] : Clean up " + rn + " service semaphors." , module);
                }
            } catch (GenericEntityException e) {
                Debug.logError(e, module);
            }
        }
        
        // job manager needs to always be running, but the poller thread does not
        try {
            Delegator origDelegator = this.delegator;
            if (!this.delegator.getOriginalDelegatorName().equals(this.delegator.getDelegatorName())) {
                origDelegator = DelegatorFactory.getDelegator(this.delegator.getOriginalDelegatorName());
            }
            this.jm = JobManager.getInstance(origDelegator, enableJM);
        } catch (GeneralRuntimeException e) {
            Debug.logWarning(e.getMessage(), module);
        }

        // make sure we haven't disabled these features from running
        if (enableJMS) {
            this.jlf = JmsListenerFactory.getInstance(delegator);
        }
    }

    /**
     * Creates a dispatcher for the given delegator, using the class-wide
     * enableJM and enableJMS flags as they are set at the time of the call.
     *
     * @param delegator the delegator to bind to
     */
    protected ServiceDispatcher(Delegator delegator) {
        this(delegator, ServiceDispatcher.enableJM, ServiceDispatcher.enableJMS);
    }

    /**
     * Looks up the LocalDispatcher registered under the given name on the
     * ServiceDispatcher that belongs to the delegator. The ServiceDispatcher
     * itself is obtained through {@link #getInstance(Delegator)}, which creates
     * and caches it when it does not exist yet.
     * @param name the name of the DispatchContext
     * @param delegator the local delegator
     * @return the LocalDispatcher associated with the named DispatchContext, or null if no context is registered under that name
     */
    public static LocalDispatcher getLocalDispatcher(String name, Delegator delegator) {
        final ServiceDispatcher dispatcher = getInstance(delegator);
        // only a context that has already been registered under "name" has a LocalDispatcher to hand out
        return dispatcher.containsContext(name) ? dispatcher.getLocalDispatcher(name) : null;
    }

    /**
     * Returns the ServiceDispatcher cached for the given delegator, creating
     * and caching one on first use. The cache key is the delegator name, or the
     * literal string "null" when no delegator is supplied. Startup services are
     * run only by the caller whose newly created dispatcher made it into the cache.
     * @param delegator the local delegator
     * @return A reference to this global ServiceDispatcher
     */
    public static ServiceDispatcher getInstance(Delegator delegator) {
        final String dispatcherKey = (delegator == null) ? "null" : delegator.getDelegatorName();

        final ServiceDispatcher existing = dispatchers.get(dispatcherKey);
        if (existing != null) {
            return existing;
        }

        if (Debug.verboseOn()) {
            Debug.logVerbose("[ServiceDispatcher.getInstance] : No instance found (" + dispatcherKey + ").", module);
        }

        final ServiceDispatcher created = new ServiceDispatcher(delegator);
        final ServiceDispatcher alreadyCached = dispatchers.putIfAbsent(dispatcherKey, created);
        if (alreadyCached != null) {
            // another caller registered a dispatcher for this key first; use that one
            return alreadyCached;
        }

        // putIfAbsent returned null, so the dispatcher created here is the cached one;
        // only in this case are the startup services run
        created.runStartupServices();
        return created;
    }

    /**
     * Adds the given dispatch context to this ServiceDispatcher, keyed by the
     * context's name. An entry already stored under that name is replaced.
     * @param context the context of the local dispatcher
     */
    public void register(DispatchContext context) {
        final String contextName = context.getName();
        if (Debug.infoOn()) {
            Debug.logInfo("Registering dispatcher: " + contextName, module);
        }
        this.localContext.put(contextName, context);
    }
    /**
     * Removes the dispatch context registered under the LocalDispatcher's name.
     * When no contexts remain afterwards this ServiceDispatcher is shut down;
     * a failure during shutdown is logged and not propagated.
     * @param local the LocalDispatcher to de-register
     */
    public void deregister(LocalDispatcher local) {
        final String dispatcherName = local.getName();
        if (Debug.infoOn()) {
            Debug.logInfo("De-Registering dispatcher: " + dispatcherName, module);
        }
        localContext.remove(dispatcherName);

        if (!localContext.isEmpty()) {
            return;
        }

        // that was the last registered context
        try {
            this.shutdown();
        } catch (GenericServiceException e) {
            Debug.logError(e, "Trouble shutting down ServiceDispatcher!", module);
        }
    }

    /**
     * Appends a callback to the list kept for the given service name, creating
     * the list when this is the first callback for that service.
     * @param serviceName name of the service the callback is registered for
     * @param cb the callback to add
     */
    public synchronized void registerCallback(String serviceName, GenericServiceCallback cb) {
        final List<GenericServiceCallback> existing = callbacks.get(serviceName);
        final List<GenericServiceCallback> target = (existing != null) ? existing : new LinkedList<GenericServiceCallback>();
        target.add(cb);
        callbacks.put(serviceName, target);
    }

    /**
     * Returns the callbacks registered for the given service name.
     * @param serviceName name of the service
     * @return the list stored for that service (the stored list itself, not a copy), or null if none has been registered
     */
    public List<GenericServiceCallback> getCallbacks(String serviceName) {
        final List<GenericServiceCallback> registered = this.callbacks.get(serviceName);
        return registered;
    }

    /**
     * Run the service synchronously and return the result. This is a
     * convenience form of the four-argument runSync with validation of the
     * OUT parameters switched on.
     * @param localName Name of the context to use.
     * @param service Service model object.
     * @param context Map of name, value pairs composing the context.
     * @return Map of name, value pairs composing the result.
     * @throws ServiceAuthException
     * @throws ServiceValidationException
     * @throws GenericServiceException
     */
    public Map<String, Object> runSync(String localName, ModelService service, Map<String, ? extends Object> context) throws ServiceAuthException, ServiceValidationException, GenericServiceException {
        final boolean validateOut = true;
        return this.runSync(localName, service, context, validateOut);
    }

    /**
     * Run the service synchronously and IGNORE the result. The call is
     * delegated to the four-argument runSync with validation of the OUT
     * parameters switched off, and the returned map is discarded.
     * @param localName Name of the context to use.
     * @param service Service model object.
     * @param context Map of name, value pairs composing the context.
     * @throws ServiceAuthException
     * @throws ServiceValidationException
     * @throws GenericServiceException
     */
    public void runSyncIgnore(String localName, ModelService service, Map<String, ? extends Object> context) throws ServiceAuthException, ServiceValidationException, GenericServiceException {
        final boolean validateOut = false;
        this.runSync(localName, service, context, validateOut);
    }

    /**
     * Run the service synchronously and return the result.
     * <p>
     * Outline of what the method does, in order:
     * <ol>
     * <li>acquires a ServiceSemaphore when the model's semaphore setting is "wait" or "fail";</li>
     * <li>copies the parameters into a fresh context map, checks the locale and records the running service;</li>
     * <li>when the model uses a transaction, begins one if none is in place, or suspends the current one and
     *     begins a new one when the model requires a new transaction;</li>
     * <li>evaluates the "global-rollback", "global-commit", "auth", "in-validate" and "invoke" ECA events,
     *     checks authorization, validates the IN parameters and invokes the engine; when this call began the
     *     transaction and the result's error message contains "DEADLOCK", the transaction is rolled back,
     *     a new one is begun and the pass is repeated, for at most {@link #LOCK_RETRIES} passes in total;</li>
     * <li>evaluates the "out-validate" (only when validating OUT parameters), "commit" and
     *     "global-commit-post-run" ECA events;</li>
     * <li>rolls back the transaction if the result is an error, otherwise commits it, then evaluates the
     *     model's notifications;</li>
     * <li>releases the semaphore and resumes a suspended parent transaction;</li>
     * <li>evaluates the "return" ECA event, records the end stamp and logs timing information.</li>
     * </ol>
     * @param localName Name of the context to use.
     * @param modelService Service model object.
     * @param params Map of name, value pairs composing the parameters; may be null.
     * @param validateOut Validate OUT parameters
     * @return Map of name, value pairs composing the result.
     * @throws ServiceAuthException when the service requires authorization and no userLogin is in the context after the auth check
     * @throws ServiceValidationException when the incoming context fails validation of the IN parameters
     * @throws GenericServiceException for any other failure, including transaction problems and unexpected throwables
     */
    public Map<String, Object> runSync(String localName, ModelService modelService, Map<String, ? extends Object> params, boolean validateOut) throws ServiceAuthException, ServiceValidationException, GenericServiceException {
        long serviceStartTime = System.currentTimeMillis();
        Map<String, Object> result = new HashMap<String, Object>();
        ServiceSemaphore lock = null;
        Map<String, List<ServiceEcaRule>> eventMap = null;
        Map<String, Object> ecaContext = null;
        RunningService rs = null;
        DispatchContext ctx = localContext.get(localName);
        GenericEngine engine = null;
        // non-null only when an existing transaction was suspended to run this service in a new one
        Transaction parentTransaction = null;
        boolean isFailure = false;
        boolean isError = false;
        // true only when this call began the transaction in use
        boolean beganTrans = false;
        try {
            // check for semaphore and acquire a lock
            if ("wait".equals(modelService.semaphore) || "fail".equals(modelService.semaphore)) {
                lock = new ServiceSemaphore(delegator, modelService);
                lock.acquire();
            }

            if (Debug.verboseOn() || modelService.debug) {
                Debug.logVerbose("[ServiceDispatcher.runSync] : invoking service " + modelService.name + " [" + modelService.location +
                    "/" + modelService.invoke + "] (" + modelService.engineName + ")", module);
            }

            // work on a private copy so that the caller's map is never modified
            Map<String, Object> context = new HashMap<String, Object>();
            if (params != null) {
                context.putAll(params);
            }
            // check the locale
            Locale locale = this.checkLocale(context);

            // set up the running service log
            rs = this.logService(localName, modelService, GenericEngine.SYNC_MODE);

            // get eventMap once for all calls for speed, don't do event calls if it is null
            eventMap = ServiceEcaUtil.getServiceEventMap(modelService.name);
            engine = this.getGenericEngine(modelService.engineName);


            // set IN attributes with default-value as applicable
            modelService.updateDefaultValues(context, ModelService.IN_PARAM);
            //Debug.logInfo("=========================== " + modelService.name + " 1 tx status =" + TransactionUtil.getStatusString() + ", modelService.requireNewTransaction=" + modelService.requireNewTransaction + ", modelService.useTransaction=" + modelService.useTransaction + ", TransactionUtil.isTransactionInPlace()=" + TransactionUtil.isTransactionInPlace(), module);
            if (modelService.useTransaction) {
                if (TransactionUtil.isTransactionInPlace()) {
                    // if a new transaction is needed, do it here; if not do nothing, just use current tx
                    if (modelService.requireNewTransaction) {
                        parentTransaction = TransactionUtil.suspend();
                        if (TransactionUtil.isTransactionInPlace()) {
                            throw new GenericTransactionException("In service " + modelService.name + " transaction is still in place after suspend, status is " + TransactionUtil.getStatusString());
                        }
                        // now start a new transaction
                        beganTrans = TransactionUtil.begin(modelService.transactionTimeout);
                    }
                } else {
                    beganTrans = TransactionUtil.begin(modelService.transactionTimeout);
                }
                // enlist for XAResource debugging
                if (beganTrans && TransactionUtil.debugResources()) {
                    DebugXaResource dxa = new DebugXaResource(modelService.name);
                    try {
                        dxa.enlist();
                    } catch (Exception e) {
                        Debug.logError(e, module);
                    }
                }
            }

            try {
                int lockRetriesRemaining = LOCK_RETRIES;
                boolean needsLockRetry = false;

                do {
                    // Ensure this is reset to false on each pass
                    needsLockRetry = false;

                    // decremented before the work, so the loop makes at most LOCK_RETRIES passes in total
                    lockRetriesRemaining--;

                    // NOTE: general pattern here is to do everything up to the main service call, and retry it all if
                    //needed because those will be part of the same transaction and have been rolled back
                    // TODO: if there is an ECA called async or in a new transaction it won't get rolled back
                    //but will be called again, which means the service may complete multiple times! that would be for
                    //pre-invoke and earlier events only of course


                    // setup global transaction ECA listeners to execute later
                    if (eventMap != null) ServiceEcaUtil.evalRules(modelService.name, eventMap, "global-rollback", ctx, context, result, isError, isFailure);
                    if (eventMap != null) ServiceEcaUtil.evalRules(modelService.name, eventMap, "global-commit", ctx, context, result, isError, isFailure);

                    // pre-auth ECA
                    if (eventMap != null) ServiceEcaUtil.evalRules(modelService.name, eventMap, "auth", ctx, context, result, isError, isFailure);

                    // check for pre-auth failure/errors
                    isFailure = ServiceUtil.isFailure(result);
                    isError = ServiceUtil.isError(result);

                    //Debug.logInfo("After [" + modelService.name + "] pre-auth ECA, before auth; isFailure=" + isFailure + ", isError=" + isError, module);

                    context = checkAuth(localName, context, modelService);
                    GenericValue userLogin = (GenericValue) context.get("userLogin");

                    if (modelService.auth && userLogin == null) {
                        throw new ServiceAuthException("User authorization is required for this service: " + modelService.name + modelService.debugInfo());
                    }

                    // now that we have authed, if there is a userLogin, set the EE userIdentifier
                    // NOTE(review): this push is repeated on every retry pass and skipped when there is no userLogin,
                    // while the matching pop below runs exactly once and unconditionally - confirm push/pop stay balanced
                    if (userLogin != null && userLogin.getString("userLoginId") != null) {
                        GenericDelegator.pushUserIdentifier(userLogin.getString("userLoginId"));
                    }

                    // pre-validate ECA
                    if (eventMap != null) ServiceEcaUtil.evalRules(modelService.name, eventMap, "in-validate", ctx, context, result, isError, isFailure);

                    // check for pre-validate failure/errors
                    isFailure = ServiceUtil.isFailure(result);
                    isError = ServiceUtil.isError(result);

                    //Debug.logInfo("After [" + modelService.name + "] pre-in-validate ECA, before in-validate; isFailure=" + isFailure + ", isError=" + isError, module);

                    // validate the context
                    if (modelService.validate && !isError && !isFailure) {
                        try {
                            modelService.validate(context, ModelService.IN_PARAM, locale);
                        } catch (ServiceValidationException e) {
                            Debug.logError(e, "Incoming context (in runSync : " + modelService.name + ") does not match expected requirements", module);
                            throw e;
                        }
                    }

                    // pre-invoke ECA
                    if (eventMap != null) ServiceEcaUtil.evalRules(modelService.name, eventMap, "invoke", ctx, context, result, isError, isFailure);

                    // check for pre-invoke failure/errors
                    isFailure = ServiceUtil.isFailure(result);
                    isError = ServiceUtil.isError(result);

                    //Debug.logInfo("After [" + modelService.name + "] pre-invoke ECA, before invoke; isFailure=" + isFailure + ", isError=" + isError, module);

                    // ===== invoke the service =====
                    // skipped entirely when an earlier ECA already put an error or failure into the result
                    if (!isError && !isFailure) {
                        Map<String, Object> invokeResult = null;
                        invokeResult = engine.runSync(localName, modelService, context);
                        engine.sendCallbacks(modelService, context, invokeResult, GenericEngine.SYNC_MODE);
                        if (invokeResult != null) {
                            result.putAll(invokeResult);
                        } else {
                            Debug.logWarning("Service (in runSync : " + modelService.name + ") returns null result", module);
                        }
                    }

                    // re-check the errors/failures
                    isFailure = ServiceUtil.isFailure(result);
                    isError = ServiceUtil.isError(result);

                    //Debug.logInfo("After [" + modelService.name + "] invoke; isFailure=" + isFailure + ", isError=" + isError, module);

                    if (beganTrans) {
                        // crazy stuff here: see if there was a deadlock or other such error and if so retry... which we can ONLY do if we own the transaction!

                        String errMsg = ServiceUtil.getErrorMessage(result);

                        // look for the string DEADLOCK in an upper-cased error message; tested on: Derby, MySQL
                        // - Derby 10.2.2.0 deadlock string: "A lock could not be obtained due to a deadlock"
                        // - MySQL ? deadlock string: "Deadlock found when trying to get lock; try restarting transaction"
                        // - Postgres ? deadlock string: TODO
                        // - Other ? deadlock string: TODO
                        // TODO need testing in other databases because they all return different error messages for this!

                        // NOTE DEJ20070908 are there other things we need to check? I don't think so because these will
                        //be Entity Engine errors that will be caught and come back in an error message... IFF the
                        //service is written to not ignore it of course!
                        if (errMsg != null && errMsg.toUpperCase().indexOf("DEADLOCK") >= 0) {
                            // it's a deadlock! retry...
                            String retryMsg = "RETRYING SERVICE [" + modelService.name + "]: Deadlock error found in message [" + errMsg + "]; retry [" + (LOCK_RETRIES - lockRetriesRemaining) + "] of [" + LOCK_RETRIES + "]";

                            // make sure the old transaction is rolled back, and then start a new one

                            // if there is an exception in these things, let the big overall thing handle it
                            TransactionUtil.rollback(beganTrans, retryMsg, null);

                            beganTrans = TransactionUtil.begin(modelService.transactionTimeout);
                            // enlist for XAResource debugging
                            if (beganTrans && TransactionUtil.debugResources()) {
                                DebugXaResource dxa = new DebugXaResource(modelService.name);
                                try {
                                    dxa.enlist();
                                } catch (Exception e) {
                                    Debug.logError(e, module);
                                }
                            }

                            if (!beganTrans) {
                                // just log and let things roll through, will be considered an error and ECAs, etc will run according to that
                                Debug.logError("After rollback attempt for lock retry did not begin a new transaction!", module);
                            } else {
                                // deadlocks can be resolved by retrying immediately as conflicting operations in the other thread will have cleared
                                needsLockRetry = true;

                                // reset state variables
                                result = new HashMap<String, Object>();
                                isFailure = false;
                                isError = false;

                                Debug.logWarning(retryMsg, module);
                            }

                            // look for lock wait timeout error, retry in a different way by running after the parent transaction finishes, ie attach to parent tx
                            // - Derby 10.2.2.0 lock wait timeout string: "A lock could not be obtained within the time requested"
                            // - MySQL ? lock wait timeout string: "Lock wait timeout exceeded; try restarting transaction"
                            // NOTE(review): this check sits inside the DEADLOCK branch, so it is only reached when the
                            // message also contains "DEADLOCK"; the body is an empty placeholder, so nothing happens either way
                            if (errMsg != null && (errMsg.indexOf("A lock could not be obtained within the time requested") >= 0 ||
                                    errMsg.indexOf("Lock wait timeout exceeded") >= 0)) {
                                // TODO: add to run after parent tx
                            }
                        }
                    }
                } while (needsLockRetry && lockRetriesRemaining > 0);

                // create a new context with the results to pass to ECA services; necessary because caller may reuse this context
                ecaContext = new HashMap<String, Object>();
                ecaContext.putAll(context);
                // copy all results: don't worry parameters that aren't allowed won't be passed to the ECA services
                ecaContext.putAll(result);

                // setup default OUT values
                // NOTE(review): the defaults are applied to "context", not to "result", and after ecaContext was
                // copied from it - confirm this is the intended target
                modelService.updateDefaultValues(context, ModelService.OUT_PARAM);

                // validate the result
                if (modelService.validate && validateOut) {
                    // pre-out-validate ECA
                    if (eventMap != null) ServiceEcaUtil.evalRules(modelService.name, eventMap, "out-validate", ctx, ecaContext, result, isError, isFailure);
                    try {
                        modelService.validate(result, ModelService.OUT_PARAM, locale);
                    } catch (ServiceValidationException e) {
                        throw new GenericServiceException("Outgoing result (in runSync : " + modelService.name + ") does not match expected requirements", e);
                    }
                }

                // pre-commit ECA
                if (eventMap != null) ServiceEcaUtil.evalRules(modelService.name, eventMap, "commit", ctx, ecaContext, result, isError, isFailure);

                // check for pre-commit failure/errors
                isFailure = ServiceUtil.isFailure(result);
                isError = ServiceUtil.isError(result);

                // global-commit-post-run ECA, like global-commit but gets the context after the service is run
                if (eventMap != null) ServiceEcaUtil.evalRules(modelService.name, eventMap, "global-commit-post-run", ctx, ecaContext, result, isError, isFailure);

                // check for failure and log at warning level; this is used for debugging
                if (isFailure) {
                    Debug.logWarning("Service Failure [" + modelService.name + "]: " + ServiceUtil.getErrorMessage(result), module);
                }
            } catch (Throwable t) {
                // anything thrown while running the service: notify callbacks, roll back, close the run log entry,
                // then rethrow (service exceptions as they are, everything else wrapped)
                if (Debug.timingOn()) {
                    UtilTimer.closeTimer(localName + " / " + modelService.name, "Sync service failed...", module);
                }
                String errMsg = "Service [" + modelService.name + "] threw an unexpected exception/error";
                engine.sendCallbacks(modelService, context, t, GenericEngine.SYNC_MODE);
                try {
                    TransactionUtil.rollback(beganTrans, errMsg, t);
                } catch (GenericTransactionException te) {
                    Debug.logError(te, "Cannot rollback transaction", module);
                }
                rs.setEndStamp();
                if (t instanceof ServiceAuthException) {
                    throw (ServiceAuthException) t;
                } else if (t instanceof ServiceValidationException) {
                    throw (ServiceValidationException) t;
                } else if (t instanceof GenericServiceException) {
                    throw (GenericServiceException) t;
                } else {
                    throw new GenericServiceException("Service [" + modelService.name + "] Failed" + modelService.debugInfo() , t);
                }
            } finally {
                // this block also runs after the catch above, i.e. after a rollback has already been attempted there
                // if there was an error, rollback transaction, otherwise commit
                if (isError) {
                    String errMsg = "Error in Service [" + modelService.name + "]: " + ServiceUtil.getErrorMessage(result);
                    Debug.logError(errMsg, module);

                    // rollback the transaction
                    try {
                        TransactionUtil.rollback(beganTrans, errMsg, null);
                    } catch (GenericTransactionException e) {
                        Debug.logError(e, "Could not rollback transaction: " + e.toString(), module);
                    }
                } else {
                    // commit the transaction
                    try {
                        TransactionUtil.commit(beganTrans);
                    } catch (GenericTransactionException e) {
                        // pop here because the throw below skips the pop at the end of this finally block
                        GenericDelegator.popUserIdentifier();
                        String errMsg = "Could not commit transaction for service [" + modelService.name + "] call";
                        Debug.logError(e, errMsg, module);
                        if (e.getMessage() != null) {
                            errMsg = errMsg + ": " + e.getMessage();
                        }
                        // NOTE(review): thrown from a finally block without a cause; it replaces any exception
                        // already propagating from the catch above, and only the message of "e" is carried over
                        throw new GenericServiceException(errMsg);
                    }
                }

                // call notifications -- event is determined from the result (success, error, fail)
                modelService.evalNotifications(this.getLocalContext(localName), context, result);

                // clear out the EE userIdentifier
                GenericDelegator.popUserIdentifier();
            }
        } catch (GenericTransactionException te) {
            Debug.logError(te, "Problems with the transaction", module);
            throw new GenericServiceException("Problems with the transaction.", te.getNested());
        } finally {
            if (lock != null) {
                // release the semaphore lock
                try {
                    lock.release();
                } catch (GenericServiceException e) {
                    Debug.logWarning(e, "Exception thrown while unlocking semaphore: ", module);
                }
            }

            // resume the parent transaction
            if (parentTransaction != null) {
                try {
                    TransactionUtil.resume(parentTransaction);
                } catch (GenericTransactionException ite) {
                    Debug.logWarning(ite, "Transaction error, not resumed", module);
                    // NOTE(review): thrown from a finally block without "ite" as cause; details are only in the log
                    throw new GenericServiceException("Resume transaction exception, see logs");
                }
            }
        }

        // only reached when no exception escaped the blocks above

        // pre-return ECA
        if (eventMap != null) ServiceEcaUtil.evalRules(modelService.name, eventMap, "return", ctx, ecaContext, result, isError, isFailure);

        rs.setEndStamp();

        long timeToRun = System.currentTimeMillis() - serviceStartTime;
        // thresholds in milliseconds, read from the "service" properties with defaults of 0 and 1000
        long showServiceDurationThreshold = UtilProperties.getPropertyAsLong("service", "showServiceDurationThreshold", 0);
        long showSlowServiceThreshold = UtilProperties.getPropertyAsLong("service", "showSlowServiceThreshold", 1000);

        if (Debug.timingOn() && timeToRun > showServiceDurationThreshold) {
            Debug.logTiming("Sync service [" + localName + "/" + modelService.name + "] finished in [" + timeToRun + "] milliseconds", module);
        } else if (Debug.infoOn() && timeToRun > showSlowServiceThreshold) {
            // NOTE(review): guarded by infoOn() but emitted through logTiming - confirm the intended log level
            Debug.logTiming("Slow sync service execution detected: service [" + localName + "/" + modelService.name + "] finished in [" + timeToRun + "] milliseconds", module);
        }
        if ((Debug.verboseOn() || modelService.debug) && timeToRun > 50 && !modelService.hideResultInLog) {
            // Sanity check - some service results can be multiple MB in size. Limit message size to 10K.
            String resultStr = result.toString();
            if (resultStr.length() > 10240) {
                // 10226 kept characters plus the 14-character marker give exactly 10240
                resultStr = resultStr.substring(0, 10226) + "...[truncated]";
            }
            Debug.logVerbose("Sync service [" + localName + "/" + modelService.name + "] finished with response [" + resultStr + "]", module);
        }
        if (modelService.metrics != null) {
            modelService.metrics.recordServiceRate(1, timeToRun);
        }
        return result;
    }

    /**
     * Run the service asynchronously, passing an instance of GenericRequester that will receive the result.
     * <p>
     * Before the call is handed to the engine this method evaluates the "auth" and "in-validate" ECA rules,
     * runs the authentication/permission check and, when the model asks for it, validates the IN parameters.
     * When the model uses transactions, a transaction is begun if none is in place; if one is in place and the
     * model requires a new transaction, the current one is suspended, a new one is begun, and the suspended one
     * is resumed before this method returns.
     *
     * @param localName Name of the context to use.
     * @param service Service model object.
     * @param params Map of name, value pairs composing the parameters; may be null.
     * @param requester Object implementing GenericRequester interface which will receive the result;
     *        when null the engine is invoked without a requester.
     * @param persist True for store/run; False for run.
     * @throws ServiceAuthException if the service requires authentication and no userLogin is available,
     *         or if the authentication/permission check rejects the call
     * @throws ServiceValidationException if the incoming context fails IN parameter validation
     * @throws GenericServiceException on any other failure, including problems beginning, committing or
     *         resuming a transaction (the underlying exception is attached as the cause)
     */
    public void runAsync(String localName, ModelService service, Map<String, ? extends Object> params, GenericRequester requester, boolean persist) throws ServiceAuthException, ServiceValidationException, GenericServiceException {
        if (Debug.timingOn()) {
            UtilTimer.timerLog(localName + " / " + service.name, "ASync service started...", module);
        }
        if (Debug.verboseOn() || service.debug) {
            Debug.logVerbose("[ServiceDispatcher.runAsync] : preparing service " + service.name + " [" + service.location + "/" + service.invoke +
                "] (" + service.engineName + ")", module);
        }

        // work on a private copy so the caller's map is never modified
        Map<String, Object> context = new HashMap<String, Object>();
        if (params != null) {
            context.putAll(params);
        }
        // setup the result map
        Map<String, Object> result = new HashMap<String, Object>();
        boolean isFailure = false;
        boolean isError = false;

        // set up the running service log
        this.logService(localName, service, GenericEngine.ASYNC_MODE);

        // check the locale
        Locale locale = this.checkLocale(context);

        // setup the engine and context
        DispatchContext ctx = localContext.get(localName);
        GenericEngine engine = this.getGenericEngine(service.engineName);

        // for isolated transactions
        Transaction parentTransaction = null;
        // start the transaction
        boolean beganTrans = false;

        try {
            if (service.useTransaction) {
                if (TransactionUtil.isTransactionInPlace()) {
                    // if a new transaction is needed, do it here; if not do nothing, just use current tx
                    if (service.requireNewTransaction) {
                        parentTransaction = TransactionUtil.suspend();
                        // now start a new transaction
                        beganTrans = TransactionUtil.begin(service.transactionTimeout);
                    }
                } else {
                    beganTrans = TransactionUtil.begin(service.transactionTimeout);
                }
                // enlist for XAResource debugging
                if (beganTrans && TransactionUtil.debugResources()) {
                    DebugXaResource dxa = new DebugXaResource(service.name);
                    try {
                        dxa.enlist();
                    } catch (Exception e) {
                        // enlisting is a debugging aid only; log and carry on
                        Debug.logError(e, module);
                    }
                }
            }

            try {
                // get eventMap once for all calls for speed, don't do event calls if it is null
                Map<String, List<ServiceEcaRule>> eventMap = ServiceEcaUtil.getServiceEventMap(service.name);

                // pre-auth ECA
                if (eventMap != null) {
                    ServiceEcaUtil.evalRules(service.name, eventMap, "auth", ctx, context, result, isError, isFailure);
                }

                context = checkAuth(localName, context, service);
                Object userLogin = context.get("userLogin");

                if (service.auth && userLogin == null) {
                    throw new ServiceAuthException("User authorization is required for this service: " + service.name + service.debugInfo());
                }

                // pre-validate ECA
                if (eventMap != null) {
                    ServiceEcaUtil.evalRules(service.name, eventMap, "in-validate", ctx, context, result, isError, isFailure);
                }

                // check for pre-validate failure/errors
                isFailure = ModelService.RESPOND_FAIL.equals(result.get(ModelService.RESPONSE_MESSAGE));
                isError = ModelService.RESPOND_ERROR.equals(result.get(ModelService.RESPONSE_MESSAGE));

                // validate the context
                if (service.validate && !isError && !isFailure) {
                    try {
                        service.validate(context, ModelService.IN_PARAM, locale);
                    } catch (ServiceValidationException e) {
                        Debug.logError(e, "Incoming service context (in runAsync: " + service.name + ") does not match expected requirements", module);
                        throw e;
                    }
                }

                // run the service
                if (!isError && !isFailure) {
                    if (requester != null) {
                        engine.runAsync(localName, service, context, requester, persist);
                    } else {
                        engine.runAsync(localName, service, context, persist);
                    }
                    engine.sendCallbacks(service, context, GenericEngine.ASYNC_MODE);
                }

                if (Debug.timingOn()) {
                    UtilTimer.closeTimer(localName + " / " + service.name, "ASync service finished...", module);
                }
            } catch (Throwable t) {
                if (Debug.timingOn()) {
                    UtilTimer.closeTimer(localName + " / " + service.name, "ASync service failed...", module);
                }
                String errMsg = "Service [" + service.name + "] threw an unexpected exception/error";
                Debug.logError(t, errMsg, module);
                engine.sendCallbacks(service, context, t, GenericEngine.ASYNC_MODE);
                try {
                    TransactionUtil.rollback(beganTrans, errMsg, t);
                } catch (GenericTransactionException te) {
                    Debug.logError(te, "Cannot rollback transaction", module);
                }
                // rethrow the declared exception types unchanged; wrap everything else
                if (t instanceof ServiceAuthException) {
                    throw (ServiceAuthException) t;
                } else if (t instanceof ServiceValidationException) {
                    throw (ServiceValidationException) t;
                } else if (t instanceof GenericServiceException) {
                    throw (GenericServiceException) t;
                } else {
                    throw new GenericServiceException("Service [" + service.name + "] Failed" + service.debugInfo() , t);
                }
            } finally {
                // always attempt the commit: at this point we cannot tell whether the call ended in an error
                try {
                    TransactionUtil.commit(beganTrans);
                } catch (GenericTransactionException e) {
                    Debug.logError(e, "Could not commit transaction", module);
                    // keep the original exception as the cause so the stack trace is not lost
                    throw new GenericServiceException("Commit transaction failed", e);
                }
            }
        } catch (GenericTransactionException se) {
            // raised while suspending or beginning the transaction above
            Debug.logError(se, "Problems with the transaction", module);
            throw new GenericServiceException("Problems with the transaction: " + se.getMessage() + "; See logs for more detail", se);
        } finally {
            // resume the parent transaction
            if (parentTransaction != null) {
                try {
                    TransactionUtil.resume(parentTransaction);
                } catch (GenericTransactionException ise) {
                    Debug.logError(ise, "Trouble resuming parent transaction", module);
                    throw new GenericServiceException("Resume transaction exception: " + ise.getMessage() + "; See logs for more detail", ise);
                }
            }
        }
    }

    /**
     * Run the service asynchronously and IGNORE the result.
     * <p>
     * Delegates to the five-argument overload with a null requester.
     *
     * @param localName Name of the context to use.
     * @param service Service model object.
     * @param context Map of name, value pairs composing the context.
     * @param persist True for store/run; False for run.
     * @throws ServiceAuthException propagated from the delegated call
     * @throws ServiceValidationException propagated from the delegated call
     * @throws GenericServiceException propagated from the delegated call
     */
    public void runAsync(String localName, ModelService service, Map<String, ? extends Object> context, boolean persist) throws ServiceAuthException, ServiceValidationException, GenericServiceException {
        // no requester: nobody is waiting for the result
        runAsync(localName, service, context, null, persist);
    }

    /**
     * Looks up the GenericEngine registered under the given name by asking the engine factory.
     *
     * @param engineName Name of the engine
     * @return GenericEngine instance that corresponds to the engineName
     * @throws GenericServiceException propagated from the engine factory
     */
    public GenericEngine getGenericEngine(String engineName) throws GenericServiceException {
        GenericEngine engine = factory.getGenericEngine(engineName);
        return engine;
    }

    /**
     * Returns the JobManager held by this dispatcher.
     *
     * @return JobManager that is associated with this dispatcher
     */
    public JobManager getJobManager() {
        return jm;
    }

    /**
     * Returns the JmsListenerFactory, which holds the message listeners.
     *
     * @return JmsListenerFactory held by this dispatcher
     */
    public JmsListenerFactory getJMSListenerFactory() {
        return jlf;
    }

    /**
     * Returns the Delegator held by this dispatcher.
     *
     * @return Delegator associated with this dispatcher
     */
    public Delegator getDelegator() {
        return delegator;
    }

    /**
     * Returns the Security object held by this dispatcher.
     *
     * @return Security object associated with this dispatcher
     */
    public Security getSecurity() {
        return security;
    }

    /**
     * Looks up the local context registered under a name.
     *
     * @param name of the context to find.
     * @return the DispatchContext stored under that name in the local context map
     */
    public DispatchContext getLocalContext(String name) {
        DispatchContext context = localContext.get(name);
        return context;
    }

    /**
     * Gets the local dispatcher from a name
     *
     * @param name of the LocalDispatcher to find.
     * @return LocalDispatcher matching the loader name, or null when no local context
     *         is registered under that name
     */
    public LocalDispatcher getLocalDispatcher(String name) {
        DispatchContext context = localContext.get(name);
        // an unregistered name has no context; report that as null instead of failing with a NullPointerException
        if (context == null) {
            return null;
        }
        return context.getDispatcher();
    }

    /**
     * Tells whether a local context is registered with this dispatcher instance under the given name.
     *
     * @param name of the local context
     * @return true if the local context is found in this dispatcher.
     */
    public boolean containsContext(String name) {
        boolean registered = localContext.containsKey(name);
        return registered;
    }

    /**
     * Logs the shutdown and, when a JMS listener factory is present, closes its listeners.
     *
     * @throws GenericServiceException declared for callers; propagated if closing the listeners raises it
     */
    protected void shutdown() throws GenericServiceException {
        Debug.logImportant("Shutting down the service engine...", module);
        if (jlf == null) {
            // no JMS listener factory, nothing to close
            return;
        }
        // shutdown JMS listeners
        jlf.closeListeners();
    }

    /**
     * Checks whether parameters were passed for authentication, resolves the "userLogin" entry of the
     * context accordingly, and then enforces the permissions of the service being invoked.
     * <p>
     * When the context carries a non-empty "login.username", the configured authorization service is
     * invoked through getLoginObject and its result is stored under "userLogin"; "login.username" is then
     * removed, and "login.password" as well when it was non-empty. Otherwise, an existing "userLogin" value
     * is compared with the UserLogin record looked up through the delegator and is removed from the context
     * when the record cannot be found or the currentPassword fields differ.
     * <p>
     * If the service being invoked is the authorization service itself, the context is returned untouched.
     *
     * @param localName name of the local dispatch context
     * @param context service context; modified in place
     * @param origService model of the service being invoked
     * @return the context to use for the call: the given map, or the map returned by makeValid when a
     *         permission service granted access
     * @throws ServiceAuthException when permission is denied or the permission service returns no "hasPermission" value
     * @throws GenericServiceException when the service configuration cannot be read or no authorization service is defined
     */
    private Map<String, Object> checkAuth(String localName, Map<String, Object> context, ModelService origService) throws ServiceAuthException, GenericServiceException {
        // name of the configured authorization service
        String service = null;
        try {
            service = ServiceConfigUtil.getServiceEngine().getAuthorization().getServiceName();
        } catch (GenericConfigException e) {
            throw new GenericServiceException(e.getMessage(), e);
        }

        if (service == null) {
            throw new GenericServiceException("No Authentication Service Defined");
        }
        if (service.equals(origService.name)) {
            // manually calling the auth service, don't continue...
            return context;
        }

        if (UtilValidate.isNotEmpty(context.get("login.username"))) {
            // check for a username/password, if there log the user in and make the userLogin object
            String username = (String) context.get("login.username");

            if (UtilValidate.isNotEmpty(context.get("login.password"))) {
                String password = (String) context.get("login.password");

                context.put("userLogin", getLoginObject(service, localName, username, password, (Locale) context.get("locale")));
                // the password is not kept in the context once the login has been attempted
                context.remove("login.password");
            } else {
                // no password supplied: the auth service is called with a null password
                context.put("userLogin", getLoginObject(service, localName, username, null, (Locale) context.get("locale")));
            }
            context.remove("login.username");
        } else {
            // if a userLogin object is there, make sure the given username/password exists in our local database
            GenericValue userLogin = (GenericValue) context.get("userLogin");

            if (userLogin != null) {
                // Because of encrypted passwords we can't just pass in the encrypted version of the password from the data, so we'll do something different and not run the userLogin service...

                //The old way: GenericValue newUserLogin = getLoginObject(service, localName, userLogin.getString("userLoginId"), userLogin.getString("currentPassword"), (Locale) context.get("locale"));
                GenericValue newUserLogin = null;
                try {
                    newUserLogin = this.getDelegator().findOne("UserLogin", true, "userLoginId", userLogin.get("userLoginId"));
                } catch (GenericEntityException e) {
                    Debug.logError(e, "Error looking up service authentication UserLogin: " + e.toString(), module);
                    // leave newUserLogin null, will be handled below
                }

                if (newUserLogin == null) {
                    // uh oh, couldn't validate that one...
                    // we'll have to remove it from the incoming context which will cause an auth error later if auth is required
                    Debug.logInfo("Service auth failed for userLoginId [" + userLogin.get("userLoginId") + "] because UserLogin record not found.", module);
                    context.remove("userLogin");
                } else if (newUserLogin.getString("currentPassword") != null && !newUserLogin.getString("currentPassword").equals(userLogin.getString("currentPassword"))) {
                    // passwords didn't match, remove the userLogin for failed auth
                    // (a stored record whose currentPassword is null is accepted without comparison)
                    Debug.logInfo("Service auth failed for userLoginId [" + userLogin.get("userLoginId") + "] because UserLogin record currentPassword fields did not match; note that the UserLogin object passed into a service may need to have the currentPassword encrypted.", module);
                    context.remove("userLogin");
                }
            }
        }

        // evaluate permissions for the service or throw exception if fail.
        DispatchContext dctx = this.getLocalContext(localName);
        if (UtilValidate.isNotEmpty(origService.permissionServiceName)) {
            // a permission service is named on the model: its response decides
            Map<String, Object> permResp = origService.evalPermission(dctx, context);
            Boolean hasPermission = (Boolean) permResp.get("hasPermission");
            if (hasPermission == null) {
                throw new ServiceAuthException("ERROR: the permission-service [" + origService.permissionServiceName + "] did not return a result. Not running the service [" + origService.name + "]");
            }
            if (hasPermission.booleanValue()) {
                // merge the permission response into the context, then rebuild the context through makeValid for IN parameters
                context.putAll(permResp);
                context = origService.makeValid(context, ModelService.IN_PARAM);
            } else {
                // pick the most specific denial message available: failMessage, then the response error message, then a generic one
                String message = (String) permResp.get("failMessage");
                if (UtilValidate.isEmpty(message)) {
                    message = ServiceUtil.getErrorMessage(permResp);
                }
                if (UtilValidate.isEmpty(message)) {
                    message = "You do not have permission to invoke the service [" + origService.name + "]";
                }
                throw new ServiceAuthException(message);
            }
        } else {
            // no permission service named: fall back to the model's own evalPermissions check
            if (!origService.evalPermissions(dctx, context)) {
                throw new ServiceAuthException("You do not have permission to invoke the service [" + origService.name + "]");
            }
        }

        return context;
    }

    /**
     * Gets a userLogin value object from a name/password pair by synchronously running the named
     * authentication service through its engine.
     *
     * @param service name of the authentication service to run
     * @param localName name of the local dispatch context
     * @param username value passed as "login.username"
     * @param password value passed as "login.password"; may be null
     * @param locale value passed as "locale"
     * @return the "userLogin" entry of the service result
     * @throws GenericServiceException propagated from the model lookup, engine lookup or service run
     */
    private GenericValue getLoginObject(String service, String localName, String username, String password, Locale locale) throws GenericServiceException {
        Map<String, Object> loginContext = UtilMisc.toMap("login.username", username, "login.password", password, "isServiceAuth", true, "locale", locale);

        if (Debug.verboseOn()) {
            Debug.logVerbose("[ServiceDispathcer.authenticate] : Invoking UserLogin Service", module);
        }

        // resolve the service model through the dispatch context, then the engine that runs it
        DispatchContext dispatchContext = getLocalContext(localName);
        ModelService authModel = dispatchContext.getModelService(service);
        GenericEngine authEngine = getGenericEngine(authModel.engineName);

        // invoke the service and pull the UserLogin value object out of its result
        Map<String, Object> loginResult = authEngine.runSync(localName, authModel, loginContext);
        Object loginValue = loginResult.get("userLogin");
        return (GenericValue) loginValue;
    }

    /**
     * Checks the "locale" entry of the context and returns the Locale to use.
     * <p>
     * A Locale instance is returned as is and the context is left untouched. A String is parsed
     * (en_US = lang_COUNTRY). In every other case, or when parsing yields null, the JVM default
     * locale is used. Whenever the entry was not already a Locale, the resolved Locale is written
     * back into the context under "locale".
     *
     * @param context service context; may be updated
     * @return the Locale found or resolved
     */
    private Locale checkLocale(Map<String, Object> context) {
        Object requested = context.get("locale");

        // already a Locale: nothing to normalize (instanceof is false for null)
        if (requested instanceof Locale) {
            return (Locale) requested;
        }

        Locale resolved = (requested instanceof String) ? UtilMisc.parseLocale((String) requested) : null;
        if (resolved == null) {
            resolved = Locale.getDefault();
        }

        context.put("locale", resolved);
        return resolved;
    }

    /**
     * Schedules the startup services listed in the service engine configuration with the job manager.
     * <p>
     * Nothing is scheduled when startup services are disabled, when there is no job manager, or when
     * the configuration cannot be read. A service whose scheduling fails is logged and skipped; the
     * remaining services are still processed.
     *
     * @return the number of startup services successfully handed to the job manager
     */
    private synchronized int runStartupServices() {
        if (!enableSvcs || jm == null) {
            return 0;
        }
        List<StartupService> startupServices;
        try {
            startupServices = ServiceConfigUtil.getServiceEngine().getStartupServices();
        } catch (GenericConfigException e) {
            Debug.logWarning(e, "Exception thrown while getting service config: ", module);
            return 0;
        }
        int servicesScheduled = 0;
        for (StartupService startupService : startupServices) {
            String serviceName = startupService.getName();
            String runtimeDataId = startupService.getRuntimeDataId();
            int runtimeDelay = startupService.getRuntimeDelay();
            String sendToPool = startupService.getRunInPool();
            if (UtilValidate.isEmpty(sendToPool)) {
                // no pool named on the startup service: fall back to the thread pool's configured send-to-pool
                try {
                    sendToPool = ServiceConfigUtil.getServiceEngine().getThreadPool().getSendToPool();
                } catch (GenericConfigException e) {
                    Debug.logError(e, "Unable to get send pool in service [" + serviceName + "]: ", module);
                }
            }
            // current time + 1 sec delay + extended delay
            long runtime = System.currentTimeMillis() + 1000 + runtimeDelay;
            try {
                jm.schedule(sendToPool, serviceName, runtimeDataId, runtime);
                // count only the services the job manager accepted
                servicesScheduled++;
            } catch (JobManagerException e) {
                Debug.logError(e, "Unable to schedule service [" + serviceName + "]", module);
            }
        }

        return servicesScheduled;
    }

    /**
     * Creates a RunningService entry for the call and records it in the run log against this dispatcher.
     *
     * @param localName name of the local dispatch context
     * @param modelService model of the service being run
     * @param mode engine mode passed through to the RunningService
     * @return the newly created RunningService entry
     */
    private RunningService logService(String localName, ModelService modelService, int mode) {
        RunningService entry = new RunningService(localName, modelService, mode);
        // set up the running service log
        runLog.put(entry, this);
        return entry;
    }

    /**
     * Enables/Disables the Job Manager/Scheduler globally
     * (this will not affect any dispatchers already running)
     *
     * @param enable true to enable, false to disable
     */
    public static void enableJM(boolean enable) {
        enableJM = enable;
    }

    /**
     * Enables/Disables the JMS listeners globally
     * (this will not affect any dispatchers already running)
     *
     * @param enable true to enable, false to disable
     */
    public static void enableJMS(boolean enable) {
        enableJMS = enable;
    }

    /**
     * Enables/Disables the startup services globally
     * (this will not affect any dispatchers already running)
     *
     * @param enable true to enable, false to disable
     */
    public static void enableSvcs(boolean enable) {
        enableSvcs = enable;
    }

    /**
     * Returns the shared run log, which maps each RunningService entry to the dispatcher that recorded it.
     * The map itself is returned, not a copy.
     *
     * @return the run log map
     */
    public static Map<RunningService, ServiceDispatcher> getServiceLogMap() {
        return ServiceDispatcher.runLog;
    }

}
