/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * 
 *   http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * 
 *   http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.uima.aae.jms_adapter;

import java.util.HashMap;
import java.util.Map;

import org.apache.uima.UIMAFramework;
import org.apache.uima.aae.client.UimaASStatusCallbackListener;
import org.apache.uima.aae.client.UimaAsBaseCallbackListener;
import org.apache.uima.aae.client.UimaAsynchronousEngine;
import org.apache.uima.adapter.jms.JmsConstants;
import org.apache.uima.adapter.jms.activemq.JmsOutputChannel;
import org.apache.uima.adapter.jms.client.BaseUIMAAsynchronousEngine_impl;
import org.apache.uima.analysis_engine.AnalysisEngineServiceStub;
import org.apache.uima.analysis_engine.metadata.AnalysisEngineMetaData;
import org.apache.uima.cas.CAS;
import org.apache.uima.collection.EntityProcessStatus;
import org.apache.uima.resource.Parameter;
import org.apache.uima.resource.Resource;
import org.apache.uima.resource.ResourceInitializationException;
import org.apache.uima.resource.ResourceProcessException;
import org.apache.uima.resource.ResourceServiceException;
import org.apache.uima.resource.metadata.ProcessingResourceMetaData;
import org.apache.uima.resource.metadata.ResourceMetaData;
import org.apache.uima.util.Level;

public class JmsAnalysisEngineServiceStub extends UimaAsBaseCallbackListener implements
        AnalysisEngineServiceStub {
  
  private static final Class CLASS_NAME = JmsAnalysisEngineServiceStub.class;

  public static final String PARAM_BROKER_URL = "brokerUrl";

  public static final String PARAM_ENDPOINT = "endpoint";

  public static final String PARAM_TIMEOUT = "timeout";

  public static final String PARAM_GETMETA_TIMEOUT = "getmetatimeout";

  public static final String PARAM_CPC_TIMEOUT = "cpctimeout";

  public static final String PARAM_BIN_SERIALIZTION = "binary_serialization";

  private Object mux = new Object();

  private boolean cpcReceived;

  private UimaAsynchronousEngine uimaEEEngine;

  public JmsAnalysisEngineServiceStub(Resource owner, Parameter[] parameters)
          throws ResourceInitializationException {
    // read parameters
    String brokerUrl = null;
    String endpoint = null;
    int timeout = 0;
    int getMetaTimeout = 0;
    int cpcTimeout = 0;

    String binary_serialization = null;
    for (int i = 0; i < parameters.length; i++) {
      if (PARAM_BROKER_URL.equalsIgnoreCase(parameters[i].getName())) {
        brokerUrl = parameters[i].getValue();
      } else if (PARAM_ENDPOINT.equalsIgnoreCase(parameters[i].getName())) {
        endpoint = parameters[i].getValue();
      } else if (PARAM_TIMEOUT.equalsIgnoreCase(parameters[i].getName())) {
        timeout = Integer.parseInt(parameters[i].getValue());
      } else if (PARAM_BIN_SERIALIZTION.equalsIgnoreCase(parameters[i].getName())) {
        binary_serialization = parameters[i].getValue();
      } else if (PARAM_GETMETA_TIMEOUT.equalsIgnoreCase(parameters[i].getName())) {
        getMetaTimeout = Integer.parseInt(parameters[i].getValue());
      } else if (PARAM_CPC_TIMEOUT.equalsIgnoreCase(parameters[i].getName())) {
        cpcTimeout = Integer.parseInt(parameters[i].getValue());
      }
    }

    // initialize UIMA EE Engine
    Map appCtxt = new HashMap();
    appCtxt.put(UimaAsynchronousEngine.ServerUri, brokerUrl);
    appCtxt.put(UimaAsynchronousEngine.Endpoint, endpoint);
    appCtxt.put(UimaAsynchronousEngine.CasPoolSize, 0);
    if (timeout > 0) {
      System.out.println("Setting Process Timeout: " + timeout);
      appCtxt.put(UimaAsynchronousEngine.Timeout, timeout);
    }
    if (getMetaTimeout > 0) {
      System.out.println("Setting GetMeta Timeout: " + getMetaTimeout);
      appCtxt.put(UimaAsynchronousEngine.GetMetaTimeout, getMetaTimeout);
    }
    if (cpcTimeout > 0) {
      System.out.println("Setting CPC Timeout: " + cpcTimeout);
      appCtxt.put(UimaAsynchronousEngine.CpcTimeout, cpcTimeout);
    }
    if (binary_serialization != null && binary_serialization.equalsIgnoreCase("true")) {
      System.out.println("Using binary serialization");
      appCtxt.put(UimaAsynchronousEngine.SerializationStrategy, "binary");
    }
    uimaEEEngine = new BaseUIMAAsynchronousEngine_impl();
    uimaEEEngine.addStatusCallbackListener(this);
    uimaEEEngine.initialize(appCtxt);
    System.out.println("adapter init complete");
  }

  /**
   * @see org.apache.uima.resource.service.ResourceServiceStub#callGetMetaData()
   */
  public ResourceMetaData callGetMetaData() throws ResourceServiceException {
    // metadata already retrieved during initialization
    try {
      //return uimaEEEngine.getMetaData();
      
      ResourceMetaData  rmd = uimaEEEngine.getMetaData();
      if ( rmd != null ) {
        ((ProcessingResourceMetaData)rmd).getOperationalProperties().setMultipleDeploymentAllowed(true);
        return rmd;
      }
      
    } catch (ResourceInitializationException e) {
      throw new ResourceServiceException(e);
    }
    
    throw new ResourceServiceException(new Exception("Uima AS getMetaData() call failed."));
  }

  /**
   * @see org.apache.uima.analysis_engine.service.AnalysisEngineServiceStub#callGetAnalysisEngineMetaData()
   */
  public AnalysisEngineMetaData callGetAnalysisEngineMetaData() throws ResourceServiceException {
    return (AnalysisEngineMetaData) callGetMetaData();
  }

  /**
   * @see org.apache.uima.analysis_engine.service.AnalysisEngineServiceStub#callProcess(CAS)
   */
  public void callProcess(CAS aCAS) throws ResourceServiceException {
    try {
      uimaEEEngine.sendAndReceiveCAS(aCAS);
    } catch (ResourceProcessException e) {
      throw new ResourceServiceException(e);
    }
  }

  /**
   * @see CasObjectProcessorServiceStub#callProcessCas(CAS)
   */
  public void callProcessCas(CAS aCAS) throws ResourceServiceException {
    callProcess(aCAS);
  }

  /**
   * @see org.apache.uima.resource.service.impl.ResourceServiceStub#destroy()
   */
  public void destroy() {
    try {
      // System.out.println("destroy methjdssdx");
      uimaEEEngine.stop();
    } catch (Exception e) {
      if (UIMAFramework.getLogger().isLoggable(Level.WARNING)) {
        UIMAFramework.getLogger().log(Level.WARNING, e.getMessage(), e);
      }
    }
  }

  /**
   * @see org.apache.uima.collection.impl.service.CasObjectProcessorServiceStub#callBatchProcessComplete()
   */
  public void callBatchProcessComplete() throws ResourceServiceException {
    // Not supported. Do nothing, rather than throw an exception, since this is called
    // in the normal course of CPE processing.
  }

  /**
   * @see org.apache.uima.collection.impl.service.CasObjectProcessorServiceStub#callCollectionProcessComplete()
   */
  public void callCollectionProcessComplete() throws ResourceServiceException {
    try {
      cpcReceived = false;
      uimaEEEngine.collectionProcessingComplete();
      // make this routine synchronous
      // System.out.println("CPC no wakeup needed");
      synchronized (mux) {
        while (!cpcReceived) {
          try {
            mux.wait();
            // System.out.println("CPC wakeup");
          } catch (InterruptedException e) {
            // Only here if something interrupts this thread
            if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
              UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
                      "callCollectionProcessComplete", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_exception__WARNING",
                      new Object[] { Thread.currentThread().getName(), e });
            }
          }
        }
      }
    } catch (ResourceProcessException e) {
      throw new ResourceServiceException(e);
    }
  }

  /*
   * (non-Javadoc)
   * 
   * @see
   * org.apache.uima.aae.client.UimaASStatusCallbackListener#collectionProcessComplete(org.apache
   * .uima.collection.EntityProcessStatus)
   */
  public void collectionProcessComplete(EntityProcessStatus aStatus) {
    synchronized (mux) {
      // System.out.println("CPC reply done got one");
      cpcReceived = true;
      mux.notifyAll();
    }
  }

  /*
   * (non-Javadoc)
   * 
   * @see
   * org.apache.uima.aae.client.UimaASStatusCallbackListener#entityProcessComplete(org.apache.uima
   * .cas.CAS, org.apache.uima.collection.EntityProcessStatus)
   */
  public void entityProcessComplete(CAS aCas, EntityProcessStatus aStatus) {
    // not used
  }

  /*
   * (non-Javadoc)
   * 
   * @see
   * org.apache.uima.aae.client.UimaASStatusCallbackListener#initializationComplete(org.apache.uima
   * .collection.EntityProcessStatus)
   */
  public void initializationComplete(EntityProcessStatus aStatus) {
    // not used
  }

}
