blob: d3b5203342c57d34ac91aae5f6fb83268ca89e68 [file] [log] [blame]
/*
* Copyright 2004,2005 The Apache Software Foundation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.axis2.transport.nhttp;
import org.apache.axis2.AxisFault;
import org.apache.axis2.Constants;
import org.apache.axis2.addressing.EndpointReference;
import org.apache.axis2.addressing.AddressingConstants;
import org.apache.axis2.context.ConfigurationContext;
import org.apache.axis2.context.MessageContext;
import org.apache.axis2.description.TransportOutDescription;
import org.apache.axis2.description.Parameter;
import org.apache.axis2.handlers.AbstractHandler;
import org.apache.axis2.transport.TransportSender;
import org.apache.axis2.transport.OutTransportInfo;
import org.apache.axiom.om.OMOutputFormat;
import org.apache.http.nio.NHttpClientHandler;
import org.apache.http.nio.NHttpClientConnection;
import org.apache.http.nio.reactor.ConnectingIOReactor;
import org.apache.http.nio.reactor.IOEventDispatch;
import org.apache.http.nio.reactor.SessionRequest;
import org.apache.http.HttpResponse;
import org.apache.http.HttpHost;
import org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor;
import org.apache.http.impl.nio.DefaultClientIOEventDispatch;
import org.apache.http.params.HttpParams;
import org.apache.http.params.HttpConnectionParams;
import org.apache.http.params.HttpProtocolParams;
import org.apache.http.params.BasicHttpParams;
import org.apache.http.protocol.HTTP;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import javax.xml.stream.XMLStreamException;
import javax.net.ssl.SSLContext;
import java.io.OutputStream;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.InetSocketAddress;
/**
* NIO transport sender for Axis2 based on HttpCore and NIO extensions
*/
public class HttpCoreNIOSender extends AbstractHandler implements TransportSender {
private static final Log log = LogFactory.getLog(HttpCoreNIOSender.class);
/** The Axis2 configuration context */
private ConfigurationContext cfgCtx;
/** The IOReactor */
private ConnectingIOReactor ioReactor = null;
/** The client handler */
private NHttpClientHandler handler = null;
/** The SSL Context to be used */
SSLContext sslContext = null;
/**
* Initialize the transport sender, and execute reactor in new seperate thread
* @param cfgCtx the Axis2 configuration context
* @param transportOut the description of the http/s transport from Axis2 configuration
* @throws AxisFault thrown on an error
*/
public void init(ConfigurationContext cfgCtx, TransportOutDescription transportOut) throws AxisFault {
this.cfgCtx = cfgCtx;
// is this an SSL Sender?
sslContext = getSSLContext(transportOut);
// start the Sender in a new seperate thread
Thread t = new Thread(new Runnable() {
public void run() {
executeClientEngine();
}
}, "HttpCoreNIOSender");
t.start();
log.info("Sender started");
}
/**
* Configure and start the IOReactor
*/
private void executeClientEngine() {
HttpParams params = getClientParameters();
try {
ioReactor = new DefaultConnectingIOReactor(2, params);
} catch (IOException e) {
log.error("Error starting the IOReactor", e);
}
handler = new ClientHandler(cfgCtx, params);
IOEventDispatch ioEventDispatch = getEventDispatch(handler, sslContext, params);
try {
ioReactor.execute(ioEventDispatch);
} catch (InterruptedIOException ex) {
log.fatal("Reactor Interrupted");
} catch (IOException e) {
log.fatal("Encountered an I/O error: " + e.getMessage(), e);
}
log.info("Sender Shutdown");
}
/**
* Return the IOEventDispatch implementation to be used. This is overridden by the
* SSL sender
* @param handler
* @param sslContext
* @param params
* @return
*/
protected IOEventDispatch getEventDispatch(
NHttpClientHandler handler, SSLContext sslContext, HttpParams params) {
return new DefaultClientIOEventDispatch(handler, params);
}
/**
* Always return null, as this implementation does not support outgoing SSL
* @param transportOut
* @return null
* @throws AxisFault
*/
protected SSLContext getSSLContext(TransportOutDescription transportOut) throws AxisFault {
return null;
}
/**
* get HTTP protocol parameters to which the sender must adhere to
* @return the applicable HTTP protocol parameters
*/
private HttpParams getClientParameters() {
HttpParams params = new BasicHttpParams();
params
.setIntParameter(HttpConnectionParams.SO_TIMEOUT, 30000)
.setIntParameter(HttpConnectionParams.CONNECTION_TIMEOUT, 10000)
.setIntParameter(HttpConnectionParams.SOCKET_BUFFER_SIZE, 8 * 1024)
.setBooleanParameter(HttpConnectionParams.STALE_CONNECTION_CHECK, false)
.setBooleanParameter(HttpConnectionParams.TCP_NODELAY, true)
.setParameter(HttpProtocolParams.USER_AGENT, "Synapse-HttpComponents-NIO");
return params;
}
/**
* transport sender invocation from Axis2 core
* @param msgContext message to be sent
* @return the invocation response (always InvocationResponse.CONTINUE)
* @throws AxisFault on error
*/
public InvocationResponse invoke(MessageContext msgContext) throws AxisFault {
EndpointReference epr = Util.getDestinationEPR(msgContext);
if (epr != null) {
if (!AddressingConstants.Final.WSA_NONE_URI.equals(epr.getAddress())) {
sendAsyncRequest(epr, msgContext);
} else {
handleException("Cannot send message to " + AddressingConstants.Final.WSA_NONE_URI);
}
} else {
if (msgContext.getProperty(Constants.OUT_TRANSPORT_INFO) != null) {
if (msgContext.getProperty(Constants.OUT_TRANSPORT_INFO) instanceof ServerWorker) {
sendAsyncResponse(msgContext);
} else {
sendUsingOutputStream(msgContext);
}
} else {
handleException("No valid destination EPR or OutputStream to send message");
}
}
if (msgContext.getOperationContext() != null) {
msgContext.getOperationContext().setProperty(
Constants.RESPONSE_WRITTEN, Constants.VALUE_TRUE);
}
return InvocationResponse.CONTINUE;
}
/**
* Send the request message asynchronously to the given EPR
* @param epr the destination EPR for the message
* @param msgContext the message being sent
* @throws AxisFault on error
*/
private void sendAsyncRequest(EndpointReference epr, MessageContext msgContext) throws AxisFault {
try {
URL url = new URL(epr.getAddress());
HttpHost httpHost = new HttpHost(url.getHost(), url.getPort(), url.getProtocol());
Axis2HttpRequest axis2Req = new Axis2HttpRequest(epr, httpHost, msgContext);
NHttpClientConnection conn = ConnectionPool.getConnection(url.getHost(), url.getPort());
if (conn == null) {
SessionRequest req = ioReactor.connect(
new InetSocketAddress(url.getHost(), url.getPort()), null, axis2Req);
log.debug("A new connection established");
} else {
((ClientHandler) handler).submitRequest(conn, axis2Req);
log.debug("An existing connection reused");
}
axis2Req.streamMessageContents();
} catch (MalformedURLException e) {
handleException("Malformed destination EPR : " + epr.getAddress(), e);
} catch (IOException e) {
handleException("IO Error while submiting request message for sending", e);
}
}
/**
* Send the passed in response message, asynchronously
* @param msgContext the message context to be sent
* @throws AxisFault on error
*/
private void sendAsyncResponse(MessageContext msgContext) throws AxisFault {
ServerWorker worker = (ServerWorker) msgContext.getProperty(Constants.OUT_TRANSPORT_INFO);
HttpResponse response = worker.getResponse();
OMOutputFormat format = Util.getOMOutputFormat(msgContext);
response.setHeader(HTTP.CONTENT_TYPE, Util.getContentType(msgContext) + "; charset=" + format.getCharSetEncoding());
worker.getServiceHandler().commitResponse(worker.getConn(), response);
OutputStream out = worker.getOutputStream();
format.setDoOptimize(msgContext.isDoingMTOM());
try {
(msgContext.isDoingREST() ?
msgContext.getEnvelope().getBody().getFirstElement() : msgContext.getEnvelope())
.serializeAndConsume(out, format);
out.close();
} catch (XMLStreamException e) {
handleException("Error serializing response message", e);
} catch (IOException e) {
handleException("IO Error sending response message", e);
}
try {
worker.getIs().close();
} catch (IOException ignore) {}
}
private void sendUsingOutputStream(MessageContext msgContext) throws AxisFault {
OMOutputFormat format = Util.getOMOutputFormat(msgContext);
OutputStream out =
(OutputStream) msgContext
.getProperty(MessageContext.TRANSPORT_OUT);
if (msgContext.isServerSide()) {
OutTransportInfo transportInfo =
(OutTransportInfo) msgContext.getProperty(Constants.OUT_TRANSPORT_INFO);
if (transportInfo != null) {
String encoding = Util.getContentType(msgContext) + "; charset=" + format.getCharSetEncoding();
transportInfo.setContentType(encoding);
} else {
throw new AxisFault(Constants.OUT_TRANSPORT_INFO + " has not been set");
}
}
try {
(msgContext.isDoingREST() ?
msgContext.getEnvelope().getBody().getFirstElement() : msgContext.getEnvelope())
.serializeAndConsume(out, format);
} catch (XMLStreamException e) {
handleException("Error serializing response message", e);
}
}
public void cleanup(MessageContext msgContext) throws AxisFault {
// do nothing
}
public void stop() {
try {
ioReactor.shutdown();
log.info("Sender shut down");
} catch (IOException e) {
log.warn("Error shutting down IOReactor", e);
}
}
// -------------- utility methods -------------
private void handleException(String msg, Exception e) throws AxisFault {
log.error(msg, e);
throw new AxisFault(msg, e);
}
private void handleException(String msg) throws AxisFault {
log.error(msg);
throw new AxisFault(msg);
}
}