blob: 3b162b60c76a5bc598241fd770af14a612cfc573 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.synapse.transport.passthru.connections;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.http.nio.NHttpClientConnection;
import org.apache.http.nio.reactor.ConnectingIOReactor;
import org.apache.synapse.transport.passthru.ConnectCallback;
import org.apache.synapse.transport.passthru.PassThroughConstants;
import org.apache.synapse.transport.passthru.TargetContext;
import org.apache.synapse.transport.passthru.config.TargetConfiguration;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.net.InetSocketAddress;
/**
* Manages the connection from transport to the back end servers. It keeps track of the
* connections for host:port pair.
*/
public class TargetConnections {
private static final Log log = LogFactory.getLog(TargetConnections.class);
/** map to hold the ConnectionPools. The key is host:port */
private final Map<String, HostConnections> poolMap =
new ConcurrentHashMap<String, HostConnections>();
/** max connections per host:port pair. At the moment all the host:ports can
* have the same max */
private int maxConnections;
/** io-reactor to use for creating connections */
private ConnectingIOReactor ioReactor;
/** callback invoked when a connection is made */
private ConnectCallback callback = null;
/**
* Create a TargetConnections with the given IO-Reactor
*
* @param ioReactor the IO-Reactor
* @param targetConfiguration the configuration of the sender
* @param callback the callback
*/
public TargetConnections(ConnectingIOReactor ioReactor,
TargetConfiguration targetConfiguration,
ConnectCallback callback) {
this.maxConnections = targetConfiguration.getMaxConnections();
this.ioReactor = ioReactor;
this.callback = callback;
}
/**
* Return a connection to the host:port pair. If a connection is not available
* return <code>null</code>. If the particular host:port allows to create more connections
* this method will try to connect asynchronously. If the connection is successful it will
* be notified in a separate thread.
*
* @param host host
* @param port port
* @return Either returns a connection if already available or returns null and notifies
* the delivery agent when the connection is available
*/
public NHttpClientConnection getConnection(String host, int port) {
if (log.isDebugEnabled()) {
log.debug("Trying to get a connection to the " + host + ":" + port);
}
HostConnections pool = getConnectionPool(host, port);
// trying to get an existing connection
NHttpClientConnection connection = pool.getConnection();
if (connection == null) {
if (pool.canHaveMoreConnections()) {
ioReactor.connect(new InetSocketAddress(host, port), null, pool, callback);
} else {
log.warn("Connection pool reached maximum allowed connections for: "
+ host + ":" + port + ". Target server may have become slow");
}
}
return connection;
}
/**
* Return an existing connection to the host:port pair from connection pool.
* If a connection is not available, return <code>null</code>
*
* @param host host
* @param port port
* @return returns a connection if already available in the pool
*/
public NHttpClientConnection getExistingConnection(String host, int port) {
if (log.isDebugEnabled()) {
log.debug("Trying to get an existing connection to the " + host + ":" + port);
}
HostConnections pool = getConnectionPool(host, port);
return pool.getConnection();
}
/**
* This connection is no longer needed. So we need to close connection.
*
* @param conn connection to shutdownConnection
*/
public void closeConnection(NHttpClientConnection conn) {
HostConnections pool = (HostConnections) conn.getContext().getAttribute(
PassThroughConstants.CONNECTION_POOL);
TargetContext.get(conn).reset();
if (pool != null) {
pool.forget(conn);
} else {
// we shouldn't get here
log.fatal("Connection without a pool. Something wrong. Need to fix.");
}
try {
conn.close();
} catch (IOException ignored) {
}
}
/**
* This connection is no longer valid. So we need to shutdown connection.
*
* @param conn connection to shutdownConnection
*/
public void shutdownConnection(NHttpClientConnection conn) {
shutdownConnection(conn, false);
}
/**
* This connection is no longer valid. So we need to shutdown connection.
*
* @param conn connection to shutdownConnection
* @param isError whether an error is causing this shutdown of the connection.
* It is very important to set this flag correctly.
* When an error causing the shutdown of the connections we should not
* release associated writer buffer to the pool as it might lead into
* situations like same buffer is getting released to both source and target
* buffer factories
*/
public void shutdownConnection(NHttpClientConnection conn, boolean isError) {
HostConnections pool = (HostConnections) conn.getContext().getAttribute(
PassThroughConstants.CONNECTION_POOL);
TargetContext.get(conn).reset(isError);
if (pool != null) {
pool.forget(conn);
} else {
// we shouldn't get here
log.fatal("Connection without a pool. Something wrong. Need to fix.");
}
try {
conn.shutdown();
} catch (IOException ignored) {
}
}
/**
* Release an active connection to the pool
*
* @param conn connection to be released
*/
public void releaseConnection(NHttpClientConnection conn) {
HostConnections pool = (HostConnections) conn.getContext().getAttribute(
PassThroughConstants.CONNECTION_POOL);
TargetContext.get(conn).reset();
if (pool != null) {
pool.release(conn);
} else {
// we shouldn't get here
log.fatal("Connection without a pool. Something wrong. Need to fix.");
}
}
/**
* This method is called when a new connection is made.
*
* @param conn connection to the target server
*/
public void addConnection(NHttpClientConnection conn) {
HostConnections pool = (HostConnections) conn.getContext().getAttribute(
PassThroughConstants.CONNECTION_POOL);
if (pool != null) {
pool.addConnection(conn);
} else {
// we shouldn't get here
log.fatal("Connection without a pool. Something wrong. Need to fix.");
}
}
private HostConnections getConnectionPool(String host, int port) {
String key = host + ":" + port;
if (!poolMap.containsKey(key)) {
synchronized (poolMap) {
// see weather a pool already exists for this host:port
if (!poolMap.containsKey(key)) {
HostConnections pool = new HostConnections(host, port, maxConnections);
poolMap.put(key, pool);
}
}
}
return poolMap.get(key);
}
}