blob: 3e75fb66684ef55124596d9bdd1cd076e7774ab4 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.axis2.transport.base;
import org.apache.axis2.context.ConfigurationContext;
import org.apache.axis2.context.SessionContext;
import org.apache.axis2.context.MessageContext;
import org.apache.axis2.description.*;
import org.apache.axis2.AxisFault;
import org.apache.axis2.util.MessageContextBuilder;
import org.apache.axis2.transport.base.threads.WorkerPool;
import org.apache.axis2.transport.base.threads.WorkerPoolFactory;
import org.apache.axis2.transport.base.tracker.AxisServiceFilter;
import org.apache.axis2.transport.base.tracker.AxisServiceTracker;
import org.apache.axis2.transport.base.tracker.AxisServiceTrackerListener;
import org.apache.axis2.transport.TransportListener;
import org.apache.axis2.engine.AxisEngine;
import org.apache.axis2.addressing.EndpointReference;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.axiom.util.UIDGenerator;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import java.util.*;
import java.lang.management.ManagementFactory;
public abstract class AbstractTransportListener implements TransportListener {
/** the reference to the actual commons logger to be used for log messages */
protected Log log = null;
/** the axis2 configuration context */
protected ConfigurationContext cfgCtx = null;
/** transport in description */
private TransportInDescription transportIn = null;
/** transport out description */
private TransportOutDescription transportOut = null;
/** state of the listener */
protected int state = BaseConstants.STOPPED;
/** is this transport non-blocking? */
protected boolean isNonBlocking = false;
/**
* Service tracker used to invoke {@link #internalStartListeningForService(AxisService)}
* and {@link #internalStopListeningForService(AxisService)}. */
private AxisServiceTracker serviceTracker;
/** the thread pool to execute actual poll invocations */
protected WorkerPool workerPool = null;
/** use the thread pool available in the axis2 configuration context */
protected boolean useAxis2ThreadPool = false;
/** JMX support */
private TransportMBeanSupport mbeanSupport;
/** Metrics collector for this transport */
protected MetricsCollector metrics = new MetricsCollector();
/** Transport Configuration for the respective transports */
protected TransportConfiguration config;
/**
* A constructor that makes subclasses pick up the correct logger
*/
protected AbstractTransportListener() {
log = LogFactory.getLog(this.getClass());
}
/**
* Initialize the generic transport. Sets up the transport and the thread pool to be used
* for message processing. Also creates an AxisObserver that gets notified of service
* life cycle events for the transport to act on
* @param cfgCtx the axis configuration context
* @param transportIn the transport-in description
* @throws AxisFault on error
*/
public void init(ConfigurationContext cfgCtx, TransportInDescription transportIn)
throws AxisFault {
this.cfgCtx = cfgCtx;
this.transportIn = transportIn;
this.transportOut = cfgCtx.getAxisConfiguration().getTransportOut(getTransportName());
this.config = TransportConfiguration.getConfiguration(getTransportName());
if (useAxis2ThreadPool) {
//this.workerPool = cfgCtx.getThreadPool(); not yet implemented
throw new AxisFault("Unsupported thread pool for task execution - Axis2 thread pool");
} else {
if (this.workerPool == null) { // FIXME <-- workaround for AXIS2-4552
this.workerPool = WorkerPoolFactory.getWorkerPool(
config.getServerCoreThreads(),
config.getServerMaxThreads(),
config.getServerKeepalive(),
config.getServerQueueLen(),
getTransportName() + "Server Worker thread group",
getTransportName() + "-Worker");
}
}
// register to receive updates on services for lifetime management
serviceTracker = new AxisServiceTracker(
cfgCtx.getAxisConfiguration(),
new AxisServiceFilter() {
public boolean matches(AxisService service) {
return !service.getName().startsWith("__") // these are "private" services
&& BaseUtils.isUsingTransport(service, getTransportName());
}
},
new AxisServiceTrackerListener() {
public void serviceAdded(AxisService service) {
internalStartListeningForService(service);
}
public void serviceRemoved(AxisService service) {
internalStopListeningForService(service);
}
});
// register with JMX
if (mbeanSupport == null) { // FIXME <-- workaround for AXIS2-4552
mbeanSupport = new TransportMBeanSupport(this, getTransportName());
mbeanSupport.register();
}
}
public void destroy() {
try {
if (state == BaseConstants.STARTED) {
try {
stop();
} catch (AxisFault ignore) {
log.warn("Error stopping the transport : " + getTransportName());
}
}
} finally {
state = BaseConstants.STOPPED;
mbeanSupport.unregister();
}
try {
workerPool.shutdown(10000);
} catch (InterruptedException ex) {
log.warn("Thread interrupted while waiting for worker pool to shut down");
}
}
public void stop() throws AxisFault {
if (state == BaseConstants.STARTED) {
state = BaseConstants.STOPPED;
// cancel receipt of service lifecycle events
log.info(getTransportName().toUpperCase() + " Listener Shutdown");
serviceTracker.stop();
}
}
public void start() throws AxisFault {
if (state != BaseConstants.STARTED) {
state = BaseConstants.STARTED;
// register to receive updates on services for lifetime management
// cfgCtx.getAxisConfiguration().addObservers(axisObserver);
log.info(getTransportName().toUpperCase() + " listener started");
// iterate through deployed services and start
serviceTracker.start();
}
}
public EndpointReference[] getEPRsForService(String serviceName, String ip) throws AxisFault {
return getEPRsForService(serviceName);
}
protected EndpointReference[] getEPRsForService(String serviceName) {
return null;
}
public void disableTransportForService(AxisService service) {
log.warn("Disabling the " + getTransportName() + " transport for the service "
+ service.getName() + ", because it is not configured properly for the service");
if (service.isEnableAllTransports()) {
ArrayList<String> exposedTransports = new ArrayList<String>();
for(Object obj: cfgCtx.getAxisConfiguration().getTransportsIn().values()) {
String transportName = ((TransportInDescription) obj).getName();
if (!transportName.equals(getTransportName())) {
exposedTransports.add(transportName);
}
}
service.setEnableAllTransports(false);
service.setExposedTransports(exposedTransports);
} else {
service.removeExposedTransport(getTransportName());
}
}
void internalStartListeningForService(AxisService service) {
String serviceName = service.getName();
try {
startListeningForService(service);
} catch (AxisFault ex) {
String transportName = getTransportName().toUpperCase();
String msg = "Unable to configure the service " + serviceName + " for the " +
transportName + " transport: " + ex.getMessage() + ". " +
"This service is being marked as faulty and will not be available over the " +
transportName + " transport.";
// Only log the message at level WARN and log the full stack trace at level DEBUG.
// TODO: We should have a way to distinguish a missing configuration
// from an error. This may be addressed when implementing the enhancement
// described in point 3 of http://markmail.org/message/umhenrurlrekk5jh
log.warn(msg);
log.debug("Disabling service " + serviceName + " for the " + transportName +
"transport", ex);
BaseUtils.markServiceAsFaulty(serviceName, msg, service.getAxisConfiguration());
disableTransportForService(service);
return;
} catch (Throwable ex) {
String msg = "Unexpected error when configuring service " + serviceName +
" for the " + getTransportName().toUpperCase() + " transport. It will be" +
" disabled for this transport and marked as faulty.";
log.error(msg, ex);
BaseUtils.markServiceAsFaulty(serviceName, msg, service.getAxisConfiguration());
disableTransportForService(service);
return;
}
registerMBean(new TransportListenerEndpointView(this, serviceName),
getEndpointMBeanName(serviceName));
}
void internalStopListeningForService(AxisService service) {
unregisterMBean(getEndpointMBeanName(service.getName()));
stopListeningForService(service);
}
protected abstract void startListeningForService(AxisService service) throws AxisFault;
protected abstract void stopListeningForService(AxisService service);
/**
* This is a deprecated method in Axis2 and this default implementation returns the first
* result from the getEPRsForService() method
*/
public EndpointReference getEPRForService(String serviceName, String ip) throws AxisFault {
return getEPRsForService(serviceName, ip)[0];
}
public SessionContext getSessionContext(MessageContext messageContext) {
return null;
}
/**
* Create a new axis MessageContext for an incoming message through this transport
* @return the newly created message context
*/
public MessageContext createMessageContext() {
MessageContext msgCtx = new MessageContext();
msgCtx.setConfigurationContext(cfgCtx);
msgCtx.setIncomingTransportName(getTransportName());
msgCtx.setTransportOut(transportOut);
msgCtx.setTransportIn(transportIn);
msgCtx.setServerSide(true);
msgCtx.setMessageID(UIDGenerator.generateURNString());
// There is a discrepency in what I thought, Axis2 spawns a nes threads to
// send a message is this is TRUE - and I want it to be the other way
msgCtx.setProperty(MessageContext.CLIENT_API_NON_BLOCKING, Boolean.valueOf(!isNonBlocking));
// are these relevant?
//msgCtx.setServiceGroupContextId(UUIDGenerator.getUUID());
// this is required to support Sandesha 2
//msgContext.setProperty(RequestResponseTransport.TRANSPORT_CONTROL,
// new HttpCoreRequestResponseTransport(msgContext));
return msgCtx;
}
/**
* Process a new incoming message through the axis engine
* @param msgCtx the axis MessageContext
* @param trpHeaders the map containing transport level message headers
* @param soapAction the optional soap action or null
* @param contentType the optional content-type for the message
*/
public void handleIncomingMessage(
MessageContext msgCtx, Map trpHeaders,
String soapAction, String contentType) throws AxisFault {
// set the soapaction if one is available via a transport header
if (soapAction != null) {
msgCtx.setSoapAction(soapAction);
}
// set the transport headers to the message context
msgCtx.setProperty(MessageContext.TRANSPORT_HEADERS, trpHeaders);
// send the message context through the axis engine
try {
// check if an Axis2 callback has been registered for this message
Map callBackMap = (Map) msgCtx.getConfigurationContext().
getProperty(BaseConstants.CALLBACK_TABLE);
// FIXME: transport headers are protocol specific; the correlation ID should be
// passed as argument to this method
Object replyToMessageID = trpHeaders.get(BaseConstants.HEADER_IN_REPLY_TO);
// if there is a callback registerd with this replyto ID then this has to
// be handled as a synchronous incoming message, via the
if (replyToMessageID != null && callBackMap != null &&
callBackMap.get(replyToMessageID) != null) {
SynchronousCallback synchronousCallback =
(SynchronousCallback) callBackMap.get(replyToMessageID);
synchronousCallback.setInMessageContext(msgCtx);
callBackMap.remove(replyToMessageID);
} else {
AxisEngine.receive(msgCtx);
}
} catch (AxisFault e) {
if (log.isDebugEnabled()) {
log.debug("Error receiving message", e);
}
if (msgCtx.isServerSide()) {
AxisEngine.sendFault(MessageContextBuilder.createFaultMessageContext(msgCtx, e));
}
}
}
protected void handleException(String msg, Exception e) throws AxisFault {
log.error(msg, e);
throw new AxisFault(msg, e);
}
protected void logException(String msg, Exception e) {
log.error(msg, e);
}
public TransportInDescription getTransportInDescription() {
return transportIn;
}
public String getTransportName() {
return transportIn.getName();
}
public ConfigurationContext getConfigurationContext() {
return cfgCtx;
}
public MetricsCollector getMetricsCollector() {
return metrics;
}
// -- jmx/management methods--
/**
* Pause the listener - Stop accepting/processing new messages, but continues processing existing
* messages until they complete. This helps bring an instance into a maintenence mode
* @throws AxisFault on error
*/
public void pause() throws AxisFault {}
/**
* Resume the lister - Brings the lister into active mode back from a paused state
* @throws AxisFault on error
*/
public void resume() throws AxisFault {}
/**
* Stop processing new messages, and wait the specified maximum time for in-flight
* requests to complete before a controlled shutdown for maintenence
*
* @param millis a number of milliseconds to wait until pending requests are allowed to complete
* @throws AxisFault on error
*/
public void maintenenceShutdown(long millis) throws AxisFault {}
/**
* Returns the number of active threads processing messages
* @return number of active threads processing messages
*/
public int getActiveThreadCount() {
return workerPool.getActiveCount();
}
/**
* Return the number of requests queued in the thread pool
* @return queue size
*/
public int getQueueSize() {
return workerPool.getQueueSize();
}
public long getMessagesReceived() {
if (metrics != null) {
return metrics.getMessagesReceived();
}
return -1;
}
public long getFaultsReceiving() {
if (metrics != null) {
return metrics.getFaultsReceiving();
}
return -1;
}
public long getBytesReceived() {
if (metrics != null) {
return metrics.getBytesReceived();
}
return -1;
}
public long getMessagesSent() {
if (metrics != null) {
return metrics.getMessagesSent();
}
return -1;
}
public long getFaultsSending() {
if (metrics != null) {
return metrics.getFaultsSending();
}
return -1;
}
public long getBytesSent() {
if (metrics != null) {
return metrics.getBytesSent();
}
return -1;
}
public long getTimeoutsReceiving() {
if (metrics != null) {
return metrics.getTimeoutsReceiving();
}
return -1;
}
public long getTimeoutsSending() {
if (metrics != null) {
return metrics.getTimeoutsSending();
}
return -1;
}
public long getMinSizeReceived() {
if (metrics != null) {
return metrics.getMinSizeReceived();
}
return -1;
}
public long getMaxSizeReceived() {
if (metrics != null) {
return metrics.getMaxSizeReceived();
}
return -1;
}
public double getAvgSizeReceived() {
if (metrics != null) {
return metrics.getAvgSizeReceived();
}
return -1;
}
public long getMinSizeSent() {
if (metrics != null) {
return metrics.getMinSizeSent();
}
return -1;
}
public long getMaxSizeSent() {
if (metrics != null) {
return metrics.getMaxSizeSent();
}
return -1;
}
public double getAvgSizeSent() {
if (metrics != null) {
return metrics.getAvgSizeSent();
}
return -1;
}
public Map getResponseCodeTable() {
if (metrics != null) {
return metrics.getResponseCodeTable();
}
return null;
}
public void resetStatistics() {
if (metrics != null) {
metrics.reset();
}
}
public long getLastResetTime() {
if (metrics != null) {
return metrics.getLastResetTime();
}
return -1;
}
public long getMetricsWindow() {
if (metrics != null) {
return System.currentTimeMillis() - metrics.getLastResetTime();
}
return -1;
}
private String getEndpointMBeanName(String serviceName) {
return mbeanSupport.getMBeanName() + ",Group=Services,Service=" + serviceName;
}
/**
* Utility method to allow transports to register MBeans
* @param mbeanInstance bean instance
* @param objectName name
*/
private void registerMBean(Object mbeanInstance, String objectName) {
try {
MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
ObjectName name = new ObjectName(objectName);
Set set = mbs.queryNames(name, null);
if (set != null && set.isEmpty()) {
mbs.registerMBean(mbeanInstance, name);
} else {
mbs.unregisterMBean(name);
mbs.registerMBean(mbeanInstance, name);
}
} catch (Exception e) {
log.warn("Error registering a MBean with objectname ' " + objectName +
" ' for JMX management", e);
}
}
private void unregisterMBean(String objectName) {
try {
MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
ObjectName objName = new ObjectName(objectName);
if (mbs.isRegistered(objName)) {
mbs.unregisterMBean(objName);
}
} catch (Exception e) {
log.warn("Error un-registering a MBean with objectname ' " + objectName +
" ' for JMX management", e);
}
}
}