| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.axis2.description; |
| |
| import java.util.HashMap; |
| |
| import javax.xml.namespace.QName; |
| |
| import org.apache.axiom.om.util.UUIDGenerator; |
| import org.apache.axiom.soap.SOAPBody; |
| import org.apache.axiom.soap.SOAPEnvelope; |
| import org.apache.axis2.AxisFault; |
| import org.apache.axis2.Constants; |
| import org.apache.axis2.addressing.AddressingConstants; |
| import org.apache.axis2.addressing.EndpointReference; |
| import org.apache.axis2.client.OperationClient; |
| import org.apache.axis2.client.Options; |
| import org.apache.axis2.client.async.AsyncResult; |
| import org.apache.axis2.client.async.AxisCallback; |
| import org.apache.axis2.client.async.Callback; |
| import org.apache.axis2.context.ConfigurationContext; |
| import org.apache.axis2.context.MessageContext; |
| import org.apache.axis2.context.OperationContext; |
| import org.apache.axis2.context.ServiceContext; |
| import org.apache.axis2.engine.AxisEngine; |
| import org.apache.axis2.i18n.Messages; |
| import org.apache.axis2.transport.TransportUtils; |
| import org.apache.axis2.transport.http.HTTPConstants; |
| import org.apache.axis2.util.CallbackReceiver; |
| import org.apache.axis2.util.Utils; |
| import org.apache.axis2.wsdl.WSDLConstants; |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| |
| public class OutInAxisOperation extends TwoChannelAxisOperation { |
| |
| private static final Log log = LogFactory.getLog(OutInAxisOperation.class); |
| |
| public OutInAxisOperation() { |
| super(); |
| //setup a temporary name |
| QName tmpName = new QName(this.getClass().getName() + "_" + UUIDGenerator.getUUID()); |
| this.setName(tmpName); |
| setMessageExchangePattern(WSDL2Constants.MEP_URI_OUT_IN); |
| } |
| |
| public OutInAxisOperation(QName name) { |
| super(name); |
| setMessageExchangePattern(WSDL2Constants.MEP_URI_OUT_IN); |
| } |
| |
| public void addMessageContext(MessageContext msgContext, |
| OperationContext opContext) throws AxisFault { |
| HashMap<String, MessageContext> mep = opContext.getMessageContexts(); |
| MessageContext immsgContext = (MessageContext) mep |
| .get(MESSAGE_LABEL_IN_VALUE); |
| MessageContext outmsgContext = (MessageContext) mep |
| .get(MESSAGE_LABEL_OUT_VALUE); |
| |
| if ((immsgContext != null) && (outmsgContext != null)) { |
| throw new AxisFault(Messages.getMessage("mepcompleted")); |
| } |
| |
| if (outmsgContext == null) { |
| mep.put(MESSAGE_LABEL_OUT_VALUE, msgContext); |
| } else { |
| mep.put(MESSAGE_LABEL_IN_VALUE, msgContext); |
| opContext.setComplete(true); |
| opContext.cleanup(); |
| } |
| } |
| |
| /** |
| * Returns a MEP client for an Out-IN operation. This client can be used to |
| * interact with a server which is offering an In-Out operation. To use the |
| * client, you must call addMessageContext() with a message context and then |
| * call execute() to execute the client. |
| * |
| * @param sc The service context for this client to live within. Cannot be |
| * null. |
| * @param options Options to use as defaults for this client. If any options are |
| * set specifically on the client then those override options |
| * here. |
| */ |
| public OperationClient createClient(ServiceContext sc, Options options) { |
| return new OutInAxisOperationClient(this, sc, options); |
| } |
| } |
| |
| /** |
| * MEP client for moi. |
| */ |
| class OutInAxisOperationClient extends OperationClient { |
| |
| private static Log log = LogFactory.getLog(OutInAxisOperationClient.class); |
| |
| OutInAxisOperationClient(OutInAxisOperation axisOp, ServiceContext sc, |
| Options options) { |
| super(axisOp, sc, options); |
| } |
| |
| /** |
| * Adds message context to operation context, so that it will handle the |
| * logic correctly if the OperationContext is null then new one will be |
| * created, and Operation Context will become null when some one calls reset(). |
| * |
| * @param msgContext the MessageContext to add |
| * @throws AxisFault |
| */ |
| public void addMessageContext(MessageContext msgContext) throws AxisFault { |
| msgContext.setServiceContext(sc); |
| if (msgContext.getMessageID() == null) { |
| setMessageID(msgContext); |
| } |
| axisOp.registerOperationContext(msgContext, oc); |
| } |
| |
| /** |
| * Returns the message context for a given message label. |
| * |
| * @param messageLabel : |
| * label of the message and that can be either "Out" or "In" and |
| * nothing else |
| * @return Returns MessageContext. |
| * @throws AxisFault |
| */ |
| public MessageContext getMessageContext(String messageLabel) |
| throws AxisFault { |
| return oc.getMessageContext(messageLabel); |
| } |
| |
| public void setCallback(Callback callback) { |
| this.callback = callback; |
| } |
| |
| /** |
| * Executes the MEP. What this does depends on the specific MEP client. The |
| * basic idea is to have the MEP client execute and do something with the |
| * messages that have been added to it so far. For example, if its an Out-In |
| * MEP, then if the Out message has been set, then executing the client asks |
| * it to send the message and get the In message, possibly using a different |
| * thread. |
| * |
| * @param block Indicates whether execution should block or return ASAP. What |
| * block means is of course a function of the specific MEP |
| * client. IGNORED BY THIS MEP CLIENT. |
| * @throws AxisFault if something goes wrong during the execution of the MEP. |
| */ |
| public void executeImpl(boolean block) throws AxisFault { |
| if (log.isDebugEnabled()) { |
| log.debug("Entry: OutInAxisOperationClient::execute, " + block); |
| } |
| if (completed) { |
| throw new AxisFault(Messages.getMessage("mepiscomplted")); |
| } |
| ConfigurationContext cc = sc.getConfigurationContext(); |
| |
| // copy interesting info from options to message context. |
| MessageContext mc = oc.getMessageContext(WSDLConstants.MESSAGE_LABEL_OUT_VALUE); |
| if (mc == null) { |
| throw new AxisFault(Messages.getMessage("outmsgctxnull")); |
| } |
| prepareMessageContext(cc, mc); |
| |
| if (options.getTransportIn() == null && mc.getTransportIn() == null) { |
| mc.setTransportIn(ClientUtils.inferInTransport(cc |
| .getAxisConfiguration(), options, mc)); |
| } else if (mc.getTransportIn() == null) { |
| mc.setTransportIn(options.getTransportIn()); |
| } |
| |
| /** |
| * If a module has set the USE_ASYNC_OPERATIONS option then we override the behaviour |
| * for sync calls, and effectively USE_CUSTOM_LISTENER too. However we leave real |
| * async calls alone. |
| */ |
| boolean useAsync = false; |
| if (!mc.getOptions().isUseSeparateListener()) { |
| Boolean useAsyncOption = |
| (Boolean) mc.getProperty(Constants.Configuration.USE_ASYNC_OPERATIONS); |
| if (log.isDebugEnabled()) log.debug("OutInAxisOperationClient: useAsyncOption " + useAsyncOption); |
| if (useAsyncOption != null) { |
| useAsync = useAsyncOption.booleanValue(); |
| } |
| } |
| |
| EndpointReference replyTo = mc.getReplyTo(); |
| if (replyTo != null) { |
| if (replyTo.hasNoneAddress()) { |
| throw new AxisFault( replyTo.getAddress() + "" + |
| " can not be used with OutInAxisOperationClient , user either " |
| + "fireAndForget or sendRobust)"); |
| } |
| else if (replyTo.isWSAddressingAnonymous() && |
| replyTo.getAllReferenceParameters() != null) { |
| mc.setProperty(AddressingConstants.INCLUDE_OPTIONAL_HEADERS, Boolean.TRUE); |
| } |
| |
| String customReplyTo = (String)options.getProperty(Options.CUSTOM_REPLYTO_ADDRESS); |
| if ( ! (Options.CUSTOM_REPLYTO_ADDRESS_TRUE.equals(customReplyTo))) { |
| if (!replyTo.hasAnonymousAddress()){ |
| useAsync = true; |
| } |
| } |
| } |
| |
| if (useAsync || mc.getOptions().isUseSeparateListener()) { |
| sendAsync(useAsync, mc); |
| } else { |
| if (block) { |
| // Send the SOAP Message and receive a response |
| send(mc); |
| completed = true; |
| } else { |
| sc.getConfigurationContext().getThreadPool().execute( |
| new NonBlockingInvocationWorker(callback, mc, axisCallback)); |
| } |
| } |
| } |
| |
| private void sendAsync(boolean useAsync, MessageContext mc) |
| throws AxisFault { |
| if (log.isDebugEnabled()) { |
| log.debug("useAsync=" + useAsync + ", seperateListener=" + |
| mc.getOptions().isUseSeparateListener()); |
| } |
| /** |
| * We are following the async path. If the user hasn't set a callback object then we must |
| * block until the whole MEP is complete, as they have no other way to get their reply message. |
| */ |
| // THREADSAFE issue: Multiple threads could be trying to initialize the callback receiver |
| // so it is synchronized. It is not done within the else clause to avoid the |
| // double-checked lock antipattern. |
| CallbackReceiver callbackReceiver; |
| synchronized (axisOp) { |
| if (axisOp.getMessageReceiver() != null && |
| axisOp.getMessageReceiver() instanceof CallbackReceiver) { |
| callbackReceiver = (CallbackReceiver) axisOp.getMessageReceiver(); |
| } else { |
| if (log.isDebugEnabled()) { |
| log.debug("Creating new callback receiver"); |
| } |
| callbackReceiver = new CallbackReceiver(); |
| axisOp.setMessageReceiver(callbackReceiver); |
| if (log.isDebugEnabled()) log.debug("OutInAxisOperation: callbackReceiver " + callbackReceiver + " : " + axisOp); |
| } |
| } |
| |
| SyncCallBack internalCallback = null; |
| if (callback != null) { |
| callbackReceiver.addCallback(mc.getMessageID(), callback); |
| if (log.isDebugEnabled()) log.debug("OutInAxisOperationClient: Creating callback"); |
| } else if (axisCallback != null) { |
| callbackReceiver.addCallback(mc.getMessageID(), axisCallback); |
| if (log.isDebugEnabled()) log.debug("OutInAxisOperationClient: Creating axis callback"); |
| } else { |
| if (log.isDebugEnabled()) { |
| log.debug("Creating internal callback"); |
| } |
| internalCallback = new SyncCallBack(); |
| callbackReceiver.addCallback(mc.getMessageID(), internalCallback); |
| if (log.isDebugEnabled()) log.debug("OutInAxisOperationClient: Creating internal callback"); |
| } |
| |
| /** |
| * If USE_CUSTOM_LISTENER is set to 'true' the replyTo value will not be replaced and Axis2 will not |
| * start its internal listner. Some other enntity (e.g. a module) should take care of obtaining the |
| * response message. |
| */ |
| Boolean useCustomListener = |
| (Boolean) options.getProperty(Constants.Configuration.USE_CUSTOM_LISTENER); |
| if (useAsync) { |
| useCustomListener = Boolean.TRUE; |
| } |
| if (useCustomListener == null || !useCustomListener.booleanValue()) { |
| EndpointReference replyTo = mc.getReplyTo(); |
| if (replyTo == null || replyTo.hasAnonymousAddress()){ |
| EndpointReference replyToFromTransport = |
| mc.getConfigurationContext().getListenerManager(). |
| getEPRforService(sc.getAxisService().getName(), |
| axisOp.getName().getLocalPart(), mc |
| .getTransportIn().getName()); |
| |
| if (replyTo == null) { |
| mc.setReplyTo(replyToFromTransport); |
| } else { |
| replyTo.setAddress(replyToFromTransport.getAddress()); |
| } |
| } |
| } |
| |
| //if we don't do this , this guy will wait till it gets HTTP 202 in the HTTP case |
| mc.setProperty(MessageContext.TRANSPORT_NON_BLOCKING, Boolean.TRUE); |
| mc.getConfigurationContext().registerOperationContext(mc.getMessageID(), oc); |
| AxisEngine.send(mc); |
| if (internalCallback != null) { |
| internalCallback.waitForCompletion(options.getTimeOutInMilliSeconds()); |
| |
| // process the result of the invocation |
| if (internalCallback.envelope == null) { |
| if (internalCallback.error == null) { |
| log.error("Callback had neither error nor response"); |
| } |
| if (options.isExceptionToBeThrownOnSOAPFault()) { |
| throw AxisFault.makeFault(internalCallback.error); |
| } |
| } |
| } |
| } |
| |
| /** |
| * When synchronous send() gets back a response MessageContext, this is the workhorse |
| * method which processes it. |
| * |
| * @param responseMessageContext the active response MessageContext |
| * @throws AxisFault if something went wrong |
| */ |
| protected void handleResponse(MessageContext responseMessageContext) throws AxisFault{ |
| // Options object reused above so soapAction needs to be removed so |
| // that soapAction+wsa:Action on response don't conflict |
| responseMessageContext.setSoapAction(null); |
| |
| if (responseMessageContext.getEnvelope() == null) { |
| // If request is REST we assume the responseMessageContext is REST, so |
| // set the variable |
| /* |
| * old code here was using the outbound message context to set the inbound SOAP namespace, |
| * as such and passing it to TransportUtils.createSOAPMessage |
| * |
| * msgctx.getEnvelope().getNamespace().getNamespaceURI() |
| * |
| * However, the SOAP1.2 spec, appendix A indicates that if a SOAP1.2 message is sent to a SOAP1.1 |
| * endpoint, we will get a SOAP1.1 (fault) message response. We need another way to set |
| * the inbound SOAP version. Best way to do this is to trust the content type and let |
| * createSOAPMessage take care of figuring out what the SOAP namespace is. |
| */ |
| SOAPEnvelope resenvelope = TransportUtils.createSOAPMessage(responseMessageContext); |
| if (resenvelope != null) { |
| responseMessageContext.setEnvelope(resenvelope); |
| } else { |
| throw new AxisFault(Messages |
| .getMessage("blockingInvocationExpectsResponse")); |
| } |
| } |
| SOAPEnvelope resenvelope = responseMessageContext.getEnvelope(); |
| if (resenvelope != null) { |
| AxisEngine.receive(responseMessageContext); |
| if (responseMessageContext.getReplyTo() != null) { |
| sc.setTargetEPR(responseMessageContext.getReplyTo()); |
| } |
| if (resenvelope.hasFault()||responseMessageContext.isProcessingFault()) { |
| if (options.isExceptionToBeThrownOnSOAPFault()) { |
| // does the SOAPFault has a detail element for Excpetion |
| throw Utils.getInboundFaultFromMessageContext(responseMessageContext); |
| } |
| } |
| } |
| } |
| |
| /** |
| * Synchronously send the request and receive a response. This relies on the transport |
| * correctly connecting the response InputStream! |
| * |
| * @param msgContext the request MessageContext to send. |
| * @return Returns MessageContext. |
| * @throws AxisFault Sends the message using a two way transport and waits for a response |
| */ |
| protected MessageContext send(MessageContext msgContext) throws AxisFault { |
| |
| // create the responseMessageContext |
| |
| MessageContext responseMessageContext = |
| msgContext.getConfigurationContext().createMessageContext(); |
| |
| responseMessageContext.setServerSide(false); |
| responseMessageContext.setOperationContext(msgContext.getOperationContext()); |
| responseMessageContext.setOptions(new Options(options)); |
| responseMessageContext.setMessageID(msgContext.getMessageID()); |
| addMessageContext(responseMessageContext); |
| responseMessageContext.setServiceContext(msgContext.getServiceContext()); |
| responseMessageContext.setAxisMessage( |
| axisOp.getMessage(WSDLConstants.MESSAGE_LABEL_IN_VALUE)); |
| |
| //sending the message |
| AxisEngine.send(msgContext); |
| |
| responseMessageContext.setDoingREST(msgContext.isDoingREST()); |
| |
| // Copy RESPONSE properties which the transport set onto the request message context when it processed |
| // the incoming response recieved in reply to an outgoing request. |
| responseMessageContext.setProperty(MessageContext.TRANSPORT_HEADERS, |
| msgContext.getProperty(MessageContext.TRANSPORT_HEADERS)); |
| responseMessageContext.setProperty(HTTPConstants.MC_HTTP_STATUS_CODE, |
| msgContext.getProperty(HTTPConstants.MC_HTTP_STATUS_CODE)); |
| |
| responseMessageContext.setProperty(MessageContext.TRANSPORT_IN, msgContext |
| .getProperty(MessageContext.TRANSPORT_IN)); |
| responseMessageContext.setTransportIn(msgContext.getTransportIn()); |
| responseMessageContext.setTransportOut(msgContext.getTransportOut()); |
| handleResponse(responseMessageContext); |
| return responseMessageContext; |
| } |
| |
| /** |
| * This class is the workhorse for a non-blocking invocation that uses a two |
| * way transport. |
| */ |
| private class NonBlockingInvocationWorker implements Runnable { |
| private Callback callback; |
| |
| private MessageContext msgctx; |
| private AxisCallback axisCallback; |
| |
| public NonBlockingInvocationWorker(Callback callback, |
| MessageContext msgctx , |
| AxisCallback axisCallback) { |
| this.callback = callback; |
| this.msgctx = msgctx; |
| this.axisCallback =axisCallback; |
| } |
| |
| public void run() { |
| try { |
| // send the request and wait for response |
| MessageContext response = send(msgctx); |
| // call the callback |
| if (response != null) { |
| SOAPEnvelope resenvelope = response.getEnvelope(); |
| |
| if (resenvelope.hasFault()) { |
| SOAPBody body = resenvelope.getBody(); |
| // If a fault was found, create an AxisFault with a MessageContext so that |
| // other programming models can deserialize the fault to an alternative form. |
| AxisFault fault = new AxisFault(body.getFault(), response); |
| if (callback != null) { |
| callback.onError(fault); |
| } else if (axisCallback != null) { |
| axisCallback.onError(fault); |
| } |
| |
| } else { |
| if (callback != null) { |
| AsyncResult asyncResult = new AsyncResult(response); |
| callback.onComplete(asyncResult); |
| } else if (axisCallback != null) { |
| axisCallback.onMessage(response); |
| } |
| |
| } |
| } |
| |
| } catch (Exception e) { |
| if (callback != null) { |
| callback.onError(e); |
| } else if (axisCallback != null) { |
| axisCallback.onError(e); |
| } |
| |
| } finally { |
| if (callback != null) { |
| callback.setComplete(true); |
| }else if (axisCallback != null) { |
| axisCallback.onComplete(); |
| } |
| } |
| } |
| } |
| |
| /** |
| * This class acts as a callback that allows users to wait on the result. |
| */ |
| private class SyncCallBack implements AxisCallback { |
| boolean complete; |
| boolean receivedFault; |
| |
| public boolean waitForCompletion(long timeout) throws AxisFault { |
| synchronized (this) { |
| try { |
| if (complete) return !receivedFault; |
| wait(timeout); |
| if (!complete) { |
| // We timed out! |
| throw new AxisFault( Messages.getMessage("responseTimeOut")); |
| } |
| } catch (InterruptedException e) { |
| // Something interrupted our wait! |
| error = e; |
| } |
| } |
| |
| if (error != null) throw AxisFault.makeFault(error); |
| |
| return !receivedFault; |
| } |
| |
| /** |
| * This is called when we receive a message. |
| * |
| * @param msgContext the (response) MessageContext |
| */ |
| public void onMessage(MessageContext msgContext) { |
| // Transport input stream gets closed after calling setComplete |
| // method. Have to build the whole envelope including the |
| // attachments at this stage. Data might get lost if the input |
| // stream gets closed before building the whole envelope. |
| |
| // TODO: Shouldn't need to do this - need to hook up stream closure to Axiom completion |
| this.envelope = msgContext.getEnvelope(); |
| this.envelope.buildWithAttachments(); |
| } |
| |
| /** |
| * This gets called when a fault message is received. |
| * |
| * @param msgContext the MessageContext containing the fault. |
| */ |
| public void onFault(MessageContext msgContext) { |
| error = Utils.getInboundFaultFromMessageContext(msgContext); |
| } |
| |
| /** |
| * This is called at the end of the MEP no matter what happens, quite like a |
| * finally block. |
| */ |
| public synchronized void onComplete() { |
| complete = true; |
| notify(); |
| } |
| |
| private SOAPEnvelope envelope; |
| |
| private Exception error; |
| |
| public void onError(Exception e) { |
| if (log.isDebugEnabled()) { |
| log.debug("Entry: OutInAxisOperationClient$SyncCallBack::onError, " + e); |
| } |
| error = e; |
| if (log.isDebugEnabled()) { |
| log.debug("Exit: OutInAxisOperationClient$SyncCallBack::onError"); |
| } |
| } |
| } |
| |
| } |