| /* $Id: ThrottledFetcher.java 989847 2010-08-26 17:52:30Z kwright $ */ |
| |
| /**` |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.manifoldcf.crawler.connectors.webcrawler; |
| |
| import org.apache.manifoldcf.core.interfaces.*; |
| import org.apache.manifoldcf.core.common.DeflateInputStream; |
| import org.apache.manifoldcf.core.common.XThreadInputStream; |
| import org.apache.manifoldcf.core.common.InterruptibleSocketFactory; |
| import org.apache.manifoldcf.agents.interfaces.*; |
| import org.apache.manifoldcf.crawler.interfaces.*; |
| import org.apache.manifoldcf.crawler.system.Logging; |
| import org.apache.manifoldcf.crawler.system.ManifoldCF; |
| import java.util.*; |
| import java.io.*; |
| import java.net.*; |
| import java.util.zip.GZIPInputStream; |
| import java.util.concurrent.TimeUnit; |
| import java.nio.charset.Charset; |
| |
| import org.apache.http.conn.HttpClientConnectionManager; |
| import org.apache.http.client.HttpClient; |
| import org.apache.http.impl.conn.PoolingHttpClientConnectionManager; |
| import org.apache.http.impl.client.HttpClientBuilder; |
| import org.apache.http.protocol.HttpRequestExecutor; |
| import org.apache.http.impl.client.HttpClients; |
| import org.apache.http.client.config.RequestConfig; |
| import org.apache.http.client.CredentialsProvider; |
| import org.apache.http.impl.client.BasicCredentialsProvider; |
| import org.apache.http.config.SocketConfig; |
| import org.apache.http.client.methods.HttpGet; |
| import org.apache.http.client.methods.HttpPost; |
| import org.apache.http.NameValuePair; |
| import org.apache.http.conn.ssl.SSLConnectionSocketFactory; |
| import org.apache.http.conn.ssl.BrowserCompatHostnameVerifier; |
| import org.apache.http.HttpEntity; |
| import org.apache.http.HttpResponse; |
| import org.apache.http.auth.AuthScope; |
| import org.apache.http.auth.NTCredentials; |
| import org.apache.http.client.methods.HttpGet; |
| import org.apache.http.client.methods.HttpRequestBase; |
| import org.apache.http.impl.client.DefaultRedirectStrategy; |
| import org.apache.http.util.EntityUtils; |
| import org.apache.http.HttpStatus; |
| import org.apache.http.HttpHost; |
| import org.apache.http.Header; |
| import org.apache.http.HeaderElement; |
| import org.apache.http.message.BasicHeader; |
| import org.apache.http.impl.cookie.BasicClientCookie; |
| import org.apache.http.message.BasicNameValuePair; |
| import org.apache.http.protocol.HTTP; |
| import org.apache.http.client.entity.UrlEncodedFormEntity; |
| import org.apache.http.cookie.CookieOrigin; |
| import org.apache.http.cookie.ClientCookie; |
| import org.apache.http.cookie.Cookie; |
| import org.apache.http.impl.cookie.BasicPathHandler; |
| import org.apache.http.impl.cookie.BrowserCompatSpec; |
| import org.apache.http.cookie.CookieSpec; |
| import org.apache.http.client.CookieStore; |
| import org.apache.http.protocol.HttpContext; |
| import org.apache.http.protocol.BasicHttpContext; |
| import org.apache.http.client.protocol.HttpClientContext; |
| import org.apache.http.cookie.CookieIdentityComparator; |
| import org.apache.http.client.HttpRequestRetryHandler; |
| import org.apache.http.cookie.CookieSpecProvider; |
| import org.apache.http.config.RegistryBuilder; |
| import org.apache.http.config.Registry; |
| import org.apache.http.client.config.CookieSpecs; |
| import org.apache.http.impl.cookie.BestMatchSpecFactory; |
| import org.apache.http.impl.cookie.BrowserCompatSpecFactory; |
| import org.apache.http.impl.cookie.RFC2965SpecFactory; |
| import org.apache.http.impl.cookie.NetscapeDraftSpecFactory; |
| import org.apache.http.impl.cookie.IgnoreSpecFactory; |
| |
| import org.apache.http.cookie.MalformedCookieException; |
| import org.apache.http.conn.ConnectTimeoutException; |
| import org.apache.http.client.RedirectException; |
| import org.apache.http.client.CircularRedirectException; |
| import org.apache.http.NoHttpResponseException; |
| import org.apache.http.HttpException; |
| |
| /** This class uses httpclient to fetch stuff from webservers. However, it additionally controls the fetch |
| * rate in two ways: first, controlling the overall bandwidth used per server, and second, limiting the number |
| * of simultaneous open connections per server. |
| * An instance of this class would very probably need to have a lifetime consistent with the long-term nature |
| * of these values, and be static. |
| */ |
| public class ThrottledFetcher |
| { |
| public static final String _rcsid = "@(#)$Id: ThrottledFetcher.java 989847 2010-08-26 17:52:30Z kwright $"; |
| |
| /** Web throttle group type */ |
| protected static final String webThrottleGroupType = "_WEB_"; |
| |
| /** Idle timeout */ |
| protected static final long idleTimeout = 300000L; |
| |
| /** This flag determines whether we record everything to the disk, as a means of doing a web snapshot */ |
| protected static final boolean recordEverything = false; |
| |
| protected static final long TIME_2HRS = 7200000L; |
| protected static final long TIME_5MIN = 300000L; |
| protected static final long TIME_15MIN = 1500000L; |
| protected static final long TIME_6HRS = 6L * 60L * 60000L; |
| protected static final long TIME_1DAY = 24L * 60L * 60000L; |
| |
| /** The read chunk length */ |
| protected static final int READ_CHUNK_LENGTH = 4096; |
| |
| /** Connection pools. |
| /* This is a static hash of the connection pools in existence. Each connection pool represents a set of identical connections. */ |
| protected final static Map<ConnectionPoolKey,ConnectionPool> connectionPools = new HashMap<ConnectionPoolKey,ConnectionPool>(); |
| |
| /** Current host name */ |
| private static String currentHost = null; |
| static |
| { |
| // Find the current host name |
| try |
| { |
| java.net.InetAddress addr = java.net.InetAddress.getLocalHost(); |
| |
| // Get hostname |
| currentHost = addr.getHostName(); |
| } |
| catch (java.net.UnknownHostException e) |
| { |
| } |
| } |
| |
| protected static final Charset UTF_8 = Charset.forName("UTF-8"); |
| |
| private static final Registry<CookieSpecProvider> cookieSpecRegistry = |
| RegistryBuilder.<CookieSpecProvider>create() |
| .register(CookieSpecs.BEST_MATCH, new BestMatchSpecFactory()) |
| .register(CookieSpecs.STANDARD, new RFC2965SpecFactory()) |
| .register(CookieSpecs.BROWSER_COMPATIBILITY, new LaxBrowserCompatSpecFactory()) |
| .register(CookieSpecs.NETSCAPE, new NetscapeDraftSpecFactory()) |
| .register(CookieSpecs.IGNORE_COOKIES, new IgnoreSpecFactory()) |
| .build(); |
| |
| /** Constructor. Private since we never instantiate. |
| */ |
| private ThrottledFetcher() |
| { |
| } |
| |
| |
| /** Obtain a connection to specified protocol, server, and port. We use the protocol because the |
| * setup for some protocols is extensive (e.g. https) and hopefully would not need to be repeated if |
| * we distinguish connections based on that. |
| *@param protocol is the protocol, e.g. "http" |
| *@param server is the server IP address, e.g. "10.32.65.1" |
| *@param port is the port to connect to, e.g. 80. Pass -1 if the default port for the protocol is desired. |
| *@param authentication is the page credentials object to use for the fetch. If null, no credentials are available. |
| *@param trustStore is the current trust store in effect for the fetch. |
| *@param binNames is the set of bins, in order, that should be used for throttling this connection. |
| * Note that the bin names for a given IP address and port MUST be the same for every connection! |
| * This must be enforced by whatever it is that builds the bins - it must do so given an IP and port. |
| *@param throttleDescription is the description of all the throttling that should take place. |
| *@param connectionLimit isthe maximum number of connections permitted. |
| *@return an IThrottledConnection object that can be used to fetch from the port. |
| */ |
| public static IThrottledConnection getConnection(IThreadContext threadContext, String throttleGroupName, |
| String protocol, String server, int port, |
| PageCredentials authentication, |
| IKeystoreManager trustStore, |
| IThrottleSpec throttleDescription, String[] binNames, |
| int connectionLimit, |
| String proxyHost, int proxyPort, String proxyAuthDomain, String proxyAuthUsername, String proxyAuthPassword) |
| throws ManifoldCFException |
| { |
| // Get a throttle groups handle |
| IThrottleGroups throttleGroups = ThrottleGroupsFactory.make(threadContext); |
| |
| // Create the appropruate throttle group, or update the throttle description for an existing one |
| throttleGroups.createOrUpdateThrottleGroup(webThrottleGroupType,throttleGroupName,throttleDescription); |
| |
| // Create the https scheme and trust store string for this connection |
| javax.net.ssl.SSLSocketFactory baseFactory; |
| String trustStoreString; |
| if (trustStore != null) |
| { |
| baseFactory = trustStore.getSecureSocketFactory(); |
| trustStoreString = trustStore.getHashString(); |
| } |
| else |
| { |
| baseFactory = KeystoreManagerFactory.getTrustingSecureSocketFactory(); |
| trustStoreString = null; |
| } |
| |
| // Construct a connection pool key |
| ConnectionPoolKey poolKey = new ConnectionPoolKey(protocol,server,port,authentication, |
| trustStoreString,proxyHost,proxyPort,proxyAuthDomain,proxyAuthUsername,proxyAuthPassword); |
| |
| ConnectionPool p; |
| synchronized (connectionPools) |
| { |
| p = connectionPools.get(poolKey); |
| if (p == null) |
| { |
| // Construct a new IConnectionThrottler. |
| IConnectionThrottler connectionThrottler = |
| throttleGroups.obtainConnectionThrottler(webThrottleGroupType,throttleGroupName,binNames); |
| p = new ConnectionPool(connectionThrottler,protocol,server,port,authentication,baseFactory, |
| proxyHost,proxyPort,proxyAuthDomain,proxyAuthUsername,proxyAuthPassword); |
| connectionPools.put(poolKey,p); |
| } |
| } |
| |
| try |
| { |
| return p.grab(); |
| } |
| catch (InterruptedException e) |
| { |
| throw new ManifoldCFException(e.getMessage(),ManifoldCFException.INTERRUPTED); |
| } |
| } |
| |
| /** Flush connections that have timed out from inactivity. */ |
| public static void flushIdleConnections(IThreadContext threadContext) |
| throws ManifoldCFException |
| { |
| // Go through outstanding connection pools and clean them up. |
| synchronized (connectionPools) |
| { |
| for (ConnectionPool pool : connectionPools.values()) |
| { |
| pool.flushIdleConnections(); |
| } |
| } |
| } |
| |
| /** Throttled connections. Each instance of a connection describes the bins to which it belongs, |
| * along with the actual open connection itself, and the last time the connection was used. */ |
| protected static class ThrottledConnection implements IThrottledConnection |
| { |
| /** Connection pool */ |
| protected final ConnectionPool myPool; |
| /** Fetch throttler */ |
| protected final IFetchThrottler fetchThrottler; |
| /** Protocol */ |
| protected final String protocol; |
| /** Server */ |
| protected final String server; |
| /** Port */ |
| protected final int port; |
| /** Authentication */ |
| protected final PageCredentials authentication; |
| |
| /** This is when the connection will expire. Only valid if connection is in the pool. */ |
| protected long expireTime = -1L; |
| |
| /** The http connection manager. The pool is of size 1. */ |
| protected HttpClientConnectionManager connManager = null; |
| /** The http client object. */ |
| protected HttpClient httpClient = null; |
| /** The method object */ |
| protected HttpRequestBase fetchMethod = null; |
| /** The error trace, if any */ |
| protected Throwable throwable = null; |
| /** The current URL being fetched */ |
| protected String myUrl = null; |
| /** The status code fetched, if any */ |
| protected int statusCode = FETCH_NOT_TRIED; |
| /** The kind of fetch we are doing */ |
| protected String fetchType = null; |
| /** The current bytes in the current fetch */ |
| protected long fetchCounter = 0L; |
| /** The start of the current fetch */ |
| protected long startFetchTime = -1L; |
| /** The cookies from the last fetch */ |
| protected LoginCookies lastFetchCookies = null; |
| /** Proxy host */ |
| protected final String proxyHost; |
| /** Proxy port */ |
| protected final int proxyPort; |
| /** Proxy auth domain */ |
| protected final String proxyAuthDomain; |
| /** Proxy auth user name */ |
| protected final String proxyAuthUsername; |
| /** Proxy auth password */ |
| protected final String proxyAuthPassword; |
| /** Https protocol */ |
| protected final javax.net.ssl.SSLSocketFactory httpsSocketFactory; |
| |
| /** The thread that is actually doing the work */ |
| protected ExecuteMethodThread methodThread = null; |
| /** Set if thread has been started */ |
| protected boolean threadStarted = false; |
| |
| |
| /** Constructor. Create a connection with a specific server and port, and |
| * register it as active against all bins. */ |
| public ThrottledConnection(ConnectionPool myPool, IFetchThrottler fetchThrottler, |
| String protocol, String server, int port, PageCredentials authentication, |
| javax.net.ssl.SSLSocketFactory httpsSocketFactory, |
| String proxyHost, int proxyPort, String proxyAuthDomain, String proxyAuthUsername, String proxyAuthPassword) |
| { |
| this.myPool = myPool; |
| this.fetchThrottler = fetchThrottler; |
| this.proxyHost = proxyHost; |
| this.proxyPort = proxyPort; |
| this.proxyAuthDomain = proxyAuthDomain; |
| this.proxyAuthUsername = proxyAuthUsername; |
| this.proxyAuthPassword = proxyAuthPassword; |
| this.protocol = protocol; |
| this.server = server; |
| this.port = port; |
| this.authentication = authentication; |
| this.httpsSocketFactory = httpsSocketFactory; |
| } |
| |
| /** Check whether the connection has expired. |
| *@param currentTime is the current time to use to judge if a connection has expired. |
| *@return true if the connection has expired, and should be closed. |
| */ |
| @Override |
| public boolean hasExpired(long currentTime) |
| { |
| if (connManager != null) |
| { |
| connManager.closeIdleConnections(idleTimeout, TimeUnit.MILLISECONDS); |
| connManager.closeExpiredConnections(); |
| } |
| return (currentTime > expireTime); |
| } |
| |
| /** Log the fetch of a number of bytes, from within a stream. */ |
| public void logFetchCount(int count) |
| { |
| fetchCounter += (long)count; |
| } |
| |
| /** Destroy the connection forever */ |
| @Override |
| public void destroy() |
| { |
| // Kill the actual connection object. |
| if (connManager != null) |
| { |
| connManager.shutdown(); |
| connManager = null; |
| } |
| |
| } |
| |
| |
| /** Begin the fetch process. |
| * @param fetchType is a short descriptive string describing the kind of fetch being requested. This |
| * is used solely for logging purposes. |
| */ |
| @Override |
| public void beginFetch(String fetchType) |
| throws ManifoldCFException |
| { |
| this.fetchType = fetchType; |
| this.fetchCounter = 0L; |
| try |
| { |
| if (fetchThrottler.obtainFetchDocumentPermission() == false) |
| throw new IllegalStateException("Unexpected return value from obtainFetchDocumentPermission()"); |
| } |
| catch (InterruptedException e) |
| { |
| throw new ManifoldCFException("Interrupted",ManifoldCFException.INTERRUPTED); |
| } |
| } |
| |
| /** Execute the fetch and get the return code. This method uses the |
| * standard logging mechanism to keep track of the fetch attempt. It also |
| * signals the following conditions: ServiceInterruption (if a dynamic |
| * error occurs), or ManifoldCFException if a fatal error occurs, or nothing if |
| * a standard protocol error occurs. |
| * Note that, for proxies etc, the idea is for this fetch request to handle whatever |
| * redirections are needed to support proxies. |
| * @param urlPath is the path part of the url, e.g. "/robots.txt" |
| * @param userAgent is the value of the userAgent header to use. |
| * @param from is the value of the from header to use. |
| * @param connectionTimeoutMilliseconds is the maximum number of milliseconds to wait on socket connect. |
| * @param redirectOK should be set to true if you want redirects to be automatically followed. |
| * @param host is the value to use as the "Host" header, or null to use the default. |
| * @param formData describes additional form arguments and how to fetch the page. |
| * @param loginCookies describes the cookies that should be in effect for this page fetch. |
| */ |
| @Override |
| public void executeFetch(String urlPath, String userAgent, String from, int connectionTimeoutMilliseconds, |
| int socketTimeoutMilliseconds, boolean redirectOK, String host, FormData formData, |
| LoginCookies loginCookies) |
| throws ManifoldCFException, ServiceInterruption |
| { |
| // Set up scheme |
| SSLConnectionSocketFactory myFactory = new SSLConnectionSocketFactory(new InterruptibleSocketFactory(httpsSocketFactory,connectionTimeoutMilliseconds), |
| SSLConnectionSocketFactory.ALLOW_ALL_HOSTNAME_VERIFIER); |
| |
| int hostPort; |
| String displayedPort; |
| if (port != -1) |
| { |
| if (!(protocol.equals("http") && port == 80) && |
| !(protocol.equals("https") && port == 443)) |
| { |
| displayedPort = ":"+Integer.toString(port); |
| hostPort = port; |
| } |
| else |
| { |
| displayedPort = ""; |
| hostPort = -1; |
| } |
| } |
| else |
| { |
| displayedPort = ""; |
| hostPort = -1; |
| } |
| |
| StringBuilder sb = new StringBuilder(protocol); |
| sb.append("://").append(server).append(displayedPort).append(urlPath); |
| String fetchUrl = sb.toString(); |
| |
| HttpHost fetchHost = new HttpHost(server,hostPort,protocol); |
| HttpHost hostHost; |
| |
| if (host != null) |
| { |
| sb.setLength(0); |
| sb.append(protocol).append("://").append(host).append(displayedPort).append(urlPath); |
| myUrl = sb.toString(); |
| hostHost = new HttpHost(host,hostPort,protocol); |
| } |
| else |
| { |
| myUrl = fetchUrl; |
| hostHost = fetchHost; |
| } |
| |
| if (connManager == null) |
| { |
| connManager = new PoolingHttpClientConnectionManager(); |
| } |
| |
| long startTime = 0L; |
| if (Logging.connectors.isDebugEnabled()) |
| { |
| startTime = System.currentTimeMillis(); |
| Logging.connectors.debug("WEB: Waiting for an HttpClient object"); |
| } |
| |
| CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); |
| |
| // Set up authentication to use |
| if (authentication != null) |
| { |
| if (Logging.connectors.isDebugEnabled()) |
| Logging.connectors.debug("WEB: For "+myUrl+", discovered matching authentication credentials"); |
| credentialsProvider.setCredentials(AuthScope.ANY, |
| authentication.makeCredentialsObject(host)); |
| } |
| |
| RequestConfig.Builder requestBuilder = RequestConfig.custom() |
| .setCircularRedirectsAllowed(true) |
| .setSocketTimeout(socketTimeoutMilliseconds) |
| .setStaleConnectionCheckEnabled(true) |
| .setExpectContinueEnabled(true) |
| .setConnectTimeout(connectionTimeoutMilliseconds) |
| .setConnectionRequestTimeout(socketTimeoutMilliseconds) |
| .setCookieSpec(CookieSpecs.BROWSER_COMPATIBILITY) |
| .setRedirectsEnabled(redirectOK); |
| |
| // If there's a proxy, set that too. |
| if (proxyHost != null && proxyHost.length() > 0) |
| { |
| // Configure proxy authentication |
| if (proxyAuthUsername != null && proxyAuthUsername.length() > 0) |
| { |
| credentialsProvider.setCredentials( |
| new AuthScope(proxyHost, proxyPort), |
| new NTCredentials(proxyAuthUsername, (proxyAuthPassword==null)?"":proxyAuthPassword, currentHost, (proxyAuthDomain==null)?"":proxyAuthDomain)); |
| } |
| |
| HttpHost proxy = new HttpHost(proxyHost, proxyPort); |
| |
| requestBuilder.setProxy(proxy); |
| } |
| |
| |
| httpClient = HttpClients.custom() |
| .setConnectionManager(connManager) |
| .setMaxConnTotal(1) |
| .setMaxConnPerRoute(1) |
| .disableAutomaticRetries() |
| .setDefaultCookieSpecRegistry(cookieSpecRegistry) |
| .setDefaultRequestConfig(requestBuilder.build()) |
| .setDefaultSocketConfig(SocketConfig.custom() |
| .setTcpNoDelay(true) |
| .setSoTimeout(socketTimeoutMilliseconds) |
| .build()) |
| .setDefaultCredentialsProvider(credentialsProvider) |
| .setSSLSocketFactory(myFactory) |
| .setRequestExecutor(new HttpRequestExecutor(socketTimeoutMilliseconds)) |
| .setRedirectStrategy(new DefaultRedirectStrategy()) |
| // ??? need to add equivalent of setCookiePolicy(CookiePolicy.BROWSER_COMPATIBILITY) |
| .build(); |
| |
| /* |
| BasicHttpParams params = new BasicHttpParams(); |
| params.setParameter(ClientPNames.DEFAULT_HOST,fetchHost); |
| params.setBooleanParameter(CoreConnectionPNames.TCP_NODELAY,true); |
| params.setBooleanParameter(CoreConnectionPNames.STALE_CONNECTION_CHECK,true); |
| params.setBooleanParameter(ClientPNames.ALLOW_CIRCULAR_REDIRECTS,true); |
| // MEDIUM_SECURITY compatibility level not supported in HttpComponents. Try BROWSER_NETSCAPE? |
| HttpClientParams.setCookiePolicy(params,CookiePolicy.BROWSER_COMPATIBILITY); |
| params.setBooleanParameter(CookieSpecPNames.SINGLE_COOKIE_HEADER,new Boolean(true)); |
| |
| DefaultHttpClient localHttpClient = new DefaultHttpClient(connManager,params); |
| // No retries |
| localHttpClient.setHttpRequestRetryHandler(new HttpRequestRetryHandler() |
| { |
| public boolean retryRequest( |
| IOException exception, |
| int executionCount, |
| HttpContext context) |
| { |
| return false; |
| } |
| |
| }); |
| localHttpClient.setRedirectStrategy(new DefaultRedirectStrategy()); |
| localHttpClient.getCookieSpecs().register(CookiePolicy.BROWSER_COMPATIBILITY, new CookieSpecFactory() |
| { |
| |
| public CookieSpec newInstance(HttpParams params) |
| { |
| return new LaxBrowserCompatSpec(); |
| } |
| |
| } |
| ); |
| |
| |
| |
| httpClient = localHttpClient; |
| */ |
| |
| |
| // Set the parameters we haven't keyed on (so these can change from request to request) |
| |
| if (host != null) |
| { |
| if (Logging.connectors.isDebugEnabled()) |
| Logging.connectors.debug("WEB: For "+myUrl+", setting virtual host to "+host); |
| } |
| |
| |
| if (Logging.connectors.isDebugEnabled()) |
| Logging.connectors.debug("WEB: Got an HttpClient object after "+new Long(System.currentTimeMillis()-startTime).toString()+" ms."); |
| |
| startFetchTime = System.currentTimeMillis(); |
| |
| int pageFetchMethod = FormData.SUBMITMETHOD_GET; |
| if (formData != null) |
| pageFetchMethod = formData.getSubmitMethod(); |
| switch (pageFetchMethod) |
| { |
| case FormData.SUBMITMETHOD_GET: |
| // MUST be just the path, or apparently we wind up resetting the HostConfiguration |
| // Add additional parameters to url path |
| String fullUrlPath; |
| if (formData != null) |
| { |
| StringBuilder psb = new StringBuilder(urlPath); |
| Iterator iter = formData.getElementIterator(); |
| char appendChar; |
| if (urlPath.indexOf("?") == -1) |
| appendChar = '?'; |
| else |
| appendChar = '&'; |
| try |
| { |
| while (iter.hasNext()) |
| { |
| FormDataElement el = (FormDataElement)iter.next(); |
| psb.append(appendChar); |
| appendChar = '&'; |
| String param = el.getElementName(); |
| String value = el.getElementValue(); |
| psb.append(java.net.URLEncoder.encode(param,"utf-8")); |
| if (value != null) |
| { |
| psb.append('=').append(java.net.URLEncoder.encode(value,"utf-8")); |
| } |
| } |
| } |
| catch (java.io.UnsupportedEncodingException e) |
| { |
| throw new ManifoldCFException("Unsupported encoding: "+e.getMessage(),e); |
| } |
| |
| fullUrlPath = psb.toString(); |
| } |
| else |
| { |
| fullUrlPath = urlPath; |
| } |
| // Hack; apparently httpclient treats // as a protocol specifier and so it rips off the first section of the path in that case. |
| while (fullUrlPath.startsWith("//")) |
| fullUrlPath = fullUrlPath.substring(1); |
| if (Logging.connectors.isDebugEnabled()) |
| Logging.connectors.debug("WEB: Get method for '"+fullUrlPath+"'"); |
| fetchMethod = new HttpGet(fullUrlPath); |
| break; |
| case FormData.SUBMITMETHOD_POST: |
| if (Logging.connectors.isDebugEnabled()) |
| Logging.connectors.debug("WEB: Post method for '"+urlPath+"'"); |
| // MUST be just the path, or apparently we wind up resetting the HostConfiguration |
| HttpPost postMethod = new HttpPost(urlPath); |
| List<NameValuePair> nvps = new ArrayList<NameValuePair>(); |
| |
| // Add parameters to post variables |
| if (formData != null) |
| { |
| Iterator iter = formData.getElementIterator(); |
| while (iter.hasNext()) |
| { |
| FormDataElement e = (FormDataElement)iter.next(); |
| String param = e.getElementName(); |
| String value = e.getElementValue(); |
| if (Logging.connectors.isDebugEnabled()) |
| Logging.connectors.debug("WEB: Post parameter name '"+param+"' value '"+value+"' for '"+urlPath+"'"); |
| nvps.add(new BasicNameValuePair(param,value)); |
| } |
| } |
| postMethod.setEntity(new UrlEncodedFormEntity(nvps,UTF_8)); |
| fetchMethod = postMethod; |
| break; |
| default: |
| throw new ManifoldCFException("Illegal method type: "+Integer.toString(pageFetchMethod)); |
| } |
| |
| // Set all appropriate headers and parameters |
| fetchMethod.setHeader(new BasicHeader("User-Agent",userAgent)); |
| fetchMethod.setHeader(new BasicHeader("From",from)); |
| fetchMethod.setHeader(new BasicHeader("Accept","*/*")); |
| fetchMethod.setHeader(new BasicHeader("Accept-Encoding","gzip,deflate")); |
| |
| // Use a custom cookie store |
| CookieStore cookieStore = new OurBasicCookieStore(); |
| // If we have any cookies to set, set them. |
| if (loginCookies != null) |
| { |
| if (Logging.connectors.isDebugEnabled()) |
| Logging.connectors.debug("WEB: Adding "+Integer.toString(loginCookies.getCookieCount())+" cookies for '"+urlPath+"'"); |
| int h = 0; |
| while (h < loginCookies.getCookieCount()) |
| { |
| if (Logging.connectors.isDebugEnabled()) |
| Logging.connectors.debug("WEB: Cookie '"+loginCookies.getCookie(h)+"' added"); |
| cookieStore.addCookie(loginCookies.getCookie(h++)); |
| } |
| } |
| |
| |
| // Copy out the current cookies, in case the fetch fails |
| lastFetchCookies = loginCookies; |
| |
| // Create the thread |
| methodThread = new ExecuteMethodThread(this, fetchThrottler, httpClient, hostHost, fetchMethod, cookieStore); |
| try |
| { |
| methodThread.start(); |
| threadStarted = true; |
| try |
| { |
| statusCode = methodThread.getResponseCode(); |
| lastFetchCookies = methodThread.getCookies(); |
| switch (statusCode) |
| { |
| case HttpStatus.SC_REQUEST_TIMEOUT: |
| case HttpStatus.SC_GATEWAY_TIMEOUT: |
| case HttpStatus.SC_SERVICE_UNAVAILABLE: |
| // Temporary service interruption |
| // May want to make the retry time a parameter someday |
| long currentTime = System.currentTimeMillis(); |
| throw new ServiceInterruption("Http response temporary error on '"+myUrl+"': "+Integer.toString(statusCode),new ManifoldCFException("Service unavailable (code "+Integer.toString(statusCode)+")"), |
| currentTime + TIME_2HRS, currentTime + TIME_1DAY, -1, false); |
| case HttpStatus.SC_UNAUTHORIZED: |
| case HttpStatus.SC_USE_PROXY: |
| case HttpStatus.SC_OK: |
| case HttpStatus.SC_GONE: |
| case HttpStatus.SC_NOT_FOUND: |
| case HttpStatus.SC_BAD_GATEWAY: |
| case HttpStatus.SC_BAD_REQUEST: |
| case HttpStatus.SC_FORBIDDEN: |
| case HttpStatus.SC_INTERNAL_SERVER_ERROR: |
| default: |
| return; |
| } |
| } |
| catch (InterruptedException e) |
| { |
| methodThread.interrupt(); |
| methodThread = null; |
| threadStarted = false; |
| throw e; |
| } |
| |
| } |
| catch (InterruptedException e) |
| { |
| // Drop the current connection on the floor, so it cannot be reused. |
| fetchMethod = null; |
| throwable = new ManifoldCFException("Interrupted: "+e.getMessage(),e); |
| statusCode = FETCH_INTERRUPTED; |
| throw new ManifoldCFException("Interrupted: "+e.getMessage(),e,ManifoldCFException.INTERRUPTED); |
| } |
| catch (java.net.SocketTimeoutException e) |
| { |
| throwable = e; |
| long currentTime = System.currentTimeMillis(); |
| throw new ServiceInterruption("Timed out waiting for IO for '"+myUrl+"': "+e.getMessage(), e, currentTime + TIME_5MIN, |
| currentTime + TIME_2HRS,-1,false); |
| } |
| catch (ConnectTimeoutException e) |
| { |
| throwable = e; |
| long currentTime = System.currentTimeMillis(); |
| throw new ServiceInterruption("Timed out waiting for connection for '"+myUrl+"': "+e.getMessage(), e, currentTime + TIME_5MIN, |
| currentTime + TIME_2HRS,-1,false); |
| } |
| catch (InterruptedIOException e) |
| { |
| //Logging.connectors.warn("IO interruption seen",e); |
| throwable = new ManifoldCFException("Interrupted: "+e.getMessage(),e); |
| statusCode = FETCH_INTERRUPTED; |
| throw new ManifoldCFException("Interrupted",ManifoldCFException.INTERRUPTED); |
| } |
| catch (RedirectException e) |
| { |
| throwable = e; |
| statusCode = FETCH_CIRCULAR_REDIRECT; |
| return; |
| } |
| catch (NoHttpResponseException e) |
| { |
| throwable = e; |
| long currentTime = System.currentTimeMillis(); |
| throw new ServiceInterruption("Timed out waiting for response for '"+myUrl+"': "+e.getMessage(), e, currentTime + TIME_15MIN, |
| currentTime + TIME_2HRS,-1,false); |
| } |
| catch (java.net.ConnectException e) |
| { |
| throwable = e; |
| long currentTime = System.currentTimeMillis(); |
| throw new ServiceInterruption("Timed out waiting for a connection for '"+myUrl+"': "+e.getMessage(), e, currentTime + TIME_2HRS, |
| currentTime + TIME_6HRS,-1,false); |
| } |
| catch (javax.net.ssl.SSLException e) |
| { |
| // Probably this is an incorrectly configured trust store |
| throwable = new ManifoldCFException("SSL handshake error: "+e.getMessage()+"; check your connection's Certificate configuration",e); |
| statusCode = FETCH_IO_ERROR; |
| return; |
| } |
| catch (IOException e) |
| { |
| // Treat this as a bad url. We don't know what happened, but it isn't something we are going to naively |
| // retry on. |
| throwable = e; |
| statusCode = FETCH_IO_ERROR; |
| return; |
| } |
| catch (Throwable e) |
| { |
| Logging.connectors.debug("WEB: Caught an unexpected exception: "+e.getMessage(),e); |
| throwable = e; |
| statusCode = FETCH_UNKNOWN_ERROR; |
| return; |
| } |
| |
| } |
| |
| /** Get the http response code. |
| *@return the response code. This is either an HTTP response code, or one of the codes above. |
| */ |
| @Override |
| public int getResponseCode() |
| throws ManifoldCFException, ServiceInterruption |
| { |
| return statusCode; |
| } |
| |
| /** Get the last fetch cookies. |
| *@return the cookies now in effect from the last fetch. |
| */ |
| @Override |
| public LoginCookies getLastFetchCookies() |
| throws ManifoldCFException, ServiceInterruption |
| { |
| if (Logging.connectors.isDebugEnabled()) |
| { |
| Logging.connectors.debug("WEB: Retrieving cookies..."); |
| for (int i = 0; i < lastFetchCookies.getCookieCount(); i++) |
| { |
| Logging.connectors.debug("WEB: Cookie '"+lastFetchCookies.getCookie(i)+"'"); |
| } |
| } |
| return lastFetchCookies; |
| } |
| |
| /** Get response headers |
| *@return a map keyed by header name containing a list of values. |
| */ |
| @Override |
| public Map<String,List<String>> getResponseHeaders() |
| throws ManifoldCFException, ServiceInterruption |
| { |
| if (fetchMethod == null) |
| throw new ManifoldCFException("Attempt to get headers when there is no method"); |
| if (methodThread == null || threadStarted == false) |
| throw new ManifoldCFException("Attempt to get headers when no method thread"); |
| try |
| { |
| return methodThread.getResponseHeaders(); |
| } |
| catch (InterruptedException e) |
| { |
| methodThread.interrupt(); |
| throw new ManifoldCFException("Interrupted: "+e.getMessage(),e,ManifoldCFException.INTERRUPTED); |
| } |
| catch (HttpException e) |
| { |
| handleHTTPException(e,"reading headers"); |
| } |
| catch (IOException e) |
| { |
| handleIOException(e,"reading headers"); |
| } |
| return null; |
| } |
| |
| /** Get a specified response header, if it exists. |
| *@param headerName is the name of the header. |
| *@return the header value, or null if it doesn't exist. |
| */ |
| @Override |
| public String getResponseHeader(String headerName) |
| throws ManifoldCFException, ServiceInterruption |
| { |
| if (fetchMethod == null) |
| throw new ManifoldCFException("Attempt to get a header when there is no method"); |
| if (methodThread == null || threadStarted == false) |
| throw new ManifoldCFException("Attempt to get a header when no method thread"); |
| try |
| { |
| return methodThread.getFirstHeader(headerName); |
| } |
| catch (InterruptedException e) |
| { |
| methodThread.interrupt(); |
| throw new ManifoldCFException("Interrupted: "+e.getMessage(),e,ManifoldCFException.INTERRUPTED); |
| } |
| catch (HttpException e) |
| { |
| handleHTTPException(e,"reading header"); |
| } |
| catch (IOException e) |
| { |
| handleIOException(e,"reading header"); |
| } |
| return null; |
| } |
| |
| /** Get the response input stream. It is the responsibility of the caller |
| * to close this stream when done. |
| */ |
| @Override |
| public InputStream getResponseBodyStream() |
| throws ManifoldCFException, ServiceInterruption |
| { |
| if (fetchMethod == null) |
| throw new ManifoldCFException("Attempt to get an input stream when there is no method"); |
| if (methodThread == null || threadStarted == false) |
| throw new ManifoldCFException("Attempt to get an input stream when no method thread"); |
| try |
| { |
| return methodThread.getSafeInputStream(); |
| } |
| catch (InterruptedException e) |
| { |
| methodThread.interrupt(); |
| throw new ManifoldCFException("Interrupted: "+e.getMessage(),e,ManifoldCFException.INTERRUPTED); |
| } |
| catch (IOException e) |
| { |
| handleIOException(e, "reading response stream"); |
| } |
| catch (HttpException e) |
| { |
| handleHTTPException(e, "reading response stream"); |
| } |
| return null; |
| } |
| |
| /** Get limited response as a string. |
| */ |
| @Override |
| public String getLimitedResponseBody(int maxSize, String encoding) |
| throws ManifoldCFException, ServiceInterruption |
| { |
| try |
| { |
| InputStream is = getResponseBodyStream(); |
| try |
| { |
| Reader r = new InputStreamReader(is,encoding); |
| char[] buffer = new char[maxSize]; |
| int amt = r.read(buffer); |
| if (amt == -1) |
| return ""; |
| return new String(buffer,0,amt); |
| } |
| finally |
| { |
| is.close(); |
| } |
| } |
| catch (IOException e) |
| { |
| handleIOException(e,"reading limited response"); |
| } |
| return null; |
| } |
| |
| /** Note that the connection fetch was interrupted by something. |
| */ |
| @Override |
| public void noteInterrupted(Throwable e) |
| { |
| if (statusCode > 0) |
| { |
| throwable = new ManifoldCFException("Interrupted: "+e.getMessage(),e); |
| statusCode = FETCH_INTERRUPTED; |
| } |
| } |
| |
| /** Done with the fetch. Call this when the fetch has been completed. A log entry will be generated |
| * describing what was done. |
| */ |
| @Override |
| public void doneFetch(IVersionActivity activities) |
| throws ManifoldCFException |
| { |
| if (fetchType != null) |
| { |
| // Abort the connection, if not already complete |
| if (methodThread != null && threadStarted) |
| methodThread.abort(); |
| |
| long endTime = System.currentTimeMillis(); |
| |
| activities.recordActivity(new Long(startFetchTime),WebcrawlerConnector.ACTIVITY_FETCH, |
| new Long(fetchCounter),myUrl,Integer.toString(statusCode),(throwable==null)?null:throwable.getMessage(),null); |
| |
| Logging.connectors.info("WEB: FETCH "+fetchType+"|"+myUrl+"|"+new Long(startFetchTime).toString()+"+"+new Long(endTime-startFetchTime).toString()+"|"+ |
| Integer.toString(statusCode)+"|"+new Long(fetchCounter).toString()+"|"+((throwable==null)?"":(throwable.getClass().getName()+"| "+throwable.getMessage()))); |
| if (throwable != null) |
| { |
| if (Logging.connectors.isDebugEnabled()) |
| Logging.connectors.debug("WEB: Fetch exception for '"+myUrl+"'",throwable); |
| } |
| |
| // Shut down (join) the connection thread, if any, and if it started |
| if (methodThread != null) |
| { |
| if (threadStarted) |
| { |
| try |
| { |
| methodThread.finishUp(); |
| } |
| catch (InterruptedException e) |
| { |
| throw new ManifoldCFException(e.getMessage(),e,ManifoldCFException.INTERRUPTED); |
| } |
| threadStarted = false; |
| } |
| methodThread = null; |
| } |
| |
| fetchMethod = null; |
| throwable = null; |
| startFetchTime = -1L; |
| myUrl = null; |
| statusCode = FETCH_NOT_TRIED; |
| lastFetchCookies = null; |
| fetchType = null; |
| } |
| |
| } |
| |
| /** Close the connection. Call this to return the connection to its pool. |
| */ |
| @Override |
| public void close() |
| { |
| expireTime = System.currentTimeMillis() + idleTimeout; |
| myPool.release(this); |
| } |
| |
| protected void handleHTTPException(HttpException e, String activity) |
| throws ServiceInterruption, ManifoldCFException |
| { |
| long currentTime = System.currentTimeMillis(); |
| Logging.connectors.debug("Web: HTTP exception "+activity+" for '"+myUrl+"', retrying"); |
| throw new ServiceInterruption("HTTP exception "+activity+": "+e.getMessage(),e,currentTime+TIME_5MIN,-1L,2,false); |
| } |
| |
| protected void handleIOException(IOException e, String activity) |
| throws ServiceInterruption, ManifoldCFException |
| { |
| if (e instanceof java.net.SocketTimeoutException) |
| { |
| long currentTime = System.currentTimeMillis(); |
| Logging.connectors.debug("Web: Socket timeout exception "+activity+" for '"+myUrl+"', retrying"); |
| throw new ServiceInterruption("Socket timeout exception "+activity+": "+e.getMessage(),e,currentTime+TIME_5MIN,-1L,2,false); |
| } |
| if (e instanceof ConnectTimeoutException) |
| { |
| long currentTime = System.currentTimeMillis(); |
| Logging.connectors.debug("Web: Connect timeout exception "+activity+" for '"+myUrl+"', retrying"); |
| throw new ServiceInterruption("Connect timeout exception "+activity+": "+e.getMessage(),e,currentTime+TIME_5MIN,-1L,2,false); |
| } |
| if (e instanceof InterruptedIOException) |
| { |
| methodThread.interrupt(); |
| throw new ManifoldCFException("Interrupted",ManifoldCFException.INTERRUPTED); |
| } |
| if (e instanceof NoHttpResponseException) |
| { |
| // Give up after 2 hours. |
| long currentTime = System.currentTimeMillis(); |
| throw new ServiceInterruption("Timed out "+activity+" for '"+myUrl+"'", e, currentTime + 15L * 60000L, |
| currentTime + 120L * 60000L,-1,false); |
| } |
| if (e instanceof java.net.ConnectException) |
| { |
| // Give up after 6 hours. |
| long currentTime = System.currentTimeMillis(); |
| throw new ServiceInterruption("Timed out "+activity+" for '"+myUrl+"'", e, currentTime + 1000000L, |
| currentTime + 720L * 60000L,-1,false); |
| } |
| if (e instanceof java.net.NoRouteToHostException) |
| { |
| // This exception means we know the IP address but can't get there. That's either a firewall issue, or it's something transient |
| // with the network. Some degree of retry is probably wise. |
| long currentTime = System.currentTimeMillis(); |
| throw new ServiceInterruption("No route to host during "+activity+" for '"+myUrl+"'", e, currentTime + 1000000L, |
| currentTime + 720L * 60000L,-1,false); |
| } |
| long currentTime = System.currentTimeMillis(); |
| Logging.connectors.debug("Web: IO exception "+activity+" for '"+myUrl+"', retrying"); |
| throw new ServiceInterruption("IO exception "+activity+": "+e.getMessage(),e,currentTime+TIME_5MIN,-1L,2,false); |
| } |
| |
| } |
| |
| |
| /** This class throttles an input stream based on the specified byte rate parameters. The |
| * throttling takes place across all streams that are open to the server in question. |
| */ |
| protected static class ThrottledInputstream extends InputStream |
| { |
| /** Stream throttler */ |
| protected final IStreamThrottler streamThrottler; |
| /** The throttled connection we belong to */ |
| protected final ThrottledConnection throttledConnection; |
| /** The stream we are wrapping. */ |
| protected final InputStream inputStream; |
| |
| /** Constructor. |
| */ |
| public ThrottledInputstream(IStreamThrottler streamThrottler, ThrottledConnection connection, InputStream is) |
| { |
| this.streamThrottler = streamThrottler; |
| this.throttledConnection = connection; |
| this.inputStream = is; |
| } |
| |
| /** Read a byte. |
| */ |
| @Override |
| public int read() |
| throws IOException |
| { |
| byte[] byteArray = new byte[1]; |
| int count = read(byteArray,0,1); |
| if (count == -1) |
| return count; |
| return ((int)byteArray[0]) & 0xff; |
| } |
| |
| /** Read lots of bytes. |
| */ |
| @Override |
| public int read(byte[] b) |
| throws IOException |
| { |
| return read(b,0,b.length); |
| } |
| |
| /** Read lots of specific bytes. |
| */ |
| @Override |
| public int read(byte[] b, int off, int len) |
| throws IOException |
| { |
| int totalCount = 0; |
| while (len > ThrottledFetcher.READ_CHUNK_LENGTH) |
| { |
| int amt = basicRead(b,off,ThrottledFetcher.READ_CHUNK_LENGTH,totalCount); |
| if (amt == -1) |
| { |
| if (totalCount == 0) |
| return amt; |
| return totalCount; |
| } |
| totalCount += amt; |
| off += amt; |
| len -= amt; |
| } |
| if (len > 0) |
| { |
| int amt = basicRead(b,off,len,totalCount); |
| if (amt == -1) |
| { |
| if (totalCount == 0) |
| return amt; |
| return totalCount; |
| } |
| return totalCount + amt; |
| } |
| return totalCount; |
| } |
| |
| /** Basic read, which uses the server object to throttle activity. |
| */ |
| protected int basicRead(byte[] b, int off, int len, int totalSoFar) |
| throws IOException |
| { |
| try |
| { |
| if (streamThrottler.obtainReadPermission(len) == false) |
| throw new IllegalStateException("Unexpected result calling obtainReadPermission()"); |
| int amt = 0; |
| try |
| { |
| amt = inputStream.read(b,off,len); |
| return amt; |
| } |
| finally |
| { |
| if (amt == -1) |
| streamThrottler.releaseReadPermission(len,0); |
| else |
| { |
| streamThrottler.releaseReadPermission(len,amt); |
| throttledConnection.logFetchCount(amt); |
| } |
| } |
| } |
| catch (InterruptedException e) |
| { |
| InterruptedIOException e2 = new InterruptedIOException("Interrupted"); |
| e2.bytesTransferred = totalSoFar; |
| throw e2; |
| } |
| } |
| |
| /** Skip |
| */ |
| @Override |
| public long skip(long n) |
| throws IOException |
| { |
| // Not sure whether we should bother doing anything with this; it's not used. |
| return inputStream.skip(n); |
| } |
| |
| /** Get available. |
| */ |
| @Override |
| public int available() |
| throws IOException |
| { |
| return inputStream.available(); |
| } |
| |
| /** Mark. |
| */ |
| @Override |
| public void mark(int readLimit) |
| { |
| inputStream.mark(readLimit); |
| } |
| |
| /** Reset. |
| */ |
| @Override |
| public void reset() |
| throws IOException |
| { |
| inputStream.reset(); |
| } |
| |
| /** Check if mark is supported. |
| */ |
| @Override |
| public boolean markSupported() |
| { |
| return inputStream.markSupported(); |
| } |
| |
| /** Close. |
| */ |
| @Override |
| public void close() |
| throws IOException |
| { |
| try |
| { |
| inputStream.close(); |
| } |
| catch (java.net.SocketTimeoutException e) |
| { |
| Logging.connectors.debug("Socket timeout exception trying to close connection: "+e.getMessage(),e); |
| } |
| catch (ConnectTimeoutException e) |
| { |
| Logging.connectors.debug("Socket connection timeout exception trying to close connection: "+e.getMessage(),e); |
| } |
| catch (InterruptedIOException e) |
| { |
| throw e; |
| } |
| catch (java.net.SocketException e) |
| { |
| Logging.connectors.debug("Connection reset while I was closing it: "+e.getMessage(),e); |
| } |
| catch (IOException e) |
| { |
| Logging.connectors.debug("IO Exception trying to close connection: "+e.getMessage(),e); |
| } |
| finally |
| { |
| streamThrottler.closeStream(); |
| } |
| } |
| |
| } |
| |
| /** Pool exception class */ |
| protected static class PoolException extends Exception |
| { |
| public PoolException(String message) |
| { |
| super(message); |
| } |
| } |
| |
| /** Wait exception class */ |
| protected static class WaitException extends Exception |
| { |
| protected long amt; |
| |
| public WaitException(long amt) |
| { |
| super("Wait needed"); |
| this.amt = amt; |
| } |
| |
| public long getWaitAmount() |
| { |
| return amt; |
| } |
| } |
| |
| /** Class to create a cookie spec. |
| */ |
| protected static class LaxBrowserCompatSpecFactory extends BrowserCompatSpecFactory |
| { |
| public CookieSpec create(HttpContext context) |
| { |
| return new LaxBrowserCompatSpec(); |
| } |
| } |
| |
| /** Class to override browser compatibility to make it not check cookie paths. See CONNECTORS-97. |
| */ |
| protected static class LaxBrowserCompatSpec extends BrowserCompatSpec |
| { |
| |
| public LaxBrowserCompatSpec() |
| { |
| super(); |
| registerAttribHandler(ClientCookie.PATH_ATTR, new BasicPathHandler() |
| { |
| @Override |
| public void validate(Cookie cookie, CookieOrigin origin) throws MalformedCookieException |
| { |
| // No validation |
| } |
| |
| } |
| ); |
| } |
| |
| } |
| |
| /** This thread does the actual socket communication with the server. |
| * It's set up so that it can be abandoned at shutdown time. |
| * |
| * The way it works is as follows: |
| * - it starts the transaction |
| * - it receives the response, and saves that for the calling class to inspect |
| * - it transfers the data part to an input stream provided to the calling class |
| * - it shuts the connection down |
| * |
| * If there is an error, the sequence is aborted, and an exception is recorded |
| * for the calling class to examine. |
| * |
| * The calling class basically accepts the sequence above. It starts the |
| * thread, and tries to get a response code. If instead an exception is seen, |
| * the exception is thrown up the stack. |
| */ |
| protected static class ExecuteMethodThread extends Thread |
| { |
| /** The connection */ |
| protected final ThrottledConnection theConnection; |
| /** The fetch throttler */ |
| protected final IFetchThrottler fetchThrottler; |
| /** Client and method, all preconfigured */ |
| protected final HttpClient httpClient; |
| protected final HttpHost target; |
| protected final HttpRequestBase executeMethod; |
| protected final CookieStore cookieStore; |
| |
| protected HttpResponse response = null; |
| protected Throwable responseException = null; |
| protected LoginCookies cookies = null; |
| protected Throwable cookieException = null; |
| protected XThreadInputStream threadStream = null; |
| protected InputStream bodyStream = null; |
| protected boolean streamCreated = false; |
| protected Throwable streamException = null; |
| protected boolean abortThread = false; |
| |
| protected Throwable shutdownException = null; |
| |
| protected Throwable generalException = null; |
| |
| public ExecuteMethodThread(ThrottledConnection theConnection, IFetchThrottler fetchThrottler, |
| HttpClient httpClient, HttpHost target, HttpRequestBase executeMethod, CookieStore cookieStore) |
| { |
| super(); |
| setDaemon(true); |
| this.theConnection = theConnection; |
| this.fetchThrottler = fetchThrottler; |
| this.httpClient = httpClient; |
| this.target = target; |
| this.executeMethod = executeMethod; |
| this.cookieStore = cookieStore; |
| } |
| |
| public void run() |
| { |
| try |
| { |
| try |
| { |
| // Call the execute method appropriately |
| synchronized (this) |
| { |
| if (!abortThread) |
| { |
| try |
| { |
| HttpContext context = new BasicHttpContext(); |
| context.setAttribute(HttpClientContext.COOKIE_STORE,cookieStore); |
| response = httpClient.execute(target,executeMethod,context); |
| } |
| catch (java.net.SocketTimeoutException e) |
| { |
| responseException = e; |
| } |
| catch (ConnectTimeoutException e) |
| { |
| responseException = e; |
| } |
| catch (InterruptedIOException e) |
| { |
| throw e; |
| } |
| catch (Throwable e) |
| { |
| responseException = e; |
| } |
| this.notifyAll(); |
| } |
| } |
| |
| // Fetch the cookies |
| if (responseException == null) |
| { |
| synchronized (this) |
| { |
| if (!abortThread) |
| { |
| try |
| { |
| cookies = new CookieSet(cookieStore.getCookies()); |
| } |
| catch (Throwable e) |
| { |
| cookieException = e; |
| } |
| this.notifyAll(); |
| } |
| } |
| } |
| |
| // Start the transfer of the content |
| if (cookieException == null && responseException == null) |
| { |
| synchronized (this) |
| { |
| if (!abortThread) |
| { |
| try |
| { |
| boolean gzip = false; |
| boolean deflate = false; |
| Header ceheader = response.getEntity().getContentEncoding(); |
| if (ceheader != null) |
| { |
| HeaderElement[] codecs = ceheader.getElements(); |
| for (int i = 0; i < codecs.length; i++) |
| { |
| if (codecs[i].getName().equalsIgnoreCase("gzip")) |
| { |
| // GZIP |
| gzip = true; |
| break; |
| } |
| else if (codecs[i].getName().equalsIgnoreCase("deflate")) |
| { |
| // Deflate |
| deflate = true; |
| break; |
| } |
| } |
| } |
| bodyStream = response.getEntity().getContent(); |
| if (bodyStream != null) |
| { |
| bodyStream = new ThrottledInputstream(fetchThrottler.createFetchStream(),theConnection,bodyStream); |
| if (gzip) |
| bodyStream = new GZIPInputStream(bodyStream); |
| else if (deflate) |
| bodyStream = new DeflateInputStream(bodyStream); |
| threadStream = new XThreadInputStream(bodyStream); |
| } |
| streamCreated = true; |
| } |
| catch (java.net.SocketTimeoutException e) |
| { |
| streamException = e; |
| } |
| catch (ConnectTimeoutException e) |
| { |
| streamException = e; |
| } |
| catch (InterruptedIOException e) |
| { |
| throw e; |
| } |
| catch (Throwable e) |
| { |
| streamException = e; |
| } |
| this.notifyAll(); |
| } |
| } |
| } |
| |
| if (cookieException == null && responseException == null && streamException == null) |
| { |
| if (threadStream != null) |
| { |
| // Stuff the content until we are done |
| threadStream.stuffQueue(); |
| } |
| } |
| |
| } |
| finally |
| { |
| if (bodyStream != null) |
| { |
| try |
| { |
| bodyStream.close(); |
| } |
| catch (IOException e) |
| { |
| } |
| bodyStream = null; |
| } |
| synchronized (this) |
| { |
| try |
| { |
| executeMethod.abort(); |
| } |
| catch (Throwable e) |
| { |
| shutdownException = e; |
| } |
| this.notifyAll(); |
| } |
| } |
| } |
| catch (Throwable e) |
| { |
| // We catch exceptions here that should ONLY be InterruptedExceptions, as a result of the thread being aborted. |
| this.generalException = e; |
| } |
| } |
| |
| public int getResponseCode() |
| throws InterruptedException, IOException, HttpException |
| { |
| // Must wait until the response object is there |
| while (true) |
| { |
| synchronized (this) |
| { |
| checkException(responseException); |
| if (response != null) |
| return response.getStatusLine().getStatusCode(); |
| wait(); |
| } |
| } |
| } |
| |
| public Map<String,List<String>> getResponseHeaders() |
| throws InterruptedException, IOException, HttpException |
| { |
| // Must wait for the response object to appear |
| while (true) |
| { |
| synchronized (this) |
| { |
| checkException(responseException); |
| if (response != null) |
| { |
| Header[] headers = response.getAllHeaders(); |
| Map<String,List<String>> rval = new HashMap<String,List<String>>(); |
| int i = 0; |
| while (i < headers.length) |
| { |
| Header h = headers[i++]; |
| String name = h.getName(); |
| String value = h.getValue(); |
| List<String> values = rval.get(name); |
| if (values == null) |
| { |
| values = new ArrayList<String>(); |
| rval.put(name,values); |
| } |
| values.add(value); |
| } |
| return rval; |
| } |
| wait(); |
| } |
| } |
| |
| } |
| |
| public String getFirstHeader(String headerName) |
| throws InterruptedException, IOException, HttpException |
| { |
| // Must wait for the response object to appear |
| while (true) |
| { |
| synchronized (this) |
| { |
| checkException(responseException); |
| if (response != null) |
| { |
| Header h = response.getFirstHeader(headerName); |
| if (h == null) |
| return null; |
| return h.getValue(); |
| } |
| wait(); |
| } |
| } |
| } |
| |
| public LoginCookies getCookies() |
| throws InterruptedException, IOException, HttpException |
| { |
| while (true) |
| { |
| synchronized (this) |
| { |
| if (responseException != null) |
| throw new IllegalStateException("Check for response before getting cookies"); |
| checkException(cookieException); |
| if (cookies != null) |
| return cookies; |
| wait(); |
| } |
| } |
| } |
| |
| public InputStream getSafeInputStream() |
| throws InterruptedException, IOException, HttpException |
| { |
| // Must wait until stream is created, or until we note an exception was thrown. |
| while (true) |
| { |
| synchronized (this) |
| { |
| if (responseException != null) |
| throw new IllegalStateException("Check for response before getting stream"); |
| if (cookieException != null) |
| throw new IllegalStateException("Check for cookies before getting stream"); |
| checkException(streamException); |
| if (streamCreated) |
| return threadStream; |
| wait(); |
| } |
| } |
| } |
| |
| public void abort() |
| { |
| // This will be called during the finally |
| // block in the case where all is well (and |
| // the stream completed) and in the case where |
| // there were exceptions. |
| synchronized (this) |
| { |
| if (streamCreated) |
| { |
| if (threadStream != null) |
| threadStream.abort(); |
| } |
| abortThread = true; |
| } |
| } |
| |
| public void finishUp() |
| throws InterruptedException |
| { |
| join(); |
| } |
| |
| protected synchronized void checkException(Throwable exception) |
| throws IOException, HttpException |
| { |
| if (exception != null) |
| { |
| // Throw the current exception, but clear it, so no further throwing is possible on the same problem. |
| Throwable e = exception; |
| if (e instanceof IOException) |
| throw (IOException)e; |
| else if (e instanceof HttpException) |
| throw (HttpException)e; |
| else if (e instanceof RuntimeException) |
| throw (RuntimeException)e; |
| else if (e instanceof Error) |
| throw (Error)e; |
| else |
| throw new RuntimeException("Unhandled exception of type: "+e.getClass().getName(),e); |
| } |
| } |
| |
| } |
| |
| protected static class OurBasicCookieStore implements CookieStore, Serializable { |
| |
| private static final long serialVersionUID = -7581093305228232025L; |
| |
| private final TreeSet<Cookie> cookies; |
| |
| public OurBasicCookieStore() { |
| super(); |
| this.cookies = new TreeSet<Cookie>(new CookieIdentityComparator()); |
| } |
| |
| /** |
| * Adds an {@link Cookie HTTP cookie}, replacing any existing equivalent cookies. |
| * If the given cookie has already expired it will not be added, but existing |
| * values will still be removed. |
| * |
| * @param cookie the {@link Cookie cookie} to be added |
| * |
| * @see #addCookies(Cookie[]) |
| * |
| */ |
| public synchronized void addCookie(Cookie cookie) { |
| if (cookie != null) { |
| // first remove any old cookie that is equivalent |
| cookies.remove(cookie); |
| cookies.add(cookie); |
| } |
| } |
| |
| /** |
| * Adds an array of {@link Cookie HTTP cookies}. Cookies are added individually and |
| * in the given array order. If any of the given cookies has already expired it will |
| * not be added, but existing values will still be removed. |
| * |
| * @param cookies the {@link Cookie cookies} to be added |
| * |
| * @see #addCookie(Cookie) |
| * |
| */ |
| public synchronized void addCookies(Cookie[] cookies) { |
| if (cookies != null) { |
| for (Cookie cooky : cookies) { |
| this.addCookie(cooky); |
| } |
| } |
| } |
| |
| /** |
| * Returns an immutable array of {@link Cookie cookies} that this HTTP |
| * state currently contains. |
| * |
| * @return an array of {@link Cookie cookies}. |
| */ |
| public synchronized List<Cookie> getCookies() { |
| //create defensive copy so it won't be concurrently modified |
| return new ArrayList<Cookie>(cookies); |
| } |
| |
| /** |
| * Removes all of {@link Cookie cookies} in this HTTP state |
| * that have expired by the specified {@link java.util.Date date}. |
| * |
| * @return true if any cookies were purged. |
| * |
| * @see Cookie#isExpired(Date) |
| */ |
| public synchronized boolean clearExpired(final Date date) { |
| if (date == null) { |
| return false; |
| } |
| boolean removed = false; |
| for (Iterator<Cookie> it = cookies.iterator(); it.hasNext();) { |
| if (it.next().isExpired(date)) { |
| it.remove(); |
| removed = true; |
| } |
| } |
| return removed; |
| } |
| |
| /** |
| * Clears all cookies. |
| */ |
| public synchronized void clear() { |
| cookies.clear(); |
| } |
| |
| @Override |
| public synchronized String toString() { |
| return cookies.toString(); |
| } |
| |
| } |
| |
| /** Connection pool key */ |
| protected static class ConnectionPoolKey |
| { |
| protected final String protocol; |
| protected final String server; |
| protected final int port; |
| protected final PageCredentials authentication; |
| protected final String trustStoreString; |
| protected final String proxyHost; |
| protected final int proxyPort; |
| protected final String proxyAuthDomain; |
| protected final String proxyAuthUsername; |
| protected final String proxyAuthPassword; |
| |
| public ConnectionPoolKey(String protocol, |
| String server, int port, PageCredentials authentication, |
| String trustStoreString, String proxyHost, int proxyPort, |
| String proxyAuthDomain, String proxyAuthUsername, String proxyAuthPassword) |
| { |
| this.protocol = protocol; |
| this.server = server; |
| this.port = port; |
| this.authentication = authentication; |
| this.trustStoreString = trustStoreString; |
| this.proxyHost = proxyHost; |
| this.proxyPort = proxyPort; |
| this.proxyAuthDomain = proxyAuthDomain; |
| this.proxyAuthUsername = proxyAuthUsername; |
| this.proxyAuthPassword = proxyAuthPassword; |
| } |
| |
| public int hashCode() |
| { |
| return protocol.hashCode() + |
| server.hashCode() + |
| (port * 31) + |
| ((authentication==null)?0:authentication.hashCode()) + |
| ((trustStoreString==null)?0:trustStoreString.hashCode()) + |
| ((proxyHost==null)?0:proxyHost.hashCode()) + |
| (proxyPort * 29) + |
| ((proxyAuthDomain==null)?0:proxyAuthDomain.hashCode()) + |
| ((proxyAuthUsername==null)?0:proxyAuthUsername.hashCode()) + |
| ((proxyAuthPassword==null)?0:proxyAuthPassword.hashCode()); |
| } |
| |
| public boolean equals(Object o) |
| { |
| if (!(o instanceof ConnectionPoolKey)) |
| return false; |
| ConnectionPoolKey other = (ConnectionPoolKey)o; |
| if (!server.equals(other.server) || |
| port != other.port) |
| return false; |
| if (authentication == null || other.authentication == null) |
| { |
| if (authentication != other.authentication) |
| return false; |
| } |
| else |
| { |
| if (!authentication.equals(other.authentication)) |
| return false; |
| } |
| if (trustStoreString == null || other.trustStoreString == null) |
| { |
| if (trustStoreString != other.trustStoreString) |
| return false; |
| } |
| else |
| { |
| if (!trustStoreString.equals(other.trustStoreString)) |
| return false; |
| } |
| if (proxyHost == null || other.proxyHost == null) |
| { |
| if (proxyHost != other.proxyHost) |
| return false; |
| } |
| else |
| { |
| if (!proxyHost.equals(other.proxyHost)) |
| return false; |
| } |
| if (proxyPort != other.proxyPort) |
| return false; |
| if (proxyAuthDomain == null || other.proxyAuthDomain == null) |
| { |
| if (proxyAuthDomain != other.proxyAuthDomain) |
| return false; |
| } |
| else |
| { |
| if (!proxyAuthDomain.equals(other.proxyAuthDomain)) |
| return false; |
| } |
| if (proxyAuthUsername == null || other.proxyAuthUsername == null) |
| { |
| if (proxyAuthUsername != other.proxyAuthUsername) |
| return false; |
| } |
| else |
| { |
| if (!proxyAuthUsername.equals(other.proxyAuthUsername)) |
| return false; |
| } |
| if (proxyAuthPassword == null || other.proxyAuthPassword == null) |
| { |
| if (proxyAuthPassword != other.proxyAuthPassword) |
| return false; |
| } |
| else |
| { |
| if (!proxyAuthPassword.equals(other.proxyAuthPassword)) |
| return false; |
| } |
| return true; |
| } |
| } |
| |
| /** Each connection pool has identical connections we can draw on. |
| */ |
| protected static class ConnectionPool |
| { |
| /** Throttler */ |
| protected final IConnectionThrottler connectionThrottler; |
| |
| // If we need to create a connection, these are what we use |
| |
| protected final String protocol; |
| protected final String server; |
| protected final int port; |
| protected final PageCredentials authentication; |
| protected final javax.net.ssl.SSLSocketFactory baseFactory; |
| protected final String proxyHost; |
| protected final int proxyPort; |
| protected final String proxyAuthDomain; |
| protected final String proxyAuthUsername; |
| protected final String proxyAuthPassword; |
| |
| /** The actual pool of connections */ |
| protected final List<IThrottledConnection> connections = new ArrayList<IThrottledConnection>(); |
| |
| public ConnectionPool(IConnectionThrottler connectionThrottler, |
| String protocol, |
| String server, int port, PageCredentials authentication, |
| javax.net.ssl.SSLSocketFactory baseFactory, |
| String proxyHost, int proxyPort, |
| String proxyAuthDomain, String proxyAuthUsername, String proxyAuthPassword) |
| { |
| this.connectionThrottler = connectionThrottler; |
| |
| this.protocol = protocol; |
| this.server = server; |
| this.port = port; |
| this.authentication = authentication; |
| this.baseFactory = baseFactory; |
| this.proxyHost = proxyHost; |
| this.proxyPort = proxyPort; |
| this.proxyAuthDomain = proxyAuthDomain; |
| this.proxyAuthUsername = proxyAuthUsername; |
| this.proxyAuthPassword = proxyAuthPassword; |
| } |
| |
| public IThrottledConnection grab() |
| throws InterruptedException |
| { |
| // Wait for a connection |
| int result = connectionThrottler.waitConnectionAvailable(); |
| if (result == IConnectionThrottler.CONNECTION_FROM_POOL) |
| { |
| // We are guaranteed to have a connection in the pool, unless there's a coding error. |
| synchronized (connections) |
| { |
| return connections.remove(connections.size()-1); |
| } |
| } |
| else if (result == IConnectionThrottler.CONNECTION_FROM_CREATION) |
| { |
| return new ThrottledConnection(this,connectionThrottler.getNewConnectionFetchThrottler(), |
| protocol,server,port,authentication,baseFactory, |
| proxyHost,proxyPort, |
| proxyAuthDomain,proxyAuthUsername,proxyAuthPassword); |
| } |
| else |
| throw new IllegalStateException("Unexpected return value from waitConnectionAvailable(): "+result); |
| } |
| |
| public void release(IThrottledConnection connection) |
| { |
| if (connectionThrottler.noteReturnedConnection()) |
| { |
| // Destroy this connection |
| connection.destroy(); |
| connectionThrottler.noteConnectionDestroyed(); |
| } |
| else |
| { |
| // Return to pool |
| synchronized (connections) |
| { |
| connections.add(connection); |
| } |
| connectionThrottler.noteConnectionReturnedToPool(); |
| } |
| } |
| |
| public void flushIdleConnections() |
| { |
| long currentTime = System.currentTimeMillis(); |
| // First, remove connections that are over the quota |
| while (connectionThrottler.checkDestroyPooledConnection()) |
| { |
| // Destroy the oldest ones first |
| IThrottledConnection connection; |
| synchronized (connections) |
| { |
| connection = connections.remove(0); |
| } |
| connection.destroy(); |
| connectionThrottler.noteConnectionDestroyed(); |
| } |
| // Now, get rid of expired connections |
| while (true) |
| { |
| boolean expired; |
| synchronized (connections) |
| { |
| expired = connections.size() > 0 && connections.get(0).hasExpired(currentTime); |
| } |
| if (!expired) |
| break; |
| // We found an expired connection! Now tell the throttler that, and see if it agrees. |
| if (connectionThrottler.checkExpireConnection()) |
| { |
| // Remove a connection from the pool, and destroy it. |
| // It's not guaranteed to be an expired one, but that's a rare occurrence, we expect. |
| IThrottledConnection connection; |
| synchronized (connections) |
| { |
| connection = connections.remove(0); |
| } |
| connection.destroy(); |
| connectionThrottler.noteConnectionDestroyed(); |
| } |
| else |
| break; |
| } |
| } |
| |
| } |
| } |