blob: 5e8963ff946e82a74b2f4799a6aa1a2fbc93aa05 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.axis2.transport.base;
import org.apache.axis2.context.MessageContext;
import org.apache.axis2.context.ConfigurationContext;
import org.apache.axis2.AxisFault;
import org.apache.axis2.Constants;
import org.apache.axis2.wsdl.WSDLConstants;
import org.apache.axis2.util.MessageContextBuilder;
import org.apache.axis2.handlers.AbstractHandler;
import org.apache.axis2.engine.AxisEngine;
import org.apache.axis2.transport.TransportSender;
import org.apache.axis2.transport.OutTransportInfo;
import org.apache.axis2.description.TransportOutDescription;
import org.apache.axis2.description.TransportInDescription;
import org.apache.axis2.description.WSDL2Constants;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.axiom.util.UIDGenerator;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
/**
 * Abstract base class for transport senders.
 * <p>
 * It stores the configuration context and transport descriptions handed over in
 * {@link #init}, routes outgoing messages to the subclass-provided
 * {@link #sendMessage}, offers helpers to create and inject response message
 * contexts, and exposes lifecycle operations and metrics accessors for management.
 */
public abstract class AbstractTransportSender extends AbstractHandler implements TransportSender {

    /** the reference to the actual commons logger to be used for log messages */
    protected Log log = null;
    /** the axis2 configuration context */
    protected ConfigurationContext cfgCtx = null;
    /** transport in description */
    private TransportInDescription transportIn = null;
    /** transport out description */
    private TransportOutDescription transportOut = null;
    /** JMX support */
    private TransportMBeanSupport mbeanSupport;
    /** Metrics collector for the sender */
    protected MetricsCollector metrics = new MetricsCollector();
    /** state of the sender (one of the BaseConstants state values) */
    private int state = BaseConstants.STOPPED;

    /**
     * A constructor that makes subclasses pick up the correct logger
     */
    protected AbstractTransportSender() {
        log = LogFactory.getLog(this.getClass());
    }

    /**
     * Initialize the generic transport sender.
     *
     * @param cfgCtx the axis configuration context
     * @param transportOut the transport-out description
     * @throws AxisFault on error
     */
    public void init(ConfigurationContext cfgCtx, TransportOutDescription transportOut)
        throws AxisFault {
        this.cfgCtx = cfgCtx;
        this.transportOut = transportOut;
        // look up the transport-in description registered under the same transport name
        this.transportIn = cfgCtx.getAxisConfiguration().getTransportIn(getTransportName());
        this.state = BaseConstants.STARTED;

        // register with JMX
        mbeanSupport = new TransportMBeanSupport(this, getTransportName());
        mbeanSupport.register();

        // Locale.ENGLISH keeps the upper-cased name independent of the default locale
        log.info(getTransportName().toUpperCase(Locale.ENGLISH) + " Sender started");
    }

    /**
     * Stop the sender. Does nothing unless the sender is currently started.
     * Marks the sender as stopped and removes its JMX registration.
     */
    public void stop() {
        if (state != BaseConstants.STARTED) {
            return;
        }
        state = BaseConstants.STOPPED;
        // init() marks the sender as started before the JMX support is created, so
        // the support object can still be missing here if init() failed part-way
        if (mbeanSupport != null) {
            mbeanSupport.unregister();
        }
        log.info(getTransportName().toUpperCase(Locale.ENGLISH) + " Sender Shutdown");
    }

    /**
     * Clean up after a message has been sent. This base implementation does nothing.
     *
     * @param msgContext the message context that was sent
     * @throws AxisFault declared for subclasses; never thrown here
     */
    public void cleanup(MessageContext msgContext) throws AxisFault {}

    /**
     * Send the given message over the concrete transport.
     *
     * @param msgCtx the outgoing message context
     * @param targetEPR the target address, or null when {@link #invoke} found none
     * @param outTransportInfo the out transport info; {@link #invoke} passes a non-null
     *        candidate only when the target address is unknown on the server side
     * @throws AxisFault on error
     */
    public abstract void sendMessage(MessageContext msgCtx, String targetEPR,
        OutTransportInfo outTransportInfo) throws AxisFault;

    /**
     * Determine where the message should go and hand it to {@link #sendMessage}.
     * <p>
     * The transport URL property takes precedence over the To address. A To address
     * that is neither anonymous nor none is used next. Otherwise, on the server side,
     * the message is sent using the out transport info stored in the message context.
     * If none of these apply the message is not sent.
     *
     * @param msgContext the outgoing message context
     * @return always {@code InvocationResponse.CONTINUE}
     * @throws AxisFault if {@link #sendMessage} fails
     */
    public InvocationResponse invoke(MessageContext msgContext) throws AxisFault {

        // is there a transport url which may be different to the WS-A To but has higher precedence
        String targetAddress = (String) msgContext.getProperty(
            Constants.Configuration.TRANSPORT_URL);

        if (targetAddress != null) {
            sendMessage(msgContext, targetAddress, null);

        } else if (msgContext.getTo() != null && !msgContext.getTo().hasAnonymousAddress()) {
            targetAddress = msgContext.getTo().getAddress();
            // a "none" address means: don't send the message
            if (!msgContext.getTo().hasNoneAddress()) {
                sendMessage(msgContext, targetAddress, null);
            }

        } else if (msgContext.isServerSide()) {
            // get the out transport info for server side when target EPR is unknown
            sendMessage(msgContext, null,
                (OutTransportInfo) msgContext.getProperty(Constants.OUT_TRANSPORT_INFO));
        }

        return InvocationResponse.CONTINUE;
    }

    /**
     * Process a new incoming message (Response) through the axis engine.
     * <p>
     * A fault raised by the engine is logged at debug level and, on the server side,
     * answered with a fault message. A failure while sending that fault is logged as
     * an error. No exception is propagated to the caller for either failure.
     *
     * @param msgCtx the axis MessageContext
     * @param trpHeaders the map containing transport level message headers
     * @param soapAction the optional soap action or null
     * @param contentType the optional content-type for the message
     *        (not used by this implementation)
     */
    public void handleIncomingMessage(
        MessageContext msgCtx, Map trpHeaders,
        String soapAction, String contentType) {

        // set the soapaction if one is available via a transport header
        if (soapAction != null) {
            msgCtx.setSoapAction(soapAction);
        }

        // set the transport headers to the message context
        msgCtx.setProperty(MessageContext.TRANSPORT_HEADERS, trpHeaders);

        // send the message context through the axis engine
        try {
            AxisEngine.receive(msgCtx);
        } catch (AxisFault e) {
            if (log.isDebugEnabled()) {
                log.debug("Error receiving message", e);
            }
            if (msgCtx.isServerSide()) {
                try {
                    AxisEngine.sendFault(
                        MessageContextBuilder.createFaultMessageContext(msgCtx, e));
                } catch (AxisFault axisFault) {
                    logException("Error processing response message", axisFault);
                }
            }
        }
    }

    /**
     * Create a new axis MessageContext for an incoming response message
     * through this transport, for the given outgoing message.
     * <p>
     * The IN message context already held by the operation context is reused when
     * there is one; otherwise a fresh message context is created and attached to the
     * operation context of the outgoing message.
     *
     * @param outMsgCtx the outgoing message
     * @return the newly created message context
     */
    public MessageContext createResponseMessageContext(MessageContext outMsgCtx) {

        MessageContext responseMsgCtx = null;
        try {
            responseMsgCtx = outMsgCtx.getOperationContext().
                getMessageContext(WSDL2Constants.MESSAGE_LABEL_IN);
        } catch (AxisFault af) {
            // fall through and create a fresh message context below
            log.error("Error getting IN message context from the operation context", af);
        }

        if (responseMsgCtx == null) {
            responseMsgCtx = new MessageContext();
            responseMsgCtx.setOperationContext(outMsgCtx.getOperationContext());
        }

        responseMsgCtx.setIncomingTransportName(getTransportName());
        responseMsgCtx.setTransportOut(transportOut);
        responseMsgCtx.setTransportIn(transportIn);

        responseMsgCtx.setMessageID(UIDGenerator.generateURNString());

        // carry over the REST flag and the TRANSPORT_IN property of the outgoing message
        responseMsgCtx.setDoingREST(outMsgCtx.isDoingREST());
        responseMsgCtx.setProperty(
            MessageContext.TRANSPORT_IN, outMsgCtx.getProperty(MessageContext.TRANSPORT_IN));

        responseMsgCtx.setAxisMessage(outMsgCtx.getOperationContext().getAxisOperation().
            getMessage(WSDLConstants.MESSAGE_LABEL_IN_VALUE));

        responseMsgCtx.setTo(null);
        //msgCtx.setProperty(MessageContext.TRANSPORT_NON_BLOCKING, isNonBlocking);

        // are these relevant?
        //msgCtx.setServiceGroupContextId(UUIDGenerator.getUUID());
        // this is required to support Sandesha 2
        //msgContext.setProperty(RequestResponseTransport.TRANSPORT_CONTROL,
        //        new HttpCoreRequestResponseTransport(msgContext));

        return responseMsgCtx;
    }

    /**
     * Should the transport sender wait for a synchronous response to be received?
     *
     * @param msgCtx the outgoing message context
     * @return true if the message has an operation context whose operation uses the
     *         out-in message exchange pattern
     */
    protected boolean waitForSynchronousResponse(MessageContext msgCtx) {
        return
            msgCtx.getOperationContext() != null &&
            WSDL2Constants.MEP_URI_OUT_IN.equals(
                msgCtx.getOperationContext().getAxisOperation().getMessageExchangePattern());
    }

    /**
     * Return the transport name, taken from the transport-out description
     * passed to {@link #init}.
     *
     * @return the name of the transport-out description
     */
    public String getTransportName() {
        return transportOut.getName();
    }

    /**
     * Log the message and exception as an error and rethrow them as an AxisFault.
     *
     * @param msg the error message
     * @param e the underlying exception, used as the cause of the fault
     * @throws AxisFault always
     */
    protected void handleException(String msg, Exception e) throws AxisFault {
        log.error(msg, e);
        throw new AxisFault(msg, e);
    }

    /**
     * Log the message as an error and throw it as an AxisFault.
     *
     * @param msg the error message
     * @throws AxisFault always
     */
    protected void handleException(String msg) throws AxisFault {
        log.error(msg);
        throw new AxisFault(msg);
    }

    /**
     * Log the message and exception as an error without throwing.
     *
     * @param msg the error message
     * @param e the exception to log
     */
    protected void logException(String msg, Exception e) {
        log.error(msg, e);
    }

    //--- jmx/management methods ---

    /**
     * Move a started sender into the paused state; ignored in any other state.
     *
     * @throws AxisFault declared for subclasses; never thrown here
     */
    public void pause() throws AxisFault {
        if (state != BaseConstants.STARTED) {
            return;
        }
        state = BaseConstants.PAUSED;
        log.info("Sender paused");
    }

    /**
     * Move a paused sender back into the started state; ignored in any other state.
     *
     * @throws AxisFault declared for subclasses; never thrown here
     */
    public void resume() throws AxisFault {
        if (state != BaseConstants.PAUSED) {
            return;
        }
        state = BaseConstants.STARTED;
        log.info("Sender resumed");
    }

    /**
     * Stop a started sender and log how long the shutdown took; ignored in any
     * other state.
     *
     * @param millis not used by this implementation
     * @throws AxisFault declared for subclasses; never thrown here
     */
    public void maintenenceShutdown(long millis) throws AxisFault {
        if (state != BaseConstants.STARTED) {
            return;
        }
        long start = System.currentTimeMillis();
        stop();
        // stop() may be overridden, so make sure the state really ends up as stopped
        state = BaseConstants.STOPPED;
        log.info("Sender shutdown in : " + (System.currentTimeMillis() - start) / 1000 + "s");
    }

    /**
     * Returns the number of active threads processing messages
     *
     * @return number of active threads processing messages; this base
     *         implementation always returns 0
     */
    public int getActiveThreadCount() {
        return 0;
    }

    /**
     * Return the number of requests queued in the thread pool
     *
     * @return queue size; this base implementation always returns 0
     */
    public int getQueueSize() {
        return 0;
    }

    // -- jmx/management methods--
    // Each accessor below delegates to the metrics collector and falls back to
    // -1 (or null for the response code table) when no collector is set.

    /** @return the collector's messages-received value, or -1 without a collector */
    public long getMessagesReceived() {
        return metrics != null ? metrics.getMessagesReceived() : -1;
    }

    /** @return the collector's faults-receiving value, or -1 without a collector */
    public long getFaultsReceiving() {
        return metrics != null ? metrics.getFaultsReceiving() : -1;
    }

    /** @return the collector's bytes-received value, or -1 without a collector */
    public long getBytesReceived() {
        return metrics != null ? metrics.getBytesReceived() : -1;
    }

    /** @return the collector's messages-sent value, or -1 without a collector */
    public long getMessagesSent() {
        return metrics != null ? metrics.getMessagesSent() : -1;
    }

    /** @return the collector's faults-sending value, or -1 without a collector */
    public long getFaultsSending() {
        return metrics != null ? metrics.getFaultsSending() : -1;
    }

    /** @return the collector's bytes-sent value, or -1 without a collector */
    public long getBytesSent() {
        return metrics != null ? metrics.getBytesSent() : -1;
    }

    /** @return the collector's timeouts-receiving value, or -1 without a collector */
    public long getTimeoutsReceiving() {
        return metrics != null ? metrics.getTimeoutsReceiving() : -1;
    }

    /** @return the collector's timeouts-sending value, or -1 without a collector */
    public long getTimeoutsSending() {
        return metrics != null ? metrics.getTimeoutsSending() : -1;
    }

    /** @return the collector's minimum received size, or -1 without a collector */
    public long getMinSizeReceived() {
        return metrics != null ? metrics.getMinSizeReceived() : -1;
    }

    /** @return the collector's maximum received size, or -1 without a collector */
    public long getMaxSizeReceived() {
        return metrics != null ? metrics.getMaxSizeReceived() : -1;
    }

    /** @return the collector's average received size, or -1 without a collector */
    public double getAvgSizeReceived() {
        return metrics != null ? metrics.getAvgSizeReceived() : -1;
    }

    /** @return the collector's minimum sent size, or -1 without a collector */
    public long getMinSizeSent() {
        return metrics != null ? metrics.getMinSizeSent() : -1;
    }

    /** @return the collector's maximum sent size, or -1 without a collector */
    public long getMaxSizeSent() {
        return metrics != null ? metrics.getMaxSizeSent() : -1;
    }

    /** @return the collector's average sent size, or -1 without a collector */
    public double getAvgSizeSent() {
        return metrics != null ? metrics.getAvgSizeSent() : -1;
    }

    /** @return the collector's response code table, or null without a collector */
    public Map getResponseCodeTable() {
        return metrics != null ? metrics.getResponseCodeTable() : null;
    }

    /** Reset the metrics collector, if one is set. */
    public void resetStatistics() {
        if (metrics != null) {
            metrics.reset();
        }
    }

    /** @return the collector's last reset time, or -1 without a collector */
    public long getLastResetTime() {
        return metrics != null ? metrics.getLastResetTime() : -1;
    }

    /**
     * @return the difference between the current time in milliseconds and the
     *         collector's last reset time, or -1 without a collector
     */
    public long getMetricsWindow() {
        return metrics != null
            ? System.currentTimeMillis() - metrics.getLastResetTime()
            : -1;
    }

    /**
     * Register an MBean under the given object name, replacing any MBean that is
     * already registered under that name. Failures are logged as warnings and not
     * propagated.
     *
     * @param mbs the MBean server to register with
     * @param mbeanInstance the MBean to register
     * @param objectName the string form of the JMX object name
     */
    private void registerMBean(MBeanServer mbs, Object mbeanInstance, String objectName) {
        try {
            ObjectName name = new ObjectName(objectName);
            // only unregister when something is actually registered under this name;
            // unregistering an unknown name would throw and skip the registration
            if (mbs.isRegistered(name)) {
                mbs.unregisterMBean(name);
            }
            mbs.registerMBean(mbeanInstance, name);
        } catch (Exception e) {
            log.warn("Error registering a MBean with objectname ' " + objectName +
                " ' for JMX management", e);
        }
    }
}