blob: 66c8c54826d7119acccc37ebead606a3a5419844 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.synapse.transport.nhttp;
import org.apache.axiom.om.OMAbstractFactory;
import org.apache.axiom.soap.SOAP11Constants;
import org.apache.axiom.soap.SOAP12Constants;
import org.apache.axiom.soap.SOAPEnvelope;
import org.apache.axis2.AxisFault;
import org.apache.axis2.Constants;
import org.apache.axis2.addressing.AddressingConstants;
import org.apache.axis2.context.ConfigurationContext;
import org.apache.axis2.context.MessageContext;
import org.apache.axis2.description.WSDL2Constants;
import org.apache.axis2.engine.MessageReceiver;
import org.apache.axis2.transport.base.MetricsCollector;
import org.apache.axis2.transport.base.threads.WorkerPool;
import org.apache.axis2.transport.base.threads.WorkerPoolFactory;
import org.apache.axis2.util.MessageContextBuilder;
import org.apache.axis2.util.JavaUtils;
import org.apache.axis2.wsdl.WSDLConstants;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.http.*;
import org.apache.http.entity.BasicHttpEntity;
import org.apache.http.impl.DefaultConnectionReuseStrategy;
import org.apache.http.impl.nio.DefaultNHttpClientConnection;
import org.apache.http.nio.*;
import org.apache.http.nio.util.ByteBufferAllocator;
import org.apache.http.nio.util.HeapByteBufferAllocator;
import org.apache.http.nio.util.ContentOutputBuffer;
import org.apache.http.nio.util.ContentInputBuffer;
import org.apache.http.nio.util.SharedInputBuffer;
import org.apache.http.nio.util.SharedOutputBuffer;
import org.apache.http.nio.entity.ContentInputStream;
import org.apache.http.protocol.*;
import org.apache.synapse.transport.nhttp.debug.ClientConnectionDebug;
import org.apache.synapse.transport.nhttp.util.NhttpMetricsCollector;
import org.apache.synapse.commons.jmx.ThreadingView;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Comparator;
import java.util.Map;
import java.util.HashMap;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* The client connection handler. An instance of this class is used by each IOReactor, to
* process every connection. Hence this class should not store any data related to a single
* connection - as this is being shared.
*/
public class ClientHandler implements NHttpClientEventHandler {
private static final Log log = LogFactory.getLog(ClientHandler.class);
/** the HttpProcessor for response messages received */
private final HttpProcessor httpProcessor;
/** the connection re-use strategy */
private final ConnectionReuseStrategy connStrategy;
/** the buffer allocator */
private final ByteBufferAllocator allocator;
/** the Axis2 configuration context */
ConfigurationContext cfgCtx = null;
/** the nhttp configuration */
private NHttpConfiguration cfg = null;
private WorkerPool workerPool = null;
/** the metrics collector */
private NhttpMetricsCollector metrics = null;
/** Array of content types for which warnings are logged if HTTP status code is 500. */
private String[] warnOnHttp500;
/** weather we are counting the connections to the back end servers */
private boolean countConnections = false;
/** lock to update the connection counts in a thread safe way */
private Lock lock = new ReentrantLock();
private ThreadingView threadingView = null;
/** A map for holding the number of open connections for a host:port pair */
private Map<String, AtomicInteger> openConnections = new HashMap<String, AtomicInteger>();
public static final String OUTGOING_MESSAGE_CONTEXT = "synapse.axis2_message_context";
public static final String AXIS2_HTTP_REQUEST = "synapse.axis2-http-request";
public static final String CLIENT_CONNECTION_DEBUG = "synapse.client-connection-debug";
public static final String CONNECTION_CREATION_TIME = "synapse.connectionCreationTime";
public static final String REQUEST_SOURCE_BUFFER = "synapse.request-source-buffer";
public static final String RESPONSE_SINK_BUFFER = "synapse.response-sink-buffer";
private static final String CONTENT_TYPE = "Content-Type";
/**
* Create an instance of this client connection handler using the Axis2 configuration
* context and Http protocol parameters given
*
* @param cfgCtx the Axis2 configuration context
* @param metrics statistics collection metrics
*/
public ClientHandler(final ConfigurationContext cfgCtx, final NhttpMetricsCollector metrics) {
super();
this.cfgCtx = cfgCtx;
this.httpProcessor = getHttpProcessor();
this.connStrategy = new DefaultConnectionReuseStrategy();
this.metrics = metrics;
this.allocator = new HeapByteBufferAllocator();
this.threadingView = new ThreadingView("HttpClientWorker", true, 50);
this.cfg = NHttpConfiguration.getInstance();
workerPool = WorkerPoolFactory.getWorkerPool(
cfg.getClientCoreThreads(),
cfg.getClientMaxThreads(),
cfg.getClientKeepalive(),
cfg.getClientQueueLen(),
"Client Worker thread group", "HttpClientWorker");
Object contentTypeList = cfgCtx.getLocalProperty("warnOnHTTP500");
if (contentTypeList != null) {
warnOnHttp500 = (String[]) contentTypeList;
}
// check weather we count the connections
this.countConnections = NHttpConfiguration.getInstance().isCountConnections();
// set the connection map to the configuration context
cfgCtx.setProperty(NhttpConstants.OPEN_CONNNECTIONS_MAP, openConnections);
// set the latest openConnections map to MBean data during connection creation
metrics.setConnectionsPerHosts(openConnections);
}
public void requestReady(final NHttpClientConnection conn) {
// The connection is ready for submission of a new request
}
/**
* Submit a new request over an already established connection, which has been
* 'kept alive'
*
* @param conn the connection to use to send the request, which has been kept open
* @param axis2Req the new request
* @throws ConnectionClosedException if the connection is closed by the other party
*/
public void submitRequest(final NHttpClientConnection conn, Axis2HttpRequest axis2Req)
throws ConnectionClosedException {
processConnection(conn, axis2Req);
}
/**
* Invoked when the destination is connected
*
* @param conn the connection being processed
* @param attachment the attachment set previously
*/
public void connected(final NHttpClientConnection conn, final Object attachment) {
if (log.isDebugEnabled() ) {
log.debug("ClientHandler connected : " + conn);
}
metrics.connected();
// record connection creation time for debug logging
conn.getContext().setAttribute(CONNECTION_CREATION_TIME, System.currentTimeMillis());
if (countConnections) {
recordConnection(conn);
}
try {
processConnection(conn, (Axis2HttpRequest) attachment);
} catch (ConnectionClosedException e) {
metrics.incrementFaultsSending();
handleException("I/O Error submitting request : " + e.getMessage(), e, conn);
}
}
/**
* Process a new connection over an existing TCP connection or new
*
* @param conn HTTP connection to be processed
* @param axis2Req axis2 representation of the message in the connection
* @throws ConnectionClosedException if the connection is closed
*/
private void processConnection(final NHttpClientConnection conn,
final Axis2HttpRequest axis2Req) throws ConnectionClosedException {
// record start time of request
ClientConnectionDebug cd = (ClientConnectionDebug)
axis2Req.getMsgContext().getProperty(CLIENT_CONNECTION_DEBUG);
if (cd != null) {
cd.recordRequestStartTime(conn, axis2Req);
conn.getContext().setAttribute(CLIENT_CONNECTION_DEBUG, cd);
}
try {
// Reset connection metrics
conn.getMetrics().reset();
HttpContext context = conn.getContext();
ContentOutputBuffer outputBuffer
= new SharedOutputBuffer(cfg.getBufferSize(), allocator);
axis2Req.setOutputBuffer(outputBuffer);
context.setAttribute(REQUEST_SOURCE_BUFFER, outputBuffer);
context.setAttribute(AXIS2_HTTP_REQUEST, axis2Req);
context.setAttribute(HttpCoreContext.HTTP_CONNECTION, conn);
context.setAttribute(HttpCoreContext.HTTP_TARGET_HOST, axis2Req.getHttpHost());
context.setAttribute(OUTGOING_MESSAGE_CONTEXT, axis2Req.getMsgContext());
HttpRequest request = axis2Req.getRequest();
this.httpProcessor.process(request, context);
if (axis2Req.getTimeout() > 0) {
conn.setSocketTimeout(axis2Req.getTimeout());
}
context.setAttribute(NhttpConstants.ENDPOINT_PREFIX, axis2Req.getEndpointURLPrefix());
context.setAttribute(NhttpConstants.HTTP_REQ_METHOD, request.getRequestLine().getMethod());
context.setAttribute(HttpCoreContext.HTTP_REQUEST, request);
setServerContextAttribute(NhttpConstants.REQ_DEPARTURE_TIME,
System.currentTimeMillis(), conn);
conn.submitRequest(request);
} catch (ConnectionClosedException e) {
throw e;
} catch (IOException e) {
if (metrics != null) {
metrics.incrementFaultsSending();
}
handleException("I/O Error submitting request : " + e.getMessage(), e, conn);
} catch (HttpException e) {
if (metrics != null) {
metrics.incrementFaultsSending();
}
handleException("HTTP protocol error submitting request : " + e.getMessage(), e, conn);
} finally {
synchronized(axis2Req) {
axis2Req.setReadyToStream(true);
axis2Req.notifyAll();
}
}
}
/**
* Handle connection close events
*
* @param conn HTTP connection to be closed
*/
public void closed(final NHttpClientConnection conn) {
ConnectionPool.forget(conn);
String message = getErrorMessage("Connection close", conn);
if (log.isTraceEnabled()) {
log.trace(message);
}
Axis2HttpRequest axis2Request = (Axis2HttpRequest)
conn.getContext().getAttribute(AXIS2_HTTP_REQUEST);
if (axis2Request != null && !axis2Request.isCompleted()) {
checkAxisRequestComplete(conn, NhttpConstants.CONNECTION_CLOSED, message, null);
} else {
if (log.isDebugEnabled()) {
log.debug(getErrorMessage("Keep-alive connection closed", conn));
}
}
HttpContext context = conn.getContext();
shutdownConnection(conn);
context.removeAttribute(RESPONSE_SINK_BUFFER);
context.removeAttribute(REQUEST_SOURCE_BUFFER);
metrics.disconnected();
}
/**
* Handle connection timeouts by shutting down the connections. These are established
* that have reached the SO_TIMEOUT of the socket
*
* @param conn the connection being processed
*/
public void timeout(final NHttpClientConnection conn) {
String message = getErrorMessage("Connection timeout", conn);
if (log.isDebugEnabled()) {
log.debug(message);
}
Axis2HttpRequest axis2Request = (Axis2HttpRequest)
conn.getContext().getAttribute(AXIS2_HTTP_REQUEST);
if (axis2Request != null && !axis2Request.isCompleted()) {
checkAxisRequestComplete(conn, NhttpConstants.CONNECTION_TIMEOUT, message, null);
} else {
if (log.isDebugEnabled()) {
log.debug(getErrorMessage("Keep-alive connection timed out", conn));
}
}
HttpContext context = conn.getContext();
shutdownConnection(conn);
context.removeAttribute(RESPONSE_SINK_BUFFER);
context.removeAttribute(REQUEST_SOURCE_BUFFER);
}
public void endOfInput(NHttpClientConnection conn) throws IOException {
closed(conn);
}
public void exception(NHttpClientConnection conn, Exception e) {
if (e instanceof HttpException) {
exception(conn, (HttpException) e);
} else if (e instanceof IOException) {
exception(conn, (IOException) e);
} else {
log.error(e.getMessage(), e);
shutdownConnection(conn);
}
}
/**
* Handle Http protocol violations encountered while reading from underlying channels
*
* @param conn the connection being processed
* @param e the exception encountered
*/
public void exception(final NHttpClientConnection conn, final HttpException e) {
String message = getErrorMessage("HTTP protocol violation : " + e.getMessage(), conn);
log.error(message, e);
checkAxisRequestComplete(conn, NhttpConstants.PROTOCOL_VIOLATION, message, e);
shutdownConnection(conn);
}
/**
* Handle IO errors while reading or writing to underlying channels
*
* @param conn the connection being processed
* @param e the exception encountered
*/
public void exception(final NHttpClientConnection conn, final IOException e) {
String message = getErrorMessage("I/O error : " + e.getMessage(), conn);
if (message.toLowerCase().indexOf("reset") != -1) {
log.warn(message);
} else {
log.error(message, e);
}
checkAxisRequestComplete(conn, NhttpConstants.SND_IO_ERROR_SENDING, message, e);
shutdownConnection(conn);
}
/**
* Include remote host and port information to an error message
*
* @param message the initial message
* @param conn the connection encountering the error
* @return the updated error message
*/
private String getErrorMessage(String message, NHttpClientConnection conn) {
if (conn != null && conn instanceof DefaultNHttpClientConnection) {
DefaultNHttpClientConnection c = ((DefaultNHttpClientConnection) conn);
Axis2HttpRequest axis2Request = (Axis2HttpRequest)
conn.getContext().getAttribute(AXIS2_HTTP_REQUEST);
if (c.getRemoteAddress() != null) {
return message + " For : " + c.getRemoteAddress().getHostAddress() + ":" +
c.getRemotePort() + (axis2Request != null ? " For Request : "
+ axis2Request : "");
}
}
return message;
}
/**
* check to see if http request-response has completed, if not completed yet,
* notify an exception to the message-receiver
*
* @param conn the connection being checked for completion
* @param errorCode the error code to raise
* @param errorMessage the text for an error message to be returned to the MR on failure
* @param exceptionToRaise an Exception to be returned to the MR on failure
*/
private void checkAxisRequestComplete(NHttpClientConnection conn,
final int errorCode, final String errorMessage, final Exception exceptionToRaise) {
Axis2HttpRequest axis2Request = (Axis2HttpRequest)
conn.getContext().getAttribute(AXIS2_HTTP_REQUEST);
if (axis2Request != null && !axis2Request.isCompleted()) {
markRequestCompletedWithError(axis2Request, errorCode, errorMessage, exceptionToRaise);
}
}
/**
* Mark request to send failed with error
*
* @param axis2Request the Axis2HttpRequest to be marked as completed with an error
* @param errorCode the error code to raise
* @param errorMessage the text for an error message to be returned to the MR on failure
* @param exceptionToRaise an Exception to be returned to the MR on failure
*/
protected void markRequestCompletedWithError(Axis2HttpRequest axis2Request, final int errorCode,
final String errorMessage, final Exception exceptionToRaise) {
axis2Request.setCompleted(true);
if (errorCode == -1 && errorMessage == null && exceptionToRaise == null) {
return; // no need to continue
}
final MessageContext mc = axis2Request.getMsgContext();
// if the request message is a sandesha messag we ignore the
// exception handling
// we cannot use the declared sandesha2 constant since
// nhttp transport shouldn't take a sandesha2 dependency
String done = (String) mc.getProperty("Sandesha2AppProcessingDone");
if (JavaUtils.isTrueExplicitly(done)) {
return;
}
if (mc.getAxisOperation() != null &&
mc.getAxisOperation().getMessageReceiver() != null) {
if (metrics != null) {
if (metrics.getLevel() == MetricsCollector.LEVEL_FULL) {
if (errorCode == NhttpConstants.CONNECTION_TIMEOUT) {
metrics.incrementTimeoutsReceiving(mc);
} else {
metrics.incrementFaultsSending(errorCode, mc);
}
} else {
if (errorCode == NhttpConstants.CONNECTION_TIMEOUT) {
metrics.incrementTimeoutsReceiving();
} else {
metrics.incrementFaultsSending();
}
}
}
workerPool.execute( new Runnable() {
public void run() {
MessageReceiver mr = mc.getAxisOperation().getMessageReceiver();
try {
// This AxisFault is created to create the fault message context
// noinspection ThrowableInstanceNeverThrown
AxisFault axisFault = exceptionToRaise != null ?
new AxisFault(errorMessage, exceptionToRaise) :
new AxisFault(errorMessage);
MessageContext nioFaultMessageContext =
MessageContextBuilder.createFaultMessageContext(mc, axisFault);
SOAPEnvelope envelope = nioFaultMessageContext.getEnvelope();
if (log.isDebugEnabled()) {
log.debug("Sending Fault for Request with Message ID : "
+ mc.getMessageID());
}
nioFaultMessageContext.setProperty(
NhttpConstants.SENDING_FAULT, Boolean.TRUE);
nioFaultMessageContext.setProperty(
NhttpConstants.ERROR_MESSAGE, errorMessage);
if (errorCode != -1) {
nioFaultMessageContext.setProperty(
NhttpConstants.ERROR_CODE, errorCode);
}
if (exceptionToRaise != null) {
nioFaultMessageContext.setProperty(
NhttpConstants.ERROR_DETAIL, exceptionToRaise.toString());
nioFaultMessageContext.setProperty(
NhttpConstants.ERROR_EXCEPTION, exceptionToRaise);
envelope.getBody().getFault().getDetail().setText(
exceptionToRaise.toString());
} else {
nioFaultMessageContext.setProperty(
NhttpConstants.ERROR_DETAIL, errorMessage);
envelope.getBody().getFault().getDetail().setText(errorMessage);
}
nioFaultMessageContext.setProperty(CLIENT_CONNECTION_DEBUG,
mc.getProperty(CLIENT_CONNECTION_DEBUG));
mr.receive(nioFaultMessageContext);
} catch (AxisFault af) {
log.error("Unable to report back failure to the message receiver", af);
}
}
});
}
}
/**
* Process ready input (i.e. response from remote server)
*
* @param conn connection being processed
* @param decoder the content decoder in use
*/
public void inputReady(final NHttpClientConnection conn, final ContentDecoder decoder) {
HttpContext context = conn.getContext();
HttpResponse response = conn.getHttpResponse();
SharedInputBuffer inBuf = (SharedInputBuffer) context.getAttribute(RESPONSE_SINK_BUFFER);
try {
int bytesRead = inBuf.consumeContent(decoder, conn);
if (metrics != null && bytesRead > 0) {
if (metrics.getLevel() == MetricsCollector.LEVEL_FULL) {
metrics.incrementBytesReceived(getMessageContext(conn), bytesRead);
} else {
metrics.incrementBytesReceived(bytesRead);
}
}
if (decoder.isCompleted()) {
setServerContextAttribute(NhttpConstants.RES_ARRIVAL_TIME,
System.currentTimeMillis(), conn);
ClientConnectionDebug ccd = (ClientConnectionDebug)
conn.getContext().getAttribute(CLIENT_CONNECTION_DEBUG);
if (ccd != null) {
ccd.recordResponseCompletionTime();
}
if (metrics != null) {
if (metrics.getLevel() == MetricsCollector.LEVEL_FULL) {
MessageContext mc = getMessageContext(conn);
metrics.incrementMessagesReceived(mc);
metrics.notifyReceivedMessageSize(
mc, conn.getMetrics().getReceivedBytesCount());
metrics.notifySentMessageSize(mc, conn.getMetrics().getSentBytesCount());
metrics.reportResponseCode(mc, response.getStatusLine().getStatusCode());
} else {
metrics.incrementMessagesReceived();
metrics.notifyReceivedMessageSize(
conn.getMetrics().getReceivedBytesCount());
metrics.notifySentMessageSize(conn.getMetrics().getSentBytesCount());
}
}
// reset metrics on connection
conn.getMetrics().reset();
if (context.getAttribute(NhttpConstants.DISCARD_ON_COMPLETE) != null) {
try {
// this is a connection we should not re-use
ConnectionPool.forget(conn);
shutdownConnection(conn);
context.removeAttribute(RESPONSE_SINK_BUFFER);
context.removeAttribute(REQUEST_SOURCE_BUFFER);
} catch (Exception ignore) {}
} else if (!connStrategy.keepAlive(response, context)) {
shutdownConnection(conn);
context.removeAttribute(RESPONSE_SINK_BUFFER);
context.removeAttribute(REQUEST_SOURCE_BUFFER);
} else {
ConnectionPool.release(conn);
}
}
} catch (IOException e) {
if (metrics != null) {
if (metrics.getLevel() == MetricsCollector.LEVEL_FULL) {
metrics.incrementFaultsReceiving(
NhttpConstants.SND_IO_ERROR_RECEIVING, getMessageContext(conn));
} else {
metrics.incrementFaultsReceiving();
}
}
handleException("I/O Error at inputReady : " + e.getMessage(), e, conn);
}
}
/**
* Process ready output (i.e. write request to remote server)
*
* @param conn the connection being processed
* @param encoder the encoder in use
*/
public void outputReady(final NHttpClientConnection conn, final ContentEncoder encoder) {
HttpContext context = conn.getContext();
SharedOutputBuffer outBuf
= (SharedOutputBuffer) context.getAttribute(REQUEST_SOURCE_BUFFER);
if (outBuf == null) return;
try {
int bytesWritten = outBuf.produceContent(encoder, conn);
if (metrics != null) {
if (bytesWritten > 0) {
if (metrics.getLevel() == MetricsCollector.LEVEL_FULL) {
metrics.incrementBytesSent(getMessageContext(conn), bytesWritten);
} else {
metrics.incrementBytesSent(bytesWritten);
}
}
if (encoder.isCompleted()) {
if (metrics.getLevel() == MetricsCollector.LEVEL_FULL) {
metrics.incrementMessagesSent(getMessageContext(conn));
} else {
metrics.incrementMessagesSent();
}
}
}
if (encoder.isCompleted()) {
ClientConnectionDebug ccd = (ClientConnectionDebug)
context.getAttribute(CLIENT_CONNECTION_DEBUG);
if (ccd != null) {
ccd.recordRequestCompletionTime();
}
}
} catch (IOException e) {
if (metrics != null) {
if (metrics.getLevel() == MetricsCollector.LEVEL_FULL) {
metrics.incrementFaultsSending(
NhttpConstants.SND_IO_ERROR_SENDING, getMessageContext(conn));
} else {
metrics.incrementFaultsSending();
}
}
handleException("I/O Error at outputReady : " + e.getMessage(), e, conn);
}
}
/**
* Process a response received for the request sent out
*
* @param conn the connection being processed
*/
public void responseReceived(final NHttpClientConnection conn) {
setServerContextAttribute(NhttpConstants.RES_HEADER_ARRIVAL_TIME,
System.currentTimeMillis(), conn);
HttpContext context = conn.getContext();
HttpResponse response = conn.getHttpResponse();
if (response.getStatusLine().getStatusCode() == HttpStatus.SC_CONTINUE) {
if (log.isDebugEnabled()) {
log.debug("Received a 100 Continue response");
}
// according to the HTTP 1.1 specification HTTP status 100 continue implies that
// the response will be followed, and the client should just ignore the 100 Continue
// and wait for the response
return;
}
ClientConnectionDebug ccd = (ClientConnectionDebug)
conn.getContext().getAttribute(CLIENT_CONNECTION_DEBUG);
if (ccd != null) {
ccd.recordResponseStartTime(response.getStatusLine().toString());
}
// Have we sent out our request fully in the first place? if not, forget about it now..
Axis2HttpRequest req
= (Axis2HttpRequest) conn.getContext().getAttribute(AXIS2_HTTP_REQUEST);
if (req != null) {
req.setCompleted(true);
if (log.isDebugEnabled()) {
log.debug("Response Received for Request : " + req);
}
if (!req.isSendingCompleted()) {
req.getMsgContext().setProperty(
NhttpConstants.ERROR_CODE, NhttpConstants.SEND_ABORT);
SharedOutputBuffer outputBuffer = (SharedOutputBuffer)
conn.getContext().getAttribute(REQUEST_SOURCE_BUFFER);
if (outputBuffer != null) {
outputBuffer.shutdown();
}
if (log.isDebugEnabled()) {
log.debug("Remote server aborted request being sent and replied : " + conn
+ " for request : " + conn.getContext().getAttribute(
NhttpConstants.HTTP_REQ_METHOD));
}
context.setAttribute(NhttpConstants.DISCARD_ON_COMPLETE, Boolean.TRUE);
if (metrics != null) {
metrics.incrementFaultsSending(NhttpConstants.SEND_ABORT, req.getMsgContext());
}
}
}
switch (response.getStatusLine().getStatusCode()) {
case HttpStatus.SC_ACCEPTED : {
if (log.isDebugEnabled()) {
log.debug("Received a 202 Accepted response");
}
// sometimes, some http clients sends an "\r\n" as the content body with a
// HTTP 202 OK.. we will just get it into this temp buffer and ignore it..
ContentInputBuffer inputBuffer = new SharedInputBuffer(8, allocator);
context.setAttribute(RESPONSE_SINK_BUFFER, inputBuffer);
// create a dummy message with an empty SOAP envelope and a property
// NhttpConstants.SC_ACCEPTED set to Boolean.TRUE to indicate this is a
// placeholder message for the transport to send a HTTP 202 to the
// client. Should / would be ignored by any transport other than
// nhttp. For example, JMS would not send a reply message for one-way
// operations.
MessageContext outMsgCtx =
(MessageContext) context.getAttribute(OUTGOING_MESSAGE_CONTEXT);
MessageReceiver mr = outMsgCtx.getAxisOperation().getMessageReceiver();
// the following check is to support the dual channel invocation. Hence the
// response will be sent as a new request to the client over a different channel
// client sends back a 202 Accepted response to synapse and we need to neglect that
// 202 Accepted message
if (!outMsgCtx.isPropertyTrue(NhttpConstants.IGNORE_SC_ACCEPTED)) {
try {
MessageContext responseMsgCtx = outMsgCtx.getOperationContext().
getMessageContext(WSDL2Constants.MESSAGE_LABEL_IN);
if (responseMsgCtx == null ||
outMsgCtx.getOptions().isUseSeparateListener() ||
outMsgCtx.getOperationContext().isComplete()) {
if (responseMsgCtx != null &&
responseMsgCtx.getProperty("synapse.send") == null) {
return;
}
} else if (outMsgCtx.getOptions().isUseSeparateListener()) {
// Since we need to notify the SynapseCallback receiver to remove the
// call backs registered we set a custom property
setHeaders(context, response, outMsgCtx, responseMsgCtx);
outMsgCtx.setProperty(NhttpConstants.HTTP_202_RECEIVED, "true");
mr.receive(outMsgCtx);
return;
}
if (responseMsgCtx == null) {
return;
}
setHeaders(context, response, outMsgCtx, responseMsgCtx);
responseMsgCtx.setServerSide(true);
responseMsgCtx.setDoingREST(outMsgCtx.isDoingREST());
responseMsgCtx.setProperty(MessageContext.TRANSPORT_IN,
outMsgCtx.getProperty(MessageContext.TRANSPORT_IN));
responseMsgCtx.setTransportIn(outMsgCtx.getTransportIn());
responseMsgCtx.setTransportOut(outMsgCtx.getTransportOut());
responseMsgCtx.setAxisMessage(outMsgCtx.getAxisOperation().
getMessage(WSDLConstants.MESSAGE_LABEL_IN_VALUE));
responseMsgCtx.setOperationContext(outMsgCtx.getOperationContext());
responseMsgCtx.setConfigurationContext(outMsgCtx.getConfigurationContext());
responseMsgCtx.setTo(null);
if (!outMsgCtx.isDoingREST() && !outMsgCtx.isSOAP11()) {
responseMsgCtx.setEnvelope(OMAbstractFactory.getSOAP12Factory().getDefaultEnvelope());
} else {
responseMsgCtx.setEnvelope(OMAbstractFactory.getSOAP11Factory().getDefaultEnvelope());
}
responseMsgCtx.setProperty(AddressingConstants.
DISABLE_ADDRESSING_FOR_OUT_MESSAGES, Boolean.TRUE);
responseMsgCtx.setProperty(NhttpConstants.SC_ACCEPTED, Boolean.TRUE);
mr.receive(responseMsgCtx);
} catch (org.apache.axis2.AxisFault af) {
log.debug("Unable to report back " +
"202 Accepted state to the message receiver");
}
}
return;
}
case HttpStatus.SC_OK : {
processResponse(conn, context, response);
return;
}
case HttpStatus.SC_INTERNAL_SERVER_ERROR: {
if (warnOnHttp500(response)) {
log.warn(getErrorMessage("Received an internal server error : "
+ response.getStatusLine().getReasonPhrase(), conn));
}
processResponse(conn, context, response);
return;
}
default : {
if (log.isDebugEnabled()) {
log.debug(getErrorMessage("HTTP status code received : " +
response.getStatusLine().getStatusCode() + " :: " +
response.getStatusLine().getReasonPhrase(), conn));
}
Header contentType = response.getFirstHeader(HTTP.CONTENT_TYPE);
if (contentType != null) {
if ((contentType.getValue().indexOf(SOAP11Constants.SOAP_11_CONTENT_TYPE) >= 0)
|| contentType.getValue().indexOf(
SOAP12Constants.SOAP_12_CONTENT_TYPE) >=0) {
if (log.isDebugEnabled()) {
log.debug("Received an unexpected response with a SOAP payload");
}
} else if (contentType.getValue().indexOf("html") == -1) {
if (log.isDebugEnabled()) {
log.debug("Received an unexpected response with a POX/REST payload");
}
} else {
log.warn(getErrorMessage("Received an unexpected response - " +
"of content type : " + contentType.getValue() +
" and status code : " + response.getStatusLine().getStatusCode() +
" with reason : " +
response.getStatusLine().getReasonPhrase(), conn));
}
} else {
if (log.isDebugEnabled()) {
log.debug(getErrorMessage("Received a response - " +
"without a content type with status code : " +
response.getStatusLine().getStatusCode() + " and reason : " +
response.getStatusLine().getReasonPhrase(), conn));
}
}
processResponse(conn, context, response);
}
}
}
private void setHeaders(HttpContext context, HttpResponse response,
MessageContext outMsgCtx, MessageContext responseMsgCtx) {
Header[] headers = response.getAllHeaders();
if (headers != null && headers.length > 0) {
Map<String, String> headerMap
= new TreeMap<String, String>(new Comparator<String>() {
public int compare(String o1, String o2) {
return o1.compareToIgnoreCase(o2);
}
});
String endpointURLPrefix = (String) context.getAttribute(NhttpConstants.ENDPOINT_PREFIX);
String servicePrefix = (String) outMsgCtx.getProperty(NhttpConstants.SERVICE_PREFIX);
for (int i = 0; i < headers.length; i++) {
Header header = headers[i];
if ("Location".equals(header.getName())
&& endpointURLPrefix != null && servicePrefix != null) {
//Here, we are changing only the host name and the port of the new URI - value of the Location
//header.
//If the new URI is again referring to a resource in the server to which the original request
//is sent, then replace the hostname and port of the URI with the hostname and port of synapse
//We are not changing the request url here, only the host name and the port.
try {
URI serviceURI = new URI(servicePrefix);
URI endpointURI = new URI(endpointURLPrefix);
URI locationURI = new URI(header.getValue());
if (locationURI.getHost().equalsIgnoreCase(endpointURI.getHost())) {
URI newURI = new URI(locationURI.getScheme(), locationURI.getUserInfo(),
serviceURI.getHost(), serviceURI.getPort(), locationURI.getPath(),
locationURI.getQuery(), locationURI.getFragment());
headerMap.put(header.getName(), newURI.toString());
responseMsgCtx.setProperty(NhttpConstants.SERVICE_PREFIX,
outMsgCtx.getProperty(NhttpConstants.SERVICE_PREFIX));
}
} catch (URISyntaxException e) {
log.error(e.getMessage(), e);
}
} else {
headerMap.put(header.getName(), header.getValue());
}
}
responseMsgCtx.setProperty(MessageContext.TRANSPORT_HEADERS, headerMap);
}
}
/**
* Checks whether the provided 500 response shall be logged as a warning.
* The behavior can be configured based on the content type of the message via a transport
* parameter in axis2.xml named <code>warnOnHTTP500</code>.
*
* @param response an http 500 response
*
* @return true, if a warning shall be logged, otherwise false
*/
private boolean warnOnHttp500(final HttpResponse response) {
if (warnOnHttp500 == null || warnOnHttp500.length == 0) {
return true;
}
for (String contentType : warnOnHttp500) {
if (contentType == null || contentType.trim().equals("*")) {
return true;
}
}
// determine content type of the response message
Header contentTypeHeader = response.getFirstHeader(CONTENT_TYPE);
String messageContentType;
if (contentTypeHeader == null) {
messageContentType = "none";
} else {
messageContentType = contentTypeHeader.getValue();
if (messageContentType == null || messageContentType.trim().length() == 0) {
messageContentType = "none";
}
}
// test if one of the content types matches
for (String contentType : warnOnHttp500) {
if (messageContentType.startsWith(contentType)) {
return true;
}
}
return false;
}
/**
* Perform processing of the received response though Axis2
*
* @param conn HTTP connection to be processed
* @param context HTTP context associated with the connection
* @param response HTTP response associated with the connection
*/
private void processResponse(final NHttpClientConnection conn, HttpContext context,
HttpResponse response) {
ContentInputBuffer inputBuffer = null;
MessageContext outMsgContext = (MessageContext) context.getAttribute(OUTGOING_MESSAGE_CONTEXT);
String endptPrefix = (String) context.getAttribute(NhttpConstants.ENDPOINT_PREFIX);
String requestMethod = (String) context.getAttribute(NhttpConstants.HTTP_REQ_METHOD);
int statusCode = response.getStatusLine().getStatusCode();
boolean expectEntityBody = false;
if (!"HEAD".equals(requestMethod) && !"OPTIONS".equals(requestMethod) &&
statusCode >= HttpStatus.SC_OK
&& statusCode != HttpStatus.SC_NO_CONTENT
&& statusCode != HttpStatus.SC_NOT_MODIFIED
&& statusCode != HttpStatus.SC_RESET_CONTENT) {
expectEntityBody = true;
}
if (expectEntityBody) {
inputBuffer = new SharedInputBuffer(cfg.getBufferSize(), allocator);
context.setAttribute(RESPONSE_SINK_BUFFER, inputBuffer);
BasicHttpEntity entity = new BasicHttpEntity();
if (response.getStatusLine().getProtocolVersion().greaterEquals(HttpVersion.HTTP_1_1)) {
entity.setChunked(true);
}
response.setEntity(entity);
context.setAttribute(HttpCoreContext.HTTP_RESPONSE, response);
} else {
conn.resetInput();
conn.resetOutput();
if (context.getAttribute(NhttpConstants.DISCARD_ON_COMPLETE) != null ||
!connStrategy.keepAlive(response, context)) {
try {
// this is a connection we should not re-use
ConnectionPool.forget(conn);
shutdownConnection(conn);
context.removeAttribute(RESPONSE_SINK_BUFFER);
context.removeAttribute(REQUEST_SOURCE_BUFFER);
} catch (Exception ignore) {}
} else {
ConnectionPool.release(conn);
}
}
workerPool.execute(
new ClientWorker(cfgCtx,
inputBuffer == null ? null : new ContentInputStream(inputBuffer),
response, outMsgContext, endptPrefix));
}
public void execute(Runnable task) {
workerPool.execute(task);
}
/**
* Shutdown the connection ignoring any IO errors during the process
*
* @param conn the connection to be shutdown
*/
private void shutdownConnection(final NHttpClientConnection conn) {
if (conn instanceof HttpInetConnection) {
HttpInetConnection inetConnection = (HttpInetConnection) conn;
if (log.isDebugEnabled()) {
log.debug("Connection to remote address : " + inetConnection.getRemoteAddress()
+ ":" + inetConnection.getRemotePort() + " from local address : "
+ inetConnection.getLocalAddress() + ":" + inetConnection.getLocalPort() +
" is closed!");
}
if (countConnections) {
removeConnectionRecord(inetConnection);
}
}
HttpContext context = conn.getContext();
SharedOutputBuffer outputBuffer = (SharedOutputBuffer)
context.getAttribute(REQUEST_SOURCE_BUFFER);
if (outputBuffer != null) {
outputBuffer.close();
}
SharedInputBuffer inputBuffer = (SharedInputBuffer)
context.getAttribute(RESPONSE_SINK_BUFFER);
if (inputBuffer != null) {
inputBuffer.close();
}
try {
conn.shutdown();
} catch (IOException ignore) {}
context.removeAttribute(RESPONSE_SINK_BUFFER);
context.removeAttribute(REQUEST_SOURCE_BUFFER);
context.removeAttribute(CLIENT_CONNECTION_DEBUG);
context.removeAttribute(CONNECTION_CREATION_TIME);
}
/**
* Remove a connection record for this host:port pair from active connections records.
*
* @param inetConnection connection that need to be removed from the active connections records
*/
private void removeConnectionRecord(HttpInetConnection inetConnection) {
AtomicInteger connections = openConnections.get(
inetConnection.getRemoteAddress().getHostName() + ":"
+ inetConnection.getRemotePort());
if (connections == null) {
connections = openConnections.get(
inetConnection.getRemoteAddress().getHostAddress() + ":"
+ inetConnection.getRemotePort());
}
if (connections != null) {
int no = connections.getAndDecrement();
lock.lock();
try {
if (no == 0) {
if (null == openConnections.remove(
inetConnection.getRemoteAddress().getHostName()
+ ":" + inetConnection.getRemotePort())) {
} else {
openConnections.remove(
inetConnection.getRemoteAddress().getHostAddress()
+ ":" + inetConnection.getRemotePort());
}
}
} finally {
lock.unlock();
}
}
}
/**
* Record a connection in the active connection records.
*
* @param conn connection to be recorded.
*/
private void recordConnection(NHttpClientConnection conn) {
if (conn instanceof HttpInetConnection) {
HttpInetConnection inetConnection = (HttpInetConnection) conn;
// first we try to get the connection with host_addrss:port key
AtomicInteger connections = openConnections.get(
inetConnection.getRemoteAddress().getHostName() + ":"
+ inetConnection.getRemotePort());
// if we fail try to get the connection with ip_address:port key
if (connections == null) {
connections = openConnections.get(
inetConnection.getRemoteAddress().getHostAddress() + ":"
+ inetConnection.getRemotePort());
}
lock.lock();
try {
if (connections == null) {
connections = new AtomicInteger();
if (inetConnection.getRemoteAddress().getHostName() != null) {
openConnections.put(
inetConnection.getRemoteAddress().getHostName() + ":"
+ inetConnection.getRemotePort(), connections);
} else {
openConnections.put(
inetConnection.getRemoteAddress().getHostAddress() + ":"
+ inetConnection.getRemotePort(), connections);
}
}
} finally {
lock.unlock();
}
connections.getAndIncrement();
}
}
/**
* Return the HttpProcessor for requests
*
* @return the HttpProcessor that processes requests
*/
private HttpProcessor getHttpProcessor() {
return new ImmutableHttpProcessor(new RequestContent(),
new RequestTargetHost(),
new RequestConnControl(),
new RequestUserAgent("Synapse-HttpComponents-NIO"),
new RequestExpectContinue(false));
}
public int getActiveCount() {
return workerPool.getActiveCount();
}
public int getQueueSize() {
return workerPool.getQueueSize();
}
public void stop() {
threadingView.destroy();
try {
workerPool.shutdown(1000);
} catch (InterruptedException ignore) {}
}
// ----------- utility methods -----------
private void handleException(String msg, Exception e, NHttpClientConnection conn) {
if (msg.toLowerCase().indexOf("reset") != -1) {
log.warn(msg);
} else {
log.error(msg, e);
}
if (conn != null) {
shutdownConnection(conn);
}
}
private MessageContext getMessageContext(final NHttpClientConnection conn) {
HttpContext context = conn.getContext();
Axis2HttpRequest axis2Req = (Axis2HttpRequest) context.getAttribute(AXIS2_HTTP_REQUEST);
if (axis2Req != null) {
return axis2Req.getMsgContext();
}
return null;
}
private void setServerContextAttribute(String key, Object value, NHttpClientConnection conn) {
MessageContext msgCtx = getMessageContext(conn);
if (msgCtx != null) {
Object outTransport = msgCtx.getProperty(Constants.OUT_TRANSPORT_INFO);
if (outTransport != null && outTransport instanceof ServerWorker) {
HttpContext context = ((ServerWorker) outTransport).getConn().getContext();
context.setAttribute(key, value);
}
}
}
}