| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.synapse.transport.passthru.util; |
| |
| import org.apache.axiom.om.OMElement; |
| import org.apache.axiom.om.OMNode; |
| import org.apache.axiom.om.OMText; |
| import org.apache.axiom.soap.SOAPEnvelope; |
| import org.apache.axis2.AxisFault; |
| import org.apache.axis2.Constants; |
| import org.apache.axis2.addressing.AddressingConstants; |
| import org.apache.axis2.addressing.AddressingHelper; |
| import org.apache.axis2.context.MessageContext; |
| import org.apache.axis2.description.WSDL2Constants; |
| import org.apache.axis2.engine.AxisConfiguration; |
| import org.apache.axis2.engine.Handler; |
| import org.apache.axis2.engine.Phase; |
| import org.apache.axis2.transport.RequestResponseTransport; |
| import org.apache.axis2.transport.TransportUtils; |
| import org.apache.commons.io.IOUtils; |
| import org.apache.commons.io.output.NullOutputStream; |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.synapse.transport.passthru.PassThroughConstants; |
| import org.apache.synapse.transport.passthru.Pipe; |
| import org.apache.synapse.transport.passthru.config.PassThroughConfiguration; |
| |
| import javax.activation.DataHandler; |
| import javax.activation.DataSource; |
| import javax.xml.namespace.QName; |
| import javax.xml.stream.XMLStreamException; |
| |
| import java.io.BufferedInputStream; |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.util.List; |
| |
| public class RelayUtils { |
| |
| private static final Log log = LogFactory.getLog(RelayUtils.class); |
| |
| private static final DeferredMessageBuilder messageBuilder = new DeferredMessageBuilder(); |
| |
| private static volatile Handler addressingInHandler = null; |
| private static boolean noAddressingHandler = false; |
| |
| private static Boolean forcePTBuild = null; |
| |
| static { |
| if (forcePTBuild == null){ |
| forcePTBuild = PassThroughConfiguration.getInstance().getBooleanProperty( |
| PassThroughConstants.FORCE_PASS_THROUGH_BUILDER); |
| if (forcePTBuild == null){ |
| forcePTBuild = true; |
| } |
| //this to keep track ignore the builder operation even though content level is enable. |
| } |
| } |
| |
| public static void buildMessage(org.apache.axis2.context.MessageContext msgCtx) throws IOException, |
| XMLStreamException { |
| |
| buildMessage(msgCtx, false); |
| } |
| |
| public static void buildMessage(MessageContext messageContext, |
| boolean earlyBuild) throws IOException, XMLStreamException { |
| |
| final Pipe pipe = (Pipe) messageContext.getProperty(PassThroughConstants.PASS_THROUGH_PIPE); |
| if (pipe != null && forcePTBuild && !PassThroughTransportUtils.builderInvoked(messageContext)) { |
| InputStream in = pipe.getInputStream(); |
| buildMessage(messageContext, earlyBuild, in); |
| return; |
| } |
| |
| SOAPEnvelope envelope = messageContext.getEnvelope(); |
| |
| QName firstElementQName; |
| if (envelope.getSOAPBodyFirstElementNS() != null) { |
| firstElementQName = new QName( |
| envelope.getSOAPBodyFirstElementNS().getNamespaceURI(), |
| envelope.getSOAPBodyFirstElementLocalName()); |
| } else if (envelope.getSOAPBodyFirstElementLocalName() != null){ |
| firstElementQName = new QName(envelope.getSOAPBodyFirstElementLocalName()); |
| } else { |
| firstElementQName = null; |
| } |
| |
| if (RelayConstants.BINARY_CONTENT_QNAME.equals(firstElementQName)) { |
| OMElement contentEle = envelope.getBody().getFirstElement(); |
| OMNode node = contentEle.getFirstOMChild(); |
| if (node != null && (node instanceof OMText)) { |
| OMText binaryDataNode = (OMText) node; |
| DataHandler dh = (DataHandler) binaryDataNode.getDataHandler(); |
| if (dh == null) { |
| throw new AxisFault("Error while building message"); |
| } |
| |
| DataSource dataSource = dh.getDataSource(); |
| //Ask the data source to stream, if it has not already cached the request |
| if (dataSource instanceof StreamingOnRequestDataSource) { |
| ((StreamingOnRequestDataSource) dataSource).setLastUse(true); |
| } |
| |
| InputStream in = dh.getInputStream(); |
| OMElement element = messageBuilder.getDocument(messageContext, in); |
| if (element != null) { |
| messageContext.setEnvelope(TransportUtils.createSOAPEnvelope(element)); |
| messageContext.setProperty(DeferredMessageBuilder.RELAY_FORMATTERS_MAP, |
| messageBuilder.getFormatters()); |
| |
| if (!earlyBuild) { |
| processAddressing(messageContext); |
| } |
| } |
| } |
| } |
| } |
| |
| private static void buildMessage(MessageContext messageContext, |
| boolean earlyBuild, InputStream in) throws IOException { |
| |
| BufferedInputStream bufferedInputStream = (BufferedInputStream) messageContext.getProperty( |
| PassThroughConstants.BUFFERED_INPUT_STREAM); |
| if (bufferedInputStream != null){ |
| try { |
| bufferedInputStream.reset(); |
| bufferedInputStream.mark(0); |
| } catch (Exception e) { |
| //just ignore the error |
| } |
| } else { |
| bufferedInputStream = new BufferedInputStream(in); |
| //TODO: need to handle properly; for the moment lets use around 100k buffer. |
| bufferedInputStream.mark(128 * 1024); |
| messageContext.setProperty(PassThroughConstants.BUFFERED_INPUT_STREAM, |
| bufferedInputStream); |
| } |
| |
| OMElement element = null; |
| try { |
| element = messageBuilder.getDocument(messageContext, bufferedInputStream); |
| } catch (Exception e) { |
| //Clearing the buffer when there is an exception occurred. |
| consumeAndDiscardMessage(messageContext); |
| messageContext.setProperty(PassThroughConstants.MESSAGE_BUILDER_INVOKED, Boolean.TRUE); |
| handleException("Error while building Passthrough stream", e); |
| } |
| |
| if (element != null) { |
| messageContext.setEnvelope(TransportUtils.createSOAPEnvelope(element)); |
| messageContext.setProperty(DeferredMessageBuilder.RELAY_FORMATTERS_MAP, |
| messageBuilder.getFormatters()); |
| messageContext.setProperty(PassThroughConstants.MESSAGE_BUILDER_INVOKED, Boolean.TRUE); |
| if (!earlyBuild) { |
| processAddressing(messageContext); |
| } |
| } |
| } |
| |
| private static void processAddressing(MessageContext messageContext) throws AxisFault { |
| if (noAddressingHandler) { |
| return; |
| } else if (addressingInHandler == null) { |
| synchronized (messageBuilder) { |
| if (addressingInHandler == null) { |
| AxisConfiguration axisConfig = messageContext.getConfigurationContext(). |
| getAxisConfiguration(); |
| List<Phase> phases = axisConfig.getInFlowPhases(); |
| boolean handlerFound = false; |
| for (Phase phase : phases) { |
| if ("Addressing".equals(phase.getName())) { |
| List<Handler> handlers = phase.getHandlers(); |
| for (Handler handler : handlers) { |
| if ("AddressingInHandler".equals(handler.getName())) { |
| addressingInHandler = handler; |
| handlerFound = true; |
| break; |
| } |
| } |
| break; |
| } |
| } |
| |
| if (!handlerFound) { |
| noAddressingHandler = true; |
| return; |
| } |
| } |
| } |
| } |
| |
| messageContext.setProperty(AddressingConstants.DISABLE_ADDRESSING_FOR_IN_MESSAGES, "false"); |
| |
| Object disableAddressingForOutGoing = null; |
| if (messageContext.getProperty(AddressingConstants.DISABLE_ADDRESSING_FOR_OUT_MESSAGES) != null){ |
| disableAddressingForOutGoing = messageContext.getProperty( |
| AddressingConstants.DISABLE_ADDRESSING_FOR_OUT_MESSAGES); |
| } |
| addressingInHandler.invoke(messageContext); |
| |
| if (disableAddressingForOutGoing !=null){ |
| messageContext.setProperty(AddressingConstants.DISABLE_ADDRESSING_FOR_OUT_MESSAGES, |
| disableAddressingForOutGoing); |
| } |
| |
| if (messageContext.getAxisOperation() == null) { |
| return; |
| } |
| |
| String mepString = messageContext.getAxisOperation().getMessageExchangePattern(); |
| |
| if (isOneWay(mepString)) { |
| Object requestResponseTransport = messageContext.getProperty( |
| RequestResponseTransport.TRANSPORT_CONTROL); |
| if (requestResponseTransport != null) { |
| Boolean disableAck = getDisableAck(messageContext); |
| if (disableAck == null || !disableAck) { |
| ((RequestResponseTransport) requestResponseTransport).acknowledgeMessage( |
| messageContext); |
| } |
| } |
| } else if (AddressingHelper.isReplyRedirected(messageContext) && |
| AddressingHelper.isFaultRedirected(messageContext)) { |
| if (WSDL2Constants.MEP_URI_IN_OUT.equals(mepString)) { |
| // OR, if 2 way operation but the response is intended to not use the |
| // response channel of a 2-way transport then we don't need to keep the |
| // transport waiting. |
| Object requestResponseTransport = messageContext.getProperty( |
| RequestResponseTransport.TRANSPORT_CONTROL); |
| if (requestResponseTransport != null) { |
| |
| // We should send an early ack to the transport whenever possible, but |
| // some modules need to use the back channel, so we need to check if they |
| // have disabled this code. |
| Boolean disableAck = getDisableAck(messageContext); |
| if (disableAck == null || !disableAck) { |
| ((RequestResponseTransport) requestResponseTransport).acknowledgeMessage( |
| messageContext); |
| } |
| |
| } |
| } |
| } |
| } |
| |
| private static Boolean getDisableAck(MessageContext msgContext) throws AxisFault { |
| // We should send an early ack to the transport whenever possible, but some modules need |
| // to use the back channel, so we need to check if they have disabled this code. |
| Boolean disableAck = (Boolean) msgContext.getProperty( |
| Constants.Configuration.DISABLE_RESPONSE_ACK); |
| if (disableAck == null) { |
| disableAck = (Boolean) (msgContext.getAxisService() != null ? |
| msgContext.getAxisService().getParameterValue( |
| Constants.Configuration.DISABLE_RESPONSE_ACK) : null); |
| } |
| |
| return disableAck; |
| } |
| |
| private static boolean isOneWay(String mepString) { |
| return WSDL2Constants.MEP_URI_IN_ONLY.equals(mepString); |
| } |
| |
| /** |
| * Consumes the data in pipe completely in the given message context and discard it |
| * |
| * @param msgContext Axis2 Message context which contains the data |
| * @throws AxisFault |
| */ |
| private static void consumeAndDiscardMessage(MessageContext msgContext) throws AxisFault { |
| final Pipe pipe = (Pipe) msgContext.getProperty(PassThroughConstants.PASS_THROUGH_PIPE); |
| if (pipe != null) { |
| InputStream in = pipe.getInputStream(); |
| if (in != null) { |
| try { |
| IOUtils.copy(in, new NullOutputStream()); |
| } catch (IOException exception) { |
| handleException("Error when consuming the input stream to discard ", exception); |
| } |
| } |
| } |
| } |
| |
| /** |
| * Perform an error log message to all logs @ ERROR and throws a AxisFault |
| * |
| * @param msg the log message |
| * @param e an Exception encountered |
| * @throws AxisFault |
| */ |
| private static void handleException(String msg, Exception e) throws AxisFault { |
| log.error(msg, e); |
| throw new AxisFault(msg, e); |
| } |
| } |