Pull up changes from old CONNECTORS-553 branch

git-svn-id: https://svn.apache.org/repos/asf/manifoldcf/branches/CONNECTORS-553-2@1552344 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index 99d33e8..e74d1a6 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -3,6 +3,37 @@
 
 ======================= 1.5-dev =====================
 
+CONNECTORS-846: Once a service had grabbed all connector
+instances, but was not using them any more, it would not release
+them for other cluster members to use.
+(Karl Wright)
+
+CONNECTORS-844: Remove "per JVM" from connection maximum
+messages in crawler UI.
+(Karl Wright)
+
+CONNECTORS-842: Update documentation to describe the functioning
+of the org.apache.manifoldcf.mysql.client property.
+(Chris Griffin, Karl Wright)
+
+CONNECTORS-843: Solr Connector methods do not call getSession()
+when they should, leading to unpredictable results.
+(Markus Schuch, Karl Wright)
+
+CONNECTORS-841: Always reset document schedules on job start.
+(David Morana, Karl Wright)
+
+CONNECTORS-839: Fix our CloudSolrServer usage to use multipart
+post instead of putting everything in the URL.
+(Alessandro Benedetti, Raymond Wiker, Karl Wright)
+
+CONNECTORS-838: Character-stuff names that ZooKeeper sees, to
+prevent issues with '/' characters.
+(Karl Wright)
+
+CONNECTORS-837: Specifying 1 connection causes hang.
+(Karl Wright)
+
 CONNECTORS-836: Use the same thread context in the registered
 shutdown objects.
 (Karl Wright)
diff --git a/connectors/rss/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/rss/RSSConnector.java b/connectors/rss/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/rss/RSSConnector.java
index 066fa67..7cc4b7b 100644
--- a/connectors/rss/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/rss/RSSConnector.java
+++ b/connectors/rss/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/rss/RSSConnector.java
@@ -52,7 +52,7 @@
 {
   public static final String _rcsid = "@(#)$Id: RSSConnector.java 994959 2010-09-08 10:04:42Z kwright $";
 
-
+  protected final static String rssThrottleGroupType = "_RSS_";
 
   // Usage flag values
   protected static final int ROBOTS_NONE = 0;
@@ -105,7 +105,7 @@
   protected Robots robots = null;
 
   /** Storage for fetcher objects */
-  protected static Map fetcherMap = new HashMap();
+  protected static Map<String,ThrottledFetcher> fetcherMap = new HashMap<String,ThrottledFetcher>();
   /** Storage for robots objects */
   protected static Map robotsMap = new HashMap();
 
@@ -231,10 +231,16 @@
 
       }
 
+      IThrottleGroups tg = ThrottleGroupsFactory.make(currentContext);
+      // Create the throttle group
+      tg.createOrUpdateThrottleGroup(rssThrottleGroupType, throttleGroupName, new ThrottleSpec(maxOpenConnectionsPerServer,
+        minimumMillisecondsPerFetchPerServer, minimumMillisecondsPerBytePerServer));
+      
       isInitialized = true;
     }
   }
 
+  
   /** Return the list of activities that this connector supports (i.e. writes into the log).
   *@return the list.
   */
@@ -936,11 +942,9 @@
             String pathPart = url.getFile();
 
             // Check with robots to see if it's allowed
-            if (robotsUsage >= ROBOTS_DATA && !robots.isFetchAllowed(protocol,port,hostName,url.getPath(),
+            if (robotsUsage >= ROBOTS_DATA && !robots.isFetchAllowed(currentContext,throttleGroupName,
+              protocol,port,hostName,url.getPath(),
               userAgent,from,
-              minimumMillisecondsPerBytePerServer,
-              maxOpenConnectionsPerServer,
-              minimumMillisecondsPerFetchPerServer,
               proxyHost, proxyPort, proxyAuthDomain, proxyAuthUsername, proxyAuthPassword,
               activities, connectionLimit))
             {
@@ -955,10 +959,9 @@
             {
 
               // Now, use the fetcher, and get the file.
-              IThrottledConnection connection = fetcher.createConnection(hostName,
-                minimumMillisecondsPerBytePerServer,
-                maxOpenConnectionsPerServer,
-                minimumMillisecondsPerFetchPerServer,
+              IThrottledConnection connection = fetcher.createConnection(currentContext,
+                throttleGroupName,
+                hostName,
                 connectionLimit,
                 feedTimeout,
                 proxyHost,
@@ -5404,7 +5407,7 @@
   {
     synchronized (fetcherMap)
     {
-      ThrottledFetcher tf = (ThrottledFetcher)fetcherMap.get(throttleGroupName);
+      ThrottledFetcher tf = fetcherMap.get(throttleGroupName);
       if (tf == null)
       {
         tf = new ThrottledFetcher();
@@ -5497,6 +5500,47 @@
 
   // Protected classes
 
+  /** The throttle specification class.  Each server name is a different bin in this model.
+  */
+  protected static class ThrottleSpec implements IThrottleSpec
+  {
+    protected final int maxOpenConnectionsPerServer;
+    protected final long minimumMillisecondsPerFetchPerServer;
+    protected final double minimumMillisecondsPerBytePerServer;
+    
+    public ThrottleSpec(int maxOpenConnectionsPerServer, long minimumMillisecondsPerFetchPerServer,
+      double minimumMillisecondsPerBytePerServer)
+    {
+      this.maxOpenConnectionsPerServer = maxOpenConnectionsPerServer;
+      this.minimumMillisecondsPerFetchPerServer = minimumMillisecondsPerFetchPerServer;
+      this.minimumMillisecondsPerBytePerServer = minimumMillisecondsPerBytePerServer;
+    }
+    
+    /** Given a bin name, find the max open connections to use for that bin.
+    *@return Integer.MAX_VALUE if no limit found.
+    */
+    public int getMaxOpenConnections(String binName)
+    {
+      return maxOpenConnectionsPerServer;
+    }
+
+    /** Look up minimum milliseconds per byte for a bin.
+    *@return 0.0 if no limit found.
+    */
+    public double getMinimumMillisecondsPerByte(String binName)
+    {
+      return minimumMillisecondsPerBytePerServer;
+    }
+
+    /** Look up minimum milliseconds for a fetch for a bin.
+    *@return 0 if no limit found.
+    */
+    public long getMinimumMillisecondsPerFetch(String binName)
+    {
+      return minimumMillisecondsPerFetchPerServer;
+    }
+  }
+
   /** Name/value class */
   protected static class NameValue
   {
diff --git a/connectors/rss/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/rss/Robots.java b/connectors/rss/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/rss/Robots.java
index d9c0e56..1ff565d 100644
--- a/connectors/rss/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/rss/Robots.java
+++ b/connectors/rss/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/rss/Robots.java
@@ -109,10 +109,9 @@
   *@param pathString is the path (non-query) part of the URL
   *@return true if fetch is allowed, false otherwise.
   */
-  public boolean isFetchAllowed(String protocol, int port, String hostName, String pathString,
+  public boolean isFetchAllowed(IThreadContext threadContext, String throttleGroupName,
+    String protocol, int port, String hostName, String pathString,
     String userAgent, String from,
-    double minimumMillisecondsPerBytePerServer, int maxOpenConnectionsPerServer,
-    long minimumMillisecondsPerFetchPerServer,
     String proxyHost, int proxyPort, String proxyAuthDomain, String proxyAuthUsername, String proxyAuthPassword,
     IVersionActivity activities, int connectionLimit)
     throws ManifoldCFException, ServiceInterruption
@@ -134,9 +133,9 @@
       }
     }
 
-    return host.isFetchAllowed(System.currentTimeMillis(),pathString,
+    return host.isFetchAllowed(threadContext,throttleGroupName,
+      System.currentTimeMillis(),pathString,
       userAgent,from,
-      minimumMillisecondsPerBytePerServer,maxOpenConnectionsPerServer,minimumMillisecondsPerFetchPerServer,
       proxyHost, proxyPort,proxyAuthDomain,proxyAuthUsername,proxyAuthPassword,activities,connectionLimit);
   }
 
@@ -257,10 +256,9 @@
     *@param pathString is the path string to check.
     *@return true if crawling is allowed, false otherwise.
     */
-    public boolean isFetchAllowed(long currentTime, String pathString,
+    public boolean isFetchAllowed(IThreadContext threadContext, String throttleGroupName,
+      long currentTime, String pathString,
       String userAgent, String from,
-      double minimumMillisecondsPerBytePerServer, int maxOpenConnectionsPerServer,
-      long minimumMillisecondsPerFetchPerServer,
       String proxyHost, int proxyPort, String proxyAuthDomain, String proxyAuthUsername, String proxyAuthPassword,
       IVersionActivity activities, int connectionLimit)
       throws ServiceInterruption, ManifoldCFException
@@ -323,9 +321,7 @@
 
         if (readingRobots)
           // This doesn't need to be synchronized because readingRobots blocks all other threads from getting at this object
-          makeValid(currentTime,userAgent,from,
-          minimumMillisecondsPerBytePerServer,maxOpenConnectionsPerServer,
-          minimumMillisecondsPerFetchPerServer,
+          makeValid(threadContext,throttleGroupName,currentTime,userAgent,from,
           proxyHost, proxyPort, proxyAuthDomain, proxyAuthUsername, proxyAuthPassword,
           hostName, activities, connectionLimit);
 
@@ -435,9 +431,8 @@
     /** Initialize the record.  This method reads the robots file on the specified protocol/host/port,
     * and parses it according to the rules.
     */
-    protected void makeValid(long currentTime, String userAgent, String from,
-      double minimumMillisecondsPerBytePerServer, int maxOpenConnectionsPerServer,
-      long minimumMillisecondsPerFetchPerServer,
+    protected void makeValid(IThreadContext threadContext, String throttleGroupName,
+      long currentTime, String userAgent, String from,
       String proxyHost, int proxyPort, String proxyAuthDomain, String proxyAuthUsername, String proxyAuthPassword,
       String hostName, IVersionActivity activities, int connectionLimit)
       throws ServiceInterruption, ManifoldCFException
@@ -445,8 +440,8 @@
       invalidTime = currentTime + 24L * 60L * 60L * 1000L;
 
       // Do the fetch
-      IThrottledConnection connection = fetcher.createConnection(hostName,minimumMillisecondsPerBytePerServer,
-        maxOpenConnectionsPerServer,minimumMillisecondsPerFetchPerServer,connectionLimit,ROBOT_TIMEOUT_MILLISECONDS,
+      IThrottledConnection connection = fetcher.createConnection(threadContext,throttleGroupName,
+        hostName,connectionLimit,ROBOT_TIMEOUT_MILLISECONDS,
         proxyHost,proxyPort,proxyAuthDomain,proxyAuthUsername,proxyAuthPassword);
       try
       {
diff --git a/connectors/rss/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/rss/ThrottledFetcher.java b/connectors/rss/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/rss/ThrottledFetcher.java
index 31e0828..83a0e34 100644
--- a/connectors/rss/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/rss/ThrottledFetcher.java
+++ b/connectors/rss/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/rss/ThrottledFetcher.java
@@ -88,9 +88,9 @@
   /** This is the lock object for that global handle counter */
   protected static Integer globalHandleCounterLock = new Integer(0);
 
-  /** This hash maps the server string (without port) to a server object, where
+  /** This hash maps the server string (without port) to a pool throttling object, where
   * we can track the statistics and make sure we throttle appropriately */
-  protected Map serverMap = new HashMap();
+  protected final Map<String,IConnectionThrottler> serverMap = new HashMap<String,IConnectionThrottler>();
 
   /** Reference count for how many connections to this pool there are */
   protected int refCount = 0;
@@ -151,35 +151,25 @@
 
   /** Establish a connection to a specified URL.
   * @param serverName is the FQDN of the server, e.g. foo.metacarta.com
-  * @param minimumMillisecondsPerBytePerServer is the average number of milliseconds to wait
-  *       between bytes, on
-  *       average, over all streams reading from this server.  That means that the
-  *       stream will block on fetch until the number of bytes being fetched, done
-  *       in the average time interval required for that fetch, would not exceed
-  *       the desired bandwidth.
-  * @param minimumMillisecondsPerFetchPerServer is the number of milliseconds
-  *        between fetches, as a minimum, on a per-server basis.  Set
-  *        to zero for no limit.
-  * @param maxOpenConnectionsPerServer is the maximum number of open connections to allow for a single server.
-  *        If more than this number of connections would need to be open, then this connection request will block
-  *        until this number will no longer be exceeded.
   * @param connectionLimit is the maximum desired outstanding connections at any one time.
   * @param connectionTimeoutMilliseconds is the number of milliseconds to wait for the connection before timing out.
   */
-  public synchronized IThrottledConnection createConnection(String serverName, double minimumMillisecondsPerBytePerServer,
-    int maxOpenConnectionsPerServer, long minimumMillisecondsPerFetchPerServer, int connectionLimit, int connectionTimeoutMilliseconds,
+  public synchronized IThrottledConnection createConnection(IThreadContext threadContext, String throttleGroupName,
+    String serverName, int connectionLimit, int connectionTimeoutMilliseconds,
     String proxyHost, int proxyPort, String proxyAuthDomain, String proxyAuthUsername, String proxyAuthPassword)
     throws ManifoldCFException, ServiceInterruption
   {
-    Server server;
-    server = (Server)serverMap.get(serverName);
+    IConnectionThrottler server;
+    server = serverMap.get(serverName);
     if (server == null)
     {
-      server = new Server(serverName);
+      // Create a connection throttler for this server
+      IThrottleGroups tg = ThrottleGroupsFactory.make(threadContext);
+      server = tg.obtainConnectionThrottler(RSSConnector.rssThrottleGroupType, throttleGroupName, new String[]{serverName});
       serverMap.put(serverName,server);
     }
 
-    return new ThrottledConnection(server,minimumMillisecondsPerBytePerServer,maxOpenConnectionsPerServer,minimumMillisecondsPerFetchPerServer,
+    return new ThrottledConnection(serverName, server,
       connectionTimeoutMilliseconds,connectionLimit,
       proxyHost,proxyPort,proxyAuthDomain,proxyAuthUsername,proxyAuthPassword);
   }
@@ -206,14 +196,8 @@
     refCount--;
     if (refCount == 0)
     {
-      // Close all the servers one by one
-      Iterator iter = serverMap.keySet().iterator();
-      while (iter.hasNext())
-      {
-        String serverName = (String)iter.next();
-        Server server = (Server)serverMap.get(serverName);
-        server.discard();
-      }
+      // Since we don't have any actual pools here, this can be a no-op for now
+      // MHL
       serverMap.clear();
     }
   }
@@ -222,14 +206,12 @@
   */
   protected static class ThrottledConnection implements IThrottledConnection
   {
-    /** The connection bandwidth we want */
-    protected final double minimumMillisecondsPerBytePerServer;
-    /** The maximum open connections per server */
-    protected final int maxOpenConnectionsPerServer;
-    /** The minimum time between fetches */
-    protected final long minimumMillisecondsPerFetchPerServer;
-    /** The server object we use to track connections and fetches. */
-    protected final Server server;
+    /** The server fqdn */
+    protected final String serverName;
+    /** The throttling object we use to track connections */
+    protected final IConnectionThrottler connectionThrottler;
+    /** The throttling object we use to track fetches */
+    protected final IFetchThrottler fetchThrottler;
     /** Connection timeout in milliseconds */
     protected final int connectionTimeoutMilliseconds;
     /** The client connection manager */
@@ -259,15 +241,14 @@
     
     /** Constructor.
     */
-    public ThrottledConnection(Server server, double minimumMillisecondsPerBytePerServer, int maxOpenConnectionsPerServer,
-      long minimumMillisecondsPerFetchPerServer, int connectionTimeoutMilliseconds, int connectionLimit,
+    public ThrottledConnection(String serverName,
+      IConnectionThrottler connectionThrottler,
+      int connectionTimeoutMilliseconds, int connectionLimit,
       String proxyHost, int proxyPort, String proxyAuthDomain, String proxyAuthUsername, String proxyAuthPassword)
       throws ManifoldCFException
     {
-      this.minimumMillisecondsPerBytePerServer = minimumMillisecondsPerBytePerServer;
-      this.maxOpenConnectionsPerServer = maxOpenConnectionsPerServer;
-      this.minimumMillisecondsPerFetchPerServer = minimumMillisecondsPerFetchPerServer;
-      this.server = server;
+      this.serverName = serverName;
+      this.connectionThrottler = connectionThrottler;
       this.connectionTimeoutMilliseconds = connectionTimeoutMilliseconds;
 
       // Create the https scheme for this connection
@@ -330,7 +311,17 @@
       httpClient = localHttpClient;
 
       registerGlobalHandle(connectionLimit);
-      server.registerConnection(maxOpenConnectionsPerServer);
+      try
+      {
+        int result = connectionThrottler.waitConnectionAvailable();
+        if (result != IConnectionThrottler.CONNECTION_FROM_CREATION)
+          throw new IllegalStateException("Got back unexpected value from waitForAConnection() of "+result);
+        fetchThrottler = connectionThrottler.getNewConnectionFetchThrottler();
+      }
+      catch (InterruptedException e)
+      {
+        throw new ManifoldCFException(e.getMessage(),ManifoldCFException.INTERRUPTED);
+      }
     }
 
     /** Begin the fetch process.
@@ -344,7 +335,8 @@
       fetchCounter = 0L;
       try
       {
-        server.beginFetch(minimumMillisecondsPerFetchPerServer);
+        if (fetchThrottler.obtainFetchDocumentPermission() == false)
+          throw new IllegalStateException("obtainFetchDocumentPermission() had unexpected return value");
       }
       catch (InterruptedException e)
       {
@@ -386,7 +378,7 @@
     {
 
       StringBuilder sb = new StringBuilder(protocol);
-      sb.append("://").append(server.getServerName());
+      sb.append("://").append(serverName);
       if (port != -1)
         sb.append(":").append(Integer.toString(port));
       sb.append(urlPath);
@@ -407,8 +399,8 @@
       if (lastModified != null)
         executeMethod.setHeader(new BasicHeader("Last-Modified",lastModified));
       // Create the execution thread.
-      methodThread = new ExecuteMethodThread(this, server,
-        minimumMillisecondsPerBytePerServer, httpClient, executeMethod);
+      methodThread = new ExecuteMethodThread(this, fetchThrottler,
+        httpClient, executeMethod);
       // Start the method thread, which will start the transaction
       try
       {
@@ -702,7 +694,6 @@
         if (methodThread != null && threadStarted)
           methodThread.abort();
         long endTime = System.currentTimeMillis();
-        server.endFetch();
 
         activities.recordActivity(new Long(startFetchTime),RSSConnector.ACTIVITY_FETCH,
           new Long(fetchCounter),myUrl,Integer.toString(statusCode),(throwable==null)?null:throwable.getMessage(),null);
@@ -749,7 +740,7 @@
     {
       // Clean up the connection pool.  This should do the necessary bookkeeping to release the one connection that's sitting there.
       connectionManager.shutdown();
-      server.releaseConnection();
+      connectionThrottler.noteConnectionDestroyed();
       releaseGlobalHandle();
     }
 
@@ -760,23 +751,20 @@
   */
   protected static class ThrottledInputstream extends InputStream
   {
-    /** Stream throttling parameters */
-    protected double minimumMillisecondsPerBytePerServer;
-    /** The throttled connection we belong to */
-    protected ThrottledConnection throttledConnection;
-    /** The server object we use to track throttling */
-    protected Server server;
+    /** Throttled connection */
+    protected final ThrottledConnection throttledConnection;
+    /** Stream throttler */
+    protected final IStreamThrottler streamThrottler;
     /** The stream we are wrapping. */
-    protected InputStream inputStream;
+    protected final InputStream inputStream;
 
     /** Constructor.
     */
-    public ThrottledInputstream(ThrottledConnection connection, Server server, InputStream is, double minimumMillisecondsPerBytePerServer)
+    public ThrottledInputstream(ThrottledConnection throttledConnection, IStreamThrottler streamThrottler, InputStream is)
     {
-      this.throttledConnection = connection;
-      this.server = server;
+      this.throttledConnection = throttledConnection;
+      this.streamThrottler = streamThrottler;
       this.inputStream = is;
-      this.minimumMillisecondsPerBytePerServer = minimumMillisecondsPerBytePerServer;
     }
 
     /** Read a byte.
@@ -839,7 +827,8 @@
     {
       try
       {
-        server.beginRead(len,minimumMillisecondsPerBytePerServer);
+        if (streamThrottler.obtainReadPermission(len) == false)
+          throw new IllegalStateException("Throttler shut down while still active");
         int amt = 0;
         try
         {
@@ -849,10 +838,10 @@
         finally
         {
           if (amt == -1)
-            server.endRead(len,0);
+            streamThrottler.releaseReadPermission(len,0);
           else
           {
-            server.endRead(len,amt);
+            streamThrottler.releaseReadPermission(len,amt);
             throttledConnection.logFetchCount(amt);
           }
         }
@@ -909,310 +898,16 @@
     public void close()
       throws IOException
     {
-      inputStream.close();
-    }
-
-  }
-
-  /** This class represents the throttling stuff kept around for a single server.
-  *
-  * In order to calculate
-  * the effective "burst" fetches per second and bytes per second, we need to have some idea what the window is.
-  * For example, a long hiatus from fetching could cause overuse of the server when fetching resumes, if the
-  * window length is too long.
-  *
-  * One solution to this problem would be to keep a list of the individual fetches as records.  Then, we could
-  * "expire" a fetch by discarding the old record.  However, this is quite memory consumptive for all but the
-  * smallest intervals.
-  *
-  * Another, better, solution is to hook into the start and end of individual fetches.  These will, presumably, occur
-  * at the fastest possible rate without long pauses spent doing something else.  The only complication is that
-  * fetches may well overlap, so we need to "reference count" the fetches to know when to reset the counters.
-  * For "fetches per second", we can simply make sure we "schedule" the next fetch at an appropriate time, rather
-  * than keep records around.  The overall rate may therefore be somewhat less than the specified rate, but that's perfectly
-  * acceptable.
-  *
-  * For the "maximum open connections" limit, the best thing would be to establish a separate MultiThreadedConnectionPool
-  * for each Server.  Then, the limit would be automatic.
-  *
-  * Some notes on the algorithms used to limit server bandwidth impact
-  * ==================================================================
-  *
-  * In a single connection case, the algorithm we'd want to use works like this.  On the first chunk of a series,
-  * the total length of time and the number of bytes are recorded.  Then, prior to each subsequent chunk, a calculation
-  * is done which attempts to hit the bandwidth target by the end of the chunk read, using the rate of the first chunk
-  * access as a way of estimating how long it will take to fetch those next n bytes.
-  *
-  * For a multi-connection case, which this is, it's harder to either come up with a good maximum bandwidth estimate,
-  * and harder still to "hit the target", because simultaneous fetches will intrude.  The strategy is therefore:
-  *
-  * 1) The first chunk of any series should proceed without interference from other connections to the same server.
-  *    The goal here is to get a decent quality estimate without any possibility of overwhelming the server.
-  *
-  * 2) The bandwidth of the first chunk is treated as the "maximum bandwidth per connection".  That is, if other
-  *    connections are going on, we can presume that each connection will use at most the bandwidth that the first fetch
-  *    took.  Thus, by generating end-time estimates based on this number, we are actually being conservative and
-  *    using less server bandwidth.
-  *
-  * 3) For chunks that have started but not finished, we keep track of their size and estimated elapsed time in order to schedule when
-  *    new chunks from other connections can start.
-  *
-  */
-  protected class Server
-  {
-    /** The fqdn of the server */
-    protected final String serverName;
-    /** This is the time of the next allowed fetch (in ms since epoch) */
-    protected long nextFetchTime = 0L;
-
-    // Bandwidth throttling variables
-    /** Reference count for bandwidth variables */
-    protected volatile int refCount = 0;
-    /** The inverse rate estimate of the first fetch, in ms/byte */
-    protected double rateEstimate = 0.0;
-    /** Flag indicating whether a rate estimate is needed */
-    protected volatile boolean estimateValid = false;
-    /** Flag indicating whether rate estimation is in progress yet */
-    protected volatile boolean estimateInProgress = false;
-    /** The start time of this series */
-    protected long seriesStartTime = -1L;
-    /** Total actual bytes read in this series; this includes fetches in progress */
-    protected long totalBytesRead = -1L;
-
-    /** Outstanding connection counter */
-    protected volatile int outstandingConnections = 0;
-
-    /** Constructor */
-    public Server(String serverName)
-    {
-      this.serverName = serverName;
-    }
-
-    /** Get the fqdn of the server */
-    public String getServerName()
-    {
-      return serverName;
-    }
-
-    /** Register an outstanding connection (and wait until it can be obtained before proceeding) */
-    public synchronized void registerConnection(int maxOutstandingConnections)
-      throws ManifoldCFException
-    {
       try
       {
-        while (outstandingConnections >= maxOutstandingConnections)
-        {
-          wait();
-        }
-        outstandingConnections++;
-      }
-      catch (InterruptedException e)
-      {
-        throw new ManifoldCFException("Interrupted: "+e.getMessage(),e,ManifoldCFException.INTERRUPTED);
-      }
-    }
-
-    /** Release an outstanding connection back into the pool */
-    public synchronized void releaseConnection()
-    {
-      outstandingConnections--;
-      notifyAll();
-    }
-
-    /** Note the start of a fetch operation.  Call this method just before the actual stream access begins.
-    * May wait until schedule allows.
-    */
-    public void beginFetch(long minimumMillisecondsPerFetchPerServer)
-      throws InterruptedException
-    {
-      //if (Logging.connectors.isTraceEnabled())
-      //      Logging.connectors.trace("RSS: Note begin fetch for '"+serverName+"'");
-      // First, do any waiting, and reschedule as needed
-      long waitAmount = 0L;
-      long currentTime = System.currentTimeMillis();
-
-      // System.out.println("Begin fetch for server "+this.toString()+" with minimum milliseconds per fetch of "+new Long(minimumMillisecondsPerFetchPerServer).toString()+
-      //      " Current time: "+new Long(currentTime).toString()+ " Next fetch time: "+new Long(nextFetchTime).toString());
-
-      synchronized (this)
-      {
-        if (currentTime < nextFetchTime)
-        {
-          waitAmount = nextFetchTime-currentTime;
-          nextFetchTime = nextFetchTime + minimumMillisecondsPerFetchPerServer;
-        }
-        else
-          nextFetchTime = currentTime + minimumMillisecondsPerFetchPerServer;
-      }
-      if (waitAmount > 0L)
-      {
-        if (Logging.connectors.isDebugEnabled())
-          Logging.connectors.debug("RSS: Performing a fetch wait for server '"+serverName+"' for "+
-          new Long(waitAmount).toString()+" ms.");
-        ManifoldCF.sleep(waitAmount);
-      }
-
-      // System.out.println("For server "+this.toString()+", at "+new Long(System.currentTimeMillis()).toString()+", the next fetch time is now "+new Long(nextFetchTime).toString());
-
-      synchronized (this)
-      {
-        if (refCount == 0)
-        {
-          // Now, reset bandwidth throttling counters
-          estimateValid = false;
-          rateEstimate = 0.0;
-          totalBytesRead = 0L;
-          estimateInProgress = false;
-          seriesStartTime = -1L;
-        }
-        refCount++;
-      }
-      //if (Logging.connectors.isTraceEnabled())
-      //      Logging.connectors.trace("RSS: Begin fetch noted for '"+serverName+"'");
-
-    }
-
-    /** Note the end of a fetch operation.  Call this method just after the fetch completes.
-    */
-    public void endFetch()
-    {
-      //if (Logging.connectors.isTraceEnabled())
-      //      Logging.connectors.trace("RSS: Note end fetch for '"+serverName+"'");
-
-      synchronized (this)
-      {
-        refCount--;
-      }
-
-      //if (Logging.connectors.isTraceEnabled())
-      //      Logging.connectors.trace("RSS: End fetch noted for '"+serverName+"'");
-
-    }
-
-    /** Note the start of an individual byte read of a specified size.  Call this method just before the
-    * read request takes place.  Performs the necessary delay prior to reading specified number of bytes from the server.
-    */
-    public void beginRead(int byteCount, double minimumMillisecondsPerBytePerServer)
-      throws InterruptedException
-    {
-      //if (Logging.connectors.isTraceEnabled())
-      //      Logging.connectors.trace("RSS: Note begin read for '"+serverName+"'");
-
-      long currentTime = System.currentTimeMillis();
-
-      synchronized (this)
-      {
-        while (estimateInProgress)
-          wait();
-        if (estimateValid == false)
-        {
-          seriesStartTime = currentTime;
-          estimateInProgress = true;
-          // Add these bytes to the estimated total
-          totalBytesRead += (long)byteCount;
-          // Exit early; this thread isn't going to do any waiting
-          //if (Logging.connectors.isTraceEnabled())
-          //      Logging.connectors.trace("RSS: Read begin noted; gathering stats for '"+serverName+"'");
-
-          return;
-        }
-      }
-
-      // It is possible for the following code to get interrupted.  If that happens,
-      // we have to unstick the threads that are waiting on the estimate!
-      boolean finished = false;
-      try
-      {
-        long waitTime = 0L;
-        synchronized (this)
-        {
-          // Add these bytes to the estimated total
-          totalBytesRead += (long)byteCount;
-
-          // Estimate the time this read will take, and wait accordingly
-          long estimatedTime = (long)(rateEstimate * (double)byteCount);
-
-          // Figure out how long the total byte count should take, to meet the constraint
-          long desiredEndTime = seriesStartTime + (long)(((double)totalBytesRead) * minimumMillisecondsPerBytePerServer);
-
-          // The wait time is the different between our desired end time, minus the estimated time to read the data, and the
-          // current time.  But it can't be negative.
-          waitTime = (desiredEndTime - estimatedTime) - currentTime;
-        }
-
-        if (waitTime > 0L)
-        {
-          if (Logging.connectors.isDebugEnabled())
-            Logging.connectors.debug("RSS: Performing a read wait on server '"+serverName+"' of "+
-            new Long(waitTime).toString()+" ms.");
-          ManifoldCF.sleep(waitTime);
-        }
-
-        //if (Logging.connectors.isTraceEnabled())
-        //      Logging.connectors.trace("RSS: Begin read noted for '"+serverName+"'");
-        finished = true;
+        inputStream.close();
       }
       finally
       {
-        if (!finished)
-        {
-          abortRead();
-        }
+        streamThrottler.closeStream();
       }
     }
 
-    /** Abort a read in progress.
-    */
-    public void abortRead()
-    {
-      synchronized (this)
-      {
-        if (estimateInProgress)
-        {
-          estimateInProgress = false;
-          notifyAll();
-        }
-      }
-    }
-
-    /** Note the end of an individual read from the server.  Call this just after an individual read completes.
-    * Pass the actual number of bytes read to the method.
-    */
-    public void endRead(int originalCount, int actualCount)
-    {
-      //if (Logging.connectors.isTraceEnabled())
-      //      Logging.connectors.trace("RSS: Note end read for '"+serverName+"'");
-
-      long currentTime = System.currentTimeMillis();
-
-      synchronized (this)
-      {
-        totalBytesRead = totalBytesRead + (long)actualCount - (long)originalCount;
-        if (estimateInProgress)
-        {
-          if (actualCount == 0)
-            // Didn't actually get any bytes, so use 0.0
-            rateEstimate = 0.0;
-          else
-            rateEstimate = ((double)(currentTime - seriesStartTime))/(double)actualCount;
-          estimateValid = true;
-          estimateInProgress = false;
-          notifyAll();
-        }
-      }
-
-      //if (Logging.connectors.isTraceEnabled())
-      //      Logging.connectors.trace("RSS: End read noted for '"+serverName+"'");
-
-    }
-
-    /** Discard this server.
-    */
-    public void discard()
-    {
-      // Nothing needed anymore
-    }
-
   }
 
   /** This thread does the actual socket communication with the server.
@@ -1235,10 +930,8 @@
   {
     /** The connection */
     protected final ThrottledConnection theConnection;
-    /** The connection bandwidth we want */
-    protected final double minimumMillisecondsPerBytePerServer;
-    /** The server object we use to track connections and fetches. */
-    protected final Server server;
+    /** The fetch throttler */
+    protected final IFetchThrottler fetchThrottler;
     /** Client and method, all preconfigured */
     protected final HttpClient httpClient;
     protected final HttpRequestBase executeMethod;
@@ -1256,15 +949,13 @@
 
     protected Throwable generalException = null;
     
-    public ExecuteMethodThread(ThrottledConnection theConnection, Server server,
-      double minimumMillisecondsPerBytePerServer,
+    public ExecuteMethodThread(ThrottledConnection theConnection, IFetchThrottler fetchThrottler,
       HttpClient httpClient, HttpRequestBase executeMethod)
     {
       super();
       setDaemon(true);
       this.theConnection = theConnection;
-      this.server = server;
-      this.minimumMillisecondsPerBytePerServer = minimumMillisecondsPerBytePerServer;
+      this.fetchThrottler = fetchThrottler;
       this.httpClient = httpClient;
       this.executeMethod = executeMethod;
     }
@@ -1316,7 +1007,7 @@
                   bodyStream = response.getEntity().getContent();
                   if (bodyStream != null)
                   {
-                    bodyStream = new ThrottledInputstream(theConnection,server,bodyStream,minimumMillisecondsPerBytePerServer);
+                    bodyStream = new ThrottledInputstream(theConnection,fetchThrottler.createFetchStream(),bodyStream);
                     threadStream = new XThreadInputStream(bodyStream);
                   }
                   streamCreated = true;
diff --git a/connectors/solr/connector/src/main/java/org/apache/manifoldcf/agents/output/solr/HttpPoster.java b/connectors/solr/connector/src/main/java/org/apache/manifoldcf/agents/output/solr/HttpPoster.java
index 13b81f4..5af995c 100644
--- a/connectors/solr/connector/src/main/java/org/apache/manifoldcf/agents/output/solr/HttpPoster.java
+++ b/connectors/solr/connector/src/main/java/org/apache/manifoldcf/agents/output/solr/HttpPoster.java
@@ -70,6 +70,7 @@
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.util.ContentStream;
 import org.apache.solr.common.SolrException;
+import org.apache.solr.client.solrj.impl.HttpClientUtil;
 
 
 /**
@@ -158,7 +159,7 @@
     
     try
     {
-      CloudSolrServer cloudSolrServer = new CloudSolrServer(zookeeperHosts);
+      CloudSolrServer cloudSolrServer = new CloudSolrServer(zookeeperHosts, new ModifiedLBHttpSolrServer(HttpClientUtil.createClient(null)));
       cloudSolrServer.setZkClientTimeout(zkClientTimeout);
       cloudSolrServer.setZkConnectTimeout(zkConnectTimeout);
       cloudSolrServer.setDefaultCollection(collection);
diff --git a/connectors/solr/connector/src/main/java/org/apache/manifoldcf/agents/output/solr/ModifiedLBHttpSolrServer.java b/connectors/solr/connector/src/main/java/org/apache/manifoldcf/agents/output/solr/ModifiedLBHttpSolrServer.java
new file mode 100644
index 0000000..a8b728f
--- /dev/null
+++ b/connectors/solr/connector/src/main/java/org/apache/manifoldcf/agents/output/solr/ModifiedLBHttpSolrServer.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.manifoldcf.agents.output.solr;
+
+import org.apache.solr.client.solrj.impl.LBHttpSolrServer;
+import org.apache.solr.client.solrj.impl.HttpSolrServer;
+import org.apache.solr.client.solrj.impl.BinaryResponseParser;
+import org.apache.solr.client.solrj.*;
+import java.net.MalformedURLException;
+import org.apache.http.client.HttpClient;
+
+
+/** This class overrides and somewhat changes the behavior of the
+* SolrJ LBHttpSolrServer class.  This is so it instantiates our modified
+* HttpSolrServer class, so that multipart forms work.
+*/
+public class ModifiedLBHttpSolrServer extends LBHttpSolrServer
+{
+  private final HttpClient httpClient;
+  private final ResponseParser parser;
+  
+  public ModifiedLBHttpSolrServer(String... solrServerUrls) throws MalformedURLException {
+    this(null, solrServerUrls);
+  }
+  
+  /** The provided httpClient should use a multi-threaded connection manager */ 
+  public ModifiedLBHttpSolrServer(HttpClient httpClient, String... solrServerUrl)
+          throws MalformedURLException {
+    this(httpClient, new BinaryResponseParser(), solrServerUrl);
+  }
+
+  /** The provided httpClient should use a multi-threaded connection manager */  
+  public ModifiedLBHttpSolrServer(HttpClient httpClient, ResponseParser parser, String... solrServerUrl)
+          throws MalformedURLException {
+    super(httpClient, parser, solrServerUrl);
+    this.httpClient = httpClient;
+    this.parser = parser;
+  }
+  
+  @Override
+  protected HttpSolrServer makeServer(String server) throws MalformedURLException {
+    return new ModifiedHttpSolrServer(server, httpClient, parser);
+  }
+
+}
diff --git a/connectors/solr/connector/src/main/java/org/apache/manifoldcf/agents/output/solr/SolrConnector.java b/connectors/solr/connector/src/main/java/org/apache/manifoldcf/agents/output/solr/SolrConnector.java
index 770a89a..e85e1f3 100644
--- a/connectors/solr/connector/src/main/java/org/apache/manifoldcf/agents/output/solr/SolrConnector.java
+++ b/connectors/solr/connector/src/main/java/org/apache/manifoldcf/agents/output/solr/SolrConnector.java
@@ -434,6 +434,8 @@
   public String getOutputDescription(OutputSpecification spec)
     throws ManifoldCFException, ServiceInterruption
   {
+    getSession();
+    
     StringBuilder sb = new StringBuilder();
 
     // All the arguments need to go into this string, since they affect ingestion.
@@ -578,6 +580,7 @@
   public boolean checkMimeTypeIndexable(String outputDescription, String mimeType)
     throws ManifoldCFException, ServiceInterruption
   {
+    getSession();
     if (includedMimeTypes != null && includedMimeTypes.get(mimeType) == null)
       return false;
     if (excludedMimeTypes != null && excludedMimeTypes.get(mimeType) != null)
@@ -594,6 +597,7 @@
   public boolean checkLengthIndexable(String outputDescription, long length)
     throws ManifoldCFException, ServiceInterruption
   {
+    getSession();
     if (maxDocumentLength != null && length > maxDocumentLength.longValue())
       return false;
     return super.checkLengthIndexable(outputDescription,length);
diff --git a/connectors/webcrawler/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/webcrawler/IThrottledConnection.java b/connectors/webcrawler/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/webcrawler/IThrottledConnection.java
index d401159..8e8b65a 100644
--- a/connectors/webcrawler/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/webcrawler/IThrottledConnection.java
+++ b/connectors/webcrawler/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/webcrawler/IThrottledConnection.java
@@ -39,6 +39,12 @@
   public static final int FETCH_INTERRUPTED = -104;
   public static final int FETCH_UNKNOWN_ERROR = -999;
 
+  /** Check whether the connection has expired.
+  *@param currentTime is the current time to use to judge if a connection has expired.
+  *@return true if the connection has expired, and should be closed.
+  */
+  public boolean hasExpired(long currentTime);
+
   /** Begin the fetch process.
   * @param fetchType is a short descriptive string describing the kind of fetch being requested.  This
   *        is used solely for logging purposes.
@@ -113,8 +119,13 @@
   public void doneFetch(IVersionActivity activities)
     throws ManifoldCFException;
 
-  /** Close the connection.  Call this to end this server connection.
+  /** Close the connection.  Call this to return the connection to
+  * its pool.
   */
-  public void close()
-    throws ManifoldCFException;
+  public void close();
+  
+  /** Destroy the connection.  Call this to close the connection.
+  */
+  public void destroy();
+  
 }
diff --git a/connectors/webcrawler/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/webcrawler/ThrottleDescription.java b/connectors/webcrawler/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/webcrawler/ThrottleDescription.java
index db7b063..c121568 100644
--- a/connectors/webcrawler/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/webcrawler/ThrottleDescription.java
+++ b/connectors/webcrawler/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/webcrawler/ThrottleDescription.java
@@ -33,13 +33,13 @@
 * any given bin value as much as possible.  For that reason I've organized this structure
 * accordingly.
 */
-public class ThrottleDescription
+public class ThrottleDescription implements IThrottleSpec
 {
   public static final String _rcsid = "@(#)$Id: ThrottleDescription.java 988245 2010-08-23 18:39:35Z kwright $";
 
   /** This is the hash that contains everything.  It's keyed by the regexp string itself.
   * Values are ThrottleItem's. */
-  protected HashMap patternHash = new HashMap();
+  protected Map<String,ThrottleItem> patternHash = new HashMap<String,ThrottleItem>();
 
   /** Constructor.  Build the description from the ConfigParams. */
   public ThrottleDescription(ConfigParams configData)
@@ -146,17 +146,15 @@
   }
 
   /** Given a bin name, find the max open connections to use for that bin.
-  *@return -1 if no limit found.
+  *@return Integer.MAX_VALUE if no limit found.
   */
+  @Override
   public int getMaxOpenConnections(String binName)
   {
     // Go through the regexps and match; for each match, find the maximum possible.
     int maxCount = -1;
-    Iterator iter = patternHash.keySet().iterator();
-    while (iter.hasNext())
+    for (ThrottleItem ti : patternHash.values())
     {
-      String binDescription = (String)iter.next();
-      ThrottleItem ti = (ThrottleItem)patternHash.get(binDescription);
       Integer limit = ti.getMaxOpenConnections();
       if (limit != null)
       {
@@ -169,22 +167,24 @@
         }
       }
     }
+    if (maxCount == -1)
+      maxCount = Integer.MAX_VALUE;
+    else if (maxCount == 0)
+      maxCount = 1;
     return maxCount;
   }
 
   /** Look up minimum milliseconds per byte for a bin.
   *@return 0.0 if no limit found.
   */
+  @Override
   public double getMinimumMillisecondsPerByte(String binName)
   {
     // Go through the regexps and match; for each match, find the maximum possible.
     double minMilliseconds = 0.0;
     boolean seenSomething = false;
-    Iterator iter = patternHash.keySet().iterator();
-    while (iter.hasNext())
+    for (ThrottleItem ti : patternHash.values())
     {
-      String binDescription = (String)iter.next();
-      ThrottleItem ti = (ThrottleItem)patternHash.get(binDescription);
       Double limit = ti.getMinimumMillisecondsPerByte();
       if (limit != null)
       {
@@ -206,16 +206,14 @@
   /** Look up minimum milliseconds for a fetch for a bin.
   *@return 0 if no limit found.
   */
+  @Override
   public long getMinimumMillisecondsPerFetch(String binName)
   {
     // Go through the regexps and match; for each match, find the maximum possible.
     long minMilliseconds = 0L;
     boolean seenSomething = false;
-    Iterator iter = patternHash.keySet().iterator();
-    while (iter.hasNext())
+    for (ThrottleItem ti : patternHash.values())
     {
-      String binDescription = (String)iter.next();
-      ThrottleItem ti = (ThrottleItem)patternHash.get(binDescription);
       Long limit = ti.getMinimumMillisecondsPerFetch();
       if (limit != null)
       {
@@ -239,7 +237,7 @@
   protected static class ThrottleItem
   {
     /** The bin-matching pattern. */
-    protected Pattern pattern;
+    protected final Pattern pattern;
     /** The minimum milliseconds between bytes, or null if no limit. */
     protected Double minimumMillisecondsPerByte = null;
     /** The minimum milliseconds per fetch, or null if no limit */
diff --git a/connectors/webcrawler/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/webcrawler/ThrottledFetcher.java b/connectors/webcrawler/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/webcrawler/ThrottledFetcher.java
index e035198..e1f42da 100644
--- a/connectors/webcrawler/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/webcrawler/ThrottledFetcher.java
+++ b/connectors/webcrawler/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/webcrawler/ThrottledFetcher.java
@@ -100,6 +100,12 @@
 {
   public static final String _rcsid = "@(#)$Id: ThrottledFetcher.java 989847 2010-08-26 17:52:30Z kwright $";
 
+  /** Web throttle group type */
+  protected static final String webThrottleGroupType = "_WEB_";
+  
+  /** Idle timeout */
+  protected static final long idleTimeout = 300000L;
+  
   /** This flag determines whether we record everything to the disk, as a means of doing a web snapshot */
   protected static final boolean recordEverything = false;
 
@@ -109,18 +115,13 @@
   protected static final long TIME_6HRS = 6L * 60L * 60000L;
   protected static final long TIME_1DAY = 24L * 60L * 60000L;
 
+  /** The read chunk length */
+  protected static final int READ_CHUNK_LENGTH = 4096;
 
-  // The following static bin pools correspond to global resources that will be managed via ILockManager.
+  /** Connection pools.
+  /* This is a static hash of the connection pools in existence.  Each connection pool represents a set of identical connections. */
+  protected final static Map<ConnectionPoolKey,ConnectionPool> connectionPools = new HashMap<ConnectionPoolKey,ConnectionPool>();
   
-  /** This is the static pool of ConnectionBin's, keyed by bin name. */
-  protected static Map<String,ConnectionBin> connectionBins = new HashMap<String,ConnectionBin>();
-  /** This is the static pool of ThrottleBin's, keyed by bin name. */
-  protected static Map<String,ThrottleBin> throttleBins = new HashMap<String,ThrottleBin>();
-
-  /** This global lock protects the "distributed pool" resource, and insures that a connection
-  * can get pulled out of all the right pools and wind up in only the hands of one thread. */
-  protected static Integer poolLock = new Integer(0);
-
   /** Current host name */
   private static String currentHost = null;
   static
@@ -138,17 +139,13 @@
     }
   }
 
-  /** The read chunk length */
-  protected static final int READ_CHUNK_LENGTH = 4096;
-
-  /** Constructor.
+  /** Constructor.  Private since we never instantiate.
   */
-  public ThrottledFetcher()
+  private ThrottledFetcher()
   {
   }
 
 
-
   /** Obtain a connection to specified protocol, server, and port.  We use the protocol because the
   * setup for some protocols is extensive (e.g. https) and hopefully would not need to be repeated if
   * we distinguish connections based on that.
@@ -164,15 +161,22 @@
   *@param connectionLimit isthe maximum number of connections permitted.
   *@return an IThrottledConnection object that can be used to fetch from the port.
   */
-  public static IThrottledConnection getConnection(String protocol, String server, int port,
+  public static IThrottledConnection getConnection(IThreadContext threadContext, String throttleGroupName,
+    String protocol, String server, int port,
     PageCredentials authentication,
     IKeystoreManager trustStore,
-    ThrottleDescription throttleDescription, String[] binNames,
+    IThrottleSpec throttleDescription, String[] binNames,
     int connectionLimit,
     String proxyHost, int proxyPort, String proxyAuthDomain, String proxyAuthUsername, String proxyAuthPassword)
     throws ManifoldCFException
   {
-    // Create the https scheme for this connection
+    // Get a throttle groups handle
+    IThrottleGroups throttleGroups = ThrottleGroupsFactory.make(threadContext);
+    
+    // Create the appropruate throttle group, or update the throttle description for an existing one
+    throttleGroups.createOrUpdateThrottleGroup(webThrottleGroupType,throttleGroupName,throttleDescription);
+    
+    // Create the https scheme and trust store string for this connection
     javax.net.ssl.SSLSocketFactory baseFactory;
     String trustStoreString;
     if (trustStore != null)
@@ -186,781 +190,68 @@
       trustStoreString = null;
     }
 
-
-    ConnectionBin[] bins = new ConnectionBin[binNames.length];
-
-    // Now, start looking for a connection
-    int i = 0;
-    while (i < binNames.length)
+    // Construct a connection pool key
+    ConnectionPoolKey poolKey = new ConnectionPoolKey(protocol,server,port,authentication,
+      trustStoreString,proxyHost,proxyPort,proxyAuthDomain,proxyAuthUsername,proxyAuthPassword);
+    
+    ConnectionPool p;
+    synchronized (connectionPools)
     {
-      String binName = binNames[i];
-
-      // Find or create the bin object
-      ConnectionBin cb;
-      synchronized (connectionBins)
+      p = connectionPools.get(poolKey);
+      if (p == null)
       {
-        cb = connectionBins.get(binName);
-        if (cb == null)
-        {
-          cb = new ConnectionBin(binName);
-          connectionBins.put(binName,cb);
-        }
-        //cb.sanityCheck();
+        // Construct a new IConnectionThrottler.
+        IConnectionThrottler connectionThrottler =
+          throttleGroups.obtainConnectionThrottler(webThrottleGroupType,throttleGroupName,binNames);
+        p = new ConnectionPool(connectionThrottler,protocol,server,port,authentication,baseFactory,
+          proxyHost,proxyPort,proxyAuthDomain,proxyAuthUsername,proxyAuthPassword);
+        connectionPools.put(poolKey,p);
       }
-      bins[i] = cb;
-      i++;
     }
-
-    ThrottledConnection connectionToReuse;
-
-    long startTime = 0L;
-    if (Logging.connectors.isDebugEnabled())
+    
+    try
     {
-      startTime = System.currentTimeMillis();
-      Logging.connectors.debug("WEB: Waiting to start getting a connection to "+protocol+"://"+server+":"+port);
+      return p.grab();
     }
-
-    synchronized (poolLock)
+    catch (InterruptedException e)
     {
-
-      // If the number of outstanding connections is greater than the global limit, close pooled connections until we are under the limit
-      long idleTimeout = 64000L;
-      while (true)
-      {
-        int openCount = 0;
-
-        // Lock up everything for a moment
-        synchronized (connectionBins)
-        {
-          // Time out connections that have been idle too long.  To do this, we need to go through
-          // all connection bins and look at the pool
-          for (String binName : connectionBins.keySet())
-          {
-            ConnectionBin cb = connectionBins.get(binName);
-            openCount += cb.countConnections();
-          }
-        }
-
-        if (openCount < connectionLimit)
-          break;
-
-        if (idleTimeout == 0L)
-        {
-          // Can't actually conclude anything here unfortunately
-
-          // Logging.connectors.warn("Web: Exceeding connection limit!  Open count = "+Integer.toString(openCount)+"; limit = "+Integer.toString(connectionLimit));
-          break;
-        }
-        idleTimeout = idleTimeout/4L;
-
-        // Lock up everything for a moment, since otherwise we could delete something people
-        // expect to stick around.
-        synchronized (connectionBins)
-        {
-          // Time out connections that have been idle too long.  To do this, we need to go through
-          // all connection bins and look at the pool
-          for (String binName : connectionBins.keySet())
-          {
-            ConnectionBin cb = connectionBins.get(binName);
-            cb.flushIdleConnections(idleTimeout);
-          }
-        }
-      }
-
-      try
-      {
-        // Retry until we get the connection.
-        while (true)
-        {
-          if (Logging.connectors.isDebugEnabled())
-            Logging.connectors.debug("WEB: Attempting to get connection to "+protocol+"://"+server+":"+port+" ("+new Long(System.currentTimeMillis()-startTime).toString()+" ms)");
-
-          i = 0;
-
-          connectionToReuse = null;
-
-          try
-          {
-
-            // Now, start looking for a connection
-            while (i < binNames.length)
-            {
-              String binName = binNames[i];
-              ConnectionBin cb = bins[i];
-
-              // Figure out the connection limit for this bin, based on the throttle description
-              int maxConnections = throttleDescription.getMaxOpenConnections(binName);
-
-              // If no restriction, use a very large value.
-              if (maxConnections == -1)
-                maxConnections = Integer.MAX_VALUE;
-              else if (maxConnections == 0)
-                maxConnections = 1;
-
-              // Now, do what we need to do to reserve our connection for this bin.
-              // If we can't reserve it now, we plan on undoing everything we did, so
-              // whatever we do must be reversible.  Furthermore, nothing we call here
-              // should actually wait(); that will occur if we can't get what we need out
-              // here at this level.
-
-              if (connectionToReuse != null)
-              {
-                // We have a reuse candidate already, so just make sure each remaining bin is within
-                // its limits.
-                cb.insureWithinLimits(maxConnections,connectionToReuse);
-              }
-              else
-              {
-                connectionToReuse = cb.findConnection(maxConnections,bins,protocol,server,port,authentication,trustStoreString,
-                  proxyHost,proxyPort,proxyAuthDomain,proxyAuthUsername,proxyAuthPassword);
-              }
-
-              // Increment after we successfully handled this bin
-              i++;
-            }
-
-            // That loop completed, meaning that we think we got a connection.  Now, go through all the bins and make sure there's enough time since the last
-            // fetch.  If not, we have to clean everything up and try again.
-            long currentTime = System.currentTimeMillis();
-
-            // Global lock needed to insure that fetch time is updated across all bins simultaneously
-            synchronized (connectionBins)
-            {
-              i = 0;
-              while (i < binNames.length)
-              {
-                String binName = binNames[i];
-                ConnectionBin cb = bins[i];
-                //cb.sanityCheck();
-                // Get the minimum time between fetches for this bin, based on the throttle description
-                long minMillisecondsPerFetch = throttleDescription.getMinimumMillisecondsPerFetch(binName);
-                if (cb.getLastFetchTime() + minMillisecondsPerFetch > currentTime)
-                  throw new WaitException(cb.getLastFetchTime() + minMillisecondsPerFetch - currentTime);
-                i++;
-              }
-              i = 0;
-              while (i < binNames.length)
-              {
-                ConnectionBin cb = bins[i++];
-                cb.setLastFetchTime(currentTime);
-              }
-            }
-
-          }
-          catch (Throwable e)
-          {
-            // We have to free everything and retry, because otherwise we are subject to deadlock.
-            // The only thing we have reserved is the connection, which we must free if there's a
-            // problem.
-
-            if (connectionToReuse != null)
-            {
-              // Return this connection to the pool.  That is, the pools for all the bins.
-              int k = 0;
-              while (k < binNames.length)
-              {
-                String binName = binNames[k++];
-                ConnectionBin cb;
-                synchronized (connectionBins)
-                {
-                  cb = connectionBins.get(binName);
-                  if (cb == null)
-                  {
-                    cb = new ConnectionBin(binName);
-                    connectionBins.put(binName,cb);
-                  }
-                }
-                //cb.sanityCheck();
-                cb.addToPool(connectionToReuse);
-                //cb.sanityCheck();
-              }
-              connectionToReuse = null;
-              // We should not need to notify here because nothing has really changed from
-              // when the attempt started to get the connection.  We just undid what we did.
-            }
-
-
-            if (e instanceof Error)
-              throw (Error)e;
-            if (e instanceof ManifoldCFException)
-              throw (ManifoldCFException)e;
-
-            if (e instanceof WaitException)
-            {
-              // Wait because we need a certain amount of time after a previous fetch.
-              WaitException we = (WaitException)e;
-              long waitAmount = we.getWaitAmount();
-              if (Logging.connectors.isDebugEnabled())
-                Logging.connectors.debug("WEB: Waiting "+new Long(waitAmount).toString()+" ms before starting fetch on "+protocol+"://"+server+":"+port);
-              // Really don't want to sleep inside the pool lock!
-              // The easiest thing to do instead is to use a timed wait.  There is no reason why we need
-              // to wake before the wait time is exceeded - but it's harmless, and the alternative is to
-              // do more reorganization than probably is wise.
-              poolLock.wait(waitAmount);
-              continue;
-            }
-
-            if (e instanceof PoolException)
-            {
-
-              if (Logging.connectors.isDebugEnabled())
-                Logging.connectors.debug("WEB: Going into wait for connection to "+protocol+"://"+server+":"+port+" ("+new Long(System.currentTimeMillis()-startTime).toString()+" ms)");
-
-              // Now, wait for something external to change.  The only thing that can help us is if
-              // some other thread frees a connection.
-              poolLock.wait();
-              // Go back around and try again.
-              continue;
-            }
-
-            throw new ManifoldCFException("Unexpected exception encountered: "+e.getMessage(),e);
-          }
-
-          if (Logging.connectors.isDebugEnabled())
-            Logging.connectors.debug("WEB: Successfully got connection to "+protocol+"://"+server+":"+port+" ("+new Long(System.currentTimeMillis()-startTime).toString()+" ms)");
-
-          // If we have a connection located, activate it.
-          if (connectionToReuse == null)
-            connectionToReuse = new ThrottledConnection(protocol,server,port,authentication,baseFactory,trustStoreString,bins,
-              proxyHost,proxyPort,proxyAuthDomain,proxyAuthUsername,proxyAuthPassword);
-          connectionToReuse.setup(throttleDescription);
-          return connectionToReuse;
-        }
-      }
-      catch (InterruptedException e)
-      {
-        throw new ManifoldCFException(e.getMessage(),ManifoldCFException.INTERRUPTED);
-      }
+      throw new ManifoldCFException(e.getMessage(),ManifoldCFException.INTERRUPTED);
     }
   }
 
-
   /** Flush connections that have timed out from inactivity. */
-  public static void flushIdleConnections()
+  public static void flushIdleConnections(IThreadContext threadContext)
     throws ManifoldCFException
   {
-    synchronized (poolLock)
+    // Go through outstanding connection pools and clean them up.
+    synchronized (connectionPools)
     {
-      // Lock up everything for a moment, since otherwise we could delete something people
-      // expect to stick around.
-      synchronized (connectionBins)
+      for (ConnectionPool pool : connectionPools.values())
       {
-        // Time out connections that have been idle too long.  To do this, we need to go through
-        // all connection bins and look at the pool
-        for (String binName : connectionBins.keySet())
-        {
-          ConnectionBin cb = connectionBins.get(binName);
-          if (cb.flushIdleConnections(60000L))
-          {
-            // Bin is no longer doing anything; get rid of it.
-            // I've determined this is safe - inUseConnections is designed to prevent any active connection from getting
-            // whacked.
-            // Oops.  Hang results again when I enabled this, so out it goes again.
-            //connectionBins.remove(binName);
-            //binIter = connectionBins.keySet().iterator();
-          }
-        }
+        pool.flushIdleConnections();
       }
     }
   }
 
-  /** Connection pool for a bin.
-  * An instance of this class tracks the connections that are pooled and that are in use for a specific bin.
-  * NOTE WELL: This resource must be constrained globally, across all JVMs!
-  * To do that, we need an ILockManager to handle the global data for each bin.
-  */
-  protected static class ConnectionBin
-  {
-    /** This is the bin name which this connection pool belongs to */
-    protected String binName;
-    /** This is the number of connections in this bin that are signed out and presumably in use */
-    protected int inUseConnections = 0;
-    /** This is the last time a fetch was done on this bin */
-    protected long lastFetchTime = 0L;
-    /** This object is what we synchronize on when we are waiting on a connection to free up for this
-    * bin.  This is a separate object, because we also want to protect the integrity of the
-    * ConnectionBin object itself, for which we'll use the ConnectionBin's synchronizer. */
-    protected Integer connectionWait = new Integer(0);
-    /** This map contains ThrottledConnection objects that are in the pool, and are not in use. */
-    protected HashMap freePool = new HashMap();
-
-    /** Constructor. */
-    public ConnectionBin(String binName)
-    {
-      this.binName = binName;
-    }
-
-    /** Get the bin name. */
-    public String getBinName()
-    {
-      return binName;
-    }
-
-    /** Note the creation of an active connection that belongs to this bin.  The slots all must
-    * have been reserved prior to the connection being created.
-    */
-    public synchronized void noteConnectionCreation()
-    {
-      inUseConnections++;
-    }
-
-    /** Note the destruction of an active connection that belongs to this bin.
-    */
-    public synchronized void noteConnectionDestruction()
-    {
-      inUseConnections--;
-    }
-
-
-    /** Activate a connection that should be in the pool.
-    * Removes the connection from the pool.
-    */
-    public synchronized void takeFromPool(ThrottledConnection tc)
-    {
-      // Remove this connection from the pool list
-      freePool.remove(tc);
-      inUseConnections++;
-    }
-
-    /** Put a connection into the pool.
-    */
-    public synchronized void addToPool(ThrottledConnection tc)
-    {
-      // Add this connection to the pool list
-      freePool.put(tc,tc);
-      inUseConnections--;
-    }
-
-    /** Verify that this bin is within limits.
-    */
-    public synchronized void insureWithinLimits(int maxConnections, ThrottledConnection existingConnection)
-      throws PoolException
-    {
-      //sanityCheck();
-
-      // See if the connection is in fact within the pool; if so, we just presume the limits are fine as they are.
-      // This is necessary because if the connection that's being checked for is freed, then we wreck the data structures.
-      if (existsInPool(existingConnection))
-        return;
-
-      while (maxConnections > 0 && inUseConnections + freePool.size() > maxConnections)
-      {
-        //sanityCheck();
-
-        // If there are any pool connections, free them one at a time
-        ThrottledConnection freeMe = getPoolConnection();
-        if (freeMe != null)
-        {
-          // It's okay to call activate since we guarantee that only one thread is trying to grab
-          // a connection at a time.
-          freeMe.activate();
-          freeMe.destroy();
-          continue;
-        }
-
-        // Instead of waiting, throw a pool exception, so that we can wait and retry at the next level up.
-        throw new PoolException("Waiting for a connection");
-      }
-    }
-
-    /** This method is called only when there is no existing connection yet identified that can be used
-    * for contacting the server and port specified.  This method returns a connection if a matching one can be found;
-    * otherwise it returns null.
-    * If a matching connection is found, it is activated before it is returned.  That removes the connection from all
-    * pools in which it lives.
-    */
-    public synchronized ThrottledConnection findConnection(int maxConnections,
-      ConnectionBin[] binNames, String protocol, String server, int port,
-      PageCredentials authentication, String trustStoreString,
-      String proxyHost, int proxyPort, String proxyAuthDomain, String proxyAuthUsername, String proxyAuthPassword)
-      throws PoolException
-    {
-      //sanityCheck();
-
-      // First, wait until there's no excess.
-      while (maxConnections > 0 && inUseConnections + freePool.size() > maxConnections)
-      {
-        //sanityCheck();
-        // If there are any pool connections, free them one at a time
-        ThrottledConnection freeMe = getPoolConnection();
-        if (freeMe != null)
-        {
-          // It's okay to call activate since we guarantee that only one thread is trying to grab
-          // a connection at a time.
-          freeMe.activate();
-          freeMe.destroy();
-          continue;
-        }
-
-        // Instead of waiting, throw a pool exception, so that we can wait and retry at the next level up.
-        throw new PoolException("Waiting for a connection");
-
-      }
-
-      // Wait until there's a free one
-      if (maxConnections > 0 && inUseConnections > maxConnections-1)
-      {
-        // Instead of waiting, throw a pool exception, so that we can wait and retry at the next level up.
-        throw new PoolException("Waiting for a connection");
-      }
-
-      // A null return means that there is no existing pooled connection that matches, and the  caller is free to create a new connection
-      ThrottledConnection rval = getPoolConnection();
-      if (rval == null)
-        return null;
-
-      // It's okay to call activate since we guarantee that only one thread is trying to grab
-      // a connection at a time.
-      rval.activate();
-      //sanityCheck();
-
-      if (!rval.matches(binNames,protocol,server,port,authentication,trustStoreString,
-        proxyHost,proxyPort,proxyAuthDomain,proxyAuthUsername,proxyAuthPassword))
-      {
-        // Destroy old connection.  That should free up space for a new creation.
-        rval.destroy();
-        // Return null to indicate that we can create a new connection now
-        return null;
-      }
-
-      // Existing entry matched.  Activate and return it.
-      return rval;
-    }
-
-    /** Note a new time for connection fetch for this pool.
-    *@param currentTime is the time the fetch was started.
-    */
-    public synchronized void setLastFetchTime(long currentTime)
-    {
-      if (currentTime > lastFetchTime)
-        lastFetchTime = currentTime;
-    }
-
-    /** Get the last fetch time.
-    *@return the time.
-    */
-    public synchronized long getLastFetchTime()
-    {
-      return lastFetchTime;
-    }
-
-    /** Count connections that are in use.
-    *@return connections that are in use.
-    */
-    public synchronized int countConnections()
-    {
-      return freePool.size() + inUseConnections;
-    }
-
-    /** Flush any idle connections.
-    *@return true if the connection bin is now, in fact, empty.
-    */
-    public synchronized boolean flushIdleConnections(long idleTimeout)
-    {
-      //sanityCheck();
-
-      // We have to time out the pool connections.  When there are no pool connections
-      // left, AND the in-use counts are zero, we can delete the whole thing.
-      Iterator iter = freePool.keySet().iterator();
-      while (iter.hasNext())
-      {
-        ThrottledConnection tc = (ThrottledConnection)iter.next();
-        if (tc.flushIdleConnections(idleTimeout))
-        {
-          // Can delete this connection, since it timed out.
-          tc.activate();
-          tc.destroy();
-          iter = freePool.keySet().iterator();
-        }
-      }
-
-      //sanityCheck();
-
-      return (freePool.size() == 0 && inUseConnections == 0);
-    }
-
-    /** Grab a connection from the current pool.  This does not remove the connection from the pool;
-    * it just sets it up so that later methods can do that.
-    */
-    protected ThrottledConnection getPoolConnection()
-    {
-      if (freePool.size() == 0)
-        return null;
-      Iterator iter = freePool.keySet().iterator();
-      ThrottledConnection rval = (ThrottledConnection)iter.next();
-      return rval;
-    }
-
-    /** Check if a connection exists in the pool already.
-    */
-    protected boolean existsInPool(ThrottledConnection tc)
-    {
-      return freePool.get(tc) != null;
-    }
-
-    public synchronized void sanityCheck()
-    {
-      // Make sure all the connections in the current pool in fact have a reference to this bin.
-      Iterator iter = freePool.keySet().iterator();
-      while (iter.hasNext())
-      {
-        ThrottledConnection tc = (ThrottledConnection)iter.next();
-        tc.mustHaveReference(this);
-      }
-    }
-
-  }
-
-  /** Throttles for a bin.
-  * An instance of this class keeps track of the information needed to bandwidth throttle access
-  * to a url belonging to a specific bin.
-  *
-  * In order to calculate
-  * the effective "burst" fetches per second and bytes per second, we need to have some idea what the window is.
-  * For example, a long hiatus from fetching could cause overuse of the server when fetching resumes, if the
-  * window length is too long.
-  *
-  * One solution to this problem would be to keep a list of the individual fetches as records.  Then, we could
-  * "expire" a fetch by discarding the old record.  However, this is quite memory consumptive for all but the
-  * smallest intervals.
-  *
-  * Another, better, solution is to hook into the start and end of individual fetches.  These will, presumably, occur
-  * at the fastest possible rate without long pauses spent doing something else.  The only complication is that
-  * fetches may well overlap, so we need to "reference count" the fetches to know when to reset the counters.
-  * For "fetches per second", we can simply make sure we "schedule" the next fetch at an appropriate time, rather
-  * than keep records around.  The overall rate may therefore be somewhat less than the specified rate, but that's perfectly
-  * acceptable.
-  *
-  * Some notes on the algorithms used to limit server bandwidth impact
-  * ==================================================================
-  *
-  * In a single connection case, the algorithm we'd want to use works like this.  On the first chunk of a series,
-  * the total length of time and the number of bytes are recorded.  Then, prior to each subsequent chunk, a calculation
-  * is done which attempts to hit the bandwidth target by the end of the chunk read, using the rate of the first chunk
-  * access as a way of estimating how long it will take to fetch those next n bytes.
-  *
-  * For a multi-connection case, which this is, it's harder to either come up with a good maximum bandwidth estimate,
-  * and harder still to "hit the target", because simultaneous fetches will intrude.  The strategy is therefore:
-  *
-  * 1) The first chunk of any series should proceed without interference from other connections to the same server.
-  *    The goal here is to get a decent quality estimate without any possibility of overwhelming the server.
-  *
-  * 2) The bandwidth of the first chunk is treated as the "maximum bandwidth per connection".  That is, if other
-  *    connections are going on, we can presume that each connection will use at most the bandwidth that the first fetch
-  *    took.  Thus, by generating end-time estimates based on this number, we are actually being conservative and
-  *    using less server bandwidth.
-  *
-  * 3) For chunks that have started but not finished, we keep track of their size and estimated elapsed time in order to schedule when
-  *    new chunks from other connections can start.
-  *
-  * NOTE WELL: This resource must be constrained globally, across all JVMs!
-  * To do that, we need an ILockManager to handle the global data for each bin.
-  */
-  protected static class ThrottleBin
-  {
-    /** This is the bin name which this throttle belongs to. */
-    protected final String binName;
-    /** This is the reference count for this bin (which records active references) */
-    protected volatile int refCount = 0;
-    /** The inverse rate estimate of the first fetch, in ms/byte */
-    protected double rateEstimate = 0.0;
-    /** Flag indicating whether a rate estimate is needed */
-    protected volatile boolean estimateValid = false;
-    /** Flag indicating whether rate estimation is in progress yet */
-    protected volatile boolean estimateInProgress = false;
-    /** The start time of this series */
-    protected long seriesStartTime = -1L;
-    /** Total actual bytes read in this series; this includes fetches in progress */
-    protected long totalBytesRead = -1L;
-
-    /** Constructor. */
-    public ThrottleBin(String binName)
-    {
-      this.binName = binName;
-    }
-
-    /** Get the bin name. */
-    public String getBinName()
-    {
-      return binName;
-    }
-
-    /** Note the start of a fetch operation for a bin.  Call this method just before the actual stream access begins.
-    * May wait until schedule allows.
-    */
-    public void beginFetch()
-      throws InterruptedException
-    {
-      synchronized (this)
-      {
-        if (refCount == 0)
-        {
-          // Now, reset bandwidth throttling counters
-          estimateValid = false;
-          rateEstimate = 0.0;
-          totalBytesRead = 0L;
-          estimateInProgress = false;
-          seriesStartTime = -1L;
-        }
-        refCount++;
-      }
-
-    }
-
-    /** Abort the fetch.
-    */
-    public void abortFetch()
-    {
-      synchronized (this)
-      {
-        refCount--;
-      }
-    }
-    
-    /** Note the start of an individual byte read of a specified size.  Call this method just before the
-    * read request takes place.  Performs the necessary delay prior to reading specified number of bytes from the server.
-    */
-    public void beginRead(int byteCount, double minimumMillisecondsPerBytePerServer)
-      throws InterruptedException
-    {
-      long currentTime = System.currentTimeMillis();
-
-      synchronized (this)
-      {
-        while (estimateInProgress)
-          wait();
-        if (estimateValid == false)
-        {
-          seriesStartTime = currentTime;
-          estimateInProgress = true;
-          // Add these bytes to the estimated total
-          totalBytesRead += (long)byteCount;
-          // Exit early; this thread isn't going to do any waiting
-          return;
-        }
-      }
-
-      // It is possible for the following code to get interrupted.  If that happens,
-      // we have to unstick the threads that are waiting on the estimate!
-      boolean finished = false;
-      try
-      {
-        long waitTime = 0L;
-        synchronized (this)
-        {
-          // Add these bytes to the estimated total
-          totalBytesRead += (long)byteCount;
-
-          // Estimate the time this read will take, and wait accordingly
-          long estimatedTime = (long)(rateEstimate * (double)byteCount);
-
-          // Figure out how long the total byte count should take, to meet the constraint
-          long desiredEndTime = seriesStartTime + (long)(((double)totalBytesRead) * minimumMillisecondsPerBytePerServer);
-
-          // The wait time is the different between our desired end time, minus the estimated time to read the data, and the
-          // current time.  But it can't be negative.
-          waitTime = (desiredEndTime - estimatedTime) - currentTime;
-        }
-
-        if (waitTime > 0L)
-        {
-          if (Logging.connectors.isDebugEnabled())
-            Logging.connectors.debug("WEB: Performing a read wait on bin '"+binName+"' of "+
-            new Long(waitTime).toString()+" ms.");
-          ManifoldCF.sleep(waitTime);
-        }
-        finished = true;
-      }
-      finally
-      {
-        if (!finished)
-        {
-          abortRead();
-        }
-      }
-    }
-
-    /** Abort a read in progress.
-    */
-    public void abortRead()
-    {
-      synchronized (this)
-      {
-        if (estimateInProgress)
-        {
-          estimateInProgress = false;
-          notifyAll();
-        }
-      }
-    }
-    
-    /** Note the end of an individual read from the server.  Call this just after an individual read completes.
-    * Pass the actual number of bytes read to the method.
-    */
-    public void endRead(int originalCount, int actualCount)
-    {
-      long currentTime = System.currentTimeMillis();
-
-      synchronized (this)
-      {
-        totalBytesRead = totalBytesRead + (long)actualCount - (long)originalCount;
-        if (estimateInProgress)
-        {
-          if (actualCount == 0)
-            // Didn't actually get any bytes, so use 0.0
-            rateEstimate = 0.0;
-          else
-            rateEstimate = ((double)(currentTime - seriesStartTime))/(double)actualCount;
-          estimateValid = true;
-          estimateInProgress = false;
-          notifyAll();
-        }
-      }
-    }
-
-    /** Note the end of a fetch operation.  Call this method just after the fetch completes.
-    */
-    public boolean endFetch()
-    {
-      synchronized (this)
-      {
-        refCount--;
-        return (refCount == 0);
-      }
-
-    }
-
-  }
-
   /** Throttled connections.  Each instance of a connection describes the bins to which it belongs,
   * along with the actual open connection itself, and the last time the connection was used. */
   protected static class ThrottledConnection implements IThrottledConnection
   {
-    /** The connection has resolved pointers to the ConnectionBin structures that manage pool
-    * maximums.  These are ONLY valid when the connection is actually in the pool. */
-    protected ConnectionBin[] connectionBinArray;
-    /** The connection has resolved pointers to the ThrottleBin structures that help manage
-    * bandwidth throttling. */
-    protected ThrottleBin[] throttleBinArray;
-    /** These are the bandwidth limits, per bin */
-    protected double[] minMillisecondsPerByte;
-    /** Is the connection considered "active"? */
-    protected boolean isActive;
-    /** If not active, this is when it went inactive */
-    protected long inactiveTime = 0L;
-
+    /** Connection pool */
+    protected final ConnectionPool myPool;
+    /** Fetch throttler */
+    protected final IFetchThrottler fetchThrottler;
     /** Protocol */
-    protected String protocol;
+    protected final String protocol;
     /** Server */
-    protected String server;
+    protected final String server;
     /** Port */
-    protected int port;
+    protected final int port;
     /** Authentication */
-    protected PageCredentials authentication;
-    /** Trust store */
-    protected IKeystoreManager trustStore;
-    /** Trust store string */
-    protected String trustStoreString;
+    protected final PageCredentials authentication;
+
+    /** This is when the connection will expire.  Only valid if connection is in the pool. */
+    protected long expireTime = -1L;
 
     /** The http connection manager.  The pool is of size 1.  */
     protected PoolingClientConnectionManager connManager = null;
@@ -1003,10 +294,13 @@
 
     /** Constructor.  Create a connection with a specific server and port, and
     * register it as active against all bins. */
-    public ThrottledConnection(String protocol, String server, int port, PageCredentials authentication,
-      javax.net.ssl.SSLSocketFactory httpsSocketFactory, String trustStoreString, ConnectionBin[] connectionBins,
+    public ThrottledConnection(ConnectionPool myPool, IFetchThrottler fetchThrottler,
+      String protocol, String server, int port, PageCredentials authentication,
+      javax.net.ssl.SSLSocketFactory httpsSocketFactory,
       String proxyHost, int proxyPort, String proxyAuthDomain, String proxyAuthUsername, String proxyAuthPassword)
     {
+      this.myPool = myPool;
+      this.fetchThrottler = fetchThrottler;
       this.proxyHost = proxyHost;
       this.proxyPort = proxyPort;
       this.proxyAuthDomain = proxyAuthDomain;
@@ -1017,168 +311,21 @@
       this.port = port;
       this.authentication = authentication;
       this.httpsSocketFactory = httpsSocketFactory;
-      this.trustStoreString = trustStoreString;
-      this.connectionBinArray = connectionBins;
-      this.throttleBinArray = new ThrottleBin[connectionBins.length];
-      this.minMillisecondsPerByte = new double[connectionBins.length];
-      this.isActive = true;
-      int i = 0;
-      while (i < connectionBins.length)
-      {
-        connectionBins[i].noteConnectionCreation();
-        // We don't keep throttle bin references around, since these are transient
-        throttleBinArray[i] = null;
-        minMillisecondsPerByte[i] = 0.0;
-        i++;
-      }
-
-
     }
 
-    public void mustHaveReference(ConnectionBin cb)
-    {
-      int i = 0;
-      while (i < connectionBinArray.length)
-      {
-        if (cb == connectionBinArray[i])
-          return;
-        i++;
-      }
-      String msg = "Connection bin "+cb.toString()+" owns connection "+this.toString()+" for "+protocol+server+":"+port+
-        " but there is no back reference!";
-      Logging.connectors.error(msg);
-      System.out.println(msg);
-      new Exception(msg).printStackTrace();
-      System.exit(3);
-      //throw new RuntimeException(msg);
-    }
-
-    /** See if this instances matches a given server and port. */
-    public boolean matches(ConnectionBin[] bins, String protocol, String server, int port, PageCredentials authentication,
-      String trustStoreString, String proxyHost, int proxyPort, String proxyAuthDomain, String proxyAuthUsername, String proxyAuthPassword)
-    {
-      if (this.trustStoreString == null || trustStoreString == null)
-      {
-        if (this.trustStoreString != trustStoreString)
-          return false;
-      }
-      else
-      {
-        if (!this.trustStoreString.equals(trustStoreString))
-          return false;
-      }
-
-      if (this.authentication == null || authentication == null)
-      {
-        if (this.authentication != authentication)
-          return false;
-      }
-      else
-      {
-        if (!this.authentication.equals(authentication))
-          return false;
-      }
-
-      if (this.proxyHost == null || proxyHost == null)
-      {
-        if (this.proxyHost != proxyHost)
-          return false;
-      }
-      else
-      {
-        if (!this.proxyHost.equals(proxyHost))
-          return false;
-        if (this.proxyAuthDomain == null || proxyAuthDomain == null)
-        {
-          if (this.proxyAuthDomain != proxyAuthDomain)
-            return false;
-        }
-        else
-        {
-          if (!this.proxyAuthDomain.equals(proxyAuthDomain))
-            return false;
-        }
-        if (this.proxyAuthUsername == null || proxyAuthUsername == null)
-        {
-          if (this.proxyAuthUsername != proxyAuthUsername)
-            return false;
-        }
-        else
-        {
-          if (!this.proxyAuthUsername.equals(proxyAuthUsername))
-            return false;
-        }
-        if (this.proxyAuthPassword == null || proxyAuthPassword == null)
-        {
-          if (this.proxyAuthPassword != proxyAuthPassword)
-            return false;
-        }
-        else
-        {
-          if (!this.proxyAuthPassword.equals(proxyAuthPassword))
-            return false;
-        }
-      }
-      
-      if (this.proxyPort != proxyPort)
-        return false;
-      
-      
-      if (this.connectionBinArray.length != bins.length || !this.protocol.equals(protocol) || !this.server.equals(server) || this.port != port)
-        return false;
-      
-      int i = 0;
-      while (i < bins.length)
-      {
-        if (connectionBinArray[i] != bins[i])
-          return false;
-        i++;
-      }
-      return true;
-    }
-
-    /** Activate the connection. */
-    public void activate()
-    {
-      isActive = true;
-      int i = 0;
-      while (i < connectionBinArray.length)
-      {
-        connectionBinArray[i++].takeFromPool(this);
-      }
-    }
-
-    /** Set up the connection.  This allows us to feed all bins the correct bandwidth limit info.
+    /** Check whether the connection has expired.
+    *@param currentTime is the current time to use to judge if a connection has expired.
+    *@return true if the connection has expired, and should be closed.
     */
-    public void setup(ThrottleDescription description)
+    @Override
+    public boolean hasExpired(long currentTime)
     {
-      // Go through all bins, and set up the current limits.
-      int i = 0;
-      while (i < connectionBinArray.length)
-      {
-        String binName = connectionBinArray[i].getBinName();
-        minMillisecondsPerByte[i] = description.getMinimumMillisecondsPerByte(binName);
-        i++;
-      }
-    }
-
-    /** Do periodic bookkeeping.
-    *@return true if the connection is no longer valid, and can be removed. */
-    public boolean flushIdleConnections(long idleTimeout)
-    {
-      if (isActive)
-        return false;
-
       if (connManager != null)
       {
         connManager.closeIdleConnections(idleTimeout, TimeUnit.MILLISECONDS);
         connManager.closeExpiredConnections();
-        // Need to determine if there's a valid connection in the connection manager still, or if it is empty.
-        //return connManager.getConnectionsInPool() == 0;
-        return true;
       }
-      else
-        return true;
+      return (currentTime > expireTime);
     }
 
     /** Log the fetch of a number of bytes, from within a stream. */
@@ -1187,65 +334,10 @@
       fetchCounter += (long)count;
     }
 
-    /** Begin a read operation, from within a stream */
-    public void beginRead(int len)
-      throws InterruptedException
-    {
-      // Consult with throttle bins
-      int lastOneDone = 0;
-      try
-      {
-        for (int i = 0; i < throttleBinArray.length; i++)
-        {
-          throttleBinArray[i].beginRead(len,minMillisecondsPerByte[i]);
-          lastOneDone = i + 1;
-        }
-      }
-      finally
-      {
-        if (lastOneDone != throttleBinArray.length)
-        {
-          for (int i = 0; i < lastOneDone; i++)
-          {
-            throttleBinArray[i].abortRead();
-          }
-        }
-      }
-    }
-
-    /** End a read operation, from within a stream */
-    public void endRead(int origLen, int actualAmt)
-    {
-      // Consult with throttle bins
-      Throwable e = null;
-      for (int i = 0; i < throttleBinArray.length; i++)
-      {
-        try
-        {
-          throttleBinArray[i].endRead(origLen,actualAmt);
-        }
-        catch (Throwable e2)
-        {
-          e = e2;
-        }
-      }
-      if (e != null)
-      {
-        if (e instanceof RuntimeException)
-          throw (RuntimeException)e;
-        else if (e instanceof Error)
-          throw (Error)e;
-        else
-          throw new RuntimeException("Unknown exception: " + e.getMessage(),e);
-      }
-    }
-
     /** Destroy the connection forever */
-    protected void destroy()
+    @Override
+    public void destroy()
     {
-      if (isActive == false)
-        throw new RuntimeException("Trying to destroy an inactive connection");
-
       // Kill the actual connection object.
       if (connManager != null)
       {
@@ -1253,13 +345,6 @@
         connManager = null;
       }
 
-      // Call all the bins this belongs to, and decrement the in-use count.
-      int i = 0;
-      while (i < connectionBinArray.length)
-      {
-        ConnectionBin cb = connectionBinArray[i++];
-        cb.noteConnectionDestruction();
-      }
     }
 
 
@@ -1273,43 +358,15 @@
     {
       this.fetchType = fetchType;
       this.fetchCounter = 0L;
-      int lastCreated = 0;
       try
       {
-        // Find or create the needed throttle bins
-        for (int i = 0; i < throttleBinArray.length; i++)
-        {
-          // Access the bins as we need them, and drop them when ref count goes to zero
-          String binName = connectionBinArray[i].getBinName();
-          ThrottleBin tb;
-          synchronized (throttleBins)
-          {
-            tb = throttleBins.get(binName);
-            if (tb == null)
-            {
-              tb = new ThrottleBin(binName);
-              throttleBins.put(binName,tb);
-            }
-            tb.beginFetch();
-          }
-          throttleBinArray[i] = tb;
-          lastCreated = i + 1;
-        }
+        if (fetchThrottler.obtainFetchDocumentPermission() == false)
+          throw new IllegalStateException("Unexpected return value from obtainFetchDocumentPermission()");
       }
       catch (InterruptedException e)
       {
         throw new ManifoldCFException("Interrupted",ManifoldCFException.INTERRUPTED);
       }
-      finally
-      {
-        if (lastCreated != throttleBinArray.length)
-        {
-          for (int i = 0; i < lastCreated; i++)
-          {
-            throttleBinArray[i].abortFetch();
-          }
-        }
-      }
     }
 
     /** Execute the fetch and get the return code.  This method uses the
@@ -1600,7 +657,7 @@
       //httpClient.setCookieStore(cookieStore);
       
       // Create the thread
-      methodThread = new ExecuteMethodThread(this, httpClient, fetchMethod, cookieStore);
+      methodThread = new ExecuteMethodThread(this, fetchThrottler, httpClient, fetchMethod, cookieStore);
       try
       {
         methodThread.start();
@@ -1893,17 +950,6 @@
           methodThread.abort();
 
         long endTime = System.currentTimeMillis();
-        int i = 0;
-        while (i < throttleBinArray.length)
-        {
-          synchronized (throttleBins)
-          {
-            if (throttleBinArray[i].endFetch())
-              throttleBins.remove(throttleBinArray[i].getBinName());
-          }
-          throttleBinArray[i] = null;
-          i++;
-        }
 
         activities.recordActivity(new Long(startFetchTime),WebcrawlerConnector.ACTIVITY_FETCH,
           new Long(fetchCounter),myUrl,Integer.toString(statusCode),(throwable==null)?null:throwable.getMessage(),null);
@@ -1945,44 +991,13 @@
 
     }
 
-    /** Close the connection.  Call this to end this server connection.
+    /** Close the connection.  Call this to return the connection to its pool.
     */
     @Override
     public void close()
-      throws ManifoldCFException
     {
-      synchronized (poolLock)
-      {
-        // Verify that all the connections that exist are in fact sane
-        synchronized (connectionBins)
-        {
-          for (String connectionName : connectionBins.keySet())
-          {
-            ConnectionBin cb = connectionBins.get(connectionName);
-            //cb.sanityCheck();
-          }
-        }
-
-        // Leave the connection alive, but mark it as inactive, and return it to the appropriate pools.
-        isActive = false;
-        inactiveTime = System.currentTimeMillis();
-        int i = 0;
-        while (i < connectionBinArray.length)
-        {
-          connectionBinArray[i++].addToPool(this);
-        }
-        // Verify that all the connections that exist are in fact sane
-        synchronized (connectionBins)
-        {
-          for (String connectionName : connectionBins.keySet())
-          {
-            ConnectionBin cb = connectionBins.get(connectionName);
-            //cb.sanityCheck();
-          }
-        }
-        // Wake up everything waiting on the pool lock
-        poolLock.notifyAll();
-      }
+      expireTime = System.currentTimeMillis() + idleTimeout;
+      myPool.release(this);
     }
     
     protected void handleHTTPException(HttpException e, String activity)
@@ -2048,17 +1063,18 @@
   */
   protected static class ThrottledInputstream extends InputStream
   {
-    /** Stream throttling parameters */
-    protected double minimumMillisecondsPerBytePerServer;
+    /** Stream throttler */
+    protected final IStreamThrottler streamThrottler;
     /** The throttled connection we belong to */
-    protected ThrottledConnection throttledConnection;
+    protected final ThrottledConnection throttledConnection;
     /** The stream we are wrapping. */
-    protected InputStream inputStream;
+    protected final InputStream inputStream;
 
     /** Constructor.
     */
-    public ThrottledInputstream(ThrottledConnection connection, InputStream is)
+    public ThrottledInputstream(IStreamThrottler streamThrottler, ThrottledConnection connection, InputStream is)
     {
+      this.streamThrottler = streamThrottler;
       this.throttledConnection = connection;
       this.inputStream = is;
     }
@@ -2126,7 +1142,8 @@
     {
       try
       {
-        throttledConnection.beginRead(len);
+        if (streamThrottler.obtainReadPermission(len) == false)
+          throw new IllegalStateException("Unexpected result calling obtainReadPermission()");
         int amt = 0;
         try
         {
@@ -2136,10 +1153,10 @@
         finally
         {
           if (amt == -1)
-            throttledConnection.endRead(len,0);
+            streamThrottler.releaseReadPermission(len,0);
           else
           {
-            throttledConnection.endRead(len,amt);
+            streamThrottler.releaseReadPermission(len,amt);
             throttledConnection.logFetchCount(amt);
           }
         }
@@ -2226,6 +1243,10 @@
       {
         Logging.connectors.debug("IO Exception trying to close connection: "+e.getMessage(),e);
       }
+      finally
+      {
+        streamThrottler.closeStream();
+      }
     }
 
   }
@@ -2298,6 +1319,8 @@
   {
     /** The connection */
     protected final ThrottledConnection theConnection;
+    /** The fetch throttler */
+    protected final IFetchThrottler fetchThrottler;
     /** Client and method, all preconfigured */
     protected final AbstractHttpClient httpClient;
     protected final HttpRequestBase executeMethod;
@@ -2317,12 +1340,13 @@
 
     protected Throwable generalException = null;
     
-    public ExecuteMethodThread(ThrottledConnection theConnection,
+    public ExecuteMethodThread(ThrottledConnection theConnection, IFetchThrottler fetchThrottler,
       AbstractHttpClient httpClient, HttpRequestBase executeMethod, CookieStore cookieStore)
     {
       super();
       setDaemon(true);
       this.theConnection = theConnection;
+      this.fetchThrottler = fetchThrottler;
       this.httpClient = httpClient;
       this.executeMethod = executeMethod;
       this.cookieStore = cookieStore;
@@ -2419,7 +1443,7 @@
                   bodyStream = response.getEntity().getContent();
                   if (bodyStream != null)
                   {
-                    bodyStream = new ThrottledInputstream(theConnection,bodyStream);
+                    bodyStream = new ThrottledInputstream(fetchThrottler.createFetchStream(),theConnection,bodyStream);
                     if (gzip)
                       bodyStream = new GZIPInputStream(bodyStream);
                     else if (deflate)
@@ -2741,4 +1765,254 @@
 
   }
 
+  /** Connection pool key */
+  protected static class ConnectionPoolKey
+  {
+    protected final String protocol;
+    protected final String server;
+    protected final int port;
+    protected final PageCredentials authentication;
+    protected final String trustStoreString;
+    protected final String proxyHost;
+    protected final int proxyPort;
+    protected final String proxyAuthDomain;
+    protected final String proxyAuthUsername;
+    protected final String proxyAuthPassword;
+    
+    public ConnectionPoolKey(String protocol,
+      String server, int port, PageCredentials authentication,
+      String trustStoreString, String proxyHost, int proxyPort,
+      String proxyAuthDomain, String proxyAuthUsername, String proxyAuthPassword)
+    {
+      this.protocol = protocol;
+      this.server = server;
+      this.port = port;
+      this.authentication = authentication;
+      this.trustStoreString = trustStoreString;
+      this.proxyHost = proxyHost;
+      this.proxyPort = proxyPort;
+      this.proxyAuthDomain = proxyAuthDomain;
+      this.proxyAuthUsername = proxyAuthUsername;
+      this.proxyAuthPassword = proxyAuthPassword;
+    }
+    
+    public int hashCode()
+    {
+      return protocol.hashCode() +
+        server.hashCode() +
+        (port * 31) +
+        ((authentication==null)?0:authentication.hashCode()) +
+        ((trustStoreString==null)?0:trustStoreString.hashCode()) +
+        ((proxyHost==null)?0:proxyHost.hashCode()) +
+        (proxyPort * 29) +
+        ((proxyAuthDomain==null)?0:proxyAuthDomain.hashCode()) +
+        ((proxyAuthUsername==null)?0:proxyAuthUsername.hashCode()) +
+        ((proxyAuthPassword==null)?0:proxyAuthPassword.hashCode());
+    }
+    
+    public boolean equals(Object o)
+    {
+      if (!(o instanceof ConnectionPoolKey))
+        return false;
+      ConnectionPoolKey other = (ConnectionPoolKey)o;
+      if (!server.equals(other.server) ||
+        port != other.port)
+        return false;
+      if (authentication == null || other.authentication == null)
+      {
+        if (authentication != other.authentication)
+          return false;
+      }
+      else
+      {
+        if (!authentication.equals(other.authentication))
+          return false;
+      }
+      if (trustStoreString == null || other.trustStoreString == null)
+      {
+        if (trustStoreString != other.trustStoreString)
+          return false;
+      }
+      else
+      {
+        if (!trustStoreString.equals(other.trustStoreString))
+          return false;
+      }
+      if (proxyHost == null || other.proxyHost == null)
+      {
+        if (proxyHost != other.proxyHost)
+          return false;
+      }
+      else
+      {
+        if (!proxyHost.equals(other.proxyHost))
+          return false;
+      }
+      if (proxyPort != other.proxyPort)
+        return false;
+      if (proxyAuthDomain == null || other.proxyAuthDomain == null)
+      {
+        if (proxyAuthDomain != other.proxyAuthDomain)
+          return false;
+      }
+      else
+      {
+        if (!proxyAuthDomain.equals(other.proxyAuthDomain))
+          return false;
+      }
+      if (proxyAuthUsername == null || other.proxyAuthUsername == null)
+      {
+        if (proxyAuthUsername != other.proxyAuthUsername)
+          return false;
+      }
+      else
+      {
+        if (!proxyAuthUsername.equals(other.proxyAuthUsername))
+          return false;
+      }
+      if (proxyAuthPassword == null || other.proxyAuthPassword == null)
+      {
+        if (proxyAuthPassword != other.proxyAuthPassword)
+          return false;
+      }
+      else
+      {
+        if (!proxyAuthPassword.equals(other.proxyAuthPassword))
+          return false;
+      }
+      return true;
+    }
+  }
+  
+  /** Each connection pool has identical connections we can draw on.
+  */
+  protected static class ConnectionPool
+  {
+    /** Throttler */
+    protected final IConnectionThrottler connectionThrottler;
+    
+    // If we need to create a connection, these are what we use
+    
+    protected final String protocol;
+    protected final String server;
+    protected final int port;
+    protected final PageCredentials authentication;
+    protected final javax.net.ssl.SSLSocketFactory baseFactory;
+    protected final String proxyHost;
+    protected final int proxyPort;
+    protected final String proxyAuthDomain;
+    protected final String proxyAuthUsername;
+    protected final String proxyAuthPassword;
+
+    /** The actual pool of connections */
+    protected final List<IThrottledConnection> connections = new ArrayList<IThrottledConnection>();
+    
+    public ConnectionPool(IConnectionThrottler connectionThrottler,
+      String protocol,
+      String server, int port, PageCredentials authentication,
+      javax.net.ssl.SSLSocketFactory baseFactory,
+      String proxyHost, int proxyPort,
+      String proxyAuthDomain, String proxyAuthUsername, String proxyAuthPassword)
+    {
+      this.connectionThrottler = connectionThrottler;
+      
+      this.protocol = protocol;
+      this.server = server;
+      this.port = port;
+      this.authentication = authentication;
+      this.baseFactory = baseFactory;
+      this.proxyHost = proxyHost;
+      this.proxyPort = proxyPort;
+      this.proxyAuthDomain = proxyAuthDomain;
+      this.proxyAuthUsername = proxyAuthUsername;
+      this.proxyAuthPassword = proxyAuthPassword;
+    }
+    
+    public IThrottledConnection grab()
+      throws InterruptedException
+    {
+      // Wait for a connection
+      int result = connectionThrottler.waitConnectionAvailable();
+      if (result == IConnectionThrottler.CONNECTION_FROM_POOL)
+      {
+        // We are guaranteed to have a connection in the pool, unless there's a coding error.
+        synchronized (connections)
+        {
+          return connections.remove(connections.size()-1);
+        }
+      }
+      else if (result == IConnectionThrottler.CONNECTION_FROM_CREATION)
+      {
+        return new ThrottledConnection(this,connectionThrottler.getNewConnectionFetchThrottler(),
+          protocol,server,port,authentication,baseFactory,
+          proxyHost,proxyPort,
+          proxyAuthDomain,proxyAuthUsername,proxyAuthPassword);
+      }
+      else
+        throw new IllegalStateException("Unexpected return value from waitConnectionAvailable(): "+result);
+    }
+    
+    public void release(IThrottledConnection connection)
+    {
+      if (connectionThrottler.noteReturnedConnection())
+      {
+        // Destroy this connection
+        connection.destroy();
+        connectionThrottler.noteConnectionDestroyed();
+      }
+      else
+      {
+        // Return to pool
+        synchronized (connections)
+        {
+          connections.add(connection);
+        }
+        connectionThrottler.noteConnectionReturnedToPool();
+      }
+    }
+    
+    public void flushIdleConnections()
+    {
+      long currentTime = System.currentTimeMillis();
+      // First, remove connections that are over the quota
+      while (connectionThrottler.checkDestroyPooledConnection())
+      {
+        // Destroy the oldest ones first
+        IThrottledConnection connection;
+        synchronized (connections)
+        {
+          connection = connections.remove(0);
+        }
+        connection.close();
+        connectionThrottler.noteConnectionDestroyed();
+      }
+      // Now, get rid of expired connections
+      while (true)
+      {
+        boolean expired;
+        synchronized (connections)
+        {
+          expired = connections.size() > 0 && connections.get(0).hasExpired(currentTime);
+        }
+        if (!expired)
+          break;
+        // We found an expired connection!  Now tell the throttler that, and see if it agrees.
+        if (connectionThrottler.checkExpireConnection())
+        {
+          // Remove a connection from the pool, and destroy it.
+          // It's not guaranteed to be an expired one, but that's a rare occurrence, we expect.
+          IThrottledConnection connection;
+          synchronized (connections)
+          {
+            connection = connections.remove(0);
+          }
+          connection.destroy();
+          connectionThrottler.noteConnectionDestroyed();
+        }
+        else
+          break;
+      }
+    }
+    
+  }
 }
diff --git a/connectors/webcrawler/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/webcrawler/WebcrawlerConnector.java b/connectors/webcrawler/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/webcrawler/WebcrawlerConnector.java
index b358da5..64c78d9 100644
--- a/connectors/webcrawler/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/webcrawler/WebcrawlerConnector.java
+++ b/connectors/webcrawler/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/webcrawler/WebcrawlerConnector.java
@@ -160,6 +160,8 @@
   protected int connectionTimeoutMilliseconds = 60000;
   /** Socket timeout, milliseconds */
   protected int socketTimeoutMilliseconds = 300000;
+  /** Throttle group name */
+  protected String throttleGroupName = null;
 
   // Canonicalization enabling/disabling.  Eventually this will probably need to be by regular expression.
 
@@ -354,6 +356,9 @@
     {
       String x;
 
+      // Either set this from the connection name, or just have one.  Right now, we have one.
+      String throttleGroupName = "";
+      
       String emailAddress = params.getParameter(WebcrawlerConfig.PARAMETER_EMAIL);
       if (emailAddress == null)
         throw new ManifoldCFException("Missing email address");
@@ -406,7 +411,7 @@
   public void poll()
     throws ManifoldCFException
   {
-    ThrottledFetcher.flushIdleConnections();
+    ThrottledFetcher.flushIdleConnections(currentContext);
   }
 
   /** Check status of connection.
@@ -425,6 +430,7 @@
   public void disconnect()
     throws ManifoldCFException
   {
+    throttleGroupName = null;
     throttleDescription = null;
     credentialsDescription = null;
     trustsDescription = null;
@@ -711,7 +717,9 @@
 
                   // Prepare to perform the fetch, and decide what to do with the document.
                   //
-                  IThrottledConnection connection = ThrottledFetcher.getConnection(protocol,ipAddress,port,
+                  IThrottledConnection connection = ThrottledFetcher.getConnection(currentContext,
+                    throttleGroupName,
+                    protocol,ipAddress,port,
                     credential,trustStore,throttleDescription,binNames,connectionLimit,
                     proxyHost,proxyPort,proxyAuthDomain,proxyAuthUsername,proxyAuthPassword);
                   try
@@ -5126,7 +5134,8 @@
       // We've successfully obtained a lock on reading robots for this server!  Now, guarantee that we'll free it, by instantiating a try/finally
       try
       {
-        IThrottledConnection connection = ThrottledFetcher.getConnection(protocol,hostIPAddress,port,credential,
+        IThrottledConnection connection = ThrottledFetcher.getConnection(currentContext,throttleGroupName,
+          protocol,hostIPAddress,port,credential,
           trustStore,throttleDescription,binNames,connectionLimit,
           proxyHost,proxyPort,proxyAuthDomain,proxyAuthUsername,proxyAuthPassword);
         try
diff --git a/framework/agents/src/main/java/org/apache/manifoldcf/agents/system/IdleCleanupThread.java b/framework/agents/src/main/java/org/apache/manifoldcf/agents/system/IdleCleanupThread.java
index 4b495a6..531c3d0 100644
--- a/framework/agents/src/main/java/org/apache/manifoldcf/agents/system/IdleCleanupThread.java
+++ b/framework/agents/src/main/java/org/apache/manifoldcf/agents/system/IdleCleanupThread.java
@@ -55,6 +55,8 @@
       ICacheManager cacheManager = CacheManagerFactory.make(threadContext);
       // Get the output connector pool handle
       IOutputConnectorPool outputConnectorPool = OutputConnectorPoolFactory.make(threadContext);
+      // Throttler subsystem
+      IThrottleGroups throttleGroups = ThrottleGroupsFactory.make(threadContext);
       
       /* For HSQLDB debugging...
       IDBInterface database = DBInterfaceFactory.make(threadContext,
@@ -88,6 +90,9 @@
           
           // Do the cleanup
           outputConnectorPool.pollAllConnectors();
+          // Poll connection bins
+          throttleGroups.poll();
+          // Expire objects
           cacheManager.expireObjects(System.currentTimeMillis());
           
           // Sleep for the retry interval.
diff --git a/framework/api-service/src/main/java/org/apache/manifoldcf/apiservice/IdleCleanupThread.java b/framework/api-service/src/main/java/org/apache/manifoldcf/apiservice/IdleCleanupThread.java
new file mode 100644
index 0000000..ad99a7e
--- /dev/null
+++ b/framework/api-service/src/main/java/org/apache/manifoldcf/apiservice/IdleCleanupThread.java
@@ -0,0 +1,141 @@
+/* $Id$ */
+
+/**
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.manifoldcf.apiservice;
+
+import org.apache.manifoldcf.core.interfaces.*;
+import org.apache.manifoldcf.agents.interfaces.*;
+import org.apache.manifoldcf.crawler.interfaces.*;
+import org.apache.manifoldcf.authorities.interfaces.*;
+import org.apache.manifoldcf.core.system.Logging;
+import org.apache.manifoldcf.core.system.ManifoldCF;
+import java.util.*;
+import java.lang.reflect.*;
+
+/** This thread periodically calls the cleanup method in all connected repository connectors.  The ostensible purpose
+* is to allow the connectors to shutdown idle connections etc.
+*/
+public class IdleCleanupThread extends Thread
+{
+  public static final String _rcsid = "@(#)$Id$";
+
+  /** Constructor.
+  */
+  public IdleCleanupThread()
+    throws ManifoldCFException
+  {
+    super();
+    setName("Idle cleanup thread");
+    setDaemon(true);
+  }
+
+  public void run()
+  {
+    Logging.root.debug("Start up idle cleanup thread");
+    try
+    {
+      // Create a thread context object.
+      IThreadContext threadContext = ThreadContextFactory.make();
+      // Get the cache handle.
+      ICacheManager cacheManager = CacheManagerFactory.make(threadContext);
+      
+      IRepositoryConnectorPool repositoryConnectorPool = RepositoryConnectorPoolFactory.make(threadContext);
+      IOutputConnectorPool outputConnectorPool = OutputConnectorPoolFactory.make(threadContext);
+      IAuthorityConnectorPool authorityConnectorPool = AuthorityConnectorPoolFactory.make(threadContext);
+      IMappingConnectorPool mappingConnectorPool = MappingConnectorPoolFactory.make(threadContext);
+      
+      IThrottleGroups throttleGroups = ThrottleGroupsFactory.make(threadContext);
+
+      // Loop
+      while (true)
+      {
+        // Do another try/catch around everything in the loop
+        try
+        {
+          // Do the cleanup
+          repositoryConnectorPool.pollAllConnectors();
+          outputConnectorPool.pollAllConnectors();
+          authorityConnectorPool.pollAllConnectors();
+          mappingConnectorPool.pollAllConnectors();
+          
+          throttleGroups.poll();
+          
+          cacheManager.expireObjects(System.currentTimeMillis());
+          
+          // Sleep for the retry interval.
+          ManifoldCF.sleep(5000L);
+        }
+        catch (ManifoldCFException e)
+        {
+          if (e.getErrorCode() == ManifoldCFException.INTERRUPTED)
+            break;
+
+          if (e.getErrorCode() == ManifoldCFException.DATABASE_CONNECTION_ERROR)
+          {
+            Logging.root.error("Idle cleanup thread aborting and restarting due to database connection reset: "+e.getMessage(),e);
+            try
+            {
+              // Give the database a chance to catch up/wake up
+              ManifoldCF.sleep(10000L);
+            }
+            catch (InterruptedException se)
+            {
+              break;
+            }
+            continue;
+          }
+
+          // Log it, but keep the thread alive
+          Logging.root.error("Exception tossed: "+e.getMessage(),e);
+
+          if (e.getErrorCode() == ManifoldCFException.SETUP_ERROR)
+          {
+            // Shut the whole system down!
+            System.exit(1);
+          }
+
+        }
+        catch (InterruptedException e)
+        {
+          // We're supposed to quit
+          break;
+        }
+        catch (OutOfMemoryError e)
+        {
+          System.err.println("API service ran out of memory - shutting down");
+          e.printStackTrace(System.err);
+          System.exit(-200);
+        }
+        catch (Throwable e)
+        {
+          // A more severe error - but stay alive
+          Logging.root.fatal("Error tossed: "+e.getMessage(),e);
+        }
+      }
+    }
+    catch (Throwable e)
+    {
+      // Severe error on initialization
+      System.err.println("API service could not start - shutting down");
+      Logging.root.fatal("IdleCleanupThread initialization error tossed: "+e.getMessage(),e);
+      System.exit(-300);
+    }
+
+  }
+
+}
diff --git a/framework/api-service/src/main/java/org/apache/manifoldcf/apiservice/ServletListener.java b/framework/api-service/src/main/java/org/apache/manifoldcf/apiservice/ServletListener.java
index 1916cfa..cfe250e 100644
--- a/framework/api-service/src/main/java/org/apache/manifoldcf/apiservice/ServletListener.java
+++ b/framework/api-service/src/main/java/org/apache/manifoldcf/apiservice/ServletListener.java
@@ -29,11 +29,16 @@
 {
   public static final String _rcsid = "@(#)$Id$";
 
+  protected IdleCleanupThread idleCleanupThread = null;
+  
   public void contextInitialized(ServletContextEvent sce)
   {
     try
     {
-      ManifoldCF.initializeEnvironment(ThreadContextFactory.make());
+      IThreadContext threadContext = ThreadContextFactory.make();
+      ManifoldCF.initializeEnvironment(threadContext);
+      idleCleanupThread = new IdleCleanupThread();
+      idleCleanupThread.start();
     }
     catch (ManifoldCFException e)
     {
@@ -43,7 +48,21 @@
   
   public void contextDestroyed(ServletContextEvent sce)
   {
-    ManifoldCF.cleanUpEnvironment(ThreadContextFactory.make());
+    try
+    {
+      while (true)
+      {
+        if (idleCleanupThread == null)
+          break;
+        idleCleanupThread.interrupt();
+        if (!idleCleanupThread.isAlive())
+          idleCleanupThread = null;
+      }
+    }
+    finally
+    {
+      ManifoldCF.cleanUpEnvironment(ThreadContextFactory.make());
+    }
   }
 
 }
diff --git a/framework/build.xml b/framework/build.xml
index 4b17af1..262d99c 100644
--- a/framework/build.xml
+++ b/framework/build.xml
@@ -1497,6 +1497,7 @@
             <test name="org.apache.manifoldcf.core.common.DateTest" todir="test-output"/>
             <test name="org.apache.manifoldcf.core.fuzzyml.TestFuzzyML" todir="test-output"/>
             <test name="org.apache.manifoldcf.core.lockmanager.TestZooKeeperLocks" todir="test-output"/>
+            <test name="org.apache.manifoldcf.core.throttler.TestThrottler" todir="test-output"/>
 
         </junit>
     </target>
diff --git a/framework/combined-service/src/main/java/org/apache/manifoldcf/combinedservice/IdleCleanupThread.java b/framework/combined-service/src/main/java/org/apache/manifoldcf/combinedservice/IdleCleanupThread.java
new file mode 100644
index 0000000..8e1d50d
--- /dev/null
+++ b/framework/combined-service/src/main/java/org/apache/manifoldcf/combinedservice/IdleCleanupThread.java
@@ -0,0 +1,141 @@
+/* $Id$ */
+
+/**
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.manifoldcf.combinedservice;
+
+import org.apache.manifoldcf.core.interfaces.*;
+import org.apache.manifoldcf.agents.interfaces.*;
+import org.apache.manifoldcf.crawler.interfaces.*;
+import org.apache.manifoldcf.authorities.interfaces.*;
+import org.apache.manifoldcf.core.system.Logging;
+import org.apache.manifoldcf.core.system.ManifoldCF;
+import java.util.*;
+import java.lang.reflect.*;
+
+/** This thread periodically calls the cleanup method in all connected repository connectors.  The ostensible purpose
+* is to allow the connectors to shutdown idle connections etc.
+*/
+public class IdleCleanupThread extends Thread
+{
+  public static final String _rcsid = "@(#)$Id$";
+
+  /** Constructor.
+  */
+  public IdleCleanupThread()
+    throws ManifoldCFException
+  {
+    super();
+    setName("Idle cleanup thread");
+    setDaemon(true);
+  }
+
+  public void run()
+  {
+    Logging.root.debug("Start up idle cleanup thread");
+    try
+    {
+      // Create a thread context object.
+      IThreadContext threadContext = ThreadContextFactory.make();
+      // Get the cache handle.
+      ICacheManager cacheManager = CacheManagerFactory.make(threadContext);
+      
+      IRepositoryConnectorPool repositoryConnectorPool = RepositoryConnectorPoolFactory.make(threadContext);
+      IOutputConnectorPool outputConnectorPool = OutputConnectorPoolFactory.make(threadContext);
+      IAuthorityConnectorPool authorityConnectorPool = AuthorityConnectorPoolFactory.make(threadContext);
+      IMappingConnectorPool mappingConnectorPool = MappingConnectorPoolFactory.make(threadContext);
+      
+      IThrottleGroups throttleGroups = ThrottleGroupsFactory.make(threadContext);
+
+      // Loop
+      while (true)
+      {
+        // Do another try/catch around everything in the loop
+        try
+        {
+          // Do the cleanup
+          repositoryConnectorPool.pollAllConnectors();
+          outputConnectorPool.pollAllConnectors();
+          authorityConnectorPool.pollAllConnectors();
+          mappingConnectorPool.pollAllConnectors();
+          
+          throttleGroups.poll();
+          
+          cacheManager.expireObjects(System.currentTimeMillis());
+          
+          // Sleep for the retry interval.
+          ManifoldCF.sleep(5000L);
+        }
+        catch (ManifoldCFException e)
+        {
+          if (e.getErrorCode() == ManifoldCFException.INTERRUPTED)
+            break;
+
+          if (e.getErrorCode() == ManifoldCFException.DATABASE_CONNECTION_ERROR)
+          {
+            Logging.root.error("Idle cleanup thread aborting and restarting due to database connection reset: "+e.getMessage(),e);
+            try
+            {
+              // Give the database a chance to catch up/wake up
+              ManifoldCF.sleep(10000L);
+            }
+            catch (InterruptedException se)
+            {
+              break;
+            }
+            continue;
+          }
+
+          // Log it, but keep the thread alive
+          Logging.root.error("Exception tossed: "+e.getMessage(),e);
+
+          if (e.getErrorCode() == ManifoldCFException.SETUP_ERROR)
+          {
+            // Shut the whole system down!
+            System.exit(1);
+          }
+
+        }
+        catch (InterruptedException e)
+        {
+          // We're supposed to quit
+          break;
+        }
+        catch (OutOfMemoryError e)
+        {
+          System.err.println("Combined service ran out of memory - shutting down");
+          e.printStackTrace(System.err);
+          System.exit(-200);
+        }
+        catch (Throwable e)
+        {
+          // A more severe error - but stay alive
+          Logging.root.fatal("Error tossed: "+e.getMessage(),e);
+        }
+      }
+    }
+    catch (Throwable e)
+    {
+      // Severe error on initialization
+      System.err.println("Combined service could not start - shutting down");
+      Logging.root.fatal("IdleCleanupThread initialization error tossed: "+e.getMessage(),e);
+      System.exit(-300);
+    }
+
+  }
+
+}
diff --git a/framework/combined-service/src/main/java/org/apache/manifoldcf/combinedservice/ServletListener.java b/framework/combined-service/src/main/java/org/apache/manifoldcf/combinedservice/ServletListener.java
index b7b5300..b091414 100644
--- a/framework/combined-service/src/main/java/org/apache/manifoldcf/combinedservice/ServletListener.java
+++ b/framework/combined-service/src/main/java/org/apache/manifoldcf/combinedservice/ServletListener.java
@@ -31,6 +31,7 @@
   public static final String _rcsid = "@(#)$Id$";
 
   protected static AgentsThread agentsThread = null;
+  protected IdleCleanupThread idleCleanupThread = null;
 
   public void contextInitialized(ServletContextEvent sce)
   {
@@ -44,8 +45,14 @@
       ManifoldCF.registerThisAgent(tc);
       ManifoldCF.reregisterAllConnectors(tc);
 
+      // This is for the UI and API components
+      idleCleanupThread = new IdleCleanupThread();
+      idleCleanupThread.start();
+
+      // This is for the agents process
       agentsThread = new AgentsThread(ManifoldCF.getProcessID());
       agentsThread.start();
+
     }
     catch (ManifoldCFException e)
     {
@@ -65,6 +72,15 @@
         agentsThread = null;
         AgentsDaemon.clearAgentsShutdownSignal(tc);
       }
+      
+      while (true)
+      {
+        if (idleCleanupThread == null)
+          break;
+        idleCleanupThread.interrupt();
+        if (!idleCleanupThread.isAlive())
+          idleCleanupThread = null;
+      }
     }
     catch (InterruptedException e)
     {
diff --git a/framework/core/pom.xml b/framework/core/pom.xml
index 1c365a4..985e2f5 100644
--- a/framework/core/pom.xml
+++ b/framework/core/pom.xml
@@ -92,5 +92,23 @@
       <artifactId>zookeeper</artifactId>
       <version>${zookeeper.version}</version>
     </dependency>
+    <dependency>
+      <groupId>postgresql</groupId>
+      <artifactId>postgresql</artifactId>
+      <version>${postgresql.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.hsqldb</groupId>
+      <artifactId>hsqldb</artifactId>
+      <version>${hsqldb.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.derby</groupId>
+      <artifactId>derby</artifactId>
+      <version>${derby.version}</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 </project>
diff --git a/framework/core/src/main/java/org/apache/manifoldcf/core/connectorpool/ConnectorPool.java b/framework/core/src/main/java/org/apache/manifoldcf/core/connectorpool/ConnectorPool.java
index 19e8ca7..27037be 100644
--- a/framework/core/src/main/java/org/apache/manifoldcf/core/connectorpool/ConnectorPool.java
+++ b/framework/core/src/main/java/org/apache/manifoldcf/core/connectorpool/ConnectorPool.java
@@ -572,6 +572,8 @@
         // Compute MaximumTarget
         SumClass sumClass = new SumClass(serviceName);
         lockManager.scanServiceData(serviceTypeName, sumClass);
+        //System.out.println("numServices = "+sumClass.getNumServices()+"; globalTarget = "+sumClass.getGlobalTarget()+"; globalInUse = "+sumClass.getGlobalInUse());
+        
         int numServices = sumClass.getNumServices();
         if (numServices == 0)
           return;
@@ -614,11 +616,13 @@
         {
           // We want a fast ramp up, so make this proportional to globalMax
           int increment = globalMax >> 2;
-          if (increment < 0)
+          if (increment == 0)
             increment = 1;
           optimalTarget += increment;
         }
         
+        //System.out.println(serviceTypeName+":maxTarget = "+maximumTarget+"; fairTarget = "+fairTarget+"; optimalTarget = "+optimalTarget);
+
         // Now compute actual target
         int target = maximumTarget;
         if (target > fairTarget)
@@ -626,6 +630,7 @@
         if (target > optimalTarget)
           target = optimalTarget;
         
+        //System.out.println(serviceTypeName+":Picking target="+target+"; localInUse="+localInUse);
         // Write these values to the service data variables.
         // NOTE that there is a race condition here; the target value depends on all the calculations above being accurate, and not changing out from under us.
         // So, that's why we have a write lock around the pool calculations.
@@ -635,6 +640,7 @@
         // Now, update our localMax
         if (target == localMax)
           return;
+        //System.out.println(serviceTypeName+":Updating target: "+target);
         // Compute the number of instances in use locally
         localInUse = localMax - numFree;
         localMax = target;
@@ -647,6 +653,35 @@
       {
         lockManager.leaveWriteLock(targetCalcLockName);
       }
+      
+      // Finally, free pooled instances in excess of target
+      while (stack.size() > 0 && stack.size() > numFree)
+      {
+        // Try to find a connector instance that is not actually connected.
+        // These are likely to be at the front of the queue, since those are the
+        // oldest.
+        int j;
+        for (j = 0; j < stack.size(); j++)
+        {
+          if (!stack.get(j).isConnected())
+            break;
+        }
+        T rc;
+        if (j == stack.size())
+          rc = stack.remove(stack.size()-1);
+        else
+          rc = stack.remove(j);
+        rc.setThreadContext(threadContext);
+        try
+        {
+          rc.disconnect();
+        }
+        finally
+        {
+          rc.clearThreadContext();
+        }
+      }
+
     }
 
     /** Flush unused connectors.
diff --git a/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/IConnectionThrottler.java b/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/IConnectionThrottler.java
new file mode 100644
index 0000000..1d76f00
--- /dev/null
+++ b/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/IConnectionThrottler.java
@@ -0,0 +1,112 @@
+/* $Id$ */
+
+/**
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.manifoldcf.core.interfaces;
+
+import java.util.*;
+
+/** An IConnectionThrottler object is not thread-local.  It gates connection
+* creation and pool management.
+* The underlying model is a pool of connections.  A connection gets pulled off the pool and
+* used to perform a fetch.  If there are insufficient connections in the pool, and there is
+* sufficient capacity to create a new connection, a connection will be created instead.
+* When the fetch is done, the connection is returned, and then there is a decision whether
+* or not to put the connection back into the pool, or to destroy it.  Finally, the pool is
+* periodically evaluated, and connections may be destroyed if either they have expired,
+* or the allocated connections are still over capacity.
+*
+* This object does not in itself contain a connection pool - but it is intended to assist
+* in the management of that pool.  Specifically, it tracks connections that are in the
+* pool, and connections that are handed out for use, and performs ALL the waiting needed
+* due to the pool being empty and/or the number of active connections being at or over
+* the quota.
+*/
+public interface IConnectionThrottler
+{
+  public static final String _rcsid = "@(#)$Id$";
+
+  // For grabbing a connection for use
+  
+  /** Get the connection from the pool */
+  public final static int CONNECTION_FROM_POOL = 0;
+  /** Create a connection */
+  public final static int CONNECTION_FROM_CREATION = 1;
+  /** Pool shutting down */
+  public final static int CONNECTION_FROM_NOWHERE = -1;
+  
+  /** Get permission to grab a connection for use.  If this object believes there is a connection
+  * available in the pool, it will update its pool size variable and return   If not, this method
+  * evaluates whether a new connection should be created.  If neither condition is true, it
+  * waits until a connection is available.
+  *@return whether to take the connection from the pool, or create one, or whether the
+  * throttler is being shut down.
+  */
+  public int waitConnectionAvailable()
+    throws InterruptedException;
+  
+  /** For a new connection, obtain the fetch throttler to use for the connection.
+  * If the result from waitConnectionAvailable() is CONNECTION_FROM_CREATION,
+  * the calling code is expected to create a connection using the result of this method.
+  *@return the fetch throttler for a new connection.
+  */
+  public IFetchThrottler getNewConnectionFetchThrottler();
+  
+  /** This method indicates whether a formerly in-use connection should be placed back
+  * in the pool or destroyed.
+  *@return true if the connection should not be put into the pool but should instead
+  *  simply be destroyed.  If true is returned, the caller MUST call noteConnectionDestroyed()
+  *  after the connection is destroyed in order for the bookkeeping to work.  If false
+  *  is returned, the caller MUST call noteConnectionReturnedToPool() after the connection
+  *  is returned to the pool.
+  */
+  public boolean noteReturnedConnection();
+  
+  /** This method calculates whether a connection should be taken from the pool and destroyed
+  /* in order to meet quota requirements.  If this method returns
+  /* true, you MUST remove a connection from the pool, and you MUST call
+  /* noteConnectionDestroyed() afterwards.
+  *@return true if a pooled connection should be destroyed.  If true is returned, the
+  * caller MUST call noteConnectionDestroyed() (below) in order for the bookkeeping to work.
+  */
+  public boolean checkDestroyPooledConnection();
+  
+  /** Connection expiration is tricky, because even though a connection may be identified as
+  * being expired, at the very same moment it could be handed out in another thread.  So there
+  * is a natural race condition present.
+  * The way the connection throttler deals with that is to allow the caller to reserve a connection
+  * for expiration.  This must be called BEFORE the actual identified connection is removed from the
+  * connection pool.  If the value returned by this method is "true", then a connection MUST be removed
+  * from the pool and destroyed, whether or not the identified connection is actually still available for
+  * destruction or not.
+  *@return true if a connection from the pool can be expired.  If true is returned, noteConnectionDestruction()
+  *  MUST be called once the connection has actually been destroyed.
+  */
+  public boolean checkExpireConnection();
+  
+  /** Note that a connection has been returned to the pool.  Call this method after a connection has been
+  * placed back into the pool and is available for use.
+  */
+  public void noteConnectionReturnedToPool();
+  
+  /** Note that a connection has been destroyed.  Call this method ONLY after noteReturnedConnection()
+  * or checkDestroyPooledConnection() returns true, AND the connection has been already
+  * destroyed.
+  */
+  public void noteConnectionDestroyed();
+  
+}
diff --git a/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/IFetchThrottler.java b/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/IFetchThrottler.java
new file mode 100644
index 0000000..91d51f4
--- /dev/null
+++ b/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/IFetchThrottler.java
@@ -0,0 +1,46 @@
+/* $Id$ */
+
+/**
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.manifoldcf.core.interfaces;
+
+/** An IFetchThrottler object is meant to be used as part of a fetch cycle.  It is not
+* thread-local, and does not require access to a thread context.  It thus also does not
+* throw ManifoldCFExceptions.  It is thus suitable for use in background threads, etc.
+* These objects are typically created by IConnectionThrottler objects - they are not meant
+* to be created directly.
+*/
+public interface IFetchThrottler
+{
+  public static final String _rcsid = "@(#)$Id$";
+
+  /** Get permission to fetch a document.  This grants permission to start
+  * fetching a single document, within the connection that has already been
+  * granted permission that created this object.
+  *@return false if the throttler is being shut down.
+  */
+  public boolean obtainFetchDocumentPermission()
+    throws InterruptedException;
+  
+  /** Open a fetch stream.  When done (or aborting), call
+  * IStreamThrottler.closeStream() to note the completion of the document
+  * fetch activity.
+  *@return the stream throttler to use to throttle the actual data access.
+  */
+  public IStreamThrottler createFetchStream();
+
+}
diff --git a/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/IStreamThrottler.java b/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/IStreamThrottler.java
new file mode 100644
index 0000000..b8e1759
--- /dev/null
+++ b/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/IStreamThrottler.java
@@ -0,0 +1,50 @@
+/* $Id$ */
+
+/**
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.manifoldcf.core.interfaces;
+
+/** An IConnectionThrottler object is meant to be embedded in an InputStream.  It is not
+* thread-local, and does not require access to a thread context.  It thus also does not
+* throw ManifoldCFExceptions.  It is thus suitable for use in background threads, etc.
+* These objects are typically created by IFetchThrottler objects - they are not meant
+* to be created directly.
+*/
+public interface IStreamThrottler
+{
+  public static final String _rcsid = "@(#)$Id$";
+
+  /** Obtain permission to read a block of bytes.  This method may wait until it is OK to proceed.
+  * The throttle group, bin names, etc are already known
+  * to this specific interface object, so it is unnecessary to include them here.
+  *@param byteCount is the number of bytes to get permissions to read.
+  *@return true if the wait took place as planned, or false if the system is being shut down.
+  */
+  public boolean obtainReadPermission(int byteCount)
+    throws InterruptedException;
+    
+  /** Note the completion of the read of a block of bytes.  Call this after
+  * obtainReadPermission() was successfully called, and bytes were successfully read.
+  *@param origByteCount is the originally requested number of bytes to get permissions to read.
+  *@param actualByteCount is the number of bytes actually read.
+  */
+  public void releaseReadPermission(int origByteCount, int actualByteCount);
+  
+  /** Note the stream being closed.
+  */
+  public void closeStream();
+}
diff --git a/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/IThrottleGroups.java b/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/IThrottleGroups.java
new file mode 100644
index 0000000..0e5df5d
--- /dev/null
+++ b/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/IThrottleGroups.java
@@ -0,0 +1,86 @@
+/* $Id$ */
+
+/**
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.manifoldcf.core.interfaces;
+
+import java.util.*;
+
+/** An IThrottleGroups object is thread-local and creates a virtual pool
+* of connections to resources whose access needs to be throttled in number, 
+* rate of use, and byte rate.
+*/
+public interface IThrottleGroups
+{
+  public static final String _rcsid = "@(#)$Id$";
+
+  /** Get all existing throttle groups for a throttle group type.
+  * The throttle group type typically describes a connector class, while the throttle group represents
+  * a namespace of bin names specific to that connector class.
+  *@param throttleGroupType is the throttle group type.
+  *@return the set of throttle groups for that group type.
+  */
+  public Set<String> getThrottleGroups(String throttleGroupType)
+    throws ManifoldCFException;
+  
+  /** Remove a throttle group.
+  *@param throttleGroupType is the throttle group type.
+  *@param throttleGroup is the throttle group.
+  */
+  public void removeThrottleGroup(String throttleGroupType, String throttleGroup)
+    throws ManifoldCFException;
+  
+  /** Create or update a throttle group.
+  *@param throttleGroupType is the throttle group type.
+  *@param throttleGroup is the throttle group.
+  *@param throttleSpec is the desired throttle specification object.
+  */
+  public void createOrUpdateThrottleGroup(String throttleGroupType, String throttleGroup, IThrottleSpec throttleSpec)
+    throws ManifoldCFException;
+
+  /** Construct connection throttler for connections with specific bin names.  This object is meant to be embedded with a connection
+  * pool of similar objects, and used to gate the creation of new connections in that pool.
+  *@param throttleGroupType is the throttle group type.
+  *@param throttleGroup is the throttle group.
+  *@param binNames are the connection type bin names.
+  *@return the connection throttling object, or null if the pool is being shut down.
+  */
+  public IConnectionThrottler obtainConnectionThrottler(String throttleGroupType, String throttleGroup, String[] binNames)
+    throws ManifoldCFException;
+  
+  /** Poll periodically, to update cluster-wide statistics and allocation.
+  *@param throttleGroupType is the throttle group type to update.
+  */
+  public void poll(String throttleGroupType)
+    throws ManifoldCFException;
+
+  /** Poll periodically, to update ALL cluster-wide statistics and allocation.
+  */
+  public void poll()
+    throws ManifoldCFException;
+
+  /** Free all unused resources.
+  */
+  public void freeUnusedResources()
+    throws ManifoldCFException;
+  
+  /** Shut down throttler permanently.
+  */
+  public void destroy()
+    throws ManifoldCFException;
+
+}
diff --git a/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/IThrottleSpec.java b/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/IThrottleSpec.java
new file mode 100644
index 0000000..3b8cfcd
--- /dev/null
+++ b/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/IThrottleSpec.java
@@ -0,0 +1,44 @@
+/* $Id$ */
+
+/**
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.manifoldcf.core.interfaces;
+
+
+/** An IThrottleSpec object describes what throttling criteria to apply
+* per bin.
+*/
+public interface IThrottleSpec
+{
+  public static final String _rcsid = "@(#)$Id$";
+
+  /** Given a bin name, find the max open connections to use for that bin.
+  *@return Integer.MAX_VALUE if no limit found.
+  */
+  public int getMaxOpenConnections(String binName);
+
+  /** Look up minimum milliseconds per byte for a bin.
+  *@return 0.0 if no limit found.
+  */
+  public double getMinimumMillisecondsPerByte(String binName);
+
+  /** Look up minimum milliseconds for a fetch for a bin.
+  *@return 0 if no limit found.
+  */
+  public long getMinimumMillisecondsPerFetch(String binName);
+
+}
diff --git a/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/ThrottleGroupsFactory.java b/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/ThrottleGroupsFactory.java
new file mode 100644
index 0000000..96c4dc2
--- /dev/null
+++ b/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/ThrottleGroupsFactory.java
@@ -0,0 +1,50 @@
+/* $Id$ */
+
+/**
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.manifoldcf.core.interfaces;
+
+/** Thread-local IThrottleGroups factory.
+*/
+public class ThrottleGroupsFactory
+{
+  public static final String _rcsid = "@(#)$Id$";
+
+  // name to use in thread context pool of objects
+  private final static String objectName = "_ThrottleGroups_";
+
+  private ThrottleGroupsFactory()
+  {
+  }
+
+  /** Make a connection throttle handle.
+  *@param tc is the thread context.
+  *@return the handle.
+  */
+  public static IThrottleGroups make(IThreadContext tc)
+    throws ManifoldCFException
+  {
+    Object o = tc.get(objectName);
+    if (o == null || !(o instanceof IThrottleGroups))
+    {
+      o = new org.apache.manifoldcf.core.throttler.ThrottleGroups(tc);
+      tc.save(objectName,o);
+    }
+    return (IThrottleGroups)o;
+  }
+
+}
diff --git a/framework/core/src/main/java/org/apache/manifoldcf/core/lockmanager/ZooKeeperConnection.java b/framework/core/src/main/java/org/apache/manifoldcf/core/lockmanager/ZooKeeperConnection.java
index a5e23ef..90617e7 100644
--- a/framework/core/src/main/java/org/apache/manifoldcf/core/lockmanager/ZooKeeperConnection.java
+++ b/framework/core/src/main/java/org/apache/manifoldcf/core/lockmanager/ZooKeeperConnection.java
@@ -511,6 +511,63 @@
     zookeeperWatcher = null;
   }
   
+  public static String zooKeeperSafeName(String input)
+  {
+    // Escape "/" characters
+    StringBuilder sb = new StringBuilder();
+    for (int i = 0; i < input.length(); i++)
+    {
+      char x = input.charAt(i);
+      if (x == '/')
+        sb.append('\\').append('0');
+      else if (x == '\u007f')
+        sb.append('\\').append('1');
+      else if (x == '\\')
+        sb.append('\\').append('\\');
+      else if (x >= '\u0000' && x < '\u0020')
+        sb.append('\\').append(x + '\u0040');
+      else if (x >= '\u0080' && x < '\u00a0')
+        sb.append('\\').append(x + '\u0060' - '\u0080');
+      else
+        sb.append(x);
+    }
+    return sb.toString();
+  }
+
+  public static String zooKeeperDecodeSafeName(String input)
+  {
+    // Escape "/" characters
+    StringBuilder sb = new StringBuilder();
+    int i = 0;
+    while (i < input.length())
+    {
+      char x = input.charAt(i);
+      if (x == '\\')
+      {
+        i++;
+        if (i == input.length())
+          throw new RuntimeException("Supposedly safe zookeeper name is not properly encoded!!");
+        x = input.charAt(i);
+        if (x == '0')
+          sb.append('/');
+        else if (x == '1')
+          sb.append('\u007f');
+        else if (x == '\\')
+          sb.append('\\');
+        else if (x >= '\u0040' && x < '\u0060')
+          sb.append(x - '\u0040');
+        else if (x >= '\u0060' && x < '\u0080')
+          sb.append(x - '\u0060' + '\u0080');
+        else
+          throw new RuntimeException("Supposedly safe zookeeper name is not properly encoded!!");
+      }
+      else
+        sb.append(x);
+      i++;
+    }
+    return sb.toString();
+  }
+
   // Protected methods
   
   /** Create a node and a sequential child node.  Neither node has any data.
diff --git a/framework/core/src/main/java/org/apache/manifoldcf/core/lockmanager/ZooKeeperLockManager.java b/framework/core/src/main/java/org/apache/manifoldcf/core/lockmanager/ZooKeeperLockManager.java
index 7526880..abeee94 100644
--- a/framework/core/src/main/java/org/apache/manifoldcf/core/lockmanager/ZooKeeperLockManager.java
+++ b/framework/core/src/main/java/org/apache/manifoldcf/core/lockmanager/ZooKeeperLockManager.java
@@ -152,7 +152,9 @@
           if (serviceName == null)
             serviceName = constructUniqueServiceName(connection, serviceType);
 
-          String activePath = buildServiceTypeActivePath(serviceType, serviceName);
+          String encodedServiceName = ZooKeeperConnection.zooKeeperSafeName(serviceName);
+          
+          String activePath = buildServiceTypeActivePath(serviceType, encodedServiceName);
           if (connection.checkNodeExists(activePath))
             throw new ManifoldCFException("Service '"+serviceName+"' of type '"+serviceType+"' is already active");
           // First, see where we stand.
@@ -162,11 +164,11 @@
           List<String> children = connection.getChildren(registrationNodePath);
           boolean foundService = false;
           boolean foundActiveService = false;
-          for (String registeredServiceName : children)
+          for (String encodedRegisteredServiceName : children)
           {
-            if (registeredServiceName.equals(serviceName))
+            if (encodedRegisteredServiceName.equals(encodedServiceName))
               foundService = true;
-            if (connection.checkNodeExists(buildServiceTypeActivePath(serviceType, registeredServiceName)))
+            if (connection.checkNodeExists(buildServiceTypeActivePath(serviceType, encodedRegisteredServiceName)))
               foundActiveService = true;
           }
           
@@ -197,17 +199,17 @@
           if (unregisterAll)
           {
             // Unregister all (since we did a global cleanup)
-            for (String registeredServiceName : children)
+            for (String encodedRegisteredServiceName : children)
             {
-              if (!registeredServiceName.equals(serviceName))
-                connection.deleteChild(registrationNodePath, registeredServiceName);
+              if (!encodedRegisteredServiceName.equals(encodedServiceName))
+                connection.deleteChild(registrationNodePath, encodedRegisteredServiceName);
             }
           }
 
           // Now, register (if needed)
           if (!foundService)
           {
-            connection.createChild(registrationNodePath, serviceName);
+            connection.createChild(registrationNodePath, encodedServiceName);
           }
           
           // Last, set the appropriate active flag
@@ -248,7 +250,7 @@
         enterServiceRegistryWriteLock(connection, serviceType);
         try
         {
-          String activePath = buildServiceTypeActivePath(serviceType, serviceName);
+          String activePath = buildServiceTypeActivePath(serviceType, ZooKeeperConnection.zooKeeperSafeName(serviceName));
           connection.setNodeData(activePath, (serviceData==null)?new byte[0]:serviceData);
         }
         finally
@@ -284,7 +286,7 @@
         enterServiceRegistryReadLock(connection, serviceType);
         try
         {
-          String activePath = buildServiceTypeActivePath(serviceType, serviceName);
+          String activePath = buildServiceTypeActivePath(serviceType, ZooKeeperConnection.zooKeeperSafeName(serviceName));
           return connection.getNodeData(activePath);
         }
         finally
@@ -321,13 +323,13 @@
         {
           String registrationNodePath = buildServiceTypeRegistrationPath(serviceType);
           List<String> children = connection.getChildren(registrationNodePath);
-          for (String registeredServiceName : children)
+          for (String encodedRegisteredServiceName : children)
           {
-            String activeNodePath = buildServiceTypeActivePath(serviceType, registeredServiceName);
+            String activeNodePath = buildServiceTypeActivePath(serviceType, encodedRegisteredServiceName);
             if (connection.checkNodeExists(activeNodePath))
             {
               byte[] serviceData = connection.getNodeData(activeNodePath);
-              if (dataAcceptor.acceptServiceData(registeredServiceName, serviceData))
+              if (dataAcceptor.acceptServiceData(ZooKeeperConnection.zooKeeperDecodeSafeName(encodedRegisteredServiceName), serviceData))
                 break;
             }
           }
@@ -368,9 +370,9 @@
           String registrationNodePath = buildServiceTypeRegistrationPath(serviceType);
           List<String> children = connection.getChildren(registrationNodePath);
           int activeServiceCount = 0;
-          for (String registeredServiceName : children)
+          for (String encodedRegisteredServiceName : children)
           {
-            if (connection.checkNodeExists(buildServiceTypeActivePath(serviceType, registeredServiceName)))
+            if (connection.checkNodeExists(buildServiceTypeActivePath(serviceType, encodedRegisteredServiceName)))
               activeServiceCount++;
           }
           return activeServiceCount;
@@ -417,25 +419,25 @@
           // Presumably the caller will lather, rinse, and repeat.
           String registrationNodePath = buildServiceTypeRegistrationPath(serviceType);
           List<String> children = connection.getChildren(registrationNodePath);
-          String serviceName = null;
-          for (String registeredServiceName : children)
+          String encodedServiceName = null;
+          for (String encodedRegisteredServiceName : children)
           {
-            if (!connection.checkNodeExists(buildServiceTypeActivePath(serviceType, registeredServiceName)))
+            if (!connection.checkNodeExists(buildServiceTypeActivePath(serviceType, encodedRegisteredServiceName)))
             {
-              serviceName = registeredServiceName;
+              encodedServiceName = encodedRegisteredServiceName;
               break;
             }
           }
-          if (serviceName == null)
+          if (encodedServiceName == null)
             return true;
           
           // Found one, in serviceName, at position i
           // Ideally, we should signal at this point that we're cleaning up after it, and then leave
           // the exclusive lock, so that other activity can take place.  MHL
-          cleanup.cleanUpService(serviceName);
+          cleanup.cleanUpService(ZooKeeperConnection.zooKeeperDecodeSafeName(encodedServiceName));
 
           // Unregister the service.
-          connection.deleteChild(registrationNodePath, serviceName);
+          connection.deleteChild(registrationNodePath, encodedServiceName);
           return false;
         }
         finally
@@ -473,7 +475,7 @@
         enterServiceRegistryWriteLock(connection, serviceType);
         try
         {
-          connection.deleteNode(buildServiceTypeActivePath(serviceType, serviceName));
+          connection.deleteNode(buildServiceTypeActivePath(serviceType, ZooKeeperConnection.zooKeeperSafeName(serviceName)));
         }
         finally
         {
@@ -510,7 +512,7 @@
         enterServiceRegistryReadLock(connection, serviceType);
         try
         {
-          return connection.checkNodeExists(buildServiceTypeActivePath(serviceType, serviceName));
+          return connection.checkNodeExists(buildServiceTypeActivePath(serviceType, ZooKeeperConnection.zooKeeperSafeName(serviceName)));
         }
         finally
         {
@@ -576,7 +578,7 @@
   */
   protected static String makeServiceCounterName(String serviceType)
   {
-    return SERVICETYPE_ANONYMOUS_COUNTER_PREFIX + serviceType;
+    return SERVICETYPE_ANONYMOUS_COUNTER_PREFIX + ZooKeeperConnection.zooKeeperSafeName(serviceType);
   }
   
   /** Read service counter.
@@ -621,21 +623,21 @@
   */
   protected static String buildServiceTypeLockPath(String serviceType)
   {
-    return SERVICETYPE_LOCK_PATH_PREFIX + serviceType;
+    return SERVICETYPE_LOCK_PATH_PREFIX + ZooKeeperConnection.zooKeeperSafeName(serviceType);
   }
   
   /** Build a zk path for the active node for a specific service of a specific type.
   */
-  protected static String buildServiceTypeActivePath(String serviceType, String serviceName)
+  protected static String buildServiceTypeActivePath(String serviceType, String encodedServiceName)
   {
-    return SERVICETYPE_ACTIVE_PATH_PREFIX + serviceType + "-" + serviceName;
+    return SERVICETYPE_ACTIVE_PATH_PREFIX + ZooKeeperConnection.zooKeeperSafeName(serviceType) + "-" + encodedServiceName;
   }
   
   /** Build a zk path for the registration node for a specific service type.
   */
   protected static String buildServiceTypeRegistrationPath(String serviceType)
   {
-    return SERVICETYPE_REGISTER_PATH_PREFIX + serviceType;
+    return SERVICETYPE_REGISTER_PATH_PREFIX + ZooKeeperConnection.zooKeeperSafeName(serviceType);
   }
   
   // Shared configuration
@@ -731,7 +733,7 @@
       ZooKeeperConnection connection = pool.grab();
       try
       {
-        connection.setGlobalFlag(FLAG_PATH_PREFIX + flagName);
+        connection.setGlobalFlag(FLAG_PATH_PREFIX + ZooKeeperConnection.zooKeeperSafeName(flagName));
       }
       finally
       {
@@ -756,7 +758,7 @@
       ZooKeeperConnection connection = pool.grab();
       try
       {
-        connection.clearGlobalFlag(FLAG_PATH_PREFIX + flagName);
+        connection.clearGlobalFlag(FLAG_PATH_PREFIX + ZooKeeperConnection.zooKeeperSafeName(flagName));
       }
       finally
       {
@@ -782,7 +784,7 @@
       ZooKeeperConnection connection = pool.grab();
       try
       {
-        return connection.checkGlobalFlag(FLAG_PATH_PREFIX + flagName);
+        return connection.checkGlobalFlag(FLAG_PATH_PREFIX + ZooKeeperConnection.zooKeeperSafeName(flagName));
       }
       finally
       {
@@ -809,7 +811,7 @@
       ZooKeeperConnection connection = pool.grab();
       try
       {
-        return connection.readData(RESOURCE_PATH_PREFIX + resourceName);
+        return connection.readData(RESOURCE_PATH_PREFIX + ZooKeeperConnection.zooKeeperSafeName(resourceName));
       }
       finally
       {
@@ -836,7 +838,7 @@
       ZooKeeperConnection connection = pool.grab();
       try
       {
-        connection.writeData(RESOURCE_PATH_PREFIX + resourceName, data);
+        connection.writeData(RESOURCE_PATH_PREFIX + ZooKeeperConnection.zooKeeperSafeName(resourceName), data);
       }
       finally
       {
diff --git a/framework/core/src/main/java/org/apache/manifoldcf/core/lockmanager/ZooKeeperLockObject.java b/framework/core/src/main/java/org/apache/manifoldcf/core/lockmanager/ZooKeeperLockObject.java
index dd1f599..b68c970 100644
--- a/framework/core/src/main/java/org/apache/manifoldcf/core/lockmanager/ZooKeeperLockObject.java
+++ b/framework/core/src/main/java/org/apache/manifoldcf/core/lockmanager/ZooKeeperLockObject.java
@@ -41,7 +41,7 @@
   {
     super(lockPool,lockKey);
     this.pool = pool;
-    this.lockPath = LOCK_PATH_PREFIX + lockKey.toString();
+    this.lockPath = LOCK_PATH_PREFIX + ZooKeeperConnection.zooKeeperSafeName(lockKey.toString());
   }
 
   @Override
diff --git a/framework/core/src/main/java/org/apache/manifoldcf/core/system/ManifoldCF.java b/framework/core/src/main/java/org/apache/manifoldcf/core/system/ManifoldCF.java
index 632274e..59683d8 100644
--- a/framework/core/src/main/java/org/apache/manifoldcf/core/system/ManifoldCF.java
+++ b/framework/core/src/main/java/org/apache/manifoldcf/core/system/ManifoldCF.java
@@ -253,6 +253,8 @@
           masterDatabaseUsername = LockManagerFactory.getStringProperty(threadContext,masterDatabaseUsernameProperty,"manifoldcf");
           masterDatabasePassword = LockManagerFactory.getStringProperty(threadContext,masterDatabasePasswordProperty,"local_pg_passwd");
 
+          // Register the throttler for cleanup on shutdown
+          addShutdownHook(new ThrottlerShutdown());
           // Register the file tracker for cleanup on shutdown
           tracker = new FileTrack();
           addShutdownHook(tracker);
@@ -1383,6 +1385,38 @@
 
   }
 
+  /** Class that cleans up throttler on exit */
+  protected static class ThrottlerShutdown implements IShutdownHook
+  {
+    public ThrottlerShutdown()
+    {
+    }
+    
+    @Override
+    public void doCleanup(IThreadContext threadContext)
+      throws ManifoldCFException
+    {
+      IThrottleGroups connectionThrottler = ThrottleGroupsFactory.make(threadContext);
+      connectionThrottler.destroy();
+    }
+    
+    /** Finalizer, which is designed to catch class unloading that tomcat 5.5 does.
+    */
+    protected void finalize()
+      throws Throwable
+    {
+      try
+      {
+        doCleanup(ThreadContextFactory.make());
+      }
+      finally
+      {
+        super.finalize();
+      }
+    }
+
+  }
+  
   /** Class that cleans up database handles on exit */
   protected static class DatabaseShutdown implements IShutdownHook
   {
diff --git a/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ConnectionBin.java b/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ConnectionBin.java
new file mode 100644
index 0000000..32d64c2
--- /dev/null
+++ b/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ConnectionBin.java
@@ -0,0 +1,437 @@
+/* $Id$ */
+
+/**
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.manifoldcf.core.throttler;
+
+import org.apache.manifoldcf.core.interfaces.*;
+import org.apache.manifoldcf.core.system.ManifoldCF;
+import java.util.concurrent.atomic.*;
+import java.util.*;
+
+/** Connection tracking for a bin.
+*
+* This class keeps track of information needed to figure out throttling for connections,
+* on a bin-by-bin basis.  It is *not*, however, a connection pool.  Actually establishing
+* connections, and pooling established connections, is functionality that must reside in the
+* caller.
+*
+* The 'connections' each connection bin tracks are connections outstanding that share this bin name.
+* Not all such connections are identical; some may in fact have entirely different sets of
+* bins associated with them, but they all have the specific bin in common.  Since each bin has its
+* own unique limit, this effectively means that in order to get a connection, you need to find an
+* available slot in ALL of its constituent connection bins.  If the connections are pooled, it makes
+* the most sense to divide the pool up by characteristics such that identical connections are all
+* handled together - and it is reasonable to presume that an identical connection has identical
+* connection bins.
+*
+* NOTE WELL: This is entirely local in operation
+*/
+public class ConnectionBin
+{
+  /** True if this bin is alive still */
+  protected boolean isAlive = true;
+  /** This is the bin name which this connection pool belongs to */
+  protected final String binName;
+  /** Service type name */
+  protected final String serviceTypeName;
+  /** The (anonymous) service name */
+  protected final String serviceName;
+  /** The target calculation lock name */
+  protected final String targetCalcLockName;
+  
+  /** This is the maximum number of active connections allowed for this bin */
+  protected int maxActiveConnections = 0;
+  
+  /** This is the local maximum number of active connections allowed for this bin */
+  protected int localMax = 0;
+  /** This is the number of connections in this bin that have been reserved - that is, they
+  * are promised to various callers, but those callers have not yet committed to obtaining them. */
+  protected int reservedConnections = 0;
+  /** This is the number of connections in this bin that are connected; immaterial whether they are
+  * in use or in a pool somewhere. */
+  protected int inUseConnections = 0;
+
+  /** The service type prefix for connection bins */
+  protected final static String serviceTypePrefix = "_CONNECTIONBIN_";
+
+  /** The target calculation lock prefix */
+  protected final static String targetCalcLockPrefix = "_CONNECTIONBINTARGET_";
+  
+  /** Random number */
+  protected final static Random randomNumberGenerator = new Random();
+
+  /** Constructor. */
+  public ConnectionBin(IThreadContext threadContext, String throttlingGroupName, String binName)
+    throws ManifoldCFException
+  {
+    this.binName = binName;
+    this.serviceTypeName = buildServiceTypeName(throttlingGroupName, binName);
+    this.targetCalcLockName = buildTargetCalcLockName(throttlingGroupName, binName);
+    // Now, register and activate service anonymously, and record the service name we get.
+    ILockManager lockManager = LockManagerFactory.make(threadContext);
+    this.serviceName = lockManager.registerServiceBeginServiceActivity(serviceTypeName, null, null);
+  }
+
+  protected static String buildServiceTypeName(String throttlingGroupName, String binName)
+  {
+    return serviceTypePrefix + throttlingGroupName + "_" + binName;
+  }
+
+  protected static String buildTargetCalcLockName(String throttlingGroupName, String binName)
+  {
+    return targetCalcLockPrefix + throttlingGroupName + "_" + binName;
+  }
+  
+  /** Get the bin name. */
+  public String getBinName()
+  {
+    return binName;
+  }
+
+  /** Update the maximum number of active connections.
+  */
+  public synchronized void updateMaxActiveConnections(int maxActiveConnections)
+  {
+    // Update the number; the poller will wake up any waiting threads.
+    this.maxActiveConnections = maxActiveConnections;
+  }
+
+  /** Wait for a connection to become available, in the context of an existing connection pool.
+  *@param poolCount is the number of connections in the pool times the number of bins per connection.
+  * This parameter is only ever changed in this class!!
+  *@return a recommendation as to how to proceed, using the IConnectionThrottler values.  If the
+  * recommendation is to create a connection, a slot will be reserved for that purpose.  A
+  * subsequent call to noteConnectionCreation() will be needed to confirm the reservation, or clearReservation() to
+  * release the reservation.
+  */
+  public synchronized int waitConnectionAvailable(AtomicInteger poolCount)
+    throws InterruptedException
+  {
+    // Reserved connections keep a slot available which can't be used by anyone else.
+    // Connection bins are always sorted so that deadlocks can't occur.
+    // Once all slots are reserved, the caller will go ahead and create the necessary connection
+    // and convert the reservation to a new connection.
+    
+    while (true)
+    {
+      if (!isAlive)
+        return IConnectionThrottler.CONNECTION_FROM_NOWHERE;
+      int currentPoolCount = poolCount.get();
+      if (currentPoolCount > 0)
+      {
+        // Recommendation is to pull the connection from the pool.
+        poolCount.set(currentPoolCount - 1);
+        return IConnectionThrottler.CONNECTION_FROM_POOL;
+      }
+      if (inUseConnections + reservedConnections < localMax)
+      {
+        reservedConnections++;
+        return IConnectionThrottler.CONNECTION_FROM_CREATION;
+      }
+      // Wait for a connection to free up.  Note that it is up to the caller to free stuff up.
+      wait();
+    }
+  }
+  
+  /** Undo what we had decided to do before.
+  *@param recommendation is the decision returned by waitForConnection() above.
+  */
+  public synchronized void undoReservation(int recommendation, AtomicInteger poolCount)
+  {
+    if (recommendation == IConnectionThrottler.CONNECTION_FROM_CREATION)
+    {
+      if (reservedConnections == 0)
+        throw new IllegalStateException("Can't clear a reservation we don't have");
+      reservedConnections--;
+      notifyAll();
+    }
+    else if (recommendation == IConnectionThrottler.CONNECTION_FROM_POOL)
+    {
+      poolCount.set(poolCount.get() + 1);
+      notifyAll();
+    }
+  }
+  
+  /** Note the creation of an active connection that belongs to this bin.  The connection MUST
+  * have been reserved prior to the connection being created.
+  */
+  public synchronized void noteConnectionCreation()
+  {
+    if (reservedConnections == 0)
+      throw new IllegalStateException("Creating a connection when no connection slot reserved!");
+    reservedConnections--;
+    inUseConnections++;
+    // No notification needed because the total number of reserved+active connections did not change.
+  }
+
+  /** Figure out whether we are currently over target or not for this bin.
+  */
+  public synchronized boolean shouldReturnedConnectionBeDestroyed()
+  {
+    // We don't count reserved connections here because those are not yet committed
+    return inUseConnections > localMax;
+  }
+  
+  public static final int CONNECTION_DESTROY = 0;
+  public static final int CONNECTION_POOLEMPTY = 1;
+  public static final int CONNECTION_WITHINBOUNDS = 2;
+  
+  /** Figure out whether we are currently over target or not for this bin, and whether a
+  * connection should be pulled from the pool and destroyed.
+  * Note that this is tricky in conjunction with other bins, because those other bins
+  * may conclude that we can't destroy a connection.  If so, we just return the stolen
+  * connection back to the pool.
+  *@return CONNECTION_DESTROY, CONNECTION_POOLEMPTY, or CONNECTION_WITHINBOUNDS.
+  */
+  public synchronized int shouldPooledConnectionBeDestroyed(AtomicInteger poolCount)
+  {
+    int currentPoolCount = poolCount.get();
+    if (currentPoolCount > 0)
+    {
+      // Consider it removed from the pool for the purposes of consideration.  If we change our minds, we'll
+      // return it, and no harm done.
+      poolCount.set(currentPoolCount-1);
+      // We don't count reserved connections here because those are not yet committed.
+      if (inUseConnections > localMax)
+      {
+        return CONNECTION_DESTROY;
+      }
+      return CONNECTION_WITHINBOUNDS;
+    }
+    return CONNECTION_POOLEMPTY;
+  }
+
+  /** Check only if there's a pooled connection, and make moves to take it from the pool.
+  */
+  public synchronized boolean hasPooledConnection(AtomicInteger poolCount)
+  {
+    int currentPoolCount = poolCount.get();
+    if (currentPoolCount > 0)
+    {
+      poolCount.set(currentPoolCount-1);
+      return true;
+    }
+    return false;
+  }
+  
+  /** Undo the decision to destroy a pooled connection.
+  */
+  public synchronized void undoPooledConnectionDecision(AtomicInteger poolCount)
+  {
+    poolCount.set(poolCount.get() + 1);
+    notifyAll();
+  }
+  
+  /** Note a connection returned to the pool.
+  */
+  public synchronized void noteConnectionReturnedToPool(AtomicInteger poolCount)
+  {
+    poolCount.set(poolCount.get() + 1);
+    // Wake up threads possibly waiting on a pool return.
+    notifyAll();
+  }
+  
+  /** Note the destruction of an active connection that belongs to this bin.
+  */
+  public synchronized void noteConnectionDestroyed()
+  {
+    inUseConnections--;
+    notifyAll();
+  }
+
+  /** Poll this bin */
+  public synchronized void poll(IThreadContext threadContext)
+    throws ManifoldCFException
+  {
+    // The meat of the cross-cluster apportionment algorithm goes here!
+    // Two global numbers each service posts: "in-use" and "target".  At no time does a service *ever* post either a "target"
+    // that, together with all other active service targets, is in excess of the max.  Also, at no time a service post
+    // a target that, when added to the other "in-use" values, exceeds the max.  If the "in-use" values everywhere else
+    // already equal or exceed the max, then the target will be zero.
+    // The target quota is calculated as follows:
+    // (1) Target is summed, excluding ours.  This is GlobalTarget.
+    // (2) In-use is summed, excluding ours.  This is GlobalInUse.
+    // (3) Our MaximumTarget is computed, which is Maximum - GlobalTarget or Maximum - GlobalInUse, whichever is
+    //     smaller, but never less than zero.
+    // (4) Our FairTarget is computed.  The FairTarget divides the Maximum by the number of services, and adds
+    //     1 randomly based on the remainder.
+    // (5) We compute OptimalTarget as follows: We start with current local target.  If current local target
+    //    exceeds current local in-use count, we adjust OptimalTarget downward by one.  Otherwise we increase it
+    //    by one.
+    // (6) Finally, we compute Target by taking the minimum of MaximumTarget, FairTarget, and OptimalTarget.
+
+    ILockManager lockManager = LockManagerFactory.make(threadContext);
+    lockManager.enterWriteLock(targetCalcLockName);
+    try
+    {
+      // Compute MaximumTarget
+      SumClass sumClass = new SumClass(serviceName);
+      lockManager.scanServiceData(serviceTypeName, sumClass);
+      //System.out.println("numServices = "+sumClass.getNumServices()+"; globalTarget = "+sumClass.getGlobalTarget()+"; globalInUse = "+sumClass.getGlobalInUse());
+        
+      int numServices = sumClass.getNumServices();
+      if (numServices == 0)
+        return;
+      int globalTarget = sumClass.getGlobalTarget();
+      int globalInUse = sumClass.getGlobalInUse();
+      int maximumTarget = maxActiveConnections - globalTarget;
+      if (maximumTarget > maxActiveConnections - globalInUse)
+        maximumTarget = maxActiveConnections - globalInUse;
+      if (maximumTarget < 0)
+        maximumTarget = 0;
+        
+      // Compute FairTarget
+      int fairTarget = maxActiveConnections / numServices;
+      int remainder = maxActiveConnections % numServices;
+      // Randomly choose whether we get an addition to the FairTarget
+      if (randomNumberGenerator.nextInt(numServices) < remainder)
+        fairTarget++;
+        
+      // Compute OptimalTarget
+      int localInUse = inUseConnections;
+      int optimalTarget = localMax;
+      if (localMax > localInUse)
+        optimalTarget--;
+      else
+      {
+        // We want a fast ramp up, so make this proportional to maxActiveConnections
+        int increment = maxActiveConnections >> 2;
+        if (increment == 0)
+          increment = 1;
+        optimalTarget += increment;
+      }
+        
+      //System.out.println("maxTarget = "+maximumTarget+"; fairTarget = "+fairTarget+"; optimalTarget = "+optimalTarget);
+
+      // Now compute actual target
+      int target = maximumTarget;
+      if (target > fairTarget)
+        target = fairTarget;
+      if (target > optimalTarget)
+        target = optimalTarget;
+        
+      // Write these values to the service data variables.
+      // NOTE that there is a race condition here; the target value depends on all the calculations above being accurate, and not changing out from under us.
+      // So, that's why we have a write lock around the pool calculations.
+        
+      lockManager.updateServiceData(serviceTypeName, serviceName, pack(target, localInUse));
+        
+      // Now, update our localMax, if it needs it.
+      if (target == localMax)
+        return;
+      localMax = target;
+      notifyAll();
+    }
+    finally
+    {
+      lockManager.leaveWriteLock(targetCalcLockName);
+    }
+  }
+
+  /** Shut down the bin, and release everything that is waiting on it.
+  */
+  public synchronized void shutDown(IThreadContext threadContext)
+    throws ManifoldCFException
+  {
+    isAlive = false;
+    notifyAll();
+    ILockManager lockManager = LockManagerFactory.make(threadContext);
+    lockManager.endServiceActivity(serviceTypeName, serviceName);
+  }
+  
+  // Protected classes and methods
+  
+  protected static class SumClass implements IServiceDataAcceptor
+  {
+    protected final String serviceName;
+    protected int numServices = 0;
+    protected int globalTargetTally = 0;
+    protected int globalInUseTally = 0;
+    
+    public SumClass(String serviceName)
+    {
+      this.serviceName = serviceName;
+    }
+    
+    @Override
+    public boolean acceptServiceData(String serviceName, byte[] serviceData)
+      throws ManifoldCFException
+    {
+      numServices++;
+
+      if (!serviceName.equals(this.serviceName))
+      {
+        globalTargetTally += unpackTarget(serviceData);
+        globalInUseTally += unpackInUse(serviceData);
+      }
+      return false;
+    }
+
+    public int getNumServices()
+    {
+      return numServices;
+    }
+    
+    public int getGlobalTarget()
+    {
+      return globalTargetTally;
+    }
+    
+    public int getGlobalInUse()
+    {
+      return globalInUseTally;
+    }
+    
+  }
+  
+  protected static int unpackTarget(byte[] data)
+  {
+    if (data == null || data.length != 8)
+      return 0;
+    return (((int)data[0]) & 0xff) +
+      ((((int)data[1]) << 8) & 0xff00) +
+      ((((int)data[2]) << 16) & 0xff0000) +
+      ((((int)data[3]) << 24) & 0xff000000);
+  }
+
+  protected static int unpackInUse(byte[] data)
+  {
+    if (data == null || data.length != 8)
+      return 0;
+    return (((int)data[4]) & 0xff) +
+      ((((int)data[5]) << 8) & 0xff00) +
+      ((((int)data[6]) << 16) & 0xff0000) +
+      ((((int)data[7]) << 24) & 0xff000000);
+  }
+
+  protected static byte[] pack(int target, int inUse)
+  {
+    byte[] rval = new byte[8];
+    rval[0] = (byte)(target & 0xff);
+    rval[1] = (byte)((target >> 8) & 0xff);
+    rval[2] = (byte)((target >> 16) & 0xff);
+    rval[3] = (byte)((target >> 24) & 0xff);
+    rval[4] = (byte)(inUse & 0xff);
+    rval[5] = (byte)((inUse >> 8) & 0xff);
+    rval[6] = (byte)((inUse >> 16) & 0xff);
+    rval[7] = (byte)((inUse >> 24) & 0xff);
+    return rval;
+  }
+
+}
+
diff --git a/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/FetchBin.java b/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/FetchBin.java
new file mode 100644
index 0000000..c05b28c
--- /dev/null
+++ b/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/FetchBin.java
@@ -0,0 +1,368 @@
+/* $Id$ */
+
+/**
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.manifoldcf.core.throttler;
+
+import org.apache.manifoldcf.core.interfaces.*;
+import org.apache.manifoldcf.core.system.ManifoldCF;
+
+/** Connection tracking for a bin.
+*
+* This class keeps track of information needed to figure out fetch rate throttling for connections,
+* on a bin-by-bin basis. 
+*
+* NOTE WELL: This is entirely local in operation
+*/
+public class FetchBin
+{
+  /** This is set to true until the bin is shut down. */
+  protected boolean isAlive = true;
+  /** This is the bin name which this connection pool belongs to */
+  protected final String binName;
+  /** Service type name */
+  protected final String serviceTypeName;
+  /** The (anonymous) service name */
+  protected final String serviceName;
+  /** The target calculation lock name */
+  protected final String targetCalcLockName;
+
+  /** This is the minimum time between fetches for this bin, in ms. */
+  protected long minTimeBetweenFetches = Long.MAX_VALUE;
+
+  /** The local minimum time between fetches */
+  protected long localMinimum = Long.MAX_VALUE;
+
+  /** This is the last time a fetch was done on this bin */
+  protected long lastFetchTime = 0L;
+  /** Is the next fetch reserved? */
+  protected boolean reserveNextFetch = false;
+
+  /** The service type prefix for fetch bins */
+  protected final static String serviceTypePrefix = "_FETCHBIN_";
+
+  /** The target calculation lock prefix */
+  protected final static String targetCalcLockPrefix = "_FETCHBINTARGET_";
+
+  /** Constructor. */
+  public FetchBin(IThreadContext threadContext, String throttlingGroupName, String binName)
+    throws ManifoldCFException
+  {
+    this.binName = binName;
+    this.serviceTypeName = buildServiceTypeName(throttlingGroupName, binName);
+    this.targetCalcLockName = buildTargetCalcLockName(throttlingGroupName, binName);
+    // Now, register and activate service anonymously, and record the service name we get.
+    ILockManager lockManager = LockManagerFactory.make(threadContext);
+    this.serviceName = lockManager.registerServiceBeginServiceActivity(serviceTypeName, null, null);
+  }
+
+  protected static String buildServiceTypeName(String throttlingGroupName, String binName)
+  {
+    return serviceTypePrefix + throttlingGroupName + "_" + binName;
+  }
+
+  protected static String buildTargetCalcLockName(String throttlingGroupName, String binName)
+  {
+    return targetCalcLockPrefix + throttlingGroupName + "_" + binName;
+  }
+
+  /** Get the bin name. */
+  public String getBinName()
+  {
+    return binName;
+  }
+
+  /** Update the maximum number of active connections.
+  */
+  public synchronized void updateMinTimeBetweenFetches(long minTimeBetweenFetches)
+  {
+    // Update the number and wake up any waiting threads; they will take care of everything.
+    this.minTimeBetweenFetches = minTimeBetweenFetches;
+  }
+
+  /** Reserve a request to fetch a document from this bin.  The actual fetch is not yet committed
+  * with this call, but if it succeeds for all bins associated with the document, then the caller
+  * has permission to do the fetch, and can update the last fetch time.
+  *@return false if the fetch bin is being shut down.
+  */
+  public synchronized boolean reserveFetchRequest()
+    throws InterruptedException
+  {
+    // First wait for the ability to even get the next fetch from this bin
+    while (true)
+    {
+      if (!isAlive)
+        return false;
+      if (!reserveNextFetch)
+      {
+        reserveNextFetch = true;
+        return true;
+      }
+      wait();
+    }
+  }
+  
+  /** Clear reserved request.
+  */
+  public synchronized void clearReservation()
+  {
+    if (!reserveNextFetch)
+      throw new IllegalStateException("Can't clear a fetch reservation we don't have");
+    reserveNextFetch = false;
+    notifyAll();
+  }
+  
+  /** Wait the necessary time to do the fetch.  Presumes we've reserved the next fetch
+  * rights already, via reserveFetchRequest().
+  *@return false if the wait did not complete because the bin was shut down.
+  */
+  public synchronized boolean waitNextFetch()
+    throws InterruptedException
+  {
+    if (!reserveNextFetch)
+      throw new IllegalStateException("No fetch request reserved!");
+    
+    while (true)
+    {
+      if (!isAlive)
+        // Leave it to the caller to undo reservations
+        return false;
+      if (localMinimum == Long.MAX_VALUE)
+      {
+        // wait forever - but eventually someone will set a smaller interval and wake us up.
+        wait();
+      }
+      else
+      {
+        long currentTime = System.currentTimeMillis();
+        // Compute how long we have to wait, based on the current time and the time of the last fetch.
+        long waitAmt = lastFetchTime + localMinimum - currentTime;
+        if (waitAmt <= 0L)
+        {
+          // Note actual time we start the fetch.
+          if (currentTime > lastFetchTime)
+            lastFetchTime = currentTime;
+          reserveNextFetch = false;
+          return true;
+        }
+        wait(waitAmt);
+      }
+    }
+  }
+  
+  /** Poll this bin */
+  public synchronized void poll(IThreadContext threadContext)
+    throws ManifoldCFException
+  {
+    ILockManager lockManager = LockManagerFactory.make(threadContext);
+    lockManager.enterWriteLock(targetCalcLockName);
+    try
+    {
+      // This is where the cross-cluster logic happens.
+      // Each service records the following information:
+      // -- the target rate, in fetches per millisecond
+      // -- the earliest possible time for the service's next fetch, in ms from start of epoch
+      // Target rates are apportioned in fetches-per-ms space, as follows:
+      // (1) Target rate is summed cross-cluster, excluding our local service.  This is GlobalTarget.
+      // (2) MaximumTarget is computed, which is Maximum-GlobalTarget.
+      // (3) FairTarget is computed, which is Maximum/numServices + rand(Maximum%numServices).
+      // (4) Finally, we compute Target rate by taking the minimum of MaximumTarget, FairTarget.
+      // The earliest time for the next fetch is computed as follows:
+      // (1) Find the LATEST most recent fetch time across the services, including an updated time for
+      //   the local service.
+      // (2) Compute the next possible fetch time, using the Target rate and that fetch time.
+      // (3) The new targeted fetch time will be set to that value.
+
+      SumClass sumClass = new SumClass(serviceName);
+      lockManager.scanServiceData(serviceTypeName, sumClass);
+
+      int numServices = sumClass.getNumServices();
+      if (numServices == 0)
+        return;
+      double globalTarget = sumClass.getGlobalTarget();
+      long earliestTargetTime = sumClass.getEarliestTime();
+      long currentTime = System.currentTimeMillis();
+      
+      if (lastFetchTime == 0L)
+        earliestTargetTime = currentTime;
+      else if (earliestTargetTime > lastFetchTime)
+        earliestTargetTime = lastFetchTime;
+      
+      // Now, compute the target rate
+      double globalMaxFetchesPerMillisecond;
+      double maximumTarget;
+      double fairTarget;
+      if (minTimeBetweenFetches == 0.0)
+      {
+        //System.out.println(binName+":Global minimum milliseconds per byte = 0.0");
+        globalMaxFetchesPerMillisecond = Double.MAX_VALUE;
+        maximumTarget = globalMaxFetchesPerMillisecond;
+        fairTarget = globalMaxFetchesPerMillisecond;
+      }
+      else
+      {
+        globalMaxFetchesPerMillisecond = 1.0 / minTimeBetweenFetches;
+        //System.out.println(binName+":Global max bytes per millisecond = "+globalMaxBytesPerMillisecond);
+        maximumTarget = globalMaxFetchesPerMillisecond - globalTarget;
+        if (maximumTarget < 0.0)
+          maximumTarget = 0.0;
+
+        // Compute FairTarget
+        fairTarget = globalMaxFetchesPerMillisecond / numServices;
+      }
+
+      // Now compute actual target
+      double inverseTarget = maximumTarget;
+      if (inverseTarget > fairTarget)
+        inverseTarget = fairTarget;
+
+      long target;
+      if (inverseTarget == 0.0)
+        target = Long.MAX_VALUE;
+      else
+        target = (long)(1.0/inverseTarget +0.5);
+      
+      long nextFetchTime = earliestTargetTime + target;
+      
+      lockManager.updateServiceData(serviceTypeName, serviceName, pack(inverseTarget, nextFetchTime));
+
+      // Update local parameters: the rate, and the next time.
+      // But in order to update the next time, we have to update the last time.
+      if (target == localMinimum && earliestTargetTime == lastFetchTime)
+        return;
+      //System.out.println(binName+":Setting localMinimum="+target+"; last fetch time="+earliestTargetTime);
+      localMinimum = target;
+      lastFetchTime = earliestTargetTime;
+      notifyAll();
+    }
+    finally
+    {
+      lockManager.leaveWriteLock(targetCalcLockName);
+    }
+
+  }
+
+  /** Shut the bin down, and wake up all threads waiting on it.
+  */
+  public synchronized void shutDown(IThreadContext threadContext)
+    throws ManifoldCFException
+  {
+    isAlive = false;
+    notifyAll();
+    ILockManager lockManager = LockManagerFactory.make(threadContext);
+    lockManager.endServiceActivity(serviceTypeName, serviceName);
+  }
+  
+  // Protected classes and methods
+  
+  protected static class SumClass implements IServiceDataAcceptor
+  {
+    protected final String serviceName;
+    protected int numServices = 0;
+    protected double globalTargetTally = 0;
+    protected long earliestTime = Long.MAX_VALUE;
+    
+    public SumClass(String serviceName)
+    {
+      this.serviceName = serviceName;
+    }
+    
+    @Override
+    public boolean acceptServiceData(String serviceName, byte[] serviceData)
+      throws ManifoldCFException
+    {
+      numServices++;
+
+      if (!serviceName.equals(this.serviceName))
+      {
+        globalTargetTally += unpackTarget(serviceData);
+        long checkTime = unpackEarliestTime(serviceData);
+        if (checkTime < earliestTime)
+          earliestTime = checkTime;
+      }
+      return false;
+    }
+
+    public int getNumServices()
+    {
+      return numServices;
+    }
+    
+    public double getGlobalTarget()
+    {
+      return globalTargetTally;
+    }
+    
+    public long getEarliestTime()
+    {
+      return earliestTime;
+    }
+  }
+
+  protected static double unpackTarget(byte[] data)
+  {
+    if (data == null || data.length != 8)
+      return 0.0;
+    return Double.longBitsToDouble((((long)data[0]) & 0xffL) +
+      ((((long)data[1]) << 8) & 0xff00L) +
+      ((((long)data[2]) << 16) & 0xff0000L) +
+      ((((long)data[3]) << 24) & 0xff000000L) +
+      ((((long)data[4]) << 32) & 0xff00000000L) +
+      ((((long)data[5]) << 40) & 0xff0000000000L) +
+      ((((long)data[6]) << 48) & 0xff000000000000L) +
+      ((((long)data[7]) << 56) & 0xff00000000000000L));
+  }
+
+  protected static long unpackEarliestTime(byte[] data)
+  {
+    if (data == null || data.length != 16)
+      return Long.MAX_VALUE;
+    return (((long)data[8]) & 0xffL) +
+      ((((long)data[9]) << 8) & 0xff00L) +
+      ((((long)data[10]) << 16) & 0xff0000L) +
+      ((((long)data[11]) << 24) & 0xff000000L) +
+      ((((long)data[12]) << 32) & 0xff00000000L) +
+      ((((long)data[13]) << 40) & 0xff0000000000L) +
+      ((((long)data[14]) << 48) & 0xff000000000000L) +
+      ((((long)data[15]) << 56) & 0xff00000000000000L);
+  }
+
+  protected static byte[] pack(double targetDouble, long earliestTime)
+  {
+    long target = Double.doubleToLongBits(targetDouble);
+    byte[] rval = new byte[16];
+    rval[0] = (byte)(target & 0xffL);
+    rval[1] = (byte)((target >> 8) & 0xffL);
+    rval[2] = (byte)((target >> 16) & 0xffL);
+    rval[3] = (byte)((target >> 24) & 0xffL);
+    rval[4] = (byte)((target >> 32) & 0xffL);
+    rval[5] = (byte)((target >> 40) & 0xffL);
+    rval[6] = (byte)((target >> 48) & 0xffL);
+    rval[7] = (byte)((target >> 56) & 0xffL);
+    rval[8] = (byte)(earliestTime & 0xffL);
+    rval[9] = (byte)((earliestTime >> 8) & 0xffL);
+    rval[10] = (byte)((earliestTime >> 16) & 0xffL);
+    rval[11] = (byte)((earliestTime >> 24) & 0xffL);
+    rval[12] = (byte)((earliestTime >> 32) & 0xffL);
+    rval[13] = (byte)((earliestTime >> 40) & 0xffL);
+    rval[14] = (byte)((earliestTime >> 48) & 0xffL);
+    rval[15] = (byte)((earliestTime >> 56) & 0xffL);
+    return rval;
+  }
+
+}
+
diff --git a/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ThrottleBin.java b/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ThrottleBin.java
new file mode 100644
index 0000000..dac1d39
--- /dev/null
+++ b/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ThrottleBin.java
@@ -0,0 +1,450 @@
+/* $Id$ */
+
+/**
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.manifoldcf.core.throttler;
+
+import org.apache.manifoldcf.core.interfaces.*;
+import org.apache.manifoldcf.core.system.ManifoldCF;
+import java.util.*;
+
+/** Throttles for a bin.
+* An instance of this class keeps track of the information needed to bandwidth throttle access
+* to a url belonging to a specific bin.
+*
+* In order to calculate
+* the effective "burst" fetches per second and bytes per second, we need to have some idea what the window is.
+* For example, a long hiatus from fetching could cause overuse of the server when fetching resumes, if the
+* window length is too long.
+*
+* One solution to this problem would be to keep a list of the individual fetches as records.  Then, we could
+* "expire" a fetch by discarding the old record.  However, this is quite memory consumptive for all but the
+* smallest intervals.
+*
+* Another, better, solution is to hook into the start and end of individual fetches.  These will, presumably, occur
+* at the fastest possible rate without long pauses spent doing something else.  The only complication is that
+* fetches may well overlap, so we need to "reference count" the fetches to know when to reset the counters.
+* For "fetches per second", we can simply make sure we "schedule" the next fetch at an appropriate time, rather
+* than keep records around.  The overall rate may therefore be somewhat less than the specified rate, but that's perfectly
+* acceptable.
+*
+* Some notes on the algorithms used to limit server bandwidth impact
+* ==================================================================
+*
+* In a single connection case, the algorithm we'd want to use works like this.  On the first chunk of a series,
+* the total length of time and the number of bytes are recorded.  Then, prior to each subsequent chunk, a calculation
+* is done which attempts to hit the bandwidth target by the end of the chunk read, using the rate of the first chunk
+* access as a way of estimating how long it will take to fetch those next n bytes.
+*
+* For a multi-connection case, which this is, it's harder to either come up with a good maximum bandwidth estimate,
+* and harder still to "hit the target", because simultaneous fetches will intrude.  The strategy is therefore:
+*
+* 1) The first chunk of any series should proceed without interference from other connections to the same server.
+*    The goal here is to get a decent quality estimate without any possibility of overwhelming the server.
+*
+* 2) The bandwidth of the first chunk is treated as the "maximum bandwidth per connection".  That is, if other
+*    connections are going on, we can presume that each connection will use at most the bandwidth that the first fetch
+*    took.  Thus, by generating end-time estimates based on this number, we are actually being conservative and
+*    using less server bandwidth.
+*
+* 3) For chunks that have started but not finished, we keep track of their size and estimated elapsed time in order to schedule when
+*    new chunks from other connections can start.
+*
+* NOTE WELL: This is entirely local in operation
+*/
+public class ThrottleBin
+{
+  /** This signals whether the bin is alive or not. */
+  protected boolean isAlive = true;
+  /** This is the bin name which this throttle belongs to. */
+  protected final String binName;
+  /** Service type name */
+  protected final String serviceTypeName;
+  /** The (anonymous) service name */
+  protected final String serviceName;
+  /** The target calculation lock name */
+  protected final String targetCalcLockName;
+
+  /** The minimum milliseconds per byte */
+  protected double minimumMillisecondsPerByte = Double.MAX_VALUE;
+
+  /** The local minimum milliseconds per byte */
+  protected double localMinimum = Double.MAX_VALUE;
+  
+  /** This is the reference count for this bin (which records active references) */
+  protected volatile int refCount = 0;
+  /** The inverse rate estimate of the first fetch, in ms/byte */
+  protected double rateEstimate = 0.0;
+  /** Flag indicating whether a rate estimate is needed */
+  protected volatile boolean estimateValid = false;
+  /** Flag indicating whether rate estimation is in progress yet */
+  protected volatile boolean estimateInProgress = false;
+  /** The start time of this series */
+  protected long seriesStartTime = -1L;
+  /** Total actual bytes read in this series; this includes fetches in progress */
+  protected long totalBytesRead = -1L;
+
+  /** The service type prefix for throttle bins */
+  protected final static String serviceTypePrefix = "_THROTTLEBIN_";
+  
+  /** The target calculation lock prefix */
+  protected final static String targetCalcLockPrefix = "_THROTTLEBINTARGET_";
+
+  /** Constructor. */
+  public ThrottleBin(IThreadContext threadContext, String throttlingGroupName, String binName)
+    throws ManifoldCFException
+  {
+    this.binName = binName;
+    this.serviceTypeName = buildServiceTypeName(throttlingGroupName, binName);
+    this.targetCalcLockName = buildTargetCalcLockName(throttlingGroupName, binName);
+    // Now, register and activate service anonymously, and record the service name we get.
+    ILockManager lockManager = LockManagerFactory.make(threadContext);
+    this.serviceName = lockManager.registerServiceBeginServiceActivity(serviceTypeName, null, null);
+  }
+
+  protected static String buildServiceTypeName(String throttlingGroupName, String binName)
+  {
+    return serviceTypePrefix + throttlingGroupName + "_" + binName;
+  }
+  
+  protected static String buildTargetCalcLockName(String throttlingGroupName, String binName)
+  {
+    return targetCalcLockPrefix + throttlingGroupName + "_" + binName;
+  }
+
+  /** Get the bin name. */
+  public String getBinName()
+  {
+    return binName;
+  }
+
+  /** Update minimumMillisecondsPerBytePerServer */
+  public synchronized void updateMinimumMillisecondsPerByte(double min)
+  {
+    this.minimumMillisecondsPerByte = min;
+  }
+  
+  /** Note the start of a fetch operation for a bin.  Call this method just before the actual stream access begins.
+  * May wait until schedule allows.
+  */
+  public void beginFetch()
+  {
+    synchronized (this)
+    {
+      if (refCount == 0)
+      {
+        // Now, reset bandwidth throttling counters
+        estimateValid = false;
+        rateEstimate = 0.0;
+        totalBytesRead = 0L;
+        estimateInProgress = false;
+        seriesStartTime = -1L;
+      }
+      refCount++;
+    }
+
+  }
+
+  /** Abort the fetch.
+  */
+  public void abortFetch()
+  {
+    synchronized (this)
+    {
+      refCount--;
+    }
+  }
+    
+  /** Note the start of an individual byte read of a specified size.  Call this method just before the
+  * read request takes place.  Performs the necessary delay prior to reading specified number of bytes from the server.
+  *@return false if the wait was interrupted due to the bin being shut down.
+  */
+  public boolean beginRead(int byteCount)
+    throws InterruptedException
+  {
+
+    synchronized (this)
+    {
+      while (true)
+      {
+        if (!isAlive)
+          return false;
+        if (estimateInProgress)
+        {
+          wait();
+          continue;
+        }
+
+        // Update the current time
+        long currentTime = System.currentTimeMillis();
+        
+        if (estimateValid == false)
+        {
+          seriesStartTime = currentTime;
+          estimateInProgress = true;
+          // Add these bytes to the estimated total
+          totalBytesRead += (long)byteCount;
+          // Exit early; this thread isn't going to do any waiting
+          return true;
+        }
+
+        // If we haven't set a proper throttle yet, wait until we do.
+        if (localMinimum == Double.MAX_VALUE)
+        {
+          wait();
+          continue;
+        }
+        
+        // Estimate the time this read will take, and wait accordingly
+        long estimatedTime = (long)(rateEstimate * (double)byteCount);
+
+        // Figure out how long the total byte count should take, to meet the constraint
+        long desiredEndTime = seriesStartTime + (long)(((double)(totalBytesRead + (long)byteCount)) * localMinimum);
+
+
+        // The wait time is the difference between our desired end time, minus the estimated time to read the data, and the
+        // current time.  But it can't be negative.
+        long waitTime = (desiredEndTime - estimatedTime) - currentTime;
+
+        // If no wait is needed, go ahead and update what needs to be updated and exit.  Otherwise, do the wait.
+        if (waitTime <= 0L)
+        {
+          // Add these bytes to the estimated total
+          totalBytesRead += (long)byteCount;
+          return true;
+        }
+        
+        this.wait(waitTime);
+        // Back around again...
+      }
+    }
+  }
+
+  /** Abort a read in progress.
+  */
+  public void abortRead()
+  {
+    synchronized (this)
+    {
+      if (estimateInProgress)
+      {
+        estimateInProgress = false;
+        notifyAll();
+      }
+    }
+  }
+    
+  /** Note the end of an individual read from the server.  Call this just after an individual read completes.
+  * Pass the actual number of bytes read to the method.
+  */
+  public void endRead(int originalCount, int actualCount)
+  {
+    synchronized (this)
+    {
+      totalBytesRead = totalBytesRead + (long)actualCount - (long)originalCount;
+      if (estimateInProgress)
+      {
+        if (actualCount == 0)
+          // Didn't actually get any bytes, so use 0.0
+          rateEstimate = 0.0;
+        else
+          rateEstimate = ((double)(System.currentTimeMillis() - seriesStartTime))/(double)actualCount;
+        estimateValid = true;
+        estimateInProgress = false;
+        notifyAll();
+      }
+    }
+  }
+
+  /** Note the end of a fetch operation.  Call this method just after the fetch completes.
+  */
+  public boolean endFetch()
+  {
+    synchronized (this)
+    {
+      refCount--;
+      return (refCount == 0);
+    }
+
+  }
+
+  /** Poll this bin */
+  public synchronized void poll(IThreadContext threadContext)
+    throws ManifoldCFException
+  {
+    
+    // Enter write lock
+    ILockManager lockManager = LockManagerFactory.make(threadContext);
+    lockManager.enterWriteLock(targetCalcLockName);
+    try
+    {
+      // The cross-cluster apportionment of byte fetching goes here.
+      // For byte-rate throttling, the apportioning algorithm is simple.  First, it's done
+      // in bytes per millisecond, which is the inverse of what we actually use for the
+      // rest of this class.  Each service posts its current value for the maximum bytes
+      // per millisecond, and a target value for the same.
+      // The target value is computed as follows:
+      // (1) Target is summed cross-cluster, excluding our local service.  This is GlobalTarget.
+      // (2) MaximumTarget is computed, which is Maximum-GlobalTarget.
+      // (3) FairTarget is computed, which is Maximum/numServices + rand(Maximum%numServices).
+      // (4) Finally, we compute Target by taking the minimum of MaximumTarget, FairTarget.
+
+      // Compute MaximumTarget
+      SumClass sumClass = new SumClass(serviceName);
+      lockManager.scanServiceData(serviceTypeName, sumClass);
+        
+      int numServices = sumClass.getNumServices();
+      if (numServices == 0)
+        return;
+      double globalTarget = sumClass.getGlobalTarget();
+      double globalMaxBytesPerMillisecond;
+      double maximumTarget;
+      double fairTarget;
+      if (minimumMillisecondsPerByte == 0.0)
+      {
+        //System.out.println(binName+":Global minimum milliseconds per byte = 0.0");
+        globalMaxBytesPerMillisecond = Double.MAX_VALUE;
+        maximumTarget = globalMaxBytesPerMillisecond;
+        fairTarget = globalMaxBytesPerMillisecond;
+      }
+      else
+      {
+        globalMaxBytesPerMillisecond = 1.0 / minimumMillisecondsPerByte;
+        //System.out.println(binName+":Global max bytes per millisecond = "+globalMaxBytesPerMillisecond);
+        maximumTarget = globalMaxBytesPerMillisecond - globalTarget;
+        if (maximumTarget < 0.0)
+          maximumTarget = 0.0;
+
+        // Compute FairTarget
+        fairTarget = globalMaxBytesPerMillisecond / numServices;
+      }
+
+      // Now compute actual target
+      double inverseTarget = maximumTarget;
+      if (inverseTarget > fairTarget)
+        inverseTarget = fairTarget;
+
+      //System.out.println(binName+":Inverse target = "+inverseTarget+"; maximumTarget = "+maximumTarget+"; fairTarget = "+fairTarget);
+      
+      // Write these values to the service data variables.
+      // NOTE that there is a race condition here; the target value depends on all the calculations above being accurate, and not changing out from under us.
+      // So, that's why we have a write lock around the pool calculations.
+        
+      lockManager.updateServiceData(serviceTypeName, serviceName, pack(inverseTarget));
+
+      // Update our local minimum.
+      double target;
+      if (inverseTarget == 0.0)
+        target = Double.MAX_VALUE;
+      else
+        target = 1.0 / inverseTarget;
+      
+      // Reset local minimum, if it has changed.
+      if (target == localMinimum)
+        return;
+      //System.out.println(binName+":Updating local minimum to "+target);
+      localMinimum = target;
+      notifyAll();
+    }
+    finally
+    {
+      lockManager.leaveWriteLock(targetCalcLockName);
+    }
+
+  }
+
+  /** Shut down this bin.
+  */
+  public synchronized void shutDown(IThreadContext threadContext)
+    throws ManifoldCFException
+  {
+    isAlive = false;
+    notifyAll();
+    ILockManager lockManager = LockManagerFactory.make(threadContext);
+    lockManager.endServiceActivity(serviceTypeName, serviceName);
+  }
+  
+  // Protected classes and methods
+  
+  protected static class SumClass implements IServiceDataAcceptor
+  {
+    protected final String serviceName;
+    protected int numServices = 0;
+    protected double globalTargetTally = 0;
+    
+    public SumClass(String serviceName)
+    {
+      this.serviceName = serviceName;
+    }
+    
+    @Override
+    public boolean acceptServiceData(String serviceName, byte[] serviceData)
+      throws ManifoldCFException
+    {
+      numServices++;
+
+      if (!serviceName.equals(this.serviceName))
+      {
+        globalTargetTally += unpackTarget(serviceData);
+      }
+      return false;
+    }
+
+    public int getNumServices()
+    {
+      return numServices;
+    }
+    
+    public double getGlobalTarget()
+    {
+      return globalTargetTally;
+    }
+    
+  }
+  
+  protected static double unpackTarget(byte[] data)
+  {
+    if (data == null || data.length != 8)
+      return 0.0;
+    return Double.longBitsToDouble((((long)data[0]) & 0xffL) +
+      ((((long)data[1]) << 8) & 0xff00L) +
+      ((((long)data[2]) << 16) & 0xff0000L) +
+      ((((long)data[3]) << 24) & 0xff000000L) +
+      ((((long)data[4]) << 32) & 0xff00000000L) +
+      ((((long)data[5]) << 40) & 0xff0000000000L) +
+      ((((long)data[6]) << 48) & 0xff000000000000L) +
+      ((((long)data[7]) << 56) & 0xff00000000000000L));
+  }
+
+  protected static byte[] pack(double targetDouble)
+  {
+    long target = Double.doubleToLongBits(targetDouble);
+    byte[] rval = new byte[8];
+    rval[0] = (byte)(target & 0xffL);
+    rval[1] = (byte)((target >> 8) & 0xffL);
+    rval[2] = (byte)((target >> 16) & 0xffL);
+    rval[3] = (byte)((target >> 24) & 0xffL);
+    rval[4] = (byte)((target >> 32) & 0xffL);
+    rval[5] = (byte)((target >> 40) & 0xffL);
+    rval[6] = (byte)((target >> 48) & 0xffL);
+    rval[7] = (byte)((target >> 56) & 0xffL);
+    return rval;
+  }
+
+
+}
+
diff --git a/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ThrottleGroups.java b/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ThrottleGroups.java
new file mode 100644
index 0000000..cb38c42
--- /dev/null
+++ b/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ThrottleGroups.java
@@ -0,0 +1,134 @@
+/* $Id$ */
+
+/**
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.manifoldcf.core.throttler;
+
+import org.apache.manifoldcf.core.interfaces.*;
+import java.util.*;
+
+/** An implementation of IThrottleGroups, which establishes a JVM-wide
+* pool of throttlers that can be used as a resource by any connector that needs
+* it.
+*/
+public class ThrottleGroups implements IThrottleGroups
+{
+  public static final String _rcsid = "@(#)$Id$";
+
+  /** The thread context */
+  protected final IThreadContext threadContext;
+    
+  /** The actual static pool */
+  protected final static Throttler throttler = new Throttler();
+  
+  /** Constructor */
+  public ThrottleGroups(IThreadContext threadContext)
+    throws ManifoldCFException
+  {
+    this.threadContext = threadContext;
+  }
+  
+  /** Get all existing throttle groups for a throttle group type.
+  * The throttle group type typically describes a connector class, while the throttle group represents
+  * a namespace of bin names specific to that connector class.
+  *@param throttleGroupType is the throttle group type.
+  *@return the set of throttle groups for that group type.
+  */
+  @Override
+  public Set<String> getThrottleGroups(String throttleGroupType)
+    throws ManifoldCFException
+  {
+    return throttler.getThrottleGroups(threadContext, throttleGroupType);
+  }
+  
+  /** Remove a throttle group.
+  *@param throttleGroupType is the throttle group type.
+  *@param throttleGroup is the throttle group.
+  */
+  @Override
+  public void removeThrottleGroup(String throttleGroupType, String throttleGroup)
+    throws ManifoldCFException
+  {
+    throttler.removeThrottleGroup(threadContext, throttleGroupType, throttleGroup);
+  }
+  
+  /** Set or update throttle specification for a throttle group.  This creates the
+  * throttle group if it does not yet exist.
+  *@param throttleGroupType is the throttle group type.
+  *@param throttleGroup is the throttle group.
+  *@param throttleSpec is the desired throttle specification object.
+  */
+  @Override
+  public void createOrUpdateThrottleGroup(String throttleGroupType, String throttleGroup, IThrottleSpec throttleSpec)
+    throws ManifoldCFException
+  {
+    throttler.createOrUpdateThrottleGroup(threadContext, throttleGroupType, throttleGroup, throttleSpec);
+  }
+
+  /** Construct connection throttler for connections with specific bin names.  This object is meant to be embedded with a connection
+  * pool of similar objects, and used to gate the creation of new connections in that pool.
+  *@param throttleGroupType is the throttle group type.
+  *@param throttleGroup is the throttle group.
+  *@param binNames are the connection type bin names.
+  *@return the connection throttling object, or null if the pool is being shut down.
+  */
+  @Override
+  public IConnectionThrottler obtainConnectionThrottler(String throttleGroupType, String throttleGroup, String[] binNames)
+    throws ManifoldCFException
+  {
+    java.util.Arrays.sort(binNames);
+    return throttler.obtainConnectionThrottler(threadContext, throttleGroupType, throttleGroup, binNames);
+  }
+
+  /** Poll periodically, to update cluster-wide statistics and allocation.
+  *@param throttleGroupType is the throttle group type to update.
+  */
+  @Override
+  public void poll(String throttleGroupType)
+    throws ManifoldCFException
+  {
+    throttler.poll(threadContext, throttleGroupType);
+  }
+  
+  /** Poll periodically, to update ALL cluster-wide statistics and allocation.
+  */
+  @Override
+  public void poll()
+    throws ManifoldCFException
+  {
+    throttler.poll(threadContext);
+  }
+
+  /** Free unused resources.
+  */
+  @Override
+  public void freeUnusedResources()
+    throws ManifoldCFException
+  {
+    throttler.freeUnusedResources(threadContext);
+  }
+  
+  /** Shut down throttler permanently.
+  */
+  @Override
+  public void destroy()
+    throws ManifoldCFException
+  {
+    throttler.destroy(threadContext);
+  }
+
+}
diff --git a/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/Throttler.java b/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/Throttler.java
new file mode 100644
index 0000000..e3d4406
--- /dev/null
+++ b/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/Throttler.java
@@ -0,0 +1,1111 @@
+/* $Id$ */
+
+/**
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.manifoldcf.core.throttler;
+
+import org.apache.manifoldcf.core.interfaces.*;
+import java.util.*;
+import java.util.concurrent.atomic.*;
+
+/** A Throttler object creates a virtual pool of connections to resources
+* whose access needs to be throttled in number, rate of use, and byte rate.
+* This code is modeled on the code for distributed connection pools, and is intended
+* to work in a similar manner.  Basically, a periodic assessment is done about what the
+* local throttling parameters should be (on a per-pool basis), and the local throttling
+* activities then adjust what they are doing based on the new parameters.  A service
+* model is used to keep track of which pools have what clients working with them.
+* This implementation has the advantage that:
+* (1) Only local throttling ever takes place on a method-by-method basis, which makes
+*   it possible to use throttling even in streams and background threads;
+* (2) Throttling resources are apportioned fairly, on average, between all the various
+*   cluster members, so it is unlikely that any persistent starvation conditions can
+*   arise.
+*/
+public class Throttler
+{
+  public static final String _rcsid = "@(#)$Id$";
+
+  /** Throttle group hash table.  Keyed by throttle group type, value is throttling groups */
+  protected final Map<String,ThrottlingGroups> throttleGroupsHash = new HashMap<String,ThrottlingGroups>();
+
+  /** Create a throttler instance.  Usually there will be one of these per connector
+  * type that needs throttling.
+  */
+  public Throttler()
+  {
+  }
+  
+  // There are a lot of synchronizers to coordinate here.  They are indeed hierarchical.  It is not possible to simply
+  // throw a synchronizer at every level, and require that we hold all of them, because when we wait somewhere in the
+  // inner level, we will continue to hold locks and block access to all the outer levels.
+  //
+  // Instead, I've opted for a model whereby individual resources are protected.  This is tricky to coordinate, though,
+  // because (for instance) after a resource has been removed from the hash table, it had better be cleaned up
+  // thoroughly before the outer lock is removed, or two versions of the resource might wind up coming into existence.
+  // The general rule is therefore:
+  // (1) Creation or deletion of resources involves locking the parent where the resource is being added or removed
+  // (2) Anything that waits CANNOT also add or remove.
+  
+  /** Get all existing throttle groups for a throttle group type.
+  * The throttle group type typically describes a connector class, while the throttle group represents
+  * a namespace of bin names specific to that connector class.
+  *@param throttleGroupType is the throttle group type.
+  *@return the set of throttle groups for that group type.
+  */
+  public Set<String> getThrottleGroups(IThreadContext threadContext, String throttleGroupType)
+    throws ManifoldCFException
+  {
+    synchronized (throttleGroupsHash)
+    {
+      return throttleGroupsHash.keySet();
+    }
+  }
+  
+  /** Remove a throttle group.
+  *@param throttleGroupType is the throttle group type.
+  *@param throttleGroup is the throttle group.
+  */
+  public void removeThrottleGroup(IThreadContext threadContext, String throttleGroupType, String throttleGroup)
+    throws ManifoldCFException
+  {
+    // Removal.  Lock the whole hierarchy.
+    synchronized (throttleGroupsHash)
+    {
+      ThrottlingGroups tg = throttleGroupsHash.get(throttleGroupType);
+      if (tg != null)
+      {
+        tg.removeThrottleGroup(threadContext, throttleGroup);
+      }
+    }
+  }
+  
+  /** Set or update throttle specification for a throttle group.  This creates the
+  * throttle group if it does not yet exist.
+  *@param throttleGroupType is the throttle group type.
+  *@param throttleGroup is the throttle group.
+  *@param throttleSpec is the desired throttle specification object.
+  */
+  public void createOrUpdateThrottleGroup(IThreadContext threadContext, String throttleGroupType, String throttleGroup, IThrottleSpec throttleSpec)
+    throws ManifoldCFException
+  {
+    // Potential addition.  Lock the whole hierarchy.
+    synchronized (throttleGroupsHash)
+    {
+      ThrottlingGroups tg = throttleGroupsHash.get(throttleGroupType);
+      if (tg == null)
+      {
+        tg = new ThrottlingGroups(throttleGroupType);
+        throttleGroupsHash.put(throttleGroupType, tg);
+      }
+      tg.createOrUpdateThrottleGroup(threadContext, throttleGroup, throttleSpec);
+    }
+  }
+
+  /** Construct connection throttler for connections with specific bin names.  This object is meant to be embedded with a connection
+  * pool of similar objects, and used to gate the creation of new connections in that pool.
+  *@param throttleGroupType is the throttle group type.
+  *@param throttleGroup is the throttle group.
+  *@param binNames are the connection type bin names.
+  *@return the connection throttling object, or null if the pool is being shut down.
+  */
+  public IConnectionThrottler obtainConnectionThrottler(IThreadContext threadContext, String throttleGroupType, String throttleGroup, String[] binNames)
+    throws ManifoldCFException
+  {
+    // No waiting, so lock the entire tree.
+    synchronized (throttleGroupsHash)
+    {
+      ThrottlingGroups tg = throttleGroupsHash.get(throttleGroupType);
+      if (tg != null)
+        return tg.obtainConnectionThrottler(threadContext, throttleGroup, binNames);
+      return null;
+    }
+  }
+  
+  /** Poll periodically.
+  */
+  public void poll(IThreadContext threadContext, String throttleGroupType)
+    throws ManifoldCFException
+  {
+    // No waiting, so lock the entire tree.
+    synchronized (throttleGroupsHash)
+    {
+      ThrottlingGroups tg = throttleGroupsHash.get(throttleGroupType);
+      if (tg != null)
+        tg.poll(threadContext);
+    }
+      
+  }
+
+  /** Poll ALL bins periodically.
+  */
+  public void poll(IThreadContext threadContext)
+    throws ManifoldCFException
+  {
+    // No waiting, so lock the entire tree.
+    synchronized (throttleGroupsHash)
+    {
+      for (ThrottlingGroups tg : throttleGroupsHash.values())
+      {
+        tg.poll(threadContext);
+      }
+    }
+      
+  }
+  
+  /** Free unused resources.
+  */
+  public void freeUnusedResources(IThreadContext threadContext)
+    throws ManifoldCFException
+  {
+    // This potentially affects the entire hierarchy.
+    // Go through the whole pool and clean it out
+    synchronized (throttleGroupsHash)
+    {
+      Iterator<ThrottlingGroups> iter = throttleGroupsHash.values().iterator();
+      while (iter.hasNext())
+      {
+        ThrottlingGroups p = iter.next();
+        p.freeUnusedResources(threadContext);
+      }
+    }
+  }
+  
+  /** Shut down all throttlers and deregister them.
+  */
+  public void destroy(IThreadContext threadContext)
+    throws ManifoldCFException
+  {
+    // This affects the entire hierarchy, so lock the whole thing.
+    // Go through the whole pool and clean it out
+    synchronized (throttleGroupsHash)
+    {
+      Iterator<ThrottlingGroups> iter = throttleGroupsHash.values().iterator();
+      while (iter.hasNext())
+      {
+        ThrottlingGroups p = iter.next();
+        p.destroy(threadContext);
+        iter.remove();
+      }
+    }
+  }
+
+  // Protected methods and classes
+  
+  protected static String buildThrottlingGroupName(String throttlingGroupType, String throttlingGroupName)
+  {
+    return throttlingGroupType + "_" + throttlingGroupName;
+  }
+
+  /** This class represents a throttling group pool */
+  protected class ThrottlingGroups
+  {
+    /** The throttling group type for this throttling group pool */
+    protected final String throttlingGroupTypeName;
+    /** The pool of individual throttle group services for this pool, keyed by throttle group name */
+    protected final Map<String,ThrottlingGroup> groups = new HashMap<String,ThrottlingGroup>();
+    
+    public ThrottlingGroups(String throttlingGroupTypeName)
+    {
+      this.throttlingGroupTypeName = throttlingGroupTypeName;
+    }
+    
+    /** Update throttle specification */
+    public void createOrUpdateThrottleGroup(IThreadContext threadContext, String throttleGroup, IThrottleSpec throttleSpec)
+      throws ManifoldCFException
+    {
+      synchronized (groups)
+      {
+        ThrottlingGroup g = groups.get(throttleGroup);
+        if (g == null)
+        {
+          g = new ThrottlingGroup(threadContext, throttlingGroupTypeName, throttleGroup, throttleSpec);
+          groups.put(throttleGroup, g);
+        }
+        else
+        {
+          g.updateThrottleSpecification(throttleSpec);
+        }
+      }
+    }
+    
+    /** Obtain connection throttler.
+    *@return the throttler, or null of the hierarchy has changed.
+    */
+    public IConnectionThrottler obtainConnectionThrottler(IThreadContext threadContext, String throttleGroup, String[] binNames)
+      throws ManifoldCFException
+    {
+      synchronized (groups)
+      {
+        ThrottlingGroup g = groups.get(throttleGroup);
+        if (g == null)
+          return null;
+        return g.obtainConnectionThrottler(threadContext, binNames);
+      }
+    }
+    
+    /** Remove specified throttle group */
+    public void removeThrottleGroup(IThreadContext threadContext, String throttleGroup)
+      throws ManifoldCFException
+    {
+      // Must synch the whole thing, because otherwise there would be a risk of someone recreating the
+      // group right after we removed it from the map, and before we destroyed it.
+      synchronized (groups)
+      {
+        ThrottlingGroup g = groups.remove(throttleGroup);
+        if (g != null)
+        {
+          g.destroy(threadContext);
+        }
+      }
+    }
+    
+    /** Poll this set of throttle groups.
+    */
+    public void poll(IThreadContext threadContext)
+      throws ManifoldCFException
+    {
+      synchronized (groups)
+      {
+        Iterator<String> iter = groups.keySet().iterator();
+        while (iter.hasNext())
+        {
+          String throttleGroup = iter.next();
+          ThrottlingGroup p = groups.get(throttleGroup);
+          p.poll(threadContext);
+        }
+      }
+    }
+
+    /** Free unused resources */
+    public void freeUnusedResources(IThreadContext threadContext)
+      throws ManifoldCFException
+    {
+      synchronized (groups)
+      {
+        Iterator<ThrottlingGroup> iter = groups.values().iterator();
+        while (iter.hasNext())
+        {
+          ThrottlingGroup g = iter.next();
+          g.freeUnusedResources(threadContext);
+        }
+      }
+    }
+    
+    /** Destroy and shutdown all */
+    public void destroy(IThreadContext threadContext)
+      throws ManifoldCFException
+    {
+      synchronized (groups)
+      {
+        Iterator<ThrottlingGroup> iter = groups.values().iterator();
+        while (iter.hasNext())
+        {
+          ThrottlingGroup p = iter.next();
+          p.destroy(threadContext);
+          iter.remove();
+        }
+      }
+    }
+  }
+  
+  /** This class represents a throttling group, of a specific throttling group type.  It basically
+  * describes an entire self-consistent throttling environment.
+  */
+  protected class ThrottlingGroup
+  {
+    /** The throttling group name */
+    protected final String throttlingGroupName;
+    /** The current throttle spec */
+    protected IThrottleSpec throttleSpec;
+    
+    /** The connection bins */
+    protected final Map<String,ConnectionBin> connectionBins = new HashMap<String,ConnectionBin>();
+    /** The fetch bins */
+    protected final Map<String,FetchBin> fetchBins = new HashMap<String,FetchBin>();
+    /** The throttle bins */
+    protected final Map<String,ThrottleBin> throttleBins = new HashMap<String,ThrottleBin>();
+
+    // For synchronization, we use several in this class.
+    // Modification to the connectionBins, fetchBins, or throttleBins hashes uses the appropriate local synchronizer.
+    // Changes to other local variables use the main synchronizer.
+    
+    /** Constructor
+    */
+    public ThrottlingGroup(IThreadContext threadContext, String throttlingGroupType, String throttleGroup, IThrottleSpec throttleSpec)
+      throws ManifoldCFException
+    {
+      this.throttlingGroupName = buildThrottlingGroupName(throttlingGroupType, throttleGroup);
+      this.throttleSpec = throttleSpec;
+      // Once all that is done, perform the initial setting of all the bin cutoffs
+      poll(threadContext);
+    }
+
+    /** Create a bunch of bins, corresponding to the bin names specified.
+    * Note that this also registers them as services etc.
+    *@param binNames describes the set of bins to create.
+    */
+    public synchronized IConnectionThrottler obtainConnectionThrottler(IThreadContext threadContext, String[] binNames)
+      throws ManifoldCFException
+    {
+      synchronized (connectionBins)
+      {
+        for (String binName : binNames)
+        {
+          ConnectionBin bin = connectionBins.get(binName);
+          if (bin == null)
+          {
+            bin = new ConnectionBin(threadContext, throttlingGroupName, binName);
+            connectionBins.put(binName, bin);
+          }
+        }
+      }
+      
+      synchronized (fetchBins)
+      {
+        for (String binName : binNames)
+        {
+          FetchBin bin = fetchBins.get(binName);
+          if (bin == null)
+          {
+            bin = new FetchBin(threadContext, throttlingGroupName, binName);
+            fetchBins.put(binName, bin);
+          }
+        }
+      }
+      
+      synchronized (throttleBins)
+      {
+        for (String binName : binNames)
+        {
+          ThrottleBin bin = throttleBins.get(binName);
+          if (bin == null)
+          {
+            bin = new ThrottleBin(threadContext, throttlingGroupName, binName);
+            throttleBins.put(binName, bin);
+          }
+        }
+      }
+      
+      return new ConnectionThrottler(this, binNames);
+    }
+    
+    /** Update the throttle spec.
+    *@param throttleSpec is the new throttle spec for this throttle group.
+    */
+    public synchronized void updateThrottleSpecification(IThrottleSpec throttleSpec)
+      throws ManifoldCFException
+    {
+      this.throttleSpec = throttleSpec;
+    }
+    
+    
+    // IConnectionThrottler support methods
+    
+    /** Wait for a connection to become available.
+    *@param poolCount is a description of how many connections
+    * are available in the current pool, across all bins.
+    *@return the IConnectionThrottler codes for results.
+    */
+    public int waitConnectionAvailable(String[] binNames, AtomicInteger poolCount)
+      throws InterruptedException
+    {
+      // Each bin can signal something different.  Bins that signal
+      // CONNECTION_FROM_NOWHERE are shutting down, but there's also
+      // apparently the conflicting possibilities of distinct answers of
+      // CONNECTION_FROM_POOL and CONNECTION_FROM_CREATION.
+      // However: the pool count we track is in fact N * the actual pool count,
+      // where N is the number of bins in each connection.  This means that a conflict 
+      // is ALWAYS due to two entities simultaneously calling waitConnectionAvailable(),
+      // and deadlocking each other.  The solution is therefore to back off and retry.
+
+      // This is the retry loop
+      while (true)
+      {
+        int currentRecommendation = IConnectionThrottler.CONNECTION_FROM_NOWHERE;
+        
+        boolean retry = false;
+
+        // First, make sure all the bins exist, and reserve a slot in each
+        int i = 0;
+        while (i < binNames.length)
+        {
+          String binName = binNames[i];
+          ConnectionBin bin;
+          synchronized (connectionBins)
+          {
+            bin = connectionBins.get(binName);
+          }
+          if (bin != null)
+          {
+            // Reserve a slot
+            int result = bin.waitConnectionAvailable(poolCount);
+            if (result == IConnectionThrottler.CONNECTION_FROM_NOWHERE ||
+              (currentRecommendation != IConnectionThrottler.CONNECTION_FROM_NOWHERE && currentRecommendation != result))
+            {
+              // Release previous reservations, and either return, or retry
+              while (i > 0)
+              {
+                i--;
+                binName = binNames[i];
+                synchronized (connectionBins)
+                {
+                  bin = connectionBins.get(binName);
+                }
+                if (bin != null)
+                  bin.undoReservation(currentRecommendation, poolCount);
+              }
+              if (result == IConnectionThrottler.CONNECTION_FROM_NOWHERE)
+                return result;
+              // Break out of the outer loop so we can retry
+              retry = true;
+              break;
+            }
+            if (currentRecommendation == IConnectionThrottler.CONNECTION_FROM_NOWHERE)
+              currentRecommendation = result;
+          }
+          i++;
+        }
+        
+        if (retry)
+          continue;
+        
+        // Complete the reservation process (if that is what we decided)
+        if (currentRecommendation == IConnectionThrottler.CONNECTION_FROM_CREATION)
+        {
+          // All reservations have been made!  Convert them.
+          for (String binName : binNames)
+          {
+            ConnectionBin bin;
+            synchronized (connectionBins)
+            {
+              bin = connectionBins.get(binName);
+            }
+            if (bin != null)
+              bin.noteConnectionCreation();
+          }
+        }
+
+        return currentRecommendation;
+      }
+      
+    }
+    
+    public IFetchThrottler getNewConnectionFetchThrottler(String[] binNames)
+    {
+      return new FetchThrottler(this, binNames);
+    }
+    
+    public boolean noteReturnedConnection(String[] binNames)
+    {
+      // If ANY of the bins think the connection should be destroyed, then that will be
+      // the recommendation.
+      synchronized (connectionBins)
+      {
+        boolean destroyConnection = false;
+
+        for (String binName : binNames)
+        {
+          ConnectionBin bin = connectionBins.get(binName);
+          if (bin != null)
+          {
+            destroyConnection |= bin.shouldReturnedConnectionBeDestroyed();
+          }
+        }
+        
+        return destroyConnection;
+      }
+    }
+    
+    public boolean checkDestroyPooledConnection(String[] binNames, AtomicInteger poolCount)
+    {
+      // Only if all believe we can destroy a pool connection, will we do it.
+      // This is because some pools may be empty, etc.
+      synchronized (connectionBins)
+      {
+        boolean destroyConnection = false;
+
+        int i = 0;
+        while (i < binNames.length)
+        {
+          String binName = binNames[i];
+          ConnectionBin bin = connectionBins.get(binName);
+          if (bin != null)
+          {
+            int result = bin.shouldPooledConnectionBeDestroyed(poolCount);
+            if (result == ConnectionBin.CONNECTION_POOLEMPTY)
+            {
+              // Give up now, and undo all the other bins
+              while (i > 0)
+              {
+                i--;
+                binName = binNames[i];
+                bin = connectionBins.get(binName);
+                bin.undoPooledConnectionDecision(poolCount);
+              }
+              return false;
+            }
+            else if (result == ConnectionBin.CONNECTION_DESTROY)
+            {
+              destroyConnection = true;
+            }
+          }
+          i++;
+        }
+        
+        if (destroyConnection)
+          return true;
+        
+        // Undo pool reservation, since everything is apparently within bounds.
+        for (String binName : binNames)
+        {
+          ConnectionBin bin = connectionBins.get(binName);
+          if (bin != null)
+            bin.undoPooledConnectionDecision(poolCount);
+        }
+        
+        return false;
+      }
+
+    }
+    
+    /** Connection expiration is tricky, because even though a connection may be identified as
+    * being expired, at the very same moment it could be handed out in another thread.  So there
+    * is a natural race condition present.
+    * The way the connection throttler deals with that is to allow the caller to reserve a connection
+    * for expiration.  This must be called BEFORE the actual identified connection is removed from the
+    * connection pool.  If the value returned by this method is "true", then a connection MUST be removed
+    * from the pool and destroyed, whether or not the identified connection is actually still available for
+    * destruction or not.
+    *@return true if a connection from the pool can be expired.  If true is returned, noteConnectionDestruction()
+    *  MUST be called once the connection has actually been destroyed.
+    */
+    public boolean checkExpireConnection(String[] binNames, AtomicInteger poolCount)
+    {
+      synchronized (connectionBins)
+      {
+        int i = 0;
+        while (i < binNames.length)
+        {
+          String binName = binNames[i];
+          ConnectionBin bin = connectionBins.get(binName);
+          if (bin != null)
+          {
+            if (!bin.hasPooledConnection(poolCount))
+            {
+              // Give up now, and undo all the other bins
+              while (i > 0)
+              {
+                i--;
+                binName = binNames[i];
+                bin = connectionBins.get(binName);
+                bin.undoPooledConnectionDecision(poolCount);
+              }
+              return false;
+            }
+          }
+          i++;
+        }
+        return true;
+      }
+    }
+
+    public void noteConnectionReturnedToPool(String[] binNames, AtomicInteger poolCount)
+    {
+      synchronized (connectionBins)
+      {
+        for (String binName : binNames)
+        {
+          ConnectionBin bin = connectionBins.get(binName);
+          if (bin != null)
+            bin.noteConnectionReturnedToPool(poolCount);
+        }
+      }
+    }
+
+    public void noteConnectionDestroyed(String[] binNames)
+    {
+      synchronized (connectionBins)
+      {
+        for (String binName : binNames)
+        {
+          ConnectionBin bin = connectionBins.get(binName);
+          if (bin != null)
+            bin.noteConnectionDestroyed();
+        }
+      }
+    }
+    
+    // IFetchThrottler support methods
+    
+    /** Get permission to fetch a document.  This grants permission to start
+    * fetching a single document, within the connection that has already been
+    * granted permission that created this object.
+    *@param binNames are the names of the bins.
+    *@return false if being shut down
+    */
+    public boolean obtainFetchDocumentPermission(String[] binNames)
+      throws InterruptedException
+    {
+      // First, make sure all the bins exist, and reserve a slot in each
+      int i = 0;
+      while (i < binNames.length)
+      {
+        String binName = binNames[i];
+        FetchBin bin;
+        synchronized (fetchBins)
+        {
+          bin = fetchBins.get(binName);
+        }
+        // Reserve a slot
+        if (bin == null || !bin.reserveFetchRequest())
+        {
+          // Release previous reservations, and return null
+          while (i > 0)
+          {
+            i--;
+            binName = binNames[i];
+            synchronized (fetchBins)
+            {
+              bin = fetchBins.get(binName);
+            }
+            if (bin != null)
+              bin.clearReservation();
+          }
+          return false;
+        }
+        i++;
+      }
+      
+      // All reservations have been made!  Convert them.
+      // (These are guaranteed to succeed - but they may wait)
+      i = 0;
+      while (i < binNames.length)
+      {
+        String binName = binNames[i];
+        FetchBin bin;
+        synchronized (fetchBins)
+        {
+          bin = fetchBins.get(binName);
+        }
+        if (bin != null)
+        {
+          if (!bin.waitNextFetch())
+          {
+            // Undo the reservations we haven't processed yet
+            while (i < binNames.length)
+            {
+              binName = binNames[i];
+              synchronized (fetchBins)
+              {
+                bin = fetchBins.get(binName);
+              }
+              if (bin != null)
+                bin.clearReservation();
+              i++;
+            }
+            return false;
+          }
+        }
+        i++;
+      }
+      return true;
+    }
+    
+    public IStreamThrottler createFetchStream(String[] binNames)
+    {
+      // Do a "begin fetch" for all throttle bins
+      synchronized (throttleBins)
+      {
+        for (String binName : binNames)
+        {
+          ThrottleBin bin = throttleBins.get(binName);
+          if (bin != null)
+            bin.beginFetch();
+        }
+      }
+      
+      return new StreamThrottler(this, binNames);
+    }
+    
+    // IStreamThrottler support methods
+    
+    /** Obtain permission to read a block of bytes.  This method may wait until it is OK to proceed.
+    * The throttle group, bin names, etc are already known
+    * to this specific interface object, so it is unnecessary to include them here.
+    *@param byteCount is the number of bytes to get permissions to read.
+    *@return true if the wait took place as planned, or false if the system is being shut down.
+    */
+    public boolean obtainReadPermission(String[] binNames, int byteCount)
+      throws InterruptedException
+    {
+      int i = 0;
+      while (i < binNames.length)
+      {
+        String binName = binNames[i];
+        ThrottleBin bin;
+        synchronized (throttleBins)
+        {
+          bin = throttleBins.get(binName);
+        }
+        if (bin == null || !bin.beginRead(byteCount))
+        {
+          // End bins we've already done, and exit
+          while (i > 0)
+          {
+            i--;
+            binName = binNames[i];
+            synchronized (throttleBins)
+            {
+              bin = throttleBins.get(binName);
+            }
+            if (bin != null)
+              bin.endRead(byteCount,0);
+          }
+          return false;
+        }
+        i++;
+      }
+      return true;
+    }
+      
+    /** Note the completion of the read of a block of bytes.  Call this after
+    * obtainReadPermission() was successfully called, and bytes were successfully read.
+    *@param origByteCount is the originally requested number of bytes to get permissions to read.
+    *@param actualByteCount is the number of bytes actually read.
+    */
+    public void releaseReadPermission(String[] binNames, int origByteCount, int actualByteCount)
+    {
+      synchronized (throttleBins)
+      {
+        for (String binName : binNames)
+        {
+          ThrottleBin bin = throttleBins.get(binName);
+          if (bin != null)
+            bin.endRead(origByteCount, actualByteCount);
+        }
+      }
+    }
+
+    /** Note the stream being closed.
+    */
+    public void closeStream(String[] binNames)
+    {
+      synchronized (throttleBins)
+      {
+        for (String binName : binNames)
+        {
+          ThrottleBin bin = throttleBins.get(binName);
+          if (bin != null)
+            bin.endFetch();
+        }
+      }
+    }
+
+    // Bookkeeping methods
+    
+    /** Call this periodically.
+    */
+    public synchronized void poll(IThreadContext threadContext)
+      throws ManifoldCFException
+    {
+      // Go through all existing bins and update each one.
+      synchronized (connectionBins)
+      {
+        for (ConnectionBin bin : connectionBins.values())
+        {
+          bin.updateMaxActiveConnections(throttleSpec.getMaxOpenConnections(bin.getBinName()));
+          bin.poll(threadContext);
+        }
+      }
+  
+      synchronized (fetchBins)
+      {
+        for (FetchBin bin : fetchBins.values())
+        {
+          bin.updateMinTimeBetweenFetches(throttleSpec.getMinimumMillisecondsPerFetch(bin.getBinName()));
+          bin.poll(threadContext);
+        }
+      }
+      
+      synchronized (throttleBins)
+      {
+        for (ThrottleBin bin : throttleBins.values())
+        {
+          bin.updateMinimumMillisecondsPerByte(throttleSpec.getMinimumMillisecondsPerByte(bin.getBinName()));
+          bin.poll(threadContext);
+        }
+      }
+      
+    }
+    
+    /** Free unused resources.
+    */
+    public synchronized void freeUnusedResources(IThreadContext threadContext)
+      throws ManifoldCFException
+    {
+      // Does nothing; there are not really resources to free
+    }
+    
+    /** Destroy this pool.
+    */
+    public synchronized void destroy(IThreadContext threadContext)
+      throws ManifoldCFException
+    {
+      synchronized (connectionBins)
+      {
+        Iterator<ConnectionBin> binIter = connectionBins.values().iterator();
+        while (binIter.hasNext())
+        {
+          ConnectionBin bin = binIter.next();
+          bin.shutDown(threadContext);
+          binIter.remove();
+        }
+      }
+      
+      synchronized (fetchBins)
+      {
+        Iterator<FetchBin> binIter = fetchBins.values().iterator();
+        while (binIter.hasNext())
+        {
+          FetchBin bin = binIter.next();
+          bin.shutDown(threadContext);
+          binIter.remove();
+        }
+      }
+      
+      synchronized (throttleBins)
+      {
+        Iterator<ThrottleBin> binIter = throttleBins.values().iterator();
+        while (binIter.hasNext())
+        {
+          ThrottleBin bin = binIter.next();
+          bin.shutDown(threadContext);
+          binIter.remove();
+        }
+      }
+
+    }
+  }
+  
+  /** Connection throttler implementation class.
+  * This class instance stores some parameters and links back to ThrottlingGroup.  But each class instance
+  * models a connection pool with the specified bins.  But the description of each pool consists of more than just
+  * the bin names that describe the throttling - it also may include connection parameters which we have
+  * no insight into at this level.
+  *
+  * Thus, in order to do pool tracking properly, we cannot simply rely on the individual connection bin instances
+  * to do all the work, since they cannot distinguish between different pools properly.  So that leaves us with
+  * two choices.  (1) We can somehow push the separate pool instance parameters down to the connection bin
+  * level, or (2) the connection bins cannot actually do any waiting or blocking.
+  *
+  * The benefit of having blocking take place in connection bins is that they are in fact designed to be precisely
+  * the thing you would want to synchronize on.   If we presume that the waits happen in those classes,
+  * then we need the ability to send in our local pool count to them, and we need to be able to "wake up"
+  * those underlying classes when the local pool count changes.
+  */
+  protected static class ConnectionThrottler implements IConnectionThrottler
+  {
+    protected final ThrottlingGroup parent;
+    protected final String[] binNames;
+    
+    // Keep track of local pool parameters.
+
+    /** This is the number of connections in the pool, times the number of bins per connection */
+    protected final AtomicInteger poolCount = new AtomicInteger(0);
+
+    public ConnectionThrottler(ThrottlingGroup parent, String[] binNames)
+    {
+      this.parent = parent;
+      this.binNames = binNames;
+    }
+    
+    /** Get permission to grab a connection for use.  If this object believes there is a connection
+    * available in the pool, it will update its pool size variable and return   If not, this method
+    * evaluates whether a new connection should be created.  If neither condition is true, it
+    * waits until a connection is available.
+    *@return whether to take the connection from the pool, or create one, or whether the
+    * throttler is being shut down.
+    */
+    @Override
+    public int waitConnectionAvailable()
+      throws InterruptedException
+    {
+      return parent.waitConnectionAvailable(binNames, poolCount);
+    }
+    
+    /** For a new connection, obtain the fetch throttler to use for the connection.
+    * If the result from waitConnectionAvailable() is CONNECTION_FROM_CREATION,
+    * the calling code is expected to create a connection using the result of this method.
+    *@return the fetch throttler for a new connection.
+    */
+    @Override
+    public IFetchThrottler getNewConnectionFetchThrottler()
+    {
+      return parent.getNewConnectionFetchThrottler(binNames);
+    }
+    
+    /** For returning a connection from use, there is only one method.  This method signals
+    /* whether a formerly in-use connection should be placed back in the pool or destroyed.
+    *@return true if the connection should NOT be put into the pool but should instead
+    *  simply be destroyed.  If true is returned, the caller MUST call noteConnectionDestroyed()
+    *  (below) in order for the bookkeeping to work.
+    */
+    @Override
+    public boolean noteReturnedConnection()
+    {
+      return parent.noteReturnedConnection(binNames);
+    }
+    
+    /** This method calculates whether a connection should be taken from the pool and destroyed
+    /* in order to meet quota requirements.  If this method returns
+    /* true, you MUST remove a connection from the pool, and you MUST call
+    /* noteConnectionDestroyed() afterwards.
+    *@return true if a pooled connection should be destroyed.  If true is returned, the
+    * caller MUST call noteConnectionDestroyed() (below) in order for the bookkeeping to work.
+    */
+    @Override
+    public boolean checkDestroyPooledConnection()
+    {
+      return parent.checkDestroyPooledConnection(binNames, poolCount);
+    }
+    
+    /** Connection expiration is tricky, because even though a connection may be identified as
+    * being expired, at the very same moment it could be handed out in another thread.  So there
+    * is a natural race condition present.
+    * The way the connection throttler deals with that is to allow the caller to reserve a connection
+    * for expiration.  This must be called BEFORE the actual identified connection is removed from the
+    * connection pool.  If the value returned by this method is "true", then a connection MUST be removed
+    * from the pool and destroyed, whether or not the identified connection is actually still available for
+    * destruction or not.
+    *@return true if a connection from the pool can be expired.  If true is returned, noteConnectionDestruction()
+    *  MUST be called once the connection has actually been destroyed.
+    */
+    @Override
+    public boolean checkExpireConnection()
+    {
+      return parent.checkExpireConnection(binNames, poolCount);
+    }
+    
+    /** Note that a connection has been returned to the pool.  Call this method after a connection has been
+    * placed back into the pool and is available for use.
+    */
+    @Override
+    public void noteConnectionReturnedToPool()
+    {
+      parent.noteConnectionReturnedToPool(binNames, poolCount);
+    }
+
+    /** Note that a connection has been destroyed.  Call this method ONLY after noteReturnedConnection()
+    * or checkDestroyPooledConnection() returns true, AND the connection has been already
+    * destroyed.
+    */
+    @Override
+    public void noteConnectionDestroyed()
+    {
+      parent.noteConnectionDestroyed(binNames);
+    }
+  }
+  
+  /** Fetch throttler implementation class.
+  * This basically stores some parameters and links back to ThrottlingGroup.
+  */
+  protected static class FetchThrottler implements IFetchThrottler
+  {
+    protected final ThrottlingGroup parent;
+    protected final String[] binNames;
+    
+    public FetchThrottler(ThrottlingGroup parent, String[] binNames)
+    {
+      this.parent = parent;
+      this.binNames = binNames;
+    }
+    
+    /** Get permission to fetch a document.  This grants permission to start
+    * fetching a single document, within the connection that has already been
+    * granted permission that created this object.
+    *@return false if the throttler is being shut down.
+    */
+    @Override
+    public boolean obtainFetchDocumentPermission()
+      throws InterruptedException
+    {
+      return parent.obtainFetchDocumentPermission(binNames);
+    }
+    
+    /** Open a fetch stream.  When done (or aborting), call
+    * IStreamThrottler.closeStream() to note the completion of the document
+    * fetch activity.
+    *@return the stream throttler to use to throttle the actual data access.
+    */
+    @Override
+    public IStreamThrottler createFetchStream()
+    {
+      return parent.createFetchStream(binNames);
+    }
+
+  }
+  
+  /** Stream throttler implementation class.
+  * This basically stores some parameters and links back to ThrottlingGroup.
+  */
+  protected static class StreamThrottler implements IStreamThrottler
+  {
+    protected final ThrottlingGroup parent;
+    protected final String[] binNames;
+    
+    public StreamThrottler(ThrottlingGroup parent, String[] binNames)
+    {
+      this.parent = parent;
+      this.binNames = binNames;
+    }
+
+    /** Obtain permission to read a block of bytes.  This method may wait until it is OK to proceed.
+    * The throttle group, bin names, etc are already known
+    * to this specific interface object, so it is unnecessary to include them here.
+    *@param byteCount is the number of bytes to get permissions to read.
+    *@return true if the wait took place as planned, or false if the system is being shut down.
+    */
+    @Override
+    public boolean obtainReadPermission(int byteCount)
+      throws InterruptedException
+    {
+      return parent.obtainReadPermission(binNames, byteCount);
+    }
+      
+    /** Note the completion of the read of a block of bytes.  Call this after
+    * obtainReadPermission() was successfully called, and bytes were successfully read.
+    *@param origByteCount is the originally requested number of bytes to get permissions to read.
+    *@param actualByteCount is the number of bytes actually read.
+    */
+    @Override
+    public void releaseReadPermission(int origByteCount, int actualByteCount)
+    {
+      parent.releaseReadPermission(binNames, origByteCount, actualByteCount);
+    }
+
+    /** Note the stream being closed.
+    */
+    @Override
+    public void closeStream()
+    {
+      parent.closeStream(binNames);
+    }
+
+  }
+  
+}
diff --git a/framework/core/src/test/java/org/apache/manifoldcf/core/throttler/TestThrottler.java b/framework/core/src/test/java/org/apache/manifoldcf/core/throttler/TestThrottler.java
new file mode 100644
index 0000000..2a38d01
--- /dev/null
+++ b/framework/core/src/test/java/org/apache/manifoldcf/core/throttler/TestThrottler.java
@@ -0,0 +1,516 @@
+/* $Id$ */
+
+/**
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.manifoldcf.core.throttler;
+
+import org.apache.manifoldcf.core.interfaces.*;
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.atomic.*;
+import org.junit.*;
+import static org.junit.Assert.*;
+
+public class TestThrottler extends org.apache.manifoldcf.core.tests.BaseDerby
+{
+  @Test
+  public void multiThreadConnectionPoolTest()
+    throws Exception
+  {
+    // First, create the throttle group.
+    IThreadContext threadContext = ThreadContextFactory.make();
+    IThrottleGroups tg = ThrottleGroupsFactory.make(threadContext);
+    tg.createOrUpdateThrottleGroup("test","test",new ThrottleSpec());
+    
+    // We create a pretend connection pool
+    IConnectionThrottler connectionThrottler = tg.obtainConnectionThrottler("test","test",new String[]{"A","B","C"});
+    System.out.println("Connection throttler obtained");
+    
+    // How best to test this?
+    // Well, what I'm going to do is to have multiple threads active.  Each one will do perfectly sensible things
+    // while generating a log that includes timestamps for everything that happens.  At the end, the log will be
+    // analyzed for violations of throttling policy.
+    
+    PollingThread pt = new PollingThread();
+    pt.start();
+
+    EventLog eventLog = new EventLog();
+    
+    int numThreads = 10;
+    
+    TesterThread[] threads = new TesterThread[numThreads];
+    for (int i = 0; i < numThreads; i++)
+    {
+      threads[i] = new TesterThread(connectionThrottler, eventLog);
+      threads[i].start();
+    }
+    
+    // Now, join all the threads at the end
+    for (int i = 0; i < numThreads; i++)
+    {
+      threads[i].finishUp();
+    }
+    
+    pt.interrupt();
+    pt.finishUp();
+
+    // Shut down the throttle group
+    tg.removeThrottleGroup("test","test");
+
+    // Finally, do the log analysis
+    eventLog.analyze();
+    
+    System.out.println("Done test");
+  }
+  
+  protected static class PollingThread extends Thread
+  {
+    protected Throwable exception = null;
+    
+    public PollingThread()
+    {
+    }
+    
+    public void run()
+    {
+      try
+      {
+        IThreadContext threadContext = ThreadContextFactory.make();
+        IThrottleGroups throttleGroups = ThrottleGroupsFactory.make(threadContext);
+        
+        while (true)
+        {
+          throttleGroups.poll("test");
+          Thread.sleep(1000L);
+        }
+      }
+      catch (InterruptedException e)
+      {
+      }
+      catch (Exception e)
+      {
+        exception = e;
+      }
+
+    }
+    
+    public void finishUp()
+      throws Exception
+    {
+      join();
+      if (exception != null)
+      {
+        if (exception instanceof RuntimeException)
+          throw (RuntimeException)exception;
+        else if (exception instanceof Error)
+          throw (Error)exception;
+        else if (exception instanceof Exception)
+          throw (Exception)exception;
+        else
+          throw new RuntimeException("Unknown exception: "+exception.getClass().getName()+": "+exception.getMessage(),exception);
+      }
+    }
+
+  }
+  
+  protected static class TesterThread extends Thread
+  {
+    protected final EventLog eventLog;
+    protected final IConnectionThrottler connectionThrottler;
+    protected Throwable exception = null;
+    
+    public TesterThread(IConnectionThrottler connectionThrottler, EventLog eventLog)
+    {
+      this.connectionThrottler = connectionThrottler;
+      this.eventLog = eventLog;
+    }
+    
+    public void run()
+    {
+      try
+      {
+        int numberConnectionCycles = 3;
+        int numberFetchesPerCycle = 3;
+        
+        for (int k = 0; k < numberConnectionCycles; k++)
+        {
+          // First grab a connection.
+          int rval = connectionThrottler.waitConnectionAvailable();
+          if (rval == IConnectionThrottler.CONNECTION_FROM_NOWHERE)
+            throw new Exception("Unexpected return value from waitConnectionAvailable()");
+          IFetchThrottler fetchThrottler;
+          if (rval == IConnectionThrottler.CONNECTION_FROM_CREATION)
+          {
+            // Pretend to create the connection
+            eventLog.addLogEntry(new ConnectionCreatedEvent());
+          }
+          else
+          {
+            // Pretend to get it from the pool
+            eventLog.addLogEntry(new ConnectionFromPoolEvent());
+          }
+          fetchThrottler = connectionThrottler.getNewConnectionFetchThrottler();
+
+          for (int l = 0; l < numberFetchesPerCycle; l++)
+          {
+            // Perform a fake fetch
+            if (fetchThrottler.obtainFetchDocumentPermission() == false)
+              throw new Exception("Unexpected return value for obtainFetchDocumentPermission()");
+            eventLog.addLogEntry(new FetchStartEvent());
+            IStreamThrottler streamThrottler = fetchThrottler.createFetchStream();
+            try
+            {
+              // Do one read
+              if (streamThrottler.obtainReadPermission(1000) == false)
+                throw new Exception("False from obtainReadPermission!");
+              eventLog.addLogEntry(new ReadStartEvent(1000));
+              streamThrottler.releaseReadPermission(1000, 1000);
+              eventLog.addLogEntry(new ReadDoneEvent(1000));
+              // Do another read
+              if (streamThrottler.obtainReadPermission(1000) == false)
+                throw new Exception("False from obtainReadPermission!");
+              eventLog.addLogEntry(new ReadStartEvent(1000));
+              streamThrottler.releaseReadPermission(1000, 1000);
+              eventLog.addLogEntry(new ReadDoneEvent(1000));
+              // Do a third read
+              if (streamThrottler.obtainReadPermission(1000) == false)
+                throw new Exception("False from obtainReadPermission!");
+              eventLog.addLogEntry(new ReadStartEvent(1000));
+              streamThrottler.releaseReadPermission(1000, 100);
+              eventLog.addLogEntry(new ReadDoneEvent(100));
+            }
+            finally
+            {
+              // Close the stream
+              streamThrottler.closeStream();
+            }
+            eventLog.addLogEntry(new FetchDoneEvent());
+          }
+          
+          // Pretend to release the connection
+          boolean destroyIt = connectionThrottler.noteReturnedConnection();
+          if (destroyIt)
+          {
+            eventLog.addLogEntry(new ConnectionDestroyedEvent());
+            connectionThrottler.noteConnectionDestroyed();
+          }
+          else
+          {
+            eventLog.addLogEntry(new ConnectionReturnedToPoolEvent());
+            connectionThrottler.noteConnectionReturnedToPool();
+          }
+        }
+      }
+      catch (Exception e)
+      {
+        e.printStackTrace();
+        exception = e;
+      }
+    }
+    
+    public void finishUp()
+      throws Exception
+    {
+      join();
+      if (exception != null)
+      {
+        if (exception instanceof RuntimeException)
+          throw (RuntimeException)exception;
+        else if (exception instanceof Error)
+          throw (Error)exception;
+        else if (exception instanceof Exception)
+          throw (Exception)exception;
+        else
+          throw new RuntimeException("Unknown exception: "+exception.getClass().getName()+": "+exception.getMessage(),exception);
+      }
+    }
+    
+  }
+  
+  protected static class ThrottleSpec implements IThrottleSpec
+  {
+    public ThrottleSpec()
+    {
+    }
+    
+    /** Given a bin name, find the max open connections to use for that bin.
+    *@return -1 if no limit found.
+    */
+    @Override
+    public int getMaxOpenConnections(String binName)
+    {
+      if (binName.equals("A"))
+        return 3;
+      if (binName.equals("B"))
+        return 4;
+      return Integer.MAX_VALUE;
+    }
+
+    /** Look up minimum milliseconds per byte for a bin.
+    *@return 0.0 if no limit found.
+    */
+    @Override
+    public double getMinimumMillisecondsPerByte(String binName)
+    {
+      if (binName.equals("B"))
+        return 1.0;
+      if (binName.equals("C"))
+        return 1.5;
+      return 0.0;
+    }
+
+    /** Look up minimum milliseconds for a fetch for a bin.
+    *@return 0 if no limit found.
+    */
+    @Override
+    public long getMinimumMillisecondsPerFetch(String binName)
+    {
+      if (binName.equals("A"))
+        return 5;
+      if (binName.equals("C"))
+        return 20;
+      return 0;
+    }
+
+  }
+  
+  protected static class EventLog
+  {
+    protected final List<LogEntry> logList = new ArrayList<LogEntry>();
+    
+    public EventLog()
+    {
+    }
+    
+    public synchronized void addLogEntry(LogEntry x)
+    {
+      System.out.println(x.toString());
+      logList.add(x);
+    }
+    
+    public synchronized void analyze()
+      throws Exception
+    {
+      State s = new State();
+      for (LogEntry l : logList)
+      {
+        l.apply(s);
+      }
+      // Success!
+    }
+    
+  }
+  
+  protected static abstract class LogEntry
+  {
+    protected final long timestamp;
+    
+    public LogEntry(long timestamp)
+    {
+      this.timestamp = timestamp;
+    }
+    
+    public abstract void apply(State state)
+      throws Exception;
+    
+    public String toString()
+    {
+      return "Time: "+timestamp;
+    }
+    
+  }
+  
+  protected static class ConnectionCreatedEvent extends LogEntry
+  {
+    public ConnectionCreatedEvent()
+    {
+      super(System.currentTimeMillis());
+    }
+    
+    public void apply(State state)
+      throws Exception
+    {
+      if (state.outstandingConnections + 1 > 3)
+        throw new Exception("Too many outstanding connections at once!");
+      state.outstandingConnections++;
+    }
+    
+    public String toString()
+    {
+      return super.toString() + "; Connection created";
+    }
+
+  }
+
+  protected static class ConnectionDestroyedEvent extends LogEntry
+  {
+    public ConnectionDestroyedEvent()
+    {
+      super(System.currentTimeMillis());
+    }
+    
+    public void apply(State state)
+      throws Exception
+    {
+      state.outstandingConnections--;
+    }
+    
+    public String toString()
+    {
+      return super.toString() + "; Connection destroyed";
+    }
+
+  }
+
+  protected static class ConnectionFromPoolEvent extends LogEntry
+  {
+    public ConnectionFromPoolEvent()
+    {
+      super(System.currentTimeMillis());
+    }
+    
+    public void apply(State state)
+      throws Exception
+    {
+    }
+    
+    public String toString()
+    {
+      return super.toString() + "; Connection from pool";
+    }
+
+  }
+
+  protected static class ConnectionReturnedToPoolEvent extends LogEntry
+  {
+    public ConnectionReturnedToPoolEvent()
+    {
+      super(System.currentTimeMillis());
+    }
+    
+    public void apply(State state)
+      throws Exception
+    {
+    }
+    
+    public String toString()
+    {
+      return super.toString() + "; Connection back to pool";
+    }
+
+  }
+
+  protected static class FetchStartEvent extends LogEntry
+  {
+    public FetchStartEvent()
+    {
+      super(System.currentTimeMillis());
+    }
+    
+    public void apply(State state)
+      throws Exception
+    {
+      if (timestamp < state.lastFetch + 20L - 1L)
+        throw new Exception("Fetch too fast: took place in "+ (timestamp - state.lastFetch) + " milliseconds");
+      state.lastFetch = timestamp;
+    }
+    
+    public String toString()
+    {
+      return super.toString() + "; Fetch start";
+    }
+  }
+  
+  protected static class FetchDoneEvent extends LogEntry
+  {
+    public FetchDoneEvent()
+    {
+      super(System.currentTimeMillis());
+    }
+    
+    public void apply(State state)
+      throws Exception
+    {
+    }
+    
+    public String toString()
+    {
+      return super.toString() + "; Fetch done";
+    }
+  }
+  
+  protected static class ReadStartEvent extends LogEntry
+  {
+    final int proposed;
+    
+    public ReadStartEvent(int proposed)
+    {
+      super(System.currentTimeMillis());
+      this.proposed = proposed;
+    }
+    
+    public void apply(State state)
+      throws Exception
+    {
+      if (state.firstByteReadTime == -1L)
+        state.firstByteReadTime = timestamp;
+      else
+      {
+        // Calculate running minimum amount of time it should have taken for the bytes given
+        long minTime = (long)(((double)state.byteTotal) * 1.5 + 0.5);
+        if (timestamp - state.firstByteReadTime < minTime)
+          throw new Exception("Took too short a time to read "+state.byteTotal+" bytes: "+(timestamp - state.firstByteReadTime));
+      }
+    }
+    
+    public String toString()
+    {
+      return super.toString() + "; Read start("+proposed+")";
+    }
+  }
+
+  protected static class ReadDoneEvent extends LogEntry
+  {
+    final int actual;
+    
+    public ReadDoneEvent(int actual)
+    {
+      super(System.currentTimeMillis());
+      this.actual = actual;
+    }
+    
+    public void apply(State state)
+      throws Exception
+    {
+      state.byteTotal += actual;
+    }
+    
+    public String toString()
+    {
+      return super.toString() + "; Read done("+actual+")";
+    }
+  }
+
+  protected static class State
+  {
+    public int outstandingConnections = 0;
+    public long lastFetch = 0L;
+    public long firstByteReadTime = -1L;
+    public long byteTotal = 0L;
+  }
+  
+}
diff --git a/framework/crawler-ui/src/main/java/org/apache/manifoldcf/crawlerui/IdleCleanupThread.java b/framework/crawler-ui/src/main/java/org/apache/manifoldcf/crawlerui/IdleCleanupThread.java
new file mode 100644
index 0000000..870d34a
--- /dev/null
+++ b/framework/crawler-ui/src/main/java/org/apache/manifoldcf/crawlerui/IdleCleanupThread.java
@@ -0,0 +1,141 @@
+/* $Id$ */
+
+/**
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.manifoldcf.crawlerui;
+
+import org.apache.manifoldcf.core.interfaces.*;
+import org.apache.manifoldcf.agents.interfaces.*;
+import org.apache.manifoldcf.crawler.interfaces.*;
+import org.apache.manifoldcf.authorities.interfaces.*;
+import org.apache.manifoldcf.core.system.Logging;
+import org.apache.manifoldcf.core.system.ManifoldCF;
+import java.util.*;
+import java.lang.reflect.*;
+
+/** This thread periodically calls the cleanup method in all connected repository connectors.  The ostensible purpose
+* is to allow the connectors to shutdown idle connections etc.
+*/
+public class IdleCleanupThread extends Thread
+{
+  public static final String _rcsid = "@(#)$Id$";
+
+  /** Constructor.
+  */
+  public IdleCleanupThread()
+    throws ManifoldCFException
+  {
+    super();
+    setName("Idle cleanup thread");
+    setDaemon(true);
+  }
+
+  public void run()
+  {
+    Logging.root.debug("Start up idle cleanup thread");
+    try
+    {
+      // Create a thread context object.
+      IThreadContext threadContext = ThreadContextFactory.make();
+      // Get the cache handle.
+      ICacheManager cacheManager = CacheManagerFactory.make(threadContext);
+      
+      IRepositoryConnectorPool repositoryConnectorPool = RepositoryConnectorPoolFactory.make(threadContext);
+      IOutputConnectorPool outputConnectorPool = OutputConnectorPoolFactory.make(threadContext);
+      IAuthorityConnectorPool authorityConnectorPool = AuthorityConnectorPoolFactory.make(threadContext);
+      IMappingConnectorPool mappingConnectorPool = MappingConnectorPoolFactory.make(threadContext);
+      
+      IThrottleGroups throttleGroups = ThrottleGroupsFactory.make(threadContext);
+
+      // Loop
+      while (true)
+      {
+        // Do another try/catch around everything in the loop
+        try
+        {
+          // Do the cleanup
+          repositoryConnectorPool.pollAllConnectors();
+          outputConnectorPool.pollAllConnectors();
+          authorityConnectorPool.pollAllConnectors();
+          mappingConnectorPool.pollAllConnectors();
+          
+          throttleGroups.poll();
+          
+          cacheManager.expireObjects(System.currentTimeMillis());
+          
+          // Sleep for the retry interval.
+          ManifoldCF.sleep(5000L);
+        }
+        catch (ManifoldCFException e)
+        {
+          if (e.getErrorCode() == ManifoldCFException.INTERRUPTED)
+            break;
+
+          if (e.getErrorCode() == ManifoldCFException.DATABASE_CONNECTION_ERROR)
+          {
+            Logging.root.error("Idle cleanup thread aborting and restarting due to database connection reset: "+e.getMessage(),e);
+            try
+            {
+              // Give the database a chance to catch up/wake up
+              ManifoldCF.sleep(10000L);
+            }
+            catch (InterruptedException se)
+            {
+              break;
+            }
+            continue;
+          }
+
+          // Log it, but keep the thread alive
+          Logging.root.error("Exception tossed: "+e.getMessage(),e);
+
+          if (e.getErrorCode() == ManifoldCFException.SETUP_ERROR)
+          {
+            // Shut the whole system down!
+            System.exit(1);
+          }
+
+        }
+        catch (InterruptedException e)
+        {
+          // We're supposed to quit
+          break;
+        }
+        catch (OutOfMemoryError e)
+        {
+          System.err.println("Crawler UI ran out of memory - shutting down");
+          e.printStackTrace(System.err);
+          System.exit(-200);
+        }
+        catch (Throwable e)
+        {
+          // A more severe error - but stay alive
+          Logging.root.fatal("Error tossed: "+e.getMessage(),e);
+        }
+      }
+    }
+    catch (Throwable e)
+    {
+      // Severe error on initialization
+      System.err.println("Crawler UI could not start - shutting down");
+      Logging.root.fatal("IdleCleanupThread initialization error tossed: "+e.getMessage(),e);
+      System.exit(-300);
+    }
+
+  }
+
+}
diff --git a/framework/crawler-ui/src/main/java/org/apache/manifoldcf/crawlerui/ServletListener.java b/framework/crawler-ui/src/main/java/org/apache/manifoldcf/crawlerui/ServletListener.java
index 2ea2362..92a2e86 100644
--- a/framework/crawler-ui/src/main/java/org/apache/manifoldcf/crawlerui/ServletListener.java
+++ b/framework/crawler-ui/src/main/java/org/apache/manifoldcf/crawlerui/ServletListener.java
@@ -29,11 +29,16 @@
 {
   public static final String _rcsid = "@(#)$Id$";
 
+  protected IdleCleanupThread idleCleanupThread = null;
+
   public void contextInitialized(ServletContextEvent sce)
   {
     try
     {
-      ManifoldCF.initializeEnvironment(ThreadContextFactory.make());
+      IThreadContext threadContext = ThreadContextFactory.make();
+      ManifoldCF.initializeEnvironment(threadContext);
+      idleCleanupThread = new IdleCleanupThread();
+      idleCleanupThread.start();
     }
     catch (ManifoldCFException e)
     {
@@ -43,7 +48,21 @@
   
   public void contextDestroyed(ServletContextEvent sce)
   {
-    ManifoldCF.cleanUpEnvironment(ThreadContextFactory.make());
+    try
+    {
+      while (true)
+      {
+        if (idleCleanupThread == null)
+          break;
+        idleCleanupThread.interrupt();
+        if (!idleCleanupThread.isAlive())
+          idleCleanupThread = null;
+      }
+    }
+    finally
+    {
+      ManifoldCF.cleanUpEnvironment(ThreadContextFactory.make());
+    }
   }
 
 }
diff --git a/framework/crawler-ui/src/main/webapp/editauthority.jsp b/framework/crawler-ui/src/main/webapp/editauthority.jsp
index a31b8e8..978dcca 100644
--- a/framework/crawler-ui/src/main/webapp/editauthority.jsp
+++ b/framework/crawler-ui/src/main/webapp/editauthority.jsp
@@ -544,7 +544,7 @@
 		    <table class="displaytable">
 			<tr><td class="separator" colspan="5"><hr/></td></tr>
 			<tr>
-				<td class="description"><nobr><%=Messages.getBodyString(pageContext.getRequest().getLocale(),"editauthority.MaxConnections")%></nobr><br/><nobr><%=Messages.getBodyString(pageContext.getRequest().getLocale(),"editauthority.PerJVMColon")%></nobr></td>
+				<td class="description"><nobr><%=Messages.getBodyString(pageContext.getRequest().getLocale(),"editauthority.MaxConnectionsColon")%></nobr></td>
 				<td class="value" colspan="4"><input type="text" size="6" name="maxconnections" value='<%=Integer.toString(maxConnections)%>'/></td>
 			</tr>
 		    </table>
diff --git a/framework/crawler-ui/src/main/webapp/editconnection.jsp b/framework/crawler-ui/src/main/webapp/editconnection.jsp
index 914cda2..1906cef 100644
--- a/framework/crawler-ui/src/main/webapp/editconnection.jsp
+++ b/framework/crawler-ui/src/main/webapp/editconnection.jsp
@@ -470,7 +470,7 @@
 		    <table class="displaytable">
 			<tr><td class="separator" colspan="2"><hr/></td></tr>
 			<tr>
-				<td class="description"><nobr><%=Messages.getBodyString(pageContext.getRequest().getLocale(),"editconnection.Maxconnections")%></nobr><br/><nobr><%=Messages.getBodyString(pageContext.getRequest().getLocale(),"editconnection.PerJVMColon")%></nobr></td>
+				<td class="description"><nobr><%=Messages.getBodyString(pageContext.getRequest().getLocale(),"editconnection.MaxconnectionsColon")%></nobr></td>
 				<td class="value"><input type="text" size="6" name="maxconnections" value='<%=Integer.toString(maxConnections)%>'/></td>
 			</tr>
 			<tr>
diff --git a/framework/crawler-ui/src/main/webapp/editmapper.jsp b/framework/crawler-ui/src/main/webapp/editmapper.jsp
index a566118..da54a05 100644
--- a/framework/crawler-ui/src/main/webapp/editmapper.jsp
+++ b/framework/crawler-ui/src/main/webapp/editmapper.jsp
@@ -471,7 +471,7 @@
 		    <table class="displaytable">
 			<tr><td class="separator" colspan="5"><hr/></td></tr>
 			<tr>
-				<td class="description"><nobr><%=Messages.getBodyString(pageContext.getRequest().getLocale(),"editmapper.MaxConnections")%></nobr><br/><nobr><%=Messages.getBodyString(pageContext.getRequest().getLocale(),"editmapper.PerJVMColon")%></nobr></td>
+				<td class="description"><nobr><%=Messages.getBodyString(pageContext.getRequest().getLocale(),"editmapper.MaxConnectionsColon")%></nobr></td>
 				<td class="value" colspan="4"><input type="text" size="6" name="maxconnections" value='<%=Integer.toString(maxConnections)%>'/></td>
 			</tr>
 		    </table>
diff --git a/framework/crawler-ui/src/main/webapp/editoutput.jsp b/framework/crawler-ui/src/main/webapp/editoutput.jsp
index 6e6a3ee..6324d97 100644
--- a/framework/crawler-ui/src/main/webapp/editoutput.jsp
+++ b/framework/crawler-ui/src/main/webapp/editoutput.jsp
@@ -400,7 +400,7 @@
 		    <table class="displaytable">
 			<tr><td class="separator" colspan="2"><hr/></td></tr>
 			<tr>
-				<td class="description"><nobr><%=Messages.getBodyString(pageContext.getRequest().getLocale(),"editoutput.MaxConnections")%></nobr><br/><nobr><%=Messages.getBodyString(pageContext.getRequest().getLocale(),"editoutput.PerJVMColon")%></nobr></td>
+				<td class="description"><nobr><%=Messages.getBodyString(pageContext.getRequest().getLocale(),"editoutput.MaxConnectionsColon")%></nobr></td>
 				<td class="value"><input type="text" size="6" name="maxconnections" value='<%=Integer.toString(maxConnections)%>'/></td>
 			</tr>
 		    </table>
diff --git a/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java b/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java
index 065467b..9b6150b 100644
--- a/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java
+++ b/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java
@@ -5845,7 +5845,10 @@
     // (3) If the connector has some other model, we look at the start time.  A start
     // time of 0 implies a full scan, while any other start time implies an incremental
     // scan.
-
+    
+    // Always reset document schedules for those documents already pending!
+    jobQueue.resetPendingDocumentSchedules(jobID);
+    
     // Complete connector model is told everything, so no delete phase.
     if (connectorModel == IRepositoryConnector.MODEL_ADD_CHANGE_DELETE)
     {
diff --git a/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobQueue.java b/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobQueue.java
index b6b0b6e..a5e6c24 100644
--- a/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobQueue.java
+++ b/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobQueue.java
@@ -717,6 +717,28 @@
     TrackerClass.noteJobChange(jobID,"Prepare full scan");
   }
 
+  /** Reset schedule for all PENDINGPURGATORY entries.
+  *@param jobID is the job identifier.
+  */
+  public void resetPendingDocumentSchedules(Long jobID)
+    throws ManifoldCFException
+  {
+    HashMap map = new HashMap();
+    // Do not reset priorities here!  They should all be blank at this point.
+    map.put(checkTimeField,new Long(0L));
+    map.put(checkActionField,actionToString(ACTION_RESCAN));
+    map.put(failTimeField,null);
+    map.put(failCountField,null);
+    ArrayList list = new ArrayList();
+    String query = buildConjunctionClause(list,new ClauseDescription[]{
+      new UnitaryClause(jobIDField,jobID),
+      new MultiClause(statusField,new Object[]{
+        statusToString(STATUS_PENDINGPURGATORY),
+        statusToString(STATUS_PENDING)})});
+    performUpdate(map,"WHERE "+query,list,null);
+    noteModifications(0,1,0);
+  }
+  
   /** For ADD_CHANGE_DELETE jobs where the specifications have been changed,
   * we must reconsider every existing document.  So reconsider them all.
   *@param jobID is the job identifier.
diff --git a/framework/ui-core/src/main/native2ascii/org/apache/manifoldcf/ui/i18n/common_en_US.properties b/framework/ui-core/src/main/native2ascii/org/apache/manifoldcf/ui/i18n/common_en_US.properties
index 8f599b1..2bd2d3e 100644
--- a/framework/ui-core/src/main/native2ascii/org/apache/manifoldcf/ui/i18n/common_en_US.properties
+++ b/framework/ui-core/src/main/native2ascii/org/apache/manifoldcf/ui/i18n/common_en_US.properties
@@ -107,8 +107,7 @@
 editoutput.CancelOutputConnectionEditing=Cancel output connection editing
 editoutput.Save=Save
 editoutput.SaveThisOutputConnection=Save this output connection
-editoutput.MaxConnections=Max connections
-editoutput.PerJVMColon=(per JVM):
+editoutput.MaxConnectionsColon=Max connections:
 editoutput.EditOutputConnection=Edit output connection
 editoutput.NameColon=Name:
 editoutput.DescriptionColon=Description:
@@ -203,8 +202,7 @@
 editauthority.ConnectionTypeColon=Connection type:
 editauthority.Continue=Continue
 editauthority.ContinueToNextPage=Continue to next page
-editauthority.MaxConnections=Max connections
-editauthority.PerJVMColon=(per JVM):
+editauthority.MaxConnectionsColon=Max connections:
 editauthority.Cancel=Cancel
 editauthority.CancelAuthorityEditing=Cancel authority editing
 editauthority.Save=Save
@@ -236,8 +234,7 @@
 editmapper.ConnectionTypeColon=Connection type:
 editmapper.Continue=Continue
 editmapper.ContinueToNextPage=Continue to next page
-editmapper.MaxConnections=Max connections
-editmapper.PerJVMColon=(per JVM):
+editmapper.MaxConnectionsColon=Max connections:
 editmapper.Cancel=Cancel
 editmapper.CancelMappingEditing=Cancel mapping editing
 editmapper.Save=Save
@@ -279,8 +276,7 @@
 editconnection.CancelConnectionEditing=Cancel connection editing
 editconnection.NameColon=Name:
 editconnection.DescriptionColon=Description:
-editconnection.Maxconnections=Max connections
-editconnection.PerJVMColon=(per JVM):
+editconnection.MaxconnectionsColon=Max connections:
 editconnection.ThrottlingColon=Throttling:
 editconnection.Add=Add
 editconnection.BinRegularExpression=Bin regular expression
diff --git a/framework/ui-core/src/main/native2ascii/org/apache/manifoldcf/ui/i18n/common_ja_JP.properties b/framework/ui-core/src/main/native2ascii/org/apache/manifoldcf/ui/i18n/common_ja_JP.properties
index f64add8..b30c7ba 100644
--- a/framework/ui-core/src/main/native2ascii/org/apache/manifoldcf/ui/i18n/common_ja_JP.properties
+++ b/framework/ui-core/src/main/native2ascii/org/apache/manifoldcf/ui/i18n/common_ja_JP.properties
@@ -107,8 +107,7 @@
 editoutput.CancelOutputConnectionEditing=出力コネクションの編集をキャンセル
 editoutput.Save=保存
 editoutput.SaveThisOutputConnection=出力コネクションを保存
-editoutput.MaxConnections=最大コネクション数
-editoutput.PerJVMColon=(/JVM):
+editoutput.MaxConnectionsColon=最大コネクション数:
 editoutput.EditOutputConnection=出力コネクションを編集
 editoutput.NameColon=名前:
 editoutput.DescriptionColon=説明:
@@ -203,8 +202,7 @@
 editauthority.ConnectionTypeColon=コネクションタイプ:
 editauthority.Continue=次へ
 editauthority.ContinueToNextPage=次のページに進む
-editauthority.MaxConnections=最大コネクション数
-editauthority.PerJVMColon=(/JVM):
+editauthority.MaxConnectionsColon=最大コネクション数:
 editauthority.Cancel=キャンセル
 editauthority.CancelAuthorityEditing=権限の編集をキャンセル
 editauthority.Save=保存
@@ -236,8 +234,7 @@
 editmapper.ConnectionTypeColon=コネクションタイプ:
 editmapper.Continue=次へ
 editmapper.ContinueToNextPage=次のページに進む
-editmapper.MaxConnections=最大コネクション数
-editmapper.PerJVMColon=(/JVM):
+editmapper.MaxConnectionsColon=最大コネクション数:
 editmapper.Cancel=キャンセル
 editmapper.CancelMappingEditing=マッピングの編集をキャンセル
 editmapper.Save=保存
@@ -279,8 +276,7 @@
 editconnection.CancelConnectionEditing=コネクションの編集をキャンセル
 editconnection.NameColon=名前:
 editconnection.DescriptionColon=説明:
-editconnection.Maxconnections=最大コネクション数
-editconnection.PerJVMColon=(/JVM):
+editconnection.MaxconnectionsColon=最大コネクション数:
 editconnection.ThrottlingColon=スロットリング:
 editconnection.Add=追加
 editconnection.BinRegularExpression=Bin正規表現
diff --git a/site/src/documentation/content/xdocs/en_US/how-to-build-and-deploy.xml b/site/src/documentation/content/xdocs/en_US/how-to-build-and-deploy.xml
index 062da97..dab03d8 100644
--- a/site/src/documentation/content/xdocs/en_US/how-to-build-and-deploy.xml
+++ b/site/src/documentation/content/xdocs/en_US/how-to-build-and-deploy.xml
@@ -873,6 +873,7 @@
             <tr><td>org.apache.manifoldcf.hsqldbdatabaseport</td><td>No</td><td>The HSQLDB remote server port.</td></tr>
             <tr><td>org.apache.manifoldcf.hsqldbdatabaseinstance</td><td>No</td><td>The HSQLDB remote database instance name.</td></tr>
             <tr><td>org.apache.manifoldcf.mysql.server</td><td>No</td><td>The MySQL server name.  Defaults to 'localhost'.</td></tr>
+            <tr><td>org.apache.manifoldcf.mysql.client</td><td>No</td><td>The MySQL client property.  Defaults to 'localhost'.  You may want to set this to '%' for a multi-machine setup.</td></tr>
             <tr><td>org.apache.manifoldcf.lockmanagerclass</td><td>No</td><td>Specifies the class to use to implement synchronization.  Default
                 is either file-based synchronization or in-memory synchronization, using the org.apache.manifoldcf.core.lockmanager.LockManager class.
                 Options include org.apache.manifoldcf.core.lockmanager.BaseLockManager, org.apache.manifoldcf.core.FileLockManager, and
diff --git a/site/src/documentation/content/xdocs/ja_JP/how-to-build-and-deploy.xml b/site/src/documentation/content/xdocs/ja_JP/how-to-build-and-deploy.xml
index 3a60e6d..dab03d8 100644
--- a/site/src/documentation/content/xdocs/ja_JP/how-to-build-and-deploy.xml
+++ b/site/src/documentation/content/xdocs/ja_JP/how-to-build-and-deploy.xml
@@ -442,8 +442,9 @@
             <tr><td><em>start-database[.sh|.bat]</em></td><td>script to start the HSQLDB database</td></tr>
             <tr><td><em>initialize[.sh|.bat]</em></td><td>script to create the database instance, create all database tables, and register connectors</td></tr>
             <tr><td><em>start-webapps[.sh|.bat]</em></td><td>script to start Jetty with the ManifoldCF web applications deployed</td></tr>
-            <tr><td><em>start-agents[.sh|.bat]</em></td><td>script to start the agents process</td></tr>
-            <tr><td><em>stop-agents[.sh|.bat]</em></td><td>script to stop a running agents process cleanly</td></tr>
+            <tr><td><em>start-agents[.sh|.bat]</em></td><td>script to start the (first) agents process</td></tr>
+            <tr><td><em>start-agents-2[.sh|.bat]</em></td><td>script to start a second agents process</td></tr>
+            <tr><td><em>stop-agents[.sh|.bat]</em></td><td>script to stop all running agents processes cleanly</td></tr>
             <tr><td><em>lock-clean[.sh|.bat]</em></td><td>script to clean up dirty locks (run only when all webapps and processes are stopped)</td></tr>
           </table>
           <p></p>
@@ -452,7 +453,7 @@
             <p></p>
             <p>If you run the file-based multiprocess model, after you first start the database (using <em>start-database[.sh|.bat]</em>), you will need to initialize the database before you start the agents process or use the crawler UI.  To do this, all you need to do is
                 run the <em>initialize[.sh|.bat]</em> script.  Then, you will need to start the web applications (using <em>start-webapps[.sh|.bat]</em>) and the agents process (using
-                <em>start-agents[.sh|.bat]</em>).</p>
+                <em>start-agents[.sh|.bat]</em>), and optionally the second agents process (using <em>start-agents-2[.sh|.bat]</em>).</p>
             <p></p>
           </section>
 
@@ -480,8 +481,9 @@
             <tr><td><em>start-database[.sh|.bat]</em></td><td>script to start the HSQLDB database</td></tr>
             <tr><td><em>initialize[.sh|.bat]</em></td><td>script to create the database instance, create all database tables, and register connectors</td></tr>
             <tr><td><em>start-webapps[.sh|.bat]</em></td><td>script to start Jetty with the ManifoldCF web applications deployed</td></tr>
-            <tr><td><em>start-agents[.sh|.bat]</em></td><td>script to start the agents process</td></tr>
-            <tr><td><em>stop-agents[.sh|.bat]</em></td><td>script to stop a running agents process cleanly</td></tr>
+            <tr><td><em>start-agents[.sh|.bat]</em></td><td>script to start (the first) agents process</td></tr>
+            <tr><td><em>start-agents-2[.sh|.bat]</em></td><td>script to start a second agents process</td></tr>
+            <tr><td><em>stop-agents[.sh|.bat]</em></td><td>script to stop all running agents processes cleanly</td></tr>
           </table>
           <p></p>
           <section>
@@ -494,8 +496,8 @@
               <li>Initialize the ManifoldCF shared configuration data (using <em>setglobalproperties[.sh|.bat]</em>)</li>
               <li>Start the database (using <em>start-database[.sh|.bat]</em>)</li>
               <li>Initialize the database (using <em>initialize[.sh|.bat]</em>)</li>
-              <li>Start the agents process (using <em>start-agents[.sh|.bat]</em></li>
-              <li>Start the web applications (using <em>start-webapps[.sh|.bat]</em></li>
+              <li>Start the agents process (using <em>start-agents[.sh|.bat]</em>, and optionally <em>start-agents-2[.sh|.bat]</em>)</li>
+              <li>Start the web applications (using <em>start-webapps[.sh|.bat]</em>)</li>
             </ol>
             <p></p>
           </section>
@@ -871,6 +873,7 @@
             <tr><td>org.apache.manifoldcf.hsqldbdatabaseport</td><td>No</td><td>The HSQLDB remote server port.</td></tr>
             <tr><td>org.apache.manifoldcf.hsqldbdatabaseinstance</td><td>No</td><td>The HSQLDB remote database instance name.</td></tr>
             <tr><td>org.apache.manifoldcf.mysql.server</td><td>No</td><td>The MySQL server name.  Defaults to 'localhost'.</td></tr>
+            <tr><td>org.apache.manifoldcf.mysql.client</td><td>No</td><td>The MySQL client property.  Defaults to 'localhost'.  You may want to set this to '%' for a multi-machine setup.</td></tr>
             <tr><td>org.apache.manifoldcf.lockmanagerclass</td><td>No</td><td>Specifies the class to use to implement synchronization.  Default
                 is either file-based synchronization or in-memory synchronization, using the org.apache.manifoldcf.core.lockmanager.LockManager class.
                 Options include org.apache.manifoldcf.core.lockmanager.BaseLockManager, org.apache.manifoldcf.core.FileLockManager, and