blob: 32d64c2f55ccd1c8a12ec9297299d3962bc37ff8 [file] [log] [blame]
/* $Id$ */
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.manifoldcf.core.throttler;
import org.apache.manifoldcf.core.interfaces.*;
import org.apache.manifoldcf.core.system.ManifoldCF;
import java.util.concurrent.atomic.*;
import java.util.*;
/** Connection tracking for a bin.
*
* This class keeps track of information needed to figure out throttling for connections,
* on a bin-by-bin basis. It is *not*, however, a connection pool. Actually establishing
* connections, and pooling established connections, is functionality that must reside in the
* caller.
*
* The 'connections' each connection bin tracks are connections outstanding that share this bin name.
* Not all such connections are identical; some may in fact have entirely different sets of
* bins associated with them, but they all have the specific bin in common. Since each bin has its
* own unique limit, this effectively means that in order to get a connection, you need to find an
* available slot in ALL of its constituent connection bins. If the connections are pooled, it makes
* the most sense to divide the pool up by characteristics such that identical connections are all
* handled together - and it is reasonable to presume that an identical connection has identical
* connection bins.
*
* NOTE WELL: This is entirely local in operation
*/
public class ConnectionBin
{
/** True if this bin is alive still */
protected boolean isAlive = true;
/** This is the bin name which this connection pool belongs to */
protected final String binName;
/** Service type name */
protected final String serviceTypeName;
/** The (anonymous) service name */
protected final String serviceName;
/** The target calculation lock name */
protected final String targetCalcLockName;
/** This is the maximum number of active connections allowed for this bin */
protected int maxActiveConnections = 0;
/** This is the local maximum number of active connections allowed for this bin */
protected int localMax = 0;
/** This is the number of connections in this bin that have been reserved - that is, they
* are promised to various callers, but those callers have not yet committed to obtaining them. */
protected int reservedConnections = 0;
/** This is the number of connections in this bin that are connected; immaterial whether they are
* in use or in a pool somewhere. */
protected int inUseConnections = 0;
/** The service type prefix for connection bins */
protected final static String serviceTypePrefix = "_CONNECTIONBIN_";
/** The target calculation lock prefix */
protected final static String targetCalcLockPrefix = "_CONNECTIONBINTARGET_";
/** Random number */
protected final static Random randomNumberGenerator = new Random();
/** Constructor. */
public ConnectionBin(IThreadContext threadContext, String throttlingGroupName, String binName)
throws ManifoldCFException
{
this.binName = binName;
this.serviceTypeName = buildServiceTypeName(throttlingGroupName, binName);
this.targetCalcLockName = buildTargetCalcLockName(throttlingGroupName, binName);
// Now, register and activate service anonymously, and record the service name we get.
ILockManager lockManager = LockManagerFactory.make(threadContext);
this.serviceName = lockManager.registerServiceBeginServiceActivity(serviceTypeName, null, null);
}
protected static String buildServiceTypeName(String throttlingGroupName, String binName)
{
return serviceTypePrefix + throttlingGroupName + "_" + binName;
}
protected static String buildTargetCalcLockName(String throttlingGroupName, String binName)
{
return targetCalcLockPrefix + throttlingGroupName + "_" + binName;
}
/** Get the bin name. */
public String getBinName()
{
return binName;
}
/** Update the maximum number of active connections.
*/
public synchronized void updateMaxActiveConnections(int maxActiveConnections)
{
// Update the number; the poller will wake up any waiting threads.
this.maxActiveConnections = maxActiveConnections;
}
/** Wait for a connection to become available, in the context of an existing connection pool.
*@param poolCount is the number of connections in the pool times the number of bins per connection.
* This parameter is only ever changed in this class!!
*@return a recommendation as to how to proceed, using the IConnectionThrottler values. If the
* recommendation is to create a connection, a slot will be reserved for that purpose. A
* subsequent call to noteConnectionCreation() will be needed to confirm the reservation, or clearReservation() to
* release the reservation.
*/
public synchronized int waitConnectionAvailable(AtomicInteger poolCount)
throws InterruptedException
{
// Reserved connections keep a slot available which can't be used by anyone else.
// Connection bins are always sorted so that deadlocks can't occur.
// Once all slots are reserved, the caller will go ahead and create the necessary connection
// and convert the reservation to a new connection.
while (true)
{
if (!isAlive)
return IConnectionThrottler.CONNECTION_FROM_NOWHERE;
int currentPoolCount = poolCount.get();
if (currentPoolCount > 0)
{
// Recommendation is to pull the connection from the pool.
poolCount.set(currentPoolCount - 1);
return IConnectionThrottler.CONNECTION_FROM_POOL;
}
if (inUseConnections + reservedConnections < localMax)
{
reservedConnections++;
return IConnectionThrottler.CONNECTION_FROM_CREATION;
}
// Wait for a connection to free up. Note that it is up to the caller to free stuff up.
wait();
}
}
/** Undo what we had decided to do before.
*@param recommendation is the decision returned by waitForConnection() above.
*/
public synchronized void undoReservation(int recommendation, AtomicInteger poolCount)
{
if (recommendation == IConnectionThrottler.CONNECTION_FROM_CREATION)
{
if (reservedConnections == 0)
throw new IllegalStateException("Can't clear a reservation we don't have");
reservedConnections--;
notifyAll();
}
else if (recommendation == IConnectionThrottler.CONNECTION_FROM_POOL)
{
poolCount.set(poolCount.get() + 1);
notifyAll();
}
}
/** Note the creation of an active connection that belongs to this bin. The connection MUST
* have been reserved prior to the connection being created.
*/
public synchronized void noteConnectionCreation()
{
if (reservedConnections == 0)
throw new IllegalStateException("Creating a connection when no connection slot reserved!");
reservedConnections--;
inUseConnections++;
// No notification needed because the total number of reserved+active connections did not change.
}
/** Figure out whether we are currently over target or not for this bin.
*/
public synchronized boolean shouldReturnedConnectionBeDestroyed()
{
// We don't count reserved connections here because those are not yet committed
return inUseConnections > localMax;
}
public static final int CONNECTION_DESTROY = 0;
public static final int CONNECTION_POOLEMPTY = 1;
public static final int CONNECTION_WITHINBOUNDS = 2;
/** Figure out whether we are currently over target or not for this bin, and whether a
* connection should be pulled from the pool and destroyed.
* Note that this is tricky in conjunction with other bins, because those other bins
* may conclude that we can't destroy a connection. If so, we just return the stolen
* connection back to the pool.
*@return CONNECTION_DESTROY, CONNECTION_POOLEMPTY, or CONNECTION_WITHINBOUNDS.
*/
public synchronized int shouldPooledConnectionBeDestroyed(AtomicInteger poolCount)
{
int currentPoolCount = poolCount.get();
if (currentPoolCount > 0)
{
// Consider it removed from the pool for the purposes of consideration. If we change our minds, we'll
// return it, and no harm done.
poolCount.set(currentPoolCount-1);
// We don't count reserved connections here because those are not yet committed.
if (inUseConnections > localMax)
{
return CONNECTION_DESTROY;
}
return CONNECTION_WITHINBOUNDS;
}
return CONNECTION_POOLEMPTY;
}
/** Check only if there's a pooled connection, and make moves to take it from the pool.
*/
public synchronized boolean hasPooledConnection(AtomicInteger poolCount)
{
int currentPoolCount = poolCount.get();
if (currentPoolCount > 0)
{
poolCount.set(currentPoolCount-1);
return true;
}
return false;
}
/** Undo the decision to destroy a pooled connection.
*/
public synchronized void undoPooledConnectionDecision(AtomicInteger poolCount)
{
poolCount.set(poolCount.get() + 1);
notifyAll();
}
/** Note a connection returned to the pool.
*/
public synchronized void noteConnectionReturnedToPool(AtomicInteger poolCount)
{
poolCount.set(poolCount.get() + 1);
// Wake up threads possibly waiting on a pool return.
notifyAll();
}
/** Note the destruction of an active connection that belongs to this bin.
*/
public synchronized void noteConnectionDestroyed()
{
inUseConnections--;
notifyAll();
}
/** Poll this bin */
public synchronized void poll(IThreadContext threadContext)
throws ManifoldCFException
{
// The meat of the cross-cluster apportionment algorithm goes here!
// Two global numbers each service posts: "in-use" and "target". At no time does a service *ever* post either a "target"
// that, together with all other active service targets, is in excess of the max. Also, at no time a service post
// a target that, when added to the other "in-use" values, exceeds the max. If the "in-use" values everywhere else
// already equal or exceed the max, then the target will be zero.
// The target quota is calculated as follows:
// (1) Target is summed, excluding ours. This is GlobalTarget.
// (2) In-use is summed, excluding ours. This is GlobalInUse.
// (3) Our MaximumTarget is computed, which is Maximum - GlobalTarget or Maximum - GlobalInUse, whichever is
// smaller, but never less than zero.
// (4) Our FairTarget is computed. The FairTarget divides the Maximum by the number of services, and adds
// 1 randomly based on the remainder.
// (5) We compute OptimalTarget as follows: We start with current local target. If current local target
// exceeds current local in-use count, we adjust OptimalTarget downward by one. Otherwise we increase it
// by one.
// (6) Finally, we compute Target by taking the minimum of MaximumTarget, FairTarget, and OptimalTarget.
ILockManager lockManager = LockManagerFactory.make(threadContext);
lockManager.enterWriteLock(targetCalcLockName);
try
{
// Compute MaximumTarget
SumClass sumClass = new SumClass(serviceName);
lockManager.scanServiceData(serviceTypeName, sumClass);
//System.out.println("numServices = "+sumClass.getNumServices()+"; globalTarget = "+sumClass.getGlobalTarget()+"; globalInUse = "+sumClass.getGlobalInUse());
int numServices = sumClass.getNumServices();
if (numServices == 0)
return;
int globalTarget = sumClass.getGlobalTarget();
int globalInUse = sumClass.getGlobalInUse();
int maximumTarget = maxActiveConnections - globalTarget;
if (maximumTarget > maxActiveConnections - globalInUse)
maximumTarget = maxActiveConnections - globalInUse;
if (maximumTarget < 0)
maximumTarget = 0;
// Compute FairTarget
int fairTarget = maxActiveConnections / numServices;
int remainder = maxActiveConnections % numServices;
// Randomly choose whether we get an addition to the FairTarget
if (randomNumberGenerator.nextInt(numServices) < remainder)
fairTarget++;
// Compute OptimalTarget
int localInUse = inUseConnections;
int optimalTarget = localMax;
if (localMax > localInUse)
optimalTarget--;
else
{
// We want a fast ramp up, so make this proportional to maxActiveConnections
int increment = maxActiveConnections >> 2;
if (increment == 0)
increment = 1;
optimalTarget += increment;
}
//System.out.println("maxTarget = "+maximumTarget+"; fairTarget = "+fairTarget+"; optimalTarget = "+optimalTarget);
// Now compute actual target
int target = maximumTarget;
if (target > fairTarget)
target = fairTarget;
if (target > optimalTarget)
target = optimalTarget;
// Write these values to the service data variables.
// NOTE that there is a race condition here; the target value depends on all the calculations above being accurate, and not changing out from under us.
// So, that's why we have a write lock around the pool calculations.
lockManager.updateServiceData(serviceTypeName, serviceName, pack(target, localInUse));
// Now, update our localMax, if it needs it.
if (target == localMax)
return;
localMax = target;
notifyAll();
}
finally
{
lockManager.leaveWriteLock(targetCalcLockName);
}
}
/** Shut down the bin, and release everything that is waiting on it.
*/
public synchronized void shutDown(IThreadContext threadContext)
throws ManifoldCFException
{
isAlive = false;
notifyAll();
ILockManager lockManager = LockManagerFactory.make(threadContext);
lockManager.endServiceActivity(serviceTypeName, serviceName);
}
// Protected classes and methods
protected static class SumClass implements IServiceDataAcceptor
{
protected final String serviceName;
protected int numServices = 0;
protected int globalTargetTally = 0;
protected int globalInUseTally = 0;
public SumClass(String serviceName)
{
this.serviceName = serviceName;
}
@Override
public boolean acceptServiceData(String serviceName, byte[] serviceData)
throws ManifoldCFException
{
numServices++;
if (!serviceName.equals(this.serviceName))
{
globalTargetTally += unpackTarget(serviceData);
globalInUseTally += unpackInUse(serviceData);
}
return false;
}
public int getNumServices()
{
return numServices;
}
public int getGlobalTarget()
{
return globalTargetTally;
}
public int getGlobalInUse()
{
return globalInUseTally;
}
}
protected static int unpackTarget(byte[] data)
{
if (data == null || data.length != 8)
return 0;
return (((int)data[0]) & 0xff) +
((((int)data[1]) << 8) & 0xff00) +
((((int)data[2]) << 16) & 0xff0000) +
((((int)data[3]) << 24) & 0xff000000);
}
protected static int unpackInUse(byte[] data)
{
if (data == null || data.length != 8)
return 0;
return (((int)data[4]) & 0xff) +
((((int)data[5]) << 8) & 0xff00) +
((((int)data[6]) << 16) & 0xff0000) +
((((int)data[7]) << 24) & 0xff000000);
}
protected static byte[] pack(int target, int inUse)
{
byte[] rval = new byte[8];
rval[0] = (byte)(target & 0xff);
rval[1] = (byte)((target >> 8) & 0xff);
rval[2] = (byte)((target >> 16) & 0xff);
rval[3] = (byte)((target >> 24) & 0xff);
rval[4] = (byte)(inUse & 0xff);
rval[5] = (byte)((inUse >> 8) & 0xff);
rval[6] = (byte)((inUse >> 16) & 0xff);
rval[7] = (byte)((inUse >> 24) & 0xff);
return rval;
}
}