/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * 
 *   http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.uima.adapter.jms.activemq;

import java.io.ByteArrayOutputStream;
import java.io.NotSerializableException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.net.ConnectException;
import java.net.InetAddress;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.ObjectMessage;
import javax.jms.TemporaryQueue;
import javax.jms.TextMessage;
import javax.management.ServiceNotFoundException;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.uima.UIMAFramework;
import org.apache.uima.aae.Channel;
import org.apache.uima.aae.InProcessCache.CacheEntry;
import org.apache.uima.aae.InputChannel;
import org.apache.uima.aae.OutputChannel;
import org.apache.uima.aae.SerializerCache;
import org.apache.uima.aae.UIMAEE_Constants;
import org.apache.uima.aae.UimaSerializer;
import org.apache.uima.aae.controller.AggregateAnalysisEngineController;
import org.apache.uima.aae.controller.AnalysisEngineController;
import org.apache.uima.aae.controller.Endpoint;
import org.apache.uima.aae.controller.LocalCache.CasStateEntry;
import org.apache.uima.aae.controller.PrimitiveAnalysisEngineController;
import org.apache.uima.aae.delegate.Delegate;
import org.apache.uima.aae.error.AsynchAEException;
import org.apache.uima.aae.error.ErrorContext;
import org.apache.uima.aae.error.MessageTimeoutException;
import org.apache.uima.aae.error.ServiceShutdownException;
import org.apache.uima.aae.error.UimaEEServiceException;
import org.apache.uima.aae.jmx.ServiceInfo;
import org.apache.uima.aae.jmx.ServicePerformance;
import org.apache.uima.aae.message.AsynchAEMessage;
import org.apache.uima.aae.message.UIMAMessage;
import org.apache.uima.aae.monitor.Monitor;
import org.apache.uima.aae.monitor.statistics.AnalysisEnginePerformanceMetrics;
import org.apache.uima.aae.monitor.statistics.LongNumericStatistic;
import org.apache.uima.adapter.jms.JmsConstants;
import org.apache.uima.cas.CAS;
import org.apache.uima.cas.SerialFormat;
import org.apache.uima.cas.impl.XmiSerializationSharedData;
import org.apache.uima.resource.metadata.ProcessingResourceMetaData;
import org.apache.uima.util.Level;

import com.thoughtworks.xstream.XStream;

public class JmsOutputChannel implements OutputChannel {

  // Class object used as the logger key and as the source class name in log records
  private static final Class CLASS_NAME = JmsOutputChannel.class;

  // Default inactivity timeout returned by getInactivityTimeout() when no valid override is set
  private static final long INACTIVITY_TIMEOUT = 5; // MINUTES

  // getEndpointConnection() waits on this latch before doing any work.
  // NOTE(review): the count-down site is not visible in this part of the file - presumably it is
  // released once the controller has been injected; confirm.
  private CountDownLatch controllerLatch = new CountDownLatch(1);

  // ActiveMQ connection factory injected through setConnectionFactory()
  private ActiveMQConnectionFactory connectionFactory;

  // Name of the external queue this service uses to receive messages
  private String serviceInputEndpoint;

  // Name of the internal queue this services uses to receive messages from delegates
  private String controllerInputEndpoint;

  // Name of the queue used by Cas Multiplier to receive requests to free CASes
  private String secondaryInputEndpoint;

  // The service controller
  private AnalysisEngineController analysisEngineController;

  // Cache containing connections to destinations this service interacts with
  // Each entry in this cache has an inactivity timer that times amount of time
  // elapsed since the last time a message was sent to the destination.
  // Keyed by broker URL; values are BrokerConnectionEntry instances (see createConnectionEntry()).
  private ConcurrentHashMap connectionMap = new ConcurrentHashMap();

  // URI of the broker managing this service's input queue; used as the broker for replies
  private String serverURI;

  // For aggregates, initialize() overwrites this with the "ActiveMQConnectors" system property
  private String serviceProtocolList = "";

  // Flag reported by isStopping()
  private volatile boolean aborting = false;

  // Destination assigned through setFreeCasQueue()
  private Destination freeCASTempQueue;

  // Value of the IP environment variable, or the local host address when IP is not set
  private String hostIP = null;

  // By default every message will have expiration time added
  private volatile boolean addTimeToLive = true;

  // XStream instance configured in the constructor via UimaSerializer.initXStream()
  private XStream xstream = new XStream();

  // NOTE(review): presumably guards access to xstream - confirm against the users of this lock
  private Lock xstreamLock = new ReentrantLock();

  // Single-permit semaphore acquired by getEndpointConnection()
  private Semaphore connectionSemaphore = new Semaphore(1);
  
  /**
   * Creates the output channel. Configures the XStream instance through
   * {@code UimaSerializer.initXStream}, resolves the host IP and turns off message expiration
   * when the {@code NoTTL} system property is defined.
   */
  public JmsOutputChannel() {
    UimaSerializer.initXStream(xstream);

    try {
      // An explicitly exported IP environment variable wins over the locally resolved address
      String configuredIp = System.getenv("IP");
      hostIP = (configuredIp != null) ? configuredIp : InetAddress.getLocalHost().getHostAddress();
    } catch (Exception e) {
      /* best effort - hostIP is simply left unchanged when it cannot be determined */
    }

    // Presence of the NoTTL system property (with any value) means the deployer
    // wants to avoid message expiration.
    boolean expirationDisabled = System.getProperty("NoTTL") != null;
    if (expirationDisabled) {
      addTimeToLive = false;
    }
  }
  /**
   * Reports the current value of the {@code aborting} flag.
   * 
   * @return true if this channel has been flagged as aborting
   */
  public boolean isStopping() {
    return this.aborting;
  }
  /**
   * Sets the ActiveMQ Broker URI
   */
  public void setServerURI(String aServerURI) {
    // Remembered as the broker used when replying to clients
    this.serverURI = aServerURI;
  }

  /**
   * Stores the destination kept in {@code freeCASTempQueue}.
   * 
   * @param destination
   *          - the free CAS queue destination
   */
  protected void setFreeCasQueue(Destination destination) {
    this.freeCASTempQueue = destination;
  }

  /**
   * Returns the broker URI previously assigned with {@code setServerURI}.
   * 
   * @return the broker URI, or null if none has been set
   */
  public String getServerURI() {
    return this.serverURI;
  }

  /**
   * Returns the name of this channel. This implementation has no name and always answers with
   * an empty string.
   * 
   * @return an empty string
   */
  public String getName() {
    final String noName = "";
    return noName;
  }

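  /**
   * Sets the ActiveMQ connection factory of this channel and configures the factory to trust
   * all packages.
   * 
   * @param connectionFactory
   *          - the ActiveMQ connection factory to use
   */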
  public void setConnectionFactory(ActiveMQConnectionFactory connectionFactory) {
    this.connectionFactory = connectionFactory;
    // NOTE(review): trusting all packages appears to lift the restriction on which classes may
    // be deserialized from object messages - confirm this is acceptable for the deployment.
    connectionFactory.setTrustAllPackages(true);
  }

  /**
   * Sets the name of the external queue this service uses to receive messages.
   * 
   * @param anEnpoint
   *          - name of the service input queue
   */
  public void setServiceInputEndpoint(String anEnpoint) {
    this.serviceInputEndpoint = anEnpoint;
  }

  /**
   * Sets the name of the secondary input queue, i.e. the queue used by a Cas Multiplier to
   * receive requests to free CASes.
   * 
   * @param anEndpoint
   *          - name of the secondary input queue
   */
  public void setSecondaryInputQueue(String anEndpoint) {
    this.secondaryInputEndpoint = anEndpoint;
  }

  /**
   * Returns the connection factory assigned with {@code setConnectionFactory}.
   * 
   * @return the ActiveMQ connection factory, or null if none has been set
   */
  public ActiveMQConnectionFactory getConnectionFactory() {
    return connectionFactory;
  }

  /**
   * Initializes this channel. When the controller is an aggregate, the value of the
   * {@code ActiveMQConnectors} system property is logged (at FINE) and stored as the service
   * protocol list. For any other controller this method does nothing.
   * 
   * @throws AsynchAEException
   *           declared by the signature; not thrown by the code in this method
   */
  public void initialize() throws AsynchAEException {
    boolean aggregate = getAnalysisEngineController() instanceof AggregateAnalysisEngineController;
    if (!aggregate) {
      return;
    }
    // According to the original note, the aggregate controller sets this system property at
    // startup.
    String connectors = System.getProperty("ActiveMQConnectors");
    if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
      UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "initialize",
              JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_connector_list__FINE",
              new Object[] { connectors });
    }
    serviceProtocolList = connectors;
  }

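  /**
   * Serializes a CAS to XMI using the serializer associated with the calling thread. A reply is
   * always serialized; a request is serialized only when the requested format is XMI.
   * 
   * @param isReply
   *          - true if the CAS is being sent as a reply
   * @param aCAS
   *          - CAS instance to serialize
   * @param aCasReferenceId
   *          - id used to look up the cache entry of the CAS
   * @param aSerializerKey
   *          - the requested serialization format
   * @return - String - serialized CAS as String, or null if this is not a reply and the requested
   *         format is not XMI
   * @throws Exception
   *           - if the cache entry lookup or the serialization fails
   */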
  public String serializeCAS(boolean isReply, CAS aCAS, String aCasReferenceId,
          SerialFormat aSerializerKey) throws Exception {

    // Remember the CPU time at entry so the time spent serializing can be added to the statistics
    long start = getAnalysisEngineController().getCpuTime();

    // Stays null when this is neither a reply nor an XMI request
    String serializedCas = null;
    // Fetch dedicated Serializer associated with this thread
    UimaSerializer serializer = SerializerCache.lookupSerializerByThreadId();

    if (isReply || (aSerializerKey == SerialFormat.XMI)) {
      CacheEntry cacheEntry = getAnalysisEngineController().getInProcessCache()
              .getCacheEntryForCAS(aCasReferenceId);

      if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "serializeCAS",
                JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_serialize_cas__FINE",
                new Object[] { aCasReferenceId });
      }
      if (isReply) {
        XmiSerializationSharedData serSharedData = cacheEntry.getDeserSharedData();

        // A delta CAS is sent only if the receiver accepts it and a valid marker exists
        if (cacheEntry.acceptsDeltaCas()
                && (cacheEntry.getMarker() != null && cacheEntry.getMarker().isValid())) {
          serializedCas = serializer.serializeCasToXmi(aCAS, serSharedData, cacheEntry
                  .getMarker());
          cacheEntry.setSentDeltaCas(true);
        } else {
          serializedCas = serializer.serializeCasToXmi(aCAS, serSharedData);
          cacheEntry.setSentDeltaCas(false);
        }
        // if marker is invalid, create a fresh marker.
        if (cacheEntry.getMarker() != null && !cacheEntry.getMarker().isValid()) {
          cacheEntry.setMarker(aCAS.createMarker());
        }
        if (!cacheEntry.sentDeltaCas()) {
          // Emit at the same level that is tested by the guard (and named by the message key)
          if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
            UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(),
                    "serializeCAS", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
                    "UIMAJMS_serialize_cas__FINEST",
                    new Object[] { aCasReferenceId, "FULL Cas serialized and sent." });
          }
        }
      } else {
        XmiSerializationSharedData serSharedData = cacheEntry.getDeserSharedData();
        if (serSharedData == null) {
          // First serialization of this CAS: create the shared data and keep it in the cache entry
          serSharedData = new XmiSerializationSharedData();
          cacheEntry.setXmiSerializationData(serSharedData);
        }
        serializedCas = serializer.serializeCasToXmi(aCAS, serSharedData);
        int maxOutgoingXmiId = serSharedData.getMaxXmiId();
        // Save High Water Mark in case a merge is needed
        cacheEntry.setHighWaterMark(maxOutgoingXmiId);
      }
    }

    // Add the CPU time consumed by this call to the total serialization time, if tracked
    LongNumericStatistic statistic = getAnalysisEngineController().getMonitor()
            .getLongNumericStatistic("", Monitor.TotalSerializeTime);
    if (statistic != null) {
      statistic.increment(getAnalysisEngineController().getCpuTime() - start);
    }

    return serializedCas;
  }

  /**
   * This method verifies that the destination (queue) exists. It opens a connection to a broker,
   * creates a session and a message producer. Finally, using the message producer, it sends an
   * empty message to the queue. This API enables checking for existence of the reply (temp) queue
   * before any processing of a CAS is done. This is an optimization to prevent expensive
   * processing if the client destination is no longer available. Nothing is sent when the
   * endpoint has no destination object.
   */
  public void bindWithClientEndpoint(Endpoint anEndpoint) throws Exception {
    // Only endpoints that carry a destination object (a temp reply queue) are probed
    if (anEndpoint.getDestination() == null) {
      return;
    }
    // Looks up the cached connection for this destination, creating one if none exists yet
    JmsEndpointConnection_impl replyConnection = getEndpointConnection(anEndpoint);
    // An empty message is enough to find out whether the reply queue still exists. If the
    // client has been shut down, the send fails with an exception.
    TextMessage probe = replyConnection.produceTextMessage("");
    replyConnection.send(probe, 0, false);
  }

  /**
   * Determines the inactivity timeout for a connection. The system property named by
   * {@code JmsConstants.SessionTimeoutOverride} takes precedence when it is set and parses as a
   * long; otherwise {@code INACTIVITY_TIMEOUT} is used. The chosen value is logged at FINE,
   * except when the override cannot be parsed.
   * 
   * @param destination
   *          - destination name, used for logging only
   * @param brokerURL
   *          - broker URL, used for logging only
   * @return the override value, or the default timeout
   */
  private long getInactivityTimeout(String destination, String brokerURL) {
    String override = System.getProperty(JmsConstants.SessionTimeoutOverride);
    if (override == null) {
      // No override configured: report and use the built-in default
      if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
                "getEndpointConnection", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
                "UIMAJMS_connection_timeout__FINE",
                new Object[] { analysisEngineController, INACTIVITY_TIMEOUT, destination,
                    brokerURL });
      }
      return INACTIVITY_TIMEOUT;
    }
    try {
      long overrideTimeoutValue = Long.parseLong(override);
      if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
                "getEndpointConnection", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
                "UIMAJMS_override_connection_timeout__FINE",
                new Object[] { analysisEngineController, overrideTimeoutValue, destination,
                    brokerURL });
      }
      return overrideTimeoutValue;
    } catch (NumberFormatException e) {
      // The override is not a number: ignore it and use the default
      return INACTIVITY_TIMEOUT;
    }
  }
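  /**
   * Stops the JMS connection and closes all sessions associated with this connection. If the
   * broker connection is still open, the cached endpoint connections are closed before the
   * connection itself. In every case the entry's endpoint map is cleared and the entry is removed
   * from the connection cache.
   * 
   * @param brokerConnectionEntry
   *          - cache entry holding the broker connection and its endpoint connections
   */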
  private void invalidateConnectionAndEndpoints(BrokerConnectionEntry brokerConnectionEntry ) {
    Connection conn = brokerConnectionEntry.getConnection();
    try {
      if (conn != null && !((ActiveMQConnection) conn).isClosed()) {
        for (Entry<Object, JmsEndpointConnection_impl> endpoints : brokerConnectionEntry.endpointMap
                .entrySet()) {
          try {
            endpoints.getValue().close(); // close session and producer
          } catch (Exception e) {
            // A failure on one endpoint must not prevent closing the remaining endpoints and
            // the connection itself.
            if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
              UIMAFramework.getLogger(CLASS_NAME).log(Level.WARNING,
                      "Unable to close endpoint connection while invalidating broker connection",
                      e);
            }
          }
        }
        conn.close();
      }
    } catch (Exception e) {
      // Not fatal: the connection may already have been closed, or the service is shutting
      // down. Report it through the logger instead of dumping a stack trace to stderr.
      if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
        UIMAFramework.getLogger(CLASS_NAME).log(Level.WARNING,
                "Unable to close broker connection", e);
      }
    } finally {
      // Drop the cached connection reference first so it is cleared even if the cleanup
      // below fails.
      brokerConnectionEntry.setConnection(null);
      brokerConnectionEntry.endpointMap.clear();
      connectionMap.remove(brokerConnectionEntry.getBrokerURL());
      if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
        UIMAFramework.getLogger(CLASS_NAME).logrb(
                Level.FINE,
                CLASS_NAME.getName(),
                "invalidateConnectionAndEndpoints",
                JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
                "UIMAJMS_service_closing_connection__FINE",
                new Object[] { getAnalysisEngineController().getComponentName(),
                  brokerConnectionEntry.getBrokerURL() });
      }
    }
  }
  /**
   * Returns the name of the destination an endpoint refers to. When the endpoint carries an
   * {@code ActiveMQDestination} its physical name is used, otherwise the endpoint name.
   * 
   * @param anEndpoint
   *          - endpoint to get the destination name for
   * @return the physical name of the destination, or the endpoint name
   */
  private String getDestinationName(Endpoint anEndpoint) {
    String endpointName = anEndpoint.getEndpoint();
    // instanceof is false for null, so no separate null check is needed
    Object target = anEndpoint.getDestination();
    return (target instanceof ActiveMQDestination)
            ? ((ActiveMQDestination) target).getPhysicalName()
            : endpointName;
  }
  /**
   * Builds the key under which the connection to an endpoint is cached within a broker entry.
   * The key is {@code <delegateKey>-<destination>} when the endpoint has a delegate key and
   * {@code Client-<destination>} otherwise. The broker URI is not part of the key.
   * 
   * @param anEndpoint
   *          - endpoint to build the key for
   * @return the cache lookup key
   */
  private String getLookupKey(Endpoint anEndpoint) {
    String destination = getDestinationName(anEndpoint);
    if (anEndpoint.getDelegateKey() != null) {
      return anEndpoint.getDelegateKey() + "-" + destination;
    }
    // Endpoints without a delegate key are treated as client endpoints
    return "Client-" + destination;
  }
  /**
   * Creates a new {@code BrokerConnectionEntry} for a broker, registers it in the connection
   * cache under the broker URL and equips it with a {@code ConnectionTimer}. The timer is only
   * created and attached here; nothing in this method starts it.
   * 
   * @param brokerURL
   *          - URL of the broker the entry is created for
   * @return the new entry
   */
  private BrokerConnectionEntry createConnectionEntry(String brokerURL)  {
    BrokerConnectionEntry entry = new BrokerConnectionEntry();
    // The entry is cached before it is configured; the statement order is kept as is
    connectionMap.put(brokerURL, entry);

    ConnectionTimer timer = new ConnectionTimer(entry);
    timer.setAnalysisEngineController(getAnalysisEngineController());
    entry.setConnectionTimer(timer);

    entry.setBrokerURL(brokerURL);
    return entry;
  }
  /**
   * Returns {@link JmsEndpointConnection_impl} instance bound to a destination defined in the
   * {@link Endpoint} The endpoint identifies the destination that should receive the message. This
   * method references a cache that stores active connections. Active connections are those that are
   * fully bound and being used for communication. The key to locate the entry in the connection
   * cache is the queue name + broker URI. This uniquely identifies the destination. If an entry
   * does not exist in the cache, this routine will create a new connection, initialize it, and
   * cache it for future use. The cache is purely for optimization, to prevent opening a connection
   * for every message which is a costly operation. Instead the connection is opened, cached and
   * reused. The {@link JmsEndpointConnection_impl} instance is stored in the cache, and uses a
   * timer to make sure stale connection are removed. If a connection is not used in a given time
   * interval, the connection is considered stale and is dropped from the cache.
   * 
   * @param anEndpoint
   *          - endpoint configuration containing connection information to a destination
   * @return -
   * @throws AsynchAEException
   */
  private synchronized JmsEndpointConnection_impl getEndpointConnection(Endpoint anEndpoint)
          throws AsynchAEException, ServiceShutdownException, ConnectException {
	  
    try {
      controllerLatch.await();
    } catch (InterruptedException e) {
    }
    JmsEndpointConnection_impl endpointConnection = null;
    String brokerConnectionURL = null;
    
    
    try {
        connectionSemaphore.acquire();
        //  If sending a Free Cas Request to a remote Cas Multiplier always use the CM's
        //  broker
        if ( anEndpoint.isFreeCasEndpoint() && anEndpoint.isCasMultiplier() && anEndpoint.isReplyEndpoint()) {
          brokerConnectionURL = anEndpoint.getServerURI();
        } else {
          //  If this is a reply to a client, use the same broker URL that manages this service input queue.
          //  Otherwise this is a request so use a broker specified in the endpoint object.
          brokerConnectionURL = (anEndpoint.isReplyEndpoint()) ? serverURI : anEndpoint.getServerURI();
          
        }
        String key = getLookupKey(anEndpoint);
        String destination = getDestinationName(anEndpoint);

        // First get a Map containing destinations managed by a broker provided by the client
        BrokerConnectionEntry brokerConnectionEntry = null;
        boolean startInactivityReaperTimer = false;
        if (connectionMap.containsKey(brokerConnectionURL)) {
          brokerConnectionEntry = (BrokerConnectionEntry) connectionMap.get(brokerConnectionURL);
          // Findbugs thinks that the above may return null, perhaps due to a race condition. Add
          // the null check just in case
          if (brokerConnectionEntry == null) {
            throw new AsynchAEException("Controller:"
                    + getAnalysisEngineController().getComponentName()
                    + " Unable to Lookup Broker Connection For URL:" + brokerConnectionURL);
          } 
          brokerConnectionEntry.setBrokerURL(brokerConnectionURL);
          if ( JmsEndpointConnection_impl.connectionClosedOrFailed(brokerConnectionEntry) ) {
        	  brokerConnectionEntry.getConnectionTimer().cancelTimer();
            invalidateConnectionAndEndpoints(brokerConnectionEntry);
            brokerConnectionEntry = createConnectionEntry(brokerConnectionURL);
            startInactivityReaperTimer = true;
//            System.out.println(">>>>>> Connection Map Size:"+connectionMap.size());
          }
        } else {
          brokerConnectionEntry = createConnectionEntry(brokerConnectionURL);
          //System.out.println("---------------- New Broker "+brokerConnectionURL);
//          System.out.println(">>>>>> Connection Map Size:"+connectionMap.size());
//          long replyQueueInactivityTimeout = getInactivityTimeout(destination, brokerConnectionURL);
          startInactivityReaperTimer = true;
        }
        if ( startInactivityReaperTimer ) {
            long inactivityTimeout = getInactivityTimeout(destination, brokerConnectionURL);
            brokerConnectionEntry.getConnectionTimer().setInactivityTimeout(inactivityTimeout);
            // Start the FixedRate timer which wakes up at regular intervals defined by 'replyQueueInactivityTimeout'.
            // The purpose is to find inactive jms sessions. All sessions found the be inactive will be
            // closed.
            brokerConnectionEntry.getConnectionTimer().startSessionReaperTimer(getAnalysisEngineController().getComponentName());
        }
        
        if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
          UIMAFramework.getLogger(CLASS_NAME).logrb(
                  Level.FINE,
                  CLASS_NAME.getName(),
                  "getEndpointConnection",
                  JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
                  "UIMAJMS_acquiring_connection_to_endpoint__FINE",
                  new Object[] { getAnalysisEngineController().getComponentName(), destination,
                    brokerConnectionURL });
        }

        // check the cache first
        if (!brokerConnectionEntry.endpointExists(key)) {
          if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
            UIMAFramework.getLogger(CLASS_NAME).logrb(
                    Level.FINE,
                    CLASS_NAME.getName(),
                    "getEndpointConnection",
                    JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
                    "UIMAJMS_create_new_connection__FINE",
                    new Object[] { getAnalysisEngineController().getComponentName(), destination,
                      brokerConnectionURL });
          }
          
          Endpoint masterEndpoint = null;
          if ( getAnalysisEngineController() instanceof AggregateAnalysisEngineController ) {
            //  Check if the endpoint has previously FAILED. It may have been marked as FAILED
            //  due to a temp queue listener shutdown caused by a Broker failure. In such case,
            //  before sending a message to a remote delegate we need to start a new instance
            //  of a listener which creates a new temp reply queue. 
            if ( !anEndpoint.isReplyEndpoint() ) {  // this just means that we are not sending reply to a client
              //  The masterEndoint has the most current state. The 'anEndpoint' instance passed into
              //  this method is a clone from the master made at the begining of processing. The master endpoint
              //  may have been marked as FAILED after the clone was made
              masterEndpoint = ((AggregateAnalysisEngineController) getAnalysisEngineController()).
                        lookUpEndpoint(anEndpoint.getDelegateKey(),false);
              if ( masterEndpoint != null ) {
                //  Only one thread at a time is allowed here.
                synchronized( masterEndpoint ) {
                  if ( masterEndpoint.getStatus() == Endpoint.FAILED ) {
                	  
                	String name =  anEndpoint.getDestination().toString();
                    //  Returns InputChannel if the Reply Listener for the delegate has previously failed.
                    //  If the listener hasnt failed the getReplyInputChannel returns null
//                    InputChannel iC = getAnalysisEngineController().getReplyInputChannel(anEndpoint.getDelegateKey());
                    InputChannel iC = getAnalysisEngineController().getReplyInputChannel(anEndpoint.getDestination().toString());
                    if ( iC != null ) { 
                      try {
                        // Create a new Listener, new Temp Queue and associate the listener with the Input Channel
                    	// Also resets endpoint status to OK  
                        iC.createListener(anEndpoint.getDelegateKey(), anEndpoint);
                        iC.removeDelegateFromFailedList(masterEndpoint.getDelegateKey());
                      } catch( Exception exx) { 
                        throw new AsynchAEException(exx);
                      }
                    } else{
                    	throw new AsynchAEException("Aggregate:"+getAnalysisEngineController()+" Has not yet recovered a listener for delegate: "+anEndpoint.getDelegateKey());
                    }
                  } else if ( !masterEndpoint.isFreeCasEndpoint() ) {
                    //  In case this thread blocked while the reply queue listener was created, make sure
                    //  that this endpoint uses the most up-date reply queue destination
                    anEndpoint.setDestination(masterEndpoint.getDestination());
                  }
                }
              }
            }
          }
          
          endpointConnection = new JmsEndpointConnection_impl(brokerConnectionEntry, anEndpoint,
                  getAnalysisEngineController());
          brokerConnectionEntry.addEndpointConnection(key, endpointConnection);
//          long replyQueueInactivityTimeout = getInactivityTimeout(destination, brokerConnectionURL);
//          brokerConnectionEntry.getConnectionTimer().setInactivityTimeout(replyQueueInactivityTimeout);
//          // Start the FixedRate timer which wakes up at regular intervals defined by 'replyQueueInactivityTimeout'.
//          // The purpose is to find inactive jms sessions. All sessions found the be inactive will be
//          // closed.
//          brokerConnectionEntry.getConnectionTimer().startSessionReaperTimer(getAnalysisEngineController().getComponentName());

          // Connection is not in the cache, create a new connection, initialize it and cache it
          if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
            UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
                    "getEndpointConnection", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
                    "UIMAJMS_open_new_connection_to_endpoint__FINE",
                    new Object[] { getDestinationName(anEndpoint), brokerConnectionURL });
          }

          /**
           * Open connection to a broker, create JMS session and MessageProducer
           */
          endpointConnection.open();

          brokerConnectionEntry.getConnectionTimer().setConnectionCreationTimestamp(
                  endpointConnection.connectionCreationTimestamp);

          if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
            UIMAFramework.getLogger(CLASS_NAME).logrb(
                    Level.FINE,
                    CLASS_NAME.getName(),
                    "getEndpointConnection",
                    JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
                    "UIMAJMS_connection_opened_to_endpoint__FINE",
                    new Object[] { getAnalysisEngineController().getComponentName(), destination,
                      brokerConnectionURL });
          }
        } else {
          if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
            UIMAFramework.getLogger(CLASS_NAME).logrb(
                    Level.FINE,
                    CLASS_NAME.getName(),
                    "getEndpointConnection",
                    JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
                    "UIMAJMS_reusing_existing_connection__FINE",
                    new Object[] { getAnalysisEngineController().getComponentName(), destination,
                      brokerConnectionURL });
          }
          // Retrieve connection from the connection cache
          endpointConnection = brokerConnectionEntry.getEndpointConnection(key);
          // check the state of the connection and re-open it if necessary
          if (endpointConnection != null && !endpointConnection.isOpen()) {
            if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
              UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
                      "getEndpointConnection", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
                      "UIMAJMS_connection_closed_reopening_endpoint__FINE",
                      new Object[] { destination });
            }
            endpointConnection.open();
            if ( endpointConnection.isOpen()) {
                brokerConnectionEntry.getConnectionTimer()
                .setConnectionCreationTimestamp(System.nanoTime());
                if ( getAnalysisEngineController() instanceof AggregateAnalysisEngineController &&
                		anEndpoint.getDelegateKey() != null ) {
                	Endpoint masterEndpoint = 
                		((AggregateAnalysisEngineController) getAnalysisEngineController()).lookUpEndpoint(
                				anEndpoint.getDelegateKey(), false);
                	masterEndpoint.setStatus(Endpoint.OK);
                }
            } 
          }
        }

    
    
    }
    catch (InterruptedException e) {
      } finally {
	    connectionSemaphore.release();	  
      }
    
    return endpointConnection;
  }

  /**
   * Logs, at FINE level, that a request is about to be sent to a delegate. The message is
   * parameterized with this controller's component name and the endpoint's delegate key.
   *
   * @param key
   *          - resource bundle key of the message to log
   * @param anEndpoint
   *          - endpoint whose delegate key is included in the logged message
   */
  private void logRequest(String key, Endpoint anEndpoint ) {
    if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
      // Use the caller-supplied message key instead of a fixed literal.
      // NOTE(review): callers pass both UIMAEE_* and UIMAJMS_* keys while only the UIMAEE bundle
      // is consulted here - confirm that the UIMAJMS_* keys resolve in that bundle.
      UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
              "sendRequest", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
              key, new Object[] { getAnalysisEngineController().getComponentName(), anEndpoint.getDelegateKey() });
    }
  }
  /**
   * Prepares the bookkeeping for a GetMeta request addressed to the given endpoint.
   * <p>
   * When the endpoint has a destination and this controller is an aggregate, the reply queue name
   * and the delegate key are stored in the delegate's ServiceInfo (if one is available) and the
   * delegate's GetMeta request timer is started, provided its GetMeta timeout is greater than
   * zero.
   *
   * @param anEndpoint
   *          - endpoint the GetMeta request is addressed to
   * @return the delegate looked up for the endpoint, or null when the endpoint has no destination
   *         or the controller is not an aggregate
   */
  private Delegate startGetMetaTimerAndGetDelegate( Endpoint anEndpoint ) {
    if (anEndpoint.getDestination() == null) {
      return null;
    }
    // Derive the reply queue name from the physical name, with every ':' replaced by '_'
    ActiveMQDestination replyDestination = (ActiveMQDestination) anEndpoint.getDestination();
    String replyQueueName = replyDestination.getPhysicalName().replaceAll(":", "_");

    if (!(getAnalysisEngineController() instanceof AggregateAnalysisEngineController)) {
      return null;
    }
    AggregateAnalysisEngineController aggregate =
            (AggregateAnalysisEngineController) getAnalysisEngineController();
    String delegateKey = aggregate.lookUpDelegateKey(anEndpoint.getEndpoint());

    ServiceInfo delegateInfo = aggregate.getDelegateServiceInfo(delegateKey);
    if (delegateInfo != null) {
      delegateInfo.setReplyQueueName(replyQueueName);
      delegateInfo.setServiceKey(delegateKey);
    }

    Delegate target = lookupDelegate(delegateKey);
    if (target.getGetMetaTimeout() > 0) {
      target.startGetMetaRequestTimer();
    }
    return target;
  }
  /**
   * Sends a request message of the given type to a delegate.
   * <p>
   * A Process request is handed over to CAS serialization, which performs the send itself. Every
   * other request is sent as an empty text message carrying the request headers. Exceptions other
   * than AsynchAEException are not propagated; they are passed, together with the command and the
   * endpoint, to the controller's error handler chain.
   * 
   * @param aCommand
   *          - the type of request, e.g. Process, GetMeta, CollectionProcessComplete, ReleaseCAS
   *          or Stop
   * @param aCasReferenceId
   *          - id of the CAS the request refers to; attached to ReleaseCAS and Stop requests and
   *          used to look up the cache entry for Process requests
   * @param anEndpoint
   *          - the destination where the delegate receives messages
   * 
   * @throws AsynchAEException
   */
  public void sendRequest(int aCommand, String aCasReferenceId, Endpoint anEndpoint) 
  throws AsynchAEException {
    // Only set for GetMeta requests, so that the timer can be cancelled if the send fails
    Delegate getMetaDelegate = null;
    try {
      JmsEndpointConnection_impl connection = getEndpointConnection(anEndpoint);

      Message request = connection.produceTextMessage("");
      request.setIntProperty(AsynchAEMessage.Payload, AsynchAEMessage.None);

      if (aCommand == AsynchAEMessage.Process) {
        logRequest("UIMAEE_service_sending_process_request__FINE", anEndpoint);
        // The CAS is serialized and dispatched by the helper; nothing more to do here
        serializeCasAndSend(getAnalysisEngineController().getInProcessCache()
                .getCacheEntryForCAS(aCasReferenceId), anEndpoint);
        return;
      } else if (aCommand == AsynchAEMessage.CollectionProcessComplete) {
        logRequest("UIMAEE_send_cpc_req__FINE", anEndpoint);
      } else if (aCommand == AsynchAEMessage.ReleaseCAS) {
        request.setStringProperty(AsynchAEMessage.CasReference, aCasReferenceId);
        logRequest("UIMAJMS_releasecas_request__endpoint__FINEST", anEndpoint);
      } else if (aCommand == AsynchAEMessage.GetMeta) {
        getMetaDelegate = startGetMetaTimerAndGetDelegate(anEndpoint);
        logRequest("UIMAEE_service_sending_getmeta_request__FINE", anEndpoint);
      } else if (aCommand == AsynchAEMessage.Stop) {
        request.setStringProperty(AsynchAEMessage.CasReference, aCasReferenceId);
        logRequest("UIMAEE_service_sending_stop_request__FINE", anEndpoint);
      }

      populateHeaderWithRequestContext(request, anEndpoint, aCommand);

      // For remotes add a special property to the message. This property
      // will be echoed back by the service. This property enables matching
      // the reply with the right endpoint object managed by the aggregate.
      request.setStringProperty(AsynchAEMessage.EndpointServer, anEndpoint.getServerURI());

      boolean delivered = connection.send(request, 0, true);
      if (!delivered) {
        throw new ServiceNotFoundException();
      }

    } catch (AsynchAEException e) {
      throw e;
    } catch (Exception e) {
      boolean getMetaInFlight = getMetaDelegate != null && aCommand == AsynchAEMessage.GetMeta;
      if (getMetaInFlight) {
        getMetaDelegate.cancelDelegateGetMetaTimer();
      }
      // Let the error handler chain decide what to do with the failure
      ErrorContext failureContext = new ErrorContext();
      failureContext.add(AsynchAEMessage.Command, aCommand);
      failureContext.add(AsynchAEMessage.Endpoint, anEndpoint);
      getAnalysisEngineController().getErrorHandlerChain().handle(e, failureContext,
              getAnalysisEngineController());
    }
  }
  
  /**
   * Serializes the CAS held by the given cache entry in the endpoint's serialization format and
   * sends it to the endpoint as a process request. XMI endpoints receive a String payload, all
   * other formats a byte[] payload.
   *
   * @param entry
   *          - cache entry identifying the CAS to send
   * @param anEndpoint
   *          - destination endpoint; also supplies the serialization format and the retry flag
   * @throws Exception
   *           if serialization or sending fails
   */
  private void serializeCasAndSend(CacheEntry entry, Endpoint anEndpoint) throws Exception {
    if (anEndpoint.getSerialFormat() == SerialFormat.XMI) {
      String serializedCAS = getSerializedCasAndReleaseIt(false, entry.getCasReferenceId(), anEndpoint,
              anEndpoint.isRetryEnabled());
      // Guard with the same logger that emits the message
      if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
        UIMAFramework.getLogger(CLASS_NAME).logrb(
                Level.FINEST,
                CLASS_NAME.getName(),
                "sendRequest",
                JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
                "UIMAJMS_sending_serialized_cas__FINEST",
                new Object[] { getAnalysisEngineController().getComponentName(),
                    anEndpoint.getDelegateKey(), entry.getCasReferenceId(), serializedCAS });
      }
      // Send process request to remote delegate and start timeout timer
      sendCasToRemoteEndpoint(true, serializedCAS, entry, anEndpoint, true);
    } else {
      byte[] serializedCAS = getBinaryCasAndReleaseIt(false, entry.getCasReferenceId(), anEndpoint,
              anEndpoint.isRetryEnabled());
      // Guard with the same logger that emits the message
      if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
        UIMAFramework.getLogger(CLASS_NAME).logrb(
                Level.FINEST,
                CLASS_NAME.getName(),
                "sendRequest",
                JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
                "UIMAJMS_sending_binary_cas__FINEST",
                new Object[] { getAnalysisEngineController().getComponentName(),
                    anEndpoint.getDelegateKey(), entry.getCasReferenceId(), serializedCAS });
      }

      // Send process request to remote delegate and start timeout timer
      sendCasToRemoteEndpoint(true, serializedCAS, entry, anEndpoint, true);
    }
  }


  /**
   * Sends the CAS held by the given cache entry back to a remote endpoint as a reply.
   * <p>
   * The endpoint is marked as a reply endpoint. XMI endpoints receive a String payload, all other
   * formats a binary payload; when binary serialization yields null nothing is sent. Non-remote
   * endpoints are not supported and are ignored. A ServiceShutdownException is logged at WARNING
   * level and not propagated.
   *
   * @param entry
   *          - cache entry identifying the CAS to return
   * @param anEndpoint
   *          - endpoint to receive the reply
   * @throws AsynchAEException
   *           if the reply cannot be serialized or sent
   */
  public void sendReply(CacheEntry entry, Endpoint anEndpoint) throws AsynchAEException {
    try {
      anEndpoint.setReplyEndpoint(true);
      if (!anEndpoint.isRemote()) {
        // Not supported
        return;
      }
      boolean xmiFormat = anEndpoint.getSerialFormat() == SerialFormat.XMI;
      if (xmiFormat) {
        String xmiCas = getSerializedCas(true, entry.getCasReferenceId(), anEndpoint,
                anEndpoint.isRetryEnabled());
        sendCasToRemoteEndpoint(false, xmiCas, entry, anEndpoint, false);
        return;
      }
      byte[] binaryPayload = getBinaryCas(true, entry.getCasReferenceId(), anEndpoint,
              anEndpoint.isRetryEnabled());
      if (binaryPayload != null) {
        sendCasToRemoteEndpoint(false, binaryPayload, entry, anEndpoint, false);
      }
    } catch (ServiceShutdownException e) {
      // The service is going down: report it, but do not fail the caller
      if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
                "sendReply", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
                "UIMAEE_service_exception_WARNING", getAnalysisEngineController().getComponentName());

        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
                "sendReply", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
                "UIMAJMS_exception__WARNING", e);
      }
    } catch (AsynchAEException e) {
      throw e;
    } catch (Exception e) {
      throw new AsynchAEException(e);
    }
  }
  /**
   * Sends a reply without payload for the given command to an endpoint.
   * <p>
   * Nothing is sent when the aborting flag is set. For a ServiceInfo reply from a Cas Multiplier,
   * the free CAS temp queue (when available) is attached as the JMS reply-to destination and a
   * targeting selector built from the host IP and the controller's PID is added to the message.
   *
   * @param aCommand
   *          - the command this reply answers
   * @param anEndpoint
   *          - endpoint to receive the reply
   * @param aCasReferenceId
   *          - CAS reference id to attach to the reply; may be null, in which case no CAS
   *          reference property is set
   * @param notifyOnJmsException
   *          - when true, a failed send or a JMSException is reported to the caller as an
   *          AsynchAEException; when false, a JMSException is only logged
   * @throws AsynchAEException
   *           if the reply cannot be sent and the failure is not one of the logged-only cases
   */
  public void sendReply(int aCommand, Endpoint anEndpoint, String aCasReferenceId, boolean notifyOnJmsException)
  throws AsynchAEException {
    try {
      if (aborting) {
        return;
      }
      anEndpoint.setReplyEndpoint(true);
      JmsEndpointConnection_impl endpointConnection = getEndpointConnection(anEndpoint);

      TextMessage tm = endpointConnection.produceTextMessage("");
      tm.setIntProperty(AsynchAEMessage.Payload, AsynchAEMessage.None);
      populateHeaderWithResponseContext(tm, anEndpoint, aCommand);
      if (aCasReferenceId != null) {
        tm.setStringProperty(AsynchAEMessage.CasReference, aCasReferenceId);
      }

      // If this service is a Cas Multiplier add to the message a FreeCasQueue.
      // The client may need send Stop request to that queue.
      if (aCommand == AsynchAEMessage.ServiceInfo
              && getAnalysisEngineController().isCasMultiplier()) {
        if (freeCASTempQueue != null) {
          // Attach a temp queue to the outgoing message. This a queue where
          // Free CAS notifications need to be sent from the client
          tm.setJMSReplyTo(freeCASTempQueue);
        }
        // new services will receive FreeCas request via a targeted queue
        StringBuilder selector = new StringBuilder()
                .append("TargetServiceId = ")
                .append("'").append(hostIP).append(":")
                .append(getAnalysisEngineController().getPID())
                .append("' AND")
                .append(UimaDefaultMessageListenerContainer.CM_PROCESS_SELECTOR_SUFFIX);
        tm.setStringProperty(AsynchAEMessage.TargetingSelector, selector.toString());
      }

      // Check if there was a failure while sending a message
      if (!endpointConnection.send(tm, 0, false, notifyOnJmsException) && notifyOnJmsException) {
        throw new JMSException("JMS Send Failed. Check UIMA Log For Details.");
      }
      if (aCasReferenceId != null && aCommand == AsynchAEMessage.ServiceInfo) {
        if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
          UIMAFramework.getLogger(CLASS_NAME).logrb(
                  Level.FINE,
                  CLASS_NAME.getName(),
                  "sendReply",
                  JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
                  "UIMAJMS_sent_ack_message__FINE",
                  new Object[] {aCasReferenceId});
        }
      }

      addIdleTime(tm);
      if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
        UIMAFramework.getLogger(CLASS_NAME).logrb(
                Level.FINE,
                CLASS_NAME.getName(),
                "sendReply",
                JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
                "UIMAJMS_cpc_reply_sent__FINE",
                new Object[] { getAnalysisEngineController().getComponentName(),
                    anEndpoint.getEndpoint() });
      }
    } catch (JMSException e) {
      if (notifyOnJmsException) {
        throw new AsynchAEException(e);
      }
      // Unable to establish connection to the endpoint. Log it and continue
      if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
                "sendReply", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
                "UIMAEE_service_exception_WARNING", getAnalysisEngineController().getComponentName());

        // The JMS bundle is paired with the UIMAJMS_ key, as in the other handlers below
        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
                "sendReply", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
                "UIMAJMS_exception__WARNING", e);
      }
    } catch (ServiceShutdownException e) {
      if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
                "sendReply", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
                "UIMAEE_service_exception_WARNING", getAnalysisEngineController().getComponentName());

        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
                "sendReply", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
                "UIMAJMS_exception__WARNING", e);
      }
    } catch (AsynchAEException e) {
      throw e;
    } catch (Exception e) {
      throw new AsynchAEException(e);
    }
  }

  /**
   * Sends JMS Reply Message to a given endpoint. The reply message contains given Throwable (with
   * full stack).
   * <p>
   * A Throwable that is not a UimaEEServiceException is wrapped in one; for an AsynchAEException
   * with a cause, the cause is wrapped instead. If the Throwable turns out not to be serializable,
   * a UimaEEServiceException carrying the stringified stack trace is sent in its place. Nothing is
   * sent when the aborting flag is set. JMSException, ServiceShutdownException and
   * AsynchAEException raised while sending are logged and not propagated.
   * 
   * @param t
   *          - Throwable to include in the reply message
   * @param aCasReferenceId
   *          - a unique CAS reference id; may be null, in which case no CAS reference properties
   *          are set
   * @param aParentCasReferenceId
   *          - reference id of the input (parent) CAS; only attached when aCasReferenceId is also
   *          non-null
   * @param anEndpoint
   *          - an endpoint to receive the reply message
   * @param aCommand
   *          - the command this reply answers
   * 
   * @throws AsynchAEException
   *           if an unexpected failure occurs while building or sending the reply
   */
  public void sendReply(Throwable t, String aCasReferenceId, String aParentCasReferenceId,
          Endpoint anEndpoint, int aCommand) throws AsynchAEException {
    anEndpoint.setReplyEndpoint(true);
    try {
      if (aborting) {
        return;
      }
      // Strip off AsyncAEException and replace with UimaEEServiceException
      Throwable payload = t;
      if (!(t instanceof UimaEEServiceException)) {
        if (t instanceof AsynchAEException && t.getCause() != null) {
          payload = new UimaEEServiceException(t.getCause());
        } else {
          payload = new UimaEEServiceException(t);
        }
      }
      JmsEndpointConnection_impl endpointConnection = getEndpointConnection(anEndpoint);
      // Create Message that will contain serialized Exception with stack
      ObjectMessage om = endpointConnection.produceObjectMessage();
      // The Throwable passed into this method may not be serializable. In that case fall back to
      // a wrapper containing the stringified stack trace.
      try {
        om.setObject(payload);
      } catch (RuntimeException e) {
        // instanceof is false for a null cause, so no separate null check is needed
        if (!(e.getCause() instanceof NotSerializableException)) {
          throw e;
        }
        StringWriter sw = new StringWriter();
        t.printStackTrace(new PrintWriter(sw));
        om.setObject(new UimaEEServiceException(sw.toString()));
      }
      // Add common header properties
      populateHeaderWithResponseContext(om, anEndpoint, aCommand);

      om.setIntProperty(AsynchAEMessage.Payload, AsynchAEMessage.Exception);
      if (aCasReferenceId != null) {
        om.setStringProperty(AsynchAEMessage.CasReference, aCasReferenceId);
        if (aParentCasReferenceId != null) {
          om.setStringProperty(AsynchAEMessage.InputCasReference, aParentCasReferenceId);
        }
      }

      if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "sendReply",
                JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_sending_exception__FINE",
                new Object[] { getAnalysisEngineController().getName(), anEndpoint.getEndpoint() });
      }
      // Dispatch Message to destination
      endpointConnection.send(om, 0, false);
      addIdleTime(om);
    } catch (JMSException e) {
      // Unable to establish connection to the endpoint. Log it and continue
      if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "sendReply",
                JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_unable_to_connect__INFO",
                new Object[] { getAnalysisEngineController().getName(), anEndpoint.getEndpoint() });
      }
      // Keep the exception itself in the log so the failure can be diagnosed
      if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
                "sendReply", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
                "UIMAJMS_exception__WARNING", e);
      }
    } catch (ServiceShutdownException e) {
      if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
                "sendReply", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
                "UIMAEE_service_exception_WARNING", getAnalysisEngineController().getComponentName());

        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
                "sendReply", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
                "UIMAJMS_exception__WARNING", e);
      }
    } catch (AsynchAEException e) {
      if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "sendReply",
                JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_unable_to_connect__INFO",
                new Object[] { getAnalysisEngineController().getName(), anEndpoint.getEndpoint() });
      }
      // Keep the exception itself in the log so the failure can be diagnosed
      if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
                "sendReply", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
                "UIMAJMS_exception__WARNING", e);
      }
    } catch (Exception e) {
      throw new AsynchAEException(e);
    }
  }

  /**
   * Sends a GetMeta reply to the given endpoint, optionally carrying this service's metadata
   * serialized as XML.
   * <p>
   * Nothing is sent when the aborting flag is set. The reply advertises binary serialization and,
   * when a free CAS temp queue exists, carries it as the JMS reply-to destination. JMSException
   * and ServiceShutdownException are logged and not propagated.
   * 
   * @param aProcessingResourceMetadata
   *          - metadata to serialize into the reply body; only read when serialize is true
   * @param anEndpoint
   *          - endpoint to receive the reply
   * @param serialize
   *          - true to include the serialized metadata, false to send an empty body
   * @throws AsynchAEException
   *           if any other failure occurs while building or sending the reply
   */
  public void sendReply(ProcessingResourceMetaData aProcessingResourceMetadata,
          Endpoint anEndpoint, boolean serialize) throws AsynchAEException {
    if (aborting) {
      return;
    }
    long msgSize = 0;

    try {
      anEndpoint.setReplyEndpoint(true);
      // Initialize JMS connection to given endpoint
      JmsEndpointConnection_impl endpointConnection = getEndpointConnection(anEndpoint);

      if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "sendReply",
                JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_produce_txt_msg__FINE",
                new Object[] {});
      }
      TextMessage tm = endpointConnection.produceTextMessage("");

      // Collocated Aggregate components dont send metadata just empty reply
      // Such aggregate has merged its typesystem already since it shares
      // CasManager with its parent
      if (serialize) {
        if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
          UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(),
                  "sendReply", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
                  "UIMAJMS_serializing_meta__FINE", new Object[] {});
        }
        // Serialize metadata. The bytes are decoded explicitly as UTF-8 rather than with the
        // platform default charset, so non-ASCII characters survive on every platform.
        // An in-memory stream holds no system resources, so no close is needed.
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        aProcessingResourceMetadata.toXML(bos);
        String metadataXml = bos.toString("UTF-8");
        tm.setText(metadataXml);
        msgSize = metadataXml.length();
      }

      tm.setIntProperty(AsynchAEMessage.Payload, AsynchAEMessage.Metadata);
      // This service supports Binary Serialization
      // change comment on next 2 lines to install compressed serialization as the capability
      tm.setIntProperty(AsynchAEMessage.SERIALIZATION, AsynchAEMessage.BINARY_SERIALIZATION);
//      tm.setIntProperty(AsynchAEMessage.SERIALIZATION, AsynchAEMessage.BINARY_COMPRESSED_FILTERED_SERIALIZATION);

      populateHeaderWithResponseContext(tm, anEndpoint, AsynchAEMessage.GetMeta);
      if (freeCASTempQueue != null) {
        // Attach a temp queue to the outgoing message. This a queue where
        // Free CAS notifications need to be sent from the client
        tm.setJMSReplyTo(freeCASTempQueue);
      }

      if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "sendReply",
                JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_metadata_reply__endpoint__FINEST",
                new Object[] { serviceInputEndpoint, anEndpoint.getEndpoint() });
      }
      endpointConnection.send(tm, msgSize, false);
    } catch (JMSException e) {
      if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
                "sendReply", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
                "UIMAEE_service_exception_WARNING", getAnalysisEngineController().getComponentName());

        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
                "sendReply", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
                "UIMAJMS_exception__WARNING", e);
      }
      // Unable to establish connection to the endpoint. Log it and continue
      if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "sendReply",
                JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_unable_to_connect__INFO",
                new Object[] { getAnalysisEngineController().getName(), anEndpoint.getEndpoint() });
      }
    } catch (ServiceShutdownException e) {
      if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
                "sendReply", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
                "UIMAEE_service_exception_WARNING", getAnalysisEngineController().getComponentName());

        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
                "sendReply", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
                "UIMAJMS_exception__WARNING", e);
      }
    } catch (Exception e) {
      if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
                "sendReply", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
                "UIMAEE_service_exception_WARNING", getAnalysisEngineController().getComponentName());

        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
                "sendReply", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
                "UIMAJMS_exception__WARNING", e);
      }
      throw new AsynchAEException(e);
    }
  }

  /**
   * Serializes the CAS identified by the given reference id into binary form and adds the CPU
   * time spent serializing to the controller, the cache entry, the per-CAS statistics and the
   * controller's service performance.
   * <p>
   * A delta CAS is produced only for a reply whose cache entry accepts delta CASes and holds a
   * valid marker; in every other case the full CAS is serialized. After serialization, an
   * existing but invalid marker on the cache entry is replaced with a fresh one.
   *
   * @param isReply
   *          - true when the serialized CAS is sent as a reply, false otherwise
   * @param aCasReferenceId
   *          - id of the CAS to serialize
   * @param anEndpoint
   *          - endpoint supplying the serialization format and, for compressed non-reply
   *          messages, the type system passed to the serializer
   * @param cacheSerializedCas
   *          - not used by this method
   * @return the serialized CAS, or null when the CAS or its cache entry is null
   * @throws Exception
   *           every failure, including an unsupported serialization format, is rethrown wrapped
   *           in an AsynchAEException
   */
  private byte[] getBinaryCas(boolean isReply, String aCasReferenceId, Endpoint anEndpoint,
          boolean cacheSerializedCas) throws Exception {
    try {
      // Using Cas reference Id retrieve CAS from the shared cache
      CAS cas = getAnalysisEngineController().getInProcessCache().getCasByReference(aCasReferenceId);
      ServicePerformance casStats = getAnalysisEngineController().getCasStatistics(aCasReferenceId);
      CacheEntry entry = getAnalysisEngineController().getInProcessCache().getCacheEntryForCAS(
              aCasReferenceId);
      long t1 = getAnalysisEngineController().getCpuTime();
      SerialFormat serializerType = anEndpoint.getSerialFormat();
      if (cas == null || entry == null) {
        return null;
      }
      //  Fetch dedicated Serializer associated with this thread
      UimaSerializer serializer = SerializerCache.lookupSerializerByThreadId();

      boolean compressed = serializerType == SerialFormat.COMPRESSED_FILTERED;
      if (!compressed && serializerType != SerialFormat.BINARY) {
        // Only the two binary formats are handled here
        if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
          UIMAFramework.getLogger(CLASS_NAME).logrb(
                  Level.INFO,
                  CLASS_NAME.getName(),
                  "getBinaryCas",
                  JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
                  "UIMAJMS_invalid_serializer__WARNING",
                  new Object[] { getAnalysisEngineController().getName(), serializerType,
                      anEndpoint.getEndpoint() });
        }
        throw new UimaEEServiceException("Invalid Serializer:" + serializerType + " For Endpoint:"
                + anEndpoint.getEndpoint());
      }

      byte[] serializedCAS;
      if (entry.acceptsDeltaCas() && isReply) {
        if (entry.getMarker() != null && entry.getMarker().isValid()) {
          // Valid marker: send only the delta
          if (compressed) {
            serializedCAS = serializer.serializeCasToBinary6(cas, entry.getMarker(),
                    entry.getCompress6ReuseInfo());
          } else {
            serializedCAS = serializer.serializeCasToBinary(cas, entry.getMarker());
          }
          entry.setSentDeltaCas(true);
        } else {
          if (compressed) {
            serializedCAS = serializer.serializeCasToBinary6(cas);
          } else {
            serializedCAS = serializer.serializeCasToBinary(cas);
          }
          entry.setSentDeltaCas(false);
        }
      } else {
        // either is a reply to a caller not accepting delta, or
        //        is not a reply
        if (compressed) {
          if (isReply) {
            serializedCAS = serializer.serializeCasToBinary6(cas);  // never called?
          } else {
            serializedCAS = serializer.serializeCasToBinary6(cas, entry, anEndpoint.getTypeSystemImpl());
          }
        } else {
          serializedCAS = serializer.serializeCasToBinary(cas);
        }
        entry.setSentDeltaCas(false);
        if (isReply) {
          // Emit at the same level the guard checks and the message key names
          if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
            UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "getBinaryCas",
                    JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_serialize_cas__FINEST",
                    new Object[] { aCasReferenceId, "FULL Cas serialized and sent." });
          }
        }
      }
      // create a fresh marker
      if (entry.getMarker() != null && !entry.getMarker().isValid()) {
        entry.setMarker(cas.createMarker());
      }

      long timeToSerializeCas = getAnalysisEngineController().getCpuTime() - t1;

      getAnalysisEngineController().incrementSerializationTime(timeToSerializeCas);

      entry.incrementTimeToSerializeCAS(timeToSerializeCas);
      casStats.incrementCasSerializationTime(timeToSerializeCas);
      getAnalysisEngineController().getServicePerformance().incrementCasSerializationTime(
              timeToSerializeCas);
      return serializedCAS;
    } catch (Exception e) {
      throw new AsynchAEException(e);
    }

  }

  /**
   * Returns the CAS identified by the given reference id in serialized String form.
   * <p>
   * When the CAS itself is not available from the in-process cache, the previously saved
   * serialized form is returned instead. Otherwise the CAS is serialized in the endpoint's format
   * (XMI when the endpoint specifies none), the CPU time spent is added to the controller, the
   * cache entry, the per-CAS statistics and the service performance, and the result is optionally
   * saved in the cache.
   *
   * @param isReply
   *          - passed through to the serializer
   * @param aCasReferenceId
   *          - id of the CAS to serialize
   * @param anEndpoint
   *          - endpoint supplying the serialization format
   * @param cacheSerializedCas
   *          - true to save the freshly serialized CAS in the in-process cache
   * @return the serialized CAS as obtained from the serializer or from the cache
   * @throws Exception
   *           every failure is rethrown wrapped in an AsynchAEException
   */
  private String getSerializedCas(boolean isReply, String aCasReferenceId, Endpoint anEndpoint,
          boolean cacheSerializedCas) throws Exception {
    try {
      // Using Cas reference Id retrieve CAS from the shared cache
      CAS cas = getAnalysisEngineController().getInProcessCache().getCasByReference(aCasReferenceId);
      ServicePerformance casStats = getAnalysisEngineController().getCasStatistics(aCasReferenceId);
      if (cas == null) {
        // No live CAS: fall back to the serialized copy kept in the cache
        return getAnalysisEngineController().getInProcessCache().getSerializedCAS(aCasReferenceId);
      }

      CacheEntry entry = getAnalysisEngineController().getInProcessCache().getCacheEntryForCAS(
              aCasReferenceId);
      long startCpuTime = getAnalysisEngineController().getCpuTime();

      // Serialize CAS for remote Delegates, defaulting to XMI
      SerialFormat requestedFormat = anEndpoint.getSerialFormat();
      SerialFormat format = (requestedFormat != null) ? requestedFormat : SerialFormat.XMI;
      String xmiCas = serializeCAS(isReply, cas, aCasReferenceId, format);

      long elapsedCpuTime = getAnalysisEngineController().getCpuTime() - startCpuTime;
      getAnalysisEngineController().incrementSerializationTime(elapsedCpuTime);
      entry.incrementTimeToSerializeCAS(elapsedCpuTime);
      casStats.incrementCasSerializationTime(elapsedCpuTime);
      getAnalysisEngineController().getServicePerformance().incrementCasSerializationTime(
              elapsedCpuTime);

      if (cacheSerializedCas) {
        getAnalysisEngineController().getInProcessCache().saveSerializedCAS(aCasReferenceId,
                xmiCas);
      }
      return xmiCas;
    } catch (Exception e) {
      throw new AsynchAEException(e);
    }
  }

  /**
   * Serializes the referenced CAS to binary form and afterwards, whether or not serialization
   * succeeded, drops the CAS when this controller is a primitive and the endpoint is remote.
   *
   * @param isReply
   *          - passed through to the binary serialization
   * @param aCasReferenceId
   *          - id of the CAS to serialize and possibly drop
   * @param anEndpoint
   *          - endpoint the serialized CAS is destined for
   * @param cacheSerializedCas
   *          - passed through to the binary serialization
   * @return the serialized CAS as returned by the binary serialization
   * @throws Exception
   *           serialization failures are rethrown wrapped in an AsynchAEException
   */
  private byte[] getBinaryCasAndReleaseIt(boolean isReply, String aCasReferenceId,
          Endpoint anEndpoint, boolean cacheSerializedCas) throws Exception {
    byte[] binaryCas;
    try {
      binaryCas = getBinaryCas(isReply, aCasReferenceId, anEndpoint, cacheSerializedCas);
    } catch (Exception e) {
      throw new AsynchAEException(e);
    } finally {
      boolean primitiveController =
              getAnalysisEngineController() instanceof PrimitiveAnalysisEngineController;
      if (primitiveController && anEndpoint.isRemote()) {
        getAnalysisEngineController().dropCAS(aCasReferenceId, true);
      }
    }
    return binaryCas;
  }

  /**
   * Delegates to {@code getSerializedCas} and afterwards, whether or not serialization
   * succeeded, drops the CAS when this channel belongs to a primitive controller and the
   * endpoint is remote.
   *
   * @param isReply
   *          passed through to {@code getSerializedCas}
   * @param aCasReferenceId
   *          id of the CAS to serialize and possibly drop
   * @param anEndpoint
   *          target endpoint
   * @param cacheSerializedCas
   *          passed through to {@code getSerializedCas}
   * @return the serialized CAS
   * @throws Exception
   *           every failure is rethrown wrapped in an AsynchAEException
   */
  private String getSerializedCasAndReleaseIt(boolean isReply, String aCasReferenceId,
          Endpoint anEndpoint, boolean cacheSerializedCas) throws Exception {
    try {
      String serializedCas = getSerializedCas(isReply, aCasReferenceId, anEndpoint,
              cacheSerializedCas);
      return serializedCas;
    } catch (Exception e) {
      throw new AsynchAEException(e);
    } finally {
      boolean primitiveController =
              getAnalysisEngineController() instanceof PrimitiveAnalysisEngineController;
      if (primitiveController && anEndpoint.isRemote()) {
        getAnalysisEngineController().dropCAS(aCasReferenceId, true);
      }
    }
  }

  /**
   * Adds timing and performance statistics to an outgoing message.
   * <p>
   * A "SENT-TIME" property is added when the endpoint is final. For Process commands, a request
   * only records its departure time with the controller. A reply gets the per-component
   * performance metrics (serialized with XStream, when the cache entry has any) plus the
   * serialization, deserialization, analysis, wait and idle times as message properties.
   * Failures while attaching the per-component metrics are logged and do not prevent the
   * remaining properties from being set.
   *
   * @param aTextMessage
   *          outgoing message to decorate
   * @param anEndpoint
   *          destination endpoint
   * @param aCasReferenceId
   *          id of the CAS the message carries
   * @param anAdminCommand
   *          command being sent; only {@code AsynchAEMessage.Process} adds CAS statistics
   * @param isRequest
   *          true for a request, false for a reply
   * @throws Exception
   *           if a message property cannot be set or a controller lookup fails
   */
  private void populateStats(Message aTextMessage, Endpoint anEndpoint, String aCasReferenceId,
          int anAdminCommand, boolean isRequest) throws Exception {
    if (anEndpoint.isFinal()) {
      aTextMessage.setLongProperty("SENT-TIME", System.nanoTime());
    }

    if (anAdminCommand == AsynchAEMessage.Process) {
      if (isRequest) {
        long departureTime = System.nanoTime();
        getAnalysisEngineController().saveTime(departureTime, aCasReferenceId,
                anEndpoint.getEndpoint());
      } else {
        try {
          if (getAnalysisEngineController().getInProcessCache().entryExists(aCasReferenceId)) {
            CacheEntry entry = getAnalysisEngineController().getInProcessCache()
                    .getCacheEntryForCAS(aCasReferenceId);
            // getDelegateMetrics returns an empty list if no metrics are found
            List<AnalysisEnginePerformanceMetrics> m = entry.getDelegateMetrics();

            if (m.size() > 0) {
              String serializedPerformanceMetrics;
              // Acquire the lock before entering the try block so that the finally
              // clause never attempts to release a lock this thread failed to obtain.
              xstreamLock.lock();
              try {
                serializedPerformanceMetrics = xstream.toXML(m);
                aTextMessage.setStringProperty(AsynchAEMessage.CASPerComponentMetrics,
                        serializedPerformanceMetrics);
              } finally {
                xstreamLock.unlock();
              }
              // Diagnostics go through the logger (not stdout) and outside of the lock
              if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
                UIMAFramework.getLogger(CLASS_NAME).log(
                        Level.FINEST,
                        "Thread:" + Thread.currentThread().getId()
                                + " XStream Serialized Metrics:::\n"
                                + serializedPerformanceMetrics);
              }
            }
          }
        } catch (Exception ex) {
          // Best effort: the reply is still sent without per-component metrics
          UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(),
                  "populateStats", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
                  "UIMAEE_exception__WARNING", ex);
        }
        ServicePerformance casStats = getAnalysisEngineController().getCasStatistics(
                aCasReferenceId);

        aTextMessage.setLongProperty(AsynchAEMessage.TimeToSerializeCAS, casStats
                .getRawCasSerializationTime());
        aTextMessage.setLongProperty(AsynchAEMessage.TimeToDeserializeCAS, casStats
                .getRawCasDeserializationTime());
        aTextMessage.setLongProperty(AsynchAEMessage.TimeInProcessCAS, casStats
                .getRawAnalysisTime());
        aTextMessage.setLongProperty(AsynchAEMessage.TimeWaitingForCAS,
                getAnalysisEngineController().getServicePerformance().getTimeWaitingForCAS());
        long iT = getAnalysisEngineController().getIdleTimeBetweenProcessCalls(
                AsynchAEMessage.Process);
        aTextMessage.setLongProperty(AsynchAEMessage.IdleTime, iT);
        String lookupKey = getAnalysisEngineController().getName();
        long arrivalTime = getAnalysisEngineController().getTime(aCasReferenceId, lookupKey);
        long timeInService = getAnalysisEngineController().getCpuTime() - arrivalTime;
        if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
          UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(),
                  "populateStats", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
                  "UIMAJMS_timein_service__FINEST",
                  new Object[] { serviceInputEndpoint, (double) timeInService / (double) 1000000 });
        }
      }
    }
  }

  /**
   * Looks up the timeout configured on the endpoint for the given command.
   *
   * @param anEndpoint
   *          endpoint holding the configured timeouts
   * @param aCommand
   *          command code; GetMeta and Process are the only ones with a timeout here
   * @return the endpoint's metadata or process request timeout, or 0 for any other command
   */
  private long getCommandTimeoutValue(Endpoint anEndpoint, int aCommand) {
    if (aCommand == AsynchAEMessage.GetMeta) {
      return anEndpoint.getMetadataRequestTimeout();
    }
    if (aCommand == AsynchAEMessage.Process) {
      return anEndpoint.getProcessRequestTimeout();
    }
    // no match for the command
    return 0;
  }

  /**
   * Adds Request specific properties to the JMS Header.
   * <p>
   * Sets the message type, command and delta-CAS acceptance. When a timeout is configured for
   * the command and time-to-live is enabled, the timeout (multiplied by the number of CASes
   * pending reply from the delegate, when a delegate is found and has any) is stored via
   * {@code setJMSExpiration}. For aggregate controllers the method also sets the MessageFrom
   * and ServerURI properties and, for remote endpoints that carry a destination object, the
   * JMS reply-to destination.
   *
   * @param aMessage
   *          outgoing request message
   * @param anEndpoint
   *          endpoint the request is sent to
   * @param aCommand
   *          command code placed in the header
   * @throws Exception
   *           if a property cannot be set, or if an HTTP-based endpoint (or one with a
   *           reply-to endpoint configured) has neither a reply-to name nor a destination
   */
  private void populateHeaderWithRequestContext(Message aMessage, Endpoint anEndpoint, int aCommand)
          throws Exception {
    aMessage.setIntProperty(AsynchAEMessage.MessageType, AsynchAEMessage.Request);
    aMessage.setIntProperty(AsynchAEMessage.Command, aCommand);
    // TODO override default based on system property
    aMessage.setBooleanProperty(AsynchAEMessage.AcceptsDeltaCas, true);
    long timeout = getCommandTimeoutValue(anEndpoint, aCommand);
    // If the timeout is defined in the Deployment Descriptor and
    // the service is configured to use time to live (TTL), add
    // JMS message expiration time. The TTL is by default always
    // added to the message. To override this add "-DNoTTL" to the
    // command line.
    if (timeout > 0 && addTimeToLive) {
      long ttl = timeout;
      // lookupDelegate() returns null when the controller is not an aggregate (or when no
      // delegate is registered under the key); in that case the plain timeout is used.
      Delegate delegate = lookupDelegate(anEndpoint.getDelegateKey());
      if (delegate != null) {
        // How many CASes are in the list of CASes pending reply for this delegate
        int currentOutstandingCasListSize = delegate.getCasPendingReplyListSize();
        if (currentOutstandingCasListSize > 0) {
          // increase the time-to-live
          ttl *= currentOutstandingCasListSize;
        }
      }
      aMessage.setJMSExpiration(ttl);
    }
    if (getAnalysisEngineController() instanceof AggregateAnalysisEngineController) {
      aMessage.setStringProperty(AsynchAEMessage.MessageFrom, controllerInputEndpoint);
      if (anEndpoint.isRemote()) {
        String protocol = serviceProtocolList;
        if (anEndpoint.getServerURI().trim().toLowerCase().startsWith("http")
                || (anEndpoint.getReplyToEndpoint() != null && anEndpoint.getReplyToEndpoint()
                        .trim().length() > 0)) {
          protocol = anEndpoint.getServerURI().trim();

          // get the replyto endpoint name
          String replyTo = anEndpoint.getReplyToEndpoint();
          if (replyTo == null && anEndpoint.getDestination() == null) {
            throw new AsynchAEException(
                    "replyTo endpoint name not specified for HTTP-based endpoint:"
                            + anEndpoint.getEndpoint());
          }
          if (replyTo == null) {
            replyTo = "";
          }
          // Overrides the MessageFrom value set above
          aMessage.setStringProperty(AsynchAEMessage.MessageFrom, replyTo);

        }
        Object destination;

        if ((destination = anEndpoint.getDestination()) != null) {
          // The endpoint carries a destination object: use it as the JMS reply-to
          aMessage.setJMSReplyTo((Destination) destination);
          aMessage.setStringProperty(UIMAMessage.ServerURI, anEndpoint.getServerURI());
        } else {
          aMessage.setStringProperty(UIMAMessage.ServerURI, protocol);
        }

        if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
          UIMAFramework.getLogger(CLASS_NAME).logrb(
                  Level.FINE,
                  CLASS_NAME.getName(),
                  "populateHeaderWithRequestContext",
                  JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
                  "UIMAJMS_sending_new_msg_to_remote_FINE",
                  new Object[] { getAnalysisEngineController().getComponentName(),
                      anEndpoint.getServerURI(), anEndpoint.getEndpoint() });
        }
      } else // collocated
      {
        aMessage.setStringProperty(UIMAMessage.ServerURI, anEndpoint.getServerURI());
      }
    }
  }

  /**
   * Adds Response specific properties to the JMS Header: message type, command and the name
   * of this service's input endpoint as sender. For a remote endpoint the header also gets this
   * channel's server URI, the host IP with a "PID:threadId" marker (when the host IP is known)
   * and the endpoint server (when set); for a collocated endpoint only the endpoint's own server
   * URI is added.
   *
   * @param aMessage
   *          outgoing reply message
   * @param anEndpoint
   *          endpoint the reply is sent to
   * @param aCommand
   *          command code placed in the header
   * @throws Exception
   *           if a property cannot be set on the message
   */
  private void populateHeaderWithResponseContext(Message aMessage, Endpoint anEndpoint, int aCommand)
          throws Exception {
    aMessage.setIntProperty(AsynchAEMessage.MessageType, AsynchAEMessage.Response);
    aMessage.setIntProperty(AsynchAEMessage.Command, aCommand);
    aMessage.setStringProperty(AsynchAEMessage.MessageFrom, serviceInputEndpoint);

    if (!anEndpoint.isRemote()) {
      // Collocated: the endpoint's own URI is all that is needed
      aMessage.setStringProperty(UIMAMessage.ServerURI, anEndpoint.getServerURI());
      return;
    }

    aMessage.setStringProperty(UIMAMessage.ServerURI, getServerURI());
    if (hostIP != null) {
      aMessage.setStringProperty(AsynchAEMessage.ServerIP, hostIP);
      if (getAnalysisEngineController() != null) {
        String pidAndThread = getAnalysisEngineController().getPID() + ":"
                + Thread.currentThread().getId();
        aMessage.setStringProperty(AsynchAEMessage.UimaASProcessPID, pidAndThread);
      }
    }
    if (anEndpoint.getEndpointServer() != null) {
      aMessage.setStringProperty(AsynchAEMessage.EndpointServer, anEndpoint.getEndpointServer());
    }
  }

  /**
   * @return the controller currently assigned to this channel; may be null if none was set
   */
  public AnalysisEngineController getAnalysisEngineController() {
    return this.analysisEngineController;
  }

  /**
   * Assigns the controller that owns this channel and counts down {@code controllerLatch}.
   *
   * @param analysisEngineController
   *          controller to associate with this channel
   */
  public void setController(AnalysisEngineController analysisEngineController) {
    this.analysisEngineController = analysisEngineController;
    // Signal threads waiting on the latch that a controller has been assigned
    this.controllerLatch.countDown();
  }

  /**
   * @return the controller input endpoint name stored on this channel
   */
  public String getControllerInputEndpoint() {
    return this.controllerInputEndpoint;
  }

  /**
   * Stores the controller input endpoint name; request headers built for aggregate
   * controllers use it as the MessageFrom value.
   *
   * @param controllerInputEndpoint
   *          endpoint name to store
   */
  public void setControllerInputEndpoint(String controllerInputEndpoint) {
    this.controllerInputEndpoint = controllerInputEndpoint;
  }

  /**
   * Adds statistics to the message and sends it over the given endpoint connection.
   * <p>
   * For a request the CAS is added to the delegate's list of CASes pending reply before the
   * send. When the send fails for a request with a delegate key, the delegate's master endpoint
   * is marked FAILED, the cached endpoint is removed from its broker connection entry and a
   * recovery thread is started. When the send fails otherwise, the CAS is marked as not
   * delivered to the client, a temp-queue reply destination is recorded in the dead client map
   * and the failure is logged.
   *
   * @param aMessage
   *          message to send
   * @param anEndpoint
   *          destination endpoint; must not be null
   * @param entry
   *          cache entry of the CAS being sent
   * @param isRequest
   *          true for a request to a delegate, false for a reply
   * @param endpointConnection
   *          connection used for the send
   * @param msgSize
   *          size of the serialized payload, passed to the send
   * @throws Exception
   *           if the endpoint is null or preparing/sending the message fails
   */
  private void dispatch(Message aMessage, Endpoint anEndpoint, CacheEntry entry, boolean isRequest,
          JmsEndpointConnection_impl endpointConnection, long msgSize) throws Exception {

    if (anEndpoint == null) {
      throw new UimaEEServiceException("UIMA AS service:"+getAnalysisEngineController().getComponentName()+" unable to dispatch request to a remote delegate. Endpoint is null. This is an invalid state.");
    }
    // Add stats
    populateStats(aMessage, anEndpoint, entry.getCasReferenceId(), AsynchAEMessage.Process,
            isRequest);

    // A reply to a client goes through the broker that manages this service's input queue;
    // a request goes through the broker named in the endpoint object.
    String brokerConnectionURL;
    if (anEndpoint.isReplyEndpoint()) {
      brokerConnectionURL = serverURI;
    } else {
      brokerConnectionURL = anEndpoint.getServerURI();
    }

    if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
      UIMAFramework.getLogger(CLASS_NAME).logrb(
              Level.FINE,
              CLASS_NAME.getName(),
              "dispatch",
              JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
              "UIMAJMS_sending_new_msg_to_remote_FINE",
              new Object[] { getAnalysisEngineController().getName(),
                brokerConnectionURL, endpointConnection.getEndpoint() });
    }

    // The connection timer is only requested for replies. A cached connection that stays
    // unused for the configured interval is closed when its timer expires.
    boolean startConnectionTimer = !isRequest;

    if (isRequest) {
      // Register the CAS as pending reply BEFORE the send. This removes the race where the
      // reply arrives (on a different thread) before the CAS is in the list.
      anEndpoint.setWaitingForResponse(true);
      addCasToOutstandingList(entry, isRequest, anEndpoint.getDelegateKey());
    } else {
      addIdleTime(aMessage);
    }

    // send() returns false on failure
    if (endpointConnection.send(aMessage, msgSize, startConnectionTimer)) {
      return;
    }

    if (isRequest && anEndpoint.getDelegateKey() != null) {
      // A failed request requires cleanup, including stopping the listener on the delegate we
      // could not reach. The delegate state is set to FAILED; retries or further CASes cause
      // the connection to be retried. The error handler called from send() has already
      // removed the CAS from the outstanding list.
      AggregateAnalysisEngineController aggregate =
              (AggregateAnalysisEngineController) getAnalysisEngineController();
      Endpoint master = aggregate.lookUpEndpoint(anEndpoint.getDelegateKey(), false);
      master.setStatus(Endpoint.FAILED);

      String key = getLookupKey(anEndpoint);
      if (connectionMap.containsKey(brokerConnectionURL)) {
        // Destinations managed by the broker this endpoint uses
        BrokerConnectionEntry brokerConnectionEntry =
                (BrokerConnectionEntry) connectionMap.get(brokerConnectionURL);
        // Forget the endpoint we failed on so that recovery can work on a subsequent message
        if (brokerConnectionEntry != null && brokerConnectionEntry.endpointExists(key)) {
          brokerConnectionEntry.removeEndpoint(key);
        }
      }
      // Spin a recovery thread to handle the send error. Once it is started the current
      // (process) thread returns to its pool, so the recovery thread can stop the listener
      // and the ThreadPoolExecutor. Any retries are done in the recovery thread.
      RecoveryThread recoveryThread = new RecoveryThread(this, anEndpoint, entry, isRequest,
              getAnalysisEngineController());
      new Thread(Thread.currentThread().getThreadGroup().getParent(), recoveryThread).start();
      return;
    }

    // Failed to deliver a reply (or a request without a delegate key)
    try {
      CasStateEntry casStateEntry = getAnalysisEngineController().getLocalCache().lookupEntry(
              entry.getCasReferenceId());
      // Mark the CAS, so that later we know that the delivery to client failed
      casStateEntry.setDeliveryToClientFailed();
      // Add the reply destination (temp queue) to the dead client map
      Object clientDestination = anEndpoint.getDestination();
      if (clientDestination instanceof TemporaryQueue) {
        String destinationName = clientDestination.toString();
        if (!getAnalysisEngineController().getDeadClientMap().containsKey(destinationName)) {
          getAnalysisEngineController().getDeadClientMap().put(destinationName, destinationName);
        }
      }
    } catch (Exception e) {
      UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
              "dispatch", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
              "UIMAJMS_exception__WARNING", e);
    }

    if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
      UIMAFramework.getLogger(CLASS_NAME).logrb(
              Level.INFO,
              CLASS_NAME.getName(),
              "dispatch",
              JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
              "UIMAJMS_send_reply_failed__INFO",
              new Object[] { getAnalysisEngineController().getComponentName(),
                  brokerConnectionURL, endpointConnection.getEndpoint() });
    }
  }

  /**
   * Wraps an already serialized CAS in a JMS message, fills in the header and hands the
   * message to {@code dispatch}.
   * <p>
   * Nothing is sent when the channel is aborting, when the CAS has no entry in the local cache,
   * when a reply would go over a connection that is not open, or when the JMS message cannot be
   * produced. A reply carrying a subordinate CAS is re-labelled as a request and additionally
   * gets the top parent CAS id, the CAS sequence number and, when available, the free-CAS
   * temp queue as reply-to.
   *
   * @param isRequest
   *          true for a request to a delegate, false for a reply
   * @param aSerializedCAS
   *          a String when the endpoint's serial format is XMI, otherwise a byte array
   * @param entry
   *          cache entry of the CAS being sent
   * @param anEndpoint
   *          destination endpoint
   * @param startTimer
   *          not used by this method
   * @throws AsynchAEException
   *           for any failure other than a JMSException (logged) or ConnectException (the CAS
   *           is marked as not delivered)
   * @throws ServiceShutdownException
   *           rethrown unchanged
   */
  private void sendCasToRemoteEndpoint(boolean isRequest, Object aSerializedCAS, CacheEntry entry,
          Endpoint anEndpoint, boolean startTimer) throws AsynchAEException,
          ServiceShutdownException {
    CasStateEntry casStateEntry = null;
    long msgSize = 0;
    try {
      if (aborting) {
        return;
      }
      // A reply to a client goes through the broker that manages this service's input queue;
      // a request goes through the broker named in the endpoint object.
      String brokerConnectionURL = anEndpoint.isReplyEndpoint() ? serverURI : anEndpoint
              .getServerURI();

      casStateEntry = getAnalysisEngineController().getLocalCache().lookupEntry(
              entry.getCasReferenceId());
      if (casStateEntry == null) {
        UIMAFramework.getLogger(CLASS_NAME).logrb(
                Level.WARNING,
                CLASS_NAME.getName(),
                "sendCasToRemoteDelegate",
                JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
                "UIMAJMS_unable_to_send_reply__WARNING",
                new Object[] { getAnalysisEngineController().getComponentName(),
                    anEndpoint.getDestination(), brokerConnectionURL,
                    entry.getInputCasReferenceId() == null ? "" : entry.getInputCasReferenceId(),
                    entry.getCasReferenceId(), 0,
                    new Exception("Unable to lookup entry in Local Cache for a given Cas Id") });
        return;
      }

      // Get the connection object for a given endpoint
      JmsEndpointConnection_impl endpointConnection = getEndpointConnection(anEndpoint);

      // A reply cannot be delivered over a closed connection; a request still proceeds
      if (!endpointConnection.isOpen() && !isRequest) {
        return;
      }

      Message outgoing = null;
      try {
        if (anEndpoint.getSerialFormat() == SerialFormat.XMI) {
          String xmiPayload = (String) aSerializedCAS;
          outgoing = endpointConnection.produceTextMessage(xmiPayload);
          if (xmiPayload != null) {
            msgSize = xmiPayload.length();
          }
          outgoing.setIntProperty(AsynchAEMessage.Payload, AsynchAEMessage.XMIPayload);
        } else {
          // Binary payload travels in a JMS Bytes Message
          byte[] binaryPayload = (byte[]) aSerializedCAS;
          outgoing = endpointConnection.produceByteMessage(binaryPayload);
          if (binaryPayload != null) {
            msgSize = binaryPayload.length;
          }
          outgoing.setIntProperty(AsynchAEMessage.Payload, AsynchAEMessage.BinaryPayload);
        }
      } catch (AsynchAEException ex) {
        UIMAFramework.getLogger(CLASS_NAME).logrb(
                Level.WARNING,
                CLASS_NAME.getName(),
                "sendCasToRemoteDelegate",
                JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
                "UIMAJMS_unable_to_send_reply__WARNING",
                new Object[] { getAnalysisEngineController().getComponentName(),
                    anEndpoint.getDestination(), brokerConnectionURL,
                    entry.getInputCasReferenceId() == null ? "" : entry.getInputCasReferenceId(),
                    entry.getCasReferenceId(), 0, ex });
        return;
      }

      // Add Cas Reference Id to the outgoing JMS Header
      outgoing.setStringProperty(AsynchAEMessage.CasReference, entry.getCasReferenceId());
      // Add common properties to the JMS Header
      if (isRequest) {
        populateHeaderWithRequestContext(outgoing, anEndpoint, AsynchAEMessage.Process);
      } else {
        populateHeaderWithResponseContext(outgoing, anEndpoint, AsynchAEMessage.Process);
        outgoing.setBooleanProperty(AsynchAEMessage.SentDeltaCas, entry.sentDeltaCas());
      }

      // The following is true when the analytic is a CAS Multiplier
      if (casStateEntry.isSubordinate() && !isRequest) {
        // Override the MessageType set above: make the reply look like a request. The message
        // contains a new CAS produced by the CAS Multiplier, which the client treats
        // differently from the input CAS.
        outgoing.setIntProperty(AsynchAEMessage.MessageType, AsynchAEMessage.Request);
        isRequest = true;
        // Save the id of the parent CAS
        outgoing.setStringProperty(AsynchAEMessage.InputCasReference,
                getTopParentCasReferenceId(entry.getCasReferenceId()));
        // Add a sequence number assigned to this CAS by the controller
        outgoing.setLongProperty(AsynchAEMessage.CasSequence, entry.getCasSequence());
        // Attach the temp queue where the client must send Free CAS notifications
        if (freeCASTempQueue != null) {
          outgoing.setJMSReplyTo(freeCASTempQueue);
        }
        if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
          UIMAFramework.getLogger(CLASS_NAME).logrb(
                  Level.FINE,
                  CLASS_NAME.getName(),
                  "sendCasToRemoteEndpoint",
                  JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
                  "UIMAJMS_send_cas_to_collocated_service_detail__FINE",
                  new Object[] { getAnalysisEngineController().getComponentName(), "Remote",
                      anEndpoint.getEndpoint(), entry.getCasReferenceId(),
                      entry.getInputCasReferenceId(), entry.getInputCasReferenceId() });
        }
      }
      dispatch(outgoing, anEndpoint, entry, isRequest, endpointConnection, msgSize);

    } catch (JMSException e) {
      // Unable to establish connection to the endpoint. Log it and continue
      if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
                "sendCasToRemoteDelegate", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
                "UIMAJMS_unable_to_connect__INFO",
                new Object[] { getAnalysisEngineController().getName(), anEndpoint.getEndpoint() });
      }
    } catch (ServiceShutdownException e) {
      throw e;
    } catch (AsynchAEException e) {
      throw e;
    } catch (ConnectException e) {
      casStateEntry.setDeliveryToClientFailed();
    } catch (Exception e) {
      throw new AsynchAEException(e);
    }

  }

  /**
   * Finds the delegate registered under the given key.
   *
   * @param aDelegateKey
   *          key of the delegate to look up
   * @return the result of the aggregate controller's lookup, or null when this channel's
   *         controller is not an aggregate
   */
  private Delegate lookupDelegate(String aDelegateKey) {
    if (!(getAnalysisEngineController() instanceof AggregateAnalysisEngineController)) {
      return null;
    }
    AggregateAnalysisEngineController aggregate =
            (AggregateAnalysisEngineController) getAnalysisEngineController();
    return aggregate.lookupDelegate(aDelegateKey);
  }

  /**
   * For a request, adds the CAS to the outstanding list of the delegate found under the given
   * key. Does nothing for replies or when no delegate is found.
   *
   * @param entry
   *          cache entry supplying the CAS reference id and the CAS hash code
   * @param isRequest
   *          true when the message being sent is a request
   * @param aDelegateKey
   *          key of the target delegate
   */
  private void addCasToOutstandingList(CacheEntry entry, boolean isRequest, String aDelegateKey) {
    if (!isRequest) {
      return;
    }
    Delegate delegate = lookupDelegate(aDelegateKey);
    if (delegate == null) {
      return;
    }
    // false = don't start a timer thread per CAS
    delegate.addCasToOutstandingList(entry.getCasReferenceId(), entry.getCas().hashCode(), false);
  }

  /**
   * For a request, removes the CAS from the outstanding list of the delegate found under the
   * given key. Does nothing for replies or when no delegate is found.
   *
   * @param entry
   *          cache entry supplying the CAS reference id
   * @param isRequest
   *          true when the message was a request
   * @param aDelegateKey
   *          key of the target delegate
   */
  private void removeCasFromOutstandingList(CacheEntry entry, boolean isRequest, String aDelegateKey) {
    if (!isRequest) {
      return;
    }
    Delegate delegate = lookupDelegate(aDelegateKey);
    if (delegate != null) {
      delegate.removeCasFromOutstandingList(entry.getCasReferenceId());
    }
  }

  /**
   * Walks up the chain of input (parent) CASes, starting from the given CAS, until it reaches
   * an entry that is not subordinate.
   *
   * @param casReferenceId
   *          id of the CAS to start from
   * @return the id of the top ancestor CAS, or null when an id along the chain has no entry in
   *         the local cache
   * @throws Exception
   *           if a local cache lookup fails
   */
  private String getTopParentCasReferenceId(String casReferenceId) throws Exception {
    boolean known = getAnalysisEngineController().getLocalCache().containsKey(casReferenceId);
    if (!known) {
      return null;
    }
    CasStateEntry casStateEntry = getAnalysisEngineController().getLocalCache().lookupEntry(
            casReferenceId);

    if (!casStateEntry.isSubordinate()) {
      // Reached the top ancestor CAS
      return casStateEntry.getCasReferenceId();
    }
    // Keep climbing: recurse on the parent of this subordinate CAS
    return getTopParentCasReferenceId(casStateEntry.getInputCasReferenceId());
  }

  /**
   * Saves the current {@code System.nanoTime()} value as the reply time with the controller,
   * using an empty string as the second argument.
   *
   * @param aMessage
   *          not used by this method
   */
  private void addIdleTime(Message aMessage) {
    getAnalysisEngineController().saveReplyTime(System.nanoTime(), "");
  }
  /**
   * Stops this channel; equivalent to {@code stop(Channel.CloseAllChannels, true)}.
   */
  public void stop() {
    this.stop(Channel.CloseAllChannels, true);
  }

  /**
   * Stops this channel; equivalent to {@code stop(Channel.CloseAllChannels, shutdownNow)}.
   *
   * @param shutdownNow
   *          forwarded to {@code stop(int, boolean)}
   */
  public void stop(boolean shutdownNow) {
    this.stop(Channel.CloseAllChannels, shutdownNow);
  }

  /**
   * Marks this channel as aborting and shuts down every cached broker connection: each
   * endpoint connection is aborted (closing its session and producer), the connection timer
   * is cancelled and the JMS connection is closed. Failures are logged, never propagated.
   *
   * @param channelsToClose
   *          not used by this implementation
   * @param shutdownNow
   *          not used by this implementation
   */
  public void stop(int channelsToClose, boolean shutdownNow) {
    aborting = true;
    try {
      // This service may be connected to many brokers. Each broker connection may handle
      // multiple sessions to different reply queues.
      for (Object value : connectionMap.values()) {
        if (!(value instanceof BrokerConnectionEntry)) {
          continue;
        }
        BrokerConnectionEntry brokerConnectionEntry = (BrokerConnectionEntry) value;
        // A connection object may have many endpoint objects, one per reply queue. Iterate
        // over the values rather than looking each key up again: an entry removed
        // concurrently (e.g. by the session reaper) would otherwise come back as null.
        for (JmsEndpointConnection_impl endpointConnection : brokerConnectionEntry.endpointMap
                .values()) {
          // abort() just closes a session and a producer. The JMS Connection itself is
          // closed below, once all of its sessions are cleaned up.
          endpointConnection.abort();
          if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
            UIMAFramework.getLogger(CLASS_NAME).logrb(
                    Level.INFO,
                    CLASS_NAME.getName(),
                    "stop",
                    JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
                    "UIMAJMS_forced_endpoint_close__INFO",
                    new Object[] { getAnalysisEngineController().getName(),
                        endpointConnection.getEndpoint(), endpointConnection.getServerUri() });
          }
        }
        // Cancel any pending timers and finally close the JMS Connection to the broker
        if (brokerConnectionEntry.getConnectionTimer() != null) {
          brokerConnectionEntry.getConnectionTimer().cancelTimer();
        }
        if (brokerConnectionEntry.getConnection() != null) {
          try {
            brokerConnectionEntry.getConnection().close();
          } catch (Exception ignored) {
            // ignore, we are stopping
          }
        }
      }
      if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "stop",
                JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_output_channel_aborted__INFO",
                new Object[] { getAnalysisEngineController().getName() });
      }
    } catch (Exception e) {
      if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
                "stop", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
                "UIMAEE_service_exception_WARNING", getAnalysisEngineController().getComponentName());

        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
                "stop", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
                "UIMAJMS_exception__WARNING", e);
      }
    }
  }


  /**
   * Cancels the connection timer of every cached broker connection entry that has one.
   */
  public void cancelTimers() {
    if (connectionMap.isEmpty()) {
      return;
    }
    for (Iterator<String> keys = connectionMap.keySet().iterator(); keys.hasNext();) {
      String brokerKey = keys.next();
      BrokerConnectionEntry candidate = (BrokerConnectionEntry) connectionMap.get(brokerKey);
      if (candidate == null || candidate.getConnectionTimer() == null) {
        continue;
      }
      candidate.getConnectionTimer().cancelTimer();
    }
  }

  /**
   * Holds what this channel keeps per broker: the broker URL, the JMS connection, an optional
   * connection timer and a map of endpoint connections.
   */
  public static class BrokerConnectionEntry {
    /** URL of the broker this entry describes. */
    private String brokerURL;

    /** JMS connection to the broker; may be null. */
    private Connection connection;

    /** Timer associated with the connection; may be null. */
    private ConnectionTimer connectionTimer;

    /** Endpoint connections opened over this broker connection, by lookup key. */
    Map<Object, JmsEndpointConnection_impl> endpointMap = new ConcurrentHashMap<Object, JmsEndpointConnection_impl>();

    // ---- broker URL ----

    public void setBrokerURL(String brokerURL) {
      this.brokerURL = brokerURL;
    }

    public String getBrokerURL() {
      return this.brokerURL;
    }

    // ---- JMS connection ----

    public void setConnection(Connection connection) {
      this.connection = connection;
    }

    public Connection getConnection() {
      return this.connection;
    }

    // ---- connection timer ----

    public void setConnectionTimer(ConnectionTimer aConnectionTimer) {
      this.connectionTimer = aConnectionTimer;
    }

    public ConnectionTimer getConnectionTimer() {
      return this.connectionTimer;
    }

    // ---- endpoint connections ----

    /** Registers (or replaces) the endpoint connection stored under the given key. */
    public void addEndpointConnection(Object key, JmsEndpointConnection_impl endpointConnection) {
      this.endpointMap.put(key, endpointConnection);
    }

    /** Returns the endpoint connection stored under the given key, or null if there is none. */
    public JmsEndpointConnection_impl getEndpointConnection(Object key) {
      return this.endpointMap.get(key);
    }

    /** Tells whether an endpoint connection is stored under the given key. */
    public boolean endpointExists(Object key) {
      return this.endpointMap.containsKey(key);
    }

    /** Removes the endpoint connection stored under the given key, if any. */
    public void removeEndpoint(Object key) {
      this.endpointMap.remove(key);
    }
  }

  protected class ConnectionTimer {
    private final Class CLASS_NAME = ConnectionTimer.class;

    private long inactivityTimeout;

    private AnalysisEngineController controller;

    private BrokerConnectionEntry brokerDestinations;

    private long connectionCreationTimestamp;

    private String componentName = "";

    private ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
    
    private boolean doLog = true;
    
    public ConnectionTimer(BrokerConnectionEntry aBrokerDestinations) {
      brokerDestinations = aBrokerDestinations;
    }

    public synchronized void setInactivityTimeout(long anInactivityTimeout) {
      inactivityTimeout = anInactivityTimeout;
    }

    public void setAnalysisEngineController(AnalysisEngineController aController) {
      controller = aController;
      if (controller != null) {
        componentName = controller.getComponentName();
      }
    }

    public void setConnectionCreationTimestamp(long aConnectionCreationTimestamp) {
      connectionCreationTimestamp = aConnectionCreationTimestamp;
    }

    public void stopSessionReaperTimer() {
      if ( scheduler != null ) {
    	  scheduler.shutdownNow();
      }
    }
    /**
     * Schedules regular cleanup of JMS sessions. A session is cleaned up (closed) if it has not
     * been used in an interval defined by value in inactivityTimeout.
     * 
     * @param aComponentName
     */
    /**
     * Schedules a recurring task that closes idle JMS sessions and, once no endpoint is left,
     * the broker connection itself.
     * <p>
     * The task first runs after {@code inactivityTimeout * 60} seconds and is re-run with the
     * same delay after each run completes. On every run, each entry of
     * {@code brokerDestinations.endpointMap} with a positive last-dispatch timestamp that is at
     * least {@code inactivityTimeout} minutes old is closed and removed from the map. If the map
     * is empty afterwards, the broker connection is stopped and closed, the broker entry is
     * removed from {@code connectionMap} and {@code stopSessionReaperTimer()} is called.
     * <p>
     * The whole scan runs while holding one permit of {@code connectionSemaphore}.
     *
     * @param aComponentName not read by this method; log messages use the
     *        {@code componentName} field instead
     */
    public synchronized void startSessionReaperTimer( String aComponentName) {
        //	Fire the runnable at fixed intervals equal to inactivityTimeout value
        scheduler.scheduleWithFixedDelay(new Runnable(){
            public void run() {
                // Normalize minutes into millis. Long literals keep the multiplication in
                // 64-bit arithmetic whatever the declared width of inactivityTimeout is.
                long inactivityThreshold = inactivityTimeout * 60L * 1000L;
                if ( doLog && UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
                  // doLog is cleared here, so this message is emitted only while it is set
                  doLog = false;
                  UIMAFramework.getLogger(CLASS_NAME).logrb(
                          Level.FINE,
                          CLASS_NAME.getName(),
                          "startSessionReaperTimer.run",
                          JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
                          "UIMAJMS_inactivity_timer_expired__FINE",
                          new Object[] { Thread.currentThread().getId(), componentName,
                              inactivityTimeout, brokerDestinations.getBrokerURL() });
                }
                // Acquire the permit BEFORE entering the try/finally that releases it.
                // If acquire() is interrupted (e.g. by scheduler.shutdownNow()), no permit
                // is held and release() must not be called, otherwise the semaphore would
                // gain a permit it never handed out.
                try {
                    connectionSemaphore.acquire();
                } catch (InterruptedException e) {
                    // Preserve the interrupt for the executor and skip this run
                    Thread.currentThread().interrupt();
                    UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
                            "startSessionReaperTimer.run", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
                            "UIMAJMS_exception__WARNING", e);
                    return;
                }
                try {
                    if (brokerDestinations.getConnection() != null
                            && !((ActiveMQConnection) brokerDestinations.getConnection()).isClosed()) {
                        try {
                            //	loop through all endpoint wrappers (JmsEndpointConnection_impl) and
                            //  close sessions which have not been used for a defined (inactivity_timeout)
                            //  amount of time.
                            Iterator<Entry<Object, JmsEndpointConnection_impl>> it =
                                    brokerDestinations.endpointMap.entrySet().iterator();
                            while ( it.hasNext() ) {
                                Entry<Object, JmsEndpointConnection_impl> value = it.next();
                                long lastDispatchTime = value.getValue().lastDispatchTimestamp.get();
                                // Entries with a non-positive timestamp are never reaped here
                                if ( lastDispatchTime > 0
                                        && (System.currentTimeMillis() - lastDispatchTime) >= inactivityThreshold ) {
                                    value.getValue().close();  // close the jms session
                                    it.remove();
                                }
                            }
                        } catch (Exception e) {
                            // Route the failure through the framework logger instead of stderr
                            UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
                                    "startSessionReaperTimer.run", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
                                    "UIMAJMS_exception__WARNING", e);
                        } finally {
                            try {
                                // With no endpoint left, the broker connection is torn down too
                                if ( brokerDestinations.endpointMap.isEmpty() ) {
                                    try {
                                        brokerDestinations.getConnection().stop();
                                        brokerDestinations.getConnection().close();
                                    } catch( Exception ignored) {
                                        // Best-effort shutdown: the connection is discarded below
                                        // regardless of whether stop/close succeeded.
                                    }
                                    brokerDestinations.setConnection(null);
                                    brokerDestinations.endpointMap.clear();
                                    connectionMap.remove(brokerDestinations);
                                    stopSessionReaperTimer();

                                    if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
                                        UIMAFramework.getLogger(CLASS_NAME).logrb(
                                                Level.FINE,
                                                CLASS_NAME.getName(),
                                                "startSessionReaperTimer.run",
                                                JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
                                                "UIMAJMS_closing_broker_connection__FINE",
                                                new Object[] { Thread.currentThread().getId(), componentName,
                                                    brokerDestinations.getBrokerURL(),inactivityTimeout  });
                                    }
                                }
                            } catch( Exception e) {
                                UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
                                        "startSessionReaperTimer.run", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
                                        "UIMAJMS_exception__WARNING", e);
                            }
                        }
                    }
                } catch (Exception e) {
                    UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
                            "startSessionReaperTimer.run", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
                            "UIMAJMS_exception__WARNING", e);
                } finally {
                    // Reached only when the permit was actually acquired above
                    connectionSemaphore.release();
                }
            }

        }, inactivityTimeout * 60, inactivityTimeout * 60,TimeUnit.SECONDS);
    }
    /**
     * Shuts the scheduler down immediately via {@code shutdownNow()}. Does nothing when no
     * scheduler has been assigned.
     */
    private void cancelTimer() {
      if ( scheduler == null ) {
        return;
      }
      scheduler.shutdownNow();
    }

    /**
     * Synchronized public entry point for stopping the timer; delegates to
     * {@code cancelTimer()}.
     */
    public synchronized void stopTimer() {
      this.cancelTimer();
    }
  }

  /**
   * Runnable that performs failure handling for a CAS: it removes the CAS from the list of
   * CASes pending reply, marks the delegate's endpoint as failed, destroys the listener on the
   * delegate's reply destination and passes a {@link MessageTimeoutException} to the delegate's
   * error handling.
   */
  private static class RecoveryThread implements Runnable {
    // Endpoint supplied at construction; provides the delegate key and is added to the error context
    Endpoint endpoint;

    // Cache entry of the CAS being recovered; provides the CAS reference id
    CacheEntry entry;

    // Forwarded unchanged to removeCasFromOutstandingList
    boolean isRequest;

    // Used to look up the input channel bound to the delegate's destination
    AnalysisEngineController controller;

    // Channel used to look up the delegate and to drop the CAS from the outstanding list
    JmsOutputChannel outputChannel;

    /**
     * Stores the collaborators needed by {@link #run()}.
     *
     * @param channel output channel used for delegate lookup and outstanding-list maintenance
     * @param anEndpoint endpoint whose delegate key identifies the delegate
     * @param anEntry cache entry of the CAS being recovered
     * @param isRequest passed through to {@code removeCasFromOutstandingList}
     * @param aController controller used to resolve the input channel of the delegate
     */
    public RecoveryThread(JmsOutputChannel channel, Endpoint anEndpoint, CacheEntry anEntry,
            boolean isRequest, AnalysisEngineController aController) {
      endpoint = anEndpoint;
      entry = anEntry;
      controller = aController;
      this.isRequest = isRequest;
      outputChannel = channel;
    }

    /**
     * Removes the CAS from the outstanding list and, if the delegate is known, marks it failed,
     * tears down its reply listener and reports the failure as a message timeout.
     */
    public void run() {
      Delegate delegate = outputChannel.lookupDelegate(endpoint.getDelegateKey());
      // Removes the failed CAS from the list of CASes pending reply. This also
      // cancels the timer if this CAS was the oldest pending CAS, and if there
      // are other CASes pending a fresh timer is started.
      outputChannel.removeCasFromOutstandingList(entry, isRequest, endpoint.getDelegateKey());
      if (delegate == null) {
        return;
      }
      // Fetch the endpoint once and validate it BEFORE dereferencing, so that a delegate
      // without an endpoint or destination no longer aborts this thread with an NPE and
      // the error handler below is still invoked.
      Endpoint delegateEndpoint = delegate.getEndpoint();
      if (delegateEndpoint != null) {
        // Mark this delegate as Failed
        delegateEndpoint.setStatus(Endpoint.FAILED);
        Object destination = delegateEndpoint.getDestination();
        if (destination != null) {
          // Destroy listener associated with a reply queue for this delegate
          String destinationName = destination.toString();
          InputChannel ic = controller.getInputChannel(destinationName);
          if (ic != null) {
            ic.destroyListener(destinationName, endpoint.getDelegateKey());
          }
        }
      }
      // Setup error context and handle failure in the error handler
      ErrorContext errorContext = new ErrorContext();
      errorContext.add(AsynchAEMessage.Command, AsynchAEMessage.Process);
      errorContext.add(AsynchAEMessage.CasReference, entry.getCasReferenceId());
      errorContext.add(AsynchAEMessage.Endpoint, endpoint);
      errorContext.handleSilently(true); // dont dump exception to the log
      // Failure on send treat as timeout
      delegate.handleError(new MessageTimeoutException(), errorContext);
    }
  }
}
