blob: 111b4d3970aa028e4587c9dd913df8d08c4b71cf [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sandesha2.util;
import java.util.Iterator;
import java.util.List;
import javax.xml.namespace.QName;
import org.apache.axiom.om.OMElement;
import org.apache.axiom.soap.SOAP11Constants;
import org.apache.axiom.soap.SOAP12Constants;
import org.apache.axiom.soap.SOAP12Version;
import org.apache.axiom.soap.SOAPEnvelope;
import org.apache.axiom.soap.SOAPFactory;
import org.apache.axiom.soap.SOAPFault;
import org.apache.axiom.soap.SOAPFaultCode;
import org.apache.axiom.soap.SOAPFaultDetail;
import org.apache.axiom.soap.SOAPFaultReason;
import org.apache.axiom.soap.SOAPFaultSubCode;
import org.apache.axiom.soap.SOAPFaultText;
import org.apache.axiom.soap.SOAPVersion;
import org.apache.axis2.AxisFault;
import org.apache.axis2.addressing.AddressingConstants;
import org.apache.axis2.addressing.EndpointReference;
import org.apache.axis2.addressing.RelatesTo;
import org.apache.axis2.client.async.AxisCallback;
import org.apache.axis2.context.ConfigurationContext;
import org.apache.axis2.context.MessageContext;
import org.apache.axis2.context.OperationContext;
import org.apache.axis2.description.AxisOperation;
import org.apache.axis2.description.AxisService;
import org.apache.axis2.engine.AxisEngine;
import org.apache.axis2.engine.MessageReceiver;
import org.apache.axis2.engine.Handler.InvocationResponse;
import org.apache.axis2.transport.TransportUtils;
import org.apache.axis2.util.CallbackReceiver;
import org.apache.axis2.util.MessageContextBuilder;
import org.apache.axis2.wsdl.WSDLConstants;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.sandesha2.FaultData;
import org.apache.sandesha2.RMMsgContext;
import org.apache.sandesha2.Sandesha2Constants;
import org.apache.sandesha2.SandeshaException;
import org.apache.sandesha2.client.SandeshaClientConstants;
import org.apache.sandesha2.client.SandeshaListener;
import org.apache.sandesha2.i18n.SandeshaMessageHelper;
import org.apache.sandesha2.i18n.SandeshaMessageKeys;
import org.apache.sandesha2.storage.StorageManager;
import org.apache.sandesha2.storage.Transaction;
import org.apache.sandesha2.storage.beanmanagers.RMSBeanMgr;
import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
import org.apache.sandesha2.storage.beans.RMDBean;
import org.apache.sandesha2.storage.beans.RMSBean;
import org.apache.sandesha2.storage.beans.RMSequenceBean;
import org.apache.sandesha2.storage.beans.SenderBean;
import org.apache.sandesha2.wsrm.SequenceAcknowledgement;
import org.apache.sandesha2.wsrm.SequenceFault;
/**
* Has logic to check for possible RM related faults and create it.
*/
public class FaultManager {
private static final Log log = LogFactory.getLog(FaultManager.class);
/**
* Check weather the LastMessage number has been exceeded and generate the
* fault if it is.
*
* @param msgCtx
* @return
*/
public static void checkForLastMsgNumberExceeded(RMMsgContext applicationRMMessage, StorageManager storageManager)
throws AxisFault {
if (log.isDebugEnabled())
log.debug("Enter: FaultManager::checkForLastMsgNumberExceeded");
/*
* TODO - This code currently doesn't actually work
Sequence sequence = (Sequence) applicationRMMessage.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
long messageNumber = sequence.getMessageNumber().getMessageNumber();
String sequenceID = sequence.getIdentifier().getIdentifier();
boolean lastMessageNumberExceeded = false;
String reason = null;
RMSBean rmsBean = SandeshaUtil.getRMSBeanFromSequenceId(storageManager, sequenceID);
if (rmsBean != null) {
long lastMessageNo = rmsBean.getLastOutMessage();
if (messageNumber > lastMessageNo) {
lastMessageNumberExceeded = true;
reason = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.msgNumberLargerThanLastMsg, Long
.toString(messageNumber), Long.toString(lastMessageNo));
}
}
if (lastMessageNumberExceeded) {
FaultData faultData = new FaultData();
faultData.setType(Sandesha2Constants.SOAPFaults.FaultType.LAST_MESSAGE_NO_EXCEEDED);
int SOAPVersion = SandeshaUtil.getSOAPVersion(applicationRMMessage.getSOAPEnvelope());
if (SOAPVersion == Sandesha2Constants.SOAPVersion.v1_1)
faultData.setCode(SOAP11Constants.FAULT_CODE_SENDER);
else
faultData.setCode(SOAP12Constants.FAULT_CODE_SENDER);
faultData.setSubcode(Sandesha2Constants.SOAPFaults.Subcodes.LAST_MESSAGE_NO_EXCEEDED);
faultData.setReason(reason);
SOAPFactory factory = SOAPAbstractFactory.getSOAPFactory(SOAPVersion);
String rmNamespace = applicationRMMessage.getRMNamespaceValue();
OMElement identifierElement = factory.createOMElement(Sandesha2Constants.WSRM_COMMON.IDENTIFIER,
rmNamespace, Sandesha2Constants.WSRM_COMMON.NS_PREFIX_RM);
identifierElement.setText(sequenceID);
faultData.setDetail(identifierElement);
if (log.isDebugEnabled())
log.debug("Exit: FaultManager::checkForLastMsgNumberExceeded, lastMessageNumberExceeded");
getFault(applicationRMMessage, faultData, storageManager);
}
*/
if (log.isDebugEnabled())
log.debug("Exit: FaultManager::checkForLastMsgNumberExceeded");
}
public static RMMsgContext checkForMessageNumberRoleover(MessageContext messageContext) {
return null;
}
/**
* Check whether a Sequence message (a) belongs to a unknown sequence
* (generates an UnknownSequence fault) (b) message number exceeds a
* predifined limit ( genenrates a Message Number Rollover fault)
*
* @param msgCtx
* @return true if no exception has been thrown and the sequence doesn't exist
* @throws SandeshaException
*/
public static boolean checkForUnknownSequence(RMMsgContext rmMessageContext, String sequenceID,
StorageManager storageManager, boolean piggybackedMessage) throws AxisFault {
if (log.isDebugEnabled())
log.debug("Enter: FaultManager::checkForUnknownSequence, " + sequenceID);
boolean validSequence = false;
// Look for an outbound sequence
if (SandeshaUtil.getRMSBeanFromSequenceId(storageManager, sequenceID) != null) {
validSequence = true;
// Look for an inbound sequence
} else if(SandeshaUtil.getRMDBeanFromSequenceId(storageManager, sequenceID) != null) {
validSequence = true;
}
if (!validSequence) {
if (log.isDebugEnabled())
log.debug("Sequence not valid " + sequenceID);
// Return an UnknownSequence error
MessageContext messageContext = rmMessageContext.getMessageContext();
FaultData data = new FaultData();
data.setCode(messageContext.getEnvelope().getVersion().getSenderFaultCode());
data.setSubcode(SpecSpecificConstants.getFaultSubcode(rmMessageContext.getRMNamespaceValue(),
Sandesha2Constants.SOAPFaults.FaultType.UNKNOWN_SEQUENCE ));
SOAPFactory factory = (SOAPFactory)messageContext.getEnvelope().getOMFactory();
OMElement identifierElement = factory.createOMElement(Sandesha2Constants.WSRM_COMMON.IDENTIFIER,
rmMessageContext.getRMNamespaceValue(), Sandesha2Constants.WSRM_COMMON.NS_PREFIX_RM);
identifierElement.setText(sequenceID);
data.setDetail(identifierElement);
data.setReason(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.unknownSequenceFault, sequenceID));
data.setType(Sandesha2Constants.SOAPFaults.FaultType.UNKNOWN_SEQUENCE);
if (log.isDebugEnabled())
log.debug("Exit: FaultManager::checkForUnknownSequence, Sequence unknown");
boolean faultThrowable = !piggybackedMessage;
getOrSendFault(rmMessageContext, data, faultThrowable, null); //unknown sequence so cannot send to acksTo
return true;
}
if (log.isDebugEnabled())
log.debug("Exit: FaultManager::checkForUnknownSequence");
return false;
}
/**
* Check weather the Acknowledgement is invalid and generate a fault if it
* is.
*
* @param msgCtx
* @return
* @throws SandeshaException
*/
public static boolean checkForInvalidAcknowledgement(RMMsgContext ackRMMessageContext, SequenceAcknowledgement sequenceAcknowledgement,
StorageManager storageManager, RMSBean rmsBean, boolean piggybackedMessage)
throws AxisFault {
if (log.isDebugEnabled())
log.debug("Enter: FaultManager::checkForInvalidAcknowledgement");
boolean invalidAck = false;
List<Range> sequenceAckList = sequenceAcknowledgement.getAcknowledgementRanges();
Iterator<Range> it = sequenceAckList.iterator();
while (it.hasNext()) {
Range acknowledgementRange = it.next();
if (acknowledgementRange.lowerValue > acknowledgementRange.upperValue) {
invalidAck = true;
// check upper isn't bigger than the highest out msg number
} else if ( acknowledgementRange.upperValue > rmsBean.getHighestOutMessageNumber() ) {
invalidAck = true;
}
if (invalidAck) {
makeInvalidAcknowledgementFault(ackRMMessageContext, sequenceAcknowledgement,
acknowledgementRange, storageManager, piggybackedMessage, rmsBean.getToEndpointReference());
return true;
}
}
if (log.isDebugEnabled())
log.debug("Exit: FaultManager::checkForInvalidAcknowledgement: ack is valid");
return false;
}
/**
* Makes an InvalidAcknowledgement fault.
* @param rmMsgCtx
* @param storageManager
* @param message
* @throws AxisFault
*/
public static void makeInvalidAcknowledgementFault(RMMsgContext rmMsgCtx,
SequenceAcknowledgement sequenceAcknowledgement, Range acknowledgementRange,
StorageManager storageManager, boolean piggybackedMessage, EndpointReference acksToEPR) throws AxisFault {
FaultData data = new FaultData();
SOAPVersion version = rmMsgCtx.getMessageContext().getEnvelope().getVersion();
data.setCode(version.getSenderFaultCode());
if (log.isDebugEnabled())
log.debug("makingInvalidAck piggy=" + piggybackedMessage + ": soap=" + version);
data.setType(Sandesha2Constants.SOAPFaults.FaultType.INVALID_ACKNOWLEDGEMENT);
data.setSubcode(SpecSpecificConstants.getFaultSubcode(rmMsgCtx.getRMNamespaceValue(),
Sandesha2Constants.SOAPFaults.FaultType.INVALID_ACKNOWLEDGEMENT ));
data.setReason(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.invalidAckFault));
data.setDetail(sequenceAcknowledgement.getOriginalSequenceAckElement());
boolean throwable = !piggybackedMessage;
getOrSendFault(rmMsgCtx, data, throwable, acksToEPR);
}
/**
* Makes a Create sequence refused fault
*/
public static void makeCreateSequenceRefusedFault(RMMsgContext rmMessageContext, String detail, Exception e, EndpointReference acksToEPR) throws AxisFault {
if (log.isDebugEnabled())
log.debug("Enter: FaultManager::makeCreateSequenceRefusedFault, " + detail);
// Return a CreateSequenceRefused error
MessageContext messageContext = rmMessageContext.getMessageContext();
FaultData data = new FaultData();
data.setCode(messageContext.getEnvelope().getVersion().getSenderFaultCode());
data.setSubcode(SpecSpecificConstants.getFaultSubcode(rmMessageContext.getRMNamespaceValue(),
Sandesha2Constants.SOAPFaults.FaultType.CREATE_SEQUENCE_REFUSED ));
SOAPFactory factory = (SOAPFactory)messageContext.getEnvelope().getOMFactory();
OMElement identifierElement = factory.createOMElement(Sandesha2Constants.WSRM_COMMON.IDENTIFIER,
rmMessageContext.getRMNamespaceValue(), Sandesha2Constants.WSRM_COMMON.NS_PREFIX_RM);
identifierElement.setText(detail);
data.setDetail(identifierElement);
data.setDetailString(detail);
data.setReason(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.createSequenceRefused));
data.setType(Sandesha2Constants.SOAPFaults.FaultType.CREATE_SEQUENCE_REFUSED);
data.setExceptionString(SandeshaUtil.getStackTraceFromException(e));
if(log.isWarnEnabled())
log.warn(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.createSequenceRefused) + ", " + detail);
if (log.isDebugEnabled())
log.debug("Exit: FaultManager::makeCreateSequenceRefusedFault");
getOrSendFault(rmMessageContext, data, true, acksToEPR);
}
/**
* Makes the WSMC UnsupportedSelectionFault
*/
public static void makeUnsupportedSelectionFault(RMMsgContext rmMessageContext, QName unsupportedElement) throws AxisFault {
if (log.isDebugEnabled())
log.debug("Enter: FaultManager::makeUnsupportedSelectionFault, " + unsupportedElement);
// Return a UnsupportedSelectionFault error
FaultData data = new FaultData();
data.setCode(rmMessageContext.getMessageContext().getEnvelope().getVersion().getReceiverFaultCode());
data.setSubcode(SpecSpecificConstants.getFaultSubcode(Sandesha2Constants.SPEC_2007_02.MC_NS_URI,
Sandesha2Constants.SOAPFaults.FaultType.UNSUPPORTED_SELECTION ));
SOAPFactory factory = (SOAPFactory)rmMessageContext.getMessageContext().getEnvelope().getOMFactory();
OMElement element = factory.createOMElement(Sandesha2Constants.WSRM_COMMON.UNSUPPORTED_ELEMENT,
Sandesha2Constants.SPEC_2007_02.MC_NS_URI, Sandesha2Constants.WSRM_COMMON.NS_PREFIX_MC);
element.setText(unsupportedElement);
data.setDetail(element);
data.setReason(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.unsuportedSelectionFault));
data.setType(Sandesha2Constants.SOAPFaults.FaultType.UNSUPPORTED_SELECTION);
makeMakeConnectionFault(rmMessageContext, data);
if (log.isDebugEnabled())
log.debug("Exit: FaultManager::makeUnsupportedSelectionFault");
}
/**
* Makes WSMC MissingSelectionFault
*/
public static void makeMissingSelectionFault(RMMsgContext rmMessageContext) throws AxisFault {
if (log.isDebugEnabled())
log.debug("Enter: FaultManager::makeMissingSelectionFault");
FaultData data = new FaultData();
data.setCode(rmMessageContext.getMessageContext().getEnvelope().getVersion().getReceiverFaultCode());
data.setSubcode(SpecSpecificConstants.getFaultSubcode(Sandesha2Constants.SPEC_2007_02.MC_NS_URI,
Sandesha2Constants.SOAPFaults.FaultType.MISSING_SELECTION ));
data.setDetail(null);
data.setReason(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.missingSelectionFault));
data.setType(Sandesha2Constants.SOAPFaults.FaultType.MISSING_SELECTION);
makeMakeConnectionFault(rmMessageContext, data);
if (log.isDebugEnabled())
log.debug("Exit: FaultManager::makeMissingSelectionFault");
}
private static void makeMakeConnectionFault(RMMsgContext referenceRMMsgContext, FaultData data) throws AxisFault {
SOAPFactory factory = (SOAPFactory) referenceRMMsgContext.getSOAPEnvelope().getOMFactory();
boolean isSOAP12 = factory.getSOAPVersion() == SOAP12Version.getSingleton();
SOAPFaultCode faultCode = factory.createSOAPFaultCode();
QName faultCodeValue = isSOAP12 ? data.getCode() : data.getSubcode();
faultCode.setValue(faultCodeValue);
if (isSOAP12) {
SOAPFaultSubCode faultSubCode = factory.createSOAPFaultSubCode(faultCode);
faultSubCode.setValue(data.getSubcode());
}
SOAPFaultReason reason = factory.createSOAPFaultReason();
SOAPFaultText reasonText = factory.createSOAPFaultText();
reasonText.setText(data.getReason());
SOAPFaultDetail detail = factory.createSOAPFaultDetail();
if (data.getDetail() != null)
detail.addDetailEntry(data.getDetail());
if (isSOAP12) {
reasonText.setLang(Sandesha2Constants.LANG_EN);
reason.addSOAPText(reasonText);
referenceRMMsgContext.setProperty(SOAP12Constants.SOAP_FAULT_CODE_LOCAL_NAME, faultCode);
referenceRMMsgContext.setProperty(SOAP12Constants.SOAP_FAULT_REASON_LOCAL_NAME, reason);
referenceRMMsgContext.setProperty(SOAP12Constants.SOAP_FAULT_DETAIL_LOCAL_NAME, detail);
} else {
reason.setText(data.getReason());
referenceRMMsgContext.setProperty(SOAP11Constants.SOAP_FAULT_CODE_LOCAL_NAME, faultCode);
referenceRMMsgContext.setProperty(SOAP11Constants.SOAP_FAULT_DETAIL_LOCAL_NAME, detail);
referenceRMMsgContext.setProperty(SOAP11Constants.SOAP_FAULT_STRING_LOCAL_NAME, reason);
}
AxisFault fault = new AxisFault(faultCodeValue, data.getReason(), "", "", data.getDetail());
fault.setFaultAction(Sandesha2Constants.SPEC_2007_02.Actions.MC_FAULT);
//if this is throwable throwing it out, else we will log here.
throw fault;
}
/**
* Checks if a sequence is terminated and returns a SequenceTerminated fault.
* @param referenceRMMessage
* @param sequenceID
* @param rmdBean
* @return
* @throws AxisFault
*/
public static boolean checkForSequenceTerminated(RMMsgContext referenceRMMessage, String sequenceID, RMSequenceBean bean
, boolean piggybackedMessage)
throws AxisFault {
if (log.isDebugEnabled())
log.debug("Enter: FaultManager::checkForSequenceTerminated, " + sequenceID);
if (bean!=null && bean.isTerminated()) {
MessageContext referenceMessage = referenceRMMessage.getMessageContext();
FaultData data = new FaultData();
data.setCode(referenceMessage.getEnvelope().getVersion().getSenderFaultCode());
data.setSubcode(SpecSpecificConstants.getFaultSubcode(referenceRMMessage.getRMNamespaceValue(),
Sandesha2Constants.SOAPFaults.FaultType.SEQUENCE_TERMINATED ));
data.setReason(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.sequenceTerminatedFault, sequenceID));
data.setType(Sandesha2Constants.SOAPFaults.FaultType.SEQUENCE_TERMINATED);
SOAPFactory factory = (SOAPFactory)referenceMessage.getEnvelope().getOMFactory();
String rmNamespaceValue = referenceRMMessage.getRMNamespaceValue();
OMElement identifierElement = factory.createOMElement(Sandesha2Constants.WSRM_COMMON.IDENTIFIER,
rmNamespaceValue, Sandesha2Constants.WSRM_COMMON.NS_PREFIX_RM);
identifierElement.setText(sequenceID);
data.setDetail(identifierElement);
if (log.isDebugEnabled())
log.debug("Exit: FaultManager::checkForSequenceTerminated, sequence terminated");
boolean throwable = !piggybackedMessage;
getOrSendFault(referenceRMMessage, data, throwable, bean.getAcksToEndpointReference());
return true;
}
if (log.isDebugEnabled())
log.debug("Exit: FaultManager::checkForSequenceTerminated");
return false;
}
public static boolean checkForSequenceClosed(RMMsgContext referenceRMMessage, String sequenceID,
RMDBean rmdBean, boolean piggybackedMessage) throws AxisFault {
if (log.isDebugEnabled())
log.debug("Enter: FaultManager::checkForSequenceClosed, " + sequenceID);
if (rmdBean!=null && rmdBean.isClosed()) {
MessageContext referenceMessage = referenceRMMessage.getMessageContext();
FaultData data = new FaultData();
data.setCode(referenceMessage.getEnvelope().getVersion().getSenderFaultCode());
data.setSubcode(SpecSpecificConstants.getFaultSubcode(referenceRMMessage.getRMNamespaceValue(),
Sandesha2Constants.SOAPFaults.FaultType.SEQUENCE_CLOSED ));
data.setReason(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.cannotAcceptMsgAsSequenceClosedFault));
data.setType(Sandesha2Constants.SOAPFaults.FaultType.SEQUENCE_CLOSED);
SOAPFactory factory = (SOAPFactory)referenceMessage.getEnvelope().getOMFactory();
String rmNamespaceValue = referenceRMMessage.getRMNamespaceValue();
OMElement identifierElement = factory.createOMElement(Sandesha2Constants.WSRM_COMMON.IDENTIFIER,
rmNamespaceValue, Sandesha2Constants.WSRM_COMMON.NS_PREFIX_RM);
identifierElement.setText(sequenceID);
data.setDetail(identifierElement);
if (log.isDebugEnabled())
log.debug("Exit: FaultManager::checkForSequenceClosed, sequence closed");
boolean throwable = !piggybackedMessage;
getOrSendFault(referenceRMMessage, data,throwable, rmdBean.getAcksToEndpointReference());
return true;
}
if (log.isDebugEnabled())
log.debug("Exit: FaultManager::checkForSequenceClosed");
return false;
}
/**
* Adds the necessary Fault elements as properties to the message context.
* Or if this is a SOAP11 Fault, generates the correct RM Fault and sends.
*
* @param referenceRMMsgContext - Message in reference to which the fault will be generated.
* @param data - data for the fault
* @param throwable - This tells weather or not it is ok to throw the fault out. I.e. this should not be done when processing
* piggybacked acks since this will stop the carrier message from being processed.
* @return - The dummy fault to be thrown out.
*
* @throws AxisFault
*/
public static void getOrSendFault(RMMsgContext referenceRMMsgContext, FaultData data, boolean throwable, EndpointReference acksToEPR) throws AxisFault {
if (log.isDebugEnabled())
log.debug("Enter: FaultManager::getOrSendFault: " + referenceRMMsgContext + "," + data + "," + throwable + "," + acksToEPR);
SOAPFactory factory = (SOAPFactory) referenceRMMsgContext.getSOAPEnvelope().getOMFactory();
SOAPFaultCode faultCode = factory.createSOAPFaultCode();
faultCode.setValue(data.getCode());
if (factory.getSOAPVersion() == SOAP12Version.getSingleton()) {
SOAPFaultSubCode faultSubCode = factory.createSOAPFaultSubCode(faultCode);
faultSubCode.setValue(data.getSubcode());
}
SOAPFaultReason reason = factory.createSOAPFaultReason();
SOAPFaultDetail detail = factory.createSOAPFaultDetail();
detail.addDetailEntry(data.getDetail());
String SOAPNamespaceValue = factory.getSoapVersionURI();
if (SOAP12Constants.SOAP_ENVELOPE_NAMESPACE_URI.equals(SOAPNamespaceValue))
{
SOAPFaultText reasonText = factory.createSOAPFaultText();
reasonText.setText(data.getReason());
reasonText.setLang(Sandesha2Constants.LANG_EN);
reason.addSOAPText(reasonText);
referenceRMMsgContext.setProperty(SOAP12Constants.SOAP_FAULT_CODE_LOCAL_NAME, faultCode);
referenceRMMsgContext.setProperty(SOAP12Constants.SOAP_FAULT_REASON_LOCAL_NAME, reason);
referenceRMMsgContext.setProperty(SOAP12Constants.SOAP_FAULT_DETAIL_LOCAL_NAME, detail);
AxisFault fault = new AxisFault(data.getCode(), data.getReason(), "", "", data.getDetail());
fault.setFaultAction(SpecSpecificConstants.getAddressingFaultAction(referenceRMMsgContext.getRMSpecVersion()));
//if this is throwable throwing it out, else we will log here.
if (throwable)
{
if (referenceRMMsgContext.getMessageContext().isServerSide()) {
if (log.isDebugEnabled())
log.debug("Exit: FaultManager::getOrSendFault: " + fault);
throw fault;
}
}
else
log.error("Sandesha2 got a fault when processing the message " + referenceRMMsgContext.getMessageId(), fault);
}
else if (SOAP11Constants.SOAP_ENVELOPE_NAMESPACE_URI.equals (SOAPNamespaceValue))
{
reason.setText(data.getReason());
referenceRMMsgContext.setProperty(SOAP11Constants.SOAP_FAULT_CODE_LOCAL_NAME, faultCode);
referenceRMMsgContext.setProperty(SOAP11Constants.SOAP_FAULT_DETAIL_LOCAL_NAME, detail);
referenceRMMsgContext.setProperty(SOAP11Constants.SOAP_FAULT_STRING_LOCAL_NAME, reason);
// Need to send this message as the Axis Layer doesn't set the "SequenceFault" header
MessageContext faultMessageContext =
MessageContextBuilder.createFaultMessageContext(referenceRMMsgContext.getMessageContext(), null);
if(acksToEPR!=null){
if (log.isDebugEnabled())
log.debug("Debug: FaultManager::getOrSendFault: rewrriting fault destination EPR to " + acksToEPR);
faultMessageContext.setTo(acksToEPR);
}
SOAPFaultEnvelopeCreator.addSOAPFaultEnvelope(faultMessageContext, Sandesha2Constants.SOAPVersion.v1_1, data, referenceRMMsgContext.getRMNamespaceValue());
// Set the action // TODO SET THE ACTION BASED ON THE SPEC
faultMessageContext.setWSAAction(
SpecSpecificConstants.getAddressingFaultAction(referenceRMMsgContext.getRMSpecVersion()));
if(throwable)
{
if (log.isDebugEnabled())
log.debug("Sending fault message " + faultMessageContext.getEnvelope().getHeader());
//Sending the message
//having a surrounded try block will make sure that the error is logged here
//and that this does not disturb the processing of a carrier message.
try {
AxisEngine.sendFault(faultMessageContext);
EndpointReference destination = faultMessageContext.getTo();
if(destination == null || destination.hasAnonymousAddress()) {
TransportUtils.setResponseWritten(referenceRMMsgContext.getMessageContext(), true);
}
} catch (Exception e) {
AxisFault fault = new AxisFault(data.getCode(), data.getReason(), "", "", data.getDetail());
String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.couldNotSendFaultDueToException, fault.getMessage(), e.getMessage());
log.error(message);
}
}
else
{
AxisFault fault = new AxisFault(data.getCode(), data.getReason(), "", "", data.getDetail());
String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.couldNotSendFaultDueToException, fault.getMessage());
log.error(message);
}
}
else
{
String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.unknownSoapVersion);
throw new SandeshaException (message);
}
if (log.isDebugEnabled())
log.debug("Exit: FaultManager::getOrSendFault");
}
public static boolean isRMFault (String faultSubcodeValue) {
if (faultSubcodeValue==null)
return false;
if (Sandesha2Constants.SOAPFaults.Subcodes.CREATE_SEQUENCE_REFUSED.equalsIgnoreCase (faultSubcodeValue) ||
Sandesha2Constants.SOAPFaults.Subcodes.INVALID_ACKNOWLEDGEMENT.equalsIgnoreCase (faultSubcodeValue) ||
Sandesha2Constants.SOAPFaults.Subcodes.LAST_MESSAGE_NO_EXCEEDED.equalsIgnoreCase (faultSubcodeValue) ||
Sandesha2Constants.SOAPFaults.Subcodes.MESSAGE_NUMBER_ROLEOVER.equalsIgnoreCase (faultSubcodeValue) ||
Sandesha2Constants.SOAPFaults.Subcodes.SEQUENCE_CLOSED.equalsIgnoreCase (faultSubcodeValue) ||
Sandesha2Constants.SOAPFaults.Subcodes.SEQUENCE_TERMINATED.equalsIgnoreCase (faultSubcodeValue) ||
Sandesha2Constants.SOAPFaults.Subcodes.UNKNOWN_SEQUENCE.equalsIgnoreCase (faultSubcodeValue) ) {
return true;
}
return false;
}
private static InvocationResponse manageIncomingFault (AxisFault fault, RMMsgContext rmMsgCtx, SOAPFault faultPart, Transaction transaction) throws AxisFault {
if (log.isDebugEnabled())
log.debug("Enter: FaultManager::manageIncomingFault");
InvocationResponse response = InvocationResponse.CONTINUE;
if (log.isErrorEnabled())
log.error(fault);
SandeshaListener listner = (SandeshaListener) rmMsgCtx.getProperty(SandeshaClientConstants.SANDESHA_LISTENER);
if (listner!=null)
listner.onError(fault);
// Get the SOAPVersion
SOAPFactory factory = (SOAPFactory) rmMsgCtx.getSOAPEnvelope().getOMFactory();
String SOAPNamespaceValue = factory.getSoapVersionURI();
String soapFaultSubcode = null;
String identifier = null;
boolean isSOAP11SequenceUnknownFault = false;
if (SOAP11Constants.SOAP_ENVELOPE_NAMESPACE_URI.equals(SOAPNamespaceValue)) {
// Need to get the sequence part from the Header.
if (log.isDebugEnabled())
log.debug("soap11");
// try {
SequenceFault sequenceFault = rmMsgCtx.getSequenceFault();
// If the sequence fault part is not null, then we have an RM specific fault.
if (sequenceFault != null) {
soapFaultSubcode = sequenceFault.getFaultCode().getFaultCode().getLocalPart();
// Get the identifier - if there is one.
identifier = sequenceFault.getFaultCode().getDetail();
isSOAP11SequenceUnknownFault = true;
if (log.isDebugEnabled())
log.debug("isSOAP11SequenceUnknownFault " + identifier);
}
// } catch (SandeshaException e) {
// if (log.isDebugEnabled())
// log.debug("Unable to process SequenceFault", e);
// }
}
// If we haven't found a soapFaultSubcode at this point - look inside the AxisFault for the information.
if (soapFaultSubcode == null && faultPart.getCode() != null &&
faultPart.getCode().getSubCode() != null &&
faultPart.getCode().getSubCode().getValue() != null)
soapFaultSubcode = faultPart.getCode().getSubCode().getValue().getTextAsQName().getLocalPart();
// Get the identifier, if there is one.
SOAPFaultDetail detail = faultPart.getDetail();
if (detail != null && !isSOAP11SequenceUnknownFault)
{
// At this point we may not know what RM NS is in use.
OMElement identifierOM = detail.getFirstChildWithName(new QName(Sandesha2Constants.SPEC_2005_02.NS_URI,
Sandesha2Constants.WSRM_COMMON.IDENTIFIER));
if (identifierOM != null){
identifier = identifierOM.getText();
}else{
identifierOM = detail.getFirstChildWithName(new QName(Sandesha2Constants.SPEC_2007_02.NS_URI,
Sandesha2Constants.WSRM_COMMON.IDENTIFIER));
if (identifierOM != null){
identifier = identifierOM.getText();
}
}
}
if (Sandesha2Constants.SOAPFaults.Subcodes.CREATE_SEQUENCE_REFUSED.equals(soapFaultSubcode)) {
processCreateSequenceRefusedFault(rmMsgCtx, fault);
} else if (Sandesha2Constants.SOAPFaults.Subcodes.UNKNOWN_SEQUENCE.equals(soapFaultSubcode) ||
Sandesha2Constants.SOAPFaults.Subcodes.SEQUENCE_TERMINATED.equals(soapFaultSubcode) ||
Sandesha2Constants.SOAPFaults.Subcodes.MESSAGE_NUMBER_ROLEOVER.equals(soapFaultSubcode)) {
processSequenceUnknownFault(rmMsgCtx, fault, identifier, transaction);
}
// If the operation is an Sandesha In Only operation, or the fault is a recognised fault,
// then stop the message from being processed further.
// To configure the actions for Sandesha to drop, add them to the module.xml under
// Sandesha2InOnly operation.
if (isRMFault(soapFaultSubcode))
response = InvocationResponse.ABORT;
if (log.isDebugEnabled())
log.debug("Exit: FaultManager::manageIncomingFault, " + response);
return response;
}
public static InvocationResponse processMessagesForFaults (RMMsgContext rmMsgCtx, StorageManager storageManager) throws AxisFault {
if (log.isDebugEnabled())
log.debug("Enter: FaultManager::processMessagesForFaults");
InvocationResponse response = InvocationResponse.CONTINUE;
// Rather than look for a SOAPFault, check the action of the message to see if that matches
// an RM fault or is the addressing fault (RM 1.0).
boolean isFault = isRMFaultAction(rmMsgCtx.getMessageContext().getWSAAction());
if (isFault) {
Transaction transaction = null;
try {
transaction = storageManager.getTransaction();
SOAPEnvelope envelope = rmMsgCtx.getSOAPEnvelope();
if (envelope==null)
return response;
SOAPFault faultPart = envelope.getBody().getFault();
// constructing the fault
AxisFault axisFault = getAxisFaultFromFromSOAPFault(faultPart, rmMsgCtx);
response = manageIncomingFault (axisFault, rmMsgCtx, faultPart, transaction);
if(transaction != null && transaction.isActive()) transaction.commit();
transaction = null;
} finally {
if (transaction != null && transaction.isActive())
transaction.rollback();
}
}
if (log.isDebugEnabled())
log.debug("Exit: FaultManager::processMessagesForFaults, " + response);
return response;
}
private static boolean isRMFaultAction(String action) {
if (log.isDebugEnabled())
log.debug("Enter: FaultManager::processMessagesForFaults , "+action);
boolean isFaultAction = false;
if (AddressingConstants.Final.WSA_FAULT_ACTION.equals(action) ||
Sandesha2Constants.SPEC_2007_02.Actions.SOAP_ACTION_FAULT.equals(action))
isFaultAction = true;
if (log.isDebugEnabled())
log.debug("Enter: FaultManager::processMessagesForFaults , "+isFaultAction);
return isFaultAction;
}
private static AxisFault getAxisFaultFromFromSOAPFault(SOAPFault faultPart, RMMsgContext rmMsgCtx) {
String soapFaultSubcode = null;
SequenceFault sequenceFault = rmMsgCtx.getSequenceFault();
AxisFault axisFault = null;
// If the sequence fault part is not null, then we have an RM specific fault.
if (sequenceFault != null) {
soapFaultSubcode = sequenceFault.getFaultCode().getFaultCode().getLocalPart();
//Need to concatenate all info about the error into string
StringBuffer faultSB = new StringBuffer();
faultSB.append(soapFaultSubcode + " ");
faultSB.append(sequenceFault.getFaultCode().getDetail() + " ");
faultSB.append(faultPart.getDetail().getText());
axisFault = new AxisFault(faultSB.toString(), sequenceFault.getFaultCode().getFaultCode());
} else {
axisFault = new AxisFault(faultPart.getCode(), faultPart.getReason(), faultPart.getNode(), faultPart
.getRole(), faultPart.getDetail());
}
return axisFault;
}
/**
* Checks to see if the message number received is == to the Long.MAX_VALUE
*
* Throws and AxisFault, or sends a Fault message if the condition is met.
* @throws AxisFault
*/
public static boolean checkForMessageRolledOver(RMMsgContext rmMessageContext, String sequenceId, long msgNo, RMDBean bean)throws AxisFault {
if (msgNo == Long.MAX_VALUE) {
if (log.isDebugEnabled())
log.debug("Max message number reached " + msgNo);
// Return a CreateSequenceRefused error
MessageContext messageContext = rmMessageContext.getMessageContext();
FaultData data = new FaultData();
data.setCode(messageContext.getEnvelope().getVersion().getSenderFaultCode());
data.setSubcode(SpecSpecificConstants.getFaultSubcode(rmMessageContext.getRMNamespaceValue(),
Sandesha2Constants.SOAPFaults.FaultType.MESSAGE_NUMBER_ROLLOVER ));
SOAPFactory factory = (SOAPFactory)messageContext.getEnvelope().getOMFactory();
OMElement identifierElement = factory.createOMElement(Sandesha2Constants.WSRM_COMMON.IDENTIFIER,
rmMessageContext.getRMNamespaceValue(), Sandesha2Constants.WSRM_COMMON.NS_PREFIX_RM);
identifierElement.setText(sequenceId);
OMElement maxMsgNumber = factory.createOMElement(Sandesha2Constants.WSRM_COMMON.MAX_MSG_NUMBER,
rmMessageContext.getRMNamespaceValue(), Sandesha2Constants.WSRM_COMMON.NS_PREFIX_RM);
maxMsgNumber.setText(Long.toString(msgNo));
data.setDetail(identifierElement);
data.setDetail2(maxMsgNumber);
data.setReason(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.messageNumberRollover));
data.setType(Sandesha2Constants.SOAPFaults.FaultType.MESSAGE_NUMBER_ROLLOVER);
getOrSendFault(rmMessageContext, data, true, bean.getAcksToEndpointReference());
return true;
}
return false;
}
/**
* On receipt of a CreateSequenceRefused fault, terminate the sequence and notify any waiting
* clients of the error.
* @param fault
* @throws AxisFault
*/
private static void processCreateSequenceRefusedFault(RMMsgContext rmMsgCtx, AxisFault fault) throws AxisFault {
if (log.isDebugEnabled())
log.debug("Enter: FaultManager::processCreateSequenceRefusedFault");
ConfigurationContext configCtx = rmMsgCtx.getMessageContext().getConfigurationContext();
if(log.isWarnEnabled()){
String name = "NOT_SET";
if(rmMsgCtx != null){
MessageContext mCtx = rmMsgCtx.getMessageContext();
if(mCtx != null){
AxisService axisSvc = mCtx.getAxisService();
if(axisSvc != null){
name = axisSvc.getName();
}
}
}
log.warn(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.reliableMessagingNotEnabled, name));
}
StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configCtx, configCtx
.getAxisConfiguration());
RelatesTo relatesTo = rmMsgCtx.getMessageContext().getRelatesTo();
String createSeqMsgId = null;
if (relatesTo != null) {
createSeqMsgId = relatesTo.getValue();
} else {
// Work out the related message from the operation context
OperationContext context = rmMsgCtx.getMessageContext().getOperationContext();
MessageContext createSeq = context.getMessageContext(WSDLConstants.MESSAGE_LABEL_OUT_VALUE);
if(createSeq != null) createSeqMsgId = createSeq.getMessageID();
}
if(createSeqMsgId == null) {
String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.relatesToNotAvailable);
log.error(message);
throw new SandeshaException(message);
}
SenderBeanMgr retransmitterMgr = storageManager.getSenderBeanMgr();
RMSBeanMgr rmsBeanMgr = storageManager.getRMSBeanMgr();
RMSBean rmsBean = rmsBeanMgr.retrieve(createSeqMsgId);
if (rmsBean == null) {
if (log.isDebugEnabled())
log.debug("Exit: FaultManager::processCreateSequenceRefusedFault Unable to find RMSBean");
return;
}
if(rmsBean.getSequenceID()!=null){
//we got a cseqRefused but the sequence is already setup - this implies a timing condition whereby several resends of the cSeqReq have been sent out.
//The best thing to do here is to ignore it.
if (log.isDebugEnabled())
log.debug("Exit: FaultManager::processCreateSequenceRefusedFault Sequence already established - no requirement to cleanup");
return;
}
// Indicate that there was an error when sending the Create Sequence.
rmsBean.setLastSendError(fault);
// Mark the sequence as terminated
rmsBean.setTerminated(true);
// Update the RMSBean
rmsBeanMgr.update(rmsBean);
SenderBean createSequenceSenderBean = retransmitterMgr.retrieve(createSeqMsgId);
if (createSequenceSenderBean == null)
throw new SandeshaException(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.createSeqEntryNotFound));
// deleting the create sequence entry.
retransmitterMgr.delete(createSeqMsgId);
// Notify the clients of a failure
notifyClientsOfFault(rmsBean.getInternalSequenceID(), storageManager, configCtx, fault);
rmMsgCtx.pause();
// Cleanup sending side.
if (log.isDebugEnabled())
log.debug("Terminating sending sequence " + rmsBean);
TerminateManager.terminateSendingSide(rmsBean, storageManager, false, null);
if (log.isDebugEnabled())
log.debug("Exit: FaultManager::processCreateSequenceRefusedFault");
}
/**
* If the RMD returns a SequenceTerminated, or an Unknown sequence fault, then we should
* mark the RMS Sequence as terminated and notify clients of the error.
*
* @param rmMsgCtx
* @param fault
* @param identifier
*/
private static void processSequenceUnknownFault(RMMsgContext rmMsgCtx, AxisFault fault, String sequenceID, Transaction transaction) throws AxisFault {
if (log.isDebugEnabled())
log.debug("Enter: FaultManager::processSequenceUnknownFault " + sequenceID);
ConfigurationContext configCtx = rmMsgCtx.getMessageContext().getConfigurationContext();
StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configCtx, configCtx
.getAxisConfiguration());
// Find the rmsBean
RMSBean rmsBean = SandeshaUtil.getRMSBeanFromSequenceId(storageManager, sequenceID);
if (rmsBean != null) {
rmMsgCtx.pause();
// Cleanup sending side.
if (log.isDebugEnabled())
log.debug("Terminating sending sequence " + rmsBean);
if(!TerminateManager.terminateSendingSide(rmsBean, storageManager, true, transaction)){
// We did not reallocate so we notify the clients of a failure
notifyClientsOfFault(rmsBean.getInternalSequenceID(), storageManager, configCtx, fault);
//Mark the RMSBean as reallocation failed and update last activation time
transaction = storageManager.getTransaction();
rmsBean.setLastActivatedTime(System.currentTimeMillis());
storageManager.getRMSBeanMgr().update(rmsBean);
if(transaction != null && transaction.isActive()) transaction.commit();
}
}
else {
RMDBean rmdBean = SandeshaUtil.getRMDBeanFromSequenceId(storageManager, sequenceID);
if (rmdBean != null) {
rmMsgCtx.pause();
// Cleanup sending side.
if (log.isDebugEnabled())
log.debug("Terminating sending sequence " + rmdBean);
TerminateManager.cleanReceivingSideOnTerminateMessage(configCtx, rmdBean.getSequenceID(), storageManager);
// Update the last activated time.
rmdBean.setLastActivatedTime(System.currentTimeMillis());
// Update the bean in the map
storageManager.getRMDBeanMgr().update(rmdBean);
}
else {
if (log.isDebugEnabled())
log.debug("Exit: FaultManager::processSequenceUnknownFault Unable to find sequence");
return;
}
}
if (log.isDebugEnabled())
log.debug("Exit: FaultManager::processSequenceUnknownFault");
}
static void notifyClientsOfFault(String internalSequenceId,
StorageManager storageManager, ConfigurationContext configCtx, AxisFault fault) throws SandeshaException {
// Locate and update all of the messages for this sequence, now that we know
// the sequence id.
SenderBean target = new SenderBean();
target.setInternalSequenceID(internalSequenceId);
Iterator<SenderBean> iterator = storageManager.getSenderBeanMgr().find(target).iterator();
while (iterator.hasNext()) {
SenderBean tempBean = iterator.next();
if (tempBean.getMessageType() != Sandesha2Constants.MessageTypes.MAKE_CONNECTION_MSG &&
tempBean.getMessageType() != Sandesha2Constants.MessageTypes.ACK) {
String messageStoreKey = tempBean.getMessageContextRefKey();
// Retrieve the message context.
MessageContext context = storageManager.retrieveMessageContext(messageStoreKey, configCtx);
AxisOperation axisOperation = context.getAxisOperation();
if (axisOperation != null) {
MessageReceiver msgReceiver = axisOperation.getMessageReceiver();
if ((msgReceiver != null) && (msgReceiver instanceof CallbackReceiver)) {
Object callback = ((CallbackReceiver)msgReceiver).lookupCallback(context.getMessageID());
if(callback instanceof AxisCallback) {
try {
((CallbackReceiver)msgReceiver).addCallback(context.getMessageID(),(AxisCallback)callback);
} catch (AxisFault axisFault) {
throw new SandeshaException(axisFault);
}
((AxisCallback)callback).onError(fault);
}
// this is actually to support synapse. Synpase Axis Operation does not have a callBackMessageReceiver
// synapse AxisOperation always has the synapse message receiver. And also to be send in the synapse
// fault mediators we need to set the SENDING_FAULT property as well.
} else if (msgReceiver != null && tempBean.getMessageType() == Sandesha2Constants.MessageTypes.APPLICATION) {
try {
//since there is no reponse we set this message as the fault reply
context.getOptions().setRelationships(new RelatesTo[]{new RelatesTo(context.getMessageID())});
context.setProperty("SENDING_FAULT", Boolean.TRUE);
msgReceiver.receive(context);
} catch (AxisFault axisFault) {
log.error(axisFault.getMessage());
throw new SandeshaException("Can not invoke the message receiver ", axisFault);
}
}
}
}
}
}
}