| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.sandesha2.util; |
| |
| import java.util.Iterator; |
| import java.util.List; |
| |
| import javax.xml.namespace.QName; |
| |
| import org.apache.axiom.om.OMElement; |
| import org.apache.axiom.soap.SOAP11Constants; |
| import org.apache.axiom.soap.SOAP12Constants; |
| import org.apache.axiom.soap.SOAP12Version; |
| import org.apache.axiom.soap.SOAPEnvelope; |
| import org.apache.axiom.soap.SOAPFactory; |
| import org.apache.axiom.soap.SOAPFault; |
| import org.apache.axiom.soap.SOAPFaultCode; |
| import org.apache.axiom.soap.SOAPFaultDetail; |
| import org.apache.axiom.soap.SOAPFaultReason; |
| import org.apache.axiom.soap.SOAPFaultSubCode; |
| import org.apache.axiom.soap.SOAPFaultText; |
| import org.apache.axiom.soap.SOAPVersion; |
| import org.apache.axis2.AxisFault; |
| import org.apache.axis2.addressing.AddressingConstants; |
| import org.apache.axis2.addressing.EndpointReference; |
| import org.apache.axis2.addressing.RelatesTo; |
| import org.apache.axis2.client.async.AxisCallback; |
| import org.apache.axis2.context.ConfigurationContext; |
| import org.apache.axis2.context.MessageContext; |
| import org.apache.axis2.context.OperationContext; |
| import org.apache.axis2.description.AxisOperation; |
| import org.apache.axis2.description.AxisService; |
| import org.apache.axis2.engine.AxisEngine; |
| import org.apache.axis2.engine.MessageReceiver; |
| import org.apache.axis2.engine.Handler.InvocationResponse; |
| import org.apache.axis2.transport.TransportUtils; |
| import org.apache.axis2.util.CallbackReceiver; |
| import org.apache.axis2.util.MessageContextBuilder; |
| import org.apache.axis2.wsdl.WSDLConstants; |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.sandesha2.FaultData; |
| import org.apache.sandesha2.RMMsgContext; |
| import org.apache.sandesha2.Sandesha2Constants; |
| import org.apache.sandesha2.SandeshaException; |
| import org.apache.sandesha2.client.SandeshaClientConstants; |
| import org.apache.sandesha2.client.SandeshaListener; |
| import org.apache.sandesha2.i18n.SandeshaMessageHelper; |
| import org.apache.sandesha2.i18n.SandeshaMessageKeys; |
| import org.apache.sandesha2.storage.StorageManager; |
| import org.apache.sandesha2.storage.Transaction; |
| import org.apache.sandesha2.storage.beanmanagers.RMSBeanMgr; |
| import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr; |
| import org.apache.sandesha2.storage.beans.RMDBean; |
| import org.apache.sandesha2.storage.beans.RMSBean; |
| import org.apache.sandesha2.storage.beans.RMSequenceBean; |
| import org.apache.sandesha2.storage.beans.SenderBean; |
| import org.apache.sandesha2.wsrm.SequenceAcknowledgement; |
| import org.apache.sandesha2.wsrm.SequenceFault; |
| |
| /** |
| * Has logic to check for possible RM related faults and create it. |
| */ |
| |
| public class FaultManager { |
| |
| private static final Log log = LogFactory.getLog(FaultManager.class); |
| |
| /** |
| * Check weather the LastMessage number has been exceeded and generate the |
| * fault if it is. |
| * |
| * @param msgCtx |
| * @return |
| */ |
| public static void checkForLastMsgNumberExceeded(RMMsgContext applicationRMMessage, StorageManager storageManager) |
| throws AxisFault { |
| if (log.isDebugEnabled()) |
| log.debug("Enter: FaultManager::checkForLastMsgNumberExceeded"); |
| /* |
| * TODO - This code currently doesn't actually work |
| Sequence sequence = (Sequence) applicationRMMessage.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE); |
| long messageNumber = sequence.getMessageNumber().getMessageNumber(); |
| String sequenceID = sequence.getIdentifier().getIdentifier(); |
| |
| boolean lastMessageNumberExceeded = false; |
| String reason = null; |
| |
| RMSBean rmsBean = SandeshaUtil.getRMSBeanFromSequenceId(storageManager, sequenceID); |
| if (rmsBean != null) { |
| long lastMessageNo = rmsBean.getLastOutMessage(); |
| if (messageNumber > lastMessageNo) { |
| lastMessageNumberExceeded = true; |
| reason = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.msgNumberLargerThanLastMsg, Long |
| .toString(messageNumber), Long.toString(lastMessageNo)); |
| } |
| } |
| |
| if (lastMessageNumberExceeded) { |
| FaultData faultData = new FaultData(); |
| faultData.setType(Sandesha2Constants.SOAPFaults.FaultType.LAST_MESSAGE_NO_EXCEEDED); |
| int SOAPVersion = SandeshaUtil.getSOAPVersion(applicationRMMessage.getSOAPEnvelope()); |
| if (SOAPVersion == Sandesha2Constants.SOAPVersion.v1_1) |
| faultData.setCode(SOAP11Constants.FAULT_CODE_SENDER); |
| else |
| faultData.setCode(SOAP12Constants.FAULT_CODE_SENDER); |
| |
| faultData.setSubcode(Sandesha2Constants.SOAPFaults.Subcodes.LAST_MESSAGE_NO_EXCEEDED); |
| faultData.setReason(reason); |
| |
| SOAPFactory factory = SOAPAbstractFactory.getSOAPFactory(SOAPVersion); |
| String rmNamespace = applicationRMMessage.getRMNamespaceValue(); |
| OMElement identifierElement = factory.createOMElement(Sandesha2Constants.WSRM_COMMON.IDENTIFIER, |
| rmNamespace, Sandesha2Constants.WSRM_COMMON.NS_PREFIX_RM); |
| identifierElement.setText(sequenceID); |
| |
| faultData.setDetail(identifierElement); |
| |
| if (log.isDebugEnabled()) |
| log.debug("Exit: FaultManager::checkForLastMsgNumberExceeded, lastMessageNumberExceeded"); |
| getFault(applicationRMMessage, faultData, storageManager); |
| } |
| */ |
| if (log.isDebugEnabled()) |
| log.debug("Exit: FaultManager::checkForLastMsgNumberExceeded"); |
| } |
| |
| public static RMMsgContext checkForMessageNumberRoleover(MessageContext messageContext) { |
| return null; |
| } |
| |
| /** |
| * Check whether a Sequence message (a) belongs to a unknown sequence |
| * (generates an UnknownSequence fault) (b) message number exceeds a |
| * predifined limit ( genenrates a Message Number Rollover fault) |
| * |
| * @param msgCtx |
| * @return true if no exception has been thrown and the sequence doesn't exist |
| * @throws SandeshaException |
| */ |
| public static boolean checkForUnknownSequence(RMMsgContext rmMessageContext, String sequenceID, |
| StorageManager storageManager, boolean piggybackedMessage) throws AxisFault { |
| if (log.isDebugEnabled()) |
| log.debug("Enter: FaultManager::checkForUnknownSequence, " + sequenceID); |
| |
| boolean validSequence = false; |
| |
| // Look for an outbound sequence |
| if (SandeshaUtil.getRMSBeanFromSequenceId(storageManager, sequenceID) != null) { |
| validSequence = true; |
| // Look for an inbound sequence |
| } else if(SandeshaUtil.getRMDBeanFromSequenceId(storageManager, sequenceID) != null) { |
| validSequence = true; |
| } |
| |
| if (!validSequence) { |
| |
| if (log.isDebugEnabled()) |
| log.debug("Sequence not valid " + sequenceID); |
| |
| // Return an UnknownSequence error |
| MessageContext messageContext = rmMessageContext.getMessageContext(); |
| |
| FaultData data = new FaultData(); |
| data.setCode(messageContext.getEnvelope().getVersion().getSenderFaultCode()); |
| |
| data.setSubcode(SpecSpecificConstants.getFaultSubcode(rmMessageContext.getRMNamespaceValue(), |
| Sandesha2Constants.SOAPFaults.FaultType.UNKNOWN_SEQUENCE )); |
| |
| SOAPFactory factory = (SOAPFactory)messageContext.getEnvelope().getOMFactory(); |
| |
| OMElement identifierElement = factory.createOMElement(Sandesha2Constants.WSRM_COMMON.IDENTIFIER, |
| rmMessageContext.getRMNamespaceValue(), Sandesha2Constants.WSRM_COMMON.NS_PREFIX_RM); |
| identifierElement.setText(sequenceID); |
| |
| data.setDetail(identifierElement); |
| |
| data.setReason(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.unknownSequenceFault, sequenceID)); |
| |
| data.setType(Sandesha2Constants.SOAPFaults.FaultType.UNKNOWN_SEQUENCE); |
| |
| if (log.isDebugEnabled()) |
| log.debug("Exit: FaultManager::checkForUnknownSequence, Sequence unknown"); |
| |
| boolean faultThrowable = !piggybackedMessage; |
| getOrSendFault(rmMessageContext, data, faultThrowable, null); //unknown sequence so cannot send to acksTo |
| return true; |
| } |
| |
| if (log.isDebugEnabled()) |
| log.debug("Exit: FaultManager::checkForUnknownSequence"); |
| return false; |
| } |
| |
| /** |
| * Check weather the Acknowledgement is invalid and generate a fault if it |
| * is. |
| * |
| * @param msgCtx |
| * @return |
| * @throws SandeshaException |
| */ |
| public static boolean checkForInvalidAcknowledgement(RMMsgContext ackRMMessageContext, SequenceAcknowledgement sequenceAcknowledgement, |
| StorageManager storageManager, RMSBean rmsBean, boolean piggybackedMessage) |
| throws AxisFault { |
| if (log.isDebugEnabled()) |
| log.debug("Enter: FaultManager::checkForInvalidAcknowledgement"); |
| |
| boolean invalidAck = false; |
| List<Range> sequenceAckList = sequenceAcknowledgement.getAcknowledgementRanges(); |
| Iterator<Range> it = sequenceAckList.iterator(); |
| |
| while (it.hasNext()) { |
| Range acknowledgementRange = it.next(); |
| if (acknowledgementRange.lowerValue > acknowledgementRange.upperValue) { |
| invalidAck = true; |
| // check upper isn't bigger than the highest out msg number |
| } else if ( acknowledgementRange.upperValue > rmsBean.getHighestOutMessageNumber() ) { |
| invalidAck = true; |
| } |
| |
| if (invalidAck) { |
| makeInvalidAcknowledgementFault(ackRMMessageContext, sequenceAcknowledgement, |
| acknowledgementRange, storageManager, piggybackedMessage, rmsBean.getToEndpointReference()); |
| return true; |
| } |
| } |
| |
| if (log.isDebugEnabled()) |
| log.debug("Exit: FaultManager::checkForInvalidAcknowledgement: ack is valid"); |
| return false; |
| } |
| |
| /** |
| * Makes an InvalidAcknowledgement fault. |
| * @param rmMsgCtx |
| * @param storageManager |
| * @param message |
| * @throws AxisFault |
| */ |
| public static void makeInvalidAcknowledgementFault(RMMsgContext rmMsgCtx, |
| SequenceAcknowledgement sequenceAcknowledgement, Range acknowledgementRange, |
| StorageManager storageManager, boolean piggybackedMessage, EndpointReference acksToEPR) throws AxisFault { |
| FaultData data = new FaultData(); |
| SOAPVersion version = rmMsgCtx.getMessageContext().getEnvelope().getVersion(); |
| data.setCode(version.getSenderFaultCode()); |
| |
| if (log.isDebugEnabled()) |
| log.debug("makingInvalidAck piggy=" + piggybackedMessage + ": soap=" + version); |
| |
| data.setType(Sandesha2Constants.SOAPFaults.FaultType.INVALID_ACKNOWLEDGEMENT); |
| data.setSubcode(SpecSpecificConstants.getFaultSubcode(rmMsgCtx.getRMNamespaceValue(), |
| Sandesha2Constants.SOAPFaults.FaultType.INVALID_ACKNOWLEDGEMENT )); |
| data.setReason(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.invalidAckFault)); |
| |
| data.setDetail(sequenceAcknowledgement.getOriginalSequenceAckElement()); |
| |
| boolean throwable = !piggybackedMessage; |
| getOrSendFault(rmMsgCtx, data, throwable, acksToEPR); |
| } |
| |
| /** |
| * Makes a Create sequence refused fault |
| */ |
| public static void makeCreateSequenceRefusedFault(RMMsgContext rmMessageContext, String detail, Exception e, EndpointReference acksToEPR) throws AxisFault { |
| if (log.isDebugEnabled()) |
| log.debug("Enter: FaultManager::makeCreateSequenceRefusedFault, " + detail); |
| |
| // Return a CreateSequenceRefused error |
| MessageContext messageContext = rmMessageContext.getMessageContext(); |
| |
| FaultData data = new FaultData(); |
| data.setCode(messageContext.getEnvelope().getVersion().getSenderFaultCode()); |
| |
| data.setSubcode(SpecSpecificConstants.getFaultSubcode(rmMessageContext.getRMNamespaceValue(), |
| Sandesha2Constants.SOAPFaults.FaultType.CREATE_SEQUENCE_REFUSED )); |
| |
| SOAPFactory factory = (SOAPFactory)messageContext.getEnvelope().getOMFactory(); |
| OMElement identifierElement = factory.createOMElement(Sandesha2Constants.WSRM_COMMON.IDENTIFIER, |
| rmMessageContext.getRMNamespaceValue(), Sandesha2Constants.WSRM_COMMON.NS_PREFIX_RM); |
| identifierElement.setText(detail); |
| data.setDetail(identifierElement); |
| data.setDetailString(detail); |
| |
| data.setReason(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.createSequenceRefused)); |
| |
| data.setType(Sandesha2Constants.SOAPFaults.FaultType.CREATE_SEQUENCE_REFUSED); |
| |
| data.setExceptionString(SandeshaUtil.getStackTraceFromException(e)); |
| |
| if(log.isWarnEnabled()) |
| log.warn(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.createSequenceRefused) + ", " + detail); |
| |
| if (log.isDebugEnabled()) |
| log.debug("Exit: FaultManager::makeCreateSequenceRefusedFault"); |
| getOrSendFault(rmMessageContext, data, true, acksToEPR); |
| } |
| |
| /** |
| * Makes the WSMC UnsupportedSelectionFault |
| */ |
| public static void makeUnsupportedSelectionFault(RMMsgContext rmMessageContext, QName unsupportedElement) throws AxisFault { |
| if (log.isDebugEnabled()) |
| log.debug("Enter: FaultManager::makeUnsupportedSelectionFault, " + unsupportedElement); |
| |
| // Return a UnsupportedSelectionFault error |
| |
| FaultData data = new FaultData(); |
| data.setCode(rmMessageContext.getMessageContext().getEnvelope().getVersion().getReceiverFaultCode()); |
| |
| data.setSubcode(SpecSpecificConstants.getFaultSubcode(Sandesha2Constants.SPEC_2007_02.MC_NS_URI, |
| Sandesha2Constants.SOAPFaults.FaultType.UNSUPPORTED_SELECTION )); |
| |
| SOAPFactory factory = (SOAPFactory)rmMessageContext.getMessageContext().getEnvelope().getOMFactory(); |
| OMElement element = factory.createOMElement(Sandesha2Constants.WSRM_COMMON.UNSUPPORTED_ELEMENT, |
| Sandesha2Constants.SPEC_2007_02.MC_NS_URI, Sandesha2Constants.WSRM_COMMON.NS_PREFIX_MC); |
| element.setText(unsupportedElement); |
| data.setDetail(element); |
| |
| data.setReason(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.unsuportedSelectionFault)); |
| |
| data.setType(Sandesha2Constants.SOAPFaults.FaultType.UNSUPPORTED_SELECTION); |
| |
| makeMakeConnectionFault(rmMessageContext, data); |
| if (log.isDebugEnabled()) |
| log.debug("Exit: FaultManager::makeUnsupportedSelectionFault"); |
| } |
| |
| /** |
| * Makes WSMC MissingSelectionFault |
| */ |
| public static void makeMissingSelectionFault(RMMsgContext rmMessageContext) throws AxisFault { |
| if (log.isDebugEnabled()) |
| log.debug("Enter: FaultManager::makeMissingSelectionFault"); |
| |
| FaultData data = new FaultData(); |
| data.setCode(rmMessageContext.getMessageContext().getEnvelope().getVersion().getReceiverFaultCode()); |
| |
| data.setSubcode(SpecSpecificConstants.getFaultSubcode(Sandesha2Constants.SPEC_2007_02.MC_NS_URI, |
| Sandesha2Constants.SOAPFaults.FaultType.MISSING_SELECTION )); |
| |
| data.setDetail(null); |
| |
| data.setReason(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.missingSelectionFault)); |
| |
| data.setType(Sandesha2Constants.SOAPFaults.FaultType.MISSING_SELECTION); |
| |
| makeMakeConnectionFault(rmMessageContext, data); |
| |
| if (log.isDebugEnabled()) |
| log.debug("Exit: FaultManager::makeMissingSelectionFault"); |
| } |
| |
| private static void makeMakeConnectionFault(RMMsgContext referenceRMMsgContext, FaultData data) throws AxisFault { |
| SOAPFactory factory = (SOAPFactory) referenceRMMsgContext.getSOAPEnvelope().getOMFactory(); |
| |
| boolean isSOAP12 = factory.getSOAPVersion() == SOAP12Version.getSingleton(); |
| SOAPFaultCode faultCode = factory.createSOAPFaultCode(); |
| QName faultCodeValue = isSOAP12 ? data.getCode() : data.getSubcode(); |
| faultCode.setValue(faultCodeValue); |
| if (isSOAP12) { |
| SOAPFaultSubCode faultSubCode = factory.createSOAPFaultSubCode(faultCode); |
| faultSubCode.setValue(data.getSubcode()); |
| } |
| |
| SOAPFaultReason reason = factory.createSOAPFaultReason(); |
| SOAPFaultText reasonText = factory.createSOAPFaultText(); |
| reasonText.setText(data.getReason()); |
| |
| SOAPFaultDetail detail = factory.createSOAPFaultDetail(); |
| if (data.getDetail() != null) |
| detail.addDetailEntry(data.getDetail()); |
| |
| if (isSOAP12) { |
| reasonText.setLang(Sandesha2Constants.LANG_EN); |
| reason.addSOAPText(reasonText); |
| referenceRMMsgContext.setProperty(SOAP12Constants.SOAP_FAULT_CODE_LOCAL_NAME, faultCode); |
| referenceRMMsgContext.setProperty(SOAP12Constants.SOAP_FAULT_REASON_LOCAL_NAME, reason); |
| referenceRMMsgContext.setProperty(SOAP12Constants.SOAP_FAULT_DETAIL_LOCAL_NAME, detail); |
| |
| } else { |
| reason.setText(data.getReason()); |
| referenceRMMsgContext.setProperty(SOAP11Constants.SOAP_FAULT_CODE_LOCAL_NAME, faultCode); |
| referenceRMMsgContext.setProperty(SOAP11Constants.SOAP_FAULT_DETAIL_LOCAL_NAME, detail); |
| referenceRMMsgContext.setProperty(SOAP11Constants.SOAP_FAULT_STRING_LOCAL_NAME, reason); |
| |
| } |
| |
| AxisFault fault = new AxisFault(faultCodeValue, data.getReason(), "", "", data.getDetail()); |
| fault.setFaultAction(Sandesha2Constants.SPEC_2007_02.Actions.MC_FAULT); |
| |
| //if this is throwable throwing it out, else we will log here. |
| throw fault; |
| } |
| |
| /** |
| * Checks if a sequence is terminated and returns a SequenceTerminated fault. |
| * @param referenceRMMessage |
| * @param sequenceID |
| * @param rmdBean |
| * @return |
| * @throws AxisFault |
| */ |
| public static boolean checkForSequenceTerminated(RMMsgContext referenceRMMessage, String sequenceID, RMSequenceBean bean |
| , boolean piggybackedMessage) |
| |
| throws AxisFault { |
| if (log.isDebugEnabled()) |
| log.debug("Enter: FaultManager::checkForSequenceTerminated, " + sequenceID); |
| |
| if (bean!=null && bean.isTerminated()) { |
| MessageContext referenceMessage = referenceRMMessage.getMessageContext(); |
| FaultData data = new FaultData(); |
| data.setCode(referenceMessage.getEnvelope().getVersion().getSenderFaultCode()); |
| |
| data.setSubcode(SpecSpecificConstants.getFaultSubcode(referenceRMMessage.getRMNamespaceValue(), |
| Sandesha2Constants.SOAPFaults.FaultType.SEQUENCE_TERMINATED )); |
| data.setReason(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.sequenceTerminatedFault, sequenceID)); |
| data.setType(Sandesha2Constants.SOAPFaults.FaultType.SEQUENCE_TERMINATED); |
| |
| SOAPFactory factory = (SOAPFactory)referenceMessage.getEnvelope().getOMFactory(); |
| String rmNamespaceValue = referenceRMMessage.getRMNamespaceValue(); |
| OMElement identifierElement = factory.createOMElement(Sandesha2Constants.WSRM_COMMON.IDENTIFIER, |
| rmNamespaceValue, Sandesha2Constants.WSRM_COMMON.NS_PREFIX_RM); |
| identifierElement.setText(sequenceID); |
| |
| data.setDetail(identifierElement); |
| |
| if (log.isDebugEnabled()) |
| log.debug("Exit: FaultManager::checkForSequenceTerminated, sequence terminated"); |
| |
| boolean throwable = !piggybackedMessage; |
| getOrSendFault(referenceRMMessage, data, throwable, bean.getAcksToEndpointReference()); |
| return true; |
| } |
| |
| if (log.isDebugEnabled()) |
| log.debug("Exit: FaultManager::checkForSequenceTerminated"); |
| return false; |
| } |
| |
| public static boolean checkForSequenceClosed(RMMsgContext referenceRMMessage, String sequenceID, |
| RMDBean rmdBean, boolean piggybackedMessage) throws AxisFault { |
| if (log.isDebugEnabled()) |
| log.debug("Enter: FaultManager::checkForSequenceClosed, " + sequenceID); |
| |
| if (rmdBean!=null && rmdBean.isClosed()) { |
| MessageContext referenceMessage = referenceRMMessage.getMessageContext(); |
| FaultData data = new FaultData(); |
| data.setCode(referenceMessage.getEnvelope().getVersion().getSenderFaultCode()); |
| |
| data.setSubcode(SpecSpecificConstants.getFaultSubcode(referenceRMMessage.getRMNamespaceValue(), |
| Sandesha2Constants.SOAPFaults.FaultType.SEQUENCE_CLOSED )); |
| data.setReason(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.cannotAcceptMsgAsSequenceClosedFault)); |
| data.setType(Sandesha2Constants.SOAPFaults.FaultType.SEQUENCE_CLOSED); |
| |
| SOAPFactory factory = (SOAPFactory)referenceMessage.getEnvelope().getOMFactory(); |
| String rmNamespaceValue = referenceRMMessage.getRMNamespaceValue(); |
| OMElement identifierElement = factory.createOMElement(Sandesha2Constants.WSRM_COMMON.IDENTIFIER, |
| rmNamespaceValue, Sandesha2Constants.WSRM_COMMON.NS_PREFIX_RM); |
| identifierElement.setText(sequenceID); |
| |
| data.setDetail(identifierElement); |
| |
| if (log.isDebugEnabled()) |
| log.debug("Exit: FaultManager::checkForSequenceClosed, sequence closed"); |
| |
| boolean throwable = !piggybackedMessage; |
| getOrSendFault(referenceRMMessage, data,throwable, rmdBean.getAcksToEndpointReference()); |
| return true; |
| } |
| |
| if (log.isDebugEnabled()) |
| log.debug("Exit: FaultManager::checkForSequenceClosed"); |
| return false; |
| } |
| |
| /** |
| * Adds the necessary Fault elements as properties to the message context. |
| * Or if this is a SOAP11 Fault, generates the correct RM Fault and sends. |
| * |
| * @param referenceRMMsgContext - Message in reference to which the fault will be generated. |
| * @param data - data for the fault |
| * @param throwable - This tells weather or not it is ok to throw the fault out. I.e. this should not be done when processing |
| * piggybacked acks since this will stop the carrier message from being processed. |
| * @return - The dummy fault to be thrown out. |
| * |
| * @throws AxisFault |
| */ |
| public static void getOrSendFault(RMMsgContext referenceRMMsgContext, FaultData data, boolean throwable, EndpointReference acksToEPR) throws AxisFault { |
| |
| if (log.isDebugEnabled()) |
| log.debug("Enter: FaultManager::getOrSendFault: " + referenceRMMsgContext + "," + data + "," + throwable + "," + acksToEPR); |
| |
| SOAPFactory factory = (SOAPFactory) referenceRMMsgContext.getSOAPEnvelope().getOMFactory(); |
| |
| SOAPFaultCode faultCode = factory.createSOAPFaultCode(); |
| faultCode.setValue(data.getCode()); |
| if (factory.getSOAPVersion() == SOAP12Version.getSingleton()) { |
| SOAPFaultSubCode faultSubCode = factory.createSOAPFaultSubCode(faultCode); |
| faultSubCode.setValue(data.getSubcode()); |
| } |
| |
| SOAPFaultReason reason = factory.createSOAPFaultReason(); |
| |
| SOAPFaultDetail detail = factory.createSOAPFaultDetail(); |
| detail.addDetailEntry(data.getDetail()); |
| |
| String SOAPNamespaceValue = factory.getSoapVersionURI(); |
| |
| if (SOAP12Constants.SOAP_ENVELOPE_NAMESPACE_URI.equals(SOAPNamespaceValue)) |
| { |
| SOAPFaultText reasonText = factory.createSOAPFaultText(); |
| reasonText.setText(data.getReason()); |
| reasonText.setLang(Sandesha2Constants.LANG_EN); |
| reason.addSOAPText(reasonText); |
| referenceRMMsgContext.setProperty(SOAP12Constants.SOAP_FAULT_CODE_LOCAL_NAME, faultCode); |
| referenceRMMsgContext.setProperty(SOAP12Constants.SOAP_FAULT_REASON_LOCAL_NAME, reason); |
| referenceRMMsgContext.setProperty(SOAP12Constants.SOAP_FAULT_DETAIL_LOCAL_NAME, detail); |
| |
| AxisFault fault = new AxisFault(data.getCode(), data.getReason(), "", "", data.getDetail()); |
| fault.setFaultAction(SpecSpecificConstants.getAddressingFaultAction(referenceRMMsgContext.getRMSpecVersion())); |
| |
| //if this is throwable throwing it out, else we will log here. |
| |
| if (throwable) |
| { |
| if (referenceRMMsgContext.getMessageContext().isServerSide()) { |
| if (log.isDebugEnabled()) |
| log.debug("Exit: FaultManager::getOrSendFault: " + fault); |
| throw fault; |
| } |
| } |
| else |
| log.error("Sandesha2 got a fault when processing the message " + referenceRMMsgContext.getMessageId(), fault); |
| } |
| else if (SOAP11Constants.SOAP_ENVELOPE_NAMESPACE_URI.equals (SOAPNamespaceValue)) |
| { |
| |
| reason.setText(data.getReason()); |
| referenceRMMsgContext.setProperty(SOAP11Constants.SOAP_FAULT_CODE_LOCAL_NAME, faultCode); |
| referenceRMMsgContext.setProperty(SOAP11Constants.SOAP_FAULT_DETAIL_LOCAL_NAME, detail); |
| referenceRMMsgContext.setProperty(SOAP11Constants.SOAP_FAULT_STRING_LOCAL_NAME, reason); |
| // Need to send this message as the Axis Layer doesn't set the "SequenceFault" header |
| MessageContext faultMessageContext = |
| MessageContextBuilder.createFaultMessageContext(referenceRMMsgContext.getMessageContext(), null); |
| if(acksToEPR!=null){ |
| if (log.isDebugEnabled()) |
| log.debug("Debug: FaultManager::getOrSendFault: rewrriting fault destination EPR to " + acksToEPR); |
| faultMessageContext.setTo(acksToEPR); |
| } |
| |
| SOAPFaultEnvelopeCreator.addSOAPFaultEnvelope(faultMessageContext, Sandesha2Constants.SOAPVersion.v1_1, data, referenceRMMsgContext.getRMNamespaceValue()); |
| |
| // Set the action // TODO SET THE ACTION BASED ON THE SPEC |
| faultMessageContext.setWSAAction( |
| SpecSpecificConstants.getAddressingFaultAction(referenceRMMsgContext.getRMSpecVersion())); |
| |
| if(throwable) |
| { |
| if (log.isDebugEnabled()) |
| log.debug("Sending fault message " + faultMessageContext.getEnvelope().getHeader()); |
| |
| //Sending the message |
| //having a surrounded try block will make sure that the error is logged here |
| //and that this does not disturb the processing of a carrier message. |
| try { |
| AxisEngine.sendFault(faultMessageContext); |
| |
| EndpointReference destination = faultMessageContext.getTo(); |
| if(destination == null || destination.hasAnonymousAddress()) { |
| TransportUtils.setResponseWritten(referenceRMMsgContext.getMessageContext(), true); |
| } |
| } catch (Exception e) { |
| AxisFault fault = new AxisFault(data.getCode(), data.getReason(), "", "", data.getDetail()); |
| String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.couldNotSendFaultDueToException, fault.getMessage(), e.getMessage()); |
| log.error(message); |
| } |
| } |
| else |
| { |
| AxisFault fault = new AxisFault(data.getCode(), data.getReason(), "", "", data.getDetail()); |
| String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.couldNotSendFaultDueToException, fault.getMessage()); |
| log.error(message); |
| } |
| } |
| else |
| { |
| String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.unknownSoapVersion); |
| throw new SandeshaException (message); |
| } |
| |
| if (log.isDebugEnabled()) |
| log.debug("Exit: FaultManager::getOrSendFault"); |
| } |
| |
| public static boolean isRMFault (String faultSubcodeValue) { |
| if (faultSubcodeValue==null) |
| return false; |
| |
| if (Sandesha2Constants.SOAPFaults.Subcodes.CREATE_SEQUENCE_REFUSED.equalsIgnoreCase (faultSubcodeValue) || |
| Sandesha2Constants.SOAPFaults.Subcodes.INVALID_ACKNOWLEDGEMENT.equalsIgnoreCase (faultSubcodeValue) || |
| Sandesha2Constants.SOAPFaults.Subcodes.LAST_MESSAGE_NO_EXCEEDED.equalsIgnoreCase (faultSubcodeValue) || |
| Sandesha2Constants.SOAPFaults.Subcodes.MESSAGE_NUMBER_ROLEOVER.equalsIgnoreCase (faultSubcodeValue) || |
| Sandesha2Constants.SOAPFaults.Subcodes.SEQUENCE_CLOSED.equalsIgnoreCase (faultSubcodeValue) || |
| Sandesha2Constants.SOAPFaults.Subcodes.SEQUENCE_TERMINATED.equalsIgnoreCase (faultSubcodeValue) || |
| Sandesha2Constants.SOAPFaults.Subcodes.UNKNOWN_SEQUENCE.equalsIgnoreCase (faultSubcodeValue) ) { |
| |
| return true; |
| } |
| |
| return false; |
| |
| } |
| |
| private static InvocationResponse manageIncomingFault (AxisFault fault, RMMsgContext rmMsgCtx, SOAPFault faultPart, Transaction transaction) throws AxisFault { |
| if (log.isDebugEnabled()) |
| log.debug("Enter: FaultManager::manageIncomingFault"); |
| InvocationResponse response = InvocationResponse.CONTINUE; |
| if (log.isErrorEnabled()) |
| log.error(fault); |
| |
| SandeshaListener listner = (SandeshaListener) rmMsgCtx.getProperty(SandeshaClientConstants.SANDESHA_LISTENER); |
| if (listner!=null) |
| listner.onError(fault); |
| |
| // Get the SOAPVersion |
| SOAPFactory factory = (SOAPFactory) rmMsgCtx.getSOAPEnvelope().getOMFactory(); |
| String SOAPNamespaceValue = factory.getSoapVersionURI(); |
| |
| String soapFaultSubcode = null; |
| String identifier = null; |
| boolean isSOAP11SequenceUnknownFault = false; |
| |
| if (SOAP11Constants.SOAP_ENVELOPE_NAMESPACE_URI.equals(SOAPNamespaceValue)) { |
| // Need to get the sequence part from the Header. |
| if (log.isDebugEnabled()) |
| log.debug("soap11"); |
| // try { |
| SequenceFault sequenceFault = rmMsgCtx.getSequenceFault(); |
| |
| // If the sequence fault part is not null, then we have an RM specific fault. |
| if (sequenceFault != null) { |
| soapFaultSubcode = sequenceFault.getFaultCode().getFaultCode().getLocalPart(); |
| // Get the identifier - if there is one. |
| identifier = sequenceFault.getFaultCode().getDetail(); |
| isSOAP11SequenceUnknownFault = true; |
| if (log.isDebugEnabled()) |
| log.debug("isSOAP11SequenceUnknownFault " + identifier); |
| } |
| |
| // } catch (SandeshaException e) { |
| // if (log.isDebugEnabled()) |
| // log.debug("Unable to process SequenceFault", e); |
| // } |
| } |
| |
| // If we haven't found a soapFaultSubcode at this point - look inside the AxisFault for the information. |
| if (soapFaultSubcode == null && faultPart.getCode() != null && |
| faultPart.getCode().getSubCode() != null && |
| faultPart.getCode().getSubCode().getValue() != null) |
| |
| soapFaultSubcode = faultPart.getCode().getSubCode().getValue().getTextAsQName().getLocalPart(); |
| |
| // Get the identifier, if there is one. |
| SOAPFaultDetail detail = faultPart.getDetail(); |
| if (detail != null && !isSOAP11SequenceUnknownFault) |
| { |
| // At this point we may not know what RM NS is in use. |
| OMElement identifierOM = detail.getFirstChildWithName(new QName(Sandesha2Constants.SPEC_2005_02.NS_URI, |
| Sandesha2Constants.WSRM_COMMON.IDENTIFIER)); |
| if (identifierOM != null){ |
| identifier = identifierOM.getText(); |
| }else{ |
| identifierOM = detail.getFirstChildWithName(new QName(Sandesha2Constants.SPEC_2007_02.NS_URI, |
| Sandesha2Constants.WSRM_COMMON.IDENTIFIER)); |
| if (identifierOM != null){ |
| identifier = identifierOM.getText(); |
| } |
| } |
| } |
| |
| if (Sandesha2Constants.SOAPFaults.Subcodes.CREATE_SEQUENCE_REFUSED.equals(soapFaultSubcode)) { |
| processCreateSequenceRefusedFault(rmMsgCtx, fault); |
| } else if (Sandesha2Constants.SOAPFaults.Subcodes.UNKNOWN_SEQUENCE.equals(soapFaultSubcode) || |
| Sandesha2Constants.SOAPFaults.Subcodes.SEQUENCE_TERMINATED.equals(soapFaultSubcode) || |
| Sandesha2Constants.SOAPFaults.Subcodes.MESSAGE_NUMBER_ROLEOVER.equals(soapFaultSubcode)) { |
| processSequenceUnknownFault(rmMsgCtx, fault, identifier, transaction); |
| } |
| |
| // If the operation is an Sandesha In Only operation, or the fault is a recognised fault, |
| // then stop the message from being processed further. |
| // To configure the actions for Sandesha to drop, add them to the module.xml under |
| // Sandesha2InOnly operation. |
| if (isRMFault(soapFaultSubcode)) |
| response = InvocationResponse.ABORT; |
| |
| if (log.isDebugEnabled()) |
| log.debug("Exit: FaultManager::manageIncomingFault, " + response); |
| return response; |
| } |
| |
| public static InvocationResponse processMessagesForFaults (RMMsgContext rmMsgCtx, StorageManager storageManager) throws AxisFault { |
| if (log.isDebugEnabled()) |
| log.debug("Enter: FaultManager::processMessagesForFaults"); |
| |
| InvocationResponse response = InvocationResponse.CONTINUE; |
| |
| // Rather than look for a SOAPFault, check the action of the message to see if that matches |
| // an RM fault or is the addressing fault (RM 1.0). |
| |
| boolean isFault = isRMFaultAction(rmMsgCtx.getMessageContext().getWSAAction()); |
| |
| if (isFault) { |
| Transaction transaction = null; |
| |
| try { |
| transaction = storageManager.getTransaction(); |
| |
| SOAPEnvelope envelope = rmMsgCtx.getSOAPEnvelope(); |
| if (envelope==null) |
| return response; |
| |
| SOAPFault faultPart = envelope.getBody().getFault(); |
| |
| // constructing the fault |
| AxisFault axisFault = getAxisFaultFromFromSOAPFault(faultPart, rmMsgCtx); |
| response = manageIncomingFault (axisFault, rmMsgCtx, faultPart, transaction); |
| |
| if(transaction != null && transaction.isActive()) transaction.commit(); |
| transaction = null; |
| } finally { |
| if (transaction != null && transaction.isActive()) |
| transaction.rollback(); |
| } |
| } |
| if (log.isDebugEnabled()) |
| log.debug("Exit: FaultManager::processMessagesForFaults, " + response); |
| return response; |
| } |
| |
| |
| private static boolean isRMFaultAction(String action) { |
| if (log.isDebugEnabled()) |
| log.debug("Enter: FaultManager::processMessagesForFaults , "+action); |
| |
| boolean isFaultAction = false; |
| |
| if (AddressingConstants.Final.WSA_FAULT_ACTION.equals(action) || |
| Sandesha2Constants.SPEC_2007_02.Actions.SOAP_ACTION_FAULT.equals(action)) |
| isFaultAction = true; |
| |
| if (log.isDebugEnabled()) |
| log.debug("Enter: FaultManager::processMessagesForFaults , "+isFaultAction); |
| return isFaultAction; |
| } |
| |
| private static AxisFault getAxisFaultFromFromSOAPFault(SOAPFault faultPart, RMMsgContext rmMsgCtx) { |
| |
| String soapFaultSubcode = null; |
| SequenceFault sequenceFault = rmMsgCtx.getSequenceFault(); |
| AxisFault axisFault = null; |
| |
| // If the sequence fault part is not null, then we have an RM specific fault. |
| if (sequenceFault != null) { |
| soapFaultSubcode = sequenceFault.getFaultCode().getFaultCode().getLocalPart(); |
| |
| //Need to concatenate all info about the error into string |
| StringBuffer faultSB = new StringBuffer(); |
| faultSB.append(soapFaultSubcode + " "); |
| faultSB.append(sequenceFault.getFaultCode().getDetail() + " "); |
| faultSB.append(faultPart.getDetail().getText()); |
| |
| axisFault = new AxisFault(faultSB.toString(), sequenceFault.getFaultCode().getFaultCode()); |
| } else { |
| axisFault = new AxisFault(faultPart.getCode(), faultPart.getReason(), faultPart.getNode(), faultPart |
| .getRole(), faultPart.getDetail()); |
| } |
| |
| return axisFault; |
| } |
| |
| /** |
| * Checks to see if the message number received is == to the Long.MAX_VALUE |
| * |
| * Throws and AxisFault, or sends a Fault message if the condition is met. |
| * @throws AxisFault |
| */ |
| public static boolean checkForMessageRolledOver(RMMsgContext rmMessageContext, String sequenceId, long msgNo, RMDBean bean)throws AxisFault { |
| if (msgNo == Long.MAX_VALUE) { |
| if (log.isDebugEnabled()) |
| log.debug("Max message number reached " + msgNo); |
| // Return a CreateSequenceRefused error |
| MessageContext messageContext = rmMessageContext.getMessageContext(); |
| |
| FaultData data = new FaultData(); |
| data.setCode(messageContext.getEnvelope().getVersion().getSenderFaultCode()); |
| data.setSubcode(SpecSpecificConstants.getFaultSubcode(rmMessageContext.getRMNamespaceValue(), |
| Sandesha2Constants.SOAPFaults.FaultType.MESSAGE_NUMBER_ROLLOVER )); |
| |
| SOAPFactory factory = (SOAPFactory)messageContext.getEnvelope().getOMFactory(); |
| OMElement identifierElement = factory.createOMElement(Sandesha2Constants.WSRM_COMMON.IDENTIFIER, |
| rmMessageContext.getRMNamespaceValue(), Sandesha2Constants.WSRM_COMMON.NS_PREFIX_RM); |
| identifierElement.setText(sequenceId); |
| |
| OMElement maxMsgNumber = factory.createOMElement(Sandesha2Constants.WSRM_COMMON.MAX_MSG_NUMBER, |
| rmMessageContext.getRMNamespaceValue(), Sandesha2Constants.WSRM_COMMON.NS_PREFIX_RM); |
| maxMsgNumber.setText(Long.toString(msgNo)); |
| |
| data.setDetail(identifierElement); |
| data.setDetail2(maxMsgNumber); |
| |
| data.setReason(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.messageNumberRollover)); |
| |
| data.setType(Sandesha2Constants.SOAPFaults.FaultType.MESSAGE_NUMBER_ROLLOVER); |
| |
| getOrSendFault(rmMessageContext, data, true, bean.getAcksToEndpointReference()); |
| |
| return true; |
| } |
| return false; |
| } |
| |
| /** |
| * On receipt of a CreateSequenceRefused fault, terminate the sequence and notify any waiting |
| * clients of the error. |
| * @param fault |
| * @throws AxisFault |
| */ |
| private static void processCreateSequenceRefusedFault(RMMsgContext rmMsgCtx, AxisFault fault) throws AxisFault { |
| if (log.isDebugEnabled()) |
| log.debug("Enter: FaultManager::processCreateSequenceRefusedFault"); |
| |
| ConfigurationContext configCtx = rmMsgCtx.getMessageContext().getConfigurationContext(); |
| |
| if(log.isWarnEnabled()){ |
| String name = "NOT_SET"; |
| if(rmMsgCtx != null){ |
| MessageContext mCtx = rmMsgCtx.getMessageContext(); |
| if(mCtx != null){ |
| AxisService axisSvc = mCtx.getAxisService(); |
| if(axisSvc != null){ |
| name = axisSvc.getName(); |
| } |
| } |
| } |
| log.warn(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.reliableMessagingNotEnabled, name)); |
| } |
| |
| StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configCtx, configCtx |
| .getAxisConfiguration()); |
| |
| RelatesTo relatesTo = rmMsgCtx.getMessageContext().getRelatesTo(); |
| String createSeqMsgId = null; |
| if (relatesTo != null) { |
| createSeqMsgId = relatesTo.getValue(); |
| } else { |
| // Work out the related message from the operation context |
| OperationContext context = rmMsgCtx.getMessageContext().getOperationContext(); |
| MessageContext createSeq = context.getMessageContext(WSDLConstants.MESSAGE_LABEL_OUT_VALUE); |
| if(createSeq != null) createSeqMsgId = createSeq.getMessageID(); |
| } |
| if(createSeqMsgId == null) { |
| String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.relatesToNotAvailable); |
| log.error(message); |
| throw new SandeshaException(message); |
| } |
| |
| SenderBeanMgr retransmitterMgr = storageManager.getSenderBeanMgr(); |
| RMSBeanMgr rmsBeanMgr = storageManager.getRMSBeanMgr(); |
| |
| RMSBean rmsBean = rmsBeanMgr.retrieve(createSeqMsgId); |
| if (rmsBean == null) { |
| if (log.isDebugEnabled()) |
| log.debug("Exit: FaultManager::processCreateSequenceRefusedFault Unable to find RMSBean"); |
| return; |
| } |
| if(rmsBean.getSequenceID()!=null){ |
| //we got a cseqRefused but the sequence is already setup - this implies a timing condition whereby several resends of the cSeqReq have been sent out. |
| //The best thing to do here is to ignore it. |
| if (log.isDebugEnabled()) |
| log.debug("Exit: FaultManager::processCreateSequenceRefusedFault Sequence already established - no requirement to cleanup"); |
| return; |
| } |
| |
| // Indicate that there was an error when sending the Create Sequence. |
| rmsBean.setLastSendError(fault); |
| // Mark the sequence as terminated |
| rmsBean.setTerminated(true); |
| |
| // Update the RMSBean |
| rmsBeanMgr.update(rmsBean); |
| |
| SenderBean createSequenceSenderBean = retransmitterMgr.retrieve(createSeqMsgId); |
| if (createSequenceSenderBean == null) |
| throw new SandeshaException(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.createSeqEntryNotFound)); |
| |
| // deleting the create sequence entry. |
| retransmitterMgr.delete(createSeqMsgId); |
| |
| // Notify the clients of a failure |
| notifyClientsOfFault(rmsBean.getInternalSequenceID(), storageManager, configCtx, fault); |
| |
| rmMsgCtx.pause(); |
| |
| // Cleanup sending side. |
| if (log.isDebugEnabled()) |
| log.debug("Terminating sending sequence " + rmsBean); |
| TerminateManager.terminateSendingSide(rmsBean, storageManager, false, null); |
| |
| if (log.isDebugEnabled()) |
| log.debug("Exit: FaultManager::processCreateSequenceRefusedFault"); |
| } |
| |
| /** |
| * If the RMD returns a SequenceTerminated, or an Unknown sequence fault, then we should |
| * mark the RMS Sequence as terminated and notify clients of the error. |
| * |
| * @param rmMsgCtx |
| * @param fault |
| * @param identifier |
| */ |
| private static void processSequenceUnknownFault(RMMsgContext rmMsgCtx, AxisFault fault, String sequenceID, Transaction transaction) throws AxisFault { |
| if (log.isDebugEnabled()) |
| log.debug("Enter: FaultManager::processSequenceUnknownFault " + sequenceID); |
| |
| ConfigurationContext configCtx = rmMsgCtx.getMessageContext().getConfigurationContext(); |
| |
| StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configCtx, configCtx |
| .getAxisConfiguration()); |
| |
| // Find the rmsBean |
| RMSBean rmsBean = SandeshaUtil.getRMSBeanFromSequenceId(storageManager, sequenceID); |
| if (rmsBean != null) { |
| |
| rmMsgCtx.pause(); |
| |
| // Cleanup sending side. |
| if (log.isDebugEnabled()) |
| log.debug("Terminating sending sequence " + rmsBean); |
| if(!TerminateManager.terminateSendingSide(rmsBean, storageManager, true, transaction)){ |
| // We did not reallocate so we notify the clients of a failure |
| notifyClientsOfFault(rmsBean.getInternalSequenceID(), storageManager, configCtx, fault); |
| |
| //Mark the RMSBean as reallocation failed and update last activation time |
| transaction = storageManager.getTransaction(); |
| rmsBean.setLastActivatedTime(System.currentTimeMillis()); |
| storageManager.getRMSBeanMgr().update(rmsBean); |
| if(transaction != null && transaction.isActive()) transaction.commit(); |
| } |
| } |
| else { |
| RMDBean rmdBean = SandeshaUtil.getRMDBeanFromSequenceId(storageManager, sequenceID); |
| if (rmdBean != null) { |
| rmMsgCtx.pause(); |
| |
| // Cleanup sending side. |
| if (log.isDebugEnabled()) |
| log.debug("Terminating sending sequence " + rmdBean); |
| TerminateManager.cleanReceivingSideOnTerminateMessage(configCtx, rmdBean.getSequenceID(), storageManager); |
| |
| // Update the last activated time. |
| rmdBean.setLastActivatedTime(System.currentTimeMillis()); |
| |
| // Update the bean in the map |
| storageManager.getRMDBeanMgr().update(rmdBean); |
| |
| } |
| else { |
| if (log.isDebugEnabled()) |
| log.debug("Exit: FaultManager::processSequenceUnknownFault Unable to find sequence"); |
| return; |
| } |
| } |
| |
| if (log.isDebugEnabled()) |
| log.debug("Exit: FaultManager::processSequenceUnknownFault"); |
| } |
| |
| static void notifyClientsOfFault(String internalSequenceId, |
| StorageManager storageManager, ConfigurationContext configCtx, AxisFault fault) throws SandeshaException { |
| // Locate and update all of the messages for this sequence, now that we know |
| // the sequence id. |
| SenderBean target = new SenderBean(); |
| target.setInternalSequenceID(internalSequenceId); |
| |
| Iterator<SenderBean> iterator = storageManager.getSenderBeanMgr().find(target).iterator(); |
| while (iterator.hasNext()) { |
| SenderBean tempBean = iterator.next(); |
| |
| if (tempBean.getMessageType() != Sandesha2Constants.MessageTypes.MAKE_CONNECTION_MSG && |
| tempBean.getMessageType() != Sandesha2Constants.MessageTypes.ACK) { |
| String messageStoreKey = tempBean.getMessageContextRefKey(); |
| |
| // Retrieve the message context. |
| MessageContext context = storageManager.retrieveMessageContext(messageStoreKey, configCtx); |
| |
| AxisOperation axisOperation = context.getAxisOperation(); |
| if (axisOperation != null) { |
| |
| MessageReceiver msgReceiver = axisOperation.getMessageReceiver(); |
| if ((msgReceiver != null) && (msgReceiver instanceof CallbackReceiver)) { |
| |
| Object callback = ((CallbackReceiver)msgReceiver).lookupCallback(context.getMessageID()); |
| if(callback instanceof AxisCallback) { |
| try { |
| ((CallbackReceiver)msgReceiver).addCallback(context.getMessageID(),(AxisCallback)callback); |
| } catch (AxisFault axisFault) { |
| throw new SandeshaException(axisFault); |
| } |
| |
| ((AxisCallback)callback).onError(fault); |
| } |
| // this is actually to support synapse. Synpase Axis Operation does not have a callBackMessageReceiver |
| // synapse AxisOperation always has the synapse message receiver. And also to be send in the synapse |
| // fault mediators we need to set the SENDING_FAULT property as well. |
| } else if (msgReceiver != null && tempBean.getMessageType() == Sandesha2Constants.MessageTypes.APPLICATION) { |
| try { |
| //since there is no reponse we set this message as the fault reply |
| context.getOptions().setRelationships(new RelatesTo[]{new RelatesTo(context.getMessageID())}); |
| context.setProperty("SENDING_FAULT", Boolean.TRUE); |
| msgReceiver.receive(context); |
| } catch (AxisFault axisFault) { |
| log.error(axisFault.getMessage()); |
| throw new SandeshaException("Can not invoke the message receiver ", axisFault); |
| } |
| } |
| } |
| } |
| } |
| } |
| } |