blob: 551e9bc0355feab5c9d9c69193905adf2dc967fb [file] [log] [blame]
/*
* Copyright 1999-2004 The Apache Software Foundation.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*
*/
package org.apache.sandesha2.workers;
import java.util.Collection;
import java.util.Iterator;
import org.apache.axis2.AxisFault;
import org.apache.axis2.context.ConfigurationContext;
import org.apache.axis2.context.MessageContext;
import org.apache.axis2.context.ServiceContext;
import org.apache.axis2.engine.AxisEngine;
import org.apache.axis2.soap.SOAPEnvelope;
import org.apache.sandesha2.AcknowledgementManager;
import org.apache.sandesha2.Sandesha2ClientAPI;
import org.apache.sandesha2.Sandesha2Constants;
import org.apache.sandesha2.RMMsgContext;
import org.apache.sandesha2.SandeshaException;
import org.apache.sandesha2.TerminateManager;
import org.apache.sandesha2.storage.StorageManager;
import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
import org.apache.sandesha2.storage.beans.SenderBean;
import org.apache.sandesha2.util.MessageRetransmissionAdjuster;
import org.apache.sandesha2.util.MsgInitializer;
import org.apache.sandesha2.util.SandeshaUtil;
import org.apache.sandesha2.wsrm.TerminateSequence;
/**
* This is responsible for sending and re-sending messages of Sandesha2. This represent a thread that keep running all
* the time. This keep looking at the Sender table to find out any entries that should be sent.
*
* @author Chamikara Jayalath <chamikaramj@gmail.com>
*/
public class Sender extends Thread {
private boolean senderStarted = false;
private ConfigurationContext context = null;
public synchronized void stopSender() {
senderStarted = false;
}
public synchronized boolean isSenderStarted() {
return senderStarted;
}
public void run() {
while (senderStarted) {
try {
if (context == null)
throw new SandeshaException(
"Can't continue the Sender. Context is null");
StorageManager storageManager = SandeshaUtil
.getSandeshaStorageManager(context);
SenderBeanMgr mgr = storageManager
.getRetransmitterBeanMgr();
Collection coll = mgr.findMsgsToSend();
Iterator iter = coll.iterator();
while (iter.hasNext()) {
SenderBean bean = (SenderBean) iter.next();
String key = (String) bean.getKey();
MessageContext msgCtx = SandeshaUtil
.getStoredMessageContext(key);
try {
RMMsgContext rmMsgCtx = MsgInitializer
.initializeMessage(msgCtx);
updateMessage(msgCtx);
ServiceContext serviceContext = msgCtx
.getServiceContext();
Object debug = null;
if (serviceContext != null) {
debug = msgCtx
.getProperty(Sandesha2ClientAPI.SANDESHA_DEBUG_MODE);
if (debug != null && "on".equals(debug)) {
System.out
.println("DEBUG: Sender is sending a '"
+ SandeshaUtil
.getMessageTypeString(rmMsgCtx
.getMessageType())
+ "' message.");
}
}
if (rmMsgCtx.getMessageType() == Sandesha2Constants.MessageTypes.APPLICATION) {
//piggybacking if an ack if available for the same
// sequence.
AcknowledgementManager
.piggybackAckIfPresent(rmMsgCtx);
}
try {
new AxisEngine(context).send(msgCtx);
} catch (Exception e) {
//Exception is sending. retry later
System.out
.println("Exception thrown in sending...");
e.printStackTrace();
}
MessageRetransmissionAdjuster retransmitterAdjuster = new MessageRetransmissionAdjuster();
retransmitterAdjuster.adjustRetransmittion(bean);
mgr.update(bean);
if (!msgCtx.isServerSide())
checkForSyncResponses(msgCtx);
if (rmMsgCtx.getMessageType()==Sandesha2Constants.MessageTypes.TERMINATE_SEQ) {
//terminate sending side.
TerminateSequence terminateSequence = (TerminateSequence) rmMsgCtx.getMessagePart(Sandesha2Constants.MessageParts.TERMINATE_SEQ);
String sequenceID = terminateSequence.getIdentifier().getIdentifier();
ConfigurationContext configContext = msgCtx.getConfigurationContext();
TerminateManager.terminateSendingSide(configContext,sequenceID);
}
} catch (AxisFault e1) {
e1.printStackTrace();
} catch (Exception e3) {
e3.printStackTrace();
}
//changing the values of the sent bean.
//bean.setLastSentTime(System.currentTimeMillis());
//bean.setSentCount(bean.getSentCount() + 1);
//update if resend=true otherwise delete. (reSend=false
// means
// send only once).
if (bean.isReSend())
mgr.update(bean);
else
mgr.delete(bean.getMessageId());
}
} catch (SandeshaException e) {
e.printStackTrace();
return;
}
try {
Thread.sleep(2000);
} catch (InterruptedException e1) {
//e1.printStackTrace();
System.out.println("Sender was interupted...");
e1.printStackTrace();
System.out.println("End printing Interrupt...");
}
}
}
private boolean isResponseExpected(RMMsgContext rmMsgCtx) {
boolean responseExpected = false;
if (rmMsgCtx.getMessageType() == Sandesha2Constants.MessageTypes.CREATE_SEQ) {
responseExpected = true;
}
if (rmMsgCtx.getMessageType() == Sandesha2Constants.MessageTypes.APPLICATION) {
//a ack may arrive. (not a application response)
if (rmMsgCtx.getMessageContext().getAxisOperation()
.getMessageExchangePattern().equals(
org.apache.wsdl.WSDLConstants.MEP_URI_IN_OUT)) {
responseExpected = true;
}
}
return true;
}
public void start(ConfigurationContext context) {
senderStarted = true;
this.context = context;
super.start();
}
private void updateMessage(MessageContext msgCtx1) throws SandeshaException {
try {
RMMsgContext rmMsgCtx1 = MsgInitializer.initializeMessage(msgCtx1);
rmMsgCtx1.addSOAPEnvelope();
} catch (AxisFault e) {
throw new SandeshaException("Exception in updating contexts");
}
}
private void checkForSyncResponses(MessageContext msgCtx) throws AxisFault {
boolean responsePresent = (msgCtx
.getProperty(MessageContext.TRANSPORT_IN) != null);
if (responsePresent) {
//create the response
MessageContext response = new MessageContext(msgCtx
.getConfigurationContext(), msgCtx.getSessionContext(), msgCtx
.getTransportIn(), msgCtx.getTransportOut());
response.setProperty(MessageContext.TRANSPORT_IN, msgCtx
.getProperty(MessageContext.TRANSPORT_IN));
response.setServerSide(false);
//If request is REST we assume the response is REST, so set the
// variable
response.setDoingREST(msgCtx.isDoingREST());
response
.setServiceGroupContextId(msgCtx.getServiceGroupContextId());
response.setServiceGroupContext(msgCtx.getServiceGroupContext());
response.setServiceContext(msgCtx.getServiceContext());
response.setAxisService(msgCtx.getAxisService());
response.setAxisServiceGroup(msgCtx.getAxisServiceGroup());
//setting the in-flow.
//ArrayList inPhaseHandlers =
// response.getAxisOperation().getRemainingPhasesInFlow();
/*
* if (inPhaseHandlers==null || inPhaseHandlers.isEmpty()) {
* ArrayList phases =
* msgCtx.getSystemContext().getAxisConfiguration().getInPhasesUptoAndIncludingPostDispatch();
* response.getAxisOperation().setRemainingPhasesInFlow(phases); }
*/
//Changed following from TransportUtils to SandeshaUtil since op.
// context is anavailable.
SOAPEnvelope resenvelope = null;
try {
resenvelope = SandeshaUtil.createSOAPMessage(response, msgCtx
.getEnvelope().getNamespace().getName());
} catch (AxisFault e) {
//TODO: change to log.debug
e.printStackTrace();
}
if (resenvelope != null) {
AxisEngine engine = new AxisEngine(msgCtx.getConfigurationContext());
response.setEnvelope(resenvelope);
engine.receive(response);
}
}
}
}