blob: ab931e8601e432ebe898121f41b06533d6a923a2 [file] [log] [blame]
/* $Id: ThrottledFetcher.java 989847 2010-08-26 17:52:30Z kwright $ */
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.manifoldcf.crawler.connectors.webcrawler;
import org.apache.manifoldcf.core.interfaces.*;
import org.apache.manifoldcf.core.common.XThreadInputStream;
import org.apache.manifoldcf.agents.interfaces.*;
import org.apache.manifoldcf.crawler.interfaces.*;
import org.apache.manifoldcf.crawler.system.Logging;
import org.apache.manifoldcf.crawler.system.ManifoldCF;
import java.util.*;
import java.io.*;
import java.net.*;
import java.util.concurrent.TimeUnit;
import org.apache.http.conn.ClientConnectionManager;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.NameValuePair;
import org.apache.http.impl.conn.PoolingClientConnectionManager;
import org.apache.http.conn.scheme.Scheme;
import org.apache.http.conn.ssl.SSLSocketFactory;
import org.apache.http.conn.ssl.AllowAllHostnameVerifier;
import org.apache.http.conn.ssl.BrowserCompatHostnameVerifier;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.NTCredentials;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpRequestBase;
import org.apache.http.impl.client.DefaultHttpClient;
import org.apache.http.impl.client.AbstractHttpClient;
import org.apache.http.impl.client.DefaultRedirectStrategy;
import org.apache.http.util.EntityUtils;
import org.apache.http.params.BasicHttpParams;
import org.apache.http.params.HttpParams;
import org.apache.http.params.CoreConnectionPNames;
import org.apache.http.HttpStatus;
import org.apache.http.HttpHost;
import org.apache.http.Header;
import org.apache.http.conn.params.ConnRoutePNames;
import org.apache.http.message.BasicHeader;
import org.apache.http.client.params.ClientPNames;
import org.apache.http.client.params.HttpClientParams;
import org.apache.http.client.params.CookiePolicy;
import org.apache.http.cookie.params.CookieSpecPNames;
import org.apache.http.impl.cookie.BasicClientCookie;
import org.apache.http.message.BasicNameValuePair;
import org.apache.http.protocol.HTTP;
import org.apache.http.client.entity.UrlEncodedFormEntity;
import org.apache.http.cookie.CookieOrigin;
import org.apache.http.cookie.ClientCookie;
import org.apache.http.cookie.Cookie;
import org.apache.http.impl.cookie.BasicPathHandler;
import org.apache.http.impl.cookie.BrowserCompatSpec;
import org.apache.http.cookie.CookieSpecFactory;
import org.apache.http.cookie.CookieSpec;
import org.apache.http.cookie.MalformedCookieException;
import org.apache.http.conn.ConnectTimeoutException;
import org.apache.http.client.RedirectException;
import org.apache.http.client.CircularRedirectException;
import org.apache.http.NoHttpResponseException;
import org.apache.http.HttpException;
/** This class uses httpclient to fetch stuff from webservers. However, it additionally controls the fetch
* rate in two ways: first, controlling the overall bandwidth used per server, and second, limiting the number
* of simultaneous open connections per server.
* An instance of this class would very probably need to have a lifetime consistent with the long-term nature
* of these values, and be static.
*/
public class ThrottledFetcher
{
/** Revision-control identifier string for this source file. */
public static final String _rcsid = "@(#)$Id: ThrottledFetcher.java 989847 2010-08-26 17:52:30Z kwright $";
/** This flag determines whether we record everything to the disk, as a means of doing a web snapshot */
protected static final boolean recordEverything = false;
// Durations below are in milliseconds and are written as products so that each value can be
// checked against its name at a glance.
/** Two hours, in milliseconds. */
protected static final long TIME_2HRS = 2L * 60L * 60000L;
/** Five minutes, in milliseconds. */
protected static final long TIME_5MIN = 5L * 60000L;
/** Fifteen minutes, in milliseconds. The previous literal (1500000) was actually 25 minutes. */
protected static final long TIME_15MIN = 15L * 60000L;
/** Six hours, in milliseconds. */
protected static final long TIME_6HRS = 6L * 60L * 60000L;
/** One day, in milliseconds. */
protected static final long TIME_1DAY = 24L * 60L * 60000L;
/** This is the static pool of ConnectionBin's, keyed by bin name.
* Code in this class synchronizes on the map itself whenever it looks up, adds, or iterates bins. */
protected static HashMap connectionBins = new HashMap();
/** This is the static pool of ThrottleBin's, keyed by bin name. */
protected static HashMap throttleBins = new HashMap();
/** This global lock protects the "distributed pool" resource, and insures that a connection
* can get pulled out of all the right pools and wind up in only the hands of one thread.
* Threads that cannot obtain a connection also wait() on this object. */
protected static Integer poolLock = new Integer(0);
/** Current host name. Filled in once by the static initializer; stays null if the local host
* name cannot be determined. */
private static String currentHost = null;
static
{
  // Resolve the local host name once, at class-load time.
  try
  {
    currentHost = java.net.InetAddress.getLocalHost().getHostName();
  }
  catch (java.net.UnknownHostException ignored)
  {
    // Best effort only: when the local host cannot be resolved, currentHost keeps its null default.
  }
}
/** The read chunk length.
* NOTE(review): presumably a size in bytes; the code that uses it is outside this section - confirm. */
protected static final int READ_CHUNK_LENGTH = 4096;
/** Default constructor. Performs no initialization of its own; the connection and throttle
* pools used by this class are static.
*/
public ThrottledFetcher()
{
}
/** Obtain a connection to specified protocol, server, and port. We use the protocol because the
* setup for some protocols is extensive (e.g. https) and hopefully would not need to be repeated if
* we distinguish connections based on that.
* The call blocks (waiting on the global pool lock) until a connection can be reserved in every bin
* and the minimum time between fetches has elapsed for every bin. The returned connection is either
* a pooled one located through the first bins' pools or a newly constructed one; in both cases
* setup() is called on it with the throttle description before it is returned.
*@param protocol is the protocol, e.g. "http"
*@param server is the server IP address, e.g. "10.32.65.1"
*@param port is the port to connect to, e.g. 80. Pass -1 if the default port for the protocol is desired.
*@param authentication is the page credentials object to use for the fetch. If null, no credentials are available.
*@param trustStore is the current trust store in effect for the fetch. If null, a trusting secure socket
* factory is used instead.
*@param throttleDescription is the description of all the throttling that should take place.
*@param binNames is the set of bins, in order, that should be used for throttling this connection.
* Note that the bin names for a given IP address and port MUST be the same for every connection!
* This must be enforced by whatever it is that builds the bins - it must do so given an IP and port.
*@param connectionLimit is the maximum number of connections permitted. While the total count over all
* bins is at or above this value, idle pooled connections are flushed with progressively shorter
* idle timeouts; the limit is not otherwise enforced by this method.
*@param proxyHost is the proxy host; passed through to connection matching and creation.
*@param proxyPort is the proxy port; passed through to connection matching and creation.
*@param proxyAuthDomain is the proxy authentication domain; passed through to connection matching and creation.
*@param proxyAuthUsername is the proxy authentication user name; passed through to connection matching and creation.
*@param proxyAuthPassword is the proxy authentication password; passed through to connection matching and creation.
*@return an IThrottledConnection object that can be used to fetch from the port.
*@throws ManifoldCFException with code INTERRUPTED if the thread is interrupted while waiting, or
* wrapping any unexpected exception raised while reserving the connection.
*/
public static IThrottledConnection getConnection(String protocol, String server, int port,
PageCredentials authentication,
IKeystoreManager trustStore,
ThrottleDescription throttleDescription, String[] binNames,
int connectionLimit,
String proxyHost, int proxyPort, String proxyAuthDomain, String proxyAuthUsername, String proxyAuthPassword)
throws ManifoldCFException
{
// Create the https scheme for this connection
javax.net.ssl.SSLSocketFactory baseFactory;
String trustStoreString;
if (trustStore != null)
{
baseFactory = trustStore.getSecureSocketFactory();
trustStoreString = trustStore.getString();
}
else
{
// No trust store supplied: fall back to the trusting factory, and use a null trust store string
// when matching pooled connections.
baseFactory = KeystoreManagerFactory.getTrustingSecureSocketFactory();
trustStoreString = null;
}
// Resolve every bin name to its ConnectionBin up front, creating bins that do not exist yet.
ConnectionBin[] bins = new ConnectionBin[binNames.length];
// Now, start looking for a connection
int i = 0;
while (i < binNames.length)
{
String binName = binNames[i];
// Find or create the bin object
ConnectionBin cb;
synchronized (connectionBins)
{
cb = (ConnectionBin)connectionBins.get(binName);
if (cb == null)
{
cb = new ConnectionBin(binName);
connectionBins.put(binName,cb);
}
//cb.sanityCheck();
}
bins[i] = cb;
i++;
}
ThrottledConnection connectionToReuse;
// startTime is only meaningful when debug logging is on; it is used solely in debug messages.
long startTime = 0L;
if (Logging.connectors.isDebugEnabled())
{
startTime = System.currentTimeMillis();
Logging.connectors.debug("WEB: Waiting to start getting a connection to "+protocol+"://"+server+":"+port);
}
synchronized (poolLock)
{
// If the number of outstanding connections is greater than the global limit, close pooled connections until we are under the limit
// The idle timeout starts at 64000 and is divided by four on each pass, so each pass flushes
// more aggressively; once it reaches zero we give up trying to get under the limit.
long idleTimeout = 64000L;
while (true)
{
int openCount = 0;
// Lock up everything for a moment
synchronized (connectionBins)
{
// Count the connections (pooled plus in use) over all connection bins.
Iterator binIter = connectionBins.keySet().iterator();
while (binIter.hasNext())
{
String binName = (String)binIter.next();
ConnectionBin cb = (ConnectionBin)connectionBins.get(binName);
openCount += cb.countConnections();
}
}
if (openCount < connectionLimit)
break;
if (idleTimeout == 0L)
{
// Can't actually conclude anything here unfortunately
// Logging.connectors.warn("Web: Exceeding connection limit! Open count = "+Integer.toString(openCount)+"; limit = "+Integer.toString(connectionLimit));
break;
}
idleTimeout = idleTimeout/4L;
// Lock up everything for a moment, since otherwise we could delete something people
// expect to stick around.
synchronized (connectionBins)
{
// Time out connections that have been idle too long. To do this, we need to go through
// all connection bins and look at the pool
Iterator binIter = connectionBins.keySet().iterator();
while (binIter.hasNext())
{
String binName = (String)binIter.next();
ConnectionBin cb = (ConnectionBin)connectionBins.get(binName);
cb.flushIdleConnections(idleTimeout);
}
}
}
try
{
// Retry until we get the connection.
while (true)
{
if (Logging.connectors.isDebugEnabled())
Logging.connectors.debug("WEB: Attempting to get connection to "+protocol+"://"+server+":"+port+" ("+new Long(System.currentTimeMillis()-startTime).toString()+" ms)");
i = 0;
connectionToReuse = null;
try
{
// Now, start looking for a connection
while (i < binNames.length)
{
String binName = binNames[i];
ConnectionBin cb = bins[i];
// Figure out the connection limit for this bin, based on the throttle description
int maxConnections = throttleDescription.getMaxOpenConnections(binName);
// If no restriction, use a very large value.
if (maxConnections == -1)
maxConnections = Integer.MAX_VALUE;
else if (maxConnections == 0)
// A limit of zero is treated as one, so that a fetch remains possible.
maxConnections = 1;
// Now, do what we need to do to reserve our connection for this bin.
// If we can't reserve it now, we plan on undoing everything we did, so
// whatever we do must be reversible. Furthermore, nothing we call here
// should actually wait(); that will occur if we can't get what we need out
// here at this level.
if (connectionToReuse != null)
{
// We have a reuse candidate already, so just make sure each remaining bin is within
// its limits.
cb.insureWithinLimits(maxConnections,connectionToReuse);
}
else
{
// No candidate yet: ask this bin for a matching pooled connection. A null result means
// the bin has room but nothing reusable, so later bins are asked in turn.
connectionToReuse = cb.findConnection(maxConnections,bins,protocol,server,port,authentication,trustStoreString,
proxyHost,proxyPort,proxyAuthDomain,proxyAuthUsername,proxyAuthPassword);
}
// Increment after we successfully handled this bin
i++;
}
// That loop completed, meaning that we think we got a connection. Now, go through all the bins and make sure there's enough time since the last
// fetch. If not, we have to clean everything up and try again.
long currentTime = System.currentTimeMillis();
// Global lock needed to insure that fetch time is updated across all bins simultaneously
synchronized (connectionBins)
{
// First pass: verify every bin; a WaitException carries the remaining time to wait.
i = 0;
while (i < binNames.length)
{
String binName = binNames[i];
ConnectionBin cb = bins[i];
//cb.sanityCheck();
// Get the minimum time between fetches for this bin, based on the throttle description
long minMillisecondsPerFetch = throttleDescription.getMinimumMillisecondsPerFetch(binName);
if (cb.getLastFetchTime() + minMillisecondsPerFetch > currentTime)
throw new WaitException(cb.getLastFetchTime() + minMillisecondsPerFetch - currentTime);
i++;
}
// Second pass: all bins passed, so record this fetch time on each of them.
i = 0;
while (i < binNames.length)
{
ConnectionBin cb = bins[i++];
cb.setLastFetchTime(currentTime);
}
}
}
catch (Throwable e)
{
// We have to free everything and retry, because otherwise we are subject to deadlock.
// The only thing we have reserved is the connection, which we must free if there's a
// problem.
if (connectionToReuse != null)
{
// Return this connection to the pool. That is, the pools for all the bins.
int k = 0;
while (k < binNames.length)
{
String binName = binNames[k++];
ConnectionBin cb;
synchronized (connectionBins)
{
cb = (ConnectionBin)connectionBins.get(binName);
if (cb == null)
{
cb = new ConnectionBin(binName);
connectionBins.put(binName,cb);
}
}
//cb.sanityCheck();
cb.addToPool(connectionToReuse);
//cb.sanityCheck();
}
connectionToReuse = null;
// We should not need to notify here because nothing has really changed from
// when the attempt started to get the connection. We just undid what we did.
}
// Errors and ManifoldCFExceptions propagate unchanged; WaitException and PoolException
// lead to a wait followed by a retry; anything else is wrapped.
if (e instanceof Error)
throw (Error)e;
if (e instanceof ManifoldCFException)
throw (ManifoldCFException)e;
if (e instanceof WaitException)
{
// Wait because we need a certain amount of time after a previous fetch.
WaitException we = (WaitException)e;
long waitAmount = we.getWaitAmount();
if (Logging.connectors.isDebugEnabled())
Logging.connectors.debug("WEB: Waiting "+new Long(waitAmount).toString()+" ms before starting fetch on "+protocol+"://"+server+":"+port);
// Really don't want to sleep inside the pool lock!
// The easiest thing to do instead is to use a timed wait. There is no reason why we need
// to wake before the wait time is exceeded - but it's harmless, and the alternative is to
// do more reorganization than probably is wise.
poolLock.wait(waitAmount);
continue;
}
if (e instanceof PoolException)
{
if (Logging.connectors.isDebugEnabled())
Logging.connectors.debug("WEB: Going into wait for connection to "+protocol+"://"+server+":"+port+" ("+new Long(System.currentTimeMillis()-startTime).toString()+" ms)");
// Now, wait for something external to change. The only thing that can help us is if
// some other thread frees a connection.
poolLock.wait();
// Go back around and try again.
continue;
}
throw new ManifoldCFException("Unexpected exception encountered: "+e.getMessage(),e);
}
if (Logging.connectors.isDebugEnabled())
Logging.connectors.debug("WEB: Successfully got connection to "+protocol+"://"+server+":"+port+" ("+new Long(System.currentTimeMillis()-startTime).toString()+" ms)");
// If no pooled connection was located, create a new one registered against all the bins.
if (connectionToReuse == null)
connectionToReuse = new ThrottledConnection(protocol,server,port,authentication,baseFactory,trustStoreString,bins,
proxyHost,proxyPort,proxyAuthDomain,proxyAuthUsername,proxyAuthPassword);
connectionToReuse.setup(throttleDescription);
return connectionToReuse;
}
}
catch (InterruptedException e)
{
// NOTE(review): only the message is carried over; the InterruptedException itself is not
// attached as the cause of the ManifoldCFException.
throw new ManifoldCFException(e.getMessage(),ManifoldCFException.INTERRUPTED);
}
}
}
/** Flush connections that have timed out from inactivity.
* Every known connection bin is asked to flush its pooled connections using an idle timeout
* of 60000 (presumably milliseconds). Bins themselves are never removed from the map.
*/
public static void flushIdleConnections()
  throws ManifoldCFException
{
  synchronized (poolLock)
  {
    // Hold the bin map as well for the duration of the sweep, since otherwise we could delete
    // something that other threads expect to stick around.
    synchronized (connectionBins)
    {
      for (Iterator everyBin = connectionBins.values().iterator(); everyBin.hasNext(); )
      {
        ConnectionBin bin = (ConnectionBin)everyBin.next();
        // The result (true when the bin has become completely empty) is deliberately ignored.
        // Removing empty bins from the map was tried, on the theory that inUseConnections
        // protects active connections, but it produced hangs, so bins are left in place.
        bin.flushIdleConnections(60000L);
      }
    }
  }
}
/** Connection pool for a bin.
* One instance exists per bin name. It counts the connections of that bin which are currently
* signed out, and holds the idle connections of that bin which are available for reuse.
*/
protected static class ConnectionBin
{
  /** Name of the bin that this pool serves */
  protected String binName;
  /** Number of connections in this bin that are signed out and presumably in use */
  protected int inUseConnections = 0;
  /** Time at which the most recent fetch on this bin was recorded */
  protected long lastFetchTime = 0L;
  /** Separate monitor object intended for waiting on a connection to free up for this bin,
  * kept distinct from the ConnectionBin's own monitor, which protects the bin's state. */
  protected Integer connectionWait = new Integer(0);
  /** Idle ThrottledConnection objects that are pooled and not in use; each connection is mapped to itself. */
  protected HashMap freePool = new HashMap();

  /** Constructor.
  *@param binName is the name of the bin this pool belongs to.
  */
  public ConnectionBin(String binName)
  {
    this.binName = binName;
  }

  /** Get the bin name.
  *@return the name given at construction.
  */
  public String getBinName()
  {
    return this.binName;
  }

  /** Record that an active connection belonging to this bin has been created. The slots all must
  * have been reserved before the connection is created.
  */
  public synchronized void noteConnectionCreation()
  {
    inUseConnections += 1;
  }

  /** Record that an active connection belonging to this bin has been destroyed.
  */
  public synchronized void noteConnectionDestruction()
  {
    inUseConnections -= 1;
  }

  /** Activate a connection that should be in the pool: it leaves the idle pool and is
  * counted as in use.
  *@param tc is the connection to take.
  */
  public synchronized void takeFromPool(ThrottledConnection tc)
  {
    freePool.remove(tc);
    inUseConnections += 1;
  }

  /** Put a connection into the idle pool; it is no longer counted as in use.
  *@param tc is the connection to pool.
  */
  public synchronized void addToPool(ThrottledConnection tc)
  {
    freePool.put(tc,tc);
    inUseConnections -= 1;
  }

  /** Verify that this bin is within limits, destroying pooled connections if that is what it takes.
  *@param maxConnections is the limit for this bin; values of zero or less disable the check.
  *@param existingConnection is the reuse candidate already chosen by the caller.
  *@throws PoolException if the bin is over its limit and has no pooled connection left to free.
  */
  public synchronized void insureWithinLimits(int maxConnections, ThrottledConnection existingConnection)
    throws PoolException
  {
    //sanityCheck();
    // When the candidate itself sits in this pool, the limits are presumed fine as they are.
    // Freeing pooled connections here could free the candidate and wreck the data structures.
    if (!existsInPool(existingConnection))
      releaseExcessPooledConnections(maxConnections);
  }

  /** This method is called only when no existing connection has yet been identified that can be used
  * for contacting the server and port specified. It returns a pooled connection if a matching one
  * is found, already activated (which, per the design of this class, takes it out of all pools it
  * lives in); otherwise it returns null and the caller is free to create a new connection.
  *@throws PoolException if the bin has no capacity right now; the caller should wait and retry.
  */
  public synchronized ThrottledConnection findConnection(int maxConnections,
    ConnectionBin[] binNames, String protocol, String server, int port,
    PageCredentials authentication, String trustStoreString,
    String proxyHost, int proxyPort, String proxyAuthDomain, String proxyAuthUsername, String proxyAuthPassword)
    throws PoolException
  {
    //sanityCheck();
    // First, get rid of any excess.
    releaseExcessPooledConnections(maxConnections);
    // Every permitted slot is signed out: do not wait here, let the caller wait and retry.
    if (maxConnections > 0 && inUseConnections >= maxConnections)
      throw new PoolException("Waiting for a connection");
    ThrottledConnection candidate = getPoolConnection();
    if (candidate == null)
      // Nothing pooled; the caller may create a new connection.
      return null;
    // It's okay to call activate since only one thread at a time is trying to grab a connection.
    candidate.activate();
    //sanityCheck();
    if (candidate.matches(binNames,protocol,server,port,authentication,trustStoreString,
      proxyHost,proxyPort,proxyAuthDomain,proxyAuthUsername,proxyAuthPassword))
      return candidate;
    // Not a match: destroy the old connection, which should free up space for a new creation,
    // and report that a new connection can be created now.
    candidate.destroy();
    return null;
  }

  /** Note a new time for connection fetch for this pool. The recorded time never moves backwards.
  *@param currentTime is the time the fetch was started.
  */
  public synchronized void setLastFetchTime(long currentTime)
  {
    lastFetchTime = Math.max(lastFetchTime,currentTime);
  }

  /** Get the last fetch time.
  *@return the time.
  */
  public synchronized long getLastFetchTime()
  {
    return this.lastFetchTime;
  }

  /** Count the connections of this bin.
  *@return the number of connections in use plus the number sitting in the idle pool.
  */
  public synchronized int countConnections()
  {
    return inUseConnections + freePool.size();
  }

  /** Flush any idle connections.
  *@param idleTimeout is the idle timeout handed to each pooled connection's own check.
  *@return true if the connection bin is now, in fact, empty.
  */
  public synchronized boolean flushIdleConnections(long idleTimeout)
  {
    //sanityCheck();
    Iterator pooled = freePool.keySet().iterator();
    while (pooled.hasNext())
    {
      ThrottledConnection tc = (ThrottledConnection)pooled.next();
      if (!tc.flushIdleConnections(idleTimeout))
        continue;
      // This connection timed out, so it can be deleted.
      tc.activate();
      tc.destroy();
      // The pool was modified underneath the iterator; start over with a fresh one.
      pooled = freePool.keySet().iterator();
    }
    //sanityCheck();
    // The whole bin is disposable only when nothing is pooled AND nothing is in use.
    return inUseConnections == 0 && freePool.isEmpty();
  }

  /** Grab a connection from the current pool. This does not remove the connection from the pool;
  * it just hands it out so that later calls can do that.
  *@return some pooled connection, or null if the pool is empty.
  */
  protected ThrottledConnection getPoolConnection()
  {
    return freePool.isEmpty() ? null : (ThrottledConnection)freePool.keySet().iterator().next();
  }

  /** Check if a connection exists in the pool already.
  */
  protected boolean existsInPool(ThrottledConnection tc)
  {
    Object pooledEntry = freePool.get(tc);
    return pooledEntry != null;
  }

  /** Debugging aid: make sure every connection in the current pool in fact has a reference to this bin.
  */
  public synchronized void sanityCheck()
  {
    for (Iterator pooled = freePool.keySet().iterator(); pooled.hasNext(); )
    {
      ((ThrottledConnection)pooled.next()).mustHaveReference(this);
    }
  }

  /** Destroy pooled connections, one at a time, for as long as this bin holds more connections
  * (in use plus pooled) than permitted. Must be called with this bin's monitor held.
  *@throws PoolException if the bin is still over the limit and nothing pooled is left to free.
  */
  private void releaseExcessPooledConnections(int maxConnections)
    throws PoolException
  {
    while (maxConnections > 0 && inUseConnections + freePool.size() > maxConnections)
    {
      //sanityCheck();
      ThrottledConnection surplus = getPoolConnection();
      if (surplus == null)
        // Instead of waiting, throw a pool exception, so that we can wait and retry at the next level up.
        throw new PoolException("Waiting for a connection");
      // It's okay to call activate since only one thread at a time is trying to grab a connection.
      surplus.activate();
      surplus.destroy();
    }
  }
}
/** Throttles for a bin.
* An instance of this class keeps track of the information needed to bandwidth throttle access
* to a url belonging to a specific bin.
*
* In order to calculate
* the effective "burst" fetches per second and bytes per second, we need to have some idea what the window is.
* For example, a long hiatus from fetching could cause overuse of the server when fetching resumes, if the
* window length is too long.
*
* One solution to this problem would be to keep a list of the individual fetches as records. Then, we could
* "expire" a fetch by discarding the old record. However, this is quite memory consumptive for all but the
* smallest intervals.
*
* Another, better, solution is to hook into the start and end of individual fetches. These will, presumably, occur
* at the fastest possible rate without long pauses spent doing something else. The only complication is that
* fetches may well overlap, so we need to "reference count" the fetches to know when to reset the counters.
* For "fetches per second", we can simply make sure we "schedule" the next fetch at an appropriate time, rather
* than keep records around. The overall rate may therefore be somewhat less than the specified rate, but that's perfectly
* acceptable.
*
* Some notes on the algorithms used to limit server bandwidth impact
* ==================================================================
*
* In a single connection case, the algorithm we'd want to use works like this. On the first chunk of a series,
* the total length of time and the number of bytes are recorded. Then, prior to each subsequent chunk, a calculation
* is done which attempts to hit the bandwidth target by the end of the chunk read, using the rate of the first chunk
* access as a way of estimating how long it will take to fetch those next n bytes.
*
* For a multi-connection case, which this is, it's harder to come up with a good maximum bandwidth estimate,
* and harder still to "hit the target", because simultaneous fetches will intrude. The strategy is therefore:
*
* 1) The first chunk of any series should proceed without interference from other connections to the same server.
* The goal here is to get a decent quality estimate without any possibility of overwhelming the server.
*
* 2) The bandwidth of the first chunk is treated as the "maximum bandwidth per connection". That is, if other
* connections are going on, we can presume that each connection will use at most the bandwidth that the first fetch
* took. Thus, by generating end-time estimates based on this number, we are actually being conservative and
* using less server bandwidth.
*
* 3) For chunks that have started but not finished, we keep track of their size and estimated elapsed time in order to schedule when
* new chunks from other connections can start.
*
*/
protected static class ThrottleBin
{
  /** This is the bin name which this throttle belongs to. */
  protected String binName;
  /** This is the reference count for this bin (which records active references) */
  protected int refCount = 0;
  /** The inverse rate estimate of the first fetch, in ms/byte */
  protected double rateEstimate = 0.0;
  /** Flag indicating whether rateEstimate currently holds an estimate for this series */
  protected boolean estimateValid = false;
  /** Flag indicating whether the first-chunk read that produces the estimate is under way */
  protected boolean estimateInProgress = false;
  /** The start time of this series */
  protected long seriesStartTime = -1L;
  /** Total actual bytes read in this series; this includes fetches in progress */
  protected long totalBytesRead = -1L;
  /** This object is used to gate access while the first chunk is being read */
  protected Integer firstChunkLock = new Integer(0);

  /** Constructor.
  *@param binName is the name of the bin this throttle belongs to.
  */
  public ThrottleBin(String binName)
  {
    this.binName = binName;
  }

  /** Get the bin name.
  *@return the name given at construction.
  */
  public String getBinName()
  {
    return binName;
  }

  /** Note the start of a fetch operation for a bin. Call this method just before the actual stream access begins.
  * When no other fetch is active on the bin, the bandwidth counters are reset so that a new
  * series begins. The current implementation does not wait; the exception declaration is kept
  * for callers.
  */
  public void beginFetch()
    throws InterruptedException
  {
    synchronized (this)
    {
      if (refCount == 0)
      {
        // First active fetch: reset bandwidth throttling counters
        estimateValid = false;
        rateEstimate = 0.0;
        totalBytesRead = 0L;
        estimateInProgress = false;
        seriesStartTime = -1L;
      }
      refCount++;
    }
  }

  /** Note the start of an individual byte read of a specified size. Call this method just before the
  * read request takes place. Performs the necessary delay prior to reading specified number of bytes from the server.
  * The first read of a series never sleeps; other reads wait until that first read has completed
  * (see endRead) and then sleep as long as the bandwidth target requires.
  *@param byteCount is the number of bytes about to be requested.
  *@param minimumMillisecondsPerBytePerServer is the bandwidth target, in milliseconds per byte.
  *@throws InterruptedException if interrupted while waiting for the first chunk or while sleeping.
  */
  public void beginRead(int byteCount, double minimumMillisecondsPerBytePerServer)
    throws InterruptedException
  {
    long currentTime = System.currentTimeMillis();
    synchronized (firstChunkLock)
    {
      // Hold off while another thread is reading the first chunk of the series.
      while (estimateInProgress)
        firstChunkLock.wait();
      if (estimateValid == false)
      {
        // This read is the first chunk: it starts the series and produces the estimate.
        seriesStartTime = currentTime;
        estimateInProgress = true;
        // Add these bytes to the estimated total
        synchronized (this)
        {
          totalBytesRead += (long)byteCount;
        }
        // Exit early; this thread isn't going to do any waiting
        return;
      }
    }
    long waitTime = 0L;
    synchronized (this)
    {
      // Add these bytes to the estimated total
      totalBytesRead += (long)byteCount;
      // Estimate the time this read will take
      long estimatedTime = (long)(rateEstimate * (double)byteCount);
      // Figure out how long the total byte count should take, to meet the constraint
      long desiredEndTime = seriesStartTime + (long)(((double)totalBytesRead) * minimumMillisecondsPerBytePerServer);
      // The wait time is the difference between our desired end time, minus the estimated time to read the data, and the
      // current time. A non-positive result simply means no wait.
      waitTime = (desiredEndTime - estimatedTime) - currentTime;
    }
    // Sleep outside of all monitors.
    if (waitTime > 0L)
    {
      if (Logging.connectors.isDebugEnabled())
        Logging.connectors.debug("WEB: Performing a read wait on bin '"+binName+"' of "+
          Long.toString(waitTime)+" ms.");
      ManifoldCF.sleep(waitTime);
    }
  }

  /** Note the end of an individual read from the server. Call this just after an individual read completes.
  * Pass the actual number of bytes read to the method.
  *@param originalCount is the byte count that was passed to the matching beginRead call.
  *@param actualCount is the number of bytes actually read. A negative value (for instance an
  * end-of-stream indicator) is treated as zero bytes.
  */
  public void endRead(int originalCount, int actualCount)
  {
    long currentTime = System.currentTimeMillis();
    // A negative count carries no data. Left as-is it would subtract extra bytes from the running
    // total and could produce a negative rate estimate, so it is clamped to zero.
    int bytesReceived = Math.max(actualCount,0);
    synchronized (this)
    {
      // Replace the bytes that beginRead assumed with the bytes that really arrived.
      totalBytesRead = totalBytesRead + (long)bytesReceived - (long)originalCount;
    }
    // Only one thread should get here if it's the first chunk, but we synchronize to be sure
    synchronized (firstChunkLock)
    {
      if (estimateInProgress)
      {
        if (bytesReceived == 0)
          // Didn't actually get any bytes, so use 0.0
          rateEstimate = 0.0;
        else
          rateEstimate = ((double)(currentTime - seriesStartTime))/(double)bytesReceived;
        estimateValid = true;
        estimateInProgress = false;
        // Release the readers that beginRead parked while the first chunk was being read.
        firstChunkLock.notifyAll();
      }
    }
  }

  /** Note the end of a fetch operation. Call this method just after the fetch completes.
  *@return true if no fetch remains active on this bin after this call.
  */
  public boolean endFetch()
  {
    synchronized (this)
    {
      refCount--;
      return (refCount == 0);
    }
  }
}
/** Throttled connections. Each instance of a connection describes the bins to which it belongs,
* along with the actual open connection itself, and the last time the connection was used. */
protected static class ThrottledConnection implements IThrottledConnection
{
/** The connection has resolved pointers to the ConnectionBin structures that manage pool
* maximums. These are ONLY valid when the connection is actually in the pool. */
protected ConnectionBin[] connectionBinArray;
/** The connection has resolved pointers to the ThrottleBin structures that help manage
* bandwidth throttling. The constructor leaves every entry null. */
protected ThrottleBin[] throttleBinArray;
/** These are the bandwidth limits, per bin. The constructor initializes every entry to 0.0. */
protected double[] minMillisecondsPerByte;
/** Is the connection considered "active"? Set to true by the constructor. */
protected boolean isActive;
/** If not active, this is when it went inactive */
protected long inactiveTime = 0L;
/** Protocol */
protected String protocol;
/** Server */
protected String server;
/** Port */
protected int port;
/** Authentication */
protected PageCredentials authentication;
/** Trust store.
* NOTE(review): not assigned by the constructor that follows; only trustStoreString is - confirm whether this field is still used. */
protected IKeystoreManager trustStore;
/** Trust store string */
protected String trustStoreString;
/** The http connection manager. The pool is of size 1. */
protected PoolingClientConnectionManager connManager = null;
/** The http client object. */
protected AbstractHttpClient httpClient = null;
/** The method object */
protected HttpRequestBase fetchMethod = null;
/** The error trace, if any */
protected Throwable throwable = null;
/** The current URL being fetched */
protected String myUrl = null;
/** The status code fetched, if any. Starts out as FETCH_NOT_TRIED. */
protected int statusCode = FETCH_NOT_TRIED;
/** The kind of fetch we are doing */
protected String fetchType = null;
/** The current bytes in the current fetch */
protected long fetchCounter = 0L;
/** The start of the current fetch */
protected long startFetchTime = -1L;
/** The cookies from the last fetch */
protected LoginCookies lastFetchCookies = null;
/** Proxy host */
protected final String proxyHost;
/** Proxy port */
protected final int proxyPort;
/** Proxy auth domain */
protected final String proxyAuthDomain;
/** Proxy auth user name */
protected final String proxyAuthUsername;
/** Proxy auth password */
protected final String proxyAuthPassword;
/** The SSL socket factory supplied at construction, for the https protocol */
protected final javax.net.ssl.SSLSocketFactory httpsSocketFactory;
/** The thread that is actually doing the work */
protected ExecuteMethodThread methodThread = null;
/** Set if thread has been started */
protected boolean threadStarted = false;
/** Constructor.  Builds a connection description for one protocol/server/port combination,
 * marks it active, and notes its creation with every connection bin it belongs to.
 *@param connectionBins are the bins this connection counts against; the per-bin throttle
 *  arrays are sized to match.
 */
public ThrottledConnection(String protocol, String server, int port, PageCredentials authentication,
  javax.net.ssl.SSLSocketFactory httpsSocketFactory, String trustStoreString, ConnectionBin[] connectionBins,
  String proxyHost, int proxyPort, String proxyAuthDomain, String proxyAuthUsername, String proxyAuthPassword)
{
  // Target of the connection
  this.protocol = protocol;
  this.server = server;
  this.port = port;
  // Security-related settings
  this.authentication = authentication;
  this.httpsSocketFactory = httpsSocketFactory;
  this.trustStoreString = trustStoreString;
  // Proxy settings
  this.proxyHost = proxyHost;
  this.proxyPort = proxyPort;
  this.proxyAuthDomain = proxyAuthDomain;
  this.proxyAuthUsername = proxyAuthUsername;
  this.proxyAuthPassword = proxyAuthPassword;
  // Bin bookkeeping
  final int binCount = connectionBins.length;
  this.connectionBinArray = connectionBins;
  this.throttleBinArray = new ThrottleBin[binCount];
  this.minMillisecondsPerByte = new double[binCount];
  this.isActive = true;
  for (int binIndex = 0; binIndex < binCount; binIndex++)
  {
    connectionBins[binIndex].noteConnectionCreation();
    // Throttle bin references are transient, so nothing is kept here until a fetch begins
    throttleBinArray[binIndex] = null;
    minMillisecondsPerByte[binIndex] = 0.0;
  }
}
/** Consistency check: verify that this connection has a back reference to the given bin.
 * If it does not, the problem is logged, printed with a stack trace, and the JVM is
 * terminated with exit code 3.
 *@param cb is the connection bin that claims to own this connection.
 */
public void mustHaveReference(ConnectionBin cb)
{
  for (ConnectionBin ownedBin : connectionBinArray)
  {
    if (ownedBin == cb)
      return;
  }
  // No back reference was found; this is treated as fatal.
  final String complaint = "Connection bin "+cb.toString()+" owns connection "+this.toString()+" for "+protocol+server+":"+port+
    " but there is no back reference!";
  Logging.connectors.error(complaint);
  System.out.println(complaint);
  new Exception(complaint).printStackTrace();
  System.exit(3);
}
/** See if this instance matches a given set of connection parameters.
 * Nullable values match only when both sides are null or both are equal.  The proxy
 * authentication values are compared only when both proxy hosts are non-null and equal.
 *@return true if every compared parameter agrees, including bin identity (by reference) and order.
 */
public boolean matches(ConnectionBin[] bins, String protocol, String server, int port, PageCredentials authentication,
  String trustStoreString, String proxyHost, int proxyPort, String proxyAuthDomain, String proxyAuthUsername, String proxyAuthPassword)
{
  // Trust store
  final boolean trustStoreDiffers = (this.trustStoreString == null || trustStoreString == null)
    ? (this.trustStoreString != trustStoreString)
    : !this.trustStoreString.equals(trustStoreString);
  if (trustStoreDiffers)
    return false;

  // Page credentials
  final boolean authenticationDiffers = (this.authentication == null || authentication == null)
    ? (this.authentication != authentication)
    : !this.authentication.equals(authentication);
  if (authenticationDiffers)
    return false;

  // Proxy host, and (only when a proxy host is present on both sides) the proxy credentials
  if (this.proxyHost == null || proxyHost == null)
  {
    if (this.proxyHost != proxyHost)
      return false;
  }
  else
  {
    if (!this.proxyHost.equals(proxyHost))
      return false;

    final boolean domainDiffers = (this.proxyAuthDomain == null || proxyAuthDomain == null)
      ? (this.proxyAuthDomain != proxyAuthDomain)
      : !this.proxyAuthDomain.equals(proxyAuthDomain);
    if (domainDiffers)
      return false;

    final boolean usernameDiffers = (this.proxyAuthUsername == null || proxyAuthUsername == null)
      ? (this.proxyAuthUsername != proxyAuthUsername)
      : !this.proxyAuthUsername.equals(proxyAuthUsername);
    if (usernameDiffers)
      return false;

    final boolean passwordDiffers = (this.proxyAuthPassword == null || proxyAuthPassword == null)
      ? (this.proxyAuthPassword != proxyAuthPassword)
      : !this.proxyAuthPassword.equals(proxyAuthPassword);
    if (passwordDiffers)
      return false;
  }

  if (this.proxyPort != proxyPort)
    return false;

  // Bin count, protocol, server, port
  final boolean sameTarget = this.connectionBinArray.length == bins.length && this.protocol.equals(protocol) &&
    this.server.equals(server) && this.port == port;
  if (!sameTarget)
    return false;

  // Finally, the bins themselves must be the very same objects, in the same order
  for (int binIndex = 0; binIndex < bins.length; binIndex++)
  {
    if (connectionBinArray[binIndex] != bins[binIndex])
      return false;
  }
  return true;
}
/** Activate the connection: mark it active and take it out of the pool of every bin it belongs to. */
public void activate()
{
  isActive = true;
  for (ConnectionBin bin : connectionBinArray)
  {
    bin.takeFromPool(this);
  }
}
/** Set up the connection.  Looks up, for every bin this connection belongs to, the minimum
 * milliseconds per byte from the throttle description and remembers it for later reads.
 *@param description is the throttle description to take the per-bin limits from.
 */
public void setup(ThrottleDescription description)
{
  for (int binIndex = 0; binIndex < connectionBinArray.length; binIndex++)
  {
    final String nameOfBin = connectionBinArray[binIndex].getBinName();
    minMillisecondsPerByte[binIndex] = description.getMinimumMillisecondsPerByte(nameOfBin);
  }
}
/** Do periodic bookkeeping.  An active connection is left alone.  For an inactive one,
 * idle and expired connections held by the connection manager (if any) are closed.
 *@param idleTimeout is the idle time, in milliseconds, after which pooled connections are closed.
 *@return false if the connection is active; true otherwise, meaning the connection can be removed.
 */
public boolean flushIdleConnections(long idleTimeout)
{
  // A connection that is in use is never a candidate for removal
  if (isActive)
    return false;

  if (connManager != null)
  {
    connManager.closeIdleConnections(idleTimeout, TimeUnit.MILLISECONDS);
    connManager.closeExpiredConnections();
    // Ideally we would report whether the connection manager still holds a live connection
    // (e.g. connManager.getConnectionsInPool() == 0); for now the connection is always
    // reported as removable.
  }
  return true;
}
/** Log the fetch of a number of bytes, from within a stream.
 *@param count is the number of bytes to add to the running total for the current fetch.
 */
public void logFetchCount(int count)
{
  final long additionalBytes = count;
  fetchCounter = fetchCounter + additionalBytes;
}
/** Begin a read operation, from within a stream.  Every throttle bin is consulted in turn,
 * each with its own minimum-milliseconds-per-byte limit.
 *@param len is the number of bytes about to be requested.
 */
public void beginRead(int len)
  throws InterruptedException
{
  for (int binIndex = 0; binIndex < throttleBinArray.length; binIndex++)
  {
    throttleBinArray[binIndex].beginRead(len,minMillisecondsPerByte[binIndex]);
  }
}
/** End a read operation, from within a stream.  Every throttle bin is told how much was
 * requested and how much was actually read.
 *@param origLen is the number of bytes that was requested.
 *@param actualAmt is the number of bytes actually read.
 */
public void endRead(int origLen, int actualAmt)
{
  for (int binIndex = 0; binIndex < throttleBinArray.length; binIndex++)
  {
    throttleBinArray[binIndex].endRead(origLen,actualAmt);
  }
}
/** Destroy the connection forever.  The connection must be active when this is called.
 * The connection manager, if one exists, is shut down, and every bin this connection
 * belongs to is told that the connection has been destroyed.
 */
protected void destroy()
{
  if (!isActive)
    throw new RuntimeException("Trying to destroy an inactive connection");

  // Get rid of the underlying connection manager
  if (connManager != null)
  {
    connManager.shutdown();
    connManager = null;
  }

  // Let each bin update its bookkeeping
  for (ConnectionBin bin : connectionBinArray)
  {
    bin.noteConnectionDestruction();
  }
}
/** Begin the fetch process.  Resets the byte counter and, for each bin this connection
 * belongs to, finds or creates the shared throttle bin of the same name and registers
 * this fetch with it.
 *@param fetchType is a short descriptive string describing the kind of fetch being requested.  This
 *  is used solely for logging purposes.
 *@throws ManifoldCFException with error code INTERRUPTED if the thread is interrupted while
 *  registering with a throttle bin; the InterruptedException is preserved as the cause.
 */
@Override
public void beginFetch(String fetchType)
  throws ManifoldCFException
{
  try
  {
    this.fetchType = fetchType;
    this.fetchCounter = 0L;
    // Find or create the needed throttle bins
    for (int i = 0; i < throttleBinArray.length; i++)
    {
      // Access the bins as we need them, and drop them when ref count goes to zero
      String binName = connectionBinArray[i].getBinName();
      ThrottleBin tb;
      synchronized (throttleBins)
      {
        tb = (ThrottleBin)throttleBins.get(binName);
        if (tb == null)
        {
          tb = new ThrottleBin(binName);
          throttleBins.put(binName,tb);
        }
        tb.beginFetch();
      }
      throttleBinArray[i] = tb;
    }
  }
  catch (InterruptedException e)
  {
    // Keep the original exception as the cause, as the other interrupt handlers in this class do
    throw new ManifoldCFException("Interrupted",e,ManifoldCFException.INTERRUPTED);
  }
}
/** Execute the fetch and get the return code.  This method uses the
 * standard logging mechanism to keep track of the fetch attempt.  It also
 * signals the following conditions: ServiceInterruption (if a dynamic
 * error occurs), or ManifoldCFException if a fatal error occurs, or nothing if
 * a standard protocol error occurs.
 * Note that, for proxies etc, the idea is for this fetch request to handle whatever
 * redirections are needed to support proxies.
 * The outcome is recorded in the statusCode, throwable and lastFetchCookies fields.
 * @param urlPath is the path part of the url, e.g. "/robots.txt"
 * @param userAgent is the value of the userAgent header to use.
 * @param from is the value of the from header to use.
 * @param connectionTimeoutMilliseconds is the maximum number of milliseconds to wait on socket connect.
 * @param socketTimeoutMilliseconds is the socket (SO_TIMEOUT) timeout, in milliseconds.
 * @param redirectOK should be set to true if you want redirects to be automatically followed.
 * @param host is the value to use as the "Host" header, or null to use the default.
 * @param formData describes additional form arguments and how to fetch the page.
 * @param loginCookies describes the cookies that should be in effect for this page fetch.
 */
@Override
public void executeFetch(String urlPath, String userAgent, String from, int connectionTimeoutMilliseconds,
  int socketTimeoutMilliseconds, boolean redirectOK, String host, FormData formData,
  LoginCookies loginCookies)
  throws ManifoldCFException, ServiceInterruption
{
  // Set up scheme
  SSLSocketFactory myFactory = new SSLSocketFactory(new InterruptibleSocketFactory(httpsSocketFactory,connectionTimeoutMilliseconds),
    new AllowAllHostnameVerifier());
  Scheme myHttpsProtocol = new Scheme("https", 443, myFactory);

  // Work out the port to connect to, and how (or whether) to show it in the URL
  int resolvedPort;
  String displayedPort;
  if (port != -1)
  {
    if (!(protocol.equals("http") && port == 80) &&
      !(protocol.equals("https") && port == 443))
      displayedPort = ":"+Integer.toString(port);
    else
      displayedPort = "";
    resolvedPort = port;
  }
  else
  {
    if (protocol.equals("http"))
      resolvedPort = 80;
    else if (protocol.equals("https"))
      resolvedPort = 443;
    else
      throw new IllegalArgumentException("Unexpected protocol: "+protocol);
    displayedPort = "";
  }

  StringBuilder sb = new StringBuilder(protocol);
  sb.append("://").append(server).append(displayedPort).append(urlPath);
  String fetchUrl = sb.toString();

  // fetchHost is where we physically connect; hostHost is what is presented as the virtual host
  HttpHost fetchHost = new HttpHost(server,resolvedPort,protocol);
  HttpHost hostHost;

  if (host != null)
  {
    sb.setLength(0);
    sb.append(protocol).append("://").append(host).append(displayedPort).append(urlPath);
    myUrl = sb.toString();
    hostHost = new HttpHost(host,resolvedPort,protocol);
  }
  else
  {
    myUrl = fetchUrl;
    hostHost = fetchHost;
  }

  if (connManager == null)
  {
    PoolingClientConnectionManager localConnManager = new PoolingClientConnectionManager();
    localConnManager.setMaxTotal(1);
    localConnManager.setDefaultMaxPerRoute(1);
    connManager = localConnManager;
  }

  // Set up protocol registry
  connManager.getSchemeRegistry().register(myHttpsProtocol);

  long startTime = 0L;
  if (Logging.connectors.isDebugEnabled())
  {
    startTime = System.currentTimeMillis();
    Logging.connectors.debug("WEB: Waiting for an HttpClient object");
  }

  // If we already have an httpclient object, great.  Otherwise we have to get one, and initialize it with
  // those parameters that aren't expected to change.
  if (httpClient == null)
  {
    BasicHttpParams params = new BasicHttpParams();
    params.setParameter(ClientPNames.DEFAULT_HOST,fetchHost);
    params.setBooleanParameter(CoreConnectionPNames.TCP_NODELAY,true);
    params.setBooleanParameter(CoreConnectionPNames.STALE_CONNECTION_CHECK,false);
    params.setBooleanParameter(ClientPNames.ALLOW_CIRCULAR_REDIRECTS,true);
    // MEDIUM_SECURITY compatibility level not supported in HttpComponents.  Try BROWSER_NETSCAPE?
    HttpClientParams.setCookiePolicy(params,CookiePolicy.BROWSER_COMPATIBILITY);
    params.setBooleanParameter(CookieSpecPNames.SINGLE_COOKIE_HEADER,true);

    DefaultHttpClient localHttpClient = new DefaultHttpClient(connManager,params);
    localHttpClient.setRedirectStrategy(new DefaultRedirectStrategy());
    localHttpClient.getCookieSpecs().register(CookiePolicy.BROWSER_COMPATIBILITY, new CookieSpecFactory()
      {
        public CookieSpec newInstance(HttpParams params)
        {
          return new LaxBrowserCompatSpec();
        }
      }
    );

    // If there's a proxy, set that too.
    if (proxyHost != null && proxyHost.length() > 0)
    {
      // Configure proxy authentication
      if (proxyAuthUsername != null && proxyAuthUsername.length() > 0)
      {
        localHttpClient.getCredentialsProvider().setCredentials(
          new AuthScope(proxyHost, proxyPort),
          new NTCredentials(proxyAuthUsername, (proxyAuthPassword==null)?"":proxyAuthPassword, currentHost, (proxyAuthDomain==null)?"":proxyAuthDomain));
      }

      HttpHost proxy = new HttpHost(proxyHost, proxyPort);
      localHttpClient.getParams().setParameter(ConnRoutePNames.DEFAULT_PROXY, proxy);
    }

    // Set up authentication to use
    if (authentication != null)
    {
      if (Logging.connectors.isDebugEnabled())
        Logging.connectors.debug("WEB: For "+myUrl+", discovered matching authentication credentials");
      localHttpClient.getCredentialsProvider().setCredentials(AuthScope.ANY,
        authentication.makeCredentialsObject(host));
    }

    httpClient = localHttpClient;
  }

  // Set the parameters we haven't keyed on (so these can change from request to request)
  httpClient.getParams().setIntParameter(CoreConnectionPNames.SO_TIMEOUT,socketTimeoutMilliseconds);
  httpClient.getParams().setIntParameter(CoreConnectionPNames.CONNECTION_TIMEOUT,connectionTimeoutMilliseconds);
  httpClient.getParams().setBooleanParameter(ClientPNames.HANDLE_REDIRECTS,redirectOK);
  if (host != null)
  {
    if (Logging.connectors.isDebugEnabled())
      Logging.connectors.debug("WEB: For "+myUrl+", setting virtual host to "+host);
    httpClient.getParams().setParameter(ClientPNames.VIRTUAL_HOST,hostHost);
  }
  else
  {
    // The client object is reused across fetches, so a virtual host left over from an
    // earlier fetch must not leak into a fetch that asked for the default host.
    httpClient.getParams().removeParameter(ClientPNames.VIRTUAL_HOST);
  }

  if (Logging.connectors.isDebugEnabled())
    Logging.connectors.debug("WEB: Got an HttpClient object after "+Long.toString(System.currentTimeMillis()-startTime)+" ms.");

  startFetchTime = System.currentTimeMillis();

  int pageFetchMethod = FormData.SUBMITMETHOD_GET;
  if (formData != null)
    pageFetchMethod = formData.getSubmitMethod();
  switch (pageFetchMethod)
  {
  case FormData.SUBMITMETHOD_GET:
    // MUST be just the path, or apparently we wind up resetting the HostConfiguration
    // Add additional parameters to url path
    String fullUrlPath;
    if (formData != null)
    {
      StringBuilder psb = new StringBuilder(urlPath);
      Iterator iter = formData.getElementIterator();
      char appendChar;
      if (urlPath.indexOf("?") == -1)
        appendChar = '?';
      else
        appendChar = '&';

      try
      {
        while (iter.hasNext())
        {
          FormDataElement el = (FormDataElement)iter.next();
          psb.append(appendChar);
          appendChar = '&';
          String param = el.getElementName();
          String value = el.getElementValue();
          psb.append(java.net.URLEncoder.encode(param,"utf-8"));
          if (value != null)
          {
            psb.append('=').append(java.net.URLEncoder.encode(value,"utf-8"));
          }
        }
      }
      catch (java.io.UnsupportedEncodingException e)
      {
        throw new ManifoldCFException("Unsupported encoding: "+e.getMessage(),e);
      }

      fullUrlPath = psb.toString();
    }
    else
    {
      fullUrlPath = urlPath;
    }
    // Hack; apparently httpclient treats // as a protocol specifier and so it rips off the first section of the path in that case.
    while (fullUrlPath.startsWith("//"))
      fullUrlPath = fullUrlPath.substring(1);
    if (Logging.connectors.isDebugEnabled())
      Logging.connectors.debug("WEB: Get method for '"+fullUrlPath+"'");
    fetchMethod = new HttpGet(fullUrlPath);
    break;
  case FormData.SUBMITMETHOD_POST:
    if (Logging.connectors.isDebugEnabled())
      Logging.connectors.debug("WEB: Post method for '"+urlPath+"'");
    // MUST be just the path, or apparently we wind up resetting the HostConfiguration
    HttpPost postMethod = new HttpPost(urlPath);
    List<NameValuePair> nvps = new ArrayList<NameValuePair>();

    // Add parameters to post variables
    if (formData != null)
    {
      Iterator iter = formData.getElementIterator();
      while (iter.hasNext())
      {
        FormDataElement e = (FormDataElement)iter.next();
        String param = e.getElementName();
        String value = e.getElementValue();
        if (Logging.connectors.isDebugEnabled())
          Logging.connectors.debug("WEB: Post parameter name '"+param+"' value '"+value+"' for '"+urlPath+"'");
        nvps.add(new BasicNameValuePair(param,value));
      }
    }
    try
    {
      postMethod.setEntity(new UrlEncodedFormEntity(nvps,HTTP.UTF_8));
    }
    catch (java.io.UnsupportedEncodingException e)
    {
      throw new ManifoldCFException("Unsupported UTF-8 encoding: "+e.getMessage(),e);
    }
    fetchMethod = postMethod;
    break;
  default:
    throw new ManifoldCFException("Illegal method type: "+Integer.toString(pageFetchMethod));
  }

  // Set all appropriate headers and parameters
  fetchMethod.setHeader(new BasicHeader("User-Agent",userAgent));
  fetchMethod.setHeader(new BasicHeader("From",from));

  // Clear all current cookies
  httpClient.getCookieStore().clear();

  // If we have any cookies to set, set them.
  if (loginCookies != null)
  {
    if (Logging.connectors.isDebugEnabled())
      Logging.connectors.debug("WEB: Adding "+Integer.toString(loginCookies.getCookieCount())+" cookies for '"+urlPath+"'");
    int h = 0;
    while (h < loginCookies.getCookieCount())
    {
      httpClient.getCookieStore().addCookie(loginCookies.getCookie(h++));
    }
  }

  // Copy out the current cookies, in case the fetch fails
  lastFetchCookies = loginCookies;

  // Create the thread
  methodThread = new ExecuteMethodThread(this, httpClient, fetchMethod);
  try
  {
    methodThread.start();
    threadStarted = true;
    try
    {
      statusCode = methodThread.getResponseCode();
      lastFetchCookies = methodThread.getCookies();
      switch (statusCode)
      {
      case HttpStatus.SC_REQUEST_TIMEOUT:
      case HttpStatus.SC_GATEWAY_TIMEOUT:
      case HttpStatus.SC_SERVICE_UNAVAILABLE:
        // Temporary service interruption
        // May want to make the retry time a parameter someday
        long currentTime = System.currentTimeMillis();
        throw new ServiceInterruption("Http response temporary error on '"+myUrl+"': "+Integer.toString(statusCode),new ManifoldCFException("Service unavailable (code "+Integer.toString(statusCode)+")"),
          currentTime + TIME_2HRS, currentTime + TIME_1DAY, -1, false);
      case HttpStatus.SC_UNAUTHORIZED:
      case HttpStatus.SC_USE_PROXY:
      case HttpStatus.SC_OK:
      case HttpStatus.SC_GONE:
      case HttpStatus.SC_NOT_FOUND:
      case HttpStatus.SC_BAD_GATEWAY:
      case HttpStatus.SC_BAD_REQUEST:
      case HttpStatus.SC_FORBIDDEN:
      case HttpStatus.SC_INTERNAL_SERVER_ERROR:
      default:
        return;
      }
    }
    catch (InterruptedException e)
    {
      // Interrupt the worker and forget about it; the outer handler records the interruption
      methodThread.interrupt();
      methodThread = null;
      threadStarted = false;
      throw e;
    }
  }
  catch (InterruptedException e)
  {
    // Drop the current connection on the floor, so it cannot be reused.
    fetchMethod = null;
    throwable = new ManifoldCFException("Interrupted: "+e.getMessage(),e);
    statusCode = FETCH_INTERRUPTED;
    throw new ManifoldCFException("Interrupted: "+e.getMessage(),e,ManifoldCFException.INTERRUPTED);
  }
  catch (java.net.SocketTimeoutException e)
  {
    throwable = e;
    long currentTime = System.currentTimeMillis();
    throw new ServiceInterruption("Timed out waiting for IO for '"+myUrl+"': "+e.getMessage(), e, currentTime + TIME_5MIN,
      currentTime + TIME_2HRS,-1,false);
  }
  catch (ConnectTimeoutException e)
  {
    throwable = e;
    long currentTime = System.currentTimeMillis();
    throw new ServiceInterruption("Timed out waiting for connection for '"+myUrl+"': "+e.getMessage(), e, currentTime + TIME_5MIN,
      currentTime + TIME_2HRS,-1,false);
  }
  catch (InterruptedIOException e)
  {
    //Logging.connectors.warn("IO interruption seen",e);
    throwable = new ManifoldCFException("Interrupted: "+e.getMessage(),e);
    statusCode = FETCH_INTERRUPTED;
    // Keep the cause on the thrown exception too, matching the InterruptedException handler above
    throw new ManifoldCFException("Interrupted: "+e.getMessage(),e,ManifoldCFException.INTERRUPTED);
  }
  catch (RedirectException e)
  {
    throwable = e;
    statusCode = FETCH_CIRCULAR_REDIRECT;
    return;
  }
  catch (NoHttpResponseException e)
  {
    throwable = e;
    long currentTime = System.currentTimeMillis();
    throw new ServiceInterruption("Timed out waiting for response for '"+myUrl+"': "+e.getMessage(), e, currentTime + TIME_15MIN,
      currentTime + TIME_2HRS,-1,false);
  }
  catch (java.net.ConnectException e)
  {
    throwable = e;
    long currentTime = System.currentTimeMillis();
    throw new ServiceInterruption("Timed out waiting for a connection for '"+myUrl+"': "+e.getMessage(), e, currentTime + TIME_2HRS,
      currentTime + TIME_6HRS,-1,false);
  }
  catch (javax.net.ssl.SSLException e)
  {
    // Probably this is an incorrectly configured trust store
    throwable = new ManifoldCFException("SSL handshake error: "+e.getMessage()+"; check your connection's Certificate configuration",e);
    statusCode = FETCH_IO_ERROR;
    return;
  }
  catch (IOException e)
  {
    // Treat this as a bad url.  We don't know what happened, but it isn't something we are going to naively
    // retry on.
    throwable = e;
    statusCode = FETCH_IO_ERROR;
    return;
  }
  catch (Throwable e)
  {
    Logging.connectors.debug("WEB: Caught an unexpected exception: "+e.getMessage(),e);
    throwable = e;
    statusCode = FETCH_UNKNOWN_ERROR;
    return;
  }
}
/** Get the http response code.
 *@return the status recorded for the current fetch.  This is either an HTTP response code,
 *  or one of the FETCH_xxx codes (FETCH_NOT_TRIED if nothing has been fetched yet).
 */
@Override
public int getResponseCode()
  throws ManifoldCFException, ServiceInterruption
{
  final int recordedStatus = statusCode;
  return recordedStatus;
}
/** Get the last fetch cookies.
 *@return the cookies now in effect from the last fetch, or null if none were recorded.
 */
@Override
public LoginCookies getLastFetchCookies()
  throws ManifoldCFException, ServiceInterruption
{
  final LoginCookies cookiesInEffect = lastFetchCookies;
  return cookiesInEffect;
}
/** Get response headers.
 * Requires that a fetch has been executed and that its worker thread was started.
 *@return a map keyed by header name containing a list of values.
 */
@Override
public Map<String,List<String>> getResponseHeaders()
  throws ManifoldCFException, ServiceInterruption
{
  if (fetchMethod == null)
    throw new ManifoldCFException("Attempt to get headers when there is no method");
  if (methodThread == null || !threadStarted)
    throw new ManifoldCFException("Attempt to get headers when no method thread");

  Map<String,List<String>> headers = null;
  try
  {
    headers = methodThread.getResponseHeaders();
  }
  catch (InterruptedException e)
  {
    methodThread.interrupt();
    throw new ManifoldCFException("Interrupted: "+e.getMessage(),e,ManifoldCFException.INTERRUPTED);
  }
  catch (HttpException e)
  {
    // Always throws
    handleHTTPException(e,"reading headers");
  }
  catch (IOException e)
  {
    // Always throws
    handleIOException(e,"reading headers");
  }
  return headers;
}
/** Get a specified response header, if it exists.
 * Requires that a fetch has been executed and that its worker thread was started.
 *@param headerName is the name of the header.
 *@return the header value, or null if it doesn't exist.
 */
@Override
public String getResponseHeader(String headerName)
  throws ManifoldCFException, ServiceInterruption
{
  if (fetchMethod == null)
    throw new ManifoldCFException("Attempt to get a header when there is no method");
  if (methodThread == null || !threadStarted)
    throw new ManifoldCFException("Attempt to get a header when no method thread");

  String headerValue = null;
  try
  {
    headerValue = methodThread.getFirstHeader(headerName);
  }
  catch (InterruptedException e)
  {
    methodThread.interrupt();
    throw new ManifoldCFException("Interrupted: "+e.getMessage(),e,ManifoldCFException.INTERRUPTED);
  }
  catch (HttpException e)
  {
    // Always throws
    handleHTTPException(e,"reading header");
  }
  catch (IOException e)
  {
    // Always throws
    handleIOException(e,"reading header");
  }
  return headerValue;
}
/** Get the response input stream.  It is the responsibility of the caller
 * to close this stream when done.
 * Requires that a fetch has been executed and that its worker thread was started.
 *@return the response body stream obtained from the worker thread.
 */
@Override
public InputStream getResponseBodyStream()
  throws ManifoldCFException, ServiceInterruption
{
  if (fetchMethod == null)
    throw new ManifoldCFException("Attempt to get an input stream when there is no method");
  if (methodThread == null || !threadStarted)
    throw new ManifoldCFException("Attempt to get an input stream when no method thread");

  InputStream bodyStream = null;
  try
  {
    bodyStream = methodThread.getSafeInputStream();
  }
  catch (InterruptedException e)
  {
    methodThread.interrupt();
    throw new ManifoldCFException("Interrupted: "+e.getMessage(),e,ManifoldCFException.INTERRUPTED);
  }
  catch (IOException e)
  {
    // Always throws
    handleIOException(e, "reading response stream");
  }
  catch (HttpException e)
  {
    // Always throws
    handleHTTPException(e, "reading response stream");
  }
  return bodyStream;
}
/** Get limited response as a string.
 * Reads from the response body until either maxSize characters have been collected or the
 * end of the stream is reached.  The response stream is always closed before returning.
 *@param maxSize is the maximum number of characters to return.
 *@param encoding is the name of the character encoding used to decode the response bytes.
 *@return the (possibly truncated) response body; the empty string if the body is empty.
 */
@Override
public String getLimitedResponseBody(int maxSize, String encoding)
  throws ManifoldCFException, ServiceInterruption
{
  try
  {
    InputStream is = getResponseBodyStream();
    try
    {
      Reader r = new InputStreamReader(is,encoding);
      char[] buffer = new char[maxSize];
      // A single read() call may hand back fewer characters than are available (for example,
      // just the first network packet), so keep reading until the buffer is full or EOF.
      int filled = 0;
      while (filled < maxSize)
      {
        int amt = r.read(buffer,filled,maxSize-filled);
        if (amt == -1)
          break;
        filled += amt;
      }
      return new String(buffer,0,filled);
    }
    finally
    {
      is.close();
    }
  }
  catch (IOException e)
  {
    // Always throws
    handleIOException(e,"reading limited response");
  }
  return null;
}
/** Note that the connection fetch was interrupted by something.
 * This only has an effect when the current status code is positive; in that case the
 * status is replaced by FETCH_INTERRUPTED and the interruption is remembered as the error.
 *@param e is the exception that describes the interruption.
 */
@Override
public void noteInterrupted(Throwable e)
{
  if (statusCode <= 0)
    return;
  throwable = new ManifoldCFException("Interrupted: "+e.getMessage(),e);
  statusCode = FETCH_INTERRUPTED;
}
/** Done with the fetch.  Call this when the fetch has been completed.  A log entry will be generated
 * describing what was done.
 * Does nothing unless a fetch was begun (fetchType is set).  Otherwise it aborts the request,
 * releases the throttle bins, records and logs the fetch, joins the worker thread, and resets
 * the per-fetch state.
 *@param activities is where the fetch activity is recorded.
 *@throws ManifoldCFException with error code INTERRUPTED if interrupted while waiting for the
 *  worker thread to finish.
 */
@Override
public void doneFetch(IVersionActivity activities)
  throws ManifoldCFException
{
  if (fetchType == null)
    return;

  // Abort the connection, if not already complete.  There may be no worker thread at all:
  // executeFetch() clears it when interrupted, and it is never created if executeFetch()
  // was not called or failed early.
  if (methodThread != null)
    methodThread.abort();

  long endTime = System.currentTimeMillis();

  // Release the throttle bins acquired in beginFetch()
  for (int i = 0; i < throttleBinArray.length; i++)
  {
    ThrottleBin tb = throttleBinArray[i];
    // A slot is still empty if beginFetch() was interrupted before reaching it
    if (tb == null)
      continue;
    synchronized (throttleBins)
    {
      if (tb.endFetch())
        throttleBins.remove(tb.getBinName());
    }
    throttleBinArray[i] = null;
  }

  activities.recordActivity(Long.valueOf(startFetchTime),WebcrawlerConnector.ACTIVITY_FETCH,
    Long.valueOf(fetchCounter),myUrl,Integer.toString(statusCode),(throwable==null)?null:throwable.getMessage(),null);

  Logging.connectors.info("WEB: FETCH "+fetchType+"|"+myUrl+"|"+Long.toString(startFetchTime)+"+"+Long.toString(endTime-startFetchTime)+"|"+
    Integer.toString(statusCode)+"|"+Long.toString(fetchCounter)+"|"+((throwable==null)?"":(throwable.getClass().getName()+"| "+throwable.getMessage())));
  if (throwable != null)
  {
    if (Logging.connectors.isDebugEnabled())
      Logging.connectors.debug("WEB: Fetch exception for '"+myUrl+"'",throwable);
  }

  // Shut down (join) the connection thread, if any, and if it started
  if (methodThread != null)
  {
    if (threadStarted)
    {
      try
      {
        methodThread.finishUp();
      }
      catch (InterruptedException e)
      {
        throw new ManifoldCFException(e.getMessage(),e,ManifoldCFException.INTERRUPTED);
      }
      threadStarted = false;
    }
    methodThread = null;
  }

  // Reset the per-fetch state
  fetchMethod = null;
  throwable = null;
  startFetchTime = -1L;
  myUrl = null;
  statusCode = FETCH_NOT_TRIED;
  lastFetchCookies = null;
  fetchType = null;
}
/** Close the connection.  Call this to end this server connection.
 * The underlying connection is left alive; it is marked inactive, timestamped, and returned to
 * the pool of every bin it belongs to.  All of this happens while holding the pool lock, and
 * every thread waiting on that lock is woken up afterwards.
 */
@Override
public void close()
  throws ManifoldCFException
{
  synchronized (poolLock)
  {
    // (The former sanity-check passes over all connection bins were removed: with
    // sanityCheck() disabled they only iterated the bins under lock and did nothing.)

    // Leave the connection alive, but mark it as inactive, and return it to the appropriate pools.
    isActive = false;
    inactiveTime = System.currentTimeMillis();
    for (ConnectionBin bin : connectionBinArray)
    {
      bin.addToPool(this);
    }
    // Wake up everything waiting on the pool lock
    poolLock.notifyAll();
  }
}
/** Convert an HttpException into a ServiceInterruption.  This method never returns normally.
 *@param e is the exception that was caught.
 *@param activity describes what was being done at the time; used in the log and exception messages.
 *@throws ServiceInterruption always, with the first retry scheduled TIME_5MIN from now.
 */
protected void handleHTTPException(HttpException e, String activity)
  throws ServiceInterruption, ManifoldCFException
{
  final long now = System.currentTimeMillis();
  Logging.connectors.debug("Web: HTTP exception "+activity+" for '"+myUrl+"', retrying");
  final String description = "HTTP exception "+activity+": "+e.getMessage();
  throw new ServiceInterruption(description,e,now+TIME_5MIN,-1L,2,false);
}
/** Convert an IOException into the appropriate ManifoldCF exception.  This method never
 * returns normally: every path throws either a ServiceInterruption (to request a retry) or,
 * for an interruption, a ManifoldCFException with error code INTERRUPTED.
 *@param e is the exception that was caught.
 *@param activity describes what was being done at the time; used in the log and exception messages.
 */
protected void handleIOException(IOException e, String activity)
  throws ServiceInterruption, ManifoldCFException
{
  // The two timeout types are tested before InterruptedIOException, so that they get retry
  // treatment rather than being reported as a thread interruption.
  if (e instanceof java.net.SocketTimeoutException)
  {
    long currentTime = System.currentTimeMillis();
    Logging.connectors.debug("Web: Socket timeout exception "+activity+" for '"+myUrl+"', retrying");
    throw new ServiceInterruption("Socket timeout exception "+activity+": "+e.getMessage(),e,currentTime+TIME_5MIN,-1L,2,false);
  }
  if (e instanceof ConnectTimeoutException)
  {
    long currentTime = System.currentTimeMillis();
    Logging.connectors.debug("Web: Connect timeout exception "+activity+" for '"+myUrl+"', retrying");
    throw new ServiceInterruption("Connect timeout exception "+activity+": "+e.getMessage(),e,currentTime+TIME_5MIN,-1L,2,false);
  }
  if (e instanceof InterruptedIOException)
  {
    // The worker thread may already be gone; only interrupt it if it is still there
    if (methodThread != null)
      methodThread.interrupt();
    // Keep the original exception as the cause
    throw new ManifoldCFException("Interrupted",e,ManifoldCFException.INTERRUPTED);
  }
  if (e instanceof NoHttpResponseException)
  {
    // First retry after 15 minutes; give up after 2 hours.
    long currentTime = System.currentTimeMillis();
    throw new ServiceInterruption("Timed out "+activity+" for '"+myUrl+"'", e, currentTime + 15L * 60000L,
      currentTime + 120L * 60000L,-1,false);
  }
  if (e instanceof java.net.ConnectException)
  {
    // Give up after 720 minutes, i.e. 12 hours.  (An earlier comment said 6 hours, which did
    // not match the value actually passed.)
    long currentTime = System.currentTimeMillis();
    throw new ServiceInterruption("Timed out "+activity+" for '"+myUrl+"'", e, currentTime + 1000000L,
      currentTime + 720L * 60000L,-1,false);
  }
  if (e instanceof java.net.NoRouteToHostException)
  {
    // This exception means we know the IP address but can't get there.  That's either a firewall issue, or it's something transient
    // with the network.  Some degree of retry is probably wise.
    long currentTime = System.currentTimeMillis();
    throw new ServiceInterruption("No route to host during "+activity+" for '"+myUrl+"'", e, currentTime + 1000000L,
      currentTime + 720L * 60000L,-1,false);
  }
  // Anything else: treat as a generic IO problem and retry
  long currentTime = System.currentTimeMillis();
  Logging.connectors.debug("Web: IO exception "+activity+" for '"+myUrl+"', retrying");
  throw new ServiceInterruption("IO exception "+activity+": "+e.getMessage(),e,currentTime+TIME_5MIN,-1L,2,false);
}
}
/** This class throttles an input stream based on the specified byte rate parameters.  The
 * throttling takes place across all streams that are open to the server in question.
 * Every read is bracketed by beginRead()/endRead() calls on the owning ThrottledConnection,
 * and the number of bytes actually read is reported to it via logFetchCount().
 */
protected static class ThrottledInputstream extends InputStream
{
  /** Stream throttling parameters */
  protected double minimumMillisecondsPerBytePerServer;
  /** The throttled connection we belong to */
  protected ThrottledConnection throttledConnection;
  /** The stream we are wrapping. */
  protected InputStream inputStream;

  /** Constructor.
   *@param connection is the throttled connection that owns this stream.
   *@param is is the stream to wrap.
   */
  public ThrottledInputstream(ThrottledConnection connection, InputStream is)
  {
    this.throttledConnection = connection;
    this.inputStream = is;
  }

  /** Read a byte.
   *@return the byte read, as a value in the range 0 to 255, or -1 at end of stream.
   */
  @Override
  public int read()
    throws IOException
  {
    byte[] byteArray = new byte[1];
    int count = read(byteArray,0,1);
    if (count == -1)
      return count;
    // Mask to an unsigned value.  A plain (int) cast would sign-extend, turning bytes
    // 0x80-0xFF into negative numbers and 0xFF into -1, which callers read as end of stream.
    return byteArray[0] & 0xff;
  }

  /** Read lots of bytes.
   */
  @Override
  public int read(byte[] b)
    throws IOException
  {
    return read(b,0,b.length);
  }

  /** Read lots of specific bytes.  The request is broken into pieces of at most
   * ThrottledFetcher.READ_CHUNK_LENGTH bytes, each of which is throttled separately.
   *@return the total number of bytes read, or -1 if end of stream was hit before anything was read.
   */
  @Override
  public int read(byte[] b, int off, int len)
    throws IOException
  {
    int totalCount = 0;
    // Full-sized chunks first
    while (len > ThrottledFetcher.READ_CHUNK_LENGTH)
    {
      int amt = basicRead(b,off,ThrottledFetcher.READ_CHUNK_LENGTH,totalCount);
      if (amt == -1)
      {
        if (totalCount == 0)
          return amt;
        return totalCount;
      }
      totalCount += amt;
      off += amt;
      len -= amt;
    }
    // Then one final read for whatever is left
    if (len > 0)
    {
      int amt = basicRead(b,off,len,totalCount);
      if (amt == -1)
      {
        if (totalCount == 0)
          return amt;
        return totalCount;
      }
      return totalCount + amt;
    }
    return totalCount;
  }

  /** Basic read, which uses the server object to throttle activity.
   *@param totalSoFar is the number of bytes already read by the enclosing read() call; it is
   *  reported in the InterruptedIOException if the throttle wait is interrupted.
   *@return the number of bytes read, or -1 at end of stream.
   */
  protected int basicRead(byte[] b, int off, int len, int totalSoFar)
    throws IOException
  {
    try
    {
      throttledConnection.beginRead(len);
      int amt = 0;
      try
      {
        amt = inputStream.read(b,off,len);
        return amt;
      }
      finally
      {
        // Always close out the read with the throttler; if the read threw, amt is still 0
        if (amt == -1)
          throttledConnection.endRead(len,0);
        else
        {
          throttledConnection.endRead(len,amt);
          throttledConnection.logFetchCount(amt);
        }
      }
    }
    catch (InterruptedException e)
    {
      InterruptedIOException e2 = new InterruptedIOException("Interrupted");
      e2.bytesTransferred = totalSoFar;
      throw e2;
    }
  }

  /** Skip
   */
  @Override
  public long skip(long n)
    throws IOException
  {
    // Not sure whether we should bother doing anything with this; it's not used.
    return inputStream.skip(n);
  }

  /** Get available.
   */
  @Override
  public int available()
    throws IOException
  {
    return inputStream.available();
  }

  /** Mark.
   */
  @Override
  public void mark(int readLimit)
  {
    inputStream.mark(readLimit);
  }

  /** Reset.
   */
  @Override
  public void reset()
    throws IOException
  {
    inputStream.reset();
  }

  /** Check if mark is supported.
   */
  @Override
  public boolean markSupported()
  {
    return inputStream.markSupported();
  }

  /** Close.  Most IO problems seen while closing are only logged at debug level; an
   * InterruptedIOException that is not a timeout is rethrown.
   */
  @Override
  public void close()
    throws IOException
  {
    try
    {
      inputStream.close();
    }
    catch (java.net.SocketTimeoutException e)
    {
      Logging.connectors.debug("Socket timeout exception trying to close connection: "+e.getMessage(),e);
    }
    catch (ConnectTimeoutException e)
    {
      Logging.connectors.debug("Socket connection timeout exception trying to close connection: "+e.getMessage(),e);
    }
    catch (InterruptedIOException e)
    {
      throw e;
    }
    catch (java.net.SocketException e)
    {
      Logging.connectors.debug("Connection reset while I was closing it: "+e.getMessage(),e);
    }
    catch (IOException e)
    {
      Logging.connectors.debug("IO Exception trying to close connection: "+e.getMessage(),e);
    }
  }
}
/** Pool exception class.  A checked exception that carries nothing but a message. */
protected static class PoolException extends Exception
{
  /** Constructor.
   *@param message is the exception message.
   */
  public PoolException(String message)
  {
    super(message);
  }
}
/** Wait exception class.
* A checked exception with the fixed message "Wait needed" that also carries a wait amount.
*/
protected static class WaitException extends Exception
{
  /** The wait amount supplied at construction time. */
  protected long amt;

  /** Constructor.
  *@param amt is the wait amount to carry.
  */
  public WaitException(long amt)
  {
    super("Wait needed");
    this.amt = amt;
  }

  /** Get the wait amount.
  *@return the amount passed to the constructor.
  */
  public long getWaitAmount()
  {
    return this.amt;
  }
}
/** SSL Socket factory which wraps another socket factory but allows timeout on socket
* creation.
*
* Every createSocket() variant that has to open a connection runs the wrapped factory on a
* separate SocketCreateThread and waits for it for at most the configured number of
* milliseconds. The variants that do not connect (no-argument creation, and layering over an
* already-open socket) delegate directly to the wrapped factory.
*
* Note that the String-host variants call InetAddress.getByName() on the calling thread, so
* name resolution is not covered by the timeout.
*/
protected static class InterruptibleSocketFactory extends javax.net.ssl.SSLSocketFactory
{
  /** The factory that does the real work */
  protected final javax.net.ssl.SSLSocketFactory wrappedFactory;
  /** How long to wait for the helper thread, in milliseconds */
  protected final long connectTimeoutMilliseconds;

  /** Constructor.
  *@param wrappedFactory is the factory to delegate to.
  *@param connectTimeoutMilliseconds is the maximum time to wait for a connecting createSocket() call.
  */
  public InterruptibleSocketFactory(javax.net.ssl.SSLSocketFactory wrappedFactory, long connectTimeoutMilliseconds)
  {
    this.wrappedFactory = wrappedFactory;
    this.connectTimeoutMilliseconds = connectTimeoutMilliseconds;
  }

  @Override
  public Socket createSocket()
    throws IOException
  {
    // Socket isn't open
    return wrappedFactory.createSocket();
  }

  @Override
  public Socket createSocket(String host, int port)
    throws IOException, UnknownHostException
  {
    // A null local address tells the helper thread to use the two-argument form.
    return fireOffThread(InetAddress.getByName(host),port,null,-1);
  }

  @Override
  public Socket createSocket(InetAddress host, int port)
    throws IOException
  {
    return fireOffThread(host,port,null,-1);
  }

  @Override
  public Socket createSocket(String host, int port, InetAddress localHost, int localPort)
    throws IOException, UnknownHostException
  {
    return fireOffThread(InetAddress.getByName(host),port,localHost,localPort);
  }

  @Override
  public Socket createSocket(InetAddress address, int port, InetAddress localAddress, int localPort)
    throws IOException
  {
    return fireOffThread(address,port,localAddress,localPort);
  }

  @Override
  public Socket createSocket(Socket s, String host, int port, boolean autoClose)
    throws IOException
  {
    // Socket's already open
    return wrappedFactory.createSocket(s,host,port,autoClose);
  }

  @Override
  public String[] getDefaultCipherSuites()
  {
    return wrappedFactory.getDefaultCipherSuites();
  }

  @Override
  public String[] getSupportedCipherSuites()
  {
    return wrappedFactory.getSupportedCipherSuites();
  }

  /** Create a socket on a helper thread, giving up after the connect timeout.
  *@param address is the remote address.
  *@param port is the remote port.
  *@param localHost is the local address to bind to, or null for none.
  *@param localPort is the local port; only meaningful when localHost is not null.
  *@return the socket produced by the wrapped factory.
  *@throws ConnectTimeoutException if the helper thread is still running when the timeout expires.
  *@throws InterruptedIOException if the calling thread is interrupted while waiting.
  */
  protected Socket fireOffThread(InetAddress address, int port, InetAddress localHost, int localPort)
    throws IOException
  {
    SocketCreateThread thread = new SocketCreateThread(wrappedFactory,address,port,localHost,localPort);
    thread.start();
    try
    {
      // Wait for thread to complete for only a certain amount of time!
      thread.join(connectTimeoutMilliseconds);
      // If join() times out, then the thread is going to still be alive.
      if (thread.isAlive())
      {
        // Kill the thread - not that this will necessarily work, but we need to try
        thread.interrupt();
        throw new ConnectTimeoutException("Secure connection timed out");
      }
      // The thread terminated. Throw an error if there is one, otherwise return the result.
      Throwable t = thread.getException();
      if (t != null)
      {
        if (t instanceof java.net.SocketTimeoutException)
          throw (java.net.SocketTimeoutException)t;
        else if (t instanceof ConnectTimeoutException)
          throw (ConnectTimeoutException)t;
        else if (t instanceof InterruptedIOException)
          throw (InterruptedIOException)t;
        else if (t instanceof IOException)
          throw (IOException)t;
        else if (t instanceof Error)
          throw (Error)t;
        else if (t instanceof RuntimeException)
          throw (RuntimeException)t;
        throw new Error("Received an unexpected exception: "+t.getMessage(),t);
      }
      return thread.getResult();
    }
    catch (InterruptedException e)
    {
      // The caller is abandoning the wait, so treat the helper thread the same way the
      // timeout path does: ask it to stop rather than leaving it running unattended.
      thread.interrupt();
      InterruptedIOException e2 = new InterruptedIOException("Interrupted: "+e.getMessage());
      // Keep the original interruption as the cause so it is not lost from stack traces.
      e2.initCause(e);
      throw e2;
    }
  }
}
/** Create a secure socket in a thread, so that we can "give up" after a while if the socket fails to connect.
* The thread is a daemon. After it finishes, exactly one of getResult() or getException()
* is expected to be meaningful: the socket on success, or whatever was thrown on failure.
*/
protected static class SocketCreateThread extends Thread
{
  /** Factory used to create the socket */
  protected javax.net.ssl.SSLSocketFactory socketFactory;
  /** Remote address and port */
  protected InetAddress host;
  protected int port;
  /** Local address and port; a null address means "do not bind locally" */
  protected InetAddress clientHost;
  protected int clientPort;
  /** The socket that was created, if any */
  protected Socket rval = null;
  /** The throwable that was caught, if any */
  protected Throwable throwable = null;

  /** Create the thread.
  *@param socketFactory is the factory that will create the socket.
  *@param host is the remote address.
  *@param port is the remote port.
  *@param clientHost is the local address, or null to use the two-argument factory call.
  *@param clientPort is the local port, used only when clientHost is not null.
  */
  public SocketCreateThread(javax.net.ssl.SSLSocketFactory socketFactory,
    InetAddress host,
    int port,
    InetAddress clientHost,
    int clientPort)
  {
    super();
    setDaemon(true);
    this.socketFactory = socketFactory;
    this.host = host;
    this.port = port;
    this.clientHost = clientHost;
    this.clientPort = clientPort;
  }

  /** Ask the factory for a socket, recording either the socket or the failure. */
  public void run()
  {
    try
    {
      final boolean bindLocally = (clientHost != null);
      rval = bindLocally
        ? socketFactory.createSocket(host,port,clientHost,clientPort)
        : socketFactory.createSocket(host,port);
    }
    catch (Throwable problem)
    {
      // Everything is captured, so the waiting thread can decide how to rethrow it.
      throwable = problem;
    }
  }

  /** Get the throwable caught during run(), or null if there was none. */
  public Throwable getException()
  {
    return throwable;
  }

  /** Get the socket created during run(), or null if none was created. */
  public Socket getResult()
  {
    return rval;
  }
}
/** Class to override browser compatibility to make it not check cookie paths. See CONNECTORS-97.
* The constructor registers, for the cookie path attribute, a handler whose validate()
* method does nothing.
*/
protected static class LaxBrowserCompatSpec extends BrowserCompatSpec
{
  /** Constructor. Builds the standard spec and then registers the permissive path handler. */
  public LaxBrowserCompatSpec()
  {
    super();
    final BasicPathHandler permissivePathHandler = new BasicPathHandler()
    {
      @Override
      public void validate(Cookie cookie, CookieOrigin origin) throws MalformedCookieException
      {
        // No validation
      }
    };
    registerAttribHandler(ClientCookie.PATH_ATTR, permissivePathHandler);
  }
}
/** This thread does the actual socket communication with the server.
* It's set up so that it can be abandoned at shutdown time.
*
* The way it works is as follows:
* - it starts the transaction
* - it receives the response, and saves that for the calling class to inspect
* - it transfers the data part to an input stream provided to the calling class
* - it shuts the connection down
*
* If there is an error, the sequence is aborted, and an exception is recorded
* for the calling class to examine.
*
* The calling class basically accepts the sequence above. It starts the
* thread, and tries to get a response code. If instead an exception is seen,
* the exception is thrown up the stack.
*
* Each stage of run() executes while holding this object's monitor and ends with
* notifyAll(); the getter methods wait on the same monitor until the value they need,
* or the exception recorded for that stage, is available.
*
* NOTE(review): if the thread finishes without recording either a value or an exception
* for a stage (for instance when abort() is called before that stage runs), the getters
* for that stage keep waiting until the waiting thread is interrupted - confirm callers
* rely on that.
*/
protected static class ExecuteMethodThread extends Thread
{
  /** The connection */
  protected final ThrottledConnection theConnection;
  /** Client and method, all preconfigured */
  protected final AbstractHttpClient httpClient;
  protected final HttpRequestBase executeMethod;
  /** The response, once the request has been executed */
  protected HttpResponse response = null;
  /** Exception recorded while executing the request */
  protected Throwable responseException = null;
  /** Cookies captured from the client's cookie store after the response arrived */
  protected LoginCookies cookies = null;
  /** Exception recorded while capturing cookies */
  protected Throwable cookieException = null;
  /** Cross-thread stream over the response body; stays null when there is no body */
  protected XThreadInputStream threadStream = null;
  /** Set once the stream stage has completed, whether or not there was a body */
  protected boolean streamCreated = false;
  /** Exception recorded while setting up the body stream */
  protected Throwable streamException = null;
  /** Set by abort(); stages that have not started yet are skipped once this is true */
  protected boolean abortThread = false;
  /** Exception recorded while aborting the request at the end of run(); not read in this class */
  protected Throwable shutdownException = null;
  /** Anything that escaped the staged handling in run(); not read in this class */
  protected Throwable generalException = null;

  /** Constructor. The thread is created as a daemon.
  *@param theConnection is the throttled connection used to wrap the response body stream.
  *@param httpClient is the preconfigured client that executes the request.
  *@param executeMethod is the preconfigured request to execute.
  */
  public ExecuteMethodThread(ThrottledConnection theConnection,
    AbstractHttpClient httpClient, HttpRequestBase executeMethod)
  {
    super();
    setDaemon(true);
    this.theConnection = theConnection;
    this.httpClient = httpClient;
    this.executeMethod = executeMethod;
  }

  /** Execute the request, capture cookies, set up the body stream, pump the body,
  * and finally abort the request object.
  */
  @Override
  public void run()
  {
    try
    {
      try
      {
        // Call the execute method appropriately
        synchronized (this)
        {
          if (!abortThread)
          {
            try
            {
              response = httpClient.execute(executeMethod);
            }
            catch (java.net.SocketTimeoutException e)
            {
              responseException = e;
            }
            catch (ConnectTimeoutException e)
            {
              responseException = e;
            }
            catch (InterruptedIOException e)
            {
              // Interruption that is not a timeout ends the whole run; see the outer catch.
              throw e;
            }
            catch (Throwable e)
            {
              responseException = e;
            }
            this.notifyAll();
          }
        }
        // Fetch the cookies
        if (responseException == null)
        {
          synchronized (this)
          {
            if (!abortThread)
            {
              try
              {
                cookies = new CookieSet(httpClient.getCookieStore().getCookies());
              }
              catch (Throwable e)
              {
                cookieException = e;
              }
              this.notifyAll();
            }
          }
        }
        // Start the transfer of the content
        if (cookieException == null && responseException == null)
        {
          synchronized (this)
          {
            if (!abortThread)
            {
              try
              {
                // A response is allowed to have no entity at all (for example 204 or 304
                // responses); treat that the same as an entity with no content stream.
                InputStream bodyStream = (response.getEntity() == null) ? null : response.getEntity().getContent();
                if (bodyStream != null)
                {
                  bodyStream = new ThrottledInputstream(theConnection,bodyStream);
                  threadStream = new XThreadInputStream(bodyStream);
                }
                streamCreated = true;
              }
              catch (java.net.SocketTimeoutException e)
              {
                streamException = e;
              }
              catch (ConnectTimeoutException e)
              {
                streamException = e;
              }
              catch (InterruptedIOException e)
              {
                throw e;
              }
              catch (Throwable e)
              {
                streamException = e;
              }
              this.notifyAll();
            }
          }
        }
        if (cookieException == null && responseException == null && streamException == null)
        {
          if (threadStream != null)
          {
            // Stuff the content until we are done
            threadStream.stuffQueue();
          }
        }
      }
      finally
      {
        // Always abort the request object and wake up anybody still waiting.
        synchronized (this)
        {
          try
          {
            executeMethod.abort();
          }
          catch (Throwable e)
          {
            shutdownException = e;
          }
          this.notifyAll();
        }
      }
    }
    catch (Throwable e)
    {
      // We catch exceptions here that should ONLY be InterruptedExceptions, as a result of the thread being aborted.
      this.generalException = e;
    }
  }

  /** Wait for the response and return its status code.
  *@return the HTTP status code of the response.
  *@throws InterruptedException if interrupted while waiting.
  *@throws IOException or HttpException if executing the request recorded such an exception.
  */
  public int getResponseCode()
    throws InterruptedException, IOException, HttpException
  {
    // Must wait until the response object is there
    while (true)
    {
      synchronized (this)
      {
        checkException(responseException);
        if (response != null)
          return response.getStatusLine().getStatusCode();
        wait();
      }
    }
  }

  /** Wait for the response and return all of its headers.
  *@return a map from header name (exactly as received) to the list of values for that name,
  * in the order the headers appear in the response.
  *@throws InterruptedException if interrupted while waiting.
  *@throws IOException or HttpException if executing the request recorded such an exception.
  */
  public Map<String,List<String>> getResponseHeaders()
    throws InterruptedException, IOException, HttpException
  {
    // Must wait for the response object to appear
    while (true)
    {
      synchronized (this)
      {
        checkException(responseException);
        if (response != null)
        {
          Header[] headers = response.getAllHeaders();
          Map<String,List<String>> rval = new HashMap<String,List<String>>();
          int i = 0;
          while (i < headers.length)
          {
            Header h = headers[i++];
            String name = h.getName();
            String value = h.getValue();
            List<String> values = rval.get(name);
            if (values == null)
            {
              values = new ArrayList<String>();
              rval.put(name,values);
            }
            values.add(value);
          }
          return rval;
        }
        wait();
      }
    }
  }

  /** Wait for the response and return the value of the first header with the given name.
  *@param headerName is the name of the header to look up.
  *@return the header value, or null if the response has no such header.
  *@throws InterruptedException if interrupted while waiting.
  *@throws IOException or HttpException if executing the request recorded such an exception.
  */
  public String getFirstHeader(String headerName)
    throws InterruptedException, IOException, HttpException
  {
    // Must wait for the response object to appear
    while (true)
    {
      synchronized (this)
      {
        checkException(responseException);
        if (response != null)
        {
          Header h = response.getFirstHeader(headerName);
          if (h == null)
            return null;
          return h.getValue();
        }
        wait();
      }
    }
  }

  /** Wait for the cookies to be captured and return them.
  * Callers are expected to have checked the response first; if a response exception
  * is recorded, an IllegalStateException is thrown.
  *@return the captured cookies.
  *@throws InterruptedException if interrupted while waiting.
  *@throws IOException or HttpException if capturing the cookies recorded such an exception.
  */
  public LoginCookies getCookies()
    throws InterruptedException, IOException, HttpException
  {
    while (true)
    {
      synchronized (this)
      {
        if (responseException != null)
          throw new IllegalStateException("Check for response before getting cookies");
        checkException(cookieException);
        if (cookies != null)
          return cookies;
        wait();
      }
    }
  }

  /** Wait for the body stream stage to complete and return the stream.
  * Callers are expected to have checked the response and the cookies first; if an exception
  * is recorded for either, an IllegalStateException is thrown.
  *@return the cross-thread body stream, or null if the response had no body stream.
  *@throws InterruptedException if interrupted while waiting.
  *@throws IOException or HttpException if setting up the stream recorded such an exception.
  */
  public InputStream getSafeInputStream()
    throws InterruptedException, IOException, HttpException
  {
    // Must wait until stream is created, or until we note an exception was thrown.
    while (true)
    {
      synchronized (this)
      {
        if (responseException != null)
          throw new IllegalStateException("Check for response before getting stream");
        if (cookieException != null)
          throw new IllegalStateException("Check for cookies before getting stream");
        checkException(streamException);
        if (streamCreated)
          return threadStream;
        wait();
      }
    }
  }

  /** Abort the body stream (if one was created) and mark the thread as aborted,
  * so that any stage of run() that has not started yet is skipped.
  */
  public void abort()
  {
    // This will be called during the finally
    // block in the case where all is well (and
    // the stream completed) and in the case where
    // there were exceptions.
    synchronized (this)
    {
      if (streamCreated)
      {
        if (threadStream != null)
          threadStream.abort();
      }
      abortThread = true;
    }
  }

  /** Wait for the thread to terminate.
  *@throws InterruptedException if interrupted while waiting.
  */
  public void finishUp()
    throws InterruptedException
  {
    join();
  }

  /** Rethrow a recorded exception, if there is one.
  * IOException, HttpException, RuntimeException and Error are rethrown as-is; anything
  * else is wrapped in a RuntimeException.
  *@param exception is the recorded exception, or null if nothing was recorded.
  */
  protected synchronized void checkException(Throwable exception)
    throws IOException, HttpException
  {
    if (exception != null)
    {
      // Throw the recorded exception. Note that the field it came from is NOT cleared here,
      // so later calls that check the same field will throw the same exception again.
      Throwable e = exception;
      if (e instanceof IOException)
        throw (IOException)e;
      else if (e instanceof HttpException)
        throw (HttpException)e;
      else if (e instanceof RuntimeException)
        throw (RuntimeException)e;
      else if (e instanceof Error)
        throw (Error)e;
      else
        throw new RuntimeException("Unhandled exception of type: "+e.getClass().getName(),e);
    }
  }
}
}