/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.axis2.description;

import java.util.HashMap;

import javax.xml.namespace.QName;

import org.apache.axiom.soap.SOAPBody;
import org.apache.axiom.soap.SOAPEnvelope;
import org.apache.axiom.util.UIDGenerator;
import org.apache.axis2.AxisFault;
import org.apache.axis2.Constants;
import org.apache.axis2.addressing.AddressingConstants;
import org.apache.axis2.addressing.EndpointReference;
import org.apache.axis2.client.OperationClient;
import org.apache.axis2.client.Options;
import org.apache.axis2.client.async.AsyncResult;
import org.apache.axis2.client.async.AxisCallback;
import org.apache.axis2.client.async.Callback;
import org.apache.axis2.context.ConfigurationContext;
import org.apache.axis2.context.MessageContext;
import org.apache.axis2.context.OperationContext;
import org.apache.axis2.context.ServiceContext;
import org.apache.axis2.engine.AxisEngine;
import org.apache.axis2.i18n.Messages;
import org.apache.axis2.transport.TransportUtils;
import org.apache.axis2.transport.http.HTTPConstants;
import org.apache.axis2.util.CallbackReceiver;
import org.apache.axis2.util.Utils;
import org.apache.axis2.wsdl.WSDLConstants;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class OutInAxisOperation extends TwoChannelAxisOperation {

	private static final Log log = LogFactory.getLog(OutInAxisOperation.class);

    public OutInAxisOperation() {
        super();
        //setup a temporary name
        QName tmpName = new QName(this.getClass().getName() + "_" + UIDGenerator.generateUID());
        this.setName(tmpName);
        setMessageExchangePattern(WSDL2Constants.MEP_URI_OUT_IN);
    }

    public OutInAxisOperation(QName name) {
        super(name);
        setMessageExchangePattern(WSDL2Constants.MEP_URI_OUT_IN);
    }

    public void addMessageContext(MessageContext msgContext,
                                  OperationContext opContext) throws AxisFault {
        HashMap<String, MessageContext> mep = opContext.getMessageContexts();
        MessageContext immsgContext = (MessageContext) mep
                .get(MESSAGE_LABEL_IN_VALUE);
        MessageContext outmsgContext = (MessageContext) mep
                .get(MESSAGE_LABEL_OUT_VALUE);

        if ((immsgContext != null) && (outmsgContext != null)) {
            throw new AxisFault(Messages.getMessage("mepcompleted"));
        }

        if (outmsgContext == null) {
            mep.put(MESSAGE_LABEL_OUT_VALUE, msgContext);
        } else {
            mep.put(MESSAGE_LABEL_IN_VALUE, msgContext);
            opContext.setComplete(true);
            opContext.cleanup();
        }
    }

    /**
     * Returns a MEP client for an Out-IN operation. This client can be used to
     * interact with a server which is offering an In-Out operation. To use the
     * client, you must call addMessageContext() with a message context and then
     * call execute() to execute the client.
     *
     * @param sc      The service context for this client to live within. Cannot be
     *                null.
     * @param options Options to use as defaults for this client. If any options are
     *                set specifically on the client then those override options
     *                here.
     */
    public OperationClient createClient(ServiceContext sc, Options options) {
        return new OutInAxisOperationClient(this, sc, options);
    }
}

/**
 * MEP client for moi.
 */
class OutInAxisOperationClient extends OperationClient {

    private static Log log = LogFactory.getLog(OutInAxisOperationClient.class);

    OutInAxisOperationClient(OutInAxisOperation axisOp, ServiceContext sc,
                             Options options) {
        super(axisOp, sc, options);
    }

    /**
     * Adds message context to operation context, so that it will handle the
     * logic correctly if the OperationContext is null then new one will be
     * created, and Operation Context will become null when some one calls reset().
     *
     * @param msgContext the MessageContext to add
     * @throws AxisFault
     */
    public void addMessageContext(MessageContext msgContext) throws AxisFault {
        msgContext.setServiceContext(sc);
        if (msgContext.getMessageID() == null) {
            setMessageID(msgContext);
        }
        axisOp.registerOperationContext(msgContext, oc);
    }

    /**
     * Returns the message context for a given message label.
     *
     * @param messageLabel :
     *                     label of the message and that can be either "Out" or "In" and
     *                     nothing else
     * @return Returns MessageContext.
     * @throws AxisFault
     */
    public MessageContext getMessageContext(String messageLabel)
            throws AxisFault {
        return oc.getMessageContext(messageLabel);
    }

    public void setCallback(Callback callback) {
        this.callback = callback;
    }

    /**
     * Executes the MEP. What this does depends on the specific MEP client. The
     * basic idea is to have the MEP client execute and do something with the
     * messages that have been added to it so far. For example, if its an Out-In
     * MEP, then if the Out message has been set, then executing the client asks
     * it to send the message and get the In message, possibly using a different
     * thread.
     *
     * @param block Indicates whether execution should block or return ASAP. What
     *              block means is of course a function of the specific MEP
     *              client. IGNORED BY THIS MEP CLIENT.
     * @throws AxisFault if something goes wrong during the execution of the MEP.
     */
    public void executeImpl(boolean block) throws AxisFault {
        if (log.isDebugEnabled()) {
            log.debug("Entry: OutInAxisOperationClient::execute, " + block);
        }
        if (completed) {
            throw new AxisFault(Messages.getMessage("mepiscomplted"));
        }
        ConfigurationContext cc = sc.getConfigurationContext();

        // copy interesting info from options to message context.
        MessageContext mc = oc.getMessageContext(WSDLConstants.MESSAGE_LABEL_OUT_VALUE);
        if (mc == null) {
            throw new AxisFault(Messages.getMessage("outmsgctxnull"));
        }
        prepareMessageContext(cc, mc);

        if (options.getTransportIn() == null && mc.getTransportIn() == null) {
            mc.setTransportIn(ClientUtils.inferInTransport(cc
                    .getAxisConfiguration(), options, mc));
        } else if (mc.getTransportIn() == null) {
            mc.setTransportIn(options.getTransportIn());
        }

        /**
         * If a module has set the USE_ASYNC_OPERATIONS option then we override the behaviour
         * for sync calls, and effectively USE_CUSTOM_LISTENER too. However we leave real
         * async calls alone.
         */
        boolean useAsync = false;
        if (!mc.getOptions().isUseSeparateListener()) {
            Boolean useAsyncOption =
                    (Boolean) mc.getProperty(Constants.Configuration.USE_ASYNC_OPERATIONS);
			if (log.isDebugEnabled()) log.debug("OutInAxisOperationClient: useAsyncOption " + useAsyncOption);
            if (useAsyncOption != null) {
                useAsync = useAsyncOption.booleanValue();
            }
        }

        EndpointReference replyTo = mc.getReplyTo();
        if (replyTo != null) {
            if (replyTo.hasNoneAddress()) {
                throw new AxisFault( replyTo.getAddress() + "" +
                        " can not be used with OutInAxisOperationClient , user either "
                        + "fireAndForget or sendRobust)");
            }
            else if (replyTo.isWSAddressingAnonymous() &&
                     replyTo.getAllReferenceParameters() != null) {
                mc.setProperty(AddressingConstants.INCLUDE_OPTIONAL_HEADERS, Boolean.TRUE);
            }
            
            String customReplyTo = (String)options.getProperty(Options.CUSTOM_REPLYTO_ADDRESS);
            if ( ! (Options.CUSTOM_REPLYTO_ADDRESS_TRUE.equals(customReplyTo))) {
                if (!replyTo.hasAnonymousAddress()){
                    useAsync = true;
                }
            }
        }

        if (useAsync || mc.getOptions().isUseSeparateListener()) {
            sendAsync(useAsync, mc);
        } else {
            if (block) {
                // Send the SOAP Message and receive a response
                send(mc);
                completed = true;
            } else {
                sc.getConfigurationContext().getThreadPool().execute(
                        new NonBlockingInvocationWorker(callback, mc, axisCallback));
            }
        }
    }

    private void sendAsync(boolean useAsync, MessageContext mc)
            throws AxisFault {
        if (log.isDebugEnabled()) {
            log.debug("useAsync=" + useAsync + ", seperateListener=" +
                    mc.getOptions().isUseSeparateListener());
        }
        /**
         * We are following the async path. If the user hasn't set a callback object then we must
         * block until the whole MEP is complete, as they have no other way to get their reply message.
         */
        // THREADSAFE issue: Multiple threads could be trying to initialize the callback receiver
        // so it is synchronized.  It is not done within the else clause to avoid the 
        // double-checked lock antipattern.
        CallbackReceiver callbackReceiver;
        synchronized (axisOp) {
            if (axisOp.getMessageReceiver() != null &&
                    axisOp.getMessageReceiver() instanceof CallbackReceiver) {
                callbackReceiver = (CallbackReceiver) axisOp.getMessageReceiver();
            } else {
                if (log.isDebugEnabled()) {
                    log.debug("Creating new callback receiver");
                }
                callbackReceiver = new CallbackReceiver();
                axisOp.setMessageReceiver(callbackReceiver);
				if (log.isDebugEnabled()) log.debug("OutInAxisOperation: callbackReceiver " + callbackReceiver + " : " + axisOp);
            }
        }

        SyncCallBack internalCallback = null;
        if (callback != null) {
            callbackReceiver.addCallback(mc.getMessageID(), callback);
			if (log.isDebugEnabled()) log.debug("OutInAxisOperationClient: Creating callback");
        } else if (axisCallback != null) {
            callbackReceiver.addCallback(mc.getMessageID(), axisCallback);  
			if (log.isDebugEnabled()) log.debug("OutInAxisOperationClient: Creating axis callback");			
        } else {
            if (log.isDebugEnabled()) {
                log.debug("Creating internal callback");
            }
            internalCallback = new SyncCallBack();
            callbackReceiver.addCallback(mc.getMessageID(), internalCallback);
			if (log.isDebugEnabled()) log.debug("OutInAxisOperationClient: Creating internal callback");
        }

        /**
         * If USE_CUSTOM_LISTENER is set to 'true' the replyTo value will not be replaced and Axis2 will not
         * start its internal listner. Some other enntity (e.g. a module) should take care of obtaining the
         * response message.
         */
        Boolean useCustomListener =
                (Boolean) options.getProperty(Constants.Configuration.USE_CUSTOM_LISTENER);
        if (useAsync) {
            useCustomListener = Boolean.TRUE;
        }
        if (useCustomListener == null || !useCustomListener.booleanValue()) {
            EndpointReference replyTo = mc.getReplyTo();
            if (replyTo == null || replyTo.hasAnonymousAddress()){
                EndpointReference replyToFromTransport =
                        mc.getConfigurationContext().getListenerManager().
                                getEPRforService(sc.getAxisService().getName(),
                                        axisOp.getName().getLocalPart(), mc
                                        .getTransportIn().getName());

                if (replyTo == null) {
                    mc.setReplyTo(replyToFromTransport);
                } else {
                    replyTo.setAddress(replyToFromTransport.getAddress());
                }
            }
        }

        //if we don't do this , this guy will wait till it gets HTTP 202 in the HTTP case
        mc.setProperty(MessageContext.CLIENT_API_NON_BLOCKING, Boolean.TRUE);
        mc.getConfigurationContext().registerOperationContext(mc.getMessageID(), oc);
        AxisEngine.send(mc);
		if (internalCallback != null) {
            internalCallback.waitForCompletion(options.getTimeOutInMilliSeconds());

            // process the result of the invocation
            if (internalCallback.envelope == null) {
                if (internalCallback.error == null) {
                    log.error("Callback had neither error nor response");
                }
                if (options.isExceptionToBeThrownOnSOAPFault()) {
                    throw AxisFault.makeFault(internalCallback.error);
                }
            }
        }
    }

    /**
     * When synchronous send() gets back a response MessageContext, this is the workhorse
     * method which processes it.
     *
     * @param responseMessageContext the active response MessageContext
     * @throws AxisFault if something went wrong
     */
    protected void handleResponse(MessageContext responseMessageContext) throws AxisFault{
        // Options object reused above so soapAction needs to be removed so
        // that soapAction+wsa:Action on response don't conflict
        responseMessageContext.setSoapAction(null);

        if (responseMessageContext.getEnvelope() == null) {
            // If request is REST we assume the responseMessageContext is REST, so
            // set the variable
            /*
             * old code here was using the outbound message context to set the inbound SOAP namespace,
             * as such and passing it to TransportUtils.createSOAPMessage
             *
             * msgctx.getEnvelope().getNamespace().getNamespaceURI()
             *
             * However, the SOAP1.2 spec, appendix A indicates that if a SOAP1.2 message is sent to a SOAP1.1
             * endpoint, we will get a SOAP1.1 (fault) message response.  We need another way to set
             * the inbound SOAP version.  Best way to do this is to trust the content type and let
             * createSOAPMessage take care of figuring out what the SOAP namespace is.
             */
            SOAPEnvelope resenvelope = TransportUtils.createSOAPMessage(responseMessageContext);
            if (resenvelope != null) {
                responseMessageContext.setEnvelope(resenvelope);
            } else {
                throw new AxisFault(Messages
                        .getMessage("blockingInvocationExpectsResponse"));
            }
        }
        SOAPEnvelope resenvelope = responseMessageContext.getEnvelope();
        if (resenvelope != null) {
            AxisEngine.receive(responseMessageContext);
            if (responseMessageContext.getReplyTo() != null) {
                sc.setTargetEPR(responseMessageContext.getReplyTo());
            }

            // rampart handlers change the envelope and set the decrypted envelope
            // so need to check the new one else resenvelope.hasFault() become false.
            resenvelope = responseMessageContext.getEnvelope();
            if (resenvelope.hasFault()||responseMessageContext.isProcessingFault()) {
                if (options.isExceptionToBeThrownOnSOAPFault()) {
                    // does the SOAPFault has a detail element for Excpetion
                    throw Utils.getInboundFaultFromMessageContext(responseMessageContext);
                }
            }
        }
    }

    /**
     * Synchronously send the request and receive a response.  This relies on the transport
     * correctly connecting the response InputStream!
     *
     * @param msgContext the request MessageContext to send.
     * @return Returns MessageContext.
     * @throws AxisFault Sends the message using a two way transport and waits for a response
     */
    protected MessageContext send(MessageContext msgContext) throws AxisFault {

        // create the responseMessageContext

        MessageContext responseMessageContext =
                msgContext.getConfigurationContext().createMessageContext();

        responseMessageContext.setServerSide(false);
        responseMessageContext.setOperationContext(msgContext.getOperationContext());
        responseMessageContext.setOptions(new Options(options));
        responseMessageContext.setMessageID(msgContext.getMessageID());
        addMessageContext(responseMessageContext);
        responseMessageContext.setServiceContext(msgContext.getServiceContext());
        responseMessageContext.setAxisMessage(
                axisOp.getMessage(WSDLConstants.MESSAGE_LABEL_IN_VALUE));

        //sending the message
        AxisEngine.send(msgContext);

        responseMessageContext.setDoingREST(msgContext.isDoingREST());

        // Copy RESPONSE properties which the transport set onto the request message context when it processed
        // the incoming response recieved in reply to an outgoing request.
        responseMessageContext.setProperty(MessageContext.TRANSPORT_HEADERS, 
                                           msgContext.getProperty(MessageContext.TRANSPORT_HEADERS));
        responseMessageContext.setProperty(HTTPConstants.MC_HTTP_STATUS_CODE,
                                           msgContext.getProperty(HTTPConstants.MC_HTTP_STATUS_CODE));

        responseMessageContext.setProperty(MessageContext.TRANSPORT_IN, msgContext
                .getProperty(MessageContext.TRANSPORT_IN));
        responseMessageContext.setTransportIn(msgContext.getTransportIn());
        responseMessageContext.setTransportOut(msgContext.getTransportOut());
        handleResponse(responseMessageContext);
        return responseMessageContext;
    }

    /**
     * This class is the workhorse for a non-blocking invocation that uses a two
     * way transport.
     */
    private class NonBlockingInvocationWorker implements Runnable {
        private Callback callback;

        private MessageContext msgctx;
        private AxisCallback axisCallback;

        public NonBlockingInvocationWorker(Callback callback,
                                           MessageContext msgctx ,
                                           AxisCallback axisCallback) {
            this.callback = callback;
            this.msgctx = msgctx;
            this.axisCallback =axisCallback;
        }

        public void run() {
            try {
                // send the request and wait for response
                MessageContext response = send(msgctx);
                // call the callback
                if (response != null) {
                    SOAPEnvelope resenvelope = response.getEnvelope();
                    
                    if (resenvelope.hasFault()) {
                        SOAPBody body = resenvelope.getBody();
                        // If a fault was found, create an AxisFault with a MessageContext so that
                        // other programming models can deserialize the fault to an alternative form.
                        AxisFault fault = new AxisFault(body.getFault(), response);
                        if (callback != null) {
                            callback.onError(fault);
                        } else if (axisCallback != null) {
                            if (options.isExceptionToBeThrownOnSOAPFault()) {
                                axisCallback.onError(fault);
                            } else {
                                axisCallback.onFault(response);
                            }
                        }

                    } else {
                        if (callback != null) {
                            AsyncResult asyncResult = new AsyncResult(response);
                            callback.onComplete(asyncResult);
                        } else if (axisCallback != null) {
                            axisCallback.onMessage(response);
                        }

                    }
                }

            } catch (Exception e) {
                if (callback != null) {
                    callback.onError(e);
                } else if (axisCallback != null) {
                    axisCallback.onError(e);
                }

            } finally {
                if (callback != null) {
                    callback.setComplete(true);
                }else if (axisCallback != null) {
                    axisCallback.onComplete();
                }
            }
        }
    }

    /**
     * This class acts as a callback that allows users to wait on the result.
     */
    private class SyncCallBack implements AxisCallback {
        boolean complete;
        boolean receivedFault;

        public boolean waitForCompletion(long timeout) throws AxisFault {
            synchronized (this) {
                try {
                    if (complete) return !receivedFault;
                    wait(timeout);
                    if (!complete) {
                        // We timed out!
                        throw new AxisFault( Messages.getMessage("responseTimeOut"));
                    }
                } catch (InterruptedException e) {
                    // Something interrupted our wait!
                    error = e;
                }
            }

            if (error != null) throw AxisFault.makeFault(error);

            return !receivedFault;
        }

        /**
         * This is called when we receive a message.
         *
         * @param msgContext the (response) MessageContext
         */
        public void onMessage(MessageContext msgContext) {
            // Transport input stream gets closed after calling setComplete
            // method. Have to build the whole envelope including the
            // attachments at this stage. Data might get lost if the input
            // stream gets closed before building the whole envelope.

            // TODO: Shouldn't need to do this - need to hook up stream closure to Axiom completion
            this.envelope = msgContext.getEnvelope();
            this.envelope.buildWithAttachments();
        }

        /**
         * This gets called when a fault message is received.
         *
         * @param msgContext the MessageContext containing the fault.
         */
        public void onFault(MessageContext msgContext) {
           error = Utils.getInboundFaultFromMessageContext(msgContext);
        }

        /**
         * This is called at the end of the MEP no matter what happens, quite like a
         * finally block.
         */
        public synchronized void onComplete() {
			complete = true;
            notify();
        }

        private SOAPEnvelope envelope;

        private Exception error;

        public void onError(Exception e) {
            if (log.isDebugEnabled()) {
                log.debug("Entry: OutInAxisOperationClient$SyncCallBack::onError, " + e);
            }
            error = e;
            if (log.isDebugEnabled()) {
                log.debug("Exit: OutInAxisOperationClient$SyncCallBack::onError");
            }
        }
    }

}
