Pull up changes from old CONNECTORS-553 branch
git-svn-id: https://svn.apache.org/repos/asf/manifoldcf/branches/CONNECTORS-553-2@1552344 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index 99d33e8..e74d1a6 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -3,6 +3,37 @@
======================= 1.5-dev =====================
+CONNECTORS-846: Once a service had grabbed all connector
+instances, but was not using them any more, it would not release
+them for other cluster members to use.
+(Karl Wright)
+
+CONNECTORS-844: Remove "per JVM" from connection maximum
+messages in crawler UI.
+(Karl Wright)
+
+CONNECTORS-842: Update documentation to describe the functioning
+of the org.apache.manifoldcf.mysql.client property.
+(Chris Griffin, Karl Wright)
+
+CONNECTORS-843: Solr Connector methods do not call getSession()
+when they should, leading to unpredictable results.
+(Markus Schuch, Karl Wright)
+
+CONNECTORS-841: Always reset document schedules on job start.
+(David Morana, Karl Wright)
+
+CONNECTORS-839: Fix our CloudSolrServer usage to use multipart
+post instead of putting everything in the URL.
+(Alessandro Benedetti, Raymond Wiker, Karl Wright)
+
+CONNECTORS-838: Character-stuff names that ZooKeeper sees, to
+prevent issues with '/' characters.
+(Karl Wright)
+
+CONNECTORS-837: Specifying 1 connection causes hang.
+(Karl Wright)
+
CONNECTORS-836: Use the same thread context in the registered
shutdown objects.
(Karl Wright)
diff --git a/connectors/rss/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/rss/RSSConnector.java b/connectors/rss/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/rss/RSSConnector.java
index 066fa67..7cc4b7b 100644
--- a/connectors/rss/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/rss/RSSConnector.java
+++ b/connectors/rss/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/rss/RSSConnector.java
@@ -52,7 +52,7 @@
{
public static final String _rcsid = "@(#)$Id: RSSConnector.java 994959 2010-09-08 10:04:42Z kwright $";
-
+ protected final static String rssThrottleGroupType = "_RSS_";
// Usage flag values
protected static final int ROBOTS_NONE = 0;
@@ -105,7 +105,7 @@
protected Robots robots = null;
/** Storage for fetcher objects */
- protected static Map fetcherMap = new HashMap();
+ protected static Map<String,ThrottledFetcher> fetcherMap = new HashMap<String,ThrottledFetcher>();
/** Storage for robots objects */
protected static Map robotsMap = new HashMap();
@@ -231,10 +231,16 @@
}
+ IThrottleGroups tg = ThrottleGroupsFactory.make(currentContext);
+ // Create the throttle group
+ tg.createOrUpdateThrottleGroup(rssThrottleGroupType, throttleGroupName, new ThrottleSpec(maxOpenConnectionsPerServer,
+ minimumMillisecondsPerFetchPerServer, minimumMillisecondsPerBytePerServer));
+
isInitialized = true;
}
}
+
/** Return the list of activities that this connector supports (i.e. writes into the log).
*@return the list.
*/
@@ -936,11 +942,9 @@
String pathPart = url.getFile();
// Check with robots to see if it's allowed
- if (robotsUsage >= ROBOTS_DATA && !robots.isFetchAllowed(protocol,port,hostName,url.getPath(),
+ if (robotsUsage >= ROBOTS_DATA && !robots.isFetchAllowed(currentContext,throttleGroupName,
+ protocol,port,hostName,url.getPath(),
userAgent,from,
- minimumMillisecondsPerBytePerServer,
- maxOpenConnectionsPerServer,
- minimumMillisecondsPerFetchPerServer,
proxyHost, proxyPort, proxyAuthDomain, proxyAuthUsername, proxyAuthPassword,
activities, connectionLimit))
{
@@ -955,10 +959,9 @@
{
// Now, use the fetcher, and get the file.
- IThrottledConnection connection = fetcher.createConnection(hostName,
- minimumMillisecondsPerBytePerServer,
- maxOpenConnectionsPerServer,
- minimumMillisecondsPerFetchPerServer,
+ IThrottledConnection connection = fetcher.createConnection(currentContext,
+ throttleGroupName,
+ hostName,
connectionLimit,
feedTimeout,
proxyHost,
@@ -5404,7 +5407,7 @@
{
synchronized (fetcherMap)
{
- ThrottledFetcher tf = (ThrottledFetcher)fetcherMap.get(throttleGroupName);
+ ThrottledFetcher tf = fetcherMap.get(throttleGroupName);
if (tf == null)
{
tf = new ThrottledFetcher();
@@ -5497,6 +5500,47 @@
// Protected classes
+ /** The throttle specification class. Each server name is a different bin in this model.
+ * Every bin receives the same limits, which are fixed when the spec is constructed. */
+ protected static class ThrottleSpec implements IThrottleSpec
+ {
+ protected final int maxOpenConnectionsPerServer;
+ protected final long minimumMillisecondsPerFetchPerServer;
+ protected final double minimumMillisecondsPerBytePerServer;
+ /** Constructor. Captures the per-server limits that will be reported for every bin. */
+ public ThrottleSpec(int maxOpenConnectionsPerServer, long minimumMillisecondsPerFetchPerServer,
+ double minimumMillisecondsPerBytePerServer)
+ {
+ this.maxOpenConnectionsPerServer = maxOpenConnectionsPerServer;
+ this.minimumMillisecondsPerFetchPerServer = minimumMillisecondsPerFetchPerServer;
+ this.minimumMillisecondsPerBytePerServer = minimumMillisecondsPerBytePerServer;
+ }
+
+ /** Given a bin name, find the max open connections to use for that bin.
+ *@return the configured per-server maximum; binName is ignored, so every bin gets the same value.
+ */
+ public int getMaxOpenConnections(String binName)
+ {
+ return maxOpenConnectionsPerServer;
+ }
+
+ /** Look up minimum milliseconds per byte for a bin.
+ *@return the configured per-server value; binName is ignored, so every bin gets the same value.
+ */
+ public double getMinimumMillisecondsPerByte(String binName)
+ {
+ return minimumMillisecondsPerBytePerServer;
+ }
+
+ /** Look up minimum milliseconds for a fetch for a bin.
+ *@return the configured per-server value; binName is ignored, so every bin gets the same value.
+ */
+ public long getMinimumMillisecondsPerFetch(String binName)
+ {
+ return minimumMillisecondsPerFetchPerServer;
+ }
+ }
+
/** Name/value class */
protected static class NameValue
{
diff --git a/connectors/rss/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/rss/Robots.java b/connectors/rss/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/rss/Robots.java
index d9c0e56..1ff565d 100644
--- a/connectors/rss/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/rss/Robots.java
+++ b/connectors/rss/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/rss/Robots.java
@@ -109,10 +109,9 @@
*@param pathString is the path (non-query) part of the URL
*@return true if fetch is allowed, false otherwise.
*/
- public boolean isFetchAllowed(String protocol, int port, String hostName, String pathString,
+ public boolean isFetchAllowed(IThreadContext threadContext, String throttleGroupName,
+ String protocol, int port, String hostName, String pathString,
String userAgent, String from,
- double minimumMillisecondsPerBytePerServer, int maxOpenConnectionsPerServer,
- long minimumMillisecondsPerFetchPerServer,
String proxyHost, int proxyPort, String proxyAuthDomain, String proxyAuthUsername, String proxyAuthPassword,
IVersionActivity activities, int connectionLimit)
throws ManifoldCFException, ServiceInterruption
@@ -134,9 +133,9 @@
}
}
- return host.isFetchAllowed(System.currentTimeMillis(),pathString,
+ return host.isFetchAllowed(threadContext,throttleGroupName,
+ System.currentTimeMillis(),pathString,
userAgent,from,
- minimumMillisecondsPerBytePerServer,maxOpenConnectionsPerServer,minimumMillisecondsPerFetchPerServer,
proxyHost, proxyPort,proxyAuthDomain,proxyAuthUsername,proxyAuthPassword,activities,connectionLimit);
}
@@ -257,10 +256,9 @@
*@param pathString is the path string to check.
*@return true if crawling is allowed, false otherwise.
*/
- public boolean isFetchAllowed(long currentTime, String pathString,
+ public boolean isFetchAllowed(IThreadContext threadContext, String throttleGroupName,
+ long currentTime, String pathString,
String userAgent, String from,
- double minimumMillisecondsPerBytePerServer, int maxOpenConnectionsPerServer,
- long minimumMillisecondsPerFetchPerServer,
String proxyHost, int proxyPort, String proxyAuthDomain, String proxyAuthUsername, String proxyAuthPassword,
IVersionActivity activities, int connectionLimit)
throws ServiceInterruption, ManifoldCFException
@@ -323,9 +321,7 @@
if (readingRobots)
// This doesn't need to be synchronized because readingRobots blocks all other threads from getting at this object
- makeValid(currentTime,userAgent,from,
- minimumMillisecondsPerBytePerServer,maxOpenConnectionsPerServer,
- minimumMillisecondsPerFetchPerServer,
+ makeValid(threadContext,throttleGroupName,currentTime,userAgent,from,
proxyHost, proxyPort, proxyAuthDomain, proxyAuthUsername, proxyAuthPassword,
hostName, activities, connectionLimit);
@@ -435,9 +431,8 @@
/** Initialize the record. This method reads the robots file on the specified protocol/host/port,
* and parses it according to the rules.
*/
- protected void makeValid(long currentTime, String userAgent, String from,
- double minimumMillisecondsPerBytePerServer, int maxOpenConnectionsPerServer,
- long minimumMillisecondsPerFetchPerServer,
+ protected void makeValid(IThreadContext threadContext, String throttleGroupName,
+ long currentTime, String userAgent, String from,
String proxyHost, int proxyPort, String proxyAuthDomain, String proxyAuthUsername, String proxyAuthPassword,
String hostName, IVersionActivity activities, int connectionLimit)
throws ServiceInterruption, ManifoldCFException
@@ -445,8 +440,8 @@
invalidTime = currentTime + 24L * 60L * 60L * 1000L;
// Do the fetch
- IThrottledConnection connection = fetcher.createConnection(hostName,minimumMillisecondsPerBytePerServer,
- maxOpenConnectionsPerServer,minimumMillisecondsPerFetchPerServer,connectionLimit,ROBOT_TIMEOUT_MILLISECONDS,
+ IThrottledConnection connection = fetcher.createConnection(threadContext,throttleGroupName,
+ hostName,connectionLimit,ROBOT_TIMEOUT_MILLISECONDS,
proxyHost,proxyPort,proxyAuthDomain,proxyAuthUsername,proxyAuthPassword);
try
{
diff --git a/connectors/rss/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/rss/ThrottledFetcher.java b/connectors/rss/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/rss/ThrottledFetcher.java
index 31e0828..83a0e34 100644
--- a/connectors/rss/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/rss/ThrottledFetcher.java
+++ b/connectors/rss/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/rss/ThrottledFetcher.java
@@ -88,9 +88,9 @@
/** This is the lock object for that global handle counter */
protected static Integer globalHandleCounterLock = new Integer(0);
- /** This hash maps the server string (without port) to a server object, where
+ /** This hash maps the server string (without port) to a pool throttling object, where
* we can track the statistics and make sure we throttle appropriately */
- protected Map serverMap = new HashMap();
+ protected final Map<String,IConnectionThrottler> serverMap = new HashMap<String,IConnectionThrottler>();
/** Reference count for how many connections to this pool there are */
protected int refCount = 0;
@@ -151,35 +151,25 @@
/** Establish a connection to a specified URL.
* @param serverName is the FQDN of the server, e.g. foo.metacarta.com
- * @param minimumMillisecondsPerBytePerServer is the average number of milliseconds to wait
- * between bytes, on
- * average, over all streams reading from this server. That means that the
- * stream will block on fetch until the number of bytes being fetched, done
- * in the average time interval required for that fetch, would not exceed
- * the desired bandwidth.
- * @param minimumMillisecondsPerFetchPerServer is the number of milliseconds
- * between fetches, as a minimum, on a per-server basis. Set
- * to zero for no limit.
- * @param maxOpenConnectionsPerServer is the maximum number of open connections to allow for a single server.
- * If more than this number of connections would need to be open, then this connection request will block
- * until this number will no longer be exceeded.
* @param connectionLimit is the maximum desired outstanding connections at any one time.
* @param connectionTimeoutMilliseconds is the number of milliseconds to wait for the connection before timing out.
*/
- public synchronized IThrottledConnection createConnection(String serverName, double minimumMillisecondsPerBytePerServer,
- int maxOpenConnectionsPerServer, long minimumMillisecondsPerFetchPerServer, int connectionLimit, int connectionTimeoutMilliseconds,
+ public synchronized IThrottledConnection createConnection(IThreadContext threadContext, String throttleGroupName,
+ String serverName, int connectionLimit, int connectionTimeoutMilliseconds,
String proxyHost, int proxyPort, String proxyAuthDomain, String proxyAuthUsername, String proxyAuthPassword)
throws ManifoldCFException, ServiceInterruption
{
- Server server;
- server = (Server)serverMap.get(serverName);
+ IConnectionThrottler server;
+ server = serverMap.get(serverName);
if (server == null)
{
- server = new Server(serverName);
+ // Create a connection throttler for this server
+ IThrottleGroups tg = ThrottleGroupsFactory.make(threadContext);
+ server = tg.obtainConnectionThrottler(RSSConnector.rssThrottleGroupType, throttleGroupName, new String[]{serverName});
serverMap.put(serverName,server);
}
- return new ThrottledConnection(server,minimumMillisecondsPerBytePerServer,maxOpenConnectionsPerServer,minimumMillisecondsPerFetchPerServer,
+ return new ThrottledConnection(serverName, server,
connectionTimeoutMilliseconds,connectionLimit,
proxyHost,proxyPort,proxyAuthDomain,proxyAuthUsername,proxyAuthPassword);
}
@@ -206,14 +196,8 @@
refCount--;
if (refCount == 0)
{
- // Close all the servers one by one
- Iterator iter = serverMap.keySet().iterator();
- while (iter.hasNext())
- {
- String serverName = (String)iter.next();
- Server server = (Server)serverMap.get(serverName);
- server.discard();
- }
+ // Since we don't have any actual pools here, this can be a no-op for now
+ // MHL
serverMap.clear();
}
}
@@ -222,14 +206,12 @@
*/
protected static class ThrottledConnection implements IThrottledConnection
{
- /** The connection bandwidth we want */
- protected final double minimumMillisecondsPerBytePerServer;
- /** The maximum open connections per server */
- protected final int maxOpenConnectionsPerServer;
- /** The minimum time between fetches */
- protected final long minimumMillisecondsPerFetchPerServer;
- /** The server object we use to track connections and fetches. */
- protected final Server server;
+ /** The server fqdn */
+ protected final String serverName;
+ /** The throttling object we use to track connections */
+ protected final IConnectionThrottler connectionThrottler;
+ /** The throttling object we use to track fetches */
+ protected final IFetchThrottler fetchThrottler;
/** Connection timeout in milliseconds */
protected final int connectionTimeoutMilliseconds;
/** The client connection manager */
@@ -259,15 +241,14 @@
/** Constructor.
*/
- public ThrottledConnection(Server server, double minimumMillisecondsPerBytePerServer, int maxOpenConnectionsPerServer,
- long minimumMillisecondsPerFetchPerServer, int connectionTimeoutMilliseconds, int connectionLimit,
+ public ThrottledConnection(String serverName,
+ IConnectionThrottler connectionThrottler,
+ int connectionTimeoutMilliseconds, int connectionLimit,
String proxyHost, int proxyPort, String proxyAuthDomain, String proxyAuthUsername, String proxyAuthPassword)
throws ManifoldCFException
{
- this.minimumMillisecondsPerBytePerServer = minimumMillisecondsPerBytePerServer;
- this.maxOpenConnectionsPerServer = maxOpenConnectionsPerServer;
- this.minimumMillisecondsPerFetchPerServer = minimumMillisecondsPerFetchPerServer;
- this.server = server;
+ this.serverName = serverName;
+ this.connectionThrottler = connectionThrottler;
this.connectionTimeoutMilliseconds = connectionTimeoutMilliseconds;
// Create the https scheme for this connection
@@ -330,7 +311,17 @@
httpClient = localHttpClient;
registerGlobalHandle(connectionLimit);
- server.registerConnection(maxOpenConnectionsPerServer);
+ try
+ {
+ int result = connectionThrottler.waitConnectionAvailable();
+ if (result != IConnectionThrottler.CONNECTION_FROM_CREATION)
+ throw new IllegalStateException("Got back unexpected value from waitForAConnection() of "+result);
+ fetchThrottler = connectionThrottler.getNewConnectionFetchThrottler();
+ }
+ catch (InterruptedException e)
+ {
+ throw new ManifoldCFException(e.getMessage(),ManifoldCFException.INTERRUPTED);
+ }
}
/** Begin the fetch process.
@@ -344,7 +335,8 @@
fetchCounter = 0L;
try
{
- server.beginFetch(minimumMillisecondsPerFetchPerServer);
+ if (fetchThrottler.obtainFetchDocumentPermission() == false)
+ throw new IllegalStateException("obtainFetchDocumentPermission() had unexpected return value");
}
catch (InterruptedException e)
{
@@ -386,7 +378,7 @@
{
StringBuilder sb = new StringBuilder(protocol);
- sb.append("://").append(server.getServerName());
+ sb.append("://").append(serverName);
if (port != -1)
sb.append(":").append(Integer.toString(port));
sb.append(urlPath);
@@ -407,8 +399,8 @@
if (lastModified != null)
executeMethod.setHeader(new BasicHeader("Last-Modified",lastModified));
// Create the execution thread.
- methodThread = new ExecuteMethodThread(this, server,
- minimumMillisecondsPerBytePerServer, httpClient, executeMethod);
+ methodThread = new ExecuteMethodThread(this, fetchThrottler,
+ httpClient, executeMethod);
// Start the method thread, which will start the transaction
try
{
@@ -702,7 +694,6 @@
if (methodThread != null && threadStarted)
methodThread.abort();
long endTime = System.currentTimeMillis();
- server.endFetch();
activities.recordActivity(new Long(startFetchTime),RSSConnector.ACTIVITY_FETCH,
new Long(fetchCounter),myUrl,Integer.toString(statusCode),(throwable==null)?null:throwable.getMessage(),null);
@@ -749,7 +740,7 @@
{
// Clean up the connection pool. This should do the necessary bookkeeping to release the one connection that's sitting there.
connectionManager.shutdown();
- server.releaseConnection();
+ connectionThrottler.noteConnectionDestroyed();
releaseGlobalHandle();
}
@@ -760,23 +751,20 @@
*/
protected static class ThrottledInputstream extends InputStream
{
- /** Stream throttling parameters */
- protected double minimumMillisecondsPerBytePerServer;
- /** The throttled connection we belong to */
- protected ThrottledConnection throttledConnection;
- /** The server object we use to track throttling */
- protected Server server;
+ /** Throttled connection */
+ protected final ThrottledConnection throttledConnection;
+ /** Stream throttler */
+ protected final IStreamThrottler streamThrottler;
/** The stream we are wrapping. */
- protected InputStream inputStream;
+ protected final InputStream inputStream;
/** Constructor.
*/
- public ThrottledInputstream(ThrottledConnection connection, Server server, InputStream is, double minimumMillisecondsPerBytePerServer)
+ public ThrottledInputstream(ThrottledConnection throttledConnection, IStreamThrottler streamThrottler, InputStream is)
{
- this.throttledConnection = connection;
- this.server = server;
+ this.throttledConnection = throttledConnection;
+ this.streamThrottler = streamThrottler;
this.inputStream = is;
- this.minimumMillisecondsPerBytePerServer = minimumMillisecondsPerBytePerServer;
}
/** Read a byte.
@@ -839,7 +827,8 @@
{
try
{
- server.beginRead(len,minimumMillisecondsPerBytePerServer);
+ if (streamThrottler.obtainReadPermission(len) == false)
+ throw new IllegalStateException("Throttler shut down while still active");
int amt = 0;
try
{
@@ -849,10 +838,10 @@
finally
{
if (amt == -1)
- server.endRead(len,0);
+ streamThrottler.releaseReadPermission(len,0);
else
{
- server.endRead(len,amt);
+ streamThrottler.releaseReadPermission(len,amt);
throttledConnection.logFetchCount(amt);
}
}
@@ -909,310 +898,16 @@
public void close()
throws IOException
{
- inputStream.close();
- }
-
- }
-
- /** This class represents the throttling stuff kept around for a single server.
- *
- * In order to calculate
- * the effective "burst" fetches per second and bytes per second, we need to have some idea what the window is.
- * For example, a long hiatus from fetching could cause overuse of the server when fetching resumes, if the
- * window length is too long.
- *
- * One solution to this problem would be to keep a list of the individual fetches as records. Then, we could
- * "expire" a fetch by discarding the old record. However, this is quite memory consumptive for all but the
- * smallest intervals.
- *
- * Another, better, solution is to hook into the start and end of individual fetches. These will, presumably, occur
- * at the fastest possible rate without long pauses spent doing something else. The only complication is that
- * fetches may well overlap, so we need to "reference count" the fetches to know when to reset the counters.
- * For "fetches per second", we can simply make sure we "schedule" the next fetch at an appropriate time, rather
- * than keep records around. The overall rate may therefore be somewhat less than the specified rate, but that's perfectly
- * acceptable.
- *
- * For the "maximum open connections" limit, the best thing would be to establish a separate MultiThreadedConnectionPool
- * for each Server. Then, the limit would be automatic.
- *
- * Some notes on the algorithms used to limit server bandwidth impact
- * ==================================================================
- *
- * In a single connection case, the algorithm we'd want to use works like this. On the first chunk of a series,
- * the total length of time and the number of bytes are recorded. Then, prior to each subsequent chunk, a calculation
- * is done which attempts to hit the bandwidth target by the end of the chunk read, using the rate of the first chunk
- * access as a way of estimating how long it will take to fetch those next n bytes.
- *
- * For a multi-connection case, which this is, it's harder to either come up with a good maximum bandwidth estimate,
- * and harder still to "hit the target", because simultaneous fetches will intrude. The strategy is therefore:
- *
- * 1) The first chunk of any series should proceed without interference from other connections to the same server.
- * The goal here is to get a decent quality estimate without any possibility of overwhelming the server.
- *
- * 2) The bandwidth of the first chunk is treated as the "maximum bandwidth per connection". That is, if other
- * connections are going on, we can presume that each connection will use at most the bandwidth that the first fetch
- * took. Thus, by generating end-time estimates based on this number, we are actually being conservative and
- * using less server bandwidth.
- *
- * 3) For chunks that have started but not finished, we keep track of their size and estimated elapsed time in order to schedule when
- * new chunks from other connections can start.
- *
- */
- protected class Server
- {
- /** The fqdn of the server */
- protected final String serverName;
- /** This is the time of the next allowed fetch (in ms since epoch) */
- protected long nextFetchTime = 0L;
-
- // Bandwidth throttling variables
- /** Reference count for bandwidth variables */
- protected volatile int refCount = 0;
- /** The inverse rate estimate of the first fetch, in ms/byte */
- protected double rateEstimate = 0.0;
- /** Flag indicating whether a rate estimate is needed */
- protected volatile boolean estimateValid = false;
- /** Flag indicating whether rate estimation is in progress yet */
- protected volatile boolean estimateInProgress = false;
- /** The start time of this series */
- protected long seriesStartTime = -1L;
- /** Total actual bytes read in this series; this includes fetches in progress */
- protected long totalBytesRead = -1L;
-
- /** Outstanding connection counter */
- protected volatile int outstandingConnections = 0;
-
- /** Constructor */
- public Server(String serverName)
- {
- this.serverName = serverName;
- }
-
- /** Get the fqdn of the server */
- public String getServerName()
- {
- return serverName;
- }
-
- /** Register an outstanding connection (and wait until it can be obtained before proceeding) */
- public synchronized void registerConnection(int maxOutstandingConnections)
- throws ManifoldCFException
- {
try
{
- while (outstandingConnections >= maxOutstandingConnections)
- {
- wait();
- }
- outstandingConnections++;
- }
- catch (InterruptedException e)
- {
- throw new ManifoldCFException("Interrupted: "+e.getMessage(),e,ManifoldCFException.INTERRUPTED);
- }
- }
-
- /** Release an outstanding connection back into the pool */
- public synchronized void releaseConnection()
- {
- outstandingConnections--;
- notifyAll();
- }
-
- /** Note the start of a fetch operation. Call this method just before the actual stream access begins.
- * May wait until schedule allows.
- */
- public void beginFetch(long minimumMillisecondsPerFetchPerServer)
- throws InterruptedException
- {
- //if (Logging.connectors.isTraceEnabled())
- // Logging.connectors.trace("RSS: Note begin fetch for '"+serverName+"'");
- // First, do any waiting, and reschedule as needed
- long waitAmount = 0L;
- long currentTime = System.currentTimeMillis();
-
- // System.out.println("Begin fetch for server "+this.toString()+" with minimum milliseconds per fetch of "+new Long(minimumMillisecondsPerFetchPerServer).toString()+
- // " Current time: "+new Long(currentTime).toString()+ " Next fetch time: "+new Long(nextFetchTime).toString());
-
- synchronized (this)
- {
- if (currentTime < nextFetchTime)
- {
- waitAmount = nextFetchTime-currentTime;
- nextFetchTime = nextFetchTime + minimumMillisecondsPerFetchPerServer;
- }
- else
- nextFetchTime = currentTime + minimumMillisecondsPerFetchPerServer;
- }
- if (waitAmount > 0L)
- {
- if (Logging.connectors.isDebugEnabled())
- Logging.connectors.debug("RSS: Performing a fetch wait for server '"+serverName+"' for "+
- new Long(waitAmount).toString()+" ms.");
- ManifoldCF.sleep(waitAmount);
- }
-
- // System.out.println("For server "+this.toString()+", at "+new Long(System.currentTimeMillis()).toString()+", the next fetch time is now "+new Long(nextFetchTime).toString());
-
- synchronized (this)
- {
- if (refCount == 0)
- {
- // Now, reset bandwidth throttling counters
- estimateValid = false;
- rateEstimate = 0.0;
- totalBytesRead = 0L;
- estimateInProgress = false;
- seriesStartTime = -1L;
- }
- refCount++;
- }
- //if (Logging.connectors.isTraceEnabled())
- // Logging.connectors.trace("RSS: Begin fetch noted for '"+serverName+"'");
-
- }
-
- /** Note the end of a fetch operation. Call this method just after the fetch completes.
- */
- public void endFetch()
- {
- //if (Logging.connectors.isTraceEnabled())
- // Logging.connectors.trace("RSS: Note end fetch for '"+serverName+"'");
-
- synchronized (this)
- {
- refCount--;
- }
-
- //if (Logging.connectors.isTraceEnabled())
- // Logging.connectors.trace("RSS: End fetch noted for '"+serverName+"'");
-
- }
-
- /** Note the start of an individual byte read of a specified size. Call this method just before the
- * read request takes place. Performs the necessary delay prior to reading specified number of bytes from the server.
- */
- public void beginRead(int byteCount, double minimumMillisecondsPerBytePerServer)
- throws InterruptedException
- {
- //if (Logging.connectors.isTraceEnabled())
- // Logging.connectors.trace("RSS: Note begin read for '"+serverName+"'");
-
- long currentTime = System.currentTimeMillis();
-
- synchronized (this)
- {
- while (estimateInProgress)
- wait();
- if (estimateValid == false)
- {
- seriesStartTime = currentTime;
- estimateInProgress = true;
- // Add these bytes to the estimated total
- totalBytesRead += (long)byteCount;
- // Exit early; this thread isn't going to do any waiting
- //if (Logging.connectors.isTraceEnabled())
- // Logging.connectors.trace("RSS: Read begin noted; gathering stats for '"+serverName+"'");
-
- return;
- }
- }
-
- // It is possible for the following code to get interrupted. If that happens,
- // we have to unstick the threads that are waiting on the estimate!
- boolean finished = false;
- try
- {
- long waitTime = 0L;
- synchronized (this)
- {
- // Add these bytes to the estimated total
- totalBytesRead += (long)byteCount;
-
- // Estimate the time this read will take, and wait accordingly
- long estimatedTime = (long)(rateEstimate * (double)byteCount);
-
- // Figure out how long the total byte count should take, to meet the constraint
- long desiredEndTime = seriesStartTime + (long)(((double)totalBytesRead) * minimumMillisecondsPerBytePerServer);
-
- // The wait time is the different between our desired end time, minus the estimated time to read the data, and the
- // current time. But it can't be negative.
- waitTime = (desiredEndTime - estimatedTime) - currentTime;
- }
-
- if (waitTime > 0L)
- {
- if (Logging.connectors.isDebugEnabled())
- Logging.connectors.debug("RSS: Performing a read wait on server '"+serverName+"' of "+
- new Long(waitTime).toString()+" ms.");
- ManifoldCF.sleep(waitTime);
- }
-
- //if (Logging.connectors.isTraceEnabled())
- // Logging.connectors.trace("RSS: Begin read noted for '"+serverName+"'");
- finished = true;
+ inputStream.close();
}
finally
{
- if (!finished)
- {
- abortRead();
- }
+ streamThrottler.closeStream();
}
}
- /** Abort a read in progress.
- */
- public void abortRead()
- {
- synchronized (this)
- {
- if (estimateInProgress)
- {
- estimateInProgress = false;
- notifyAll();
- }
- }
- }
-
- /** Note the end of an individual read from the server. Call this just after an individual read completes.
- * Pass the actual number of bytes read to the method.
- */
- public void endRead(int originalCount, int actualCount)
- {
- //if (Logging.connectors.isTraceEnabled())
- // Logging.connectors.trace("RSS: Note end read for '"+serverName+"'");
-
- long currentTime = System.currentTimeMillis();
-
- synchronized (this)
- {
- totalBytesRead = totalBytesRead + (long)actualCount - (long)originalCount;
- if (estimateInProgress)
- {
- if (actualCount == 0)
- // Didn't actually get any bytes, so use 0.0
- rateEstimate = 0.0;
- else
- rateEstimate = ((double)(currentTime - seriesStartTime))/(double)actualCount;
- estimateValid = true;
- estimateInProgress = false;
- notifyAll();
- }
- }
-
- //if (Logging.connectors.isTraceEnabled())
- // Logging.connectors.trace("RSS: End read noted for '"+serverName+"'");
-
- }
-
- /** Discard this server.
- */
- public void discard()
- {
- // Nothing needed anymore
- }
-
}
/** This thread does the actual socket communication with the server.
@@ -1235,10 +930,8 @@
{
/** The connection */
protected final ThrottledConnection theConnection;
- /** The connection bandwidth we want */
- protected final double minimumMillisecondsPerBytePerServer;
- /** The server object we use to track connections and fetches. */
- protected final Server server;
+ /** The fetch throttler */
+ protected final IFetchThrottler fetchThrottler;
/** Client and method, all preconfigured */
protected final HttpClient httpClient;
protected final HttpRequestBase executeMethod;
@@ -1256,15 +949,13 @@
protected Throwable generalException = null;
- public ExecuteMethodThread(ThrottledConnection theConnection, Server server,
- double minimumMillisecondsPerBytePerServer,
+ public ExecuteMethodThread(ThrottledConnection theConnection, IFetchThrottler fetchThrottler,
HttpClient httpClient, HttpRequestBase executeMethod)
{
super();
setDaemon(true);
this.theConnection = theConnection;
- this.server = server;
- this.minimumMillisecondsPerBytePerServer = minimumMillisecondsPerBytePerServer;
+ this.fetchThrottler = fetchThrottler;
this.httpClient = httpClient;
this.executeMethod = executeMethod;
}
@@ -1316,7 +1007,7 @@
bodyStream = response.getEntity().getContent();
if (bodyStream != null)
{
- bodyStream = new ThrottledInputstream(theConnection,server,bodyStream,minimumMillisecondsPerBytePerServer);
+ bodyStream = new ThrottledInputstream(theConnection,fetchThrottler.createFetchStream(),bodyStream);
threadStream = new XThreadInputStream(bodyStream);
}
streamCreated = true;
diff --git a/connectors/solr/connector/src/main/java/org/apache/manifoldcf/agents/output/solr/HttpPoster.java b/connectors/solr/connector/src/main/java/org/apache/manifoldcf/agents/output/solr/HttpPoster.java
index 13b81f4..5af995c 100644
--- a/connectors/solr/connector/src/main/java/org/apache/manifoldcf/agents/output/solr/HttpPoster.java
+++ b/connectors/solr/connector/src/main/java/org/apache/manifoldcf/agents/output/solr/HttpPoster.java
@@ -70,6 +70,7 @@
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.ContentStream;
import org.apache.solr.common.SolrException;
+import org.apache.solr.client.solrj.impl.HttpClientUtil;
/**
@@ -158,7 +159,7 @@
try
{
- CloudSolrServer cloudSolrServer = new CloudSolrServer(zookeeperHosts);
+ CloudSolrServer cloudSolrServer = new CloudSolrServer(zookeeperHosts, new ModifiedLBHttpSolrServer(HttpClientUtil.createClient(null)));
cloudSolrServer.setZkClientTimeout(zkClientTimeout);
cloudSolrServer.setZkConnectTimeout(zkConnectTimeout);
cloudSolrServer.setDefaultCollection(collection);
diff --git a/connectors/solr/connector/src/main/java/org/apache/manifoldcf/agents/output/solr/ModifiedLBHttpSolrServer.java b/connectors/solr/connector/src/main/java/org/apache/manifoldcf/agents/output/solr/ModifiedLBHttpSolrServer.java
new file mode 100644
index 0000000..a8b728f
--- /dev/null
+++ b/connectors/solr/connector/src/main/java/org/apache/manifoldcf/agents/output/solr/ModifiedLBHttpSolrServer.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.manifoldcf.agents.output.solr;
+
+import org.apache.solr.client.solrj.impl.LBHttpSolrServer;
+import org.apache.solr.client.solrj.impl.HttpSolrServer;
+import org.apache.solr.client.solrj.impl.BinaryResponseParser;
+import org.apache.solr.client.solrj.*;
+import java.net.MalformedURLException;
+import org.apache.http.client.HttpClient;
+
+
+/** This class overrides and somewhat changes the behavior of the
+* SolrJ LBHttpSolrServer class, so that it instantiates our modified
+* HttpSolrServer class (ModifiedHttpSolrServer) and multipart forms work.
+*/
+public class ModifiedLBHttpSolrServer extends LBHttpSolrServer
+{
+  /** The client handed to every ModifiedHttpSolrServer we create. This is
+  * always the base class's client, never a caller-supplied null. */
+  private final HttpClient httpClient;
+  /** The response parser handed to every ModifiedHttpSolrServer we create. */
+  private final ResponseParser parser;
+
+  /** Construct with a base-class-created HttpClient and a binary response parser. */
+  public ModifiedLBHttpSolrServer(String... solrServerUrls) throws MalformedURLException {
+    this(null, solrServerUrls);
+  }
+
+  /** The provided httpClient should use a multi-threaded connection manager.
+  * Responses are parsed with a BinaryResponseParser. */
+  public ModifiedLBHttpSolrServer(HttpClient httpClient, String... solrServerUrl)
+    throws MalformedURLException {
+    this(httpClient, new BinaryResponseParser(), solrServerUrl);
+  }
+
+  /** The provided httpClient should use a multi-threaded connection manager.
+  *@param httpClient is the client to use, or null to let the base class create one.
+  *@param parser is the response parser used by every server.
+  *@param solrServerUrl are the initial server URLs.
+  */
+  public ModifiedLBHttpSolrServer(HttpClient httpClient, ResponseParser parser, String... solrServerUrl)
+    throws MalformedURLException {
+    // Pass no URLs to the base constructor: it calls makeServer() for each URL, which would
+    // run before this class's fields are assigned (they would still be null at that point).
+    super(httpClient, parser, new String[0]);
+    // Use the base class's client; when httpClient is null, the base class creates its own.
+    this.httpClient = getHttpClient();
+    this.parser = parser;
+    // Now that our fields are set, makeServer() builds correctly-configured servers.
+    for (String url : solrServerUrl)
+      addSolrServer(url);
+  }
+
+  @Override
+  protected HttpSolrServer makeServer(String server) throws MalformedURLException {
+    return new ModifiedHttpSolrServer(server, httpClient, parser);
+  }
+
+}
diff --git a/connectors/solr/connector/src/main/java/org/apache/manifoldcf/agents/output/solr/SolrConnector.java b/connectors/solr/connector/src/main/java/org/apache/manifoldcf/agents/output/solr/SolrConnector.java
index 770a89a..e85e1f3 100644
--- a/connectors/solr/connector/src/main/java/org/apache/manifoldcf/agents/output/solr/SolrConnector.java
+++ b/connectors/solr/connector/src/main/java/org/apache/manifoldcf/agents/output/solr/SolrConnector.java
@@ -434,6 +434,8 @@
public String getOutputDescription(OutputSpecification spec)
throws ManifoldCFException, ServiceInterruption
{
+ getSession();
+
StringBuilder sb = new StringBuilder();
// All the arguments need to go into this string, since they affect ingestion.
@@ -578,6 +580,7 @@
public boolean checkMimeTypeIndexable(String outputDescription, String mimeType)
throws ManifoldCFException, ServiceInterruption
{
+ getSession();
if (includedMimeTypes != null && includedMimeTypes.get(mimeType) == null)
return false;
if (excludedMimeTypes != null && excludedMimeTypes.get(mimeType) != null)
@@ -594,6 +597,7 @@
public boolean checkLengthIndexable(String outputDescription, long length)
throws ManifoldCFException, ServiceInterruption
{
+ getSession();
if (maxDocumentLength != null && length > maxDocumentLength.longValue())
return false;
return super.checkLengthIndexable(outputDescription,length);
diff --git a/connectors/webcrawler/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/webcrawler/IThrottledConnection.java b/connectors/webcrawler/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/webcrawler/IThrottledConnection.java
index d401159..8e8b65a 100644
--- a/connectors/webcrawler/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/webcrawler/IThrottledConnection.java
+++ b/connectors/webcrawler/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/webcrawler/IThrottledConnection.java
@@ -39,6 +39,12 @@
public static final int FETCH_INTERRUPTED = -104;
public static final int FETCH_UNKNOWN_ERROR = -999;
+ /** Check whether the connection has expired.
+ *@param currentTime is the current time to use to judge if a connection has expired.
+ *@return true if the connection has expired, and should be closed.
+ */
+ public boolean hasExpired(long currentTime);
+
/** Begin the fetch process.
* @param fetchType is a short descriptive string describing the kind of fetch being requested. This
* is used solely for logging purposes.
@@ -113,8 +119,13 @@
public void doneFetch(IVersionActivity activities)
throws ManifoldCFException;
- /** Close the connection. Call this to end this server connection.
+ /** Close the connection. Call this to return the connection to
+ * its pool.
*/
- public void close()
- throws ManifoldCFException;
+ public void close();
+
+ /** Destroy the connection. Call this to close the connection.
+ */
+ public void destroy();
+
}
diff --git a/connectors/webcrawler/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/webcrawler/ThrottleDescription.java b/connectors/webcrawler/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/webcrawler/ThrottleDescription.java
index db7b063..c121568 100644
--- a/connectors/webcrawler/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/webcrawler/ThrottleDescription.java
+++ b/connectors/webcrawler/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/webcrawler/ThrottleDescription.java
@@ -33,13 +33,13 @@
* any given bin value as much as possible. For that reason I've organized this structure
* accordingly.
*/
-public class ThrottleDescription
+public class ThrottleDescription implements IThrottleSpec
{
public static final String _rcsid = "@(#)$Id: ThrottleDescription.java 988245 2010-08-23 18:39:35Z kwright $";
/** This is the hash that contains everything. It's keyed by the regexp string itself.
* Values are ThrottleItem's. */
- protected HashMap patternHash = new HashMap();
+ protected Map<String,ThrottleItem> patternHash = new HashMap<String,ThrottleItem>();
/** Constructor. Build the description from the ConfigParams. */
public ThrottleDescription(ConfigParams configData)
@@ -146,17 +146,15 @@
}
/** Given a bin name, find the max open connections to use for that bin.
- *@return -1 if no limit found.
+ *@return Integer.MAX_VALUE if no limit found; a computed limit of 0 is raised to 1.
*/
+ @Override
public int getMaxOpenConnections(String binName)
{
// Go through the regexps and match; for each match, find the maximum possible.
int maxCount = -1;
- Iterator iter = patternHash.keySet().iterator();
- while (iter.hasNext())
+ for (ThrottleItem ti : patternHash.values())
{
- String binDescription = (String)iter.next();
- ThrottleItem ti = (ThrottleItem)patternHash.get(binDescription);
Integer limit = ti.getMaxOpenConnections();
if (limit != null)
{
@@ -169,22 +167,24 @@
}
}
}
+ if (maxCount == -1)
+ maxCount = Integer.MAX_VALUE;
+ else if (maxCount == 0)
+ maxCount = 1;
return maxCount;
}
/** Look up minimum milliseconds per byte for a bin.
*@return 0.0 if no limit found.
*/
+ @Override
public double getMinimumMillisecondsPerByte(String binName)
{
// Go through the regexps and match; for each match, find the maximum possible.
double minMilliseconds = 0.0;
boolean seenSomething = false;
- Iterator iter = patternHash.keySet().iterator();
- while (iter.hasNext())
+ for (ThrottleItem ti : patternHash.values())
{
- String binDescription = (String)iter.next();
- ThrottleItem ti = (ThrottleItem)patternHash.get(binDescription);
Double limit = ti.getMinimumMillisecondsPerByte();
if (limit != null)
{
@@ -206,16 +206,14 @@
/** Look up minimum milliseconds for a fetch for a bin.
*@return 0 if no limit found.
*/
+ @Override
public long getMinimumMillisecondsPerFetch(String binName)
{
// Go through the regexps and match; for each match, find the maximum possible.
long minMilliseconds = 0L;
boolean seenSomething = false;
- Iterator iter = patternHash.keySet().iterator();
- while (iter.hasNext())
+ for (ThrottleItem ti : patternHash.values())
{
- String binDescription = (String)iter.next();
- ThrottleItem ti = (ThrottleItem)patternHash.get(binDescription);
Long limit = ti.getMinimumMillisecondsPerFetch();
if (limit != null)
{
@@ -239,7 +237,7 @@
protected static class ThrottleItem
{
/** The bin-matching pattern. */
- protected Pattern pattern;
+ protected final Pattern pattern;
/** The minimum milliseconds between bytes, or null if no limit. */
protected Double minimumMillisecondsPerByte = null;
/** The minimum milliseconds per fetch, or null if no limit */
diff --git a/connectors/webcrawler/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/webcrawler/ThrottledFetcher.java b/connectors/webcrawler/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/webcrawler/ThrottledFetcher.java
index e035198..e1f42da 100644
--- a/connectors/webcrawler/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/webcrawler/ThrottledFetcher.java
+++ b/connectors/webcrawler/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/webcrawler/ThrottledFetcher.java
@@ -100,6 +100,12 @@
{
public static final String _rcsid = "@(#)$Id: ThrottledFetcher.java 989847 2010-08-26 17:52:30Z kwright $";
+ /** Web throttle group type */
+ protected static final String webThrottleGroupType = "_WEB_";
+
+ /** Idle timeout */
+ protected static final long idleTimeout = 300000L;
+
/** This flag determines whether we record everything to the disk, as a means of doing a web snapshot */
protected static final boolean recordEverything = false;
@@ -109,18 +115,13 @@
protected static final long TIME_6HRS = 6L * 60L * 60000L;
protected static final long TIME_1DAY = 24L * 60L * 60000L;
+ /** The read chunk length */
+ protected static final int READ_CHUNK_LENGTH = 4096;
- // The following static bin pools correspond to global resources that will be managed via ILockManager.
+ /** Connection pools.
+ * This is a static hash of the connection pools in existence. Each connection pool represents a set of identical connections. */
+ protected final static Map<ConnectionPoolKey,ConnectionPool> connectionPools = new HashMap<ConnectionPoolKey,ConnectionPool>();
- /** This is the static pool of ConnectionBin's, keyed by bin name. */
- protected static Map<String,ConnectionBin> connectionBins = new HashMap<String,ConnectionBin>();
- /** This is the static pool of ThrottleBin's, keyed by bin name. */
- protected static Map<String,ThrottleBin> throttleBins = new HashMap<String,ThrottleBin>();
-
- /** This global lock protects the "distributed pool" resource, and insures that a connection
- * can get pulled out of all the right pools and wind up in only the hands of one thread. */
- protected static Integer poolLock = new Integer(0);
-
/** Current host name */
private static String currentHost = null;
static
@@ -138,17 +139,13 @@
}
}
- /** The read chunk length */
- protected static final int READ_CHUNK_LENGTH = 4096;
-
- /** Constructor.
+ /** Constructor. Private since we never instantiate.
*/
- public ThrottledFetcher()
+ private ThrottledFetcher()
{
}
-
/** Obtain a connection to specified protocol, server, and port. We use the protocol because the
* setup for some protocols is extensive (e.g. https) and hopefully would not need to be repeated if
* we distinguish connections based on that.
@@ -164,15 +161,22 @@
*@param connectionLimit isthe maximum number of connections permitted.
*@return an IThrottledConnection object that can be used to fetch from the port.
*/
- public static IThrottledConnection getConnection(String protocol, String server, int port,
+ public static IThrottledConnection getConnection(IThreadContext threadContext, String throttleGroupName,
+ String protocol, String server, int port,
PageCredentials authentication,
IKeystoreManager trustStore,
- ThrottleDescription throttleDescription, String[] binNames,
+ IThrottleSpec throttleDescription, String[] binNames,
int connectionLimit,
String proxyHost, int proxyPort, String proxyAuthDomain, String proxyAuthUsername, String proxyAuthPassword)
throws ManifoldCFException
{
- // Create the https scheme for this connection
+ // Get a throttle groups handle
+ IThrottleGroups throttleGroups = ThrottleGroupsFactory.make(threadContext);
+
+ // Create the appropriate throttle group, or update the throttle description for an existing one
+ throttleGroups.createOrUpdateThrottleGroup(webThrottleGroupType,throttleGroupName,throttleDescription);
+
+ // Create the https scheme and trust store string for this connection
javax.net.ssl.SSLSocketFactory baseFactory;
String trustStoreString;
if (trustStore != null)
@@ -186,781 +190,68 @@
trustStoreString = null;
}
-
- ConnectionBin[] bins = new ConnectionBin[binNames.length];
-
- // Now, start looking for a connection
- int i = 0;
- while (i < binNames.length)
+ // Construct a connection pool key
+ ConnectionPoolKey poolKey = new ConnectionPoolKey(protocol,server,port,authentication,
+ trustStoreString,proxyHost,proxyPort,proxyAuthDomain,proxyAuthUsername,proxyAuthPassword);
+
+ ConnectionPool p;
+ synchronized (connectionPools)
{
- String binName = binNames[i];
-
- // Find or create the bin object
- ConnectionBin cb;
- synchronized (connectionBins)
+ p = connectionPools.get(poolKey);
+ if (p == null)
{
- cb = connectionBins.get(binName);
- if (cb == null)
- {
- cb = new ConnectionBin(binName);
- connectionBins.put(binName,cb);
- }
- //cb.sanityCheck();
+ // Construct a new IConnectionThrottler.
+ IConnectionThrottler connectionThrottler =
+ throttleGroups.obtainConnectionThrottler(webThrottleGroupType,throttleGroupName,binNames);
+ p = new ConnectionPool(connectionThrottler,protocol,server,port,authentication,baseFactory,
+ proxyHost,proxyPort,proxyAuthDomain,proxyAuthUsername,proxyAuthPassword);
+ connectionPools.put(poolKey,p);
}
- bins[i] = cb;
- i++;
}
-
- ThrottledConnection connectionToReuse;
-
- long startTime = 0L;
- if (Logging.connectors.isDebugEnabled())
+
+ try
{
- startTime = System.currentTimeMillis();
- Logging.connectors.debug("WEB: Waiting to start getting a connection to "+protocol+"://"+server+":"+port);
+ return p.grab();
}
-
- synchronized (poolLock)
+ catch (InterruptedException e)
{
-
- // If the number of outstanding connections is greater than the global limit, close pooled connections until we are under the limit
- long idleTimeout = 64000L;
- while (true)
- {
- int openCount = 0;
-
- // Lock up everything for a moment
- synchronized (connectionBins)
- {
- // Time out connections that have been idle too long. To do this, we need to go through
- // all connection bins and look at the pool
- for (String binName : connectionBins.keySet())
- {
- ConnectionBin cb = connectionBins.get(binName);
- openCount += cb.countConnections();
- }
- }
-
- if (openCount < connectionLimit)
- break;
-
- if (idleTimeout == 0L)
- {
- // Can't actually conclude anything here unfortunately
-
- // Logging.connectors.warn("Web: Exceeding connection limit! Open count = "+Integer.toString(openCount)+"; limit = "+Integer.toString(connectionLimit));
- break;
- }
- idleTimeout = idleTimeout/4L;
-
- // Lock up everything for a moment, since otherwise we could delete something people
- // expect to stick around.
- synchronized (connectionBins)
- {
- // Time out connections that have been idle too long. To do this, we need to go through
- // all connection bins and look at the pool
- for (String binName : connectionBins.keySet())
- {
- ConnectionBin cb = connectionBins.get(binName);
- cb.flushIdleConnections(idleTimeout);
- }
- }
- }
-
- try
- {
- // Retry until we get the connection.
- while (true)
- {
- if (Logging.connectors.isDebugEnabled())
- Logging.connectors.debug("WEB: Attempting to get connection to "+protocol+"://"+server+":"+port+" ("+new Long(System.currentTimeMillis()-startTime).toString()+" ms)");
-
- i = 0;
-
- connectionToReuse = null;
-
- try
- {
-
- // Now, start looking for a connection
- while (i < binNames.length)
- {
- String binName = binNames[i];
- ConnectionBin cb = bins[i];
-
- // Figure out the connection limit for this bin, based on the throttle description
- int maxConnections = throttleDescription.getMaxOpenConnections(binName);
-
- // If no restriction, use a very large value.
- if (maxConnections == -1)
- maxConnections = Integer.MAX_VALUE;
- else if (maxConnections == 0)
- maxConnections = 1;
-
- // Now, do what we need to do to reserve our connection for this bin.
- // If we can't reserve it now, we plan on undoing everything we did, so
- // whatever we do must be reversible. Furthermore, nothing we call here
- // should actually wait(); that will occur if we can't get what we need out
- // here at this level.
-
- if (connectionToReuse != null)
- {
- // We have a reuse candidate already, so just make sure each remaining bin is within
- // its limits.
- cb.insureWithinLimits(maxConnections,connectionToReuse);
- }
- else
- {
- connectionToReuse = cb.findConnection(maxConnections,bins,protocol,server,port,authentication,trustStoreString,
- proxyHost,proxyPort,proxyAuthDomain,proxyAuthUsername,proxyAuthPassword);
- }
-
- // Increment after we successfully handled this bin
- i++;
- }
-
- // That loop completed, meaning that we think we got a connection. Now, go through all the bins and make sure there's enough time since the last
- // fetch. If not, we have to clean everything up and try again.
- long currentTime = System.currentTimeMillis();
-
- // Global lock needed to insure that fetch time is updated across all bins simultaneously
- synchronized (connectionBins)
- {
- i = 0;
- while (i < binNames.length)
- {
- String binName = binNames[i];
- ConnectionBin cb = bins[i];
- //cb.sanityCheck();
- // Get the minimum time between fetches for this bin, based on the throttle description
- long minMillisecondsPerFetch = throttleDescription.getMinimumMillisecondsPerFetch(binName);
- if (cb.getLastFetchTime() + minMillisecondsPerFetch > currentTime)
- throw new WaitException(cb.getLastFetchTime() + minMillisecondsPerFetch - currentTime);
- i++;
- }
- i = 0;
- while (i < binNames.length)
- {
- ConnectionBin cb = bins[i++];
- cb.setLastFetchTime(currentTime);
- }
- }
-
- }
- catch (Throwable e)
- {
- // We have to free everything and retry, because otherwise we are subject to deadlock.
- // The only thing we have reserved is the connection, which we must free if there's a
- // problem.
-
- if (connectionToReuse != null)
- {
- // Return this connection to the pool. That is, the pools for all the bins.
- int k = 0;
- while (k < binNames.length)
- {
- String binName = binNames[k++];
- ConnectionBin cb;
- synchronized (connectionBins)
- {
- cb = connectionBins.get(binName);
- if (cb == null)
- {
- cb = new ConnectionBin(binName);
- connectionBins.put(binName,cb);
- }
- }
- //cb.sanityCheck();
- cb.addToPool(connectionToReuse);
- //cb.sanityCheck();
- }
- connectionToReuse = null;
- // We should not need to notify here because nothing has really changed from
- // when the attempt started to get the connection. We just undid what we did.
- }
-
-
- if (e instanceof Error)
- throw (Error)e;
- if (e instanceof ManifoldCFException)
- throw (ManifoldCFException)e;
-
- if (e instanceof WaitException)
- {
- // Wait because we need a certain amount of time after a previous fetch.
- WaitException we = (WaitException)e;
- long waitAmount = we.getWaitAmount();
- if (Logging.connectors.isDebugEnabled())
- Logging.connectors.debug("WEB: Waiting "+new Long(waitAmount).toString()+" ms before starting fetch on "+protocol+"://"+server+":"+port);
- // Really don't want to sleep inside the pool lock!
- // The easiest thing to do instead is to use a timed wait. There is no reason why we need
- // to wake before the wait time is exceeded - but it's harmless, and the alternative is to
- // do more reorganization than probably is wise.
- poolLock.wait(waitAmount);
- continue;
- }
-
- if (e instanceof PoolException)
- {
-
- if (Logging.connectors.isDebugEnabled())
- Logging.connectors.debug("WEB: Going into wait for connection to "+protocol+"://"+server+":"+port+" ("+new Long(System.currentTimeMillis()-startTime).toString()+" ms)");
-
- // Now, wait for something external to change. The only thing that can help us is if
- // some other thread frees a connection.
- poolLock.wait();
- // Go back around and try again.
- continue;
- }
-
- throw new ManifoldCFException("Unexpected exception encountered: "+e.getMessage(),e);
- }
-
- if (Logging.connectors.isDebugEnabled())
- Logging.connectors.debug("WEB: Successfully got connection to "+protocol+"://"+server+":"+port+" ("+new Long(System.currentTimeMillis()-startTime).toString()+" ms)");
-
- // If we have a connection located, activate it.
- if (connectionToReuse == null)
- connectionToReuse = new ThrottledConnection(protocol,server,port,authentication,baseFactory,trustStoreString,bins,
- proxyHost,proxyPort,proxyAuthDomain,proxyAuthUsername,proxyAuthPassword);
- connectionToReuse.setup(throttleDescription);
- return connectionToReuse;
- }
- }
- catch (InterruptedException e)
- {
- throw new ManifoldCFException(e.getMessage(),ManifoldCFException.INTERRUPTED);
- }
+ throw new ManifoldCFException(e.getMessage(),ManifoldCFException.INTERRUPTED);
}
}
-
/** Flush connections that have timed out from inactivity. */
- public static void flushIdleConnections()
+ public static void flushIdleConnections(IThreadContext threadContext)
throws ManifoldCFException
{
- synchronized (poolLock)
+ // Go through outstanding connection pools and clean them up.
+ synchronized (connectionPools)
{
- // Lock up everything for a moment, since otherwise we could delete something people
- // expect to stick around.
- synchronized (connectionBins)
+ for (ConnectionPool pool : connectionPools.values())
{
- // Time out connections that have been idle too long. To do this, we need to go through
- // all connection bins and look at the pool
- for (String binName : connectionBins.keySet())
- {
- ConnectionBin cb = connectionBins.get(binName);
- if (cb.flushIdleConnections(60000L))
- {
- // Bin is no longer doing anything; get rid of it.
- // I've determined this is safe - inUseConnections is designed to prevent any active connection from getting
- // whacked.
- // Oops. Hang results again when I enabled this, so out it goes again.
- //connectionBins.remove(binName);
- //binIter = connectionBins.keySet().iterator();
- }
- }
+ pool.flushIdleConnections();
}
}
}
- /** Connection pool for a bin.
- * An instance of this class tracks the connections that are pooled and that are in use for a specific bin.
- * NOTE WELL: This resource must be constrained globally, across all JVMs!
- * To do that, we need an ILockManager to handle the global data for each bin.
- */
- protected static class ConnectionBin
- {
- /** This is the bin name which this connection pool belongs to */
- protected String binName;
- /** This is the number of connections in this bin that are signed out and presumably in use */
- protected int inUseConnections = 0;
- /** This is the last time a fetch was done on this bin */
- protected long lastFetchTime = 0L;
- /** This object is what we synchronize on when we are waiting on a connection to free up for this
- * bin. This is a separate object, because we also want to protect the integrity of the
- * ConnectionBin object itself, for which we'll use the ConnectionBin's synchronizer. */
- protected Integer connectionWait = new Integer(0);
- /** This map contains ThrottledConnection objects that are in the pool, and are not in use. */
- protected HashMap freePool = new HashMap();
-
- /** Constructor. */
- public ConnectionBin(String binName)
- {
- this.binName = binName;
- }
-
- /** Get the bin name. */
- public String getBinName()
- {
- return binName;
- }
-
- /** Note the creation of an active connection that belongs to this bin. The slots all must
- * have been reserved prior to the connection being created.
- */
- public synchronized void noteConnectionCreation()
- {
- inUseConnections++;
- }
-
- /** Note the destruction of an active connection that belongs to this bin.
- */
- public synchronized void noteConnectionDestruction()
- {
- inUseConnections--;
- }
-
-
- /** Activate a connection that should be in the pool.
- * Removes the connection from the pool.
- */
- public synchronized void takeFromPool(ThrottledConnection tc)
- {
- // Remove this connection from the pool list
- freePool.remove(tc);
- inUseConnections++;
- }
-
- /** Put a connection into the pool.
- */
- public synchronized void addToPool(ThrottledConnection tc)
- {
- // Add this connection to the pool list
- freePool.put(tc,tc);
- inUseConnections--;
- }
-
- /** Verify that this bin is within limits.
- */
- public synchronized void insureWithinLimits(int maxConnections, ThrottledConnection existingConnection)
- throws PoolException
- {
- //sanityCheck();
-
- // See if the connection is in fact within the pool; if so, we just presume the limits are fine as they are.
- // This is necessary because if the connection that's being checked for is freed, then we wreck the data structures.
- if (existsInPool(existingConnection))
- return;
-
- while (maxConnections > 0 && inUseConnections + freePool.size() > maxConnections)
- {
- //sanityCheck();
-
- // If there are any pool connections, free them one at a time
- ThrottledConnection freeMe = getPoolConnection();
- if (freeMe != null)
- {
- // It's okay to call activate since we guarantee that only one thread is trying to grab
- // a connection at a time.
- freeMe.activate();
- freeMe.destroy();
- continue;
- }
-
- // Instead of waiting, throw a pool exception, so that we can wait and retry at the next level up.
- throw new PoolException("Waiting for a connection");
- }
- }
-
- /** This method is called only when there is no existing connection yet identified that can be used
- * for contacting the server and port specified. This method returns a connection if a matching one can be found;
- * otherwise it returns null.
- * If a matching connection is found, it is activated before it is returned. That removes the connection from all
- * pools in which it lives.
- */
- public synchronized ThrottledConnection findConnection(int maxConnections,
- ConnectionBin[] binNames, String protocol, String server, int port,
- PageCredentials authentication, String trustStoreString,
- String proxyHost, int proxyPort, String proxyAuthDomain, String proxyAuthUsername, String proxyAuthPassword)
- throws PoolException
- {
- //sanityCheck();
-
- // First, wait until there's no excess.
- while (maxConnections > 0 && inUseConnections + freePool.size() > maxConnections)
- {
- //sanityCheck();
- // If there are any pool connections, free them one at a time
- ThrottledConnection freeMe = getPoolConnection();
- if (freeMe != null)
- {
- // It's okay to call activate since we guarantee that only one thread is trying to grab
- // a connection at a time.
- freeMe.activate();
- freeMe.destroy();
- continue;
- }
-
- // Instead of waiting, throw a pool exception, so that we can wait and retry at the next level up.
- throw new PoolException("Waiting for a connection");
-
- }
-
- // Wait until there's a free one
- if (maxConnections > 0 && inUseConnections > maxConnections-1)
- {
- // Instead of waiting, throw a pool exception, so that we can wait and retry at the next level up.
- throw new PoolException("Waiting for a connection");
- }
-
- // A null return means that there is no existing pooled connection that matches, and the caller is free to create a new connection
- ThrottledConnection rval = getPoolConnection();
- if (rval == null)
- return null;
-
- // It's okay to call activate since we guarantee that only one thread is trying to grab
- // a connection at a time.
- rval.activate();
- //sanityCheck();
-
- if (!rval.matches(binNames,protocol,server,port,authentication,trustStoreString,
- proxyHost,proxyPort,proxyAuthDomain,proxyAuthUsername,proxyAuthPassword))
- {
- // Destroy old connection. That should free up space for a new creation.
- rval.destroy();
- // Return null to indicate that we can create a new connection now
- return null;
- }
-
- // Existing entry matched. Activate and return it.
- return rval;
- }
-
- /** Note a new time for connection fetch for this pool.
- *@param currentTime is the time the fetch was started.
- */
- public synchronized void setLastFetchTime(long currentTime)
- {
- if (currentTime > lastFetchTime)
- lastFetchTime = currentTime;
- }
-
- /** Get the last fetch time.
- *@return the time.
- */
- public synchronized long getLastFetchTime()
- {
- return lastFetchTime;
- }
-
- /** Count connections that are in use.
- *@return connections that are in use.
- */
- public synchronized int countConnections()
- {
- return freePool.size() + inUseConnections;
- }
-
- /** Flush any idle connections.
- *@return true if the connection bin is now, in fact, empty.
- */
- public synchronized boolean flushIdleConnections(long idleTimeout)
- {
- //sanityCheck();
-
- // We have to time out the pool connections. When there are no pool connections
- // left, AND the in-use counts are zero, we can delete the whole thing.
- Iterator iter = freePool.keySet().iterator();
- while (iter.hasNext())
- {
- ThrottledConnection tc = (ThrottledConnection)iter.next();
- if (tc.flushIdleConnections(idleTimeout))
- {
- // Can delete this connection, since it timed out.
- tc.activate();
- tc.destroy();
- iter = freePool.keySet().iterator();
- }
- }
-
- //sanityCheck();
-
- return (freePool.size() == 0 && inUseConnections == 0);
- }
-
- /** Grab a connection from the current pool. This does not remove the connection from the pool;
- * it just sets it up so that later methods can do that.
- */
- protected ThrottledConnection getPoolConnection()
- {
- if (freePool.size() == 0)
- return null;
- Iterator iter = freePool.keySet().iterator();
- ThrottledConnection rval = (ThrottledConnection)iter.next();
- return rval;
- }
-
- /** Check if a connection exists in the pool already.
- */
- protected boolean existsInPool(ThrottledConnection tc)
- {
- return freePool.get(tc) != null;
- }
-
- public synchronized void sanityCheck()
- {
- // Make sure all the connections in the current pool in fact have a reference to this bin.
- Iterator iter = freePool.keySet().iterator();
- while (iter.hasNext())
- {
- ThrottledConnection tc = (ThrottledConnection)iter.next();
- tc.mustHaveReference(this);
- }
- }
-
- }
-
- /** Throttles for a bin.
- * An instance of this class keeps track of the information needed to bandwidth throttle access
- * to a url belonging to a specific bin.
- *
- * In order to calculate
- * the effective "burst" fetches per second and bytes per second, we need to have some idea what the window is.
- * For example, a long hiatus from fetching could cause overuse of the server when fetching resumes, if the
- * window length is too long.
- *
- * One solution to this problem would be to keep a list of the individual fetches as records. Then, we could
- * "expire" a fetch by discarding the old record. However, this is quite memory consumptive for all but the
- * smallest intervals.
- *
- * Another, better, solution is to hook into the start and end of individual fetches. These will, presumably, occur
- * at the fastest possible rate without long pauses spent doing something else. The only complication is that
- * fetches may well overlap, so we need to "reference count" the fetches to know when to reset the counters.
- * For "fetches per second", we can simply make sure we "schedule" the next fetch at an appropriate time, rather
- * than keep records around. The overall rate may therefore be somewhat less than the specified rate, but that's perfectly
- * acceptable.
- *
- * Some notes on the algorithms used to limit server bandwidth impact
- * ==================================================================
- *
- * In a single connection case, the algorithm we'd want to use works like this. On the first chunk of a series,
- * the total length of time and the number of bytes are recorded. Then, prior to each subsequent chunk, a calculation
- * is done which attempts to hit the bandwidth target by the end of the chunk read, using the rate of the first chunk
- * access as a way of estimating how long it will take to fetch those next n bytes.
- *
- * For a multi-connection case, which this is, it's harder to either come up with a good maximum bandwidth estimate,
- * and harder still to "hit the target", because simultaneous fetches will intrude. The strategy is therefore:
- *
- * 1) The first chunk of any series should proceed without interference from other connections to the same server.
- * The goal here is to get a decent quality estimate without any possibility of overwhelming the server.
- *
- * 2) The bandwidth of the first chunk is treated as the "maximum bandwidth per connection". That is, if other
- * connections are going on, we can presume that each connection will use at most the bandwidth that the first fetch
- * took. Thus, by generating end-time estimates based on this number, we are actually being conservative and
- * using less server bandwidth.
- *
- * 3) For chunks that have started but not finished, we keep track of their size and estimated elapsed time in order to schedule when
- * new chunks from other connections can start.
- *
- * NOTE WELL: This resource must be constrained globally, across all JVMs!
- * To do that, we need an ILockManager to handle the global data for each bin.
- */
- protected static class ThrottleBin
- {
- /** This is the bin name which this throttle belongs to. */
- protected final String binName;
- /** This is the reference count for this bin (which records active references) */
- protected volatile int refCount = 0;
- /** The inverse rate estimate of the first fetch, in ms/byte */
- protected double rateEstimate = 0.0;
- /** Flag indicating whether a rate estimate is needed */
- protected volatile boolean estimateValid = false;
- /** Flag indicating whether rate estimation is in progress yet */
- protected volatile boolean estimateInProgress = false;
- /** The start time of this series */
- protected long seriesStartTime = -1L;
- /** Total actual bytes read in this series; this includes fetches in progress */
- protected long totalBytesRead = -1L;
-
- /** Constructor. */
- public ThrottleBin(String binName)
- {
- this.binName = binName;
- }
-
- /** Get the bin name. */
- public String getBinName()
- {
- return binName;
- }
-
- /** Note the start of a fetch operation for a bin. Call this method just before the actual stream access begins.
- * May wait until schedule allows.
- */
- public void beginFetch()
- throws InterruptedException
- {
- synchronized (this)
- {
- if (refCount == 0)
- {
- // Now, reset bandwidth throttling counters
- estimateValid = false;
- rateEstimate = 0.0;
- totalBytesRead = 0L;
- estimateInProgress = false;
- seriesStartTime = -1L;
- }
- refCount++;
- }
-
- }
-
- /** Abort the fetch.
- */
- public void abortFetch()
- {
- synchronized (this)
- {
- refCount--;
- }
- }
-
- /** Note the start of an individual byte read of a specified size. Call this method just before the
- * read request takes place. Performs the necessary delay prior to reading specified number of bytes from the server.
- */
- public void beginRead(int byteCount, double minimumMillisecondsPerBytePerServer)
- throws InterruptedException
- {
- long currentTime = System.currentTimeMillis();
-
- synchronized (this)
- {
- while (estimateInProgress)
- wait();
- if (estimateValid == false)
- {
- seriesStartTime = currentTime;
- estimateInProgress = true;
- // Add these bytes to the estimated total
- totalBytesRead += (long)byteCount;
- // Exit early; this thread isn't going to do any waiting
- return;
- }
- }
-
- // It is possible for the following code to get interrupted. If that happens,
- // we have to unstick the threads that are waiting on the estimate!
- boolean finished = false;
- try
- {
- long waitTime = 0L;
- synchronized (this)
- {
- // Add these bytes to the estimated total
- totalBytesRead += (long)byteCount;
-
- // Estimate the time this read will take, and wait accordingly
- long estimatedTime = (long)(rateEstimate * (double)byteCount);
-
- // Figure out how long the total byte count should take, to meet the constraint
- long desiredEndTime = seriesStartTime + (long)(((double)totalBytesRead) * minimumMillisecondsPerBytePerServer);
-
- // The wait time is the different between our desired end time, minus the estimated time to read the data, and the
- // current time. But it can't be negative.
- waitTime = (desiredEndTime - estimatedTime) - currentTime;
- }
-
- if (waitTime > 0L)
- {
- if (Logging.connectors.isDebugEnabled())
- Logging.connectors.debug("WEB: Performing a read wait on bin '"+binName+"' of "+
- new Long(waitTime).toString()+" ms.");
- ManifoldCF.sleep(waitTime);
- }
- finished = true;
- }
- finally
- {
- if (!finished)
- {
- abortRead();
- }
- }
- }
-
- /** Abort a read in progress.
- */
- public void abortRead()
- {
- synchronized (this)
- {
- if (estimateInProgress)
- {
- estimateInProgress = false;
- notifyAll();
- }
- }
- }
-
- /** Note the end of an individual read from the server. Call this just after an individual read completes.
- * Pass the actual number of bytes read to the method.
- */
- public void endRead(int originalCount, int actualCount)
- {
- long currentTime = System.currentTimeMillis();
-
- synchronized (this)
- {
- totalBytesRead = totalBytesRead + (long)actualCount - (long)originalCount;
- if (estimateInProgress)
- {
- if (actualCount == 0)
- // Didn't actually get any bytes, so use 0.0
- rateEstimate = 0.0;
- else
- rateEstimate = ((double)(currentTime - seriesStartTime))/(double)actualCount;
- estimateValid = true;
- estimateInProgress = false;
- notifyAll();
- }
- }
- }
-
- /** Note the end of a fetch operation. Call this method just after the fetch completes.
- */
- public boolean endFetch()
- {
- synchronized (this)
- {
- refCount--;
- return (refCount == 0);
- }
-
- }
-
- }
-
/** Throttled connections. Each instance of a connection describes the bins to which it belongs,
* along with the actual open connection itself, and the last time the connection was used. */
protected static class ThrottledConnection implements IThrottledConnection
{
- /** The connection has resolved pointers to the ConnectionBin structures that manage pool
- * maximums. These are ONLY valid when the connection is actually in the pool. */
- protected ConnectionBin[] connectionBinArray;
- /** The connection has resolved pointers to the ThrottleBin structures that help manage
- * bandwidth throttling. */
- protected ThrottleBin[] throttleBinArray;
- /** These are the bandwidth limits, per bin */
- protected double[] minMillisecondsPerByte;
- /** Is the connection considered "active"? */
- protected boolean isActive;
- /** If not active, this is when it went inactive */
- protected long inactiveTime = 0L;
-
+ /** Connection pool */
+ protected final ConnectionPool myPool;
+ /** Fetch throttler */
+ protected final IFetchThrottler fetchThrottler;
/** Protocol */
- protected String protocol;
+ protected final String protocol;
/** Server */
- protected String server;
+ protected final String server;
/** Port */
- protected int port;
+ protected final int port;
/** Authentication */
- protected PageCredentials authentication;
- /** Trust store */
- protected IKeystoreManager trustStore;
- /** Trust store string */
- protected String trustStoreString;
+ protected final PageCredentials authentication;
+
+ /** This is when the connection will expire. Only valid if connection is in the pool. */
+ protected long expireTime = -1L;
/** The http connection manager. The pool is of size 1. */
protected PoolingClientConnectionManager connManager = null;
@@ -1003,10 +294,13 @@
/** Constructor. Create a connection with a specific server and port, and
* register it as active against all bins. */
- public ThrottledConnection(String protocol, String server, int port, PageCredentials authentication,
- javax.net.ssl.SSLSocketFactory httpsSocketFactory, String trustStoreString, ConnectionBin[] connectionBins,
+ public ThrottledConnection(ConnectionPool myPool, IFetchThrottler fetchThrottler,
+ String protocol, String server, int port, PageCredentials authentication,
+ javax.net.ssl.SSLSocketFactory httpsSocketFactory,
String proxyHost, int proxyPort, String proxyAuthDomain, String proxyAuthUsername, String proxyAuthPassword)
{
+ this.myPool = myPool;
+ this.fetchThrottler = fetchThrottler;
this.proxyHost = proxyHost;
this.proxyPort = proxyPort;
this.proxyAuthDomain = proxyAuthDomain;
@@ -1017,168 +311,21 @@
this.port = port;
this.authentication = authentication;
this.httpsSocketFactory = httpsSocketFactory;
- this.trustStoreString = trustStoreString;
- this.connectionBinArray = connectionBins;
- this.throttleBinArray = new ThrottleBin[connectionBins.length];
- this.minMillisecondsPerByte = new double[connectionBins.length];
- this.isActive = true;
- int i = 0;
- while (i < connectionBins.length)
- {
- connectionBins[i].noteConnectionCreation();
- // We don't keep throttle bin references around, since these are transient
- throttleBinArray[i] = null;
- minMillisecondsPerByte[i] = 0.0;
- i++;
- }
-
-
}
- public void mustHaveReference(ConnectionBin cb)
- {
- int i = 0;
- while (i < connectionBinArray.length)
- {
- if (cb == connectionBinArray[i])
- return;
- i++;
- }
- String msg = "Connection bin "+cb.toString()+" owns connection "+this.toString()+" for "+protocol+server+":"+port+
- " but there is no back reference!";
- Logging.connectors.error(msg);
- System.out.println(msg);
- new Exception(msg).printStackTrace();
- System.exit(3);
- //throw new RuntimeException(msg);
- }
-
- /** See if this instances matches a given server and port. */
- public boolean matches(ConnectionBin[] bins, String protocol, String server, int port, PageCredentials authentication,
- String trustStoreString, String proxyHost, int proxyPort, String proxyAuthDomain, String proxyAuthUsername, String proxyAuthPassword)
- {
- if (this.trustStoreString == null || trustStoreString == null)
- {
- if (this.trustStoreString != trustStoreString)
- return false;
- }
- else
- {
- if (!this.trustStoreString.equals(trustStoreString))
- return false;
- }
-
- if (this.authentication == null || authentication == null)
- {
- if (this.authentication != authentication)
- return false;
- }
- else
- {
- if (!this.authentication.equals(authentication))
- return false;
- }
-
- if (this.proxyHost == null || proxyHost == null)
- {
- if (this.proxyHost != proxyHost)
- return false;
- }
- else
- {
- if (!this.proxyHost.equals(proxyHost))
- return false;
- if (this.proxyAuthDomain == null || proxyAuthDomain == null)
- {
- if (this.proxyAuthDomain != proxyAuthDomain)
- return false;
- }
- else
- {
- if (!this.proxyAuthDomain.equals(proxyAuthDomain))
- return false;
- }
- if (this.proxyAuthUsername == null || proxyAuthUsername == null)
- {
- if (this.proxyAuthUsername != proxyAuthUsername)
- return false;
- }
- else
- {
- if (!this.proxyAuthUsername.equals(proxyAuthUsername))
- return false;
- }
- if (this.proxyAuthPassword == null || proxyAuthPassword == null)
- {
- if (this.proxyAuthPassword != proxyAuthPassword)
- return false;
- }
- else
- {
- if (!this.proxyAuthPassword.equals(proxyAuthPassword))
- return false;
- }
- }
-
- if (this.proxyPort != proxyPort)
- return false;
-
-
- if (this.connectionBinArray.length != bins.length || !this.protocol.equals(protocol) || !this.server.equals(server) || this.port != port)
- return false;
-
- int i = 0;
- while (i < bins.length)
- {
- if (connectionBinArray[i] != bins[i])
- return false;
- i++;
- }
- return true;
- }
-
- /** Activate the connection. */
- public void activate()
- {
- isActive = true;
- int i = 0;
- while (i < connectionBinArray.length)
- {
- connectionBinArray[i++].takeFromPool(this);
- }
- }
-
- /** Set up the connection. This allows us to feed all bins the correct bandwidth limit info.
+ /** Check whether the connection has expired.
+ *@param currentTime is the current time to use to judge if a connection has expired.
+ *@return true if the connection has expired, and should be closed.
*/
- public void setup(ThrottleDescription description)
+ @Override
+ public boolean hasExpired(long currentTime)
{
- // Go through all bins, and set up the current limits.
- int i = 0;
- while (i < connectionBinArray.length)
- {
- String binName = connectionBinArray[i].getBinName();
- minMillisecondsPerByte[i] = description.getMinimumMillisecondsPerByte(binName);
- i++;
- }
- }
-
- /** Do periodic bookkeeping.
- *@return true if the connection is no longer valid, and can be removed. */
- public boolean flushIdleConnections(long idleTimeout)
- {
- if (isActive)
- return false;
-
if (connManager != null)
{
connManager.closeIdleConnections(idleTimeout, TimeUnit.MILLISECONDS);
connManager.closeExpiredConnections();
- // Need to determine if there's a valid connection in the connection manager still, or if it is empty.
- //return connManager.getConnectionsInPool() == 0;
- return true;
}
- else
- return true;
+ return (currentTime > expireTime);
}
/** Log the fetch of a number of bytes, from within a stream. */
@@ -1187,65 +334,10 @@
fetchCounter += (long)count;
}
- /** Begin a read operation, from within a stream */
- public void beginRead(int len)
- throws InterruptedException
- {
- // Consult with throttle bins
- int lastOneDone = 0;
- try
- {
- for (int i = 0; i < throttleBinArray.length; i++)
- {
- throttleBinArray[i].beginRead(len,minMillisecondsPerByte[i]);
- lastOneDone = i + 1;
- }
- }
- finally
- {
- if (lastOneDone != throttleBinArray.length)
- {
- for (int i = 0; i < lastOneDone; i++)
- {
- throttleBinArray[i].abortRead();
- }
- }
- }
- }
-
- /** End a read operation, from within a stream */
- public void endRead(int origLen, int actualAmt)
- {
- // Consult with throttle bins
- Throwable e = null;
- for (int i = 0; i < throttleBinArray.length; i++)
- {
- try
- {
- throttleBinArray[i].endRead(origLen,actualAmt);
- }
- catch (Throwable e2)
- {
- e = e2;
- }
- }
- if (e != null)
- {
- if (e instanceof RuntimeException)
- throw (RuntimeException)e;
- else if (e instanceof Error)
- throw (Error)e;
- else
- throw new RuntimeException("Unknown exception: " + e.getMessage(),e);
- }
- }
-
/** Destroy the connection forever */
- protected void destroy()
+ @Override
+ public void destroy()
{
- if (isActive == false)
- throw new RuntimeException("Trying to destroy an inactive connection");
-
// Kill the actual connection object.
if (connManager != null)
{
@@ -1253,13 +345,6 @@
connManager = null;
}
- // Call all the bins this belongs to, and decrement the in-use count.
- int i = 0;
- while (i < connectionBinArray.length)
- {
- ConnectionBin cb = connectionBinArray[i++];
- cb.noteConnectionDestruction();
- }
}
@@ -1273,43 +358,15 @@
{
this.fetchType = fetchType;
this.fetchCounter = 0L;
- int lastCreated = 0;
try
{
- // Find or create the needed throttle bins
- for (int i = 0; i < throttleBinArray.length; i++)
- {
- // Access the bins as we need them, and drop them when ref count goes to zero
- String binName = connectionBinArray[i].getBinName();
- ThrottleBin tb;
- synchronized (throttleBins)
- {
- tb = throttleBins.get(binName);
- if (tb == null)
- {
- tb = new ThrottleBin(binName);
- throttleBins.put(binName,tb);
- }
- tb.beginFetch();
- }
- throttleBinArray[i] = tb;
- lastCreated = i + 1;
- }
+ if (fetchThrottler.obtainFetchDocumentPermission() == false)
+ throw new IllegalStateException("Unexpected return value from obtainFetchDocumentPermission()");
}
catch (InterruptedException e)
{
throw new ManifoldCFException("Interrupted",ManifoldCFException.INTERRUPTED);
}
- finally
- {
- if (lastCreated != throttleBinArray.length)
- {
- for (int i = 0; i < lastCreated; i++)
- {
- throttleBinArray[i].abortFetch();
- }
- }
- }
}
/** Execute the fetch and get the return code. This method uses the
@@ -1600,7 +657,7 @@
//httpClient.setCookieStore(cookieStore);
// Create the thread
- methodThread = new ExecuteMethodThread(this, httpClient, fetchMethod, cookieStore);
+ methodThread = new ExecuteMethodThread(this, fetchThrottler, httpClient, fetchMethod, cookieStore);
try
{
methodThread.start();
@@ -1893,17 +950,6 @@
methodThread.abort();
long endTime = System.currentTimeMillis();
- int i = 0;
- while (i < throttleBinArray.length)
- {
- synchronized (throttleBins)
- {
- if (throttleBinArray[i].endFetch())
- throttleBins.remove(throttleBinArray[i].getBinName());
- }
- throttleBinArray[i] = null;
- i++;
- }
activities.recordActivity(new Long(startFetchTime),WebcrawlerConnector.ACTIVITY_FETCH,
new Long(fetchCounter),myUrl,Integer.toString(statusCode),(throwable==null)?null:throwable.getMessage(),null);
@@ -1945,44 +991,13 @@
}
- /** Close the connection. Call this to end this server connection.
+ /** Close the connection. Call this to return the connection to its pool.
*/
@Override
public void close()
- throws ManifoldCFException
{
- synchronized (poolLock)
- {
- // Verify that all the connections that exist are in fact sane
- synchronized (connectionBins)
- {
- for (String connectionName : connectionBins.keySet())
- {
- ConnectionBin cb = connectionBins.get(connectionName);
- //cb.sanityCheck();
- }
- }
-
- // Leave the connection alive, but mark it as inactive, and return it to the appropriate pools.
- isActive = false;
- inactiveTime = System.currentTimeMillis();
- int i = 0;
- while (i < connectionBinArray.length)
- {
- connectionBinArray[i++].addToPool(this);
- }
- // Verify that all the connections that exist are in fact sane
- synchronized (connectionBins)
- {
- for (String connectionName : connectionBins.keySet())
- {
- ConnectionBin cb = connectionBins.get(connectionName);
- //cb.sanityCheck();
- }
- }
- // Wake up everything waiting on the pool lock
- poolLock.notifyAll();
- }
+ expireTime = System.currentTimeMillis() + idleTimeout;
+ myPool.release(this);
}
protected void handleHTTPException(HttpException e, String activity)
@@ -2048,17 +1063,18 @@
*/
protected static class ThrottledInputstream extends InputStream
{
- /** Stream throttling parameters */
- protected double minimumMillisecondsPerBytePerServer;
+ /** Stream throttler */
+ protected final IStreamThrottler streamThrottler;
/** The throttled connection we belong to */
- protected ThrottledConnection throttledConnection;
+ protected final ThrottledConnection throttledConnection;
/** The stream we are wrapping. */
- protected InputStream inputStream;
+ protected final InputStream inputStream;
/** Constructor.
*/
- public ThrottledInputstream(ThrottledConnection connection, InputStream is)
+ public ThrottledInputstream(IStreamThrottler streamThrottler, ThrottledConnection connection, InputStream is)
{
+ this.streamThrottler = streamThrottler;
this.throttledConnection = connection;
this.inputStream = is;
}
@@ -2126,7 +1142,8 @@
{
try
{
- throttledConnection.beginRead(len);
+ if (streamThrottler.obtainReadPermission(len) == false)
+ throw new IllegalStateException("Unexpected result calling obtainReadPermission()");
int amt = 0;
try
{
@@ -2136,10 +1153,10 @@
finally
{
if (amt == -1)
- throttledConnection.endRead(len,0);
+ streamThrottler.releaseReadPermission(len,0);
else
{
- throttledConnection.endRead(len,amt);
+ streamThrottler.releaseReadPermission(len,amt);
throttledConnection.logFetchCount(amt);
}
}
@@ -2226,6 +1243,10 @@
{
Logging.connectors.debug("IO Exception trying to close connection: "+e.getMessage(),e);
}
+ finally
+ {
+ streamThrottler.closeStream();
+ }
}
}
@@ -2298,6 +1319,8 @@
{
/** The connection */
protected final ThrottledConnection theConnection;
+ /** The fetch throttler */
+ protected final IFetchThrottler fetchThrottler;
/** Client and method, all preconfigured */
protected final AbstractHttpClient httpClient;
protected final HttpRequestBase executeMethod;
@@ -2317,12 +1340,13 @@
protected Throwable generalException = null;
- public ExecuteMethodThread(ThrottledConnection theConnection,
+ public ExecuteMethodThread(ThrottledConnection theConnection, IFetchThrottler fetchThrottler,
AbstractHttpClient httpClient, HttpRequestBase executeMethod, CookieStore cookieStore)
{
super();
setDaemon(true);
this.theConnection = theConnection;
+ this.fetchThrottler = fetchThrottler;
this.httpClient = httpClient;
this.executeMethod = executeMethod;
this.cookieStore = cookieStore;
@@ -2419,7 +1443,7 @@
bodyStream = response.getEntity().getContent();
if (bodyStream != null)
{
- bodyStream = new ThrottledInputstream(theConnection,bodyStream);
+ bodyStream = new ThrottledInputstream(fetchThrottler.createFetchStream(),theConnection,bodyStream);
if (gzip)
bodyStream = new GZIPInputStream(bodyStream);
else if (deflate)
@@ -2741,4 +1765,254 @@
}
+ /** Connection pool key */
+ protected static class ConnectionPoolKey
+ {
+ protected final String protocol;
+ protected final String server;
+ protected final int port;
+ protected final PageCredentials authentication;
+ protected final String trustStoreString;
+ protected final String proxyHost;
+ protected final int proxyPort;
+ protected final String proxyAuthDomain;
+ protected final String proxyAuthUsername;
+ protected final String proxyAuthPassword;
+
+ public ConnectionPoolKey(String protocol,
+ String server, int port, PageCredentials authentication,
+ String trustStoreString, String proxyHost, int proxyPort,
+ String proxyAuthDomain, String proxyAuthUsername, String proxyAuthPassword)
+ {
+ this.protocol = protocol;
+ this.server = server;
+ this.port = port;
+ this.authentication = authentication;
+ this.trustStoreString = trustStoreString;
+ this.proxyHost = proxyHost;
+ this.proxyPort = proxyPort;
+ this.proxyAuthDomain = proxyAuthDomain;
+ this.proxyAuthUsername = proxyAuthUsername;
+ this.proxyAuthPassword = proxyAuthPassword;
+ }
+
+ public int hashCode()
+ {
+ return protocol.hashCode() +
+ server.hashCode() +
+ (port * 31) +
+ ((authentication==null)?0:authentication.hashCode()) +
+ ((trustStoreString==null)?0:trustStoreString.hashCode()) +
+ ((proxyHost==null)?0:proxyHost.hashCode()) +
+ (proxyPort * 29) +
+ ((proxyAuthDomain==null)?0:proxyAuthDomain.hashCode()) +
+ ((proxyAuthUsername==null)?0:proxyAuthUsername.hashCode()) +
+ ((proxyAuthPassword==null)?0:proxyAuthPassword.hashCode());
+ }
+
+ public boolean equals(Object o)
+ {
+ if (!(o instanceof ConnectionPoolKey))
+ return false;
+ ConnectionPoolKey other = (ConnectionPoolKey)o;
+ if (!protocol.equals(other.protocol) || !server.equals(other.server) ||
+ port != other.port)
+ return false;
+ if (authentication == null || other.authentication == null)
+ {
+ if (authentication != other.authentication)
+ return false;
+ }
+ else
+ {
+ if (!authentication.equals(other.authentication))
+ return false;
+ }
+ if (trustStoreString == null || other.trustStoreString == null)
+ {
+ if (trustStoreString != other.trustStoreString)
+ return false;
+ }
+ else
+ {
+ if (!trustStoreString.equals(other.trustStoreString))
+ return false;
+ }
+ if (proxyHost == null || other.proxyHost == null)
+ {
+ if (proxyHost != other.proxyHost)
+ return false;
+ }
+ else
+ {
+ if (!proxyHost.equals(other.proxyHost))
+ return false;
+ }
+ if (proxyPort != other.proxyPort)
+ return false;
+ if (proxyAuthDomain == null || other.proxyAuthDomain == null)
+ {
+ if (proxyAuthDomain != other.proxyAuthDomain)
+ return false;
+ }
+ else
+ {
+ if (!proxyAuthDomain.equals(other.proxyAuthDomain))
+ return false;
+ }
+ if (proxyAuthUsername == null || other.proxyAuthUsername == null)
+ {
+ if (proxyAuthUsername != other.proxyAuthUsername)
+ return false;
+ }
+ else
+ {
+ if (!proxyAuthUsername.equals(other.proxyAuthUsername))
+ return false;
+ }
+ if (proxyAuthPassword == null || other.proxyAuthPassword == null)
+ {
+ if (proxyAuthPassword != other.proxyAuthPassword)
+ return false;
+ }
+ else
+ {
+ if (!proxyAuthPassword.equals(other.proxyAuthPassword))
+ return false;
+ }
+ return true;
+ }
+ }
+
+ /** Each connection pool has identical connections we can draw on.
+ */
+ protected static class ConnectionPool
+ {
+ /** Throttler */
+ protected final IConnectionThrottler connectionThrottler;
+
+ // If we need to create a connection, these are what we use
+
+ protected final String protocol;
+ protected final String server;
+ protected final int port;
+ protected final PageCredentials authentication;
+ protected final javax.net.ssl.SSLSocketFactory baseFactory;
+ protected final String proxyHost;
+ protected final int proxyPort;
+ protected final String proxyAuthDomain;
+ protected final String proxyAuthUsername;
+ protected final String proxyAuthPassword;
+
+ /** The actual pool of connections */
+ protected final List<IThrottledConnection> connections = new ArrayList<IThrottledConnection>();
+
+ public ConnectionPool(IConnectionThrottler connectionThrottler,
+ String protocol,
+ String server, int port, PageCredentials authentication,
+ javax.net.ssl.SSLSocketFactory baseFactory,
+ String proxyHost, int proxyPort,
+ String proxyAuthDomain, String proxyAuthUsername, String proxyAuthPassword)
+ {
+ this.connectionThrottler = connectionThrottler;
+
+ this.protocol = protocol;
+ this.server = server;
+ this.port = port;
+ this.authentication = authentication;
+ this.baseFactory = baseFactory;
+ this.proxyHost = proxyHost;
+ this.proxyPort = proxyPort;
+ this.proxyAuthDomain = proxyAuthDomain;
+ this.proxyAuthUsername = proxyAuthUsername;
+ this.proxyAuthPassword = proxyAuthPassword;
+ }
+
+ public IThrottledConnection grab()
+ throws InterruptedException
+ {
+ // Wait for a connection
+ int result = connectionThrottler.waitConnectionAvailable();
+ if (result == IConnectionThrottler.CONNECTION_FROM_POOL)
+ {
+ // We are guaranteed to have a connection in the pool, unless there's a coding error.
+ synchronized (connections)
+ {
+ return connections.remove(connections.size()-1);
+ }
+ }
+ else if (result == IConnectionThrottler.CONNECTION_FROM_CREATION)
+ {
+ return new ThrottledConnection(this,connectionThrottler.getNewConnectionFetchThrottler(),
+ protocol,server,port,authentication,baseFactory,
+ proxyHost,proxyPort,
+ proxyAuthDomain,proxyAuthUsername,proxyAuthPassword);
+ }
+ else
+ throw new IllegalStateException("Unexpected return value from waitConnectionAvailable(): "+result);
+ }
+
+ public void release(IThrottledConnection connection)
+ {
+ if (connectionThrottler.noteReturnedConnection())
+ {
+ // Destroy this connection
+ connection.destroy();
+ connectionThrottler.noteConnectionDestroyed();
+ }
+ else
+ {
+ // Return to pool
+ synchronized (connections)
+ {
+ connections.add(connection);
+ }
+ connectionThrottler.noteConnectionReturnedToPool();
+ }
+ }
+
+ public void flushIdleConnections()
+ {
+ long currentTime = System.currentTimeMillis();
+ // First, remove connections that are over the quota
+ while (connectionThrottler.checkDestroyPooledConnection())
+ {
+ // Destroy the oldest ones first
+ IThrottledConnection connection;
+ synchronized (connections)
+ {
+ connection = connections.remove(0);
+ }
+ connection.destroy();
+ connectionThrottler.noteConnectionDestroyed();
+ }
+ // Now, get rid of expired connections
+ while (true)
+ {
+ boolean expired;
+ synchronized (connections)
+ {
+ expired = connections.size() > 0 && connections.get(0).hasExpired(currentTime);
+ }
+ if (!expired)
+ break;
+ // We found an expired connection! Now tell the throttler that, and see if it agrees.
+ if (connectionThrottler.checkExpireConnection())
+ {
+ // Remove a connection from the pool, and destroy it.
+ // It's not guaranteed to be an expired one, but that's a rare occurrence, we expect.
+ IThrottledConnection connection;
+ synchronized (connections)
+ {
+ connection = connections.remove(0);
+ }
+ connection.destroy();
+ connectionThrottler.noteConnectionDestroyed();
+ }
+ else
+ break;
+ }
+ }
+
+ }
}
diff --git a/connectors/webcrawler/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/webcrawler/WebcrawlerConnector.java b/connectors/webcrawler/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/webcrawler/WebcrawlerConnector.java
index b358da5..64c78d9 100644
--- a/connectors/webcrawler/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/webcrawler/WebcrawlerConnector.java
+++ b/connectors/webcrawler/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/webcrawler/WebcrawlerConnector.java
@@ -160,6 +160,8 @@
protected int connectionTimeoutMilliseconds = 60000;
/** Socket timeout, milliseconds */
protected int socketTimeoutMilliseconds = 300000;
+ /** Throttle group name */
+ protected String throttleGroupName = null;
// Canonicalization enabling/disabling. Eventually this will probably need to be by regular expression.
@@ -354,6 +356,9 @@
{
String x;
+ // Either set this from the connection name, or just have one. Right now, we have one. Assign the field (not a shadowing local) so getConnection() receives it.
+ throttleGroupName = "";
+
String emailAddress = params.getParameter(WebcrawlerConfig.PARAMETER_EMAIL);
if (emailAddress == null)
throw new ManifoldCFException("Missing email address");
@@ -406,7 +411,7 @@
public void poll()
throws ManifoldCFException
{
- ThrottledFetcher.flushIdleConnections();
+ ThrottledFetcher.flushIdleConnections(currentContext);
}
/** Check status of connection.
@@ -425,6 +430,7 @@
public void disconnect()
throws ManifoldCFException
{
+ throttleGroupName = null;
throttleDescription = null;
credentialsDescription = null;
trustsDescription = null;
@@ -711,7 +717,9 @@
// Prepare to perform the fetch, and decide what to do with the document.
//
- IThrottledConnection connection = ThrottledFetcher.getConnection(protocol,ipAddress,port,
+ IThrottledConnection connection = ThrottledFetcher.getConnection(currentContext,
+ throttleGroupName,
+ protocol,ipAddress,port,
credential,trustStore,throttleDescription,binNames,connectionLimit,
proxyHost,proxyPort,proxyAuthDomain,proxyAuthUsername,proxyAuthPassword);
try
@@ -5126,7 +5134,8 @@
// We've successfully obtained a lock on reading robots for this server! Now, guarantee that we'll free it, by instantiating a try/finally
try
{
- IThrottledConnection connection = ThrottledFetcher.getConnection(protocol,hostIPAddress,port,credential,
+ IThrottledConnection connection = ThrottledFetcher.getConnection(currentContext,throttleGroupName,
+ protocol,hostIPAddress,port,credential,
trustStore,throttleDescription,binNames,connectionLimit,
proxyHost,proxyPort,proxyAuthDomain,proxyAuthUsername,proxyAuthPassword);
try
diff --git a/framework/agents/src/main/java/org/apache/manifoldcf/agents/system/IdleCleanupThread.java b/framework/agents/src/main/java/org/apache/manifoldcf/agents/system/IdleCleanupThread.java
index 4b495a6..531c3d0 100644
--- a/framework/agents/src/main/java/org/apache/manifoldcf/agents/system/IdleCleanupThread.java
+++ b/framework/agents/src/main/java/org/apache/manifoldcf/agents/system/IdleCleanupThread.java
@@ -55,6 +55,8 @@
ICacheManager cacheManager = CacheManagerFactory.make(threadContext);
// Get the output connector pool handle
IOutputConnectorPool outputConnectorPool = OutputConnectorPoolFactory.make(threadContext);
+ // Get the throttle groups handle; it is polled below alongside the output connector pool
IThrottleGroups throttleGroups = ThrottleGroupsFactory.make(threadContext);
/* For HSQLDB debugging...
IDBInterface database = DBInterfaceFactory.make(threadContext,
@@ -88,6 +90,9 @@
// Do the cleanup
outputConnectorPool.pollAllConnectors();
+ // Poll the throttle groups to update cluster-wide throttling statistics and allocation
+ throttleGroups.poll();
+ // Expire cached objects as of the current time
cacheManager.expireObjects(System.currentTimeMillis());
// Sleep for the retry interval.
diff --git a/framework/api-service/src/main/java/org/apache/manifoldcf/apiservice/IdleCleanupThread.java b/framework/api-service/src/main/java/org/apache/manifoldcf/apiservice/IdleCleanupThread.java
new file mode 100644
index 0000000..ad99a7e
--- /dev/null
+++ b/framework/api-service/src/main/java/org/apache/manifoldcf/apiservice/IdleCleanupThread.java
@@ -0,0 +1,168 @@
+/* $Id$ */
+
+/**
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.manifoldcf.apiservice;
+
+import org.apache.manifoldcf.core.interfaces.*;
+import org.apache.manifoldcf.agents.interfaces.*;
+import org.apache.manifoldcf.crawler.interfaces.*;
+import org.apache.manifoldcf.authorities.interfaces.*;
+import org.apache.manifoldcf.core.system.Logging;
+import org.apache.manifoldcf.core.system.ManifoldCF;
+import java.util.*;
+import java.lang.reflect.*;
+
+/** This daemon thread periodically polls all of the connector pools (repository,
+* output, authority, and mapping) and the throttle groups, and expires cached
+* objects. The ostensible purpose is to allow connectors to shut down idle
+* connections, etc.
+*/
+public class IdleCleanupThread extends Thread
+{
+  public static final String _rcsid = "@(#)$Id$";
+
+  /** Constructor.
+  */
+  public IdleCleanupThread()
+    throws ManifoldCFException
+  {
+    super();
+    setName("Idle cleanup thread");
+    setDaemon(true);
+  }
+
+  /** Perform a cleanup pass every five seconds until the thread is interrupted.
+  * After a database connection error the thread waits ten seconds before retrying; after
+  * any other error it logs the problem and waits the normal five seconds. A setup error,
+  * an OutOfMemoryError, or a failure while obtaining the pools terminates the JVM.
+  */
+  public void run()
+  {
+    Logging.root.debug("Start up idle cleanup thread");
+    try
+    {
+      // Create a thread context object.
+      IThreadContext threadContext = ThreadContextFactory.make();
+      // Get the cache handle.
+      ICacheManager cacheManager = CacheManagerFactory.make(threadContext);
+
+      // Connector pools and throttle groups polled on every pass.
+      IRepositoryConnectorPool repositoryConnectorPool = RepositoryConnectorPoolFactory.make(threadContext);
+      IOutputConnectorPool outputConnectorPool = OutputConnectorPoolFactory.make(threadContext);
+      IAuthorityConnectorPool authorityConnectorPool = AuthorityConnectorPoolFactory.make(threadContext);
+      IMappingConnectorPool mappingConnectorPool = MappingConnectorPoolFactory.make(threadContext);
+
+      IThrottleGroups throttleGroups = ThrottleGroupsFactory.make(threadContext);
+
+      // Loop
+      while (true)
+      {
+        // Do another try/catch around everything in the loop
+        try
+        {
+          // Do the cleanup
+          repositoryConnectorPool.pollAllConnectors();
+          outputConnectorPool.pollAllConnectors();
+          authorityConnectorPool.pollAllConnectors();
+          mappingConnectorPool.pollAllConnectors();
+
+          throttleGroups.poll();
+
+          cacheManager.expireObjects(System.currentTimeMillis());
+
+          // Sleep for the retry interval.
+          ManifoldCF.sleep(5000L);
+        }
+        catch (ManifoldCFException e)
+        {
+          if (e.getErrorCode() == ManifoldCFException.INTERRUPTED)
+            break;
+
+          if (e.getErrorCode() == ManifoldCFException.DATABASE_CONNECTION_ERROR)
+          {
+            Logging.root.error("Idle cleanup thread aborting and restarting due to database connection reset: "+e.getMessage(),e);
+            try
+            {
+              // Give the database a chance to catch up/wake up
+              ManifoldCF.sleep(10000L);
+            }
+            catch (InterruptedException se)
+            {
+              break;
+            }
+            continue;
+          }
+
+          // Log it, but keep the thread alive
+          Logging.root.error("Exception tossed: "+e.getMessage(),e);
+
+          if (e.getErrorCode() == ManifoldCFException.SETUP_ERROR)
+          {
+            // Shut the whole system down!
+            System.exit(1);
+          }
+
+          // Back off before the next pass, so that a persistent failure does not
+          // turn into a tight loop of error logging.
+          try
+          {
+            ManifoldCF.sleep(5000L);
+          }
+          catch (InterruptedException se)
+          {
+            break;
+          }
+        }
+        catch (InterruptedException e)
+        {
+          // We're supposed to quit
+          break;
+        }
+        catch (OutOfMemoryError e)
+        {
+          System.err.println("API service ran out of memory - shutting down");
+          e.printStackTrace(System.err);
+          System.exit(-200);
+        }
+        catch (Throwable e)
+        {
+          // A more severe error - but stay alive
+          Logging.root.fatal("Error tossed: "+e.getMessage(),e);
+          // Back off before the next pass, as for ManifoldCFException above.
+          try
+          {
+            ManifoldCF.sleep(5000L);
+          }
+          catch (InterruptedException se)
+          {
+            break;
+          }
+        }
+      }
+    }
+    catch (Throwable e)
+    {
+      // Severe error on initialization
+      System.err.println("API service could not start - shutting down");
+      Logging.root.fatal("IdleCleanupThread initialization error tossed: "+e.getMessage(),e);
+      System.exit(-300);
+    }
+
+  }
+
+}
diff --git a/framework/api-service/src/main/java/org/apache/manifoldcf/apiservice/ServletListener.java b/framework/api-service/src/main/java/org/apache/manifoldcf/apiservice/ServletListener.java
index 1916cfa..cfe250e 100644
--- a/framework/api-service/src/main/java/org/apache/manifoldcf/apiservice/ServletListener.java
+++ b/framework/api-service/src/main/java/org/apache/manifoldcf/apiservice/ServletListener.java
@@ -29,11 +29,16 @@
{
public static final String _rcsid = "@(#)$Id$";
+ protected IdleCleanupThread idleCleanupThread = null;
+
public void contextInitialized(ServletContextEvent sce)
{
try
{
- ManifoldCF.initializeEnvironment(ThreadContextFactory.make());
+ IThreadContext threadContext = ThreadContextFactory.make();
+ ManifoldCF.initializeEnvironment(threadContext);
+ idleCleanupThread = new IdleCleanupThread();
+ idleCleanupThread.start();
}
catch (ManifoldCFException e)
{
@@ -43,7 +48,27 @@
public void contextDestroyed(ServletContextEvent sce)
{
- ManifoldCF.cleanUpEnvironment(ThreadContextFactory.make());
+    // Stop the idle cleanup thread before ManifoldCF.cleanUpEnvironment() runs. Keep
+    // re-interrupting until it exits, but wait between attempts instead of spinning.
+    try
+    {
+      while (idleCleanupThread != null)
+      {
+        idleCleanupThread.interrupt();
+        idleCleanupThread.join(1000L);
+        if (!idleCleanupThread.isAlive())
+          idleCleanupThread = null;
+      }
+    }
+    catch (InterruptedException e)
+    {
+      // This thread was interrupted while waiting; restore the flag and go on to clean up.
+      Thread.currentThread().interrupt();
+    }
+    finally
+    {
+      ManifoldCF.cleanUpEnvironment(ThreadContextFactory.make());
+    }
}
}
diff --git a/framework/build.xml b/framework/build.xml
index 4b17af1..262d99c 100644
--- a/framework/build.xml
+++ b/framework/build.xml
@@ -1497,6 +1497,7 @@
<test name="org.apache.manifoldcf.core.common.DateTest" todir="test-output"/>
<test name="org.apache.manifoldcf.core.fuzzyml.TestFuzzyML" todir="test-output"/>
<test name="org.apache.manifoldcf.core.lockmanager.TestZooKeeperLocks" todir="test-output"/>
+      <test name="org.apache.manifoldcf.core.throttler.TestThrottler" todir="test-output"/>
</junit>
</target>
diff --git a/framework/combined-service/src/main/java/org/apache/manifoldcf/combinedservice/IdleCleanupThread.java b/framework/combined-service/src/main/java/org/apache/manifoldcf/combinedservice/IdleCleanupThread.java
new file mode 100644
index 0000000..8e1d50d
--- /dev/null
+++ b/framework/combined-service/src/main/java/org/apache/manifoldcf/combinedservice/IdleCleanupThread.java
@@ -0,0 +1,168 @@
+/* $Id$ */
+
+/**
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.manifoldcf.combinedservice;
+
+import org.apache.manifoldcf.core.interfaces.*;
+import org.apache.manifoldcf.agents.interfaces.*;
+import org.apache.manifoldcf.crawler.interfaces.*;
+import org.apache.manifoldcf.authorities.interfaces.*;
+import org.apache.manifoldcf.core.system.Logging;
+import org.apache.manifoldcf.core.system.ManifoldCF;
+import java.util.*;
+import java.lang.reflect.*;
+
+/** This daemon thread periodically polls all of the connector pools (repository,
+* output, authority, and mapping) and the throttle groups, and expires cached
+* objects. The ostensible purpose is to allow connectors to shut down idle
+* connections, etc.
+*/
+public class IdleCleanupThread extends Thread
+{
+  public static final String _rcsid = "@(#)$Id$";
+
+  /** Constructor.
+  */
+  public IdleCleanupThread()
+    throws ManifoldCFException
+  {
+    super();
+    setName("Idle cleanup thread");
+    setDaemon(true);
+  }
+
+  /** Perform a cleanup pass every five seconds until the thread is interrupted.
+  * After a database connection error the thread waits ten seconds before retrying; after
+  * any other error it logs the problem and waits the normal five seconds. A setup error,
+  * an OutOfMemoryError, or a failure while obtaining the pools terminates the JVM.
+  */
+  public void run()
+  {
+    Logging.root.debug("Start up idle cleanup thread");
+    try
+    {
+      // Create a thread context object.
+      IThreadContext threadContext = ThreadContextFactory.make();
+      // Get the cache handle.
+      ICacheManager cacheManager = CacheManagerFactory.make(threadContext);
+
+      // Connector pools and throttle groups polled on every pass.
+      IRepositoryConnectorPool repositoryConnectorPool = RepositoryConnectorPoolFactory.make(threadContext);
+      IOutputConnectorPool outputConnectorPool = OutputConnectorPoolFactory.make(threadContext);
+      IAuthorityConnectorPool authorityConnectorPool = AuthorityConnectorPoolFactory.make(threadContext);
+      IMappingConnectorPool mappingConnectorPool = MappingConnectorPoolFactory.make(threadContext);
+
+      IThrottleGroups throttleGroups = ThrottleGroupsFactory.make(threadContext);
+
+      // Loop
+      while (true)
+      {
+        // Do another try/catch around everything in the loop
+        try
+        {
+          // Do the cleanup
+          repositoryConnectorPool.pollAllConnectors();
+          outputConnectorPool.pollAllConnectors();
+          authorityConnectorPool.pollAllConnectors();
+          mappingConnectorPool.pollAllConnectors();
+
+          throttleGroups.poll();
+
+          cacheManager.expireObjects(System.currentTimeMillis());
+
+          // Sleep for the retry interval.
+          ManifoldCF.sleep(5000L);
+        }
+        catch (ManifoldCFException e)
+        {
+          if (e.getErrorCode() == ManifoldCFException.INTERRUPTED)
+            break;
+
+          if (e.getErrorCode() == ManifoldCFException.DATABASE_CONNECTION_ERROR)
+          {
+            Logging.root.error("Idle cleanup thread aborting and restarting due to database connection reset: "+e.getMessage(),e);
+            try
+            {
+              // Give the database a chance to catch up/wake up
+              ManifoldCF.sleep(10000L);
+            }
+            catch (InterruptedException se)
+            {
+              break;
+            }
+            continue;
+          }
+
+          // Log it, but keep the thread alive
+          Logging.root.error("Exception tossed: "+e.getMessage(),e);
+
+          if (e.getErrorCode() == ManifoldCFException.SETUP_ERROR)
+          {
+            // Shut the whole system down!
+            System.exit(1);
+          }
+
+          // Back off before the next pass, so that a persistent failure does not
+          // turn into a tight loop of error logging.
+          try
+          {
+            ManifoldCF.sleep(5000L);
+          }
+          catch (InterruptedException se)
+          {
+            break;
+          }
+        }
+        catch (InterruptedException e)
+        {
+          // We're supposed to quit
+          break;
+        }
+        catch (OutOfMemoryError e)
+        {
+          System.err.println("Combined service ran out of memory - shutting down");
+          e.printStackTrace(System.err);
+          System.exit(-200);
+        }
+        catch (Throwable e)
+        {
+          // A more severe error - but stay alive
+          Logging.root.fatal("Error tossed: "+e.getMessage(),e);
+          // Back off before the next pass, as for ManifoldCFException above.
+          try
+          {
+            ManifoldCF.sleep(5000L);
+          }
+          catch (InterruptedException se)
+          {
+            break;
+          }
+        }
+      }
+    }
+    catch (Throwable e)
+    {
+      // Severe error on initialization
+      System.err.println("Combined service could not start - shutting down");
+      Logging.root.fatal("IdleCleanupThread initialization error tossed: "+e.getMessage(),e);
+      System.exit(-300);
+    }
+
+  }
+
+}
diff --git a/framework/combined-service/src/main/java/org/apache/manifoldcf/combinedservice/ServletListener.java b/framework/combined-service/src/main/java/org/apache/manifoldcf/combinedservice/ServletListener.java
index b7b5300..b091414 100644
--- a/framework/combined-service/src/main/java/org/apache/manifoldcf/combinedservice/ServletListener.java
+++ b/framework/combined-service/src/main/java/org/apache/manifoldcf/combinedservice/ServletListener.java
@@ -31,6 +31,7 @@
public static final String _rcsid = "@(#)$Id$";
protected static AgentsThread agentsThread = null;
+ protected IdleCleanupThread idleCleanupThread = null;
public void contextInitialized(ServletContextEvent sce)
{
@@ -44,8 +45,14 @@
ManifoldCF.registerThisAgent(tc);
ManifoldCF.reregisterAllConnectors(tc);
+ // This is for the UI and API components
+ idleCleanupThread = new IdleCleanupThread();
+ idleCleanupThread.start();
+
+ // This is for the agents process
agentsThread = new AgentsThread(ManifoldCF.getProcessID());
agentsThread.start();
+
}
catch (ManifoldCFException e)
{
@@ -65,6 +72,15 @@
agentsThread = null;
AgentsDaemon.clearAgentsShutdownSignal(tc);
}
+
+      // Stop the idle cleanup thread; re-interrupt until it exits, pausing between attempts.
+      while (idleCleanupThread != null)
+      {
+        idleCleanupThread.interrupt();
+        idleCleanupThread.join(1000L);
+        if (!idleCleanupThread.isAlive())
+          idleCleanupThread = null;
+      }
}
catch (InterruptedException e)
{
diff --git a/framework/core/pom.xml b/framework/core/pom.xml
index 1c365a4..985e2f5 100644
--- a/framework/core/pom.xml
+++ b/framework/core/pom.xml
@@ -92,5 +92,23 @@
<artifactId>zookeeper</artifactId>
<version>${zookeeper.version}</version>
</dependency>
+    <dependency>
+      <groupId>postgresql</groupId>
+      <artifactId>postgresql</artifactId>
+      <version>${postgresql.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.hsqldb</groupId>
+      <artifactId>hsqldb</artifactId>
+      <version>${hsqldb.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.derby</groupId>
+      <artifactId>derby</artifactId>
+      <version>${derby.version}</version>
+      <scope>test</scope>
+    </dependency>
</dependencies>
</project>
diff --git a/framework/core/src/main/java/org/apache/manifoldcf/core/connectorpool/ConnectorPool.java b/framework/core/src/main/java/org/apache/manifoldcf/core/connectorpool/ConnectorPool.java
index 19e8ca7..27037be 100644
--- a/framework/core/src/main/java/org/apache/manifoldcf/core/connectorpool/ConnectorPool.java
+++ b/framework/core/src/main/java/org/apache/manifoldcf/core/connectorpool/ConnectorPool.java
@@ -572,6 +572,8 @@
// Compute MaximumTarget
SumClass sumClass = new SumClass(serviceName);
lockManager.scanServiceData(serviceTypeName, sumClass);
+ // sumClass now exposes getNumServices(), getGlobalTarget() and getGlobalInUse() for the scanned services.
+
int numServices = sumClass.getNumServices();
if (numServices == 0)
return;
@@ -614,11 +616,13 @@
{
// We want a fast ramp up, so make this proportional to globalMax
int increment = globalMax >> 2;
- if (increment < 0)
+ if (increment == 0)
increment = 1;
optimalTarget += increment;
}
+ // maximumTarget, fairTarget and optimalTarget have now all been computed.
+
// Now compute actual target
int target = maximumTarget;
if (target > fairTarget)
@@ -626,6 +630,7 @@
if (target > optimalTarget)
target = optimalTarget;
+ // The target has now been chosen; it is recorded below.
// Write these values to the service data variables.
// NOTE that there is a race condition here; the target value depends on all the calculations above being accurate, and not changing out from under us.
// So, that's why we have a write lock around the pool calculations.
@@ -635,6 +640,7 @@
// Now, update our localMax
if (target == localMax)
return;
+ // The target differs from localMax, so it becomes the new local maximum below.
// Compute the number of instances in use locally
localInUse = localMax - numFree;
localMax = target;
@@ -647,6 +653,35 @@
{
lockManager.leaveWriteLock(targetCalcLockName);
}
+
+ // Finally, disconnect pooled instances while the pool holds more of them than numFree.
+ while (stack.size() > 0 && stack.size() > numFree)
+ {
+ // Prefer to discard an instance that is not actually connected; such instances
+ // are likely to be near the front of the list, since those entries are the oldest.
+ // If every pooled instance is connected, the last entry is discarded instead.
+ int j;
+ for (j = 0; j < stack.size(); j++)
+ {
+ if (!stack.get(j).isConnected())
+ break;
+ }
+ T rc;
+ if (j == stack.size())
+ rc = stack.remove(stack.size()-1);
+ else
+ rc = stack.remove(j);
+ rc.setThreadContext(threadContext);
+ try
+ {
+ rc.disconnect();
+ }
+ finally
+ {
+ rc.clearThreadContext();
+ }
+ }
+
}
/** Flush unused connectors.
diff --git a/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/IConnectionThrottler.java b/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/IConnectionThrottler.java
new file mode 100644
index 0000000..1d76f00
--- /dev/null
+++ b/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/IConnectionThrottler.java
@@ -0,0 +1,112 @@
+/* $Id$ */
+
+/**
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.manifoldcf.core.interfaces;
+
+import java.util.*;
+
+/** An IConnectionThrottler object is not thread-local. It gates connection
+* creation and pool management.
+* The underlying model is a pool of connections. A connection gets pulled off the pool and
+* used to perform a fetch. If there are insufficient connections in the pool, and there is
+* sufficient capacity to create a new connection, a connection will be created instead.
+* When the fetch is done, the connection is returned, and then there is a decision whether
+* or not to put the connection back into the pool, or to destroy it. Finally, the pool is
+* periodically evaluated, and connections may be destroyed if either they have expired,
+* or the allocated connections are still over capacity.
+*
+* This object does not in itself contain a connection pool - but it is intended to assist
+* in the management of that pool. Specifically, it tracks connections that are in the
+* pool, and connections that are handed out for use, and performs ALL the waiting needed
+* due to the pool being empty and/or the number of active connections being at or over
+* the quota.
+*/
+public interface IConnectionThrottler
+{
+  public static final String _rcsid = "@(#)$Id$";
+
+  // For grabbing a connection for use
+
+  /** Get the connection from the pool */
+  public final static int CONNECTION_FROM_POOL = 0;
+  /** Create a connection */
+  public final static int CONNECTION_FROM_CREATION = 1;
+  /** Pool shutting down */
+  public final static int CONNECTION_FROM_NOWHERE = -1;
+
+  /** Get permission to grab a connection for use. If this object believes there is a connection
+  * available in the pool, it will update its pool size variable and return. If not, this method
+  * evaluates whether a new connection should be created. If neither condition is true, it
+  * waits until a connection is available.
+  *@return CONNECTION_FROM_POOL to take the connection from the pool, CONNECTION_FROM_CREATION
+  * to create one, or CONNECTION_FROM_NOWHERE if the throttler is being shut down.
+  */
+  public int waitConnectionAvailable()
+    throws InterruptedException;
+
+  /** For a new connection, obtain the fetch throttler to use for the connection.
+  * If the result from waitConnectionAvailable() is CONNECTION_FROM_CREATION,
+  * the calling code is expected to create a connection using the result of this method.
+  *@return the fetch throttler for a new connection.
+  */
+  public IFetchThrottler getNewConnectionFetchThrottler();
+
+  /** This method indicates whether a formerly in-use connection should be placed back
+  * in the pool or destroyed.
+  *@return true if the connection should not be put into the pool but should instead
+  * simply be destroyed. If true is returned, the caller MUST call noteConnectionDestroyed()
+  * after the connection is destroyed in order for the bookkeeping to work. If false
+  * is returned, the caller MUST call noteConnectionReturnedToPool() after the connection
+  * is returned to the pool.
+  */
+  public boolean noteReturnedConnection();
+
+  /** This method calculates whether a connection should be taken from the pool and destroyed
+  * in order to meet quota requirements. If this method returns
+  * true, you MUST remove a connection from the pool, and you MUST call
+  * noteConnectionDestroyed() afterwards.
+  *@return true if a pooled connection should be destroyed. If true is returned, the
+  * caller MUST call noteConnectionDestroyed() (below) in order for the bookkeeping to work.
+  */
+  public boolean checkDestroyPooledConnection();
+
+  /** Connection expiration is tricky, because even though a connection may be identified as
+  * being expired, at the very same moment it could be handed out in another thread. So there
+  * is a natural race condition present.
+  * The way the connection throttler deals with that is to allow the caller to reserve a connection
+  * for expiration. This must be called BEFORE the actual identified connection is removed from the
+  * connection pool. If the value returned by this method is "true", then a connection MUST be removed
+  * from the pool and destroyed, whether or not the identified connection is actually still available
+  * for destruction.
+  *@return true if a connection from the pool can be expired. If true is returned, noteConnectionDestroyed()
+  * MUST be called once the connection has actually been destroyed.
+  */
+  public boolean checkExpireConnection();
+
+  /** Note that a connection has been returned to the pool. Call this method after a connection has been
+  * placed back into the pool and is available for use.
+  */
+  public void noteConnectionReturnedToPool();
+
+  /** Note that a connection has been destroyed. Call this method ONLY after noteReturnedConnection(),
+  * checkDestroyPooledConnection(), or checkExpireConnection() returns true, AND the connection
+  * has already been destroyed.
+  */
+  public void noteConnectionDestroyed();
+
+}
diff --git a/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/IFetchThrottler.java b/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/IFetchThrottler.java
new file mode 100644
index 0000000..91d51f4
--- /dev/null
+++ b/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/IFetchThrottler.java
@@ -0,0 +1,46 @@
+/* $Id$ */
+
+/**
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.manifoldcf.core.interfaces;
+
+/** An IFetchThrottler gates the individual document fetches performed as part of a
+* fetch cycle. It is not thread-local and needs no thread context, so it never throws
+* ManifoldCFException and can safely be used from background threads and the like.
+* Instances are normally handed out by IConnectionThrottler objects rather than being
+* constructed directly.
+*/
+public interface IFetchThrottler
+{
+  public static final String _rcsid = "@(#)$Id$";
+
+  /** Wait for permission to begin fetching a single document. Permission to use the
+  * underlying connection has already been granted by whatever created this object;
+  * this call only covers the start of one document fetch over that connection.
+  *@return false if the throttler is being shut down.
+  */
+  public boolean obtainFetchDocumentPermission()
+    throws InterruptedException;
+
+  /** Open a throttled stream for the document being fetched. When the fetch is finished
+  * (or abandoned), call IStreamThrottler.closeStream() on the result to record that the
+  * document fetch activity is complete.
+  *@return the stream throttler that governs the actual data access.
+  */
+  public IStreamThrottler createFetchStream();
+
+}
diff --git a/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/IStreamThrottler.java b/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/IStreamThrottler.java
new file mode 100644
index 0000000..b8e1759
--- /dev/null
+++ b/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/IStreamThrottler.java
@@ -0,0 +1,50 @@
+/* $Id$ */
+
+/**
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.manifoldcf.core.interfaces;
+
+/** An IStreamThrottler object is meant to be embedded in an InputStream. It is not
+* thread-local, and does not require access to a thread context. It thus also does not
+* throw ManifoldCFExceptions. It is thus suitable for use in background threads, etc.
+* These objects are typically created by IFetchThrottler objects - they are not meant
+* to be created directly.
+*/
+public interface IStreamThrottler
+{
+  public static final String _rcsid = "@(#)$Id$";
+
+  /** Obtain permission to read a block of bytes. This method may wait until it is OK to proceed.
+  * The throttle group, bin names, etc. are already known
+  * to this specific interface object, so it is unnecessary to include them here.
+  *@param byteCount is the number of bytes to get permissions to read.
+  *@return true if the wait took place as planned, or false if the system is being shut down.
+  */
+  public boolean obtainReadPermission(int byteCount)
+    throws InterruptedException;
+
+  /** Note the completion of the read of a block of bytes. Call this after
+  * obtainReadPermission() was successfully called, and bytes were successfully read.
+  *@param origByteCount is the originally requested number of bytes to get permissions to read.
+  *@param actualByteCount is the number of bytes actually read.
+  */
+  public void releaseReadPermission(int origByteCount, int actualByteCount);
+
+  /** Note that the stream has been closed, whether the fetch finished or was aborted.
+  */
+  public void closeStream();
+}
diff --git a/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/IThrottleGroups.java b/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/IThrottleGroups.java
new file mode 100644
index 0000000..0e5df5d
--- /dev/null
+++ b/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/IThrottleGroups.java
@@ -0,0 +1,86 @@
+/* $Id$ */
+
+/**
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.manifoldcf.core.interfaces;
+
+import java.util.*;
+
+/** Thread-local access to the throttle groups, which together form a virtual pool of
+* connections to resources whose use must be throttled by number of connections, rate
+* of use, and byte rate.
+*/
+public interface IThrottleGroups
+{
+  public static final String _rcsid = "@(#)$Id$";
+
+  /** List the throttle groups that currently exist for a throttle group type.
+  * A throttle group type typically corresponds to a connector class; each throttle group
+  * within it is a namespace of bin names specific to that connector class.
+  *@param throttleGroupType is the throttle group type.
+  *@return the set of throttle groups for that type.
+  */
+  public Set<String> getThrottleGroups(String throttleGroupType)
+    throws ManifoldCFException;
+
+  /** Delete a throttle group.
+  *@param throttleGroupType is the throttle group type.
+  *@param throttleGroup is the throttle group to remove.
+  */
+  public void removeThrottleGroup(String throttleGroupType, String throttleGroup)
+    throws ManifoldCFException;
+
+  /** Create a throttle group, or update it if it already exists.
+  *@param throttleGroupType is the throttle group type.
+  *@param throttleGroup is the throttle group.
+  *@param throttleSpec describes the per-bin throttling criteria the group should apply.
+  */
+  public void createOrUpdateThrottleGroup(String throttleGroupType, String throttleGroup, IThrottleSpec throttleSpec)
+    throws ManifoldCFException;
+
+  /** Obtain a connection throttler for connections with the given bin names. The result is meant
+  * to be embedded in a pool of similar connections, where it gates the creation of new ones.
+  *@param throttleGroupType is the throttle group type.
+  *@param throttleGroup is the throttle group.
+  *@param binNames are the bin names that apply to the connection type.
+  *@return the connection throttler, or null if the pool is being shut down.
+  */
+  public IConnectionThrottler obtainConnectionThrottler(String throttleGroupType, String throttleGroup, String[] binNames)
+    throws ManifoldCFException;
+
+  /** Periodic poll that updates cluster-wide statistics and allocation for one throttle group type.
+  *@param throttleGroupType is the throttle group type to update.
+  */
+  public void poll(String throttleGroupType)
+    throws ManifoldCFException;
+
+  /** Periodic poll that updates ALL cluster-wide statistics and allocation.
+  */
+  public void poll()
+    throws ManifoldCFException;
+
+  /** Release every resource that is not currently in use.
+  */
+  public void freeUnusedResources()
+    throws ManifoldCFException;
+
+  /** Permanently shut down the throttler.
+  */
+  public void destroy()
+    throws ManifoldCFException;
+
+}
diff --git a/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/IThrottleSpec.java b/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/IThrottleSpec.java
new file mode 100644
index 0000000..3b8cfcd
--- /dev/null
+++ b/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/IThrottleSpec.java
@@ -0,0 +1,44 @@
+/* $Id$ */
+
+/**
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.manifoldcf.core.interfaces;
+
+
+/** An IThrottleSpec object describes what throttling criteria to apply per bin:
+* a connection limit, a minimum time per byte, and a minimum time per fetch.
+*/
+public interface IThrottleSpec
+{
+ public static final String _rcsid = "@(#)$Id$";
+
+ /** Given a bin name, find the maximum number of open connections allowed for that bin.
+ *@param binName is the name of the bin.
+ *@return the connection limit, or Integer.MAX_VALUE if no limit found. */
+ public int getMaxOpenConnections(String binName);
+
+ /** Look up the minimum milliseconds per byte for a bin, which bounds its transfer rate.
+ *@param binName is the name of the bin.
+ *@return the minimum milliseconds per byte, or 0.0 if no limit found. */
+ public double getMinimumMillisecondsPerByte(String binName);
+
+ /** Look up the minimum milliseconds per fetch for a bin, which bounds its fetch rate.
+ *@param binName is the name of the bin.
+ *@return the minimum milliseconds per fetch, or 0 if no limit found. */
+ public long getMinimumMillisecondsPerFetch(String binName);
+
+}
diff --git a/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/ThrottleGroupsFactory.java b/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/ThrottleGroupsFactory.java
new file mode 100644
index 0000000..96c4dc2
--- /dev/null
+++ b/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/ThrottleGroupsFactory.java
@@ -0,0 +1,50 @@
+/* $Id$ */
+
+/**
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.manifoldcf.core.interfaces;
+
+/** Factory that hands out one IThrottleGroups object per thread context.
+*/
+public class ThrottleGroupsFactory
+{
+  public static final String _rcsid = "@(#)$Id$";
+
+  /** Key under which the IThrottleGroups object is cached in the thread context */
+  private final static String objectName = "_ThrottleGroups_";
+
+  /** Only static methods; never instantiated. */
+  private ThrottleGroupsFactory()
+  {
+  }
+
+  /** Obtain the IThrottleGroups object for a thread context, creating and caching one if absent.
+  *@param tc is the thread context.
+  *@return the IThrottleGroups object associated with tc.
+  */
+  public static IThrottleGroups make(IThreadContext tc)
+    throws ManifoldCFException
+  {
+    Object cached = tc.get(objectName);
+    if (cached instanceof IThrottleGroups)
+      return (IThrottleGroups)cached;
+    Object created = new org.apache.manifoldcf.core.throttler.ThrottleGroups(tc);
+    tc.save(objectName,created);
+    return (IThrottleGroups)created;
+  }
+
+}
diff --git a/framework/core/src/main/java/org/apache/manifoldcf/core/lockmanager/ZooKeeperConnection.java b/framework/core/src/main/java/org/apache/manifoldcf/core/lockmanager/ZooKeeperConnection.java
index a5e23ef..90617e7 100644
--- a/framework/core/src/main/java/org/apache/manifoldcf/core/lockmanager/ZooKeeperConnection.java
+++ b/framework/core/src/main/java/org/apache/manifoldcf/core/lockmanager/ZooKeeperConnection.java
@@ -511,6 +511,82 @@
zookeeperWatcher = null;
}
+  /** Encode a string so that it can be used within a single ZooKeeper node name.
+  * ZooKeeper does not allow '/' inside a node name, nor the characters U+0000-U+001F and
+  * U+007F-U+009F.  Each of those characters, and the escape character '\' itself, is replaced
+  * by a two-character sequence beginning with '\'; every other character is copied unchanged.
+  * The mapping is reversed by zooKeeperDecodeSafeName().
+  *@param input is the raw name.
+  *@return the encoded name.
+  */
+  public static String zooKeeperSafeName(String input)
+  {
+    StringBuilder sb = new StringBuilder(input.length());
+    for (int i = 0; i < input.length(); i++)
+    {
+      char x = input.charAt(i);
+      // Results of char arithmetic must be cast back to char: StringBuilder.append(int)
+      // would otherwise append the decimal digits of the value.
+      if (x == '/')
+        sb.append('\\').append('0');
+      else if (x == '\u007f')
+        sb.append('\\').append('1');
+      else if (x == '\u001c')
+        sb.append('\\').append('2');  // the generic rule below would give "\\\\", i.e. an escaped backslash
+      else if (x == '\u009f')
+        sb.append('\\').append('3');  // the generic rule below would give a DEL, which ZooKeeper rejects
+      else if (x == '\\')
+        sb.append('\\').append('\\');
+      else if (x < '\u0020')
+        sb.append('\\').append((char)(x + '\u0040'));
+      else if (x >= '\u0080' && x < '\u00a0')
+        sb.append('\\').append((char)(x + '\u0060' - '\u0080'));
+      else
+        sb.append(x);
+    }
+    return sb.toString();
+  }
+
+  /** Decode a name produced by zooKeeperSafeName(), recovering the original string.
+  *@param input is the encoded name.
+  *@return the decoded name.
+  *@throws RuntimeException if input is not a valid encoding.
+  */
+  public static String zooKeeperDecodeSafeName(String input)
+  {
+    StringBuilder sb = new StringBuilder(input.length());
+    int i = 0;
+    while (i < input.length())
+    {
+      char x = input.charAt(i++);
+      if (x != '\\')
+      {
+        sb.append(x);
+        continue;
+      }
+      if (i == input.length())
+        throw new RuntimeException("Supposedly safe zookeeper name is not properly encoded!!");
+      x = input.charAt(i++);
+      if (x == '0')
+        sb.append('/');
+      else if (x == '1')
+        sb.append('\u007f');
+      else if (x == '2')
+        sb.append('\u001c');
+      else if (x == '3')
+        sb.append('\u009f');
+      else if (x == '\\')
+        sb.append('\\');
+      else if (x >= '\u0040' && x < '\u0060')
+        sb.append((char)(x - '\u0040'));
+      else if (x >= '\u0060' && x < '\u007f')
+        sb.append((char)(x - '\u0060' + '\u0080'));
+      else
+        throw new RuntimeException("Supposedly safe zookeeper name is not properly encoded!!");
+    }
+    return sb.toString();
+  }
+
// Protected methods
/** Create a node and a sequential child node. Neither node has any data.
diff --git a/framework/core/src/main/java/org/apache/manifoldcf/core/lockmanager/ZooKeeperLockManager.java b/framework/core/src/main/java/org/apache/manifoldcf/core/lockmanager/ZooKeeperLockManager.java
index 7526880..abeee94 100644
--- a/framework/core/src/main/java/org/apache/manifoldcf/core/lockmanager/ZooKeeperLockManager.java
+++ b/framework/core/src/main/java/org/apache/manifoldcf/core/lockmanager/ZooKeeperLockManager.java
@@ -152,7 +152,9 @@
if (serviceName == null)
serviceName = constructUniqueServiceName(connection, serviceType);
- String activePath = buildServiceTypeActivePath(serviceType, serviceName);
+ String encodedServiceName = ZooKeeperConnection.zooKeeperSafeName(serviceName);
+
+ String activePath = buildServiceTypeActivePath(serviceType, encodedServiceName);
if (connection.checkNodeExists(activePath))
throw new ManifoldCFException("Service '"+serviceName+"' of type '"+serviceType+"' is already active");
// First, see where we stand.
@@ -162,11 +164,11 @@
List<String> children = connection.getChildren(registrationNodePath);
boolean foundService = false;
boolean foundActiveService = false;
- for (String registeredServiceName : children)
+ for (String encodedRegisteredServiceName : children)
{
- if (registeredServiceName.equals(serviceName))
+ if (encodedRegisteredServiceName.equals(encodedServiceName))
foundService = true;
- if (connection.checkNodeExists(buildServiceTypeActivePath(serviceType, registeredServiceName)))
+ if (connection.checkNodeExists(buildServiceTypeActivePath(serviceType, encodedRegisteredServiceName)))
foundActiveService = true;
}
@@ -197,17 +199,17 @@
if (unregisterAll)
{
// Unregister all (since we did a global cleanup)
- for (String registeredServiceName : children)
+ for (String encodedRegisteredServiceName : children)
{
- if (!registeredServiceName.equals(serviceName))
- connection.deleteChild(registrationNodePath, registeredServiceName);
+ if (!encodedRegisteredServiceName.equals(encodedServiceName))
+ connection.deleteChild(registrationNodePath, encodedRegisteredServiceName);
}
}
// Now, register (if needed)
if (!foundService)
{
- connection.createChild(registrationNodePath, serviceName);
+ connection.createChild(registrationNodePath, encodedServiceName);
}
// Last, set the appropriate active flag
@@ -248,7 +250,7 @@
enterServiceRegistryWriteLock(connection, serviceType);
try
{
- String activePath = buildServiceTypeActivePath(serviceType, serviceName);
+ String activePath = buildServiceTypeActivePath(serviceType, ZooKeeperConnection.zooKeeperSafeName(serviceName));
connection.setNodeData(activePath, (serviceData==null)?new byte[0]:serviceData);
}
finally
@@ -284,7 +286,7 @@
enterServiceRegistryReadLock(connection, serviceType);
try
{
- String activePath = buildServiceTypeActivePath(serviceType, serviceName);
+ String activePath = buildServiceTypeActivePath(serviceType, ZooKeeperConnection.zooKeeperSafeName(serviceName));
return connection.getNodeData(activePath);
}
finally
@@ -321,13 +323,13 @@
{
String registrationNodePath = buildServiceTypeRegistrationPath(serviceType);
List<String> children = connection.getChildren(registrationNodePath);
- for (String registeredServiceName : children)
+ for (String encodedRegisteredServiceName : children)
{
- String activeNodePath = buildServiceTypeActivePath(serviceType, registeredServiceName);
+ String activeNodePath = buildServiceTypeActivePath(serviceType, encodedRegisteredServiceName);
if (connection.checkNodeExists(activeNodePath))
{
byte[] serviceData = connection.getNodeData(activeNodePath);
- if (dataAcceptor.acceptServiceData(registeredServiceName, serviceData))
+ if (dataAcceptor.acceptServiceData(ZooKeeperConnection.zooKeeperDecodeSafeName(encodedRegisteredServiceName), serviceData))
break;
}
}
@@ -368,9 +370,9 @@
String registrationNodePath = buildServiceTypeRegistrationPath(serviceType);
List<String> children = connection.getChildren(registrationNodePath);
int activeServiceCount = 0;
- for (String registeredServiceName : children)
+ for (String encodedRegisteredServiceName : children)
{
- if (connection.checkNodeExists(buildServiceTypeActivePath(serviceType, registeredServiceName)))
+ if (connection.checkNodeExists(buildServiceTypeActivePath(serviceType, encodedRegisteredServiceName)))
activeServiceCount++;
}
return activeServiceCount;
@@ -417,25 +419,25 @@
// Presumably the caller will lather, rinse, and repeat.
String registrationNodePath = buildServiceTypeRegistrationPath(serviceType);
List<String> children = connection.getChildren(registrationNodePath);
- String serviceName = null;
- for (String registeredServiceName : children)
+ String encodedServiceName = null;
+ for (String encodedRegisteredServiceName : children)
{
- if (!connection.checkNodeExists(buildServiceTypeActivePath(serviceType, registeredServiceName)))
+ if (!connection.checkNodeExists(buildServiceTypeActivePath(serviceType, encodedRegisteredServiceName)))
{
- serviceName = registeredServiceName;
+ encodedServiceName = encodedRegisteredServiceName;
break;
}
}
- if (serviceName == null)
+ if (encodedServiceName == null)
return true;
// Found one, in serviceName, at position i
// Ideally, we should signal at this point that we're cleaning up after it, and then leave
// the exclusive lock, so that other activity can take place. MHL
- cleanup.cleanUpService(serviceName);
+ cleanup.cleanUpService(ZooKeeperConnection.zooKeeperDecodeSafeName(encodedServiceName));
// Unregister the service.
- connection.deleteChild(registrationNodePath, serviceName);
+ connection.deleteChild(registrationNodePath, encodedServiceName);
return false;
}
finally
@@ -473,7 +475,7 @@
enterServiceRegistryWriteLock(connection, serviceType);
try
{
- connection.deleteNode(buildServiceTypeActivePath(serviceType, serviceName));
+ connection.deleteNode(buildServiceTypeActivePath(serviceType, ZooKeeperConnection.zooKeeperSafeName(serviceName)));
}
finally
{
@@ -510,7 +512,7 @@
enterServiceRegistryReadLock(connection, serviceType);
try
{
- return connection.checkNodeExists(buildServiceTypeActivePath(serviceType, serviceName));
+ return connection.checkNodeExists(buildServiceTypeActivePath(serviceType, ZooKeeperConnection.zooKeeperSafeName(serviceName)));
}
finally
{
@@ -576,7 +578,7 @@
*/
protected static String makeServiceCounterName(String serviceType)
{
- return SERVICETYPE_ANONYMOUS_COUNTER_PREFIX + serviceType;
+ return SERVICETYPE_ANONYMOUS_COUNTER_PREFIX + ZooKeeperConnection.zooKeeperSafeName(serviceType);
}
/** Read service counter.
@@ -621,21 +623,21 @@
*/
protected static String buildServiceTypeLockPath(String serviceType)
{
- return SERVICETYPE_LOCK_PATH_PREFIX + serviceType;
+ return SERVICETYPE_LOCK_PATH_PREFIX + ZooKeeperConnection.zooKeeperSafeName(serviceType);
}
/** Build a zk path for the active node for a specific service of a specific type.
*/
- protected static String buildServiceTypeActivePath(String serviceType, String serviceName)
+ protected static String buildServiceTypeActivePath(String serviceType, String encodedServiceName)
{
- return SERVICETYPE_ACTIVE_PATH_PREFIX + serviceType + "-" + serviceName;
+ return SERVICETYPE_ACTIVE_PATH_PREFIX + ZooKeeperConnection.zooKeeperSafeName(serviceType) + "-" + encodedServiceName;
}
/** Build a zk path for the registration node for a specific service type.
*/
protected static String buildServiceTypeRegistrationPath(String serviceType)
{
- return SERVICETYPE_REGISTER_PATH_PREFIX + serviceType;
+ return SERVICETYPE_REGISTER_PATH_PREFIX + ZooKeeperConnection.zooKeeperSafeName(serviceType);
}
// Shared configuration
@@ -731,7 +733,7 @@
ZooKeeperConnection connection = pool.grab();
try
{
- connection.setGlobalFlag(FLAG_PATH_PREFIX + flagName);
+ connection.setGlobalFlag(FLAG_PATH_PREFIX + ZooKeeperConnection.zooKeeperSafeName(flagName));
}
finally
{
@@ -756,7 +758,7 @@
ZooKeeperConnection connection = pool.grab();
try
{
- connection.clearGlobalFlag(FLAG_PATH_PREFIX + flagName);
+ connection.clearGlobalFlag(FLAG_PATH_PREFIX + ZooKeeperConnection.zooKeeperSafeName(flagName));
}
finally
{
@@ -782,7 +784,7 @@
ZooKeeperConnection connection = pool.grab();
try
{
- return connection.checkGlobalFlag(FLAG_PATH_PREFIX + flagName);
+ return connection.checkGlobalFlag(FLAG_PATH_PREFIX + ZooKeeperConnection.zooKeeperSafeName(flagName));
}
finally
{
@@ -809,7 +811,7 @@
ZooKeeperConnection connection = pool.grab();
try
{
- return connection.readData(RESOURCE_PATH_PREFIX + resourceName);
+ return connection.readData(RESOURCE_PATH_PREFIX + ZooKeeperConnection.zooKeeperSafeName(resourceName));
}
finally
{
@@ -836,7 +838,7 @@
ZooKeeperConnection connection = pool.grab();
try
{
- connection.writeData(RESOURCE_PATH_PREFIX + resourceName, data);
+ connection.writeData(RESOURCE_PATH_PREFIX + ZooKeeperConnection.zooKeeperSafeName(resourceName), data);
}
finally
{
diff --git a/framework/core/src/main/java/org/apache/manifoldcf/core/lockmanager/ZooKeeperLockObject.java b/framework/core/src/main/java/org/apache/manifoldcf/core/lockmanager/ZooKeeperLockObject.java
index dd1f599..b68c970 100644
--- a/framework/core/src/main/java/org/apache/manifoldcf/core/lockmanager/ZooKeeperLockObject.java
+++ b/framework/core/src/main/java/org/apache/manifoldcf/core/lockmanager/ZooKeeperLockObject.java
@@ -41,7 +41,7 @@
{
super(lockPool,lockKey);
this.pool = pool;
- this.lockPath = LOCK_PATH_PREFIX + lockKey.toString();
+ this.lockPath = LOCK_PATH_PREFIX + ZooKeeperConnection.zooKeeperSafeName(lockKey.toString());
}
@Override
diff --git a/framework/core/src/main/java/org/apache/manifoldcf/core/system/ManifoldCF.java b/framework/core/src/main/java/org/apache/manifoldcf/core/system/ManifoldCF.java
index 632274e..59683d8 100644
--- a/framework/core/src/main/java/org/apache/manifoldcf/core/system/ManifoldCF.java
+++ b/framework/core/src/main/java/org/apache/manifoldcf/core/system/ManifoldCF.java
@@ -253,6 +253,8 @@
masterDatabaseUsername = LockManagerFactory.getStringProperty(threadContext,masterDatabaseUsernameProperty,"manifoldcf");
masterDatabasePassword = LockManagerFactory.getStringProperty(threadContext,masterDatabasePasswordProperty,"local_pg_passwd");
+ // Register the throttler for cleanup on shutdown
+ addShutdownHook(new ThrottlerShutdown());
// Register the file tracker for cleanup on shutdown
tracker = new FileTrack();
addShutdownHook(tracker);
@@ -1383,6 +1385,38 @@
}
+  /** Shutdown hook that destroys the throttle groups object when ManifoldCF shuts down. */
+  protected static class ThrottlerShutdown implements IShutdownHook
+  {
+    public ThrottlerShutdown()
+    {
+    }
+
+    /** Look up (or create) the thread context's throttle groups object, and destroy it. */
+    @Override
+    public void doCleanup(IThreadContext threadContext)
+      throws ManifoldCFException
+    {
+      ThrottleGroupsFactory.make(threadContext).destroy();
+    }
+
+    /** Finalizer, which is designed to catch class unloading that tomcat 5.5 does.
+    */
+    @Override
+    protected void finalize()
+      throws Throwable
+    {
+      try
+      {
+        doCleanup(ThreadContextFactory.make());
+      }
+      finally
+      {
+        super.finalize();
+      }
+    }
+  }
+
/** Class that cleans up database handles on exit */
protected static class DatabaseShutdown implements IShutdownHook
{
diff --git a/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ConnectionBin.java b/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ConnectionBin.java
new file mode 100644
index 0000000..32d64c2
--- /dev/null
+++ b/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ConnectionBin.java
@@ -0,0 +1,437 @@
+/* $Id$ */
+
+/**
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.manifoldcf.core.throttler;
+
+import org.apache.manifoldcf.core.interfaces.*;
+import org.apache.manifoldcf.core.system.ManifoldCF;
+import java.util.concurrent.atomic.*;
+import java.util.*;
+
+/** Connection tracking for a bin.
+*
+* This class keeps track of information needed to figure out throttling for connections,
+* on a bin-by-bin basis. It is *not*, however, a connection pool. Actually establishing
+* connections, and pooling established connections, is functionality that must reside in the
+* caller.
+*
+* The 'connections' each connection bin tracks are connections outstanding that share this bin name.
+* Not all such connections are identical; some may in fact have entirely different sets of
+* bins associated with them, but they all have the specific bin in common. Since each bin has its
+* own unique limit, this effectively means that in order to get a connection, you need to find an
+* available slot in ALL of its constituent connection bins. If the connections are pooled, it makes
+* the most sense to divide the pool up by characteristics such that identical connections are all
+* handled together - and it is reasonable to presume that an identical connection has identical
+* connection bins.
+*
+* NOTE WELL: Connection accounting is local, but poll() apportions the bin's limit across the cluster via the lock manager.
+*/
+public class ConnectionBin
+{
+  /** True if this bin is alive still */
+  protected boolean isAlive = true;
+  /** This is the bin name which this connection pool belongs to */
+  protected final String binName;
+  /** Service type name */
+  protected final String serviceTypeName;
+  /** The (anonymous) service name */
+  protected final String serviceName;
+  /** The target calculation lock name */
+  protected final String targetCalcLockName;
+
+  /** This is the cluster-wide maximum number of active connections allowed for this bin */
+  protected int maxActiveConnections = 0;
+
+  /** This is the local maximum number of active connections allowed for this bin, as computed by poll() */
+  protected int localMax = 0;
+  /** This is the number of connections in this bin that have been reserved - that is, they
+  * are promised to various callers, but those callers have not yet committed to obtaining them. */
+  protected int reservedConnections = 0;
+  /** This is the number of connections in this bin that are connected; immaterial whether they are
+  * in use or in a pool somewhere. */
+  protected int inUseConnections = 0;
+
+  /** The service type prefix for connection bins */
+  protected final static String serviceTypePrefix = "_CONNECTIONBIN_";
+
+  /** The target calculation lock prefix */
+  protected final static String targetCalcLockPrefix = "_CONNECTIONBINTARGET_";
+
+  /** Random number generator, used by poll() to hand out the remainder of the fair share */
+  protected final static Random randomNumberGenerator = new Random();
+
+  /** Constructor.  Registers and activates an anonymous service for this bin with the lock manager. */
+  public ConnectionBin(IThreadContext threadContext, String throttlingGroupName, String binName)
+    throws ManifoldCFException
+  {
+    this.binName = binName;
+    this.serviceTypeName = buildServiceTypeName(throttlingGroupName, binName);
+    this.targetCalcLockName = buildTargetCalcLockName(throttlingGroupName, binName);
+    // Now, register and activate service anonymously, and record the service name we get.
+    ILockManager lockManager = LockManagerFactory.make(threadContext);
+    this.serviceName = lockManager.registerServiceBeginServiceActivity(serviceTypeName, null, null);
+  }
+
+  protected static String buildServiceTypeName(String throttlingGroupName, String binName)
+  {
+    return serviceTypePrefix + throttlingGroupName + "_" + binName;
+  }
+
+  protected static String buildTargetCalcLockName(String throttlingGroupName, String binName)
+  {
+    return targetCalcLockPrefix + throttlingGroupName + "_" + binName;
+  }
+
+  /** Get the bin name. */
+  public String getBinName()
+  {
+    return binName;
+  }
+
+  /** Update the cluster-wide maximum number of active connections; it takes effect at the next poll().
+  */
+  public synchronized void updateMaxActiveConnections(int maxActiveConnections)
+  {
+    // Update the number; the poller will wake up any waiting threads.
+    this.maxActiveConnections = maxActiveConnections;
+  }
+
+  /** Wait for a connection to become available, in the context of an existing connection pool.
+  *@param poolCount is the number of connections in the pool times the number of bins per connection.
+  * This parameter is only ever changed in this class, and always atomically, since other bins may share it.
+  *@return a recommendation as to how to proceed, using the IConnectionThrottler values. If the
+  * recommendation is to create a connection, a slot will be reserved for that purpose. A
+  * subsequent call to noteConnectionCreation() will be needed to confirm the reservation, or undoReservation() to
+  * release the reservation.
+  */
+  public synchronized int waitConnectionAvailable(AtomicInteger poolCount)
+    throws InterruptedException
+  {
+    // Reserved connections keep a slot available which can't be used by anyone else.
+    // Connection bins are always sorted so that deadlocks can't occur.
+    // Once all slots are reserved, the caller will go ahead and create the necessary connection
+    // and convert the reservation to a new connection.
+
+    while (true)
+    {
+      if (!isAlive)
+        return IConnectionThrottler.CONNECTION_FROM_NOWHERE;
+      // Recommendation is to pull the connection from the pool, if the pool has one.
+      if (takeFromPool(poolCount))
+        return IConnectionThrottler.CONNECTION_FROM_POOL;
+      if (inUseConnections + reservedConnections < localMax)
+      {
+        reservedConnections++;
+        return IConnectionThrottler.CONNECTION_FROM_CREATION;
+      }
+      // Wait for a connection to free up. Note that it is up to the caller to free stuff up.
+      wait();
+    }
+  }
+
+  /** Undo what we had decided to do before.
+  *@param recommendation is the decision returned by waitConnectionAvailable() above.
+  *@param poolCount is the same pool counter that was passed to waitConnectionAvailable().
+  */
+  public synchronized void undoReservation(int recommendation, AtomicInteger poolCount)
+  {
+    if (recommendation == IConnectionThrottler.CONNECTION_FROM_CREATION)
+    {
+      if (reservedConnections == 0)
+        throw new IllegalStateException("Can't clear a reservation we don't have");
+      reservedConnections--;
+      notifyAll();
+    }
+    else if (recommendation == IConnectionThrottler.CONNECTION_FROM_POOL)
+    {
+      poolCount.incrementAndGet();
+      notifyAll();
+    }
+  }
+
+  /** Note the creation of an active connection that belongs to this bin. The connection MUST
+  * have been reserved prior to the connection being created.
+  */
+  public synchronized void noteConnectionCreation()
+  {
+    if (reservedConnections == 0)
+      throw new IllegalStateException("Creating a connection when no connection slot reserved!");
+    reservedConnections--;
+    inUseConnections++;
+    // No notification needed because the total number of reserved+active connections did not change.
+  }
+
+  /** Figure out whether we are currently over target or not for this bin.
+  *@return true if more connections exist than the local maximum allows, so a returned connection should be destroyed. */
+  public synchronized boolean shouldReturnedConnectionBeDestroyed()
+  {
+    // We don't count reserved connections here because those are not yet committed
+    return inUseConnections > localMax;
+  }
+
+  public static final int CONNECTION_DESTROY = 0;
+  public static final int CONNECTION_POOLEMPTY = 1;
+  public static final int CONNECTION_WITHINBOUNDS = 2;
+
+  /** Figure out whether we are currently over target or not for this bin, and whether a
+  * connection should be pulled from the pool and destroyed.
+  * Note that this is tricky in conjunction with other bins, because those other bins
+  * may conclude that we can't destroy a connection. If so, we just return the stolen
+  * connection back to the pool.
+  *@param poolCount is the pool counter, as for waitConnectionAvailable().
+  *@return CONNECTION_DESTROY, CONNECTION_POOLEMPTY, or CONNECTION_WITHINBOUNDS.
+  */
+  public synchronized int shouldPooledConnectionBeDestroyed(AtomicInteger poolCount)
+  {
+    // Consider a pooled connection removed from the pool for the purposes of consideration. If we
+    // change our minds, we'll return it, and no harm done.
+    if (!takeFromPool(poolCount))
+      return CONNECTION_POOLEMPTY;
+    // We don't count reserved connections here because those are not yet committed.
+    if (inUseConnections > localMax)
+      return CONNECTION_DESTROY;
+    return CONNECTION_WITHINBOUNDS;
+  }
+
+  /** Check only if there's a pooled connection, and make moves to take it from the pool.
+  *@param poolCount is the pool counter, as for waitConnectionAvailable().
+  *@return true if a pooled connection was claimed (the counter was decremented).
+  */
+  public synchronized boolean hasPooledConnection(AtomicInteger poolCount)
+  {
+    return takeFromPool(poolCount);
+  }
+
+  /** Undo the decision to destroy a pooled connection, by putting it back into the pool count.
+  */
+  public synchronized void undoPooledConnectionDecision(AtomicInteger poolCount)
+  {
+    poolCount.incrementAndGet();
+    notifyAll();
+  }
+
+  /** Note a connection returned to the pool.
+  */
+  public synchronized void noteConnectionReturnedToPool(AtomicInteger poolCount)
+  {
+    poolCount.incrementAndGet();
+    // Wake up threads possibly waiting on a pool return.
+    notifyAll();
+  }
+
+  /** Note the destruction of an active connection that belongs to this bin.
+  */
+  public synchronized void noteConnectionDestroyed()
+  {
+    if (inUseConnections == 0)  // a negative count would let this bin exceed its limit
+      throw new IllegalStateException("Destroying a connection when no connection is in use!");
+    inUseConnections--;
+    notifyAll();
+  }
+
+  /** Poll this bin, recomputing this service's share (localMax) of the cluster-wide connection limit. */
+  public synchronized void poll(IThreadContext threadContext)
+    throws ManifoldCFException
+  {
+    // The meat of the cross-cluster apportionment algorithm goes here!
+    // Two global numbers each service posts: "in-use" and "target". At no time does a service *ever* post either a "target"
+    // that, together with all other active service targets, is in excess of the max. Also, at no time does a service post
+    // a target that, when added to the other "in-use" values, exceeds the max. If the "in-use" values everywhere else
+    // already equal or exceed the max, then the target will be zero.
+    // The target quota is calculated as follows:
+    // (1) Target is summed, excluding ours. This is GlobalTarget.
+    // (2) In-use is summed, excluding ours. This is GlobalInUse.
+    // (3) Our MaximumTarget is computed, which is Maximum - GlobalTarget or Maximum - GlobalInUse, whichever is
+    // smaller, but never less than zero.
+    // (4) Our FairTarget is computed. The FairTarget divides the Maximum by the number of services, and adds
+    // 1 randomly based on the remainder.
+    // (5) We compute OptimalTarget as follows: We start with current local target. If current local target
+    // exceeds current local in-use count, we adjust OptimalTarget downward by one. Otherwise we increase it
+    // by a quarter of the Maximum (rounded down, but at least one), for a fast ramp-up.
+    // (6) Finally, we compute Target by taking the minimum of MaximumTarget, FairTarget, and OptimalTarget.
+
+    ILockManager lockManager = LockManagerFactory.make(threadContext);
+    lockManager.enterWriteLock(targetCalcLockName);
+    try
+    {
+      // Compute MaximumTarget
+      SumClass sumClass = new SumClass(serviceName);
+      lockManager.scanServiceData(serviceTypeName, sumClass);
+      int numServices = sumClass.getNumServices();
+      if (numServices == 0)
+        return;
+      int globalTarget = sumClass.getGlobalTarget();
+      int globalInUse = sumClass.getGlobalInUse();
+      int maximumTarget = maxActiveConnections - globalTarget;
+      if (maximumTarget > maxActiveConnections - globalInUse)
+        maximumTarget = maxActiveConnections - globalInUse;
+      if (maximumTarget < 0)
+        maximumTarget = 0;
+
+      // Compute FairTarget
+      int fairTarget = maxActiveConnections / numServices;
+      int remainder = maxActiveConnections % numServices;
+      // Randomly choose whether we get an addition to the FairTarget
+      if (randomNumberGenerator.nextInt(numServices) < remainder)
+        fairTarget++;
+
+      // Compute OptimalTarget
+      int localInUse = inUseConnections;
+      int optimalTarget = localMax;
+      if (localMax > localInUse)
+        optimalTarget--;
+      else
+      {
+        // We want a fast ramp up, so make this proportional to maxActiveConnections
+        int increment = maxActiveConnections >> 2;
+        if (increment == 0)
+          increment = 1;
+        optimalTarget += increment;
+      }
+
+      // Now compute actual target
+      int target = maximumTarget;
+      if (target > fairTarget)
+        target = fairTarget;
+      if (target > optimalTarget)
+        target = optimalTarget;
+
+      // Write these values to the service data variables.
+      // NOTE that there is a race condition here; the target value depends on all the calculations above being accurate, and not changing out from under us.
+      // So, that's why we have a write lock around the pool calculations.
+
+      lockManager.updateServiceData(serviceTypeName, serviceName, pack(target, localInUse));
+
+      // Now, update our localMax, if it needs it.
+      if (target == localMax)
+        return;
+      localMax = target;
+      notifyAll();
+    }
+    finally
+    {
+      lockManager.leaveWriteLock(targetCalcLockName);
+    }
+  }
+
+  /** Shut down the bin, release everything that is waiting on it, and end this bin's service activity.
+  */
+  public synchronized void shutDown(IThreadContext threadContext)
+    throws ManifoldCFException
+  {
+    isAlive = false;
+    notifyAll();
+    ILockManager lockManager = LockManagerFactory.make(threadContext);
+    lockManager.endServiceActivity(serviceTypeName, serviceName);
+  }
+
+  // Protected classes and methods
+
+  /** Atomically take one unit from a pool counter (which other bins may share), unless it is zero.
+  *@return true if a unit was taken, false if the counter was already zero or less. */
+  protected static boolean takeFromPool(AtomicInteger poolCount)
+  {
+    while (true)
+    {
+      int current = poolCount.get();
+      if (current <= 0)
+        return false;
+      if (poolCount.compareAndSet(current, current - 1))
+        return true;
+    }
+  }
+
+  /** Tallies the active services of this type, summing target and in-use for all but our own. */
+  protected static class SumClass implements IServiceDataAcceptor
+  {
+    protected final String serviceName;
+    protected int numServices = 0;
+    protected int globalTargetTally = 0;
+    protected int globalInUseTally = 0;
+
+    public SumClass(String serviceName)
+    {
+      this.serviceName = serviceName;
+    }
+
+    @Override
+    public boolean acceptServiceData(String serviceName, byte[] serviceData)
+      throws ManifoldCFException
+    {
+      numServices++;
+
+      if (!serviceName.equals(this.serviceName))
+      {
+        globalTargetTally += unpackTarget(serviceData);
+        globalInUseTally += unpackInUse(serviceData);
+      }
+      return false;
+    }
+
+    public int getNumServices()
+    {
+      return numServices;
+    }
+
+    public int getGlobalTarget()
+    {
+      return globalTargetTally;
+    }
+
+    public int getGlobalInUse()
+    {
+      return globalInUseTally;
+    }
+  }
+
+  protected static int unpackTarget(byte[] data)
+  {
+    if (data == null || data.length != 8)
+      return 0;
+    return (((int)data[0]) & 0xff) +
+      ((((int)data[1]) << 8) & 0xff00) +
+      ((((int)data[2]) << 16) & 0xff0000) +
+      ((((int)data[3]) << 24) & 0xff000000);
+  }
+
+  protected static int unpackInUse(byte[] data)
+  {
+    if (data == null || data.length != 8)
+      return 0;
+    return (((int)data[4]) & 0xff) +
+      ((((int)data[5]) << 8) & 0xff00) +
+      ((((int)data[6]) << 16) & 0xff0000) +
+      ((((int)data[7]) << 24) & 0xff000000);
+  }
+
+  protected static byte[] pack(int target, int inUse)
+  {
+    byte[] rval = new byte[8];
+    rval[0] = (byte)(target & 0xff);
+    rval[1] = (byte)((target >> 8) & 0xff);
+    rval[2] = (byte)((target >> 16) & 0xff);
+    rval[3] = (byte)((target >> 24) & 0xff);
+    rval[4] = (byte)(inUse & 0xff);
+    rval[5] = (byte)((inUse >> 8) & 0xff);
+    rval[6] = (byte)((inUse >> 16) & 0xff);
+    rval[7] = (byte)((inUse >> 24) & 0xff);
+    return rval;
+  }
+
+}
+
diff --git a/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/FetchBin.java b/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/FetchBin.java
new file mode 100644
index 0000000..c05b28c
--- /dev/null
+++ b/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/FetchBin.java
@@ -0,0 +1,368 @@
+/* $Id$ */
+
+/**
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.manifoldcf.core.throttler;
+
+import org.apache.manifoldcf.core.interfaces.*;
+import org.apache.manifoldcf.core.system.ManifoldCF;
+
+/** Connection tracking for a bin.
+*
+* This class keeps track of information needed to figure out fetch rate throttling for connections,
+* on a bin-by-bin basis.
+*
+* NOTE WELL: This is entirely local in operation
+*/
+public class FetchBin
+{
+ /** This is set to true until the bin is shut down. */
+ protected boolean isAlive = true;
+ /** This is the bin name which this connection pool belongs to */
+ protected final String binName;
+ /** Service type name */
+ protected final String serviceTypeName;
+ /** The (anonymous) service name */
+ protected final String serviceName;
+ /** The target calculation lock name */
+ protected final String targetCalcLockName;
+
+ /** This is the minimum time between fetches for this bin, in ms. */
+ protected long minTimeBetweenFetches = Long.MAX_VALUE;
+
+ /** The local minimum time between fetches */
+ protected long localMinimum = Long.MAX_VALUE;
+
+ /** This is the last time a fetch was done on this bin */
+ protected long lastFetchTime = 0L;
+ /** Is the next fetch reserved? */
+ protected boolean reserveNextFetch = false;
+
+ /** The service type prefix for fetch bins */
+ protected final static String serviceTypePrefix = "_FETCHBIN_";
+
+ /** The target calculation lock prefix */
+ protected final static String targetCalcLockPrefix = "_FETCHBINTARGET_";
+
+ /** Constructor. */
+ public FetchBin(IThreadContext threadContext, String throttlingGroupName, String binName)
+ throws ManifoldCFException
+ {
+ this.binName = binName;
+ this.serviceTypeName = buildServiceTypeName(throttlingGroupName, binName);
+ this.targetCalcLockName = buildTargetCalcLockName(throttlingGroupName, binName);
+ // Now, register and activate service anonymously, and record the service name we get.
+ ILockManager lockManager = LockManagerFactory.make(threadContext);
+ this.serviceName = lockManager.registerServiceBeginServiceActivity(serviceTypeName, null, null);
+ }
+
+ protected static String buildServiceTypeName(String throttlingGroupName, String binName)
+ {
+ return serviceTypePrefix + throttlingGroupName + "_" + binName;
+ }
+
+ protected static String buildTargetCalcLockName(String throttlingGroupName, String binName)
+ {
+ return targetCalcLockPrefix + throttlingGroupName + "_" + binName;
+ }
+
+ /** Get the bin name. */
+ public String getBinName()
+ {
+ return binName;
+ }
+
+ /** Update the maximum number of active connections.
+ */
+ public synchronized void updateMinTimeBetweenFetches(long minTimeBetweenFetches)
+ {
+ // Update the number and wake up any waiting threads; they will take care of everything.
+ this.minTimeBetweenFetches = minTimeBetweenFetches;
+ }
+
+ /** Reserve a request to fetch a document from this bin. The actual fetch is not yet committed
+ * with this call, but if it succeeds for all bins associated with the document, then the caller
+ * has permission to do the fetch, and can update the last fetch time.
+ *@return false if the fetch bin is being shut down.
+ */
+ public synchronized boolean reserveFetchRequest()
+ throws InterruptedException
+ {
+ // First wait for the ability to even get the next fetch from this bin
+ while (true)
+ {
+ if (!isAlive)
+ return false;
+ if (!reserveNextFetch)
+ {
+ reserveNextFetch = true;
+ return true;
+ }
+ wait();
+ }
+ }
+
+ /** Clear reserved request.
+ */
+ public synchronized void clearReservation()
+ {
+ if (!reserveNextFetch)
+ throw new IllegalStateException("Can't clear a fetch reservation we don't have");
+ reserveNextFetch = false;
+ notifyAll();
+ }
+
+ /** Wait the necessary time to do the fetch. Presumes we've reserved the next fetch
+ * rights already, via reserveFetchRequest().
+ *@return false if the wait did not complete because the bin was shut down.
+ */
+ public synchronized boolean waitNextFetch()
+ throws InterruptedException
+ {
+ if (!reserveNextFetch)
+ throw new IllegalStateException("No fetch request reserved!");
+
+ while (true)
+ {
+ if (!isAlive)
+ // Leave it to the caller to undo reservations
+ return false;
+ if (localMinimum == Long.MAX_VALUE)
+ {
+ // wait forever - but eventually someone will set a smaller interval and wake us up.
+ wait();
+ }
+ else
+ {
+ long currentTime = System.currentTimeMillis();
+ // Compute how long we have to wait, based on the current time and the time of the last fetch.
+ long waitAmt = lastFetchTime + localMinimum - currentTime;
+ if (waitAmt <= 0L)
+ {
+ // Note actual time we start the fetch.
+ if (currentTime > lastFetchTime)
+ lastFetchTime = currentTime;
+ reserveNextFetch = false;
+ return true;
+ }
+ wait(waitAmt);
+ }
+ }
+ }
+
+ /** Poll this bin */
+ public synchronized void poll(IThreadContext threadContext)
+ throws ManifoldCFException
+ {
+ ILockManager lockManager = LockManagerFactory.make(threadContext);
+ lockManager.enterWriteLock(targetCalcLockName);
+ try
+ {
+ // This is where the cross-cluster logic happens.
+ // Each service records the following information:
+ // -- the target rate, in fetches per millisecond
+ // -- the earliest possible time for the service's next fetch, in ms from start of epoch
+ // Target rates are apportioned in fetches-per-ms space, as follows:
+ // (1) Target rate is summed cross-cluster, excluding our local service. This is GlobalTarget.
+ // (2) MaximumTarget is computed, which is Maximum-GlobalTarget.
+ // (3) FairTarget is computed, which is Maximum/numServices + rand(Maximum%numServices).
+ // (4) Finally, we compute Target rate by taking the minimum of MaximumTarget, FairTarget.
+ // The earliest time for the next fetch is computed as follows:
+ // (1) Find the LATEST most recent fetch time across the services, including an updated time for
+ // the local service.
+ // (2) Compute the next possible fetch time, using the Target rate and that fetch time.
+ // (3) The new targeted fetch time will be set to that value.
+
+ SumClass sumClass = new SumClass(serviceName);
+ lockManager.scanServiceData(serviceTypeName, sumClass);
+
+ int numServices = sumClass.getNumServices();
+ if (numServices == 0)
+ return;
+ double globalTarget = sumClass.getGlobalTarget();
+ long earliestTargetTime = sumClass.getEarliestTime();
+ long currentTime = System.currentTimeMillis();
+
+ if (lastFetchTime == 0L)
+ earliestTargetTime = currentTime;
+ else if (earliestTargetTime > lastFetchTime)
+ earliestTargetTime = lastFetchTime;
+
+ // Now, compute the target rate
+ double globalMaxFetchesPerMillisecond;
+ double maximumTarget;
+ double fairTarget;
+ if (minTimeBetweenFetches == 0.0)
+ {
+ //System.out.println(binName+":Global minimum milliseconds per byte = 0.0");
+ globalMaxFetchesPerMillisecond = Double.MAX_VALUE;
+ maximumTarget = globalMaxFetchesPerMillisecond;
+ fairTarget = globalMaxFetchesPerMillisecond;
+ }
+ else
+ {
+ globalMaxFetchesPerMillisecond = 1.0 / minTimeBetweenFetches;
+ //System.out.println(binName+":Global max bytes per millisecond = "+globalMaxBytesPerMillisecond);
+ maximumTarget = globalMaxFetchesPerMillisecond - globalTarget;
+ if (maximumTarget < 0.0)
+ maximumTarget = 0.0;
+
+ // Compute FairTarget
+ fairTarget = globalMaxFetchesPerMillisecond / numServices;
+ }
+
+ // Now compute actual target
+ double inverseTarget = maximumTarget;
+ if (inverseTarget > fairTarget)
+ inverseTarget = fairTarget;
+
+ long target;
+ if (inverseTarget == 0.0)
+ target = Long.MAX_VALUE;
+ else
+ target = (long)(1.0/inverseTarget +0.5);
+
+ long nextFetchTime = earliestTargetTime + target;
+
+ lockManager.updateServiceData(serviceTypeName, serviceName, pack(inverseTarget, nextFetchTime));
+
+ // Update local parameters: the rate, and the next time.
+ // But in order to update the next time, we have to update the last time.
+ if (target == localMinimum && earliestTargetTime == lastFetchTime)
+ return;
+ //System.out.println(binName+":Setting localMinimum="+target+"; last fetch time="+earliestTargetTime);
+ localMinimum = target;
+ lastFetchTime = earliestTargetTime;
+ notifyAll();
+ }
+ finally
+ {
+ lockManager.leaveWriteLock(targetCalcLockName);
+ }
+
+ }
+
+ /** Shut the bin down, and wake up all threads waiting on it.
+ */
+ public synchronized void shutDown(IThreadContext threadContext)
+ throws ManifoldCFException
+ {
+ isAlive = false;
+ notifyAll();
+ ILockManager lockManager = LockManagerFactory.make(threadContext);
+ lockManager.endServiceActivity(serviceTypeName, serviceName);
+ }
+
+ // Protected classes and methods
+
+ protected static class SumClass implements IServiceDataAcceptor
+ {
+ protected final String serviceName;
+ protected int numServices = 0;
+ protected double globalTargetTally = 0;
+ protected long earliestTime = Long.MAX_VALUE;
+
+ public SumClass(String serviceName)
+ {
+ this.serviceName = serviceName;
+ }
+
+ @Override
+ public boolean acceptServiceData(String serviceName, byte[] serviceData)
+ throws ManifoldCFException
+ {
+ numServices++;
+
+ if (!serviceName.equals(this.serviceName))
+ {
+ globalTargetTally += unpackTarget(serviceData);
+ long checkTime = unpackEarliestTime(serviceData);
+ if (checkTime < earliestTime)
+ earliestTime = checkTime;
+ }
+ return false;
+ }
+
+ public int getNumServices()
+ {
+ return numServices;
+ }
+
+ public double getGlobalTarget()
+ {
+ return globalTargetTally;
+ }
+
+ public long getEarliestTime()
+ {
+ return earliestTime;
+ }
+ }
+
+ protected static double unpackTarget(byte[] data)
+ {
+ if (data == null || data.length != 8)
+ return 0.0;
+ return Double.longBitsToDouble((((long)data[0]) & 0xffL) +
+ ((((long)data[1]) << 8) & 0xff00L) +
+ ((((long)data[2]) << 16) & 0xff0000L) +
+ ((((long)data[3]) << 24) & 0xff000000L) +
+ ((((long)data[4]) << 32) & 0xff00000000L) +
+ ((((long)data[5]) << 40) & 0xff0000000000L) +
+ ((((long)data[6]) << 48) & 0xff000000000000L) +
+ ((((long)data[7]) << 56) & 0xff00000000000000L));
+ }
+
+ protected static long unpackEarliestTime(byte[] data)
+ {
+ if (data == null || data.length != 16)
+ return Long.MAX_VALUE;
+ return (((long)data[8]) & 0xffL) +
+ ((((long)data[9]) << 8) & 0xff00L) +
+ ((((long)data[10]) << 16) & 0xff0000L) +
+ ((((long)data[11]) << 24) & 0xff000000L) +
+ ((((long)data[12]) << 32) & 0xff00000000L) +
+ ((((long)data[13]) << 40) & 0xff0000000000L) +
+ ((((long)data[14]) << 48) & 0xff000000000000L) +
+ ((((long)data[15]) << 56) & 0xff00000000000000L);
+ }
+
+ protected static byte[] pack(double targetDouble, long earliestTime)
+ {
+ long target = Double.doubleToLongBits(targetDouble);
+ byte[] rval = new byte[16];
+ rval[0] = (byte)(target & 0xffL);
+ rval[1] = (byte)((target >> 8) & 0xffL);
+ rval[2] = (byte)((target >> 16) & 0xffL);
+ rval[3] = (byte)((target >> 24) & 0xffL);
+ rval[4] = (byte)((target >> 32) & 0xffL);
+ rval[5] = (byte)((target >> 40) & 0xffL);
+ rval[6] = (byte)((target >> 48) & 0xffL);
+ rval[7] = (byte)((target >> 56) & 0xffL);
+ rval[8] = (byte)(earliestTime & 0xffL);
+ rval[9] = (byte)((earliestTime >> 8) & 0xffL);
+ rval[10] = (byte)((earliestTime >> 16) & 0xffL);
+ rval[11] = (byte)((earliestTime >> 24) & 0xffL);
+ rval[12] = (byte)((earliestTime >> 32) & 0xffL);
+ rval[13] = (byte)((earliestTime >> 40) & 0xffL);
+ rval[14] = (byte)((earliestTime >> 48) & 0xffL);
+ rval[15] = (byte)((earliestTime >> 56) & 0xffL);
+ return rval;
+ }
+
+}
+
diff --git a/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ThrottleBin.java b/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ThrottleBin.java
new file mode 100644
index 0000000..dac1d39
--- /dev/null
+++ b/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ThrottleBin.java
@@ -0,0 +1,450 @@
+/* $Id$ */
+
+/**
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.manifoldcf.core.throttler;
+
+import org.apache.manifoldcf.core.interfaces.*;
+import org.apache.manifoldcf.core.system.ManifoldCF;
+import java.util.*;
+
+/** Throttles for a bin.
+* An instance of this class keeps track of the information needed to bandwidth throttle access
+* to a url belonging to a specific bin.
+*
+* In order to calculate the effective "burst" fetches per second and bytes per second, we need to have
+* some idea what the window is. For example, a long hiatus from fetching could cause overuse of the server
+* when fetching resumes, if the window length is too long.
+*
+* One solution to this problem would be to keep a list of the individual fetches as records. Then, we could
+* "expire" a fetch by discarding the old record. However, this is quite memory consumptive for all but the
+* smallest intervals.
+*
+* Another, better, solution is to hook into the start and end of individual fetches. These will, presumably, occur
+* at the fastest possible rate without long pauses spent doing something else. The only complication is that
+* fetches may well overlap, so we need to "reference count" the fetches to know when to reset the counters.
+* For "fetches per second", we can simply make sure we "schedule" the next fetch at an appropriate time, rather
+* than keep records around. The overall rate may therefore be somewhat less than the specified rate, but that's
+* perfectly acceptable.
+*
+* Some notes on the algorithms used to limit server bandwidth impact
+* ==================================================================
+*
+* In a single connection case, the algorithm we'd want to use works like this. On the first chunk of a series,
+* the total length of time and the number of bytes are recorded. Then, prior to each subsequent chunk, a calculation
+* is done which attempts to hit the bandwidth target by the end of the chunk read, using the rate of the first chunk
+* access as a way of estimating how long it will take to fetch those next n bytes.
+*
+* For a multi-connection case, which this is, it's harder both to come up with a good maximum bandwidth estimate
+* and to "hit the target", because simultaneous fetches will intrude. The strategy is therefore:
+*
+* 1) The first chunk of any series should proceed without interference from other connections to the same server.
+* The goal here is to get a decent quality estimate without any possibility of overwhelming the server.
+*
+* 2) The bandwidth of the first chunk is treated as the "maximum bandwidth per connection". That is, if other
+* connections are going on, we can presume that each connection will use at most the bandwidth that the first fetch
+* took. Thus, by generating end-time estimates based on this number, we are actually being conservative and
+* using less server bandwidth.
+*
+* 3) For chunks that have started but not finished, we keep track of their size and estimated elapsed time in order
+* to schedule when new chunks from other connections can start.
+*
+* NOTE WELL: Waiting is entirely local to this object, but the local byte-rate limit is recomputed by poll(),
+* which apportions the configured maximum among all services registered for this bin.
+*/
+public class ThrottleBin
+{
+  /** This signals whether the bin is alive or not. */
+  protected boolean isAlive = true;
+  /** This is the bin name which this throttle belongs to. */
+  protected final String binName;
+  /** Service type name */
+  protected final String serviceTypeName;
+  /** The (anonymous) service name */
+  protected final String serviceName;
+  /** The target calculation lock name */
+  protected final String targetCalcLockName;
+
+  /** The minimum milliseconds per byte, shared by all services for this bin */
+  protected double minimumMillisecondsPerByte = Double.MAX_VALUE;
+
+  /** The local minimum milliseconds per byte, as computed by poll().
+  * Double.MAX_VALUE makes beginRead() wait until poll() sets a real value. */
+  protected double localMinimum = Double.MAX_VALUE;
+
+  /** This is the reference count for this bin (which records active references) */
+  protected volatile int refCount = 0;
+  /** The inverse rate estimate of the first fetch, in ms/byte */
+  protected double rateEstimate = 0.0;
+  /** Flag indicating whether the rate estimate is valid */
+  protected volatile boolean estimateValid = false;
+  /** Flag indicating whether rate estimation is in progress yet */
+  protected volatile boolean estimateInProgress = false;
+  /** The start time of this series */
+  protected long seriesStartTime = -1L;
+  /** Total actual bytes read in this series; this includes fetches in progress */
+  protected long totalBytesRead = -1L;
+
+  /** The service type prefix for throttle bins */
+  protected final static String serviceTypePrefix = "_THROTTLEBIN_";
+
+  /** The target calculation lock prefix */
+  protected final static String targetCalcLockPrefix = "_THROTTLEBINTARGET_";
+
+  /** Constructor. Registers an anonymous service for this bin and begins its activity.
+  *@param threadContext is the thread context used to obtain the lock manager.
+  *@param throttlingGroupName is the name of the throttling group.
+  *@param binName is the bin name.
+  */
+  public ThrottleBin(IThreadContext threadContext, String throttlingGroupName, String binName)
+    throws ManifoldCFException
+  {
+    this.binName = binName;
+    this.serviceTypeName = buildServiceTypeName(throttlingGroupName, binName);
+    this.targetCalcLockName = buildTargetCalcLockName(throttlingGroupName, binName);
+    // Now, register and activate service anonymously, and record the service name we get.
+    ILockManager lockManager = LockManagerFactory.make(threadContext);
+    this.serviceName = lockManager.registerServiceBeginServiceActivity(serviceTypeName, null, null);
+  }
+
+  protected static String buildServiceTypeName(String throttlingGroupName, String binName)
+  {
+    return serviceTypePrefix + throttlingGroupName + "_" + binName;
+  }
+
+  protected static String buildTargetCalcLockName(String throttlingGroupName, String binName)
+  {
+    return targetCalcLockPrefix + throttlingGroupName + "_" + binName;
+  }
+
+  /** Get the bin name. */
+  public String getBinName()
+  {
+    return binName;
+  }
+
+  /** Update the minimum milliseconds per byte. The new value takes effect the next time poll()
+  * recomputes the local minimum.
+  *@param min is the new minimum, in ms per byte.
+  */
+  public synchronized void updateMinimumMillisecondsPerByte(double min)
+  {
+    this.minimumMillisecondsPerByte = min;
+  }
+
+  /** Note the start of a fetch operation for a bin. Call this method just before the actual stream access begins.
+  * This does not wait. If no other fetch is active, the rate-estimation counters are reset so that
+  * a new series begins.
+  */
+  public void beginFetch()
+  {
+    synchronized (this)
+    {
+      if (refCount == 0)
+      {
+        // Now, reset bandwidth throttling counters
+        estimateValid = false;
+        rateEstimate = 0.0;
+        totalBytesRead = 0L;
+        estimateInProgress = false;
+        seriesStartTime = -1L;
+      }
+      refCount++;
+    }
+  }
+
+  /** Abort the fetch, releasing the reference taken by beginFetch().
+  */
+  public void abortFetch()
+  {
+    synchronized (this)
+    {
+      refCount--;
+    }
+  }
+
+  /** Note the start of an individual byte read of a specified size. Call this method just before the
+  * read request takes place. Performs the necessary delay prior to reading specified number of bytes from the server.
+  *@param byteCount is the number of bytes about to be requested.
+  *@return false if the wait was interrupted due to the bin being shut down.
+  */
+  public boolean beginRead(int byteCount)
+    throws InterruptedException
+  {
+    synchronized (this)
+    {
+      while (true)
+      {
+        if (!isAlive)
+          return false;
+        if (estimateInProgress)
+        {
+          wait();
+          continue;
+        }
+
+        // Update the current time
+        long currentTime = System.currentTimeMillis();
+
+        if (!estimateValid)
+        {
+          seriesStartTime = currentTime;
+          estimateInProgress = true;
+          // Add these bytes to the estimated total
+          totalBytesRead += (long)byteCount;
+          // Exit early; this thread isn't going to do any waiting
+          return true;
+        }
+
+        // If we haven't set a proper throttle yet, wait until we do.
+        if (localMinimum == Double.MAX_VALUE)
+        {
+          wait();
+          continue;
+        }
+
+        // Estimate the time this read will take, and wait accordingly
+        long estimatedTime = (long)(rateEstimate * (double)byteCount);
+
+        // Figure out how long the total byte count should take, to meet the constraint.
+        // The double-to-long cast saturates at Long.MAX_VALUE, and saturatedAdd() keeps the sum from
+        // wrapping negative, so an enormous (or infinite) per-byte minimum means a long wait, not no wait.
+        long throttledDuration = (long)(((double)(totalBytesRead + (long)byteCount)) * localMinimum);
+        long desiredEndTime = saturatedAdd(seriesStartTime, throttledDuration);
+
+        // The wait time is the difference between our desired end time, minus the estimated time to read the data, and the
+        // current time. But it can't be negative.
+        long waitTime = (desiredEndTime - estimatedTime) - currentTime;
+
+        // If no wait is needed, go ahead and update what needs to be updated and exit. Otherwise, do the wait.
+        if (waitTime <= 0L)
+        {
+          // Add these bytes to the estimated total
+          totalBytesRead += (long)byteCount;
+          return true;
+        }
+
+        this.wait(waitTime);
+        // Back around again...
+      }
+    }
+  }
+
+  /** Abort a read in progress. If this read was the one establishing the rate estimate,
+  * estimation is cancelled and waiting readers are woken.
+  */
+  public void abortRead()
+  {
+    synchronized (this)
+    {
+      if (estimateInProgress)
+      {
+        estimateInProgress = false;
+        notifyAll();
+      }
+    }
+  }
+
+  /** Note the end of an individual read from the server. Call this just after an individual read completes.
+  * Pass the actual number of bytes read to the method.
+  *@param originalCount is the byte count that was passed to beginRead().
+  *@param actualCount is the number of bytes actually read.
+  */
+  public void endRead(int originalCount, int actualCount)
+  {
+    synchronized (this)
+    {
+      totalBytesRead = totalBytesRead + (long)actualCount - (long)originalCount;
+      if (estimateInProgress)
+      {
+        if (actualCount == 0)
+          // Didn't actually get any bytes, so use 0.0
+          rateEstimate = 0.0;
+        else
+          rateEstimate = ((double)(System.currentTimeMillis() - seriesStartTime))/(double)actualCount;
+        estimateValid = true;
+        estimateInProgress = false;
+        notifyAll();
+      }
+    }
+  }
+
+  /** Note the end of a fetch operation. Call this method just after the fetch completes.
+  *@return true if this was the last active fetch (the reference count reached zero).
+  */
+  public boolean endFetch()
+  {
+    synchronized (this)
+    {
+      refCount--;
+      return (refCount == 0);
+    }
+  }
+
+  /** Poll this bin: recompute this service's share of the bin's byte rate, publish it as service
+  * data, and wake waiters if the local minimum changed. The computation runs while holding the
+  * lock manager's write lock on the target-calculation lock name.
+  *@param threadContext is the thread context used to obtain the lock manager.
+  */
+  public synchronized void poll(IThreadContext threadContext)
+    throws ManifoldCFException
+  {
+    // Enter write lock
+    ILockManager lockManager = LockManagerFactory.make(threadContext);
+    lockManager.enterWriteLock(targetCalcLockName);
+    try
+    {
+      // The cross-cluster apportionment of byte fetching goes here.
+      // For byte-rate throttling, the apportioning algorithm is simple. First, it's done
+      // in bytes per millisecond, which is the inverse of what we actually use for the
+      // rest of this class. Each service posts a target value in bytes per millisecond.
+      // The target value is computed as follows:
+      // (1) Target is summed cross-cluster, excluding our local service. This is GlobalTarget.
+      // (2) MaximumTarget is computed, which is Maximum-GlobalTarget (floored at zero).
+      // (3) FairTarget is computed, which is Maximum/numServices.
+      // (4) Finally, we compute Target by taking the minimum of MaximumTarget, FairTarget.
+
+      // Compute MaximumTarget
+      SumClass sumClass = new SumClass(serviceName);
+      lockManager.scanServiceData(serviceTypeName, sumClass);
+
+      int numServices = sumClass.getNumServices();
+      if (numServices == 0)
+        return;
+      double globalTarget = sumClass.getGlobalTarget();
+      double globalMaxBytesPerMillisecond;
+      double maximumTarget;
+      double fairTarget;
+      if (minimumMillisecondsPerByte == 0.0)
+      {
+        // No minimum configured: effectively unlimited.
+        globalMaxBytesPerMillisecond = Double.MAX_VALUE;
+        maximumTarget = globalMaxBytesPerMillisecond;
+        fairTarget = globalMaxBytesPerMillisecond;
+      }
+      else
+      {
+        globalMaxBytesPerMillisecond = 1.0 / minimumMillisecondsPerByte;
+        maximumTarget = globalMaxBytesPerMillisecond - globalTarget;
+        if (maximumTarget < 0.0)
+          maximumTarget = 0.0;
+
+        // Compute FairTarget
+        fairTarget = globalMaxBytesPerMillisecond / numServices;
+      }
+
+      // Now compute actual target
+      double inverseTarget = Math.min(maximumTarget, fairTarget);
+
+      // Write these values to the service data variables.
+      // NOTE that there is a race condition here; the target value depends on all the calculations above being accurate,
+      // and not changing out from under us. So, that's why we have a write lock around the pool calculations.
+      lockManager.updateServiceData(serviceTypeName, serviceName, pack(inverseTarget));
+
+      // Update our local minimum (ms per byte); a zero rate means "wait".
+      double target;
+      if (inverseTarget == 0.0)
+        target = Double.MAX_VALUE;
+      else
+        target = 1.0 / inverseTarget;
+
+      // Reset local minimum, if it has changed.
+      if (target == localMinimum)
+        return;
+      localMinimum = target;
+      notifyAll();
+    }
+    finally
+    {
+      lockManager.leaveWriteLock(targetCalcLockName);
+    }
+  }
+
+  /** Shut down this bin, wake up all waiting threads, and end this bin's service activity.
+  *@param threadContext is the thread context used to obtain the lock manager.
+  */
+  public synchronized void shutDown(IThreadContext threadContext)
+    throws ManifoldCFException
+  {
+    isAlive = false;
+    notifyAll();
+    ILockManager lockManager = LockManagerFactory.make(threadContext);
+    lockManager.endServiceActivity(serviceTypeName, serviceName);
+  }
+
+  // Protected classes and methods
+
+  /** Accumulates, over a service data scan, the service count and the summed target of the other services. */
+  protected static class SumClass implements IServiceDataAcceptor
+  {
+    /** The local service, excluded from the target sum. */
+    protected final String serviceName;
+    /** Number of services scanned, including the local one. */
+    protected int numServices = 0;
+    /** Sum of the other services' targets, in bytes per ms. */
+    protected double globalTargetTally = 0.0;
+
+    public SumClass(String serviceName)
+    {
+      this.serviceName = serviceName;
+    }
+
+    @Override
+    public boolean acceptServiceData(String serviceName, byte[] serviceData)
+      throws ManifoldCFException
+    {
+      numServices++;
+
+      if (!serviceName.equals(this.serviceName))
+      {
+        globalTargetTally += unpackTarget(serviceData);
+      }
+      return false;
+    }
+
+    public int getNumServices()
+    {
+      return numServices;
+    }
+
+    public double getGlobalTarget()
+    {
+      return globalTargetTally;
+    }
+
+  }
+
+  /** Decode the target (bytes per ms) from an 8-byte little-endian record written by pack().
+  *@param data is the packed record, possibly null.
+  *@return the target, or 0.0 if the record is null or not 8 bytes long.
+  */
+  protected static double unpackTarget(byte[] data)
+  {
+    if (data == null || data.length != 8)
+      return 0.0;
+    long bits = 0L;
+    for (int i = 7; i >= 0; i--)
+      bits = (bits << 8) | (((long)data[i]) & 0xffL);
+    return Double.longBitsToDouble(bits);
+  }
+
+  /** Encode a target (bytes per ms) into an 8-byte little-endian record.
+  *@param targetDouble is the target.
+  *@return the new 8-byte record.
+  */
+  protected static byte[] pack(double targetDouble)
+  {
+    long target = Double.doubleToLongBits(targetDouble);
+    byte[] rval = new byte[8];
+    for (int i = 0; i < 8; i++)
+      rval[i] = (byte)(target >> (8 * i));
+    return rval;
+  }
+
+  /** Add a delta to a time, clamping at Long.MAX_VALUE instead of wrapping when the result would overflow. */
+  private static long saturatedAdd(long base, long delta)
+  {
+    if (base > 0L && delta > Long.MAX_VALUE - base)
+      return Long.MAX_VALUE;
+    return base + delta;
+  }
+
+}
+
diff --git a/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ThrottleGroups.java b/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ThrottleGroups.java
new file mode 100644
index 0000000..cb38c42
--- /dev/null
+++ b/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ThrottleGroups.java
@@ -0,0 +1,134 @@
+/* $Id$ */
+
+/**
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.manifoldcf.core.throttler;
+
+import org.apache.manifoldcf.core.interfaces.*;
+import java.util.*;
+
+/** An implementation of IThrottleGroups, which establishes a JVM-wide
+* pool of throttlers that can be used as a resource by any connector that needs
+* it.
+*/
+public class ThrottleGroups implements IThrottleGroups
+{
+ public static final String _rcsid = "@(#)$Id$";
+
+ /** The thread context */
+ protected final IThreadContext threadContext;
+
+ /** The actual static pool */
+ protected final static Throttler throttler = new Throttler();
+
+ /** Constructor */
+ public ThrottleGroups(IThreadContext threadContext)
+ throws ManifoldCFException
+ {
+ this.threadContext = threadContext;
+ }
+
+ /** Get all existing throttle groups for a throttle group type.
+ * The throttle group type typically describes a connector class, while the throttle group represents
+ * a namespace of bin names specific to that connector class.
+ *@param throttleGroupType is the throttle group type.
+ *@return the set of throttle groups for that group type.
+ */
+ @Override
+ public Set<String> getThrottleGroups(String throttleGroupType)
+ throws ManifoldCFException
+ {
+ return throttler.getThrottleGroups(threadContext, throttleGroupType);
+ }
+
+ /** Remove a throttle group.
+ *@param throttleGroupType is the throttle group type.
+ *@param throttleGroup is the throttle group.
+ */
+ @Override
+ public void removeThrottleGroup(String throttleGroupType, String throttleGroup)
+ throws ManifoldCFException
+ {
+ throttler.removeThrottleGroup(threadContext, throttleGroupType, throttleGroup);
+ }
+
+ /** Set or update throttle specification for a throttle group. This creates the
+ * throttle group if it does not yet exist.
+ *@param throttleGroupType is the throttle group type.
+ *@param throttleGroup is the throttle group.
+ *@param throttleSpec is the desired throttle specification object.
+ */
+ @Override
+ public void createOrUpdateThrottleGroup(String throttleGroupType, String throttleGroup, IThrottleSpec throttleSpec)
+ throws ManifoldCFException
+ {
+ throttler.createOrUpdateThrottleGroup(threadContext, throttleGroupType, throttleGroup, throttleSpec);
+ }
+
+ /** Construct connection throttler for connections with specific bin names. This object is meant to be embedded with a connection
+ * pool of similar objects, and used to gate the creation of new connections in that pool.
+ *@param throttleGroupType is the throttle group type.
+ *@param throttleGroup is the throttle group.
+ *@param binNames are the connection type bin names.
+ *@return the connection throttling object, or null if the pool is being shut down.
+ */
+ @Override
+ public IConnectionThrottler obtainConnectionThrottler(String throttleGroupType, String throttleGroup, String[] binNames)
+ throws ManifoldCFException
+ {
+ java.util.Arrays.sort(binNames);
+ return throttler.obtainConnectionThrottler(threadContext, throttleGroupType, throttleGroup, binNames);
+ }
+
+ /** Poll periodically, to update cluster-wide statistics and allocation.
+ *@param throttleGroupType is the throttle group type to update.
+ */
+ @Override
+ public void poll(String throttleGroupType)
+ throws ManifoldCFException
+ {
+ throttler.poll(threadContext, throttleGroupType);
+ }
+
+ /** Poll periodically, to update ALL cluster-wide statistics and allocation.
+ */
+ @Override
+ public void poll()
+ throws ManifoldCFException
+ {
+ throttler.poll(threadContext);
+ }
+
+ /** Free unused resources.
+ */
+ @Override
+ public void freeUnusedResources()
+ throws ManifoldCFException
+ {
+ throttler.freeUnusedResources(threadContext);
+ }
+
+ /** Shut down throttler permanently.
+ */
+ @Override
+ public void destroy()
+ throws ManifoldCFException
+ {
+ throttler.destroy(threadContext);
+ }
+
+}
diff --git a/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/Throttler.java b/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/Throttler.java
new file mode 100644
index 0000000..e3d4406
--- /dev/null
+++ b/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/Throttler.java
@@ -0,0 +1,1111 @@
+/* $Id$ */
+
+/**
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.manifoldcf.core.throttler;
+
+import org.apache.manifoldcf.core.interfaces.*;
+import java.util.*;
+import java.util.concurrent.atomic.*;
+
+/** A Throttler object creates a virtual pool of connections to resources
+* whose access needs to be throttled in number, rate of use, and byte rate.
+* This code is modeled on the code for distributed connection pools, and is intended
+* to work in a similar manner. Basically, a periodic assessment is done about what the
+* local throttling parameters should be (on a per-pool basis), and the local throttling
+* activities then adjust what they are doing based on the new parameters. A service
+* model is used to keep track of which pools have what clients working with them.
+* This implementation has the advantage that:
+* (1) Only local throttling ever takes place on a method-by-method basis, which makes
+* it possible to use throttling even in streams and background threads;
+* (2) Throttling resources are apportioned fairly, on average, between all the various
+* cluster members, so it is unlikely that any persistent starvation conditions can
+* arise.
+*/
+public class Throttler
+{
+  public static final String _rcsid = "@(#)$Id$";
+
+  /** Throttle group hash table. Keyed by throttle group type, value is throttling groups */
+  protected final Map<String,ThrottlingGroups> throttleGroupsHash = new HashMap<String,ThrottlingGroups>();
+
+  /** Create a throttler instance. Usually there will be one of these per connector
+  * type that needs throttling.
+  */
+  public Throttler()
+  {
+  }
+
+  // There are a lot of synchronizers to coordinate here. They are indeed hierarchical. It is not possible to simply
+  // throw a synchronizer at every level, and require that we hold all of them, because when we wait somewhere in the
+  // inner level, we will continue to hold locks and block access to all the outer levels.
+  //
+  // Instead, I've opted for a model whereby individual resources are protected. This is tricky to coordinate, though,
+  // because (for instance) after a resource has been removed from the hash table, it had better be cleaned up
+  // thoroughly before the outer lock is removed, or two versions of the resource might wind up coming into existence.
+  // The general rule is therefore:
+  // (1) Creation or deletion of resources involves locking the parent where the resource is being added or removed
+  // (2) Anything that waits CANNOT also add or remove.
+
+  /** Get all existing throttle groups for a throttle group type.
+  * The throttle group type typically describes a connector class, while the throttle group represents
+  * a namespace of bin names specific to that connector class.
+  *@param throttleGroupType is the throttle group type.
+  *@return a snapshot of the names of the throttle groups registered under that group type (empty if the
+  * type is unknown). The set is a copy, so it can safely be used after the internal lock is released.
+  */
+  public Set<String> getThrottleGroups(IThreadContext threadContext, String throttleGroupType)
+    throws ManifoldCFException
+  {
+    synchronized (throttleGroupsHash)
+    {
+      ThrottlingGroups tg = throttleGroupsHash.get(throttleGroupType);
+      if (tg == null)
+        return new HashSet<String>();
+      return tg.getThrottleGroups();
+    }
+  }
+
+  /** Remove a throttle group.
+  *@param throttleGroupType is the throttle group type.
+  *@param throttleGroup is the throttle group.
+  */
+  public void removeThrottleGroup(IThreadContext threadContext, String throttleGroupType, String throttleGroup)
+    throws ManifoldCFException
+  {
+    // Removal. Lock the whole hierarchy.
+    synchronized (throttleGroupsHash)
+    {
+      ThrottlingGroups tg = throttleGroupsHash.get(throttleGroupType);
+      if (tg != null)
+      {
+        tg.removeThrottleGroup(threadContext, throttleGroup);
+      }
+    }
+  }
+
+  /** Set or update throttle specification for a throttle group. This creates the
+  * throttle group if it does not yet exist.
+  *@param throttleGroupType is the throttle group type.
+  *@param throttleGroup is the throttle group.
+  *@param throttleSpec is the desired throttle specification object.
+  */
+  public void createOrUpdateThrottleGroup(IThreadContext threadContext, String throttleGroupType, String throttleGroup, IThrottleSpec throttleSpec)
+    throws ManifoldCFException
+  {
+    // Potential addition. Lock the whole hierarchy.
+    synchronized (throttleGroupsHash)
+    {
+      ThrottlingGroups tg = throttleGroupsHash.get(throttleGroupType);
+      if (tg == null)
+      {
+        tg = new ThrottlingGroups(throttleGroupType);
+        throttleGroupsHash.put(throttleGroupType, tg);
+      }
+      tg.createOrUpdateThrottleGroup(threadContext, throttleGroup, throttleSpec);
+    }
+  }
+
+  /** Construct connection throttler for connections with specific bin names. This object is meant to be embedded with a connection
+  * pool of similar objects, and used to gate the creation of new connections in that pool.
+  *@param throttleGroupType is the throttle group type.
+  *@param throttleGroup is the throttle group.
+  *@param binNames are the connection type bin names.
+  *@return the connection throttling object, or null if the pool is being shut down.
+  */
+  public IConnectionThrottler obtainConnectionThrottler(IThreadContext threadContext, String throttleGroupType, String throttleGroup, String[] binNames)
+    throws ManifoldCFException
+  {
+    // No waiting, so lock the entire tree.
+    synchronized (throttleGroupsHash)
+    {
+      ThrottlingGroups tg = throttleGroupsHash.get(throttleGroupType);
+      if (tg != null)
+        return tg.obtainConnectionThrottler(threadContext, throttleGroup, binNames);
+      return null;
+    }
+  }
+
+  /** Poll periodically.
+  *@param throttleGroupType is the throttle group type whose groups should be polled.
+  */
+  public void poll(IThreadContext threadContext, String throttleGroupType)
+    throws ManifoldCFException
+  {
+    // No waiting, so lock the entire tree.
+    synchronized (throttleGroupsHash)
+    {
+      ThrottlingGroups tg = throttleGroupsHash.get(throttleGroupType);
+      if (tg != null)
+        tg.poll(threadContext);
+    }
+  }
+
+  /** Poll ALL bins periodically.
+  */
+  public void poll(IThreadContext threadContext)
+    throws ManifoldCFException
+  {
+    // No waiting, so lock the entire tree.
+    synchronized (throttleGroupsHash)
+    {
+      for (ThrottlingGroups tg : throttleGroupsHash.values())
+      {
+        tg.poll(threadContext);
+      }
+    }
+  }
+
+  /** Free unused resources.
+  */
+  public void freeUnusedResources(IThreadContext threadContext)
+    throws ManifoldCFException
+  {
+    // This potentially affects the entire hierarchy.
+    // Go through the whole pool and clean it out
+    synchronized (throttleGroupsHash)
+    {
+      for (ThrottlingGroups p : throttleGroupsHash.values())
+      {
+        p.freeUnusedResources(threadContext);
+      }
+    }
+  }
+
+  /** Shut down all throttlers and deregister them.
+  */
+  public void destroy(IThreadContext threadContext)
+    throws ManifoldCFException
+  {
+    // This affects the entire hierarchy, so lock the whole thing.
+    // Go through the whole pool and clean it out
+    synchronized (throttleGroupsHash)
+    {
+      Iterator<ThrottlingGroups> iter = throttleGroupsHash.values().iterator();
+      while (iter.hasNext())
+      {
+        ThrottlingGroups p = iter.next();
+        p.destroy(threadContext);
+        iter.remove();
+      }
+    }
+  }
+
+  // Protected methods and classes
+
+  /** Build the internal name of a throttling group from its type and its name. */
+  protected static String buildThrottlingGroupName(String throttlingGroupType, String throttlingGroupName)
+  {
+    return throttlingGroupType + "_" + throttlingGroupName;
+  }
+
+  /** This class represents a throttling group pool */
+  protected class ThrottlingGroups
+  {
+    /** The throttling group type for this throttling group pool */
+    protected final String throttlingGroupTypeName;
+    /** The pool of individual throttle group services for this pool, keyed by throttle group name */
+    protected final Map<String,ThrottlingGroup> groups = new HashMap<String,ThrottlingGroup>();
+
+    public ThrottlingGroups(String throttlingGroupTypeName)
+    {
+      this.throttlingGroupTypeName = throttlingGroupTypeName;
+    }
+
+    /** Get a snapshot of the names of all throttle groups in this pool.
+    *@return a new set holding the current throttle group names.
+    */
+    public Set<String> getThrottleGroups()
+    {
+      synchronized (groups)
+      {
+        return new HashSet<String>(groups.keySet());
+      }
+    }
+
+    /** Update throttle specification, creating the throttle group if it does not exist yet. */
+    public void createOrUpdateThrottleGroup(IThreadContext threadContext, String throttleGroup, IThrottleSpec throttleSpec)
+      throws ManifoldCFException
+    {
+      synchronized (groups)
+      {
+        ThrottlingGroup g = groups.get(throttleGroup);
+        if (g == null)
+        {
+          g = new ThrottlingGroup(threadContext, throttlingGroupTypeName, throttleGroup, throttleSpec);
+          groups.put(throttleGroup, g);
+        }
+        else
+        {
+          g.updateThrottleSpecification(throttleSpec);
+        }
+      }
+    }
+
+    /** Obtain connection throttler.
+    *@return the throttler, or null if the hierarchy has changed.
+    */
+    public IConnectionThrottler obtainConnectionThrottler(IThreadContext threadContext, String throttleGroup, String[] binNames)
+      throws ManifoldCFException
+    {
+      synchronized (groups)
+      {
+        ThrottlingGroup g = groups.get(throttleGroup);
+        if (g == null)
+          return null;
+        return g.obtainConnectionThrottler(threadContext, binNames);
+      }
+    }
+
+    /** Remove specified throttle group */
+    public void removeThrottleGroup(IThreadContext threadContext, String throttleGroup)
+      throws ManifoldCFException
+    {
+      // Must synch the whole thing, because otherwise there would be a risk of someone recreating the
+      // group right after we removed it from the map, and before we destroyed it.
+      synchronized (groups)
+      {
+        ThrottlingGroup g = groups.remove(throttleGroup);
+        if (g != null)
+        {
+          g.destroy(threadContext);
+        }
+      }
+    }
+
+    /** Poll this set of throttle groups.
+    */
+    public void poll(IThreadContext threadContext)
+      throws ManifoldCFException
+    {
+      synchronized (groups)
+      {
+        for (ThrottlingGroup p : groups.values())
+        {
+          p.poll(threadContext);
+        }
+      }
+    }
+
+    /** Free unused resources */
+    public void freeUnusedResources(IThreadContext threadContext)
+      throws ManifoldCFException
+    {
+      synchronized (groups)
+      {
+        for (ThrottlingGroup g : groups.values())
+        {
+          g.freeUnusedResources(threadContext);
+        }
+      }
+    }
+
+    /** Destroy and shutdown all */
+    public void destroy(IThreadContext threadContext)
+      throws ManifoldCFException
+    {
+      synchronized (groups)
+      {
+        Iterator<ThrottlingGroup> iter = groups.values().iterator();
+        while (iter.hasNext())
+        {
+          ThrottlingGroup p = iter.next();
+          p.destroy(threadContext);
+          iter.remove();
+        }
+      }
+    }
+  }
+
+  /** This class represents a throttling group, of a specific throttling group type. It basically
+  * describes an entire self-consistent throttling environment.
+  */
+  protected class ThrottlingGroup
+  {
+    /** The throttling group name */
+    protected final String throttlingGroupName;
+    /** The current throttle spec */
+    protected IThrottleSpec throttleSpec;
+
+    /** The connection bins */
+    protected final Map<String,ConnectionBin> connectionBins = new HashMap<String,ConnectionBin>();
+    /** The fetch bins */
+    protected final Map<String,FetchBin> fetchBins = new HashMap<String,FetchBin>();
+    /** The throttle bins */
+    protected final Map<String,ThrottleBin> throttleBins = new HashMap<String,ThrottleBin>();
+
+    // For synchronization, we use several in this class.
+    // Modification to the connectionBins, fetchBins, or throttleBins hashes uses the appropriate local synchronizer.
+    // Changes to other local variables use the main synchronizer.
+
+    /** Constructor
+    */
+    public ThrottlingGroup(IThreadContext threadContext, String throttlingGroupType, String throttleGroup, IThrottleSpec throttleSpec)
+      throws ManifoldCFException
+    {
+      this.throttlingGroupName = buildThrottlingGroupName(throttlingGroupType, throttleGroup);
+      this.throttleSpec = throttleSpec;
+      // Once all that is done, perform the initial setting of all the bin cutoffs
+      poll(threadContext);
+    }
+
+    /** Create a bunch of bins, corresponding to the bin names specified.
+    * Note that this also registers them as services etc.
+    *@param binNames describes the set of bins to create.
+    */
+    public synchronized IConnectionThrottler obtainConnectionThrottler(IThreadContext threadContext, String[] binNames)
+      throws ManifoldCFException
+    {
+      synchronized (connectionBins)
+      {
+        for (String binName : binNames)
+        {
+          ConnectionBin bin = connectionBins.get(binName);
+          if (bin == null)
+          {
+            bin = new ConnectionBin(threadContext, throttlingGroupName, binName);
+            connectionBins.put(binName, bin);
+          }
+        }
+      }
+
+      synchronized (fetchBins)
+      {
+        for (String binName : binNames)
+        {
+          FetchBin bin = fetchBins.get(binName);
+          if (bin == null)
+          {
+            bin = new FetchBin(threadContext, throttlingGroupName, binName);
+            fetchBins.put(binName, bin);
+          }
+        }
+      }
+
+      synchronized (throttleBins)
+      {
+        for (String binName : binNames)
+        {
+          ThrottleBin bin = throttleBins.get(binName);
+          if (bin == null)
+          {
+            bin = new ThrottleBin(threadContext, throttlingGroupName, binName);
+            throttleBins.put(binName, bin);
+          }
+        }
+      }
+
+      return new ConnectionThrottler(this, binNames);
+    }
+
+    /** Update the throttle spec.
+    *@param throttleSpec is the new throttle spec for this throttle group.
+    */
+    public synchronized void updateThrottleSpecification(IThrottleSpec throttleSpec)
+      throws ManifoldCFException
+    {
+      this.throttleSpec = throttleSpec;
+    }
+
+    // Bin lookup helpers. Each takes the relevant map lock only for the lookup itself, so callers
+    // may wait on the returned bin without blocking additions to or removals from the map.
+
+    /** Look up a connection bin; null if it does not exist (for example, after destroy()). */
+    private ConnectionBin getConnectionBin(String binName)
+    {
+      synchronized (connectionBins)
+      {
+        return connectionBins.get(binName);
+      }
+    }
+
+    /** Look up a fetch bin; null if it does not exist (for example, after destroy()). */
+    private FetchBin getFetchBin(String binName)
+    {
+      synchronized (fetchBins)
+      {
+        return fetchBins.get(binName);
+      }
+    }
+
+    /** Look up a throttle bin; null if it does not exist (for example, after destroy()). */
+    private ThrottleBin getThrottleBin(String binName)
+    {
+      synchronized (throttleBins)
+      {
+        return throttleBins.get(binName);
+      }
+    }
+
+    // IConnectionThrottler support methods
+
+    /** Wait for a connection to become available.
+    *@param poolCount is a description of how many connections
+    * are available in the current pool, across all bins.
+    *@return the IConnectionThrottler codes for results.
+    */
+    public int waitConnectionAvailable(String[] binNames, AtomicInteger poolCount)
+      throws InterruptedException
+    {
+      // Each bin can signal something different. Bins that signal
+      // CONNECTION_FROM_NOWHERE are shutting down, but there's also
+      // apparently the conflicting possibilities of distinct answers of
+      // CONNECTION_FROM_POOL and CONNECTION_FROM_CREATION.
+      // However: the pool count we track is in fact N * the actual pool count,
+      // where N is the number of bins in each connection. This means that a conflict
+      // is ALWAYS due to two entities simultaneously calling waitConnectionAvailable(),
+      // and deadlocking each other. The solution is therefore to back off and retry.
+
+      // This is the retry loop
+      while (true)
+      {
+        int currentRecommendation = IConnectionThrottler.CONNECTION_FROM_NOWHERE;
+
+        boolean retry = false;
+
+        // First, make sure all the bins exist, and reserve a slot in each
+        int i = 0;
+        while (i < binNames.length)
+        {
+          ConnectionBin bin = getConnectionBin(binNames[i]);
+          if (bin != null)
+          {
+            // Reserve a slot. If the wait does not complete normally (e.g. it is interrupted),
+            // the reservations already held in the earlier bins must be released before propagating.
+            int result;
+            boolean waitCompleted = false;
+            try
+            {
+              result = bin.waitConnectionAvailable(poolCount);
+              waitCompleted = true;
+            }
+            finally
+            {
+              if (!waitCompleted)
+                undoConnectionReservations(binNames, i, currentRecommendation, poolCount);
+            }
+
+            if (result == IConnectionThrottler.CONNECTION_FROM_NOWHERE)
+            {
+              // This bin is shutting down; release what we hold and give up.
+              undoConnectionReservations(binNames, i, currentRecommendation, poolCount);
+              return result;
+            }
+
+            if (currentRecommendation != IConnectionThrottler.CONNECTION_FROM_NOWHERE && currentRecommendation != result)
+            {
+              // Conflicting answers. This bin has itself just made a reservation (of kind "result"),
+              // so it must be released along with the earlier ones before we retry.
+              bin.undoReservation(result, poolCount);
+              undoConnectionReservations(binNames, i, currentRecommendation, poolCount);
+              retry = true;
+              break;
+            }
+
+            if (currentRecommendation == IConnectionThrottler.CONNECTION_FROM_NOWHERE)
+              currentRecommendation = result;
+          }
+          i++;
+        }
+
+        if (retry)
+          continue;
+
+        // Complete the reservation process (if that is what we decided)
+        if (currentRecommendation == IConnectionThrottler.CONNECTION_FROM_CREATION)
+        {
+          // All reservations have been made! Convert them.
+          for (String binName : binNames)
+          {
+            ConnectionBin bin = getConnectionBin(binName);
+            if (bin != null)
+              bin.noteConnectionCreation();
+          }
+        }
+
+        return currentRecommendation;
+      }
+    }
+
+    /** Release the connection reservations made in the first "count" bins of binNames, in reverse order.
+    *@param recommendation is the kind of reservation those bins made. CONNECTION_FROM_NOWHERE means
+    * no bin made a reservation yet, so there is nothing to undo.
+    */
+    private void undoConnectionReservations(String[] binNames, int count, int recommendation, AtomicInteger poolCount)
+    {
+      if (recommendation == IConnectionThrottler.CONNECTION_FROM_NOWHERE)
+        return;
+      for (int i = count - 1; i >= 0; i--)
+      {
+        ConnectionBin bin = getConnectionBin(binNames[i]);
+        if (bin != null)
+          bin.undoReservation(recommendation, poolCount);
+      }
+    }
+
+    public IFetchThrottler getNewConnectionFetchThrottler(String[] binNames)
+    {
+      return new FetchThrottler(this, binNames);
+    }
+
+    public boolean noteReturnedConnection(String[] binNames)
+    {
+      // If ANY of the bins think the connection should be destroyed, then that will be
+      // the recommendation.
+      synchronized (connectionBins)
+      {
+        boolean destroyConnection = false;
+
+        for (String binName : binNames)
+        {
+          ConnectionBin bin = connectionBins.get(binName);
+          if (bin != null)
+          {
+            destroyConnection |= bin.shouldReturnedConnectionBeDestroyed();
+          }
+        }
+
+        return destroyConnection;
+      }
+    }
+
+    public boolean checkDestroyPooledConnection(String[] binNames, AtomicInteger poolCount)
+    {
+      // Only if all believe we can destroy a pool connection, will we do it.
+      // This is because some pools may be empty, etc.
+      synchronized (connectionBins)
+      {
+        boolean destroyConnection = false;
+
+        for (int i = 0; i < binNames.length; i++)
+        {
+          ConnectionBin bin = connectionBins.get(binNames[i]);
+          if (bin != null)
+          {
+            int result = bin.shouldPooledConnectionBeDestroyed(poolCount);
+            if (result == ConnectionBin.CONNECTION_POOLEMPTY)
+            {
+              // Give up now, and undo all the other bins
+              undoPooledConnectionDecisions(binNames, i, poolCount);
+              return false;
+            }
+            else if (result == ConnectionBin.CONNECTION_DESTROY)
+            {
+              destroyConnection = true;
+            }
+          }
+        }
+
+        if (destroyConnection)
+          return true;
+
+        // Undo pool reservation, since everything is apparently within bounds.
+        undoPooledConnectionDecisions(binNames, binNames.length, poolCount);
+        return false;
+      }
+    }
+
+    /** Connection expiration is tricky, because even though a connection may be identified as
+    * being expired, at the very same moment it could be handed out in another thread. So there
+    * is a natural race condition present.
+    * The way the connection throttler deals with that is to allow the caller to reserve a connection
+    * for expiration. This must be called BEFORE the actual identified connection is removed from the
+    * connection pool. If the value returned by this method is "true", then a connection MUST be removed
+    * from the pool and destroyed, whether or not the identified connection is actually still available for
+    * destruction or not.
+    *@return true if a connection from the pool can be expired. If true is returned, noteConnectionDestruction()
+    * MUST be called once the connection has actually been destroyed.
+    */
+    public boolean checkExpireConnection(String[] binNames, AtomicInteger poolCount)
+    {
+      synchronized (connectionBins)
+      {
+        for (int i = 0; i < binNames.length; i++)
+        {
+          ConnectionBin bin = connectionBins.get(binNames[i]);
+          if (bin != null && !bin.hasPooledConnection(poolCount))
+          {
+            // Give up now, and undo all the other bins
+            undoPooledConnectionDecisions(binNames, i, poolCount);
+            return false;
+          }
+        }
+        return true;
+      }
+    }
+
+    /** Undo the pooled-connection decisions made by the first "count" bins of binNames, in reverse order.
+    * Bins that no longer exist are skipped, exactly as they were skipped when the decisions were made.
+    */
+    private void undoPooledConnectionDecisions(String[] binNames, int count, AtomicInteger poolCount)
+    {
+      for (int i = count - 1; i >= 0; i--)
+      {
+        ConnectionBin bin = getConnectionBin(binNames[i]);
+        if (bin != null)
+          bin.undoPooledConnectionDecision(poolCount);
+      }
+    }
+
+    public void noteConnectionReturnedToPool(String[] binNames, AtomicInteger poolCount)
+    {
+      synchronized (connectionBins)
+      {
+        for (String binName : binNames)
+        {
+          ConnectionBin bin = connectionBins.get(binName);
+          if (bin != null)
+            bin.noteConnectionReturnedToPool(poolCount);
+        }
+      }
+    }
+
+    public void noteConnectionDestroyed(String[] binNames)
+    {
+      synchronized (connectionBins)
+      {
+        for (String binName : binNames)
+        {
+          ConnectionBin bin = connectionBins.get(binName);
+          if (bin != null)
+            bin.noteConnectionDestroyed();
+        }
+      }
+    }
+
+    // IFetchThrottler support methods
+
+    /** Get permission to fetch a document. This grants permission to start
+    * fetching a single document, within the connection that has already been
+    * granted permission that created this object.
+    *@param binNames are the names of the bins.
+    *@return false if being shut down
+    */
+    public boolean obtainFetchDocumentPermission(String[] binNames)
+      throws InterruptedException
+    {
+      // First, make sure all the bins exist, and reserve a slot in each
+      for (int i = 0; i < binNames.length; i++)
+      {
+        FetchBin bin = getFetchBin(binNames[i]);
+        if (bin == null || !bin.reserveFetchRequest())
+        {
+          // Release previous reservations, and give up
+          clearFetchReservations(binNames, 0, i);
+          return false;
+        }
+      }
+
+      // All reservations have been made! Convert them (this may wait).
+      for (int i = 0; i < binNames.length; i++)
+      {
+        FetchBin bin = getFetchBin(binNames[i]);
+        if (bin == null)
+          continue;
+        boolean granted = false;
+        try
+        {
+          granted = bin.waitNextFetch();
+        }
+        finally
+        {
+          // Whether the wait reported shutdown or was interrupted, the reservations we have
+          // not converted yet (including this bin's) are still outstanding and must be cleared.
+          if (!granted)
+            clearFetchReservations(binNames, i, binNames.length);
+        }
+        if (!granted)
+          return false;
+      }
+      return true;
+    }
+
+    /** Clear the fetch reservations held in bins binNames[start] (inclusive) through binNames[end] (exclusive). */
+    private void clearFetchReservations(String[] binNames, int start, int end)
+    {
+      for (int i = start; i < end; i++)
+      {
+        FetchBin bin = getFetchBin(binNames[i]);
+        if (bin != null)
+          bin.clearReservation();
+      }
+    }
+
+    public IStreamThrottler createFetchStream(String[] binNames)
+    {
+      // Do a "begin fetch" for all throttle bins
+      synchronized (throttleBins)
+      {
+        for (String binName : binNames)
+        {
+          ThrottleBin bin = throttleBins.get(binName);
+          if (bin != null)
+            bin.beginFetch();
+        }
+      }
+
+      return new StreamThrottler(this, binNames);
+    }
+
+    // IStreamThrottler support methods
+
+    /** Obtain permission to read a block of bytes. This method may wait until it is OK to proceed.
+    * The throttle group, bin names, etc are already known
+    * to this specific interface object, so it is unnecessary to include them here.
+    *@param byteCount is the number of bytes to get permissions to read.
+    *@return true if the wait took place as planned, or false if the system is being shut down.
+    */
+    public boolean obtainReadPermission(String[] binNames, int byteCount)
+      throws InterruptedException
+    {
+      for (int i = 0; i < binNames.length; i++)
+      {
+        ThrottleBin bin = getThrottleBin(binNames[i]);
+        boolean begun = false;
+        try
+        {
+          begun = (bin != null && bin.beginRead(byteCount));
+        }
+        finally
+        {
+          // On shutdown or interruption, end the reads already begun in the earlier bins.
+          if (!begun)
+            abortReads(binNames, i, byteCount);
+        }
+        if (!begun)
+          return false;
+      }
+      return true;
+    }
+
+    /** End (with zero bytes actually read) the reads begun in the first "count" bins, in reverse order. */
+    private void abortReads(String[] binNames, int count, int byteCount)
+    {
+      for (int i = count - 1; i >= 0; i--)
+      {
+        ThrottleBin bin = getThrottleBin(binNames[i]);
+        if (bin != null)
+          bin.endRead(byteCount,0);
+      }
+    }
+
+    /** Note the completion of the read of a block of bytes. Call this after
+    * obtainReadPermission() was successfully called, and bytes were successfully read.
+    *@param origByteCount is the originally requested number of bytes to get permissions to read.
+    *@param actualByteCount is the number of bytes actually read.
+    */
+    public void releaseReadPermission(String[] binNames, int origByteCount, int actualByteCount)
+    {
+      synchronized (throttleBins)
+      {
+        for (String binName : binNames)
+        {
+          ThrottleBin bin = throttleBins.get(binName);
+          if (bin != null)
+            bin.endRead(origByteCount, actualByteCount);
+        }
+      }
+    }
+
+    /** Note the stream being closed.
+    */
+    public void closeStream(String[] binNames)
+    {
+      synchronized (throttleBins)
+      {
+        for (String binName : binNames)
+        {
+          ThrottleBin bin = throttleBins.get(binName);
+          if (bin != null)
+            bin.endFetch();
+        }
+      }
+    }
+
+    // Bookkeeping methods
+
+    /** Call this periodically.
+    */
+    public synchronized void poll(IThreadContext threadContext)
+      throws ManifoldCFException
+    {
+      // Go through all existing bins and update each one.
+      synchronized (connectionBins)
+      {
+        for (ConnectionBin bin : connectionBins.values())
+        {
+          bin.updateMaxActiveConnections(throttleSpec.getMaxOpenConnections(bin.getBinName()));
+          bin.poll(threadContext);
+        }
+      }
+
+      synchronized (fetchBins)
+      {
+        for (FetchBin bin : fetchBins.values())
+        {
+          bin.updateMinTimeBetweenFetches(throttleSpec.getMinimumMillisecondsPerFetch(bin.getBinName()));
+          bin.poll(threadContext);
+        }
+      }
+
+      synchronized (throttleBins)
+      {
+        for (ThrottleBin bin : throttleBins.values())
+        {
+          bin.updateMinimumMillisecondsPerByte(throttleSpec.getMinimumMillisecondsPerByte(bin.getBinName()));
+          bin.poll(threadContext);
+        }
+      }
+    }
+
+    /** Free unused resources.
+    */
+    public synchronized void freeUnusedResources(IThreadContext threadContext)
+      throws ManifoldCFException
+    {
+      // Does nothing; there are not really resources to free
+    }
+
+    /** Destroy this pool.
+    */
+    public synchronized void destroy(IThreadContext threadContext)
+      throws ManifoldCFException
+    {
+      synchronized (connectionBins)
+      {
+        Iterator<ConnectionBin> binIter = connectionBins.values().iterator();
+        while (binIter.hasNext())
+        {
+          ConnectionBin bin = binIter.next();
+          bin.shutDown(threadContext);
+          binIter.remove();
+        }
+      }
+
+      synchronized (fetchBins)
+      {
+        Iterator<FetchBin> binIter = fetchBins.values().iterator();
+        while (binIter.hasNext())
+        {
+          FetchBin bin = binIter.next();
+          bin.shutDown(threadContext);
+          binIter.remove();
+        }
+      }
+
+      synchronized (throttleBins)
+      {
+        Iterator<ThrottleBin> binIter = throttleBins.values().iterator();
+        while (binIter.hasNext())
+        {
+          ThrottleBin bin = binIter.next();
+          bin.shutDown(threadContext);
+          binIter.remove();
+        }
+      }
+    }
+  }
+
+  /** Connection throttler implementation class.
+  * This class instance stores some parameters and links back to ThrottlingGroup. But each class instance
+  * models a connection pool with the specified bins. But the description of each pool consists of more than just
+  * the bin names that describe the throttling - it also may include connection parameters which we have
+  * no insight into at this level.
+  *
+  * Thus, in order to do pool tracking properly, we cannot simply rely on the individual connection bin instances
+  * to do all the work, since they cannot distinguish between different pools properly. So that leaves us with
+  * two choices. (1) We can somehow push the separate pool instance parameters down to the connection bin
+  * level, or (2) the connection bins cannot actually do any waiting or blocking.
+  *
+  * The benefit of having blocking take place in connection bins is that they are in fact designed to be precisely
+  * the thing you would want to synchronize on. If we presume that the waits happen in those classes,
+  * then we need the ability to send in our local pool count to them, and we need to be able to "wake up"
+  * those underlying classes when the local pool count changes.
+  */
+  protected static class ConnectionThrottler implements IConnectionThrottler
+  {
+    protected final ThrottlingGroup parent;
+    protected final String[] binNames;
+
+    // Keep track of local pool parameters.
+
+    /** This is the number of connections in the pool, times the number of bins per connection */
+    protected final AtomicInteger poolCount = new AtomicInteger(0);
+
+    public ConnectionThrottler(ThrottlingGroup parent, String[] binNames)
+    {
+      this.parent = parent;
+      this.binNames = binNames;
+    }
+
+    /** Get permission to grab a connection for use. If this object believes there is a connection
+    * available in the pool, it will update its pool size variable and return. If not, this method
+    * evaluates whether a new connection should be created. If neither condition is true, it
+    * waits until a connection is available.
+    *@return whether to take the connection from the pool, or create one, or whether the
+    * throttler is being shut down.
+    */
+    @Override
+    public int waitConnectionAvailable()
+      throws InterruptedException
+    {
+      return parent.waitConnectionAvailable(binNames, poolCount);
+    }
+
+    /** For a new connection, obtain the fetch throttler to use for the connection.
+    * If the result from waitConnectionAvailable() is CONNECTION_FROM_CREATION,
+    * the calling code is expected to create a connection using the result of this method.
+    *@return the fetch throttler for a new connection.
+    */
+    @Override
+    public IFetchThrottler getNewConnectionFetchThrottler()
+    {
+      return parent.getNewConnectionFetchThrottler(binNames);
+    }
+
+    /** For returning a connection from use, there is only one method. This method signals
+    * whether a formerly in-use connection should be placed back in the pool or destroyed.
+    *@return true if the connection should NOT be put into the pool but should instead
+    * simply be destroyed. If true is returned, the caller MUST call noteConnectionDestroyed()
+    * (below) in order for the bookkeeping to work.
+    */
+    @Override
+    public boolean noteReturnedConnection()
+    {
+      return parent.noteReturnedConnection(binNames);
+    }
+
+    /** This method calculates whether a connection should be taken from the pool and destroyed
+    * in order to meet quota requirements. If this method returns
+    * true, you MUST remove a connection from the pool, and you MUST call
+    * noteConnectionDestroyed() afterwards.
+    *@return true if a pooled connection should be destroyed. If true is returned, the
+    * caller MUST call noteConnectionDestroyed() (below) in order for the bookkeeping to work.
+    */
+    @Override
+    public boolean checkDestroyPooledConnection()
+    {
+      return parent.checkDestroyPooledConnection(binNames, poolCount);
+    }
+
+    /** Connection expiration is tricky, because even though a connection may be identified as
+    * being expired, at the very same moment it could be handed out in another thread. So there
+    * is a natural race condition present.
+    * The way the connection throttler deals with that is to allow the caller to reserve a connection
+    * for expiration. This must be called BEFORE the actual identified connection is removed from the
+    * connection pool. If the value returned by this method is "true", then a connection MUST be removed
+    * from the pool and destroyed, whether or not the identified connection is actually still available for
+    * destruction or not.
+    *@return true if a connection from the pool can be expired. If true is returned, noteConnectionDestruction()
+    * MUST be called once the connection has actually been destroyed.
+    */
+    @Override
+    public boolean checkExpireConnection()
+    {
+      return parent.checkExpireConnection(binNames, poolCount);
+    }
+
+    /** Note that a connection has been returned to the pool. Call this method after a connection has been
+    * placed back into the pool and is available for use.
+    */
+    @Override
+    public void noteConnectionReturnedToPool()
+    {
+      parent.noteConnectionReturnedToPool(binNames, poolCount);
+    }
+
+    /** Note that a connection has been destroyed. Call this method ONLY after noteReturnedConnection()
+    * or checkDestroyPooledConnection() returns true, AND the connection has been already
+    * destroyed.
+    */
+    @Override
+    public void noteConnectionDestroyed()
+    {
+      parent.noteConnectionDestroyed(binNames);
+    }
+  }
+
+  /** Fetch throttler implementation class.
+  * This basically stores some parameters and links back to ThrottlingGroup.
+  */
+  protected static class FetchThrottler implements IFetchThrottler
+  {
+    protected final ThrottlingGroup parent;
+    protected final String[] binNames;
+
+    public FetchThrottler(ThrottlingGroup parent, String[] binNames)
+    {
+      this.parent = parent;
+      this.binNames = binNames;
+    }
+
+    /** Get permission to fetch a document. This grants permission to start
+    * fetching a single document, within the connection that has already been
+    * granted permission that created this object.
+    *@return false if the throttler is being shut down.
+    */
+    @Override
+    public boolean obtainFetchDocumentPermission()
+      throws InterruptedException
+    {
+      return parent.obtainFetchDocumentPermission(binNames);
+    }
+
+    /** Open a fetch stream. When done (or aborting), call
+    * IStreamThrottler.closeStream() to note the completion of the document
+    * fetch activity.
+    *@return the stream throttler to use to throttle the actual data access.
+    */
+    @Override
+    public IStreamThrottler createFetchStream()
+    {
+      return parent.createFetchStream(binNames);
+    }
+
+  }
+
+  /** Stream throttler implementation class.
+  * This basically stores some parameters and links back to ThrottlingGroup.
+  */
+  protected static class StreamThrottler implements IStreamThrottler
+  {
+    protected final ThrottlingGroup parent;
+    protected final String[] binNames;
+
+    public StreamThrottler(ThrottlingGroup parent, String[] binNames)
+    {
+      this.parent = parent;
+      this.binNames = binNames;
+    }
+
+    /** Obtain permission to read a block of bytes. This method may wait until it is OK to proceed.
+    * The throttle group, bin names, etc are already known
+    * to this specific interface object, so it is unnecessary to include them here.
+    *@param byteCount is the number of bytes to get permissions to read.
+    *@return true if the wait took place as planned, or false if the system is being shut down.
+    */
+    @Override
+    public boolean obtainReadPermission(int byteCount)
+      throws InterruptedException
+    {
+      return parent.obtainReadPermission(binNames, byteCount);
+    }
+
+    /** Note the completion of the read of a block of bytes. Call this after
+    * obtainReadPermission() was successfully called, and bytes were successfully read.
+    *@param origByteCount is the originally requested number of bytes to get permissions to read.
+    *@param actualByteCount is the number of bytes actually read.
+    */
+    @Override
+    public void releaseReadPermission(int origByteCount, int actualByteCount)
+    {
+      parent.releaseReadPermission(binNames, origByteCount, actualByteCount);
+    }
+
+    /** Note the stream being closed.
+    */
+    @Override
+    public void closeStream()
+    {
+      parent.closeStream(binNames);
+    }
+
+  }
+
+}
diff --git a/framework/core/src/test/java/org/apache/manifoldcf/core/throttler/TestThrottler.java b/framework/core/src/test/java/org/apache/manifoldcf/core/throttler/TestThrottler.java
new file mode 100644
index 0000000..2a38d01
--- /dev/null
+++ b/framework/core/src/test/java/org/apache/manifoldcf/core/throttler/TestThrottler.java
@@ -0,0 +1,516 @@
+/* $Id$ */
+
+/**
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.manifoldcf.core.throttler;
+
+import org.apache.manifoldcf.core.interfaces.*;
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.atomic.*;
+import org.junit.*;
+import static org.junit.Assert.*;
+
+public class TestThrottler extends org.apache.manifoldcf.core.tests.BaseDerby
+{
+ @Test
+ public void multiThreadConnectionPoolTest()
+ throws Exception
+ {
+ // First, create the throttle group.
+ IThreadContext threadContext = ThreadContextFactory.make();
+ IThrottleGroups tg = ThrottleGroupsFactory.make(threadContext);
+ tg.createOrUpdateThrottleGroup("test","test",new ThrottleSpec());
+
+ // We create a pretend connection pool
+ IConnectionThrottler connectionThrottler = tg.obtainConnectionThrottler("test","test",new String[]{"A","B","C"});
+ System.out.println("Connection throttler obtained");
+
+ // Tester threads do sensible things while logging timestamped events; afterwards the log is checked for policy violations.
+ PollingThread pt = new PollingThread();
+ pt.start();
+ EventLog eventLog = new EventLog();
+ int numThreads = 10;
+ TesterThread[] threads = new TesterThread[numThreads];
+ try
+ {
+ for (int i = 0; i < numThreads; i++)
+ {
+ threads[i] = new TesterThread(connectionThrottler, eventLog);
+ threads[i].start();
+ }
+ // Join every tester thread, even after one reports a failure, then rethrow the first failure seen.
+ Exception firstFailure = null;
+ for (TesterThread t : threads)
+ {
+ if (t == null)
+ continue;
+ try { t.finishUp(); } catch (Exception e) { if (firstFailure == null) firstFailure = e; }
+ }
+ if (firstFailure != null)
+ throw firstFailure;
+ }
+ finally
+ {
+ // Always stop the poller and drop the throttle group, even if a tester thread failed.
+ pt.interrupt();
+ pt.finishUp();
+ tg.removeThrottleGroup("test","test");
+ }
+
+ // Finally, do the log analysis
+ eventLog.analyze();
+ System.out.println("Done test");
+ }
+
+ /** Polls the "test" throttle group about once a second until interrupted; unexpected failures are kept for finishUp(). */
+ protected static class PollingThread extends Thread
+ {
+ protected Throwable exception = null;
+ public PollingThread()
+ {
+ }
+
+ public void run()
+ {
+ try
+ {
+ IThreadContext threadContext = ThreadContextFactory.make();
+ IThrottleGroups throttleGroups = ThrottleGroupsFactory.make(threadContext);
+ while (true)
+ {
+ throttleGroups.poll("test");
+ Thread.sleep(1000L);
+ }
+ }
+ catch (InterruptedException e)
+ {
+ // Interrupted while sleeping: the normal way the test stops this thread.
+ }
+ catch (ManifoldCFException e)
+ {
+ // An interrupt arriving inside a framework call may surface as INTERRUPTED instead; that is also a normal stop.
+ if (e.getErrorCode() != ManifoldCFException.INTERRUPTED)
+ exception = e;
+ }
+ catch (Exception e)
+ {
+ exception = e;
+ }
+ }
+
+ /** Wait for the thread to end, then rethrow any failure it recorded. */
+ public void finishUp()
+ throws Exception
+ {
+ join();
+ if (exception instanceof Exception)
+ throw (Exception)exception;
+ if (exception instanceof Error)
+ throw (Error)exception;
+ if (exception != null)
+ throw new RuntimeException("Unknown exception: "+exception.getClass().getName()+": "+exception.getMessage(),exception);
+ }
+ }
+
+ /** Worker that repeatedly checks a connection out of the throttler, performs a few pretend fetches on it, and
+ * hands it back, logging every step to the shared EventLog. Any failure is kept for finishUp(). */
+ protected static class TesterThread extends Thread
+ {
+ protected final EventLog eventLog;
+ protected final IConnectionThrottler connectionThrottler;
+ protected Throwable exception = null;
+
+ public TesterThread(IConnectionThrottler connectionThrottler, EventLog eventLog)
+ {
+ this.connectionThrottler = connectionThrottler;
+ this.eventLog = eventLog;
+ }
+
+ public void run()
+ {
+ try
+ {
+ int numberConnectionCycles = 3;
+ int numberFetchesPerCycle = 3;
+ for (int k = 0; k < numberConnectionCycles; k++)
+ {
+ // First grab a connection.
+ int rval = connectionThrottler.waitConnectionAvailable();
+ if (rval == IConnectionThrottler.CONNECTION_FROM_NOWHERE)
+ throw new Exception("Unexpected return value from waitConnectionAvailable()");
+ if (rval == IConnectionThrottler.CONNECTION_FROM_CREATION)
+ eventLog.addLogEntry(new ConnectionCreatedEvent()); // Pretend to create the connection
+ else
+ eventLog.addLogEntry(new ConnectionFromPoolEvent()); // Pretend to get it from the pool
+ try
+ {
+ IFetchThrottler fetchThrottler = connectionThrottler.getNewConnectionFetchThrottler();
+ for (int l = 0; l < numberFetchesPerCycle; l++)
+ {
+ doFetch(fetchThrottler);
+ }
+ }
+ finally
+ {
+ // Hand the connection back even if a fetch failed: a leaked connection would leave the
+ // other tester threads blocked in waitConnectionAvailable(), hanging the test instead of failing it.
+ releaseConnection();
+ }
+ }
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ exception = e;
+ }
+ }
+
+ /** Perform one pretend fetch: obtain fetch permission, open a stream, and do three throttled reads.
+ * Each read requests 1000 bytes; the last one pretends to come up short with only 100.
+ */
+ protected void doFetch(IFetchThrottler fetchThrottler)
+ throws Exception
+ {
+ if (fetchThrottler.obtainFetchDocumentPermission() == false)
+ throw new Exception("Unexpected return value for obtainFetchDocumentPermission()");
+ eventLog.addLogEntry(new FetchStartEvent());
+ IStreamThrottler streamThrottler = fetchThrottler.createFetchStream();
+ try
+ {
+ for (int actualBytes : new int[]{1000, 1000, 100})
+ {
+ if (streamThrottler.obtainReadPermission(1000) == false)
+ throw new Exception("False from obtainReadPermission!");
+ eventLog.addLogEntry(new ReadStartEvent(1000));
+ streamThrottler.releaseReadPermission(1000, actualBytes);
+ eventLog.addLogEntry(new ReadDoneEvent(actualBytes));
+ }
+ }
+ finally
+ {
+ // Close the stream
+ streamThrottler.closeStream();
+ }
+ eventLog.addLogEntry(new FetchDoneEvent());
+ }
+
+ /** Pretend to release the current connection, destroying or pooling it as the throttler directs. */
+ protected void releaseConnection()
+ throws Exception
+ {
+ boolean destroyIt = connectionThrottler.noteReturnedConnection();
+ if (destroyIt)
+ {
+ eventLog.addLogEntry(new ConnectionDestroyedEvent());
+ connectionThrottler.noteConnectionDestroyed();
+ }
+ else
+ {
+ eventLog.addLogEntry(new ConnectionReturnedToPoolEvent());
+ connectionThrottler.noteConnectionReturnedToPool();
+ }
+ }
+
+ /** Wait for the thread to end, then rethrow any failure it recorded. */
+ public void finishUp()
+ throws Exception
+ {
+ join();
+ if (exception instanceof Exception)
+ throw (Exception)exception;
+ if (exception instanceof Error)
+ throw (Error)exception;
+ if (exception != null)
+ throw new RuntimeException("Unknown exception: "+exception.getClass().getName()+": "+exception.getMessage(),exception);
+ }
+
+ }
+
+ /** Fixed throttling policy used by the test: bins A (3) and B (4) limit open connections, bins B (1.0) and
+ * C (1.5) set minimum milliseconds per byte, and bins A (5) and C (20) set minimum milliseconds per fetch.
+ */
+ protected static class ThrottleSpec implements IThrottleSpec
+ {
+ public ThrottleSpec()
+ {
+ }
+
+ /** Maximum number of open connections allowed for a bin.
+ *@param binName is the bin to look up.
+ *@return the limit, or Integer.MAX_VALUE if the bin has no limit.
+ */
+ @Override
+ public int getMaxOpenConnections(String binName)
+ {
+ return binName.equals("A") ? 3
+ : binName.equals("B") ? 4
+ : Integer.MAX_VALUE;
+ }
+
+ /** Minimum milliseconds per byte for a bin.
+ *@param binName is the bin to look up.
+ *@return the minimum, or 0.0 if the bin has no limit.
+ */
+ @Override
+ public double getMinimumMillisecondsPerByte(String binName)
+ {
+ return binName.equals("B") ? 1.0
+ : binName.equals("C") ? 1.5
+ : 0.0;
+ }
+
+ /** Minimum milliseconds per fetch for a bin.
+ *@param binName is the bin to look up.
+ *@return the minimum, or 0 if the bin has no limit.
+ */
+ @Override
+ public long getMinimumMillisecondsPerFetch(String binName)
+ {
+ return binName.equals("A") ? 5
+ : binName.equals("C") ? 20
+ : 0;
+ }
+
+ }
+
+ /** Collects events from all tester threads; both methods are synchronized so threads can log concurrently. */
+ protected static class EventLog
+ {
+ protected final List<LogEntry> logList = new ArrayList<LogEntry>();
+ public EventLog()
+ {
+ }
+
+ /** Echo an event to stdout and append it to the log. */
+ public synchronized void addLogEntry(LogEntry entry)
+ {
+ System.out.println(entry.toString());
+ logList.add(entry);
+ }
+
+ /** Replay every logged event, in order, against one fresh State; any event that detects a policy
+ * violation throws, so returning normally means the log is clean. */
+ public synchronized void analyze()
+ throws Exception
+ {
+ State replayState = new State();
+ Iterator<LogEntry> it = logList.iterator();
+ while (it.hasNext())
+ it.next().apply(replayState);
+ }
+ }
+
+ /** Base class for a timestamped test event that can check itself against the replay State. */
+ protected static abstract class LogEntry
+ {
+ protected final long timestamp;
+ public LogEntry(long when)
+ {
+ timestamp = when;
+ }
+
+ /** Fold this event into the replay state, throwing if it reveals a throttling-policy violation. */
+ public abstract void apply(State state) throws Exception;
+
+ @Override
+ public String toString()
+ {
+ return "Time: " + Long.toString(timestamp);
+ }
+ }
+
+ protected static class ConnectionCreatedEvent extends LogEntry
+ {
+ public ConnectionCreatedEvent()
+ {
+ super(System.currentTimeMillis());
+ }
+
+ public void apply(State state)
+ throws Exception
+ {
+ // Bin A, the tightest connection limit in ThrottleSpec, allows at most 3 connections to exist at once.
+ if (state.outstandingConnections >= 3)
+ throw new Exception("Too many outstanding connections at once!");
+ state.outstandingConnections += 1;
+ }
+
+ public String toString()
+ {
+ return super.toString() + "; Connection created";
+ }
+ }
+
+ /** A connection was destroyed, so one fewer connection now exists. */
+ protected static class ConnectionDestroyedEvent extends LogEntry
+ {
+ public ConnectionDestroyedEvent()
+ {
+ super(System.currentTimeMillis());
+ }
+
+ public void apply(State state)
+ throws Exception
+ {
+ state.outstandingConnections -= 1;
+ }
+
+ public String toString()
+ {
+ return super.toString() + "; Connection destroyed";
+ }
+ }
+
+ protected static class ConnectionFromPoolEvent extends LogEntry
+ {
+ public ConnectionFromPoolEvent()
+ {
+ super(System.currentTimeMillis());
+ }
+
+ public void apply(State state)
+ throws Exception
+ {
+ // Reusing a pooled connection leaves the number of existing connections unchanged; nothing to check.
+ }
+
+ public String toString()
+ {
+ return super.toString() + "; Connection from pool";
+ }
+ }
+
+ protected static class ConnectionReturnedToPoolEvent extends LogEntry
+ {
+ public ConnectionReturnedToPoolEvent()
+ {
+ super(System.currentTimeMillis());
+ }
+
+ public void apply(State state)
+ throws Exception
+ {
+ // A pooled connection still exists, so the connection count is unchanged; nothing to check.
+ }
+
+ public String toString()
+ {
+ return super.toString() + "; Connection back to pool";
+ }
+ }
+
+ protected static class FetchStartEvent extends LogEntry
+ {
+ public FetchStartEvent()
+ {
+ super(System.currentTimeMillis());
+ }
+
+ public void apply(State state) throws Exception
+ {
+ long elapsed = timestamp - state.lastFetch;
+ if (elapsed < 20L - 1L) // bin C's 20 ms minimum per fetch, less 1 ms of tolerance
+ throw new Exception("Fetch too fast: took place in "+ elapsed + " milliseconds");
+ state.lastFetch = timestamp;
+ }
+
+ public String toString()
+ {
+ return super.toString() + "; Fetch start";
+ }
+ }
+
+ protected static class FetchDoneEvent extends LogEntry
+ {
+ public FetchDoneEvent()
+ {
+ super(System.currentTimeMillis());
+ }
+
+ public void apply(State state) throws Exception
+ {
+ // Nothing to check when a fetch completes; the event is recorded only for the printed log.
+ }
+
+ public String toString()
+ {
+ return super.toString() + "; Fetch done";
+ }
+ }
+
+ protected static class ReadStartEvent extends LogEntry
+ {
+ final int proposed;
+
+ public ReadStartEvent(int proposed)
+ {
+ super(System.currentTimeMillis());
+ this.proposed = proposed;
+ }
+
+ public void apply(State state) throws Exception
+ {
+ if (state.firstByteReadTime == -1L)
+ {
+ state.firstByteReadTime = timestamp;
+ return; // The first read only starts the clock.
+ }
+ // Bin C's 1.5 ms/byte (the strictest rate in ThrottleSpec): bytes completed so far must have taken at least this long.
+ long minTime = (long)(((double)state.byteTotal) * 1.5 + 0.5);
+ long elapsed = timestamp - state.firstByteReadTime;
+ if (elapsed < minTime)
+ throw new Exception("Took too short a time to read "+state.byteTotal+" bytes: "+elapsed);
+ }
+
+ public String toString()
+ {
+ return super.toString() + "; Read start("+proposed+")";
+ }
+ }
+
+ protected static class ReadDoneEvent extends LogEntry
+ {
+ final int actual;
+
+ public ReadDoneEvent(int actual)
+ {
+ super(System.currentTimeMillis());
+ this.actual = actual;
+ }
+
+ public void apply(State state) throws Exception
+ {
+ // Count the bytes really read so the next ReadStartEvent can check the overall rate.
+ state.byteTotal = state.byteTotal + actual;
+ }
+
+ public String toString()
+ {
+ return super.toString() + "; Read done("+actual+")";
+ }
+ }
+
+ protected static class State
+ {
+ public long byteTotal = 0L; // bytes reported by completed reads so far
+ public long firstByteReadTime = -1L; // timestamp of the first read start; -1 until then
+ public long lastFetch = 0L; // timestamp of the most recent fetch start
+ public int outstandingConnections = 0; // connections created and not yet destroyed
+ }
+
+}
diff --git a/framework/crawler-ui/src/main/java/org/apache/manifoldcf/crawlerui/IdleCleanupThread.java b/framework/crawler-ui/src/main/java/org/apache/manifoldcf/crawlerui/IdleCleanupThread.java
new file mode 100644
index 0000000..870d34a
--- /dev/null
+++ b/framework/crawler-ui/src/main/java/org/apache/manifoldcf/crawlerui/IdleCleanupThread.java
@@ -0,0 +1,141 @@
+/* $Id$ */
+
+/**
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.manifoldcf.crawlerui;
+
+import org.apache.manifoldcf.core.interfaces.*;
+import org.apache.manifoldcf.agents.interfaces.*;
+import org.apache.manifoldcf.crawler.interfaces.*;
+import org.apache.manifoldcf.authorities.interfaces.*;
+import org.apache.manifoldcf.core.system.Logging;
+import org.apache.manifoldcf.core.system.ManifoldCF;
+import java.util.*;
+import java.lang.reflect.*;
+
+/** This thread periodically polls all connector pools (repository, output, authority, mapping) and the throttle
+* groups, and expires cached objects. The ostensible purpose is to allow the connectors to shut down idle connections etc.
+*/
+public class IdleCleanupThread extends Thread
+{
+ public static final String _rcsid = "@(#)$Id$";
+
+ /** Constructor.
+ */
+ public IdleCleanupThread()
+ throws ManifoldCFException
+ {
+ super();
+ setName("Idle cleanup thread");
+ setDaemon(true);
+ }
+
+ public void run()
+ {
+ Logging.root.debug("Start up idle cleanup thread");
+ try
+ {
+ // Create a thread context object.
+ IThreadContext threadContext = ThreadContextFactory.make();
+ // Get the cache handle.
+ ICacheManager cacheManager = CacheManagerFactory.make(threadContext);
+
+ IRepositoryConnectorPool repositoryConnectorPool = RepositoryConnectorPoolFactory.make(threadContext);
+ IOutputConnectorPool outputConnectorPool = OutputConnectorPoolFactory.make(threadContext);
+ IAuthorityConnectorPool authorityConnectorPool = AuthorityConnectorPoolFactory.make(threadContext);
+ IMappingConnectorPool mappingConnectorPool = MappingConnectorPoolFactory.make(threadContext);
+
+ IThrottleGroups throttleGroups = ThrottleGroupsFactory.make(threadContext);
+
+ while (true)
+ {
+ // Extra wait before the next pass after a failed one (0 = none); set by the error handlers below.
+ long retryDelay = 0L;
+ try
+ {
+ // Do the cleanup
+ repositoryConnectorPool.pollAllConnectors();
+ outputConnectorPool.pollAllConnectors();
+ authorityConnectorPool.pollAllConnectors();
+ mappingConnectorPool.pollAllConnectors();
+ throttleGroups.poll();
+ cacheManager.expireObjects(System.currentTimeMillis());
+
+ // Sleep for the retry interval.
+ ManifoldCF.sleep(5000L);
+ }
+ catch (ManifoldCFException e)
+ {
+ if (e.getErrorCode() == ManifoldCFException.INTERRUPTED)
+ break;
+ if (e.getErrorCode() == ManifoldCFException.DATABASE_CONNECTION_ERROR)
+ {
+ Logging.root.error("Idle cleanup thread aborting and restarting due to database connection reset: "+e.getMessage(),e);
+ // Give the database a chance to catch up/wake up
+ retryDelay = 10000L;
+ }
+ else
+ {
+ // Log it, but keep the thread alive
+ Logging.root.error("Exception tossed: "+e.getMessage(),e);
+ if (e.getErrorCode() == ManifoldCFException.SETUP_ERROR)
+ {
+ // Shut the whole system down!
+ System.exit(1);
+ }
+ retryDelay = 5000L;
+ }
+ }
+ catch (InterruptedException e)
+ {
+ // We're supposed to quit
+ break;
+ }
+ catch (OutOfMemoryError e)
+ {
+ System.err.println("Crawler UI ran out of memory - shutting down");
+ e.printStackTrace(System.err);
+ System.exit(-200);
+ }
+ catch (Throwable e)
+ {
+ // A more severe error - but stay alive
+ Logging.root.fatal("Error tossed: "+e.getMessage(),e);
+ retryDelay = 5000L;
+ }
+ // A normal pass slept inside the try; a failed pass skipped that sleep, so wait here instead of
+ // retrying immediately, which would turn a persistent error into a busy loop flooding the log.
+ try
+ {
+ if (retryDelay > 0L)
+ ManifoldCF.sleep(retryDelay);
+ }
+ catch (InterruptedException e)
+ {
+ break;
+ }
+ }
+ }
+ catch (Throwable e)
+ {
+ // Severe error on initialization
+ System.err.println("Crawler UI could not start - shutting down");
+ Logging.root.fatal("IdleCleanupThread initialization error tossed: "+e.getMessage(),e);
+ System.exit(-300);
+ }
+ }
+}
diff --git a/framework/crawler-ui/src/main/java/org/apache/manifoldcf/crawlerui/ServletListener.java b/framework/crawler-ui/src/main/java/org/apache/manifoldcf/crawlerui/ServletListener.java
index 2ea2362..92a2e86 100644
--- a/framework/crawler-ui/src/main/java/org/apache/manifoldcf/crawlerui/ServletListener.java
+++ b/framework/crawler-ui/src/main/java/org/apache/manifoldcf/crawlerui/ServletListener.java
@@ -29,11 +29,16 @@
{
public static final String _rcsid = "@(#)$Id$";
+ protected IdleCleanupThread idleCleanupThread = null;
+
public void contextInitialized(ServletContextEvent sce)
{
try
{
- ManifoldCF.initializeEnvironment(ThreadContextFactory.make());
+ IThreadContext threadContext = ThreadContextFactory.make();
+ ManifoldCF.initializeEnvironment(threadContext);
+ idleCleanupThread = new IdleCleanupThread();
+ idleCleanupThread.start();
}
catch (ManifoldCFException e)
{
@@ -43,7 +48,21 @@
public void contextDestroyed(ServletContextEvent sce)
{
- ManifoldCF.cleanUpEnvironment(ThreadContextFactory.make());
+ try
+ {
+ // Stop the idle cleanup thread: interrupt it, wait up to a second for it to exit (no busy spin), and repeat until it is gone.
+ while (idleCleanupThread != null)
+ {
+ idleCleanupThread.interrupt();
+ try { idleCleanupThread.join(1000L); } catch (InterruptedException e) { Thread.currentThread().interrupt(); break; }
+ if (!idleCleanupThread.isAlive())
+ idleCleanupThread = null;
+ }
+ }
+ finally
+ {
+ ManifoldCF.cleanUpEnvironment(ThreadContextFactory.make());
+ }
}
}
diff --git a/framework/crawler-ui/src/main/webapp/editauthority.jsp b/framework/crawler-ui/src/main/webapp/editauthority.jsp
index a31b8e8..978dcca 100644
--- a/framework/crawler-ui/src/main/webapp/editauthority.jsp
+++ b/framework/crawler-ui/src/main/webapp/editauthority.jsp
@@ -544,7 +544,7 @@
<table class="displaytable">
<tr><td class="separator" colspan="5"><hr/></td></tr>
<tr>
- <td class="description"><nobr><%=Messages.getBodyString(pageContext.getRequest().getLocale(),"editauthority.MaxConnections")%></nobr><br/><nobr><%=Messages.getBodyString(pageContext.getRequest().getLocale(),"editauthority.PerJVMColon")%></nobr></td>
+ <td class="description"><nobr><%=Messages.getBodyString(pageContext.getRequest().getLocale(),"editauthority.MaxConnectionsColon")%></nobr></td>
<td class="value" colspan="4"><input type="text" size="6" name="maxconnections" value='<%=Integer.toString(maxConnections)%>'/></td>
</tr>
</table>
diff --git a/framework/crawler-ui/src/main/webapp/editconnection.jsp b/framework/crawler-ui/src/main/webapp/editconnection.jsp
index 914cda2..1906cef 100644
--- a/framework/crawler-ui/src/main/webapp/editconnection.jsp
+++ b/framework/crawler-ui/src/main/webapp/editconnection.jsp
@@ -470,7 +470,7 @@
<table class="displaytable">
<tr><td class="separator" colspan="2"><hr/></td></tr>
<tr>
- <td class="description"><nobr><%=Messages.getBodyString(pageContext.getRequest().getLocale(),"editconnection.Maxconnections")%></nobr><br/><nobr><%=Messages.getBodyString(pageContext.getRequest().getLocale(),"editconnection.PerJVMColon")%></nobr></td>
+ <td class="description"><nobr><%=Messages.getBodyString(pageContext.getRequest().getLocale(),"editconnection.MaxconnectionsColon")%></nobr></td>
<td class="value"><input type="text" size="6" name="maxconnections" value='<%=Integer.toString(maxConnections)%>'/></td>
</tr>
<tr>
diff --git a/framework/crawler-ui/src/main/webapp/editmapper.jsp b/framework/crawler-ui/src/main/webapp/editmapper.jsp
index a566118..da54a05 100644
--- a/framework/crawler-ui/src/main/webapp/editmapper.jsp
+++ b/framework/crawler-ui/src/main/webapp/editmapper.jsp
@@ -471,7 +471,7 @@
<table class="displaytable">
<tr><td class="separator" colspan="5"><hr/></td></tr>
<tr>
- <td class="description"><nobr><%=Messages.getBodyString(pageContext.getRequest().getLocale(),"editmapper.MaxConnections")%></nobr><br/><nobr><%=Messages.getBodyString(pageContext.getRequest().getLocale(),"editmapper.PerJVMColon")%></nobr></td>
+ <td class="description"><nobr><%=Messages.getBodyString(pageContext.getRequest().getLocale(),"editmapper.MaxConnectionsColon")%></nobr></td>
<td class="value" colspan="4"><input type="text" size="6" name="maxconnections" value='<%=Integer.toString(maxConnections)%>'/></td>
</tr>
</table>
diff --git a/framework/crawler-ui/src/main/webapp/editoutput.jsp b/framework/crawler-ui/src/main/webapp/editoutput.jsp
index 6e6a3ee..6324d97 100644
--- a/framework/crawler-ui/src/main/webapp/editoutput.jsp
+++ b/framework/crawler-ui/src/main/webapp/editoutput.jsp
@@ -400,7 +400,7 @@
<table class="displaytable">
<tr><td class="separator" colspan="2"><hr/></td></tr>
<tr>
- <td class="description"><nobr><%=Messages.getBodyString(pageContext.getRequest().getLocale(),"editoutput.MaxConnections")%></nobr><br/><nobr><%=Messages.getBodyString(pageContext.getRequest().getLocale(),"editoutput.PerJVMColon")%></nobr></td>
+ <td class="description"><nobr><%=Messages.getBodyString(pageContext.getRequest().getLocale(),"editoutput.MaxConnectionsColon")%></nobr></td>
<td class="value"><input type="text" size="6" name="maxconnections" value='<%=Integer.toString(maxConnections)%>'/></td>
</tr>
</table>
diff --git a/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java b/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java
index 065467b..9b6150b 100644
--- a/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java
+++ b/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java
@@ -5845,7 +5845,10 @@
// (3) If the connector has some other model, we look at the start time. A start
// time of 0 implies a full scan, while any other start time implies an incremental
// scan.
-
+
+ // Always reset document schedules for those documents already pending!
+ jobQueue.resetPendingDocumentSchedules(jobID);
+
// Complete connector model is told everything, so no delete phase.
if (connectorModel == IRepositoryConnector.MODEL_ADD_CHANGE_DELETE)
{
diff --git a/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobQueue.java b/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobQueue.java
index b6b0b6e..a5e6c24 100644
--- a/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobQueue.java
+++ b/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobQueue.java
@@ -717,6 +717,28 @@
TrackerClass.noteJobChange(jobID,"Prepare full scan");
}
+ /** Reset the schedule of all PENDING and PENDINGPURGATORY entries for a job: check time 0, check action RESCAN, failure time/count cleared.
+ *@param jobID is the job identifier.
+ */
+ public void resetPendingDocumentSchedules(Long jobID)
+ throws ManifoldCFException
+ {
+ HashMap map = new HashMap();
+ // Do not reset priorities here! They should all be blank at this point.
+ map.put(checkTimeField,Long.valueOf(0L));
+ map.put(checkActionField,actionToString(ACTION_RESCAN));
+ map.put(failTimeField,null);
+ map.put(failCountField,null);
+ ArrayList list = new ArrayList();
+ String query = buildConjunctionClause(list,new ClauseDescription[]{
+ new UnitaryClause(jobIDField,jobID),
+ new MultiClause(statusField,new Object[]{
+ statusToString(STATUS_PENDINGPURGATORY),
+ statusToString(STATUS_PENDING)})});
+ performUpdate(map,"WHERE "+query,list,null);
+ noteModifications(0,1,0);
+ }
+
/** For ADD_CHANGE_DELETE jobs where the specifications have been changed,
* we must reconsider every existing document. So reconsider them all.
*@param jobID is the job identifier.
diff --git a/framework/ui-core/src/main/native2ascii/org/apache/manifoldcf/ui/i18n/common_en_US.properties b/framework/ui-core/src/main/native2ascii/org/apache/manifoldcf/ui/i18n/common_en_US.properties
index 8f599b1..2bd2d3e 100644
--- a/framework/ui-core/src/main/native2ascii/org/apache/manifoldcf/ui/i18n/common_en_US.properties
+++ b/framework/ui-core/src/main/native2ascii/org/apache/manifoldcf/ui/i18n/common_en_US.properties
@@ -107,8 +107,7 @@
editoutput.CancelOutputConnectionEditing=Cancel output connection editing
editoutput.Save=Save
editoutput.SaveThisOutputConnection=Save this output connection
-editoutput.MaxConnections=Max connections
-editoutput.PerJVMColon=(per JVM):
+editoutput.MaxConnectionsColon=Max connections:
editoutput.EditOutputConnection=Edit output connection
editoutput.NameColon=Name:
editoutput.DescriptionColon=Description:
@@ -203,8 +202,7 @@
editauthority.ConnectionTypeColon=Connection type:
editauthority.Continue=Continue
editauthority.ContinueToNextPage=Continue to next page
-editauthority.MaxConnections=Max connections
-editauthority.PerJVMColon=(per JVM):
+editauthority.MaxConnectionsColon=Max connections:
editauthority.Cancel=Cancel
editauthority.CancelAuthorityEditing=Cancel authority editing
editauthority.Save=Save
@@ -236,8 +234,7 @@
editmapper.ConnectionTypeColon=Connection type:
editmapper.Continue=Continue
editmapper.ContinueToNextPage=Continue to next page
-editmapper.MaxConnections=Max connections
-editmapper.PerJVMColon=(per JVM):
+editmapper.MaxConnectionsColon=Max connections:
editmapper.Cancel=Cancel
editmapper.CancelMappingEditing=Cancel mapping editing
editmapper.Save=Save
@@ -279,8 +276,7 @@
editconnection.CancelConnectionEditing=Cancel connection editing
editconnection.NameColon=Name:
editconnection.DescriptionColon=Description:
-editconnection.Maxconnections=Max connections
-editconnection.PerJVMColon=(per JVM):
+editconnection.MaxconnectionsColon=Max connections:
editconnection.ThrottlingColon=Throttling:
editconnection.Add=Add
editconnection.BinRegularExpression=Bin regular expression
diff --git a/framework/ui-core/src/main/native2ascii/org/apache/manifoldcf/ui/i18n/common_ja_JP.properties b/framework/ui-core/src/main/native2ascii/org/apache/manifoldcf/ui/i18n/common_ja_JP.properties
index f64add8..b30c7ba 100644
--- a/framework/ui-core/src/main/native2ascii/org/apache/manifoldcf/ui/i18n/common_ja_JP.properties
+++ b/framework/ui-core/src/main/native2ascii/org/apache/manifoldcf/ui/i18n/common_ja_JP.properties
@@ -107,8 +107,7 @@
editoutput.CancelOutputConnectionEditing=出力コネクションの編集をキャンセル
editoutput.Save=保存
editoutput.SaveThisOutputConnection=出力コネクションを保存
-editoutput.MaxConnections=最大コネクション数
-editoutput.PerJVMColon=(/JVM):
+editoutput.MaxConnectionsColon=最大コネクション数:
editoutput.EditOutputConnection=出力コネクションを編集
editoutput.NameColon=名前:
editoutput.DescriptionColon=説明:
@@ -203,8 +202,7 @@
editauthority.ConnectionTypeColon=コネクションタイプ:
editauthority.Continue=次へ
editauthority.ContinueToNextPage=次のページに進む
-editauthority.MaxConnections=最大コネクション数
-editauthority.PerJVMColon=(/JVM):
+editauthority.MaxConnectionsColon=最大コネクション数:
editauthority.Cancel=キャンセル
editauthority.CancelAuthorityEditing=権限の編集をキャンセル
editauthority.Save=保存
@@ -236,8 +234,7 @@
editmapper.ConnectionTypeColon=コネクションタイプ:
editmapper.Continue=次へ
editmapper.ContinueToNextPage=次のページに進む
-editmapper.MaxConnections=最大コネクション数
-editmapper.PerJVMColon=(/JVM):
+editmapper.MaxConnectionsColon=最大コネクション数:
editmapper.Cancel=キャンセル
editmapper.CancelMappingEditing=マッピングの編集をキャンセル
editmapper.Save=保存
@@ -279,8 +276,7 @@
editconnection.CancelConnectionEditing=コネクションの編集をキャンセル
editconnection.NameColon=名前:
editconnection.DescriptionColon=説明:
-editconnection.Maxconnections=最大コネクション数
-editconnection.PerJVMColon=(/JVM):
+editconnection.MaxconnectionsColon=最大コネクション数:
editconnection.ThrottlingColon=スロットリング:
editconnection.Add=追加
editconnection.BinRegularExpression=Bin正規表現
diff --git a/site/src/documentation/content/xdocs/en_US/how-to-build-and-deploy.xml b/site/src/documentation/content/xdocs/en_US/how-to-build-and-deploy.xml
index 062da97..dab03d8 100644
--- a/site/src/documentation/content/xdocs/en_US/how-to-build-and-deploy.xml
+++ b/site/src/documentation/content/xdocs/en_US/how-to-build-and-deploy.xml
@@ -873,6 +873,7 @@
<tr><td>org.apache.manifoldcf.hsqldbdatabaseport</td><td>No</td><td>The HSQLDB remote server port.</td></tr>
<tr><td>org.apache.manifoldcf.hsqldbdatabaseinstance</td><td>No</td><td>The HSQLDB remote database instance name.</td></tr>
<tr><td>org.apache.manifoldcf.mysql.server</td><td>No</td><td>The MySQL server name. Defaults to 'localhost'.</td></tr>
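+ <tr><td>org.apache.manifoldcf.mysql.client</td><td>No</td><td>The client host name that MySQL privileges are granted to for the ManifoldCF database user (the host part of MySQL's 'user'@'host' account name). Defaults to 'localhost'. You may want to set this to '%' (any host) for a multi-machine setup.</td></tr>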
<tr><td>org.apache.manifoldcf.lockmanagerclass</td><td>No</td><td>Specifies the class to use to implement synchronization. Default
is either file-based synchronization or in-memory synchronization, using the org.apache.manifoldcf.core.lockmanager.LockManager class.
Options include org.apache.manifoldcf.core.lockmanager.BaseLockManager, org.apache.manifoldcf.core.FileLockManager, and
diff --git a/site/src/documentation/content/xdocs/ja_JP/how-to-build-and-deploy.xml b/site/src/documentation/content/xdocs/ja_JP/how-to-build-and-deploy.xml
index 3a60e6d..dab03d8 100644
--- a/site/src/documentation/content/xdocs/ja_JP/how-to-build-and-deploy.xml
+++ b/site/src/documentation/content/xdocs/ja_JP/how-to-build-and-deploy.xml
@@ -442,8 +442,9 @@
<tr><td><em>start-database[.sh|.bat]</em></td><td>script to start the HSQLDB database</td></tr>
<tr><td><em>initialize[.sh|.bat]</em></td><td>script to create the database instance, create all database tables, and register connectors</td></tr>
<tr><td><em>start-webapps[.sh|.bat]</em></td><td>script to start Jetty with the ManifoldCF web applications deployed</td></tr>
- <tr><td><em>start-agents[.sh|.bat]</em></td><td>script to start the agents process</td></tr>
- <tr><td><em>stop-agents[.sh|.bat]</em></td><td>script to stop a running agents process cleanly</td></tr>
+ <tr><td><em>start-agents[.sh|.bat]</em></td><td>script to start the (first) agents process</td></tr>
+ <tr><td><em>start-agents-2[.sh|.bat]</em></td><td>script to start a second agents process</td></tr>
+ <tr><td><em>stop-agents[.sh|.bat]</em></td><td>script to stop all running agents processes cleanly</td></tr>
<tr><td><em>lock-clean[.sh|.bat]</em></td><td>script to clean up dirty locks (run only when all webapps and processes are stopped)</td></tr>
</table>
<p></p>
@@ -452,7 +453,7 @@
<p></p>
<p>If you run the file-based multiprocess model, after you first start the database (using <em>start-database[.sh|.bat]</em>), you will need to initialize the database before you start the agents process or use the crawler UI. To do this, all you need to do is
run the <em>initialize[.sh|.bat]</em> script. Then, you will need to start the web applications (using <em>start-webapps[.sh|.bat]</em>) and the agents process (using
- <em>start-agents[.sh|.bat]</em>).</p>
+ <em>start-agents[.sh|.bat]</em>), and optionally the second agents process (using <em>start-agents-2[.sh|.bat]</em>).</p>
<p></p>
</section>
@@ -480,8 +481,9 @@
<tr><td><em>start-database[.sh|.bat]</em></td><td>script to start the HSQLDB database</td></tr>
<tr><td><em>initialize[.sh|.bat]</em></td><td>script to create the database instance, create all database tables, and register connectors</td></tr>
<tr><td><em>start-webapps[.sh|.bat]</em></td><td>script to start Jetty with the ManifoldCF web applications deployed</td></tr>
- <tr><td><em>start-agents[.sh|.bat]</em></td><td>script to start the agents process</td></tr>
- <tr><td><em>stop-agents[.sh|.bat]</em></td><td>script to stop a running agents process cleanly</td></tr>
+ <tr><td><em>start-agents[.sh|.bat]</em></td><td>script to start the (first) agents process</td></tr>
+ <tr><td><em>start-agents-2[.sh|.bat]</em></td><td>script to start a second agents process</td></tr>
+ <tr><td><em>stop-agents[.sh|.bat]</em></td><td>script to stop all running agents processes cleanly</td></tr>
</table>
<p></p>
<section>
@@ -494,8 +496,8 @@
<li>Initialize the ManifoldCF shared configuration data (using <em>setglobalproperties[.sh|.bat]</em>)</li>
<li>Start the database (using <em>start-database[.sh|.bat]</em>)</li>
<li>Initialize the database (using <em>initialize[.sh|.bat]</em>)</li>
- <li>Start the agents process (using <em>start-agents[.sh|.bat]</em></li>
- <li>Start the web applications (using <em>start-webapps[.sh|.bat]</em></li>
+ <li>Start the agents process (using <em>start-agents[.sh|.bat]</em>), and optionally a second agents process (using <em>start-agents-2[.sh|.bat]</em>)</li>
+ <li>Start the web applications (using <em>start-webapps[.sh|.bat]</em>)</li>
</ol>
<p></p>
</section>
@@ -871,6 +873,7 @@
<tr><td>org.apache.manifoldcf.hsqldbdatabaseport</td><td>No</td><td>The HSQLDB remote server port.</td></tr>
<tr><td>org.apache.manifoldcf.hsqldbdatabaseinstance</td><td>No</td><td>The HSQLDB remote database instance name.</td></tr>
<tr><td>org.apache.manifoldcf.mysql.server</td><td>No</td><td>The MySQL server name. Defaults to 'localhost'.</td></tr>
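+ <tr><td>org.apache.manifoldcf.mysql.client</td><td>No</td><td>The client host name that MySQL privileges are granted to for the ManifoldCF database user (the host part of MySQL's 'user'@'host' account name). Defaults to 'localhost'. You may want to set this to '%' (any host) for a multi-machine setup.</td></tr>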
<tr><td>org.apache.manifoldcf.lockmanagerclass</td><td>No</td><td>Specifies the class to use to implement synchronization. Default
is either file-based synchronization or in-memory synchronization, using the org.apache.manifoldcf.core.lockmanager.LockManager class.
Options include org.apache.manifoldcf.core.lockmanager.BaseLockManager, org.apache.manifoldcf.core.FileLockManager, and