| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| |
| package org.apache.oodt.cas.pushpull.retrievalsystem; |
| |
| //OODT imports |
| import org.apache.oodt.cas.pushpull.config.Config; |
| import org.apache.oodt.cas.pushpull.config.SiteInfo; |
| import org.apache.oodt.cas.pushpull.exceptions.AlreadyInDatabaseException; |
| import org.apache.oodt.cas.pushpull.exceptions.CrawlerException; |
| import org.apache.oodt.cas.pushpull.exceptions.ProtocolFileException; |
| import org.apache.oodt.cas.pushpull.exceptions.RemoteConnectionException; |
| import org.apache.oodt.cas.pushpull.exceptions.ThreadEvaluatorException; |
| import org.apache.oodt.cas.pushpull.exceptions.ToManyFailedDownloadsException; |
| import org.apache.oodt.cas.pushpull.exceptions.UndefinedTypeException; |
| import org.apache.oodt.cas.pushpull.filerestrictions.renamingconventions.RenamingConvention; |
| import org.apache.oodt.cas.protocol.exceptions.ProtocolException; |
| import org.apache.oodt.cas.protocol.util.ProtocolFileFilter; |
| import org.apache.oodt.cas.protocol.Protocol; |
| import org.apache.oodt.cas.protocol.ProtocolFile; |
| import org.apache.oodt.cas.pushpull.protocol.ProtocolHandler; |
| import org.apache.oodt.cas.pushpull.protocol.RemoteSite; |
| import org.apache.oodt.cas.pushpull.protocol.RemoteSiteFile; |
| import org.apache.oodt.cas.filemgr.structs.exceptions.CatalogException; |
| import org.apache.oodt.cas.metadata.Metadata; |
| import org.apache.oodt.cas.metadata.util.MimeTypeUtils; |
| |
| |
| //JDK imports |
| import java.io.File; |
| import java.io.FileFilter; |
| import java.io.IOException; |
| import java.net.MalformedURLException; |
| import java.util.HashSet; |
| import java.util.LinkedList; |
| import java.util.List; |
| import java.util.Vector; |
| import java.util.concurrent.LinkedBlockingQueue; |
| import java.util.concurrent.ThreadPoolExecutor; |
| import java.util.concurrent.TimeUnit; |
| import java.util.logging.Level; |
| import java.util.logging.Logger; |
| |
| import com.google.common.base.Preconditions; |
| import com.google.common.base.Strings; |
| |
| /** |
| * <pre> |
| * Will crawl external directory structures and will download the files within these structures. |
| * |
| * This class's settings are set using a java .properties file which can be read in and parsed by Config.java. |
| * This .properties file should have the following properties set: |
| * |
| * {@literal #list of sites to crawl |
| * protocol.external.sources=<path-to-xml-file> |
| * |
| * #protocol types |
| * protocolfactory.types=<list-of-protocols-separated-by-commas> (e.g. ftp,http,https,sftp) |
| * |
| * #Protocol factories per type (must have one for each protocol mentioned in protocolfactory.types -- the property must be named |
| * # as such: protocolfactory.<name-of-protocol-type>) |
| * protocolfactory.ftp=<path-to-java-protocolfactory-class> (e.g. org.apache.oodt.cas.protocol.ftp.FtpClientFactory) |
| * protocolfactory.http=<path-to-java-protocolfactory-class> |
| * protocolfactory.https=<path-to-java-protocolfactory-class> |
| * protocolfactory.sftp=<path-to-java-protocolfactory-class> |
| * |
| * #configuration to make java.net.URL accept unsupported protocols -- must exist just as shown |
| * java.protocol.handler.pkgs=org.apache.oodt.cas.url.handlers |
| * } |
| * |
| * In order to specify which external sites to crawl you must create an XML file which contains |
| * the site and necessary information needed to crawl the site, such as username and password. |
| * protocol.external.sources must contain the path to this file so the crawl knows where to find it. |
| * You can also train this class on how to crawl each given site. This is also specified in an XML |
| * file, whose path must be given in the first mentioned XML file which contains the username and password. |
| * |
| * The schema for the external sites XML file is as such: |
| * |
| * {@literal <sources> |
| * <source url="url-of-server"> |
| * <username>username</username> |
| * <password>password</password> |
| * <dirstruct>path-to-xml-file</dirstruct> |
| * <crawl>yes-or-no</crawl> |
| * </source> |
| * ... |
| * ... |
| * ... |
| * </sources>} |
| * |
| * You may specify as many sources as you would like by specifying multiple {@literal <source>} tags. |
| * In the {@literal <source>} tag, the parameter 'url' must be specified. This is the url of the server |
| * you want the crawler to connect to. It should be of the following format: |
| * {@literal <protocol>://<host>} (e.g. sftp://remote.computer.gov) |
| * If no username and password exist, then these elements can be omitted (they are optional). |
| * For {@literal <crawl>} place yes or no here. This is for convenience of being able to keep record of the |
| * sites and their information in this XML file even if you decide that you no longer need to crawl it |
| * anymore (just put {@literal <crawl>no</crawl>} and the crawl will not crawl that site). |
| * {@literal <dirStruct>} contains a path to another XML file which is documented in DirStruct.java javadoc. This |
| * element is optional. If no {@literal <dirStruct>} is given, then every directory will be crawled on the site |
| * and every encountered file will be downloaded. |
| * </pre> |
| * |
| * @author bfoster (Brian Foster) |
| */ |
| public class FileRetrievalSystem { |
| |
| /* our log stream */ |
| private static final Logger LOG = Logger |
| .getLogger(FileRetrievalSystem.class.getName()); |
| |
| private final static int MAX_RETRIES = 3; |
| |
| private LinkedList<ProtocolFile> failedDownloadList; |
| |
| private HashSet<ProtocolFile> currentlyDownloading; |
| |
| private int max_allowed_failed_downloads; |
| |
| /** |
| * The max number of threads able to run at the same time |
| */ |
| private int max_sessions; |
| |
| private final int absMaxAllowedSessions = 50; |
| |
| /** |
| * This is just for clarity purposes. . .I only create the amount of threads |
| * that I will allow to be used at any given moment |
| */ |
| private final static int EXTRA_LAZY_SESSIONS_TIMEOUT = 10; |
| |
| /** |
| * A list of created protocol sessions (devoted to grabbing files from the |
| * crawling directory structure) that are not presently in use. |
| */ |
| private Vector<Protocol> avaliableSessions; |
| |
| /** |
| * The number of sessions that have been created (should always be less than |
| * or equal to MAX_SESSIONS). |
| */ |
| private int numberOfSessions; |
| |
| /** |
| * The thread pool that is in charge of the sessions. |
| */ |
| private ThreadPoolExecutor threadController; |
| |
| /** |
| * Manages the Protocols and always ensures that the Crawler is using the |
| * appropriate protocol for any given server. |
| */ |
| private ProtocolHandler protocolHandler; |
| |
| /** |
| * max_sessions tracker |
| */ |
| private DownloadThreadEvaluator dtEval; |
| |
| private DownloadListener dListener; |
| |
| private Config config; |
| |
| private SiteInfo siteInfo; |
| |
| private HashSet<File> stagingAreas; |
| |
| private MimeTypeUtils mimeTypeDetection; |
| |
| /** |
| * Creates a Crawler based on the URL, DirStruct, and Config objects passed |
| * in. If no DirStruct is needed then set it to null. |
| * |
| * @param url |
| * The URL for which you want this Crawler to crawl |
| * @param dirStruct |
| * The specified directory structure located at the host -- use |
| * to train crawler (see DirStruct). |
| * @param config |
| * The Configuration file that is passed to this objects |
| * ProtocolHandler. |
| * @throws InstantiationException |
| * @throws DatabaseException |
| */ |
| public FileRetrievalSystem(Config config, SiteInfo siteInfo) |
| throws InstantiationException { |
| try { |
| protocolHandler = new ProtocolHandler(config.getProtocolInfo()); |
| this.config = config; |
| this.siteInfo = siteInfo; |
| mimeTypeDetection = new MimeTypeUtils(config |
| .getProductTypeDetectionFile()); |
| } catch (Exception e) { |
| e.printStackTrace(); |
| throw new InstantiationException( |
| "Failed to create FileRetrievalSystem : " + e.getMessage()); |
| } |
| } |
| |
| public void registerDownloadListener(DownloadListener dListener) { |
| this.dListener = dListener; |
| } |
| |
| public void initialize() throws IOException { |
| try { |
| resetVariables(); |
| } catch (Exception e) { |
| throw new IOException("Failed to initialize FileRetrievalSystem : " |
| + e.getMessage()); |
| } |
| } |
| |
| /** |
| * Initializes variables that must be reset when more than one crawl is done |
| * |
| * @throws ThreadEvaluatorException |
| */ |
| void resetVariables() throws ThreadEvaluatorException { |
| numberOfSessions = 0; |
| stagingAreas = new HashSet<File>(); |
| avaliableSessions = new Vector<Protocol>(); |
| currentlyDownloading = new HashSet<ProtocolFile>(); |
| failedDownloadList = new LinkedList<ProtocolFile>(); |
| max_allowed_failed_downloads = config.getMaxFailedDownloads(); |
| max_sessions = config.getRecommendedThreadCount(); |
| threadController = new ThreadPoolExecutor(this.max_sessions, |
| this.max_sessions, EXTRA_LAZY_SESSIONS_TIMEOUT, |
| TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>()); |
| if (config.useTracker()) |
| dtEval = new DownloadThreadEvaluator(this.absMaxAllowedSessions); |
| } |
| |
| /** |
| * reset error flag |
| */ |
| public void clearErrorFlag() { |
| max_allowed_failed_downloads += config.getMaxFailedDownloads(); |
| } |
| |
| public boolean isAlreadyInDatabase(RemoteFile rf) throws CatalogException { |
| return config.getIngester() != null ? config.getIngester().hasProduct( |
| config.getFmUrl(), rf.getMetadata(RemoteFile.PRODUCT_NAME)) |
| : false; |
| } |
| |
| public List<RemoteSiteFile> getNextPage(final RemoteSiteFile dir, |
| final ProtocolFileFilter filter) throws RemoteConnectionException { |
| for (int i = 0; i < 3; i++) { |
| try { |
| return protocolHandler.nextPage(dir.getSite(), protocolHandler |
| .getAppropriateProtocol(dir, true, true), |
| new ProtocolFileFilter() { |
| @Override |
| public boolean accept(ProtocolFile file) { |
| return filter.accept(file) |
| && !FileRetrievalSystem.this |
| .isDownloading(file); |
| } |
| }); |
| } catch (Exception e) { |
| LOG.log(Level.WARNING, "Retrying to get next page for " + dir |
| + " because operation failed : " + e.getMessage(), e); |
| } |
| } |
| throw new RemoteConnectionException("Failed to get next page for " |
| + dir); |
| } |
| |
| public void changeToRoot(RemoteSite remoteSite) throws ProtocolException, |
| MalformedURLException, org.apache.oodt.cas.protocol.exceptions.ProtocolException { |
| if (validate(remoteSite)) |
| protocolHandler.cdToROOT(protocolHandler |
| .getAppropriateProtocolBySite(remoteSite, true)); |
| else |
| throw new ProtocolException("Not a valid remote site " + remoteSite); |
| } |
| |
| public void changeToHOME(RemoteSite remoteSite) throws ProtocolException, |
| MalformedURLException { |
| if (validate(remoteSite)) |
| protocolHandler.cdToHOME(protocolHandler |
| .getAppropriateProtocolBySite(remoteSite, true)); |
| else |
| throw new ProtocolException("Not a valid remote site " + remoteSite); |
| } |
| |
| public void changeToDir(String dir, RemoteSite remoteSite) |
| throws MalformedURLException, ProtocolException { |
| if (validate(remoteSite)) |
| this |
| .changeToDir(protocolHandler.getProtocolFileFor(remoteSite, |
| protocolHandler.getAppropriateProtocolBySite( |
| remoteSite, true), dir, true)); |
| else |
| throw new ProtocolException("Not a valid remote site " + remoteSite); |
| } |
| |
| public void changeToDir(RemoteSiteFile pFile) throws ProtocolException, |
| MalformedURLException { |
| RemoteSite remoteSite = pFile.getSite(); |
| if (validate(remoteSite)) |
| protocolHandler.cd(protocolHandler.getAppropriateProtocolBySite( |
| remoteSite, true), pFile); |
| else |
| throw new ProtocolException("Not a valid remote site " + remoteSite); |
| } |
| |
| public ProtocolFile getHomeDir(RemoteSite remoteSite) |
| throws ProtocolException { |
| if (validate(remoteSite)) |
| return protocolHandler.getHomeDir(remoteSite, protocolHandler |
| .getAppropriateProtocolBySite(remoteSite, true)); |
| else |
| throw new ProtocolException("Not a valid remote site " + remoteSite); |
| } |
| |
| public ProtocolFile getProtocolFile(RemoteSite remoteSite, String file, |
| boolean isDir) throws ProtocolException { |
| if (validate(remoteSite)) |
| return protocolHandler.getProtocolFileFor(remoteSite, protocolHandler |
| .getAppropriateProtocolBySite(remoteSite, true), file, |
| isDir); |
| else |
| throw new ProtocolException("Not a valid remote site " + remoteSite); |
| } |
| |
| public ProtocolFile getCurrentFile(RemoteSite remoteSite) |
| throws ProtocolFileException, ProtocolException, |
| MalformedURLException { |
| if (validate(remoteSite)) |
| return protocolHandler.pwd(remoteSite, protocolHandler |
| .getAppropriateProtocolBySite(remoteSite, true)); |
| else |
| throw new ProtocolException("Not a valid remote site " + remoteSite); |
| } |
| |
| // returns true if download was added to queue. . .false otherwise |
| public boolean addToDownloadQueue(RemoteSite remoteSite, String file, |
| String renamingString, File downloadToDir, |
| String uniqueMetadataElement, boolean deleteAfterDownload, Metadata fileMetadata) |
| throws ToManyFailedDownloadsException, RemoteConnectionException, |
| ProtocolFileException, ProtocolException, |
| AlreadyInDatabaseException, UndefinedTypeException, |
| CatalogException, IOException { |
| if (validate(remoteSite)) { |
| if (!file.startsWith("/")) |
| file = "/" + file; |
| return addToDownloadQueue(protocolHandler.getProtocolFileFor(remoteSite, |
| protocolHandler.getAppropriateProtocolBySite(remoteSite, |
| true), file, false), renamingString, downloadToDir, |
| uniqueMetadataElement, deleteAfterDownload, fileMetadata); |
| } else |
| throw new ProtocolException("Not a valid remote site " + remoteSite); |
| } |
| |
| public boolean validate(RemoteSite remoteSite) { |
| Preconditions.checkNotNull(remoteSite); |
| LinkedList<RemoteSite> remoteSites = this.siteInfo |
| .getPossibleRemoteSites(remoteSite.getAlias(), remoteSite |
| .getURL(), remoteSite.getUsername(), remoteSite |
| .getPassword()); |
| if (remoteSites.size() == 1) { |
| RemoteSite rs = remoteSites.get(0); |
| remoteSite.copy(rs); |
| return true; |
| } |
| return false; |
| } |
| |
| public void waitUntilAllCurrentDownloadsAreComplete() |
| throws ProtocolException { |
| synchronized (this) { |
| for (int i = 0; i < 180; i++) { |
| try { |
| if (this.avaliableSessions.size() == this.numberOfSessions) |
| return; |
| else |
| this.wait(5000); |
| } catch (Exception e) { |
| } |
| } |
| throw new ProtocolException( |
| "Downloads appear to be hanging . . . aborting wait . . . waited for 15 minutes"); |
| } |
| } |
| |
| public boolean addToDownloadQueue(RemoteSiteFile file, |
| String renamingString, |
| File downloadToDir, |
| String uniqueMetadataElement, |
| boolean deleteAfterDownload, |
| Metadata fileMetadata) throws ToManyFailedDownloadsException, |
| RemoteConnectionException, |
| AlreadyInDatabaseException, |
| UndefinedTypeException, |
| CatalogException, |
| IOException { |
| if (this.failedDownloadList.size() > max_allowed_failed_downloads) |
| throw new ToManyFailedDownloadsException( |
| "Number of failed downloads exceeds " |
| + max_allowed_failed_downloads |
| + " . . . blocking all downloads from being added to queue . . . " |
| + "reset error flag in order to force allow downloads into queue"); |
| if (this.isDownloading(file)) { |
| LOG.log(Level.WARNING, "Skipping file '" + file |
| + "' because it is already on the download queue"); |
| return false; |
| } |
| |
| RemoteFile remoteFile = new RemoteFile(file); |
| remoteFile.addMetadata(fileMetadata); |
| remoteFile.addMetadata(RemoteFile.RENAMING_STRING, renamingString); |
| remoteFile.addMetadata(RemoteFile.DELETE_AFTER_DOWNLOAD, |
| deleteAfterDownload + ""); |
| |
| if (config.onlyDownloadDefinedTypes()) { |
| String mimeType = this.mimeTypeDetection.getMimeType(file.getName()); |
| if (mimeType != null |
| && !mimeType.equals("application/octet-stream")) { |
| remoteFile.addMetadata(RemoteFile.MIME_TYPE, mimeType); |
| remoteFile.addMetadata(RemoteFile.SUPER_TYPE, this.mimeTypeDetection |
| .getSuperTypeForMimeType(mimeType)); |
| String description = this.mimeTypeDetection |
| .getDescriptionForMimeType(mimeType); |
| if (!Strings.isNullOrEmpty(description)) { |
| if(description.indexOf("&") != -1){ |
| for (String field : description.split("\\&\\&")) { |
| String[] keyval = field.split("\\="); |
| remoteFile.addMetadata(keyval[0].trim(), keyval[1].trim()); |
| } |
| } else{ |
| // it's the ProductType |
| remoteFile.addMetadata(RemoteFile.PRODUCT_TYPE, description); |
| } |
| if (remoteFile.getMetadata(RemoteFile.UNIQUE_ELEMENT) != null) { |
| uniqueMetadataElement = remoteFile.getMetadata(RemoteFile.UNIQUE_ELEMENT); |
| } |
| } |
| } else { |
| throw new UndefinedTypeException("File '" + file |
| + "' is not a defined type"); |
| } |
| } |
| |
| downloadToDir = new File(downloadToDir.isAbsolute() ? downloadToDir |
| .getAbsolutePath() : this.config.getBaseStagingArea() + "/" |
| + downloadToDir.getPath()); |
| if (!this.isStagingAreaInitialized(downloadToDir)) |
| this.initializeStagingArea(downloadToDir); |
| |
| remoteFile.addMetadata(RemoteFile.DOWNLOAD_TO_DIR, downloadToDir.getAbsolutePath()); |
| |
| if (remoteFile.getMetadata(RemoteFile.PRODUCT_NAME_GENERATOR) != null) { |
| remoteFile.addMetadata(RemoteFile.PRODUCT_NAME, RenamingConvention.rename(remoteFile, remoteFile.getMetadata(RemoteFile.PRODUCT_NAME_GENERATOR))); |
| }else { |
| remoteFile.setUniqueMetadataElement(uniqueMetadataElement == null ? RemoteFile.FILENAME : uniqueMetadataElement); |
| } |
| |
| if (!isAlreadyInDatabase(remoteFile)) { |
| |
| // get download location |
| File newFile = getSaveToLoc(remoteFile); |
| |
| // add session to thread pool |
| if (!this.isInStagingArea(newFile)) { |
| for (int retries = 0;; retries++) { |
| try { |
| addSessionToThreadPool( |
| getNextAvaliableSession(remoteFile |
| .getProtocolFile()), remoteFile, |
| newFile); |
| return true; |
| } catch (Exception e) { |
| if (retries < MAX_RETRIES) { |
| LOG.log(Level.WARNING, "Failed to get session for " |
| + file + " . . . retrying in 5 secs"); |
| synchronized (this) { |
| try { |
| wait(5000); |
| } catch (Exception e1) { |
| } |
| } |
| } else { |
| this.failedDownloadList.add(file); |
| throw new RemoteConnectionException( |
| "Failed to get session to download " + file |
| + " : " + e.getMessage(), e); |
| } |
| } |
| } |
| } else { |
| if (deleteAfterDownload) { |
| try { |
| protocolHandler |
| .delete(protocolHandler.getAppropriateProtocol( |
| file, true, true), file); |
| } catch (Exception e) { |
| LOG.log(Level.SEVERE, |
| "Failed to delete file from server : " |
| + e.getMessage()); |
| } |
| } |
| LOG.log(Level.WARNING, "Skipping file " + file |
| + " because it is already in staging area"); |
| return false; |
| } |
| } else |
| throw new AlreadyInDatabaseException("File " + file |
| + " is already the database"); |
| } |
| |
| private boolean isStagingAreaInitialized(File stagingArea) { |
| return this.stagingAreas.contains(stagingArea); |
| } |
| |
| private boolean isInStagingArea(final File findFile) { |
| return (findFile.exists() || new File(findFile.getParentFile(), |
| "Downloading_" + findFile.getName()).exists()); |
| } |
| |
| private void initializeStagingArea(File stagingArea) throws IOException { |
| LOG.log(Level.INFO, "Preparing staging area " + stagingArea); |
| if (stagingArea.exists()) { |
| File[] failedDownloads = stagingArea.listFiles(new FileFilter() { |
| @Override |
| public boolean accept(File pathname) { |
| return pathname.getName().startsWith("Downloading_"); |
| } |
| }); |
| for (File file : failedDownloads) { |
| LOG.log(Level.INFO, "Removing failed download file " |
| + file.getAbsolutePath()); |
| file.delete(); |
| } |
| } else { |
| LOG.log(Level.INFO, "Staging area " + stagingArea.getAbsolutePath() |
| + " does not exist! -- trying to create it "); |
| if (!stagingArea.mkdirs()) |
| throw new IOException("Failed to create staging area at " |
| + stagingArea.getAbsolutePath()); |
| } |
| this.stagingAreas.add(stagingArea); |
| } |
| |
| File getSaveToLoc(RemoteFile remoteFile) { |
| String renamingString = remoteFile |
| .getMetadata(RemoteFile.RENAMING_STRING); |
| if (renamingString == null || renamingString.equals("")) { |
| return new File(remoteFile.getMetadata(RemoteFile.DOWNLOAD_TO_DIR) |
| + "/" + remoteFile.getMetadata(RemoteFile.FILENAME)); |
| } else { |
| File newFile = new File(remoteFile |
| .getMetadata(RemoteFile.DOWNLOAD_TO_DIR) |
| + "/" |
| + RenamingConvention.rename(remoteFile, renamingString)); |
| if (!newFile.getParentFile().equals( |
| remoteFile.getMetadata(RemoteFile.DOWNLOAD_TO_DIR))) |
| newFile.getParentFile().mkdirs(); |
| return newFile; |
| } |
| } |
| |
| Protocol getNextAvaliableSession(RemoteSiteFile file) throws CrawlerException { |
| // wait for available session, then load it |
| Protocol session; |
| while ((session = getSession(file)) == null) { |
| try { |
| waitMainThread(); |
| } catch (InterruptedException e1) { |
| } |
| } |
| return session; |
| } |
| |
| /** |
| * Sleeps the crawling thread |
| * |
| * @throws InterruptedException |
| */ |
| synchronized void waitMainThread() throws InterruptedException { |
| wait(); |
| } |
| |
| /** |
| * Wakes up the crawling thread |
| */ |
| synchronized void wakeUpMainThread() { |
| notify(); |
| } |
| |
| /** |
| * Increments the number of downloading session |
| */ |
| synchronized void incrementSessions() { |
| numberOfSessions++; |
| } |
| |
| synchronized void decrementSessions() { |
| this.numberOfSessions--; |
| } |
| |
| /** |
| * Gets an available downloading session Protocol. Returns null if none are |
| * available |
| * |
| * @param path |
| * The session returned will be checked against the Path passed |
| * in and if not presently connected to the Path's URL, it will |
| * be disconnected from it's current server and connected to the |
| * server specified by the Path. |
| * @return The found downloading session Protocol |
| * @throws RemoteCommunicationException |
| * If downloading session Protocol has to be reconnected and |
| * there is an error communicating with the server |
| */ |
| synchronized Protocol getSession(RemoteSiteFile file) throws CrawlerException { |
| try { |
| Protocol session = null; |
| if (file.getSite().getMaxConnections() < 0 |
| || file.getSite().getMaxConnections() > this.getCurrentlyDownloadingFiles().size()) { |
| if (avaliableSessions.size() > 0) { |
| session = modifyAvailableSessionForPath(file); |
| } else if (numberOfSessions < max_sessions) { |
| session = createNewSessionForPath(file); |
| incrementSessions(); |
| } |
| } |
| return session; |
| } catch (Exception e) { |
| throw new CrawlerException("Failed to get new session : " |
| + e.getMessage(), e); |
| } |
| } |
| |
| Protocol createNewSessionForPath(RemoteSiteFile file) |
| throws RemoteConnectionException { |
| return protocolHandler.getAppropriateProtocol(file, /* reuse */false, /* navigate */ |
| true); |
| } |
| |
| Protocol modifyAvailableSessionForPath(RemoteSiteFile file) |
| throws ProtocolException, RemoteConnectionException { |
| Protocol session = getAvailableSession(); |
| if (!file.getSite().getURL().getHost().equals( |
| file.getSite().getURL().getHost()) |
| || !protocolHandler.isProtocolConnected(session)) { |
| protocolHandler.disconnect(session); |
| session = protocolHandler.getAppropriateProtocol(file, /* reuse */ |
| false, /* navigate */true); |
| } else { |
| try { |
| if (file.isDir()) |
| protocolHandler.cd(session, file); |
| else |
| protocolHandler.cd(session, |
| new RemoteSiteFile(file.getParent(), file.getSite())); |
| } catch (Exception e) { |
| e.printStackTrace(); |
| try { |
| protocolHandler.disconnect(session); |
| } catch (Exception exc) { |
| } |
| session = protocolHandler.getAppropriateProtocol(file, /* reuse */ |
| false, /* navigate */true); |
| } |
| } |
| return session; |
| } |
| |
| /** |
| * Puts a session in the available session list |
| * |
| * @param session |
| * The Protocol session to be added to the available list |
| */ |
| synchronized void addAvailableSession(Protocol session) { |
| avaliableSessions.add(session); |
| } |
| |
| /** |
| * Removes a session from the available list and returns it |
| * |
| * @return An available downloading Protocol session |
| */ |
| synchronized Protocol getAvailableSession() { |
| return avaliableSessions.remove(0); |
| } |
| |
| synchronized int getNumberOfUsedSessions() { |
| return numberOfSessions - avaliableSessions.size(); |
| } |
| |
| /** |
| * Registers a downloading session with the threadpoolexecutor to begin |
| * downloading the specified ProtocolFile to the local File location |
| * |
| * @param session |
| * The downloading Protocol session to be used to download the |
| * ProtocolFile |
| * @param protocolFile |
| * The file to be downloaded |
| * @param newFile |
| * The location which the downloaded file will be stored |
| */ |
| void addSessionToThreadPool(final Protocol session, |
| final RemoteFile remoteFile, final File newFile) { |
| this.addToDownloadingList(remoteFile.getProtocolFile()); |
| threadController.execute(new Runnable() { |
| @Override |
| public void run() { |
| boolean successful = false; |
| int retries = 0; |
| Protocol curSession = session; |
| |
| if (FileRetrievalSystem.this.dListener != null) |
| FileRetrievalSystem.this.dListener |
| .downloadStarted(remoteFile.getProtocolFile()); |
| |
| // try until successful or all retries have been used |
| do { |
| try { |
| // if thread tracker is to be used |
| if (config.useTracker()) { |
| dtEval.startTrackingDownloadRuntimeForFile(newFile); |
| protocolHandler.download(curSession, remoteFile |
| .getProtocolFile(), newFile, remoteFile |
| .getMetadata( |
| RemoteFile.DELETE_AFTER_DOWNLOAD) |
| .equals("true")); |
| dtEval.fileDownloadComplete(newFile); |
| threadController |
| .setCorePoolSize(max_sessions = dtEval |
| .getRecommendedThreadCount()); |
| threadController.setMaximumPoolSize(max_sessions); |
| // if static number of threads are to be used |
| } else { |
| protocolHandler.download(curSession, remoteFile |
| .getProtocolFile(), newFile, remoteFile |
| .getMetadata( |
| RemoteFile.DELETE_AFTER_DOWNLOAD) |
| .equals("true")); |
| } |
| |
| successful = true; |
| if (FileRetrievalSystem.this.dListener != null) |
| FileRetrievalSystem.this.dListener |
| .downloadFinished(remoteFile |
| .getProtocolFile()); |
| |
| remoteFile.addMetadata(RemoteFile.FILE_SIZE, newFile |
| .length() |
| + ""); |
| |
| // try to create the metadata file |
| if (config.getWriteMetFile()) { |
| try { |
| LOG.log(Level.INFO, "Writing metadata file for '" + newFile + "'"); |
| remoteFile.addMetadata(RemoteFile.FILE_SIZE, |
| newFile.length() + ""); |
| remoteFile.writeToPropEqValFile(newFile |
| .getAbsolutePath() |
| + "." + config.getMetFileExtension(), |
| config.getListOfMetadataToOutput()); |
| } catch (Exception e) { |
| LOG.log(Level.SEVERE, |
| "Failed to create metadata file for " |
| + remoteFile.getProtocolFile()); |
| } |
| } |
| |
| } catch (Exception e) { |
| |
| // if tracker is being used cancel tracking |
| if (config.useTracker()) |
| dtEval.cancelRuntimeTracking(newFile); |
| |
| // delete any created file from staging area |
| newFile.delete(); |
| new File(newFile.getAbsolutePath() + "." |
| + config.getMetFileExtension()).delete(); |
| |
| // check if a retry is still allowed |
| if (++retries > MAX_RETRIES) { |
| FileRetrievalSystem.this.failedDownloadList |
| .add(remoteFile.getProtocolFile()); |
| LOG.log(Level.SEVERE, "Failed to download " |
| + remoteFile.getProtocolFile() + " : " |
| + e.getMessage()); |
| if (FileRetrievalSystem.this.dListener != null) |
| FileRetrievalSystem.this.dListener |
| .downloadFailed(remoteFile |
| .getProtocolFile(), e |
| .getMessage()); |
| break; |
| } else if (FileRetrievalSystem.this.failedDownloadList |
| .size() < max_allowed_failed_downloads) { |
| // discard current session and recreate a new |
| // session to try to download file with |
| LOG.log(Level.WARNING, "Retrying to download file " |
| + remoteFile.getProtocolFile() |
| + " because download failed : " |
| + e.getMessage(), e); |
| try { |
| protocolHandler.disconnect(curSession); |
| } catch (Exception exc) { |
| } |
| try { |
| curSession = protocolHandler |
| .getAppropriateProtocol(remoteFile |
| .getProtocolFile(), false, true); |
| } catch (Exception exc) { |
| LOG.log(Level.SEVERE, |
| "Failed to reconnect protocol to retry download of file " |
| + remoteFile.getProtocolFile() |
| + " -- aborting retry : " |
| + e.getMessage(), e); |
| } |
| } else { |
| LOG |
| .log( |
| Level.SEVERE, |
| "Terminating download tries for file " |
| + remoteFile |
| .getProtocolFile() |
| + " do to too many previous download failures : " |
| + e.getMessage(), e); |
| if (FileRetrievalSystem.this.dListener != null) |
| FileRetrievalSystem.this.dListener |
| .downloadFailed(remoteFile |
| .getProtocolFile(), e |
| .getMessage()); |
| break; |
| } |
| |
| } |
| } while (!successful); |
| |
| FileRetrievalSystem.this.removeFromDownloadingList(remoteFile |
| .getProtocolFile()); |
| determineSessionFate(curSession); |
| } |
| }); |
| } |
| |
| private synchronized void addToDownloadingList(ProtocolFile pFile) { |
| this.currentlyDownloading.add(pFile); |
| } |
| |
| private synchronized void removeFromDownloadingList(ProtocolFile pFile) { |
| this.currentlyDownloading.remove(pFile); |
| } |
| |
| public synchronized boolean isDownloading(ProtocolFile pFile) { |
| return this.currentlyDownloading.contains(pFile); |
| } |
| |
| public synchronized LinkedList<ProtocolFile> getCurrentlyDownloadingFiles() { |
| LinkedList<ProtocolFile> list = new LinkedList<ProtocolFile>(); |
| list.addAll(this.currentlyDownloading); |
| return list; |
| } |
| |
| public LinkedList<ProtocolFile> getListOfFailedDownloads() { |
| return this.failedDownloadList; |
| } |
| |
| public void clearFailedDownloadsList() { |
| this.failedDownloadList.clear(); |
| } |
| |
| synchronized void determineSessionFate(Protocol session) { |
| // determine whether thread should be keep or should be thrown away |
| if (numberOfSessions <= max_sessions) { |
| giveBackSession(session); |
| } else { |
| disposeOfSession(session); |
| } |
| } |
| |
| void giveBackSession(Protocol session) { |
| addAvailableSession(session); |
| wakeUpMainThread(); |
| } |
| |
| void disposeOfSession(Protocol session) { |
| try { |
| protocolHandler.disconnect(session); |
| } catch (Exception e) { |
| // log failure |
| } |
| numberOfSessions--; |
| } |
| |
| public void shutdown() { |
| try { |
| // close out threadpool |
| threadController.shutdown(); |
| // give a max of 10 minutes to finish downloading any files |
| threadController.awaitTermination(600, TimeUnit.SECONDS); |
| } catch (Exception e) { |
| // log failure |
| } |
| |
| try { |
| this.resetVariables(); |
| } catch (Exception e) { |
| |
| } |
| |
| try { |
| closeSessions(); |
| } catch (Exception e) { |
| // log failure!!! |
| } |
| |
| try { |
| protocolHandler.close(); |
| } catch (Exception e) { |
| // log failure!!! |
| } |
| |
| } |
| |
| /** |
| * Disconnects all downloading Protocol sessions in the avaiableSessions |
| * list. The ThreadPoolExecutor needs to be completely shutdown before this |
| * method should be called. Otherwise some Protocols might not be |
| * disconnected or left downloading. |
| * |
| * @return True if successful, false otherwise |
| * @throws RemoteConnectionException |
| */ |
| public boolean closeSessions() throws RemoteConnectionException { |
| for (Protocol session : avaliableSessions) { |
| protocolHandler.disconnect(session); |
| } |
| // sessions.clear(); |
| avaliableSessions.clear(); |
| numberOfSessions = 0; |
| return true; |
| } |
| } |