blob: 906e7f9016bd51a0e94b2a61205bea8a6155be23 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.component.netty;
import java.net.InetSocketAddress;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import org.apache.camel.AsyncCallback;
import org.apache.camel.CamelContext;
import org.apache.camel.CamelException;
import org.apache.camel.Exchange;
import org.apache.camel.NoTypeConversionAvailableException;
import org.apache.camel.ServicePoolAware;
import org.apache.camel.converter.IOConverter;
import org.apache.camel.impl.DefaultAsyncProducer;
import org.apache.camel.impl.DefaultExchange;
import org.apache.camel.util.CamelLogger;
import org.apache.camel.util.ExchangeHelper;
import org.apache.camel.util.IOHelper;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.bootstrap.ConnectionlessBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.group.ChannelGroup;
import org.jboss.netty.channel.group.ChannelGroupFuture;
import org.jboss.netty.channel.group.DefaultChannelGroup;
import org.jboss.netty.channel.socket.DatagramChannelFactory;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import org.jboss.netty.channel.socket.nio.NioDatagramChannelFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class NettyProducer extends DefaultAsyncProducer implements ServicePoolAware {
private static final transient Logger LOG = LoggerFactory.getLogger(NettyProducer.class);
private static final ChannelGroup ALL_CHANNELS = new DefaultChannelGroup("NettyProducer");
private CamelContext context;
private NettyConfiguration configuration;
private ChannelFactory channelFactory;
private DatagramChannelFactory datagramChannelFactory;
private CamelLogger noReplyLogger;
public NettyProducer(NettyEndpoint nettyEndpoint, NettyConfiguration configuration) {
super(nettyEndpoint);
this.configuration = configuration;
this.context = this.getEndpoint().getCamelContext();
this.noReplyLogger = new CamelLogger(LOG, configuration.getNoReplyLogLevel());
}
@Override
public NettyEndpoint getEndpoint() {
return (NettyEndpoint) super.getEndpoint();
}
@Override
public boolean isSingleton() {
// the producer should not be singleton otherwise cannot use concurrent producers and safely
// use request/reply with correct correlation
return false;
}
public CamelContext getContext() {
return context;
}
protected boolean isTcp() {
return configuration.getProtocol().equalsIgnoreCase("tcp");
}
@Override
protected void doStart() throws Exception {
super.doStart();
if (isTcp()) {
setupTCPCommunication();
} else {
setupUDPCommunication();
}
if (!configuration.isLazyChannelCreation()) {
// ensure the connection can be established when we start up
openAndCloseConnection();
}
}
@Override
protected void doStop() throws Exception {
LOG.debug("Stopping producer at address: {}", configuration.getAddress());
// close all channels
ChannelGroupFuture future = ALL_CHANNELS.close();
future.awaitUninterruptibly();
// and then release other resources
if (channelFactory != null) {
channelFactory.releaseExternalResources();
}
super.doStop();
}
public boolean process(final Exchange exchange, final AsyncCallback callback) {
if (!isRunAllowed()) {
if (exchange.getException() == null) {
exchange.setException(new RejectedExecutionException());
}
callback.done(true);
return true;
}
Object body = NettyPayloadHelper.getIn(getEndpoint(), exchange);
if (body == null) {
noReplyLogger.log("No payload to send for exchange: " + exchange);
callback.done(true);
return true;
}
// if textline enabled then covert to a String which must be used for textline
if (getConfiguration().isTextline()) {
try {
body = NettyHelper.getTextlineBody(body, exchange, getConfiguration().getDelimiter(), getConfiguration().isAutoAppendDelimiter());
} catch (NoTypeConversionAvailableException e) {
exchange.setException(e);
callback.done(true);
return true;
}
}
// set the exchange encoding property
if (getConfiguration().getCharsetName() != null) {
exchange.setProperty(Exchange.CHARSET_NAME, IOHelper.normalizeCharset(getConfiguration().getCharsetName()));
}
ChannelFuture channelFuture;
final Channel channel;
try {
channelFuture = openConnection(exchange, callback);
channel = openChannel(channelFuture);
} catch (Exception e) {
exchange.setException(e);
callback.done(true);
return true;
}
// log what we are writing
LOG.debug("Writing body: {}", body);
// write the body asynchronously
ChannelFuture future = channel.write(body);
// add listener which handles the operation
future.addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture channelFuture) throws Exception {
LOG.debug("Operation complete {}", channelFuture);
if (!channelFuture.isSuccess()) {
// no success the set the caused exception and signal callback and break
exchange.setException(channelFuture.getCause());
callback.done(false);
return;
}
// if we do not expect any reply then signal callback to continue routing
if (!configuration.isSync()) {
try {
// should channel be closed after complete?
Boolean close;
if (ExchangeHelper.isOutCapable(exchange)) {
close = exchange.getOut().getHeader(NettyConstants.NETTY_CLOSE_CHANNEL_WHEN_COMPLETE, Boolean.class);
} else {
close = exchange.getIn().getHeader(NettyConstants.NETTY_CLOSE_CHANNEL_WHEN_COMPLETE, Boolean.class);
}
// should we disconnect, the header can override the configuration
boolean disconnect = getConfiguration().isDisconnect();
if (close != null) {
disconnect = close;
}
if (disconnect) {
if (LOG.isDebugEnabled()) {
LOG.debug("Closing channel when complete at address: {}", getEndpoint().getConfiguration().getAddress());
}
NettyHelper.close(channel);
}
} finally {
// signal callback to continue routing
callback.done(false);
}
}
}
});
// continue routing asynchronously
return false;
}
protected void setupTCPCommunication() throws Exception {
if (channelFactory == null) {
ExecutorService bossExecutor = context.getExecutorServiceManager().newThreadPool(this, "NettyTCPBoss",
configuration.getCorePoolSize(), configuration.getMaxPoolSize());
ExecutorService workerExecutor = context.getExecutorServiceManager().newThreadPool(this, "NettyTCPWorker",
configuration.getCorePoolSize(), configuration.getMaxPoolSize());
channelFactory = new NioClientSocketChannelFactory(bossExecutor, workerExecutor);
}
}
protected void setupUDPCommunication() throws Exception {
if (datagramChannelFactory == null) {
ExecutorService workerExecutor = context.getExecutorServiceManager().newThreadPool(this, "NettyUDPWorker",
configuration.getCorePoolSize(), configuration.getMaxPoolSize());
datagramChannelFactory = new NioDatagramChannelFactory(workerExecutor);
}
}
private ChannelFuture openConnection(Exchange exchange, AsyncCallback callback) throws Exception {
ChannelFuture answer;
ChannelPipeline clientPipeline;
if (configuration.getClientPipelineFactory() != null) {
// initialize user defined client pipeline factory
configuration.getClientPipelineFactory().setProducer(this);
configuration.getClientPipelineFactory().setExchange(exchange);
configuration.getClientPipelineFactory().setCallback(callback);
clientPipeline = configuration.getClientPipelineFactory().getPipeline();
} else {
// initialize client pipeline factory
ClientPipelineFactory clientPipelineFactory = new DefaultClientPipelineFactory(this, exchange, callback);
// must get the pipeline from the factory when opening a new connection
clientPipeline = clientPipelineFactory.getPipeline();
}
if (isTcp()) {
ClientBootstrap clientBootstrap = new ClientBootstrap(channelFactory);
clientBootstrap.setOption("child.keepAlive", configuration.isKeepAlive());
clientBootstrap.setOption("child.tcpNoDelay", configuration.isTcpNoDelay());
clientBootstrap.setOption("child.reuseAddress", configuration.isReuseAddress());
clientBootstrap.setOption("child.connectTimeoutMillis", configuration.getConnectTimeout());
// set the pipeline on the bootstrap
clientBootstrap.setPipeline(clientPipeline);
answer = clientBootstrap.connect(new InetSocketAddress(configuration.getHost(), configuration.getPort()));
return answer;
} else {
ConnectionlessBootstrap connectionlessClientBootstrap = new ConnectionlessBootstrap(datagramChannelFactory);
connectionlessClientBootstrap.setOption("child.keepAlive", configuration.isKeepAlive());
connectionlessClientBootstrap.setOption("child.tcpNoDelay", configuration.isTcpNoDelay());
connectionlessClientBootstrap.setOption("child.reuseAddress", configuration.isReuseAddress());
connectionlessClientBootstrap.setOption("child.connectTimeoutMillis", configuration.getConnectTimeout());
connectionlessClientBootstrap.setOption("child.broadcast", configuration.isBroadcast());
connectionlessClientBootstrap.setOption("sendBufferSize", configuration.getSendBufferSize());
connectionlessClientBootstrap.setOption("receiveBufferSize", configuration.getReceiveBufferSize());
// set the pipeline on the bootstrap
connectionlessClientBootstrap.setPipeline(clientPipeline);
connectionlessClientBootstrap.bind(new InetSocketAddress(0));
answer = connectionlessClientBootstrap.connect(new InetSocketAddress(configuration.getHost(), configuration.getPort()));
return answer;
}
}
private Channel openChannel(ChannelFuture channelFuture) throws Exception {
// wait until we got connection
channelFuture.awaitUninterruptibly();
if (!channelFuture.isSuccess()) {
throw new CamelException("Cannot connect to " + configuration.getAddress(), channelFuture.getCause());
}
Channel channel = channelFuture.getChannel();
// to keep track of all channels in use
ALL_CHANNELS.add(channel);
LOG.debug("Creating connector to address: {}", configuration.getAddress());
return channel;
}
private void openAndCloseConnection() throws Exception {
ChannelFuture future = openConnection(new DefaultExchange(context), new AsyncCallback() {
public void done(boolean doneSync) {
// noop
}
});
Channel channel = openChannel(future);
NettyHelper.close(channel);
}
public NettyConfiguration getConfiguration() {
return configuration;
}
public void setConfiguration(NettyConfiguration configuration) {
this.configuration = configuration;
}
public ChannelFactory getChannelFactory() {
return channelFactory;
}
public void setChannelFactory(ChannelFactory channelFactory) {
this.channelFactory = channelFactory;
}
public ChannelGroup getAllChannels() {
return ALL_CHANNELS;
}
}