blob: c6735dd4f303927ede98584cc4141efbf82797d9 [file] [log] [blame]
/*
* Copyright 1999-2004 The Apache Software Foundation.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*
*/
package org.apache.sandesha2.msgprocessors;
import java.util.Collection;
import java.util.Iterator;
import javax.xml.namespace.QName;
import org.apache.axis2.AxisFault;
import org.apache.axis2.addressing.EndpointReference;
import org.apache.axis2.context.AbstractContext;
import org.apache.sandesha2.Sandesha2Constants;
import org.apache.sandesha2.RMMsgContext;
import org.apache.sandesha2.SandeshaException;
import org.apache.sandesha2.storage.StorageManager;
import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr;
import org.apache.sandesha2.storage.beans.SenderBean;
import org.apache.sandesha2.storage.beans.SequencePropertyBean;
import org.apache.sandesha2.util.RMMsgCreator;
import org.apache.sandesha2.util.SandeshaUtil;
import org.apache.sandesha2.wsrm.AcknowledgementRange;
import org.apache.sandesha2.wsrm.Nack;
import org.apache.sandesha2.wsrm.SequenceAcknowledgement;
/**
 * Responsible for processing an incoming acknowledgement message.
 * <p>
 * For every acknowledged message number the matching entry is removed from
 * the sender (retransmitter) store. When the acknowledgement covers every
 * message up to the recorded last outgoing message, a TerminateSequence
 * message is scheduled for sending.
 *
 * @author Chamikara Jayalath <chamikaramj@gmail.com>
 */
public class AcknowledgementProcessor implements MsgProcessor {

    /**
     * Processes the SequenceAcknowledgement part of the given message.
     * <p>
     * Steps, in order: validate the acknowledgement and resolve the internal
     * sequence id, flag the message context as ack-processed, clear relatesTo
     * on standalone acks, delete acknowledged sender entries and, if a last
     * outgoing message number is recorded, check for completion, schedule the
     * terminate message when complete and pause the message.
     *
     * @param rmMsgCtx the incoming RM message carrying the acknowledgement
     * @throws SandeshaException if the acknowledgement part, the context, the
     *             sequence identifier, the internal sequence id or the last
     *             out message property is missing or invalid
     */
    public void processMessage(RMMsgContext rmMsgCtx) throws SandeshaException {
        SequenceAcknowledgement sequenceAck = (SequenceAcknowledgement) rmMsgCtx
                .getMessagePart(Sandesha2Constants.MessageParts.SEQ_ACKNOWLEDGEMENT);
        if (sequenceAck == null) {
            throw new SandeshaException("Sequence acknowledgement part is null");
        }

        if (rmMsgCtx.getContext() == null) {
            throw new SandeshaException("Context is null");
        }

        StorageManager storageManager = SandeshaUtil
                .getSandeshaStorageManager(rmMsgCtx.getMessageContext()
                        .getConfigurationContext());
        SenderBeanMgr retransmitterMgr = storageManager
                .getRetransmitterBeanMgr();
        SequencePropertyBeanMgr seqPropMgr = storageManager
                .getSequencePropretyBeanMgr();

        // A missing identifier element is reported the same way as an empty
        // one instead of surfacing as a NullPointerException.
        if (sequenceAck.getIdentifier() == null) {
            throw new SandeshaException("OutSequenceId is null");
        }
        String outSequenceId = sequenceAck.getIdentifier().getIdentifier();
        if (outSequenceId == null || "".equals(outSequenceId)) {
            throw new SandeshaException("OutSequenceId is null");
        }

        SequencePropertyBean internalSequenceBean = seqPropMgr.retrieve(
                outSequenceId,
                Sandesha2Constants.SequenceProperties.INTERNAL_SEQUENCE_ID);
        if (internalSequenceBean == null
                || internalSequenceBean.getValue() == null) {
            throw new SandeshaException("TempSequenceId is not set correctly");
        }
        String internalSequenceId = (String) internalSequenceBean.getValue();

        //Following happens in the SandeshaGlobal handler
        rmMsgCtx.getMessageContext().setProperty(
                Sandesha2Constants.ACK_PROCSSED, "true");

        //Removing relatesTo - Some WSRM endpoints tend to set a relatesTo
        // value for ack messages, which can make dispatching go wrong.
        //So relatesTo is set to null for ack messages (this happens in the
        // SandeshaGlobal handler). Done only for a standalone ACK.
        if (rmMsgCtx.getMessageType() == Sandesha2Constants.MessageTypes.ACK) {
            rmMsgCtx.setRelatesTo(null);
        }

        SenderBean input = new SenderBean();
        input.setInternalSequenceId(internalSequenceId);
        Collection retransmitterEntriesOfSequence = retransmitterMgr
                .find(input);
        removeAcknowledgedEntries(sequenceAck, retransmitterEntriesOfSequence,
                retransmitterMgr);

        //TODO - Process Nack: the entries of sequenceAck.getNackList() are
        // currently ignored.

        //following get called in the SandesaInHandler
        //If all messages up to last message have been acknowledged.
        //Add terminate Sequence message.
        SequencePropertyBean lastOutMsgBean = seqPropMgr.retrieve(
                internalSequenceId,
                Sandesha2Constants.SequenceProperties.LAST_OUT_MESSAGE);
        if (lastOutMsgBean == null) {
            return;
        }

        long lastOutMessageNo = readLastOutMessageNumber(lastOutMsgBean);
        boolean complete = SandeshaUtil.verifySequenceCompletion(
                sequenceAck.getAcknowledgementRanges().iterator(),
                lastOutMessageNo);
        if (complete) {
            addTerminateSequenceMessage(rmMsgCtx, outSequenceId,
                    internalSequenceId);
        }

        //stopping the progress of the message further.
        rmMsgCtx.getMessageContext().setPausedTrue(
                new QName(Sandesha2Constants.IN_HANDLER_NAME));
    }

    /**
     * Deletes every sender entry whose message number lies inside one of the
     * acknowledged ranges.
     * <p>
     * The stored entries are walked rather than the numbers of each range:
     * the ranges are supplied by the remote peer, and counting from lower to
     * upper never terminates when upper is Long.MAX_VALUE (the counter wraps)
     * and is needlessly expensive for any large range.
     *
     * @param sequenceAck the acknowledgement holding the ranges
     * @param entries sender entries of the sequence; null is treated as empty
     * @param retransmitterMgr manager used to delete acknowledged entries
     * @throws SandeshaException if the storage layer reports a failure
     */
    private void removeAcknowledgedEntries(SequenceAcknowledgement sequenceAck,
            Collection entries, SenderBeanMgr retransmitterMgr)
            throws SandeshaException {
        if (entries == null) {
            return;
        }
        // Iterate over a snapshot so that deleting from the store cannot
        // invalidate the iteration if the collection is a live view.
        Object[] snapshot = entries.toArray();
        for (int i = 0; i < snapshot.length; i++) {
            SenderBean bean = (SenderBean) snapshot[i];
            if (isAcknowledged(sequenceAck, bean.getMessageNumber())) {
                retransmitterMgr.delete(bean.getMessageId());
            }
        }
    }

    /**
     * Tells whether the given message number falls inside any acknowledgement
     * range (bounds inclusive) of the acknowledgement.
     */
    private boolean isAcknowledged(SequenceAcknowledgement sequenceAck,
            long messageNo) {
        Iterator ranges = sequenceAck.getAcknowledgementRanges().iterator();
        while (ranges.hasNext()) {
            AcknowledgementRange range = (AcknowledgementRange) ranges.next();
            if (range.getLowerValue() <= messageNo
                    && messageNo <= range.getUpperValue()) {
                return true;
            }
        }
        return false;
    }

    /**
     * Extracts the last outgoing message number from its property bean.
     *
     * @throws SandeshaException if the stored value is not a Long (including
     *             null) or is not positive
     */
    private long readLastOutMessageNumber(SequencePropertyBean lastOutMsgBean)
            throws SandeshaException {
        Object value = lastOutMsgBean.getValue();
        // instanceof is false for null, so this covers both a missing value
        // and a value of the wrong type.
        if (!(value instanceof Long)) {
            throw new SandeshaException(
                    "Invalid object set for the Last Out Message");
        }
        long lastOutMessageNo = ((Long) value).longValue();
        if (lastOutMessageNo <= 0) {
            throw new SandeshaException(
                    "Invalid value set for the last out message");
        }
        return lastOutMessageNo;
    }

    /**
     * Creates a TerminateSequence message for the given sequence and queues
     * it in the sender store, unless the TERMINATE_ADDED property of the
     * sequence is already "true".
     *
     * @param incomingAckRMMsg the acknowledgement message that completed the
     *            sequence; used to create the terminate message and to reach
     *            the storage manager
     * @param outSequenceId id of the sequence being terminated
     * @param internalSequenceId internal id used to look up the TO_EPR
     *            property
     * @throws SandeshaException if the To EPR is not available or the SOAP
     *             envelope cannot be added to the terminate message
     */
    public void addTerminateSequenceMessage(RMMsgContext incomingAckRMMsg,
            String outSequenceId, String internalSequenceId)
            throws SandeshaException {
        StorageManager storageManager = SandeshaUtil
                .getSandeshaStorageManager(incomingAckRMMsg.getMessageContext()
                        .getConfigurationContext());
        SequencePropertyBeanMgr seqPropMgr = storageManager
                .getSequencePropretyBeanMgr();

        SequencePropertyBean terminated = seqPropMgr.retrieve(outSequenceId,
                Sandesha2Constants.SequenceProperties.TERMINATE_ADDED);
        if (terminated != null && "true".equals(terminated.getValue())) {
            // NOTE(review): writes to stdout; consider routing this through
            // the project's logging facility.
            System.out.println("TERMINATE WAS ADDED PREVIOUSLY....");
            return;
        }

        RMMsgContext terminateRMMessage = RMMsgCreator
                .createTerminateSequenceMessage(incomingAckRMMsg, outSequenceId);

        SequencePropertyBean toBean = seqPropMgr.retrieve(internalSequenceId,
                Sandesha2Constants.SequenceProperties.TO_EPR);
        // A missing property bean is reported like a missing value instead of
        // surfacing as a NullPointerException.
        if (toBean == null || toBean.getValue() == null) {
            throw new SandeshaException("To EPR has an invalid value");
        }
        EndpointReference toEPR = (EndpointReference) toBean.getValue();

        terminateRMMessage.setTo(new EndpointReference(toEPR.getAddress()));
        terminateRMMessage.setFrom(new EndpointReference(
                Sandesha2Constants.WSA.NS_URI_ANONYMOUS));
        terminateRMMessage.setFaultTo(new EndpointReference(
                Sandesha2Constants.WSA.NS_URI_ANONYMOUS));
        terminateRMMessage
                .setWSAAction(Sandesha2Constants.WSRM.Actions.ACTION_TERMINATE_SEQUENCE);
        terminateRMMessage
                .setSOAPAction(Sandesha2Constants.WSRM.Actions.SOAP_ACTION_TERMINATE_SEQUENCE);

        try {
            terminateRMMessage.addSOAPEnvelope();
        } catch (AxisFault e) {
            throw new SandeshaException(e.getMessage());
        }

        String key = SandeshaUtil.storeMessageContext(terminateRMMessage
                .getMessageContext());

        SenderBean terminateBean = new SenderBean();
        terminateBean.setKey(key);
        //Set a retransmitter lastSentTime so that terminate will be send with
        // some delay.
        //Otherwise this get send before return of the current request (ack).
        terminateBean.setTimeToSend(System.currentTimeMillis()
                + Sandesha2Constants.TERMINATE_DELAY);
        terminateBean.setMessageId(terminateRMMessage.getMessageId());
        terminateBean.setSend(true);
        terminateBean.setReSend(false);

        SenderBeanMgr retransmitterMgr = storageManager
                .getRetransmitterBeanMgr();

        // Record that the terminate message was added so that a later
        // acknowledgement does not queue it a second time.
        SequencePropertyBean terminateAdded = new SequencePropertyBean();
        terminateAdded.setName(Sandesha2Constants.SequenceProperties.TERMINATE_ADDED);
        terminateAdded.setSequenceId(outSequenceId);
        terminateAdded.setValue("true");
        seqPropMgr.insert(terminateAdded);

        retransmitterMgr.insert(terminateBean);
    }
}