| /* $Id: WorkerThread.java 988245 2010-08-23 18:39:35Z kwright $ */ |
| |
| /** |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.manifoldcf.crawler.system; |
| |
| import org.apache.manifoldcf.core.interfaces.*; |
| import org.apache.manifoldcf.agents.interfaces.*; |
| import org.apache.manifoldcf.crawler.interfaces.*; |
| import org.apache.manifoldcf.core.util.URLEncoder; |
| import java.util.*; |
| import java.io.*; |
| import java.lang.reflect.*; |
| |
/** This class represents a worker thread. Hundreds of these threads are instantiated in order to
* perform crawling and extraction: each one repeatedly pulls a document set off the shared document
* queue, processes it, and updates the status of every document it took.
*/
| public class WorkerThread extends Thread |
| { |
| public static final String _rcsid = "@(#)$Id: WorkerThread.java 988245 2010-08-23 18:39:35Z kwright $"; |
| |
| |
| // Local data |
| /** Thread id */ |
| protected final String id; |
| /** This is a reference to the static main document queue */ |
| protected final DocumentQueue documentQueue; |
| /** Worker thread pool reset manager */ |
| protected final WorkerResetManager resetManager; |
| /** Queue tracker */ |
| protected final QueueTracker queueTracker; |
| /** Process ID */ |
| protected final String processID; |
| |
/** Constructor.
*@param id is the worker thread id.
*@param documentQueue is the static main document queue.
*@param resetManager is the worker thread pool reset manager.
*@param queueTracker is the queue tracker.
*@param processID is the process ID.
*/
| public WorkerThread(String id, DocumentQueue documentQueue, WorkerResetManager resetManager, QueueTracker queueTracker, String processID) |
| throws ManifoldCFException |
| { |
| super(); |
| this.id = id; |
| this.documentQueue = documentQueue; |
| this.resetManager = resetManager; |
| this.queueTracker = queueTracker; |
| this.processID = processID; |
| setName("Worker thread '"+id+"'"); |
| setDaemon(true); |
| |
| } |
| |
| public void run() |
| { |
| // Register this thread in the worker reset manager |
| resetManager.registerMe(); |
| |
| try |
| { |
| // Create a thread context object. |
| IThreadContext threadContext = ThreadContextFactory.make(); |
| IIncrementalIngester ingester = IncrementalIngesterFactory.make(threadContext); |
| IJobManager jobManager = JobManagerFactory.make(threadContext); |
| IBinManager binManager = BinManagerFactory.make(threadContext); |
| IRepositoryConnectionManager connMgr = RepositoryConnectionManagerFactory.make(threadContext); |
| IReprioritizationTracker rt = ReprioritizationTrackerFactory.make(threadContext); |
| |
| IRepositoryConnectorPool repositoryConnectorPool = RepositoryConnectorPoolFactory.make(threadContext); |
| |
| // This is the set of documents that we will either be marking as complete, or requeued, depending on the kind of crawl. |
| List<QueuedDocument> finishList = new ArrayList<QueuedDocument>(); |
| |
// This is where we accumulate the QueuedDocuments to be deleted from the job queue.
| List<QueuedDocument> deleteList = new ArrayList<QueuedDocument>(); |
| |
| // This is where we accumulate documents that need to be placed in the HOPCOUNTREMOVED |
| // state |
| List<QueuedDocument> hopcountremoveList = new ArrayList<QueuedDocument>(); |
| |
| // This is where we accumulate documents that need to be rescanned |
| List<QueuedDocument> rescanList = new ArrayList<QueuedDocument>(); |
| |
| // This is where we store document ID strings of documents that need to be noted as having |
| // been checked. |
| List<String> ingesterCheckList = new ArrayList<String>(); |
| |
// Exception to throw at the end of the processing pass, set when a service interruption specifies "abort on fail".
| ManifoldCFException abortOnFail = null; |
| |
| // Loop |
| while (true) |
| { |
| // Do another try/catch around everything in the loop |
| try |
| { |
| if (Thread.currentThread().isInterrupted()) |
| throw new ManifoldCFException("Interrupted",ManifoldCFException.INTERRUPTED); |
| |
| // Before we begin, conditionally reset |
| resetManager.waitForReset(threadContext); |
| |
| // Once we pull something off the queue, we MUST make sure that |
| // we update its status, even if there is an exception!!! |
| |
| // See if there is anything on the queue for me |
| QueuedDocumentSet qds = documentQueue.getDocument(queueTracker); |
| if (qds == null) |
| // It's a reset, so recycle |
| continue; |
| |
| try |
| { |
| |
| if (Thread.currentThread().isInterrupted()) |
| throw new ManifoldCFException("Interrupted",ManifoldCFException.INTERRUPTED); |
| |
| // First of all: find out if the job for these documents has been aborted, paused, etc. |
| // If so, we requeue the documents immediately. |
| IJobDescription job = qds.getJobDescription(); |
| Long jobID = job.getID(); |
if (!jobManager.checkJobActive(jobID))
| // Recycle; let these documents be requeued and go get the next set. |
| continue; |
| |
| if (Logging.threads.isDebugEnabled()) |
| Logging.threads.debug("Worker thread received "+Integer.toString(qds.getCount())+" documents"); |
| |
| // Build a basic pipeline specification right off; we need it whenever |
| // we interact with Incremental Ingester. |
| IPipelineSpecificationBasic pipelineSpecificationBasic = new PipelineSpecificationBasic(job); |
| String lastIndexedOutputConnectionName = ingester.getLastIndexedOutputConnectionName(pipelineSpecificationBasic); |
| // Compute a parameter version string for all documents in this job |
| String newParameterVersion = packParameters(job.getForcedMetadata()); |
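// (See packParameters() below for the canonical format of this version string.)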
| |
| // Universal job data we'll need later |
| String connectionName = job.getConnectionName(); |
| DocumentSpecification spec = job.getSpecification(); |
| int jobType = job.getType(); |
| |
| IRepositoryConnection connection = qds.getConnection(); |
| |
| OutputActivity ingestLogger = new OutputActivity(connectionName,connMgr); |
| |
| // The flow through this section of the code is as follows. |
| // (1) We start with a list of documents |
| // (2) We attempt to do various things to these documents |
| // (3) Based on what happens, and what errors we get, we progressively move documents out of the main list |
| // and into secondary lists that will be all treated in the same way |
| |
| // First, initialize the active document set to contain everything. |
| List<QueuedDocument> activeDocuments = new ArrayList<QueuedDocument>(qds.getCount()); |
| |
| for (int i = 0; i < qds.getCount(); i++) |
| { |
| QueuedDocument qd = qds.getDocument(i); |
| activeDocuments.add(qd); |
| } |
| |
| // Clear out all of our disposition lists |
| finishList.clear(); |
| deleteList.clear(); |
| ingesterCheckList.clear(); |
| hopcountremoveList.clear(); |
rescanList.clear();
| abortOnFail = null; |
| |
| // Keep track of the starting processing time, for statistics calculation |
| long processingStartTime = System.currentTimeMillis(); |
| // Log these documents in the overlap calculator |
| qds.beginProcessing(queueTracker); |
| try |
| { |
| long currentTime = System.currentTimeMillis(); |
| |
| if (Logging.threads.isDebugEnabled()) |
| Logging.threads.debug("Worker thread starting document count is "+Integer.toString(activeDocuments.size())); |
| |
| // Get the legal link types. This is needed for later hopcount checking. |
| String[] legalLinkTypes = null; |
| if (activeDocuments.size() > 0) |
| { |
| legalLinkTypes = RepositoryConnectorFactory.getRelationshipTypes(threadContext,connection.getClassName()); |
| // If this came back null, it means that there is no underlying implementation available, so treat this like a kind of service interruption. |
| if (legalLinkTypes == null) |
| { |
| // Failure here puts all remaining documents into rescan list |
| moveList(activeDocuments,rescanList); |
| } |
| } |
| |
| if (Logging.threads.isDebugEnabled()) |
| Logging.threads.debug(" Post-linktype document count is "+Integer.toString(activeDocuments.size())); |
| |
| // Do the hopcount checks, if any. This will iteratively reduce the viable list of |
| // document identifiers in need of having their versions fetched. |
| if (legalLinkTypes != null && activeDocuments.size() > 0) |
| { |
| // Set up the current ID array |
| String[] currentDocIDHashArray = new String[activeDocuments.size()]; |
| for (int i = 0; i < currentDocIDHashArray.length; i++) |
| { |
| currentDocIDHashArray[i] = activeDocuments.get(i).getDocumentDescription().getDocumentIdentifierHash(); |
| } |
Map<String,Long> filterMap = job.getHopCountFilters();
Iterator<String> filterIter = filterMap.keySet().iterator();
| // Array to accumulate hopcount results for all link types |
| boolean[] overallResults = new boolean[currentDocIDHashArray.length]; |
| for (int i = 0; i < overallResults.length; i++) |
| { |
| overallResults[i] = true; |
| } |
| // Calculate the hopcount result for each link type, and fold it in. |
| while (filterIter.hasNext()) |
| { |
String linkType = filterIter.next();
int maxHop = (int)filterMap.get(linkType).longValue();
| boolean[] results = jobManager.findHopCounts(job.getID(),legalLinkTypes,currentDocIDHashArray,linkType, |
| maxHop,job.getHopcountMode()); |
| for (int i = 0; i < results.length; i++) |
| { |
| overallResults[i] = overallResults[i] && results[i]; |
| } |
| } |
| // Move all documents to the appropriate list |
| List<QueuedDocument> newActiveSet = new ArrayList<QueuedDocument>(activeDocuments.size()); |
| for (int i = 0; i < overallResults.length; i++) |
| { |
if (!overallResults[i])
| { |
| hopcountremoveList.add(activeDocuments.get(i)); |
| } |
| else |
| { |
| newActiveSet.add(activeDocuments.get(i)); |
| } |
| } |
| activeDocuments = newActiveSet; |
| } |
| |
| if (Logging.threads.isDebugEnabled()) |
| Logging.threads.debug(" Post-hopcount pruned document count is "+Integer.toString(activeDocuments.size())); |
| |
| // From here on down we need a connector instance, so get one. |
| IRepositoryConnector connector = null; |
| if (activeDocuments.size() > 0 || hopcountremoveList.size() > 0) |
| { |
| connector = repositoryConnectorPool.grab(connection); |
| |
| // If we wind up with a null here, it means that a document got queued for a connector which is now gone. |
| // Basically, what we want to do in that case is to treat this kind of like a service interruption - the document |
| // must be requeued for immediate reprocessing. When the rest of the world figures out that the job that owns this |
| // document is in fact unable to function, we'll stop getting such documents handed to us, because the state of the |
| // job will be changed. |
| |
| if (connector == null) |
| { |
| // Failure here puts all remaining documents into rescan list |
| moveList(activeDocuments,rescanList); |
| moveList(hopcountremoveList,rescanList); |
| } |
| } |
| |
| if (connector != null) |
| { |
| // Open try/finally block to free the connector instance no matter what |
| try |
| { |
| // Check for interruption before we start fetching |
| if (Thread.currentThread().isInterrupted()) |
| throw new ManifoldCFException("Interrupted",ManifoldCFException.INTERRUPTED); |
| |
| // We need first to assemble an IPipelineSpecificationWithVersions object for each document we're going to process. |
| // We put this in a map so it can be looked up by document identifier. |
| // Create a full PipelineSpecification, including description strings. (This is per-job still, but can throw ServiceInterruptions, so we do it in here.) |
| IPipelineSpecification pipelineSpecification; |
| try |
| { |
| pipelineSpecification = new PipelineSpecification(pipelineSpecificationBasic,job,ingester); |
| } |
| catch (ServiceInterruption e) |
| { |
| // Handle service interruption from pipeline |
| if (!e.jobInactiveAbort()) |
| Logging.jobs.warn("Service interruption reported for job "+ |
| job.getID()+" connection '"+job.getConnectionName()+"': "+ |
| e.getMessage()); |
| |
| // All documents get requeued, because we never got far enough to make distinctions. All we have to decide |
| // is whether to requeue or abort. |
| List<QueuedDocument> requeueList = new ArrayList<QueuedDocument>(); |
| |
| for (QueuedDocument qd : activeDocuments) |
| { |
| DocumentDescription dd = qd.getDocumentDescription(); |
// Check for hard failure: either the next retry would fall after the fail deadline, or no retries remain.
// (No hard failure is possible if it's a job-inactive abort.)
if (!e.jobInactiveAbort() && ((dd.getFailTime() != -1L && dd.getFailTime() < e.getRetryTime()) ||
dd.getFailRetryCount() == 0))
| { |
| // Treat this as a hard failure. |
| if (e.isAbortOnFail()) |
| { |
| rescanList.add(qd); |
| abortOnFail = new ManifoldCFException("Repeated service interruptions - failure processing document"+((e.getCause()!=null)?": "+e.getCause().getMessage():""),e.getCause()); |
| } |
| else |
| { |
| requeueList.add(qd); |
| } |
| } |
| else |
| { |
| requeueList.add(qd); |
| } |
| } |
| |
| requeueDocuments(jobManager,requeueList,e.getRetryTime(),e.getFailTime(), |
| e.getFailRetryCount()); |
| |
| activeDocuments.clear(); |
| pipelineSpecification = null; |
| } |
| |
| if (activeDocuments.size() > 0) |
| { |
| |
| // **** New worker thread code starts here!!! **** |
| |
| IExistingVersions existingVersions = new ExistingVersions(lastIndexedOutputConnectionName,activeDocuments); |
| String aclAuthority = connection.getACLAuthority(); |
| if (aclAuthority == null) |
| aclAuthority = ""; |
| boolean isDefaultAuthority = (aclAuthority.length() == 0); |
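// An empty/unset ACL authority name means this connection is governed by the default authority.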
| |
| // Build the processActivity object |
| Map<String,QueuedDocument> previousDocuments = new HashMap<String,QueuedDocument>(); |
| |
| String[] documentIDs = new String[activeDocuments.size()]; |
| int k = 0; |
| for (QueuedDocument qd : activeDocuments) |
| { |
| previousDocuments.put(qd.getDocumentDescription().getDocumentIdentifierHash(),qd); |
| documentIDs[k++] = qd.getDocumentDescription().getDocumentIdentifier(); |
| } |
| |
| ProcessActivity activity = new ProcessActivity(job.getID(),processID, |
| threadContext,rt,jobManager,ingester, |
| connectionName,pipelineSpecification, |
| previousDocuments, |
| currentTime, |
| job.getExpiration(), |
| job.getForcedMetadata(), |
| job.getInterval(), |
| job.getMaxInterval(), |
| job.getHopcountMode(), |
| connection,connector,connMgr,legalLinkTypes,ingestLogger, |
| newParameterVersion); |
| try |
| { |
| if (Logging.threads.isDebugEnabled()) |
| Logging.threads.debug("Worker thread about to process "+Integer.toString(documentIDs.length)+" documents"); |
| |
| // Now, process in bulk -- catching and handling ServiceInterruptions |
| ServiceInterruption serviceInterruption = null; |
| try |
| { |
| connector.processDocuments(documentIDs,existingVersions,job.getSpecification(),activity,jobType,isDefaultAuthority); |
| |
| // Now do everything that the connector might have done if we were not doing it for it. |
| |
| // Right now, that's just getting rid of untouched components. |
| for (QueuedDocument qd : activeDocuments) |
| { |
| String documentIdentifier = qd.getDocumentDescription().getDocumentIdentifier(); |
| if (!activity.wasDocumentAborted(documentIdentifier) && !activity.wasDocumentDeleted(documentIdentifier)) |
| { |
| String documentIdentifierHash = qd.getDocumentDescription().getDocumentIdentifierHash(); |
| // In order to be able to loop over all the components that the incremental ingester knows about, we need to know |
| // what the FIRST output is. |
| DocumentIngestStatusSet set = qd.getLastIngestedStatus(ingester.getFirstIndexedOutputConnectionName(pipelineSpecificationBasic)); |
| if (set != null) |
| { |
| Iterator<String> componentHashes = set.componentIterator(); |
| while (componentHashes.hasNext()) |
| { |
| String componentHash = componentHashes.next(); |
| // Check whether we've indexed or not |
| if (!activity.wasDocumentComponentTouched(documentIdentifier, |
| componentHash)) |
| { |
| // This component must be removed. |
| ingester.documentRemove( |
| pipelineSpecificationBasic, |
| connectionName,documentIdentifierHash,componentHash, |
| ingestLogger); |
| } |
| } |
| } |
| } |
| } |
| |
| // Done with connector functionality! |
| } |
| catch (ServiceInterruption e) |
| { |
| serviceInterruption = e; |
| if (!e.jobInactiveAbort()) |
| Logging.jobs.warn("Service interruption reported for job "+ |
| job.getID()+" connection '"+job.getConnectionName()+"': "+ |
| e.getMessage()); |
| } |
| |
| // Flush remaining references into the database! |
| activity.flush(); |
| |
| if (Logging.threads.isDebugEnabled()) |
| Logging.threads.debug("Worker thread done processing "+Integer.toString(documentIDs.length)+" documents"); |
| |
| // Either way, handle the documents we were supposed to process. But if there was a service interruption, |
| // and the disposition of the document was unclear, then the document will need to be requeued instead of handled normally. |
| List<QueuedDocument> requeueList = new ArrayList<QueuedDocument>(); |
| |
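// Decide the disposition of each document, in precedence order: aborted documents are
// requeued via the finish list; deleted documents go to the delete list; documents caught
// by a service interruption are requeued, hard-failed into the rescan list (with a job
// abort), or skipped via the delete list; everything else is finished normally.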
| for (QueuedDocument qd : activeDocuments) |
| { |
| // If this document was aborted, then treat it specially. |
| if (activity.wasDocumentAborted(qd.getDocumentDescription().getDocumentIdentifier())) |
| { |
| // Special treatment for aborted documents. |
| // We ignore the returned version string completely, since it's presumed that processing was not completed for this doc. |
| // We want to give up immediately on this one, and just requeue it for immediate reprocessing (pending its prereqs being all met). |
| // Add to the finish list, so it gets requeued. Because the document is already marked as aborted, this should be enough to cause an |
| // unconditional requeue. |
| finishList.add(qd); |
| } |
| else if (activity.wasDocumentDeleted(qd.getDocumentDescription().getDocumentIdentifier())) |
| { |
| deleteList.add(qd); |
| } |
| else if (serviceInterruption != null) |
| { |
| |
| // Service interruption has precedence over unchanged, because we might have been interrupted while scanning the document |
| // for references |
| DocumentDescription dd = qd.getDocumentDescription(); |
// Check for hard failure: either the next retry would fall after the fail deadline, or no retries remain.
// (No hard failure is possible if it's a job-inactive abort.)
if (!serviceInterruption.jobInactiveAbort() && ((dd.getFailTime() != -1L && dd.getFailTime() < serviceInterruption.getRetryTime()) ||
dd.getFailRetryCount() == 0))
| { |
| // Treat this as a hard failure. |
| if (serviceInterruption.isAbortOnFail()) |
| { |
| // Make sure that the job aborts. |
| abortOnFail = new ManifoldCFException("Repeated service interruptions - failure processing document"+((serviceInterruption.getCause()!=null)?": "+serviceInterruption.getCause().getMessage():""),serviceInterruption.getCause()); |
| rescanList.add(qd); |
| } |
| else |
| { |
| // Skip the document, rather than failing. |
| // We want this particular document to be not included in the |
| // reprocessing. Therefore, we do the same thing as we would |
| // if we got back a null version. |
| deleteList.add(qd); |
| } |
| } |
| else |
| { |
| // Not a hard failure. Requeue. |
| requeueList.add(qd); |
| } |
| } |
| else |
| finishList.add(qd); |
| |
| // Note whether the document was untouched; if so, update it |
| if (!activity.wasDocumentTouched(qd.getDocumentDescription().getDocumentIdentifier())) |
| { |
| ingesterCheckList.add(qd.getDocumentDescription().getDocumentIdentifierHash()); |
| } |
| } |
| |
| |
| if (serviceInterruption != null) |
| { |
| // Requeue the documents we've identified as needing to be repeated |
| requeueDocuments(jobManager,requeueList,serviceInterruption.getRetryTime(),serviceInterruption.getFailTime(), |
| serviceInterruption.getFailRetryCount()); |
| } |
| |
| // Note the documents that have been checked but not reingested. This should happen BEFORE we need |
| // the statistics (which are calculated during the finishlist step below) |
| if (ingesterCheckList.size() > 0) |
| { |
| String[] checkClasses = new String[ingesterCheckList.size()]; |
| String[] checkIDs = new String[ingesterCheckList.size()]; |
| for (int i = 0; i < checkIDs.length; i++) |
| { |
| checkClasses[i] = connectionName; |
| checkIDs[i] = ingesterCheckList.get(i); |
| } |
| // This method should exercise reasonable intelligence. If the document has never been indexed, it should detect that |
| // and stop. Otherwise, it should update the statistics accordingly. |
| ingester.documentCheckMultiple(pipelineSpecificationBasic,checkClasses,checkIDs,currentTime); |
| } |
| |
| // Process the finish list! |
| if (finishList.size() > 0) |
| { |
| // "Finish" the documents (removing unneeded carrydown info, and compute hopcounts). |
| // This can ONLY be done on fully-completed documents; everything else should be left in a dangling |
| // state (which we know is OK because it will be fixed the next time the document is attempted). |
| String[] documentIDHashes = new String[finishList.size()]; |
| k = 0; |
| for (QueuedDocument qd : finishList) |
| { |
| documentIDHashes[k++] = qd.getDocumentDescription().getDocumentIdentifierHash(); |
| } |
| DocumentDescription[] requeueCandidates = jobManager.finishDocuments(job.getID(),legalLinkTypes,documentIDHashes,job.getHopcountMode()); |
| ManifoldCF.requeueDocumentsDueToCarrydown(jobManager,requeueCandidates,connector,connection,rt,currentTime); |
| |
| // In both job types, we have to go through the finishList to figure out what to do with the documents. |
| // In the case of a document that was aborted, we must requeue it for immediate reprocessing in BOTH job types. |
| switch (job.getType()) |
| { |
| case IJobDescription.TYPE_CONTINUOUS: |
| { |
| // We need to populate timeArray |
| String[] timeIDClasses = new String[finishList.size()]; |
| String[] timeIDHashes = new String[finishList.size()]; |
| for (int i = 0; i < timeIDHashes.length; i++) |
| { |
QueuedDocument qd = finishList.get(i);
| DocumentDescription dd = qd.getDocumentDescription(); |
| String documentIDHash = dd.getDocumentIdentifierHash(); |
| timeIDClasses[i] = connectionName; |
| timeIDHashes[i] = documentIDHash; |
| } |
| long[] timeArray = ingester.getDocumentUpdateIntervalMultiple(pipelineSpecificationBasic,timeIDClasses,timeIDHashes); |
| Long[] recheckTimeArray = new Long[timeArray.length]; |
| int[] actionArray = new int[timeArray.length]; |
| DocumentDescription[] recrawlDocs = new DocumentDescription[finishList.size()]; |
| for (int i = 0; i < finishList.size(); i++) |
| { |
| QueuedDocument qd = finishList.get(i); |
| recrawlDocs[i] = qd.getDocumentDescription(); |
| String documentID = recrawlDocs[i].getDocumentIdentifier(); |
| |
| // If aborted due to sequencing issue, then requeue for reprocessing immediately, ignoring everything else. |
| boolean wasAborted = activity.wasDocumentAborted(documentID); |
| if (wasAborted) |
| { |
| // Requeue for immediate reprocessing |
| if (Logging.scheduling.isDebugEnabled()) |
| Logging.scheduling.debug("Document '"+documentID+"' will be RESCANNED as soon as prerequisites are met"); |
| |
| actionArray[i] = IJobManager.ACTION_RESCAN; |
recheckTimeArray[i] = Long.valueOf(0L); // Must not use null; that means 'never'.
| } |
| else |
| { |
| // Calculate the next time to run, or time to expire. |
| |
| // For run time, the formula is to calculate the running avg interval between changes, |
| // add an additional interval (which comes from the job description), |
| // and add that to the current time. |
| // One caveat: we really want to calculate the interval from the last |
| // time change was detected, but this is not implemented yet. |
| long timeAmt = timeArray[i]; |
| // null value indicates never to schedule |
| |
| Long recrawlTime = activity.calculateDocumentRescheduleTime(currentTime,timeAmt,documentID); |
| Long expireTime = activity.calculateDocumentExpireTime(currentTime,documentID); |
| |
| |
| // Merge the two times together. We decide on the action based on the action with the lowest time. |
| if (expireTime == null || (recrawlTime != null && recrawlTime.longValue() < expireTime.longValue())) |
| { |
if (Logging.scheduling.isDebugEnabled() && recrawlTime != null)
Logging.scheduling.debug("Document '"+documentID+"' will be RESCANNED at "+recrawlTime.toString());
| recheckTimeArray[i] = recrawlTime; |
| actionArray[i] = IJobManager.ACTION_RESCAN; |
| } |
| else if (recrawlTime == null || (expireTime != null && recrawlTime.longValue() > expireTime.longValue())) |
| { |
| if (Logging.scheduling.isDebugEnabled()) |
| Logging.scheduling.debug("Document '"+documentID+"' will be REMOVED at "+expireTime.toString()); |
| recheckTimeArray[i] = expireTime; |
| actionArray[i] = IJobManager.ACTION_REMOVE; |
| } |
| else |
| { |
| // Default activity if conflict will be rescan |
| if (Logging.scheduling.isDebugEnabled() && recrawlTime != null) |
| Logging.scheduling.debug("Document '"+documentID+"' will be RESCANNED at "+recrawlTime.toString()); |
| recheckTimeArray[i] = recrawlTime; |
| actionArray[i] = IJobManager.ACTION_RESCAN; |
| } |
| } |
| } |
| |
| jobManager.requeueDocumentMultiple(recrawlDocs,recheckTimeArray,actionArray); |
| |
| } |
| break; |
| case IJobDescription.TYPE_SPECIFIED: |
| { |
| // Separate the ones we actually finished from the ones we need to requeue because they were aborted |
| List<DocumentDescription> completedList = new ArrayList<DocumentDescription>(); |
| List<DocumentDescription> abortedList = new ArrayList<DocumentDescription>(); |
| for (QueuedDocument qd : finishList) |
| { |
| DocumentDescription dd = qd.getDocumentDescription(); |
| if (activity.wasDocumentAborted(dd.getDocumentIdentifier())) |
| { |
| // The document was aborted, so put it into the abortedList |
| abortedList.add(dd); |
| } |
| else |
| { |
| // The document was completed. |
| completedList.add(dd); |
| } |
| } |
| |
| // Requeue the ones that must be repeated |
| if (abortedList.size() > 0) |
| { |
| DocumentDescription[] docDescriptions = new DocumentDescription[abortedList.size()]; |
| Long[] recheckTimeArray = new Long[docDescriptions.length]; |
| int[] actionArray = new int[docDescriptions.length]; |
| for (int i = 0; i < docDescriptions.length; i++) |
| { |
| docDescriptions[i] = abortedList.get(i); |
recheckTimeArray[i] = Long.valueOf(0L);
| actionArray[i] = IJobManager.ACTION_RESCAN; |
| } |
| |
| jobManager.requeueDocumentMultiple(docDescriptions,recheckTimeArray,actionArray); |
| } |
| |
| // Mark the ones completed that were actually completed. |
| if (completedList.size() > 0) |
| { |
| DocumentDescription[] docDescriptions = new DocumentDescription[completedList.size()]; |
| for (int i = 0; i < docDescriptions.length; i++) |
| { |
| docDescriptions[i] = completedList.get(i); |
| } |
| |
| jobManager.markDocumentCompletedMultiple(docDescriptions); |
| } |
| } |
| break; |
| default: |
| throw new ManifoldCFException("Unexpected value for job type: '"+Integer.toString(job.getType())+"'"); |
| } |
| |
| // Finally, if we're still alive, mark everything we finished as "processed". |
| for (QueuedDocument qd : finishList) |
| { |
| qd.setProcessed(); |
| } |
| } |
| } |
| finally |
| { |
| // Make sure we don't leave any dangling carrydown files |
| activity.discard(); |
| } |
| |
| // Successful processing of the set |
| // We count 'get version' time in the average, so even if we decide not to process a doc |
| // it still counts. |
| queueTracker.noteConnectionPerformance(qds.getCount(),connectionName,System.currentTimeMillis() - processingStartTime); |
| |
| } |
| |
| // Now, handle the delete list |
| processDeleteLists(pipelineSpecificationBasic,connector,connection,jobManager, |
| deleteList,ingester, |
| job.getID(),legalLinkTypes,ingestLogger,job.getHopcountMode(),rt,currentTime); |
| |
| // Handle hopcount removal |
| processHopcountRemovalLists(pipelineSpecificationBasic,connector,connection,jobManager, |
| hopcountremoveList,ingester, |
| job.getID(),legalLinkTypes,ingestLogger,job.getHopcountMode(),rt,currentTime); |
| |
| } |
| finally |
| { |
| repositoryConnectorPool.release(connection,connector); |
| } |
| |
| } |
| |
| // Handle rescanning |
| for (QueuedDocument qd : rescanList) |
| { |
| jobManager.resetDocument(qd.getDocumentDescription(),0L,IJobManager.ACTION_RESCAN,-1L,-1); |
| qd.setProcessed(); |
| } |
| |
| } |
| finally |
| { |
| // Note termination of processing of these documents in the overlap calculator |
| qds.endProcessing(queueTracker); |
| } |
| |
| if (abortOnFail != null) |
| throw abortOnFail; |
| |
| } |
| catch (ManifoldCFException e) |
| { |
| if (e.getErrorCode() == ManifoldCFException.INTERRUPTED) |
| break; |
| |
| if (e.getErrorCode() == ManifoldCFException.DATABASE_CONNECTION_ERROR) |
| throw e; |
| |
if (jobManager.errorAbort(qds.getJobDescription().getID(),e.getMessage()))
{
// An exception occurred in the processing of a set of documents.
// Shut the corresponding job down, with an appropriate error.
// (If an error was already recorded for the job, errorAbort() returns false and the exception is eaten silently.)
Logging.threads.error("Exception tossed: "+e.getMessage(),e);
}
| } |
| finally |
| { |
| // Go through qds and requeue any that aren't closed out in one way or another. This allows the job |
| // to be aborted; no dangling entries are left around. |
| for (int i = 0; i < qds.getCount(); i++) |
| { |
| QueuedDocument qd = qds.getDocument(i); |
| if (!qd.wasProcessed()) |
| { |
| jobManager.resetDocument(qd.getDocumentDescription(),0L,IJobManager.ACTION_RESCAN,-1L,-1); |
| } |
| } |
| } |
| } |
| catch (ManifoldCFException e) |
| { |
| if (e.getErrorCode() == ManifoldCFException.INTERRUPTED) |
| break; |
| |
| if (e.getErrorCode() == ManifoldCFException.DATABASE_CONNECTION_ERROR) |
| { |
| // Note the failure, which will cause a reset to occur |
| resetManager.noteEvent(); |
| |
| Logging.threads.error("Worker thread aborting and restarting due to database connection reset: "+e.getMessage(),e); |
| try |
| { |
| // Give the database a chance to catch up/wake up |
| ManifoldCF.sleep(10000L); |
| } |
| catch (InterruptedException se) |
| { |
| break; |
| } |
| continue; |
| } |
| |
| // An exception occurred in the cleanup from another error. |
| // Log the error (but that's all we can do) |
| Logging.threads.error("Exception tossed: "+e.getMessage(),e); |
| |
| } |
| catch (InterruptedException e) |
| { |
| // We're supposed to quit |
| break; |
| } |
| catch (OutOfMemoryError e) |
| { |
| System.err.println("agents process ran out of memory - shutting down"); |
| e.printStackTrace(System.err); |
| System.exit(-200); |
| } |
| catch (Throwable e) |
| { |
| // A more severe error - but stay alive |
| Logging.threads.fatal("Error tossed: "+e.getMessage(),e); |
| } |
| } |
| } |
| catch (Throwable e) |
| { |
| // Severe error on initialization |
| System.err.println("agents process could not start - shutting down"); |
| Logging.threads.fatal("WorkerThread "+id+" initialization error tossed: "+e.getMessage(),e); |
| System.exit(-300); |
| } |
| |
| } |
| |
/** Compare two sorted lists of collection names.
*@return true if the arrays have the same length and identical elements, in order.
*/
| protected static boolean compareArrays(String[] array1, String[] array2) |
| { |
| if (array1.length != array2.length) |
| return false; |
| int i = 0; |
| while (i < array1.length) |
| { |
| if (!array1[i].equals(array2[i])) |
| return false; |
| i++; |
| } |
| return true; |
| } |
| |
/** Move all documents from the source list to the target list, emptying the source list. */
protected static void moveList(List<QueuedDocument> sourceList, List<QueuedDocument> targetList)
| { |
targetList.addAll(sourceList);
| sourceList.clear(); |
| } |
| |
| /** Mark specified documents as 'hopcount removed', and remove them from the |
| * index. Documents in this state are presumed to have: |
| * (a) nothing in the index |
| * (b) no intrinsic links for which they are the origin |
| * In order to guarantee this situation, this method must be capable of doing much |
| * of what the deletion method must do. Specifically, it should be capable of deleting |
| * documents from the index should they be already present. |
| */ |
| protected static void processHopcountRemovalLists(IPipelineSpecificationBasic pipelineSpecificationBasic, |
| IRepositoryConnector connector, |
| IRepositoryConnection connection, IJobManager jobManager, |
| List<QueuedDocument> hopcountremoveList, |
| IIncrementalIngester ingester, |
| Long jobID, String[] legalLinkTypes, OutputActivity ingestLogger, |
| int hopcountMethod, IReprioritizationTracker rt, long currentTime) |
| throws ManifoldCFException |
| { |
| // Remove from index |
| hopcountremoveList = removeFromIndex(pipelineSpecificationBasic,connection.getName(),jobManager,hopcountremoveList,ingester,ingestLogger); |
| // Mark as 'hopcountremoved' in the job queue |
| processJobQueueHopcountRemovals(hopcountremoveList,connector,connection, |
| jobManager,jobID,legalLinkTypes,hopcountMethod,rt,currentTime); |
| } |
| |
/** Clear specified documents out of the job queue and from the index.
*@param pipelineSpecificationBasic is the basic pipeline specification for this job.
*@param connector is the repository connector instance.
*@param connection is the repository connection.
*@param jobManager is the job manager.
*@param deleteList is a list of QueuedDocument objects to clean out.
*@param ingester is the handle to the incremental ingestion API control object.
*@param jobID is the job identifier.
*@param legalLinkTypes is the set of legal link types for this connector.
*@param ingestLogger is the output activity logger.
*@param hopcountMethod is the hopcount handling mode for the job.
*@param rt is the reprioritization tracker.
*@param currentTime is the current time in milliseconds since epoch.
*/
| protected static void processDeleteLists(IPipelineSpecificationBasic pipelineSpecificationBasic, |
| IRepositoryConnector connector, |
| IRepositoryConnection connection, IJobManager jobManager, |
| List<QueuedDocument> deleteList, |
| IIncrementalIngester ingester, |
| Long jobID, String[] legalLinkTypes, OutputActivity ingestLogger, |
| int hopcountMethod, IReprioritizationTracker rt, long currentTime) |
| throws ManifoldCFException |
| { |
| // Remove from index |
| deleteList = removeFromIndex(pipelineSpecificationBasic,connection.getName(),jobManager,deleteList,ingester,ingestLogger); |
| // Delete from the job queue |
| processJobQueueDeletions(deleteList,connector,connection, |
| jobManager,jobID,legalLinkTypes,hopcountMethod,rt,currentTime); |
| } |
| |
| /** Remove a specified set of documents from the index. |
| *@return the list of documents whose state needs to be updated in jobqueue. |
| */ |
| protected static List<QueuedDocument> removeFromIndex(IPipelineSpecificationBasic pipelineSpecificationBasic, |
| String connectionName, IJobManager jobManager, List<QueuedDocument> deleteList, |
| IIncrementalIngester ingester, OutputActivity ingestLogger) |
| throws ManifoldCFException |
| { |
| List<String> ingesterDeleteList = new ArrayList<String>(deleteList.size()); |
| for (int i = 0; i < deleteList.size(); i++) |
| { |
| QueuedDocument qd = deleteList.get(i); |
| // See if we need to delete from index |
| if (qd.anyLastIngestedRecords()) |
| { |
| // Queue up to issue deletion |
| ingesterDeleteList.add(qd.getDocumentDescription().getDocumentIdentifierHash()); |
| } |
| } |
| |
| // First, do the ingester delete list. This guarantees that if the ingestion system is down, this operation will be handled atomically. |
| if (ingesterDeleteList.size() > 0) |
| { |
| String[] deleteClasses = new String[ingesterDeleteList.size()]; |
| String[] deleteIDs = new String[ingesterDeleteList.size()]; |
| |
| for (int i = 0; i < ingesterDeleteList.size(); i++) |
| { |
| deleteClasses[i] = connectionName; |
| deleteIDs[i] = ingesterDeleteList.get(i); |
| } |
| |
| // Try to delete the documents via the output connection. |
| try |
| { |
| ingester.documentDeleteMultiple(pipelineSpecificationBasic,deleteClasses,deleteIDs,ingestLogger); |
| } |
| catch (ServiceInterruption e) |
| { |
| // It looks like the output connection is not currently functioning, so we need to requeue instead of deleting |
| // those documents that could not be removed. |
| List<QueuedDocument> newDeleteList = new ArrayList<QueuedDocument>(); |
| List<QueuedDocument> newRequeueList = new ArrayList<QueuedDocument>(); |
| |
| Set<String> ingesterSet = new HashSet<String>(); |
| for (int j = 0 ; j < ingesterDeleteList.size() ; j++) |
| { |
| String id = ingesterDeleteList.get(j); |
| ingesterSet.add(id); |
| } |
| for (int j = 0 ; j < deleteList.size() ; j++) |
| { |
| QueuedDocument qd = deleteList.get(j); |
| DocumentDescription dd = qd.getDocumentDescription(); |
| String documentIdentifierHash = dd.getDocumentIdentifierHash(); |
| if (ingesterSet.contains(documentIdentifierHash)) |
| newRequeueList.add(qd); |
| else |
| newDeleteList.add(qd); |
| } |
| |
| // Requeue those that are supposed to be requeued |
| requeueDocuments(jobManager,newRequeueList,e.getRetryTime(),e.getFailTime(), |
| e.getFailRetryCount()); |
| |
| // Process the ones that are just new job queue changes |
| deleteList = newDeleteList; |
| } |
| } |
| return deleteList; |
| } |
| |
| /** Process job queue deletions. Either the indexer has already been updated, or it is not necessary to update it. |
| */ |
| protected static void processJobQueueDeletions(List<QueuedDocument> jobmanagerDeleteList, |
| IRepositoryConnector connector, IRepositoryConnection connection, IJobManager jobManager, |
| Long jobID, String[] legalLinkTypes, int hopcountMethod, IReprioritizationTracker rt, long currentTime) |
| throws ManifoldCFException |
| { |
| // Now, do the document queue cleanup for deletions. |
| if (jobmanagerDeleteList.size() > 0) |
| { |
| DocumentDescription[] deleteDescriptions = new DocumentDescription[jobmanagerDeleteList.size()]; |
| for (int i = 0; i < deleteDescriptions.length; i++) |
| { |
| QueuedDocument qd = jobmanagerDeleteList.get(i); |
| deleteDescriptions[i] = qd.getDocumentDescription(); |
| } |
| |
| // Do the actual work. |
| DocumentDescription[] requeueCandidates = jobManager.markDocumentDeletedMultiple(jobID,legalLinkTypes,deleteDescriptions,hopcountMethod); |
| |
| // Requeue those documents that had carrydown data modifications |
| ManifoldCF.requeueDocumentsDueToCarrydown(jobManager, |
| requeueCandidates,connector,connection,rt,currentTime); |
| |
| // Mark all these as done |
for (QueuedDocument qd : jobmanagerDeleteList)
{
qd.setProcessed();
}
| } |
| } |
| |
| /** Process job queue hopcount removals. All indexer updates have already taken place. |
| */ |
| protected static void processJobQueueHopcountRemovals(List<QueuedDocument> jobmanagerRemovalList, |
| IRepositoryConnector connector, IRepositoryConnection connection, IJobManager jobManager, |
| Long jobID, String[] legalLinkTypes, int hopcountMethod, IReprioritizationTracker rt, long currentTime) |
| throws ManifoldCFException |
| { |
// Now, do the document queue cleanup for hopcount removals.
| if (jobmanagerRemovalList.size() > 0) |
| { |
| DocumentDescription[] removalDescriptions = new DocumentDescription[jobmanagerRemovalList.size()]; |
| for (int i = 0; i < removalDescriptions.length; i++) |
| { |
| QueuedDocument qd = jobmanagerRemovalList.get(i); |
| removalDescriptions[i] = qd.getDocumentDescription(); |
| } |
| |
| // Do the actual work. |
| DocumentDescription[] requeueCandidates = jobManager.markDocumentHopcountRemovalMultiple(jobID,legalLinkTypes,removalDescriptions,hopcountMethod); |
| |
| // Requeue those documents that had carrydown data modifications |
| ManifoldCF.requeueDocumentsDueToCarrydown(jobManager, |
| requeueCandidates,connector,connection,rt,currentTime); |
| |
| // Mark all these as done |
| for (QueuedDocument qd : jobmanagerRemovalList) |
| { |
| qd.setProcessed(); |
| } |
| } |
| } |
| |
| /** Requeue documents after a service interruption was detected. |
| *@param jobManager is the job manager object. |
| *@param requeueList is a list of QueuedDocument objects describing what needs to be requeued. |
| *@param retryTime is the time that the first retry ought to be scheduled for. |
| *@param failTime is the time beyond which retries lead to hard failure. |
| *@param failCount is the number of retries allowed until hard failure. |
| */ |
| protected static void requeueDocuments(IJobManager jobManager, List<QueuedDocument> requeueList, long retryTime, long failTime, int failCount) |
| throws ManifoldCFException |
| { |
| if (requeueList.size() > 0) |
| { |
| DocumentDescription[] requeueDocs = new DocumentDescription[requeueList.size()]; |
| |
| for (int i = 0; i < requeueDocs.length; i++) |
| { |
| QueuedDocument qd = requeueList.get(i); |
| DocumentDescription dd = qd.getDocumentDescription(); |
| requeueDocs[i] = dd; |
| } |
| |
| jobManager.resetDocumentMultiple(requeueDocs,retryTime,IJobManager.ACTION_RESCAN,failTime,failCount); |
| |
| for (QueuedDocument qd : requeueList) |
| { |
| qd.setProcessed(); |
| } |
| } |
| } |
| |
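// For example, packList(sb, new String[]{"x","y"}, '+') appends "2+x+y+": the element
// count followed by each escaped, delimiter-terminated value, so the list can be unpacked unambiguously.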
/** Stuffer for packing lists of variable length. */
| protected static void packList(StringBuilder output, String[] values, char delimiter) |
| { |
| pack(output,Integer.toString(values.length),delimiter); |
| int i = 0; |
| while (i < values.length) |
| { |
| pack(output,values[i++],delimiter); |
| } |
| } |
| |
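// For example, a forced-metadata map of {"q" -> {"v1","v2"}} packs to "q+v1+q+v2+".
// Names and values are sorted first, so equivalent maps always yield the same version string.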
/** Pack the forced metadata parameters into a canonical (sorted) version string. */
protected static String packParameters(Map<String,Set<String>> forcedParameters)
| { |
| StringBuilder sb = new StringBuilder(); |
| String[] paramNames = new String[forcedParameters.size()]; |
| int i = 0; |
| for (String paramName : forcedParameters.keySet()) |
| { |
| paramNames[i++] = paramName; |
| } |
| java.util.Arrays.sort(paramNames); |
| for (String paramName : paramNames) |
| { |
| Set<String> values = forcedParameters.get(paramName); |
| String[] paramValues = new String[values.size()]; |
| i = 0; |
| for (String paramValue : values) |
| { |
| paramValues[i++] = paramValue; |
| } |
| java.util.Arrays.sort(paramValues); |
| for (String paramValue : paramValues) |
| { |
| pack(sb,paramName,'+'); |
| pack(sb,paramValue,'+'); |
| } |
| } |
| return sb.toString(); |
| } |
| |
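// For example, pack(sb, "a+b", '+') appends "a\+b+": occurrences of the delimiter and of
// backslash itself are escaped with a backslash, and the value is terminated by the delimiter.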
/** Pack a single value, escaping the delimiter and backslash characters, and terminating with the delimiter. */
protected static void pack(StringBuilder sb, String value, char delim)
| { |
| for (int i = 0; i < value.length(); i++) |
| { |
| char x = value.charAt(i); |
| if (x == delim || x == '\\') |
| { |
| sb.append('\\'); |
| } |
| sb.append(x); |
| } |
| sb.append(delim); |
| } |
| |
| /** The maximum number of adds that happen in a single transaction */ |
| protected static final int MAX_ADDS_IN_TRANSACTION = 20; |
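// When the accumulated reference list reaches MAX_ADDS_IN_TRANSACTION entries,
// ProcessActivity.addDocumentReference() flushes it via processDocumentReferences().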
| |
| // Nested classes |
| |
| /** Process activity class wraps access to the ingester and job queue. |
| */ |
| protected static class ProcessActivity implements IProcessActivity |
| { |
| // Member variables |
| protected final Long jobID; |
| protected final String processID; |
| protected final IThreadContext threadContext; |
| protected final IJobManager jobManager; |
| protected final IIncrementalIngester ingester; |
| protected final String connectionName; |
| protected final IPipelineSpecification pipelineSpecification; |
| protected final Map<String,QueuedDocument> previousDocuments; |
| protected final long currentTime; |
| protected final Long expireInterval; |
| protected final Map<String,Set<String>> forcedMetadata; |
| protected final Long recrawlInterval; |
| protected final Long maxInterval; |
| protected final int hopcountMode; |
| protected final IRepositoryConnection connection; |
| protected final IRepositoryConnector connector; |
| protected final IRepositoryConnectionManager connMgr; |
| protected final String[] legalLinkTypes; |
| protected final OutputActivity ingestLogger; |
| protected final IReprioritizationTracker rt; |
| protected final String parameterVersion; |
| |
| // We submit references in bulk, because that's way more efficient. |
| protected final Map<DocumentReference,DocumentReference> referenceList = new HashMap<DocumentReference,DocumentReference>(); |
| |
| // Keep track of lower and upper reschedule bounds separately. Contains a Long and is keyed by a document identifier. |
| protected final Map<String,Long> lowerRescheduleBounds = new HashMap<String,Long>(); |
| protected final Map<String,Long> upperRescheduleBounds = new HashMap<String,Long>(); |
| protected final Map<String,Long> lowerExpireBounds = new HashMap<String,Long>(); |
| protected final Map<String,Long> upperExpireBounds = new HashMap<String,Long>(); |
| |
| // Origination times |
| protected final Map<String,Long> originationTimes = new HashMap<String,Long>(); |
| |
| // Whether the document was aborted or not |
| protected final Set<String> abortSet = new HashSet<String>(); |
| |
| // Whether the document was touched or not |
| protected final Set<String> touchedSet = new HashSet<String>(); |
| |
| // Whether document was deleted |
| protected final Set<String> documentDeletedSet = new HashSet<String>(); |
| |
| // Whether a component was touched or not, keyed by document identifier. |
| // This does not include primary document. The set is keyed by component id hash. |
| protected final Map<String,Set<String>> touchedComponentSet = new HashMap<String,Set<String>>(); |
| |
/** Constructor.
*@param jobID is the job identifier.
*@param processID is the process ID.
*@param threadContext is the thread context.
*@param rt is the reprioritization tracker.
*@param jobManager is the job manager
*@param ingester is the ingester
*@param connectionName is the repository connection name.
*@param pipelineSpecification is the full pipeline specification.
*@param previousDocuments maps document identifier hashes to the corresponding QueuedDocuments.
*@param currentTime is the current time in milliseconds since epoch.
*@param expireInterval is the document expiration interval, or null if none.
*@param forcedMetadata is the map of forced metadata names to value sets.
*@param recrawlInterval is the base recrawl interval, or null if none.
*@param maxInterval is the maximum recrawl interval, or null if none.
*@param hopcountMode is the hopcount handling mode for the job.
*@param connection is the repository connection.
*@param connector is the repository connector instance.
*@param connMgr is the repository connection manager.
*@param legalLinkTypes is the set of legal link types for this connector.
*@param ingestLogger is the output activity logger.
*@param parameterVersion is the packed forced-parameter version string.
*/
| public ProcessActivity(Long jobID, String processID, |
| IThreadContext threadContext, |
| IReprioritizationTracker rt, IJobManager jobManager, |
| IIncrementalIngester ingester, |
| String connectionName, |
| IPipelineSpecification pipelineSpecification, |
| Map<String,QueuedDocument> previousDocuments, |
| long currentTime, |
| Long expireInterval, |
| Map<String,Set<String>> forcedMetadata, |
| Long recrawlInterval, |
| Long maxInterval, |
| int hopcountMode, |
| IRepositoryConnection connection, IRepositoryConnector connector, |
| IRepositoryConnectionManager connMgr, String[] legalLinkTypes, OutputActivity ingestLogger, |
| String parameterVersion) |
| { |
| this.jobID = jobID; |
| this.processID = processID; |
| this.threadContext = threadContext; |
| this.rt = rt; |
| this.jobManager = jobManager; |
| this.ingester = ingester; |
| this.connectionName = connectionName; |
| this.pipelineSpecification = pipelineSpecification; |
| this.previousDocuments = previousDocuments; |
| this.currentTime = currentTime; |
| this.expireInterval = expireInterval; |
| this.forcedMetadata = forcedMetadata; |
| this.recrawlInterval = recrawlInterval; |
| this.maxInterval = maxInterval; |
| this.hopcountMode = hopcountMode; |
| this.connection = connection; |
| this.connector = connector; |
| this.connMgr = connMgr; |
| this.legalLinkTypes = legalLinkTypes; |
| this.ingestLogger = ingestLogger; |
| this.parameterVersion = parameterVersion; |
| } |
| |
| /** Clean up any dangling information, before abandoning this process activity object */ |
| public void discard() |
| throws ManifoldCFException |
| { |
| for (DocumentReference dr : referenceList.keySet()) |
| { |
| dr.discard(); |
| } |
| referenceList.clear(); |
| } |
| |
| /** Check whether a document (and its version string) was touched or not. |
| */ |
| public boolean wasDocumentTouched(String documentIdentifier) |
| { |
| return touchedSet.contains(documentIdentifier); |
| } |
| |
| /** Check whether a document component was touched or not. |
| */ |
| public boolean wasDocumentComponentTouched(String documentIdentifier, |
| String componentIdentifierHash) |
| { |
| Set<String> components = touchedComponentSet.get(documentIdentifier); |
| if (components == null) |
| return false; |
| return components.contains(componentIdentifierHash); |
| } |
| |
| /** Check whether document was deleted or not. |
| */ |
| public boolean wasDocumentDeleted(String documentIdentifier) |
| { |
| return documentDeletedSet.contains(documentIdentifier); |
| } |
| |
| /** Check whether a document was aborted or not. |
| */ |
| public boolean wasDocumentAborted(String documentIdentifier) |
| { |
| return abortSet.contains(documentIdentifier); |
| } |
| |
| /** Check if a document needs to be reindexed, based on a computed version string. |
| * Call this method to determine whether reindexing is necessary. Pass in a newly-computed version |
| * string. This method will return "true" if the document needs to be re-indexed. |
| *@param documentIdentifier is the document identifier. |
| *@param newVersionString is the newly-computed version string. |
| *@return true if the document needs to be reindexed. |
| */ |
| @Override |
| public boolean checkDocumentNeedsReindexing(String documentIdentifier, |
| String newVersionString) |
| throws ManifoldCFException |
| { |
| return checkDocumentNeedsReindexing(documentIdentifier,null,newVersionString); |
| } |
| |
| /** Check if a document needs to be reindexed, based on a computed version string. |
| * Call this method to determine whether reindexing is necessary. Pass in a newly-computed version |
| * string. This method will return "true" if the document needs to be re-indexed. |
| *@param documentIdentifier is the document identifier. |
| *@param componentIdentifier is the component document identifier, if any. |
| *@param newVersionString is the newly-computed version string. |
| *@return true if the document needs to be reindexed. |
| */ |
| @Override |
| public boolean checkDocumentNeedsReindexing(String documentIdentifier, |
| String componentIdentifier, |
| String newVersionString) |
| throws ManifoldCFException |
| { |
| String documentIdentifierHash = ManifoldCF.hash(documentIdentifier); |
| String componentIdentifierHash = computeComponentIDHash(componentIdentifier); |
| IPipelineSpecificationWithVersions spec = computePipelineSpecification(documentIdentifierHash,componentIdentifierHash); |
| return ingester.checkFetchDocument(spec,newVersionString,parameterVersion,connection.getACLAuthority()); |
| } |
| |
| /** Add a document description to the current job's queue. |
| *@param localIdentifier is the local document identifier to add (for the connector that |
| * fetched the document). |
| *@param parentIdentifier is the document identifier that is considered to be the "parent" |
| * of this identifier. May be null, if no hopcount filtering desired for this kind of relationship. |
| *@param relationshipType is the string describing the kind of relationship described by this |
| * reference. This must be one of the strings returned by the IRepositoryConnector method |
| * "getRelationshipTypes()". May be null. |
| *@param dataNames is the list of carry-down data from the parent to the child. May be null. Each name is limited to 255 characters! |
| *@param dataValues are the values that correspond to the data names in the dataNames parameter. May be null only if dataNames is null. |
| * The type of each object must either be a String, or a CharacterInput. |
| *@param originationTime is the time, in ms since epoch, that the document originated. Pass null if none or unknown. |
| *@param prereqEventNames are the names of the prerequisite events which this document requires prior to processing. Pass null if none. |
| */ |
| @Override |
| public void addDocumentReference(String localIdentifier, String parentIdentifier, String relationshipType, |
| String[] dataNames, Object[][] dataValues, Long originationTime, String[] prereqEventNames) |
| throws ManifoldCFException |
| { |
| String localIdentifierHash = ManifoldCF.hash(localIdentifier); |
| String parentIdentifierHash = null; |
| if (parentIdentifier != null && parentIdentifier.length() > 0) |
| parentIdentifierHash = ManifoldCF.hash(parentIdentifier); |
| |
| if (Logging.threads.isDebugEnabled()) |
| Logging.threads.debug("Adding document reference, from "+((parentIdentifier==null)?"no parent":"'"+parentIdentifier+"'") |
| +" to '"+localIdentifier+"', relationship type "+((relationshipType==null)?"null":"'"+relationshipType+"'") |
| +", with "+((dataNames==null)?"no":Integer.toString(dataNames.length))+" data types, origination time="+((originationTime==null)?"unknown":originationTime.toString())); |
| |
| if (expireInterval != null) |
| { |
| // We should not queue documents that have already expired; it wastes time |
| // So, calculate when this document will expire |
| long currentTime = System.currentTimeMillis(); |
| long expireTime; |
| if (originationTime == null) |
| expireTime = currentTime + expireInterval.longValue(); |
| else |
| expireTime = originationTime.longValue() + expireInterval.longValue(); |
| if (expireTime <= currentTime) |
| { |
| if (Logging.threads.isDebugEnabled()) |
| Logging.threads.debug("Not adding document reference for '"+localIdentifier+"', since it has already expired"); |
| return; |
| } |
| } |
| |
| if (referenceList.size() == MAX_ADDS_IN_TRANSACTION) |
| { |
| // Output what we've got, and reset |
| processDocumentReferences(); |
| } |
| DocumentReference dr = new DocumentReference(localIdentifierHash,localIdentifier,new DocumentBin(parentIdentifierHash,relationshipType)); |
| DocumentReference existingDr = referenceList.get(dr); |
| if (existingDr == null) |
| { |
| referenceList.put(dr,dr); |
| existingDr = dr; |
| } |
| // We can't just keep a reference to the passed-in data values, because if these are files the caller will delete them upon the return of this method. So, for all data values we keep, |
| // make a local copy, and remove the file pointer from the caller's copy. It then becomes the responsibility of the ProcessActivity object to clean up these items when it is discarded. |
| Object[][] savedDataValues; |
| if (dataValues != null) |
| { |
savedDataValues = new Object[dataValues.length][];
for (int q = 0; q < savedDataValues.length; q++)
{
Object[] innerArray = dataValues[q];
if (innerArray == null)
{
savedDataValues[q] = null;
continue;
}
savedDataValues[q] = new Object[innerArray.length];
for (int z = 0; z < innerArray.length; z++)
{
Object innerValue = innerArray[z];
// CharacterInput values are backed by files the caller will delete, so transfer ownership of a local copy.
if (innerValue instanceof CharacterInput)
savedDataValues[q][z] = ((CharacterInput)innerValue).transfer();
else
savedDataValues[q][z] = innerValue;
}
}
| } |
| else |
| savedDataValues = null; |
| |
| existingDr.addData(dataNames,savedDataValues); |
| existingDr.addPrerequisiteEvents(prereqEventNames); |
| } |
| |
| /** Add a document description to the current job's queue. |
| *@param localIdentifier is the local document identifier to add (for the connector that |
| * fetched the document). |
| *@param parentIdentifier is the document identifier that is considered to be the "parent" |
| * of this identifier. May be null, if no hopcount filtering desired for this kind of relationship. |
| *@param relationshipType is the string describing the kind of relationship described by this |
| * reference. This must be one of the strings returned by the IRepositoryConnector method |
| * "getRelationshipTypes()". May be null. |
| *@param dataNames is the list of carry-down data from the parent to the child. May be null. Each name is limited to 255 characters! |
| *@param dataValues are the values that correspond to the data names in the dataNames parameter. May be null only if dataNames is null. |
| *@param originationTime is the time, in ms since epoch, that the document originated. Pass null if none or unknown. |
| */ |
| @Override |
| public void addDocumentReference(String localIdentifier, String parentIdentifier, String relationshipType, |
| String[] dataNames, Object[][] dataValues, Long originationTime) |
| throws ManifoldCFException |
| { |
| addDocumentReference(localIdentifier,parentIdentifier,relationshipType,dataNames,dataValues,originationTime,null); |
| } |
| |
| /** Add a document description to the current job's queue. |
| *@param localIdentifier is the local document identifier to add (for the connector that |
| * fetched the document). |
| *@param parentIdentifier is the document identifier that is considered to be the "parent" |
| * of this identifier. May be null, if no hopcount filtering desired for this kind of relationship. |
| *@param relationshipType is the string describing the kind of relationship described by this |
| * reference. This must be one of the strings returned by the IRepositoryConnector method |
| * "getRelationshipTypes()". May be null. |
| *@param dataNames is the list of carry-down data from the parent to the child. May be null. Each name is limited to 255 characters! |
| *@param dataValues are the values that correspond to the data names in the dataNames parameter. May be null only if dataNames is null. |
| */ |
| @Override |
| public void addDocumentReference(String localIdentifier, String parentIdentifier, String relationshipType, |
| String[] dataNames, Object[][] dataValues) |
| throws ManifoldCFException |
| { |
| addDocumentReference(localIdentifier,parentIdentifier,relationshipType,dataNames,dataValues,null); |
| } |
| |
| /** Add a document description to the current job's queue. |
| *@param localIdentifier is the local document identifier to add (for the connector that |
| * fetched the document). |
| *@param parentIdentifier is the document identifier that is considered to be the "parent" |
    * of this identifier. May be null if no hopcount filtering is desired for this kind of relationship.
| *@param relationshipType is the string describing the kind of relationship described by this |
| * reference. This must be one of the strings returned by the IRepositoryConnector method |
| * "getRelationshipTypes()". May be null. |
| */ |
| @Override |
| public void addDocumentReference(String localIdentifier, String parentIdentifier, String relationshipType) |
| throws ManifoldCFException |
| { |
| addDocumentReference(localIdentifier,parentIdentifier,relationshipType,null,null); |
| } |
| |
| /** Add a document description to the current job's queue. This method is equivalent to |
| * addDocumentReference(localIdentifier,null,null). |
| *@param localIdentifier is the local document identifier to add (for the connector that |
| * fetched the document). |
| */ |
| @Override |
| public void addDocumentReference(String localIdentifier) |
| throws ManifoldCFException |
| { |
| addDocumentReference(localIdentifier,null,null,null,null); |
| } |
| |
| /** Retrieve data passed from parents to a specified child document. |
| *@param localIdentifier is the document identifier of the document we want the recorded data for. |
| *@param dataName is the name of the data items to retrieve. |
| *@return an array containing the unique data values passed from ALL parents. Note that these are in no particular order, and there will not be any duplicates. |
| */ |
| @Override |
| public String[] retrieveParentData(String localIdentifier, String dataName) |
| throws ManifoldCFException |
| { |
| return jobManager.retrieveParentData(jobID,ManifoldCF.hash(localIdentifier),dataName); |
| } |
| |
| /** Retrieve data passed from parents to a specified child document. |
| *@param localIdentifier is the document identifier of the document we want the recorded data for. |
| *@param dataName is the name of the data items to retrieve. |
| *@return an array containing the unique data values passed from ALL parents. Note that these are in no particular order, and there will not be any duplicates. |
| */ |
| @Override |
| public CharacterInput[] retrieveParentDataAsFiles(String localIdentifier, String dataName) |
| throws ManifoldCFException |
| { |
| return jobManager.retrieveParentDataAsFiles(jobID,ManifoldCF.hash(localIdentifier),dataName); |
| } |
| |
| /** Record a document version, but don't ingest it. |
| *@param documentIdentifier is the document identifier. |
| *@param version is the document version. |
| */ |
| @Override |
| public void recordDocument(String documentIdentifier, String version) |
| throws ManifoldCFException |
| { |
| recordDocument(documentIdentifier,null,version); |
| } |
| |
    /** Record a document version WITHOUT reindexing or removing the document. (Other
    * documents with the same URL, however, will still be removed.) This is
    * useful when the version string changes but the document contents are known not
    * to have changed.
| *@param documentIdentifier is the document identifier. |
| *@param componentIdentifier is the component document identifier, if any. |
| *@param version is the document version. |
| */ |
| @Override |
| public void recordDocument(String documentIdentifier, |
| String componentIdentifier, |
| String version) |
| throws ManifoldCFException |
| { |
| String documentIdentifierHash = ManifoldCF.hash(documentIdentifier); |
| String componentIdentifierHash = computeComponentIDHash(componentIdentifier); |
| ingester.documentRecord( |
| pipelineSpecification.getBasicPipelineSpecification(), |
| connectionName,documentIdentifierHash,componentIdentifierHash, |
| version,currentTime); |
| touchedSet.add(documentIdentifier); |
| touchComponentSet(documentIdentifier,componentIdentifierHash); |
| } |
| |
| /** Ingest the current document. |
| *@param localIdentifier is the document's local identifier. |
| *@param version is the version of the document, as reported by the getDocumentVersions() method of the |
| * corresponding repository connector. |
| *@param documentURI is the URI to use to retrieve this document from the search interface (and is |
| * also the unique key in the index). |
| *@param data is the document data. The data is closed after ingestion is complete. |
| * NOTE: Any data stream IOExceptions will be converted to ManifoldCFExceptions and ServiceInterruptions |
| * according to standard best practices. |
| */ |
| @Override |
| @Deprecated |
| public void ingestDocument(String localIdentifier, String version, String documentURI, RepositoryDocument data) |
| throws ManifoldCFException, ServiceInterruption |
| { |
| try |
| { |
| ingestDocumentWithException(localIdentifier,version,documentURI,data); |
| } |
| catch (IOException e) |
| { |
| handleIOException(e,"fetching"); |
| } |
| } |
| |
| |
| /** Ingest the current document. |
| *@param documentIdentifier is the document's local identifier. |
| *@param version is the version of the document, as reported by the getDocumentVersions() method of the |
| * corresponding repository connector. |
| *@param documentURI is the URI to use to retrieve this document from the search interface (and is |
| * also the unique key in the index). |
| *@param data is the document data. The data is closed after ingestion is complete. |
| *@throws IOException only when data stream reading fails. |
| */ |
| @Override |
| public void ingestDocumentWithException(String documentIdentifier, String version, String documentURI, RepositoryDocument data) |
| throws ManifoldCFException, ServiceInterruption, IOException |
| { |
| ingestDocumentWithException(documentIdentifier,null,version,documentURI,data); |
| } |
| |
| /** Ingest the current document. |
| *@param documentIdentifier is the document's identifier. |
| *@param componentIdentifier is the component document identifier, if any. |
| *@param version is the version of the document, as reported by the getDocumentVersions() method of the |
| * corresponding repository connector. |
| *@param documentURI is the URI to use to retrieve this document from the search interface (and is |
| * also the unique key in the index). |
| *@param data is the document data. The data is closed after ingestion is complete. |
| *@throws IOException only when data stream reading fails. |
| */ |
| @Override |
| public void ingestDocumentWithException(String documentIdentifier, |
| String componentIdentifier, |
| String version, String documentURI, RepositoryDocument data) |
| throws ManifoldCFException, ServiceInterruption, IOException |
| { |
| // We should not get called here if versions agree, unless the repository |
| // connector cannot distinguish between versions - in which case it must |
| // always ingest (essentially) |
| |
| String documentIdentifierHash = ManifoldCF.hash(documentIdentifier); |
| String componentIdentifierHash = computeComponentIDHash(componentIdentifier); |
| |
| if (data != null) |
| { |
| //Map<String,Set<String>> forcedMetadata = job.getForcedMetadata(); |
| |
| // Modify the repository document with forced parameters. |
| for (String paramName : forcedMetadata.keySet()) |
| { |
| Set<String> values = forcedMetadata.get(paramName); |
| String[] paramValues = new String[values.size()]; |
| int j = 0; |
| for (String value : values) |
| { |
| paramValues[j++] = value; |
| } |
| data.addField(paramName,paramValues); |
| } |
| } |
| |
      // The job's forced metadata (if any) has been added above; now hand the document to the incremental ingester.
| ingester.documentIngest( |
| computePipelineSpecification(documentIdentifierHash,componentIdentifierHash), |
| connectionName,documentIdentifierHash,componentIdentifierHash, |
| version,parameterVersion, |
| connection.getACLAuthority(), |
| data,currentTime, |
| documentURI, |
| ingestLogger); |
| |
| touchedSet.add(documentIdentifier); |
| touchComponentSet(documentIdentifier,componentIdentifierHash); |
| } |
| |
| /** Remove the specified document from the search engine index, while keeping track of the version information |
| * for it (to reduce churn). |
| *@param documentIdentifier is the document's local identifier. |
| *@param version is the version string to be recorded for the document. |
| */ |
| @Override |
| public void noDocument(String documentIdentifier, String version) |
| throws ManifoldCFException, ServiceInterruption |
| { |
| noDocument(documentIdentifier,null,version); |
| } |
| |
| /** Remove the specified document from the search engine index, and update the |
| * recorded version information for the document. |
| *@param documentIdentifier is the document's local identifier. |
| *@param componentIdentifier is the component document identifier, if any. |
| *@param version is the version string to be recorded for the document. |
| */ |
| @Override |
| public void noDocument(String documentIdentifier, |
| String componentIdentifier, |
| String version) |
| throws ManifoldCFException, ServiceInterruption |
| { |
| // Special interpretation for empty version string; treat as if the document doesn't exist |
| // (by ignoring it and allowing it to be deleted later) |
| String documentIdentifierHash = ManifoldCF.hash(documentIdentifier); |
| String componentIdentifierHash = computeComponentIDHash(componentIdentifier); |
| |
| ingester.documentNoData( |
| computePipelineSpecification(documentIdentifierHash,componentIdentifierHash), |
| connectionName,documentIdentifierHash,componentIdentifierHash, |
| version,parameterVersion, |
| connection.getACLAuthority(), |
| currentTime, |
| ingestLogger); |
| |
| touchedSet.add(documentIdentifier); |
| touchComponentSet(documentIdentifier,componentIdentifierHash); |
| } |
| |
    /** Remove the specified document primary component permanently from the search engine index,
    * and from the status table. Use this method when your document has components and
    * currently has an indexed primary document, but will not have a primary document again
    * for the foreseeable future. This is a rare situation.
| *@param documentIdentifier is the document's identifier. |
| */ |
| @Override |
| public void removeDocument(String documentIdentifier) |
| throws ManifoldCFException, ServiceInterruption |
| { |
| String documentIdentifierHash = ManifoldCF.hash(documentIdentifier); |
| ingester.documentRemove( |
| pipelineSpecification.getBasicPipelineSpecification(), |
| connectionName,documentIdentifierHash,null, |
| ingestLogger); |
| |
| // Note that we touched it, so it won't get checked |
| touchedSet.add(documentIdentifier); |
| } |
| |
| /** Retain existing document component. Use this method to signal that an already-existing |
| * document component does not need to be reindexed. The default behavior is to remove |
| * components that are not mentioned during processing. |
| *@param documentIdentifier is the document's identifier. |
| *@param componentIdentifier is the component document identifier, which cannot be null. |
| */ |
| @Override |
| public void retainDocument(String documentIdentifier, |
| String componentIdentifier) |
| throws ManifoldCFException |
| { |
| touchComponentSet(documentIdentifier,computeComponentIDHash(componentIdentifier)); |
| } |
| |
| |
| /** Delete the current document from the search engine index, while keeping track of the version information |
| * for it (to reduce churn). |
| * Use noDocument() above instead. |
| *@param documentIdentifier is the document's local identifier. |
| *@param version is the version string to be recorded for the document. |
| */ |
| @Override |
| @Deprecated |
| public void deleteDocument(String documentIdentifier, String version) |
| throws ManifoldCFException, ServiceInterruption |
| { |
| noDocument(documentIdentifier,version); |
| } |
| |
| /** Delete the specified document from the search engine index, and from the status table. This |
| * method does NOT keep track of version |
| * information for the document and thus can lead to "churn", whereby the same document is queued, processed, |
| * and removed on subsequent crawls. It is therefore preferable to use noDocument() instead, |
| * in any case where the same decision will need to be made over and over. |
| *@param documentIdentifier is the document's identifier. |
| */ |
| @Override |
| public void deleteDocument(String documentIdentifier) |
| throws ManifoldCFException |
| { |
| documentDeletedSet.add(documentIdentifier); |
| } |
| |
| /** Override the schedule for the next time a document is crawled. |
| * Calling this method allows you to set an upper recrawl bound, lower recrawl bound, upper expire bound, lower expire bound, |
| * or a combination of these, on a specific document. This method is only effective if the job is a continuous one, and if the |
| * identifier you pass in is being processed. |
| *@param localIdentifier is the document's local identifier. |
| *@param lowerRecrawlBoundTime is the time in ms since epoch that the reschedule time should not fall BELOW, or null if none. |
| *@param upperRecrawlBoundTime is the time in ms since epoch that the reschedule time should not rise ABOVE, or null if none. |
| *@param lowerExpireBoundTime is the time in ms since epoch that the expire time should not fall BELOW, or null if none. |
| *@param upperExpireBoundTime is the time in ms since epoch that the expire time should not rise ABOVE, or null if none. |
| */ |
| @Override |
| public void setDocumentScheduleBounds(String localIdentifier, |
| Long lowerRecrawlBoundTime, Long upperRecrawlBoundTime, |
| Long lowerExpireBoundTime, Long upperExpireBoundTime) |
| throws ManifoldCFException |
| { |
| if (lowerRecrawlBoundTime != null) |
| lowerRescheduleBounds.put(localIdentifier,lowerRecrawlBoundTime); |
| else |
| lowerRescheduleBounds.remove(localIdentifier); |
| if (upperRecrawlBoundTime != null) |
| upperRescheduleBounds.put(localIdentifier,upperRecrawlBoundTime); |
| else |
| upperRescheduleBounds.remove(localIdentifier); |
| if (lowerExpireBoundTime != null) |
| lowerExpireBounds.put(localIdentifier,lowerExpireBoundTime); |
| else |
| lowerExpireBounds.remove(localIdentifier); |
| if (upperExpireBoundTime != null) |
| upperExpireBounds.put(localIdentifier,upperExpireBoundTime); |
| else |
| upperExpireBounds.remove(localIdentifier); |
| } |
| |
| /** Override a document's origination time. |
| * Use this method to signal the framework that a document's origination time is something other than the first time it was crawled. |
| *@param localIdentifier is the document's local identifier. |
| *@param originationTime is the document's origination time, or null if unknown. |
| */ |
| @Override |
| public void setDocumentOriginationTime(String localIdentifier, |
| Long originationTime) |
| throws ManifoldCFException |
| { |
| if (originationTime == null) |
| originationTimes.remove(localIdentifier); |
| else |
| originationTimes.put(localIdentifier,originationTime); |
| } |
| |
| /** Find a document's lower rescheduling time bound, if any */ |
| public Long getDocumentRescheduleLowerBoundTime(String localIdentifier) |
| { |
| return lowerRescheduleBounds.get(localIdentifier); |
| } |
| |
| /** Find a document's upper rescheduling time bound, if any */ |
| public Long getDocumentRescheduleUpperBoundTime(String localIdentifier) |
| { |
| return upperRescheduleBounds.get(localIdentifier); |
| } |
| |
| /** Find a document's lower expiration time bound, if any */ |
| public Long getDocumentExpirationLowerBoundTime(String localIdentifier) |
| { |
| return lowerExpireBounds.get(localIdentifier); |
| } |
| |
| /** Find a document's upper expiration time bound, if any */ |
| public Long getDocumentExpirationUpperBoundTime(String localIdentifier) |
| { |
| return upperExpireBounds.get(localIdentifier); |
| } |
| |
| /** Get a document's origination time */ |
| public Long getDocumentOriginationTime(String localIdentifier) |
| { |
| return originationTimes.get(localIdentifier); |
| } |
| |
    /** Calculate the time at which a document should next be rescanned, based on the job's
    * recrawl interval (capped by the maximum interval, if any) and any document-specific
    * bounds set via setDocumentScheduleBounds().
    */
    public Long calculateDocumentRescheduleTime(long currentTime, long timeAmt, String localIdentifier)
| { |
| Long recrawlTime = null; |
| //Long recrawlInterval = job.getInterval(); |
| if (recrawlInterval != null) |
| { |
| //Long maxInterval = job.getMaxInterval(); |
| long actualInterval = recrawlInterval.longValue() + timeAmt; |
| if (maxInterval != null && actualInterval > maxInterval.longValue()) |
| actualInterval = maxInterval.longValue(); |
| recrawlTime = new Long(currentTime + actualInterval); |
| } |
| if (Logging.scheduling.isDebugEnabled()) |
| Logging.scheduling.debug("Default rescan time for document '"+localIdentifier+"' is "+((recrawlTime==null)?"NEVER":recrawlTime.toString())); |
| Long lowerBound = getDocumentRescheduleLowerBoundTime(localIdentifier); |
| if (lowerBound != null) |
| { |
| if (recrawlTime == null || recrawlTime.longValue() < lowerBound.longValue()) |
| { |
| recrawlTime = lowerBound; |
| if (Logging.scheduling.isDebugEnabled()) |
| Logging.scheduling.debug(" Rescan time overridden for document '"+localIdentifier+"' due to lower bound; new value is "+recrawlTime.toString()); |
| } |
| } |
| Long upperBound = getDocumentRescheduleUpperBoundTime(localIdentifier); |
| if (upperBound != null) |
| { |
| if (recrawlTime == null || recrawlTime.longValue() > upperBound.longValue()) |
| { |
| recrawlTime = upperBound; |
| if (Logging.scheduling.isDebugEnabled()) |
| Logging.scheduling.debug(" Rescan time overridden for document '"+localIdentifier+"' due to upper bound; new value is "+recrawlTime.toString()); |
| } |
| } |
| return recrawlTime; |
| } |
| |
    /** Calculate the time at which a document should expire, based on the document's
    * origination time, the job's expiration interval, and any document-specific bounds.
    */
    public Long calculateDocumentExpireTime(long currentTime, String localIdentifier)
| { |
| // For expire time, we take the document's origination time, plus the expiration interval (which comes from the job). |
| Long originationTime = getDocumentOriginationTime(localIdentifier); |
| if (originationTime == null) |
| originationTime = new Long(currentTime); |
| //Long expireInterval = job.getExpiration(); |
| Long expireTime = null; |
| if (expireInterval != null) |
| expireTime = new Long(originationTime.longValue() + expireInterval.longValue()); |
| Long lowerBound = getDocumentExpirationLowerBoundTime(localIdentifier); |
| if (lowerBound != null) |
| { |
| if (expireTime == null || expireTime.longValue() < lowerBound.longValue()) |
| expireTime = lowerBound; |
| } |
| Long upperBound = getDocumentExpirationUpperBoundTime(localIdentifier); |
| if (upperBound != null) |
| { |
| if (expireTime == null || expireTime.longValue() > upperBound.longValue()) |
| expireTime = upperBound; |
| } |
| return expireTime; |
| } |
| |
| /** Reset the recorded times */ |
| public void resetTimes() |
| { |
| lowerRescheduleBounds.clear(); |
| upperRescheduleBounds.clear(); |
| lowerExpireBounds.clear(); |
| upperExpireBounds.clear(); |
| } |
| |
| /** Record time-stamped information about the activity of the connector. |
| *@param startTime is either null or the time since the start of epoch in milliseconds (Jan 1, 1970). Every |
| * activity has an associated time; the startTime field records when the activity began. A null value |
| * indicates that the start time and the finishing time are the same. |
| *@param activityType is a string which is fully interpretable only in the context of the connector involved, which is |
| * used to categorize what kind of activity is being recorded. For example, a web connector might record a |
| * "fetch document" activity. Cannot be null. |
| *@param dataSize is the number of bytes of data involved in the activity, or null if not applicable. |
| *@param entityIdentifier is a (possibly long) string which identifies the object involved in the history record. |
| * The interpretation of this field will differ from connector to connector. May be null. |
| *@param resultCode contains a terse description of the result of the activity. The description is limited in |
| * size to 255 characters, and can be interpreted only in the context of the current connector. May be null. |
| *@param resultDescription is a (possibly long) human-readable string which adds detail, if required, to the result |
| * described in the resultCode field. This field is not meant to be queried on. May be null. |
| *@param childIdentifiers is a set of child entity identifiers associated with this activity. May be null. |
| */ |
| @Override |
| public void recordActivity(Long startTime, String activityType, Long dataSize, |
| String entityIdentifier, String resultCode, String resultDescription, String[] childIdentifiers) |
| throws ManifoldCFException |
| { |
| connMgr.recordHistory(connection.getName(),startTime,activityType,dataSize,entityIdentifier,resultCode, |
| resultDescription,childIdentifiers); |
| } |
| |
| /** Flush the outstanding references into the database. |
| */ |
| public void flush() |
| throws ManifoldCFException |
| { |
| processDocumentReferences(); |
| } |
| |
| /** Process outstanding document references, in batch. |
| */ |
| protected void processDocumentReferences() |
| throws ManifoldCFException |
| { |
| if (referenceList.size() == 0) |
| return; |
| |
| // We have to segregate the references by link type and parent. |
| Map<DocumentBin,List<DocumentReference>> linkBins = new HashMap<DocumentBin,List<DocumentReference>>(); |
| for (DocumentReference dr : referenceList.keySet()) |
| { |
| DocumentBin key = dr.getKey(); |
| List<DocumentReference> set = linkBins.get(key); |
| if (set == null) |
| { |
| set = new ArrayList<DocumentReference>(); |
| linkBins.put(key,set); |
| } |
| set.add(dr); |
| } |
| |
| // Now, go through link types. |
| for (DocumentBin db : linkBins.keySet()) |
| { |
| List<DocumentReference> set = linkBins.get(db); |
| |
| String[] docidHashes = new String[set.size()]; |
| String[] docids = new String[set.size()]; |
| IPriorityCalculator[] priorities = new IPriorityCalculator[set.size()]; |
| String[][] dataNames = new String[docids.length][]; |
| Object[][][] dataValues = new Object[docids.length][][]; |
| String[][] eventNames = new String[docids.length][]; |
| |
| long currentTime = System.currentTimeMillis(); |
| |
| rt.clearPreloadRequests(); |
| for (int j = 0; j < docidHashes.length; j++) |
| { |
| DocumentReference dr = set.get(j); |
| docidHashes[j] = dr.getLocalIdentifierHash(); |
| docids[j] = dr.getLocalIdentifier(); |
| dataNames[j] = dr.getDataNames(); |
| dataValues[j] = dr.getDataValues(); |
| eventNames[j] = dr.getPrerequisiteEventNames(); |
| |
| // Calculate desired document priority based on current queuetracker status. |
| String[] bins = ManifoldCF.calculateBins(connector,dr.getLocalIdentifier()); |
| PriorityCalculator p = new PriorityCalculator(rt,connection,bins); |
| priorities[j] = p; |
| p.makePreloadRequest(); |
| } |
| rt.preloadBinValues(); |
| |
| jobManager.addDocuments(processID, |
| jobID,legalLinkTypes,docidHashes,docids,db.getParentIdentifierHash(),db.getLinkType(),hopcountMode, |
| dataNames,dataValues,currentTime,priorities,eventNames); |
| |
| rt.clearPreloadedValues(); |
| } |
| |
| discard(); |
| } |
| |
| /** Check whether current job is still active. |
| * This method is provided to allow an individual connector that needs to wait on some long-term condition to give up waiting due to the job |
| * itself being aborted. If the connector should abort, this method will raise a properly-formed ServiceInterruption, which if thrown to the |
| * caller, will signal that the current processing activity remains incomplete and must be retried when the job is resumed. |
| */ |
| @Override |
| public void checkJobStillActive() |
| throws ManifoldCFException, ServiceInterruption |
| { |
      if (!jobManager.checkJobActive(jobID))
| throw new ServiceInterruption("Job no longer active",System.currentTimeMillis()); |
| } |
| |
| /** Begin an event sequence. |
| * This method should be called by a connector when a sequencing event should enter the "pending" state. If the event is already in that state, |
| * this method will return false, otherwise true. The connector has the responsibility of appropriately managing sequencing given the response |
| * status. |
| *@param eventName is the event name. |
| *@return false if the event is already in the "pending" state. |
| */ |
| @Override |
| public boolean beginEventSequence(String eventName) |
| throws ManifoldCFException |
| { |
| return jobManager.beginEventSequence(processID,eventName); |
| } |
| |
| /** Complete an event sequence. |
| * This method should be called to signal that an event is no longer in the "pending" state. This can mean that the prerequisite processing is |
| * completed, but it can also mean that prerequisite processing was aborted or cannot be completed. |
| * Note well: This method should not be called unless the connector is CERTAIN that an event is in progress, and that the current thread has |
| * the sole right to complete it. Otherwise, race conditions can develop which would be difficult to diagnose. |
| *@param eventName is the event name. |
| */ |
| @Override |
| public void completeEventSequence(String eventName) |
| throws ManifoldCFException |
| { |
| jobManager.completeEventSequence(eventName); |
| } |
| |
| /** Abort processing a document (for sequencing reasons). |
| * This method should be called in order to cause the specified document to be requeued for later processing. While this is similar in some respects |
| * to the semantics of a ServiceInterruption, it is applicable to only one document at a time, and also does not specify any delay period, since it is |
| * presumed that the reason for the requeue is because of sequencing issues synchronized around an underlying event. |
| *@param localIdentifier is the document identifier to requeue |
| */ |
| @Override |
| public void retryDocumentProcessing(String localIdentifier) |
| throws ManifoldCFException |
| { |
| // Accumulate aborts |
| abortSet.add(localIdentifier); |
| } |
| |
| /** Check whether a mime type is indexable by the currently specified output connector. |
| *@param mimeType is the mime type to check, not including any character set specification. |
| *@return true if the mime type is indexable. |
| */ |
| @Override |
| public boolean checkMimeTypeIndexable(String mimeType) |
| throws ManifoldCFException, ServiceInterruption |
| { |
| return ingester.checkMimeTypeIndexable( |
| pipelineSpecification,mimeType, |
| ingestLogger); |
| } |
| |
| /** Check whether a document is indexable by the currently specified output connector. |
| *@param localFile is the local copy of the file to check. |
| *@return true if the document is indexable. |
| */ |
| @Override |
| public boolean checkDocumentIndexable(File localFile) |
| throws ManifoldCFException, ServiceInterruption |
| { |
| return ingester.checkDocumentIndexable( |
| pipelineSpecification,localFile, |
| ingestLogger); |
| } |
| |
| /** Check whether a document of a specified length is indexable by the currently specified output connector. |
| *@param length is the length to check. |
| *@return true if the document is indexable. |
| */ |
| @Override |
| public boolean checkLengthIndexable(long length) |
| throws ManifoldCFException, ServiceInterruption |
| { |
| return ingester.checkLengthIndexable( |
| pipelineSpecification,length, |
| ingestLogger); |
| } |
| |
| /** Pre-determine whether a document's URL is indexable by this connector. This method is used by participating repository connectors |
| * to help filter out documents that are not worth indexing. |
| *@param url is the URL of the document. |
| *@return true if the file is indexable. |
| */ |
| @Override |
| public boolean checkURLIndexable(String url) |
| throws ManifoldCFException, ServiceInterruption |
| { |
| return ingester.checkURLIndexable( |
| pipelineSpecification,url, |
| ingestLogger); |
| } |
| |
| /** Create a global string from a simple string. |
| *@param simpleString is the simple string. |
| *@return a global string. |
| */ |
| @Override |
| public String createGlobalString(String simpleString) |
| { |
| return ManifoldCF.createGlobalString(simpleString); |
| } |
| |
| /** Create a connection-specific string from a simple string. |
| *@param simpleString is the simple string. |
| *@return a connection-specific string. |
| */ |
| @Override |
| public String createConnectionSpecificString(String simpleString) |
| { |
| return ManifoldCF.createConnectionSpecificString(connection.getName(),simpleString); |
| } |
| |
| /** Create a job-based string from a simple string. |
| *@param simpleString is the simple string. |
| *@return a job-specific string. |
| */ |
| @Override |
| public String createJobSpecificString(String simpleString) |
| { |
| return ManifoldCF.createJobSpecificString(jobID,simpleString); |
| } |
| |
    /** Note that a component of the given document was seen during this processing pass,
    * so that it is not treated as unmentioned and removed. */
    protected void touchComponentSet(String documentIdentifier, String componentIdentifierHash)
| { |
| if (componentIdentifierHash == null) |
| return; |
| Set<String> components = touchedComponentSet.get(documentIdentifier); |
| if (components == null) |
| { |
| components = new HashSet<String>(); |
| touchedComponentSet.put(documentIdentifier,components); |
| } |
| components.add(componentIdentifierHash); |
| } |
| |
    /** Build a pipeline specification that carries the previously-indexed version information
    * for the specified document component. */
    protected IPipelineSpecificationWithVersions computePipelineSpecification(String documentIdentifierHash,
      String componentIdentifierHash)
| { |
| return new PipelineSpecificationWithVersions(pipelineSpecification,previousDocuments.get(documentIdentifierHash),componentIdentifierHash); |
| } |
| |
| } |
| |
  /** Compute the hash of a component identifier, or return null if the identifier is null. */
  protected static String computeComponentIDHash(String componentIdentifier)
| throws ManifoldCFException |
| { |
| if (componentIdentifier != null) |
| return ManifoldCF.hash(componentIdentifier); |
| else |
| return null; |
| } |
| |
  /** DocumentBin class. A bin groups document references that share the same parent
  * identifier hash and link type, so they can be queued together. */
| protected static class DocumentBin |
| { |
| protected String linkType; |
| protected String parentIdentifierHash; |
| |
| public DocumentBin(String parentIdentifierHash, String linkType) |
| { |
| this.parentIdentifierHash = parentIdentifierHash; |
| this.linkType = linkType; |
| } |
| |
| public String getParentIdentifierHash() |
| { |
| return parentIdentifierHash; |
| } |
| |
| public String getLinkType() |
| { |
| return linkType; |
| } |
| |
    @Override
    public int hashCode()
| { |
| return ((linkType==null)?0:linkType.hashCode()) + ((parentIdentifierHash==null)?0:parentIdentifierHash.hashCode()); |
| } |
| |
    @Override
    public boolean equals(Object o)
| { |
| if (!(o instanceof DocumentBin)) |
| return false; |
| DocumentBin db = (DocumentBin)o; |
| if (linkType == null || db.linkType == null) |
| { |
| if (linkType != db.linkType) |
| return false; |
| } |
| else |
| { |
| if (!linkType.equals(db.linkType)) |
| return false; |
| } |
| if (parentIdentifierHash == null || db.parentIdentifierHash == null) |
| { |
| if (parentIdentifierHash != db.parentIdentifierHash) |
| return false; |
| } |
| else |
| { |
| if (!parentIdentifierHash.equals(db.parentIdentifierHash)) |
| return false; |
| } |
| return true; |
| } |
| |
| } |
| |
| /** Class describing document reference. |
| * Note: If the same document reference occurs multiple times, the data names and values should AGGREGATE, rather than the newer one replacing the older. |
| * Similar treatment will occur for prerequisites, although that's unlikely to be used. |
| */ |
| protected static class DocumentReference |
| { |
| protected String localIdentifierHash; |
| protected String localIdentifier; |
| protected DocumentBin db; |
    /** This hashmap is keyed by data name and has an ArrayList as a value (which contains the data values) */
| protected HashMap data = new HashMap(); |
| /** This hashmap contains the prerequisite event names */ |
| protected HashMap prereqEvents = new HashMap(); |
| |
| public DocumentReference(String localIdentifierHash, String localIdentifier, DocumentBin db) |
| { |
| this.localIdentifierHash = localIdentifierHash; |
| this.localIdentifier = localIdentifier; |
| this.db = db; |
| } |
| |
| /** Close all object data references. This should be called whenever a DocumentReference object is abandoned. */ |
| public void discard() |
| throws ManifoldCFException |
| { |
| Iterator iter = data.keySet().iterator(); |
| while (iter.hasNext()) |
| { |
| String dataName = (String)iter.next(); |
| ArrayList list = (ArrayList)data.get(dataName); |
| int i = 0; |
| while (i < list.size()) |
| { |
        Object o = list.get(i++);
| if (o instanceof CharacterInput) |
| ((CharacterInput)o).discard(); |
| } |
| } |
| } |
| |
| public void addData(String[] dataNames, Object[][] dataValues) |
| { |
| if (dataNames == null || dataValues == null) |
| return; |
| int i = 0; |
| while (i < dataNames.length) |
| { |
| addData(dataNames[i],dataValues[i]); |
| i++; |
| } |
| } |
| |
| public void addData(String dataName, Object[] dataValues) |
| { |
| if (dataName == null || dataValues == null) |
| return; |
| int i = 0; |
| while (i < dataValues.length) |
| { |
| addData(dataName,dataValues[i++]); |
| } |
| } |
| |
| public void addData(String dataName, Object dataValue) |
| { |
| if (dataName == null) |
| return; |
      ArrayList valueList = (ArrayList)data.get(dataName);
      if (valueList == null)
      {
        valueList = new ArrayList();
        data.put(dataName,valueList);
      }
      // Without the hash value, it's impossible to keep track of value uniqueness in this layer. So, I've removed any attempts to do so; jobManager.addDocuments()
      // will have to do that job instead.
      valueList.add(dataValue);
| } |
| |
| public void addPrerequisiteEvents(String[] eventNames) |
| { |
| if (eventNames == null) |
| return; |
| int i = 0; |
| while (i < eventNames.length) |
| { |
| addPrerequisiteEvent(eventNames[i++]); |
| } |
| } |
| |
| public void addPrerequisiteEvent(String eventName) |
| { |
| prereqEvents.put(eventName,eventName); |
| } |
| |
| public DocumentBin getKey() |
| { |
| return db; |
| } |
| |
| public String getLocalIdentifierHash() |
| { |
| return localIdentifierHash; |
| } |
| |
| public String getLocalIdentifier() |
| { |
| return localIdentifier; |
| } |
| |
| public String[] getPrerequisiteEventNames() |
| { |
| String[] rval = new String[prereqEvents.size()]; |
| int i = 0; |
| Iterator iter = prereqEvents.keySet().iterator(); |
| while (iter.hasNext()) |
| { |
| rval[i++] = (String)iter.next(); |
| } |
| return rval; |
| } |
| |
| public String[] getDataNames() |
| { |
| String[] rval = new String[data.size()]; |
| int i = 0; |
| Iterator iter = data.keySet().iterator(); |
| while (iter.hasNext()) |
| { |
| String dataName = (String)iter.next(); |
| rval[i++] = dataName; |
| } |
| return rval; |
| } |
| |
| public Object[][] getDataValues() |
| { |
| // Presumably the values will correspond with the names ONLY if no changes have occurred to the hash table. |
| Object[][] rval = new Object[data.size()][]; |
| int i = 0; |
| Iterator iter = data.keySet().iterator(); |
| while (iter.hasNext()) |
| { |
| String dataName = (String)iter.next(); |
| ArrayList values = (ArrayList)data.get(dataName); |
| Object[] valueArray = new Object[values.size()]; |
| rval[i] = valueArray; |
| int j = 0; |
| while (j < valueArray.length) |
| { |
| valueArray[j] = values.get(j); |
| j++; |
| } |
| i++; |
| } |
| return rval; |
| } |
| |
    @Override
    public boolean equals(Object o)
| { |
| if (!(o instanceof DocumentReference)) |
| return false; |
| DocumentReference other = (DocumentReference)o; |
| if (!other.localIdentifierHash.equals(localIdentifierHash)) |
| return false; |
| return other.db.equals(db); |
| } |
| |
    @Override
    public int hashCode()
| { |
| return localIdentifierHash.hashCode() + db.hashCode(); |
| } |
| } |
| |
| /** Class that represents a decision to process a document. |
| */ |
| protected static class DocumentToProcess |
| { |
| protected QueuedDocument document; |
| protected boolean scanOnly; |
| |
| /** Construct. |
| *@param document is the document to process. |
| *@param scanOnly is true if the document should be scanned, but not ingested. |
| */ |
| public DocumentToProcess(QueuedDocument document, boolean scanOnly) |
| { |
| this.document = document; |
| this.scanOnly = scanOnly; |
| } |
| |
| /** Get the document. |
| *@return the document. |
| */ |
| public QueuedDocument getDocument() |
| { |
| return document; |
| } |
| |
| /** Get the 'scan only' flag. |
| *@return true if only scan should be attempted. |
| */ |
| public boolean getScanOnly() |
| { |
| return scanOnly; |
| } |
| } |
| |
| /** The check activity class */ |
| protected static class CheckActivity implements IOutputCheckActivity |
| { |
| public CheckActivity() |
| { |
| } |
| |
| /** Detect if a mime type is acceptable downstream or not. This method is used to determine whether it makes sense to fetch a document |
| * in the first place. |
| *@param mimeType is the mime type of the document. |
| *@return true if the mime type can be accepted by the downstream connection. |
| */ |
| @Override |
| public boolean checkMimeTypeIndexable(String mimeType) |
| throws ManifoldCFException, ServiceInterruption |
| { |
| return false; |
| } |
| |
| /** Pre-determine whether a document (passed here as a File object) is acceptable downstream. This method is |
| * used to determine whether a document needs to be actually transferred. This hook is provided mainly to support |
| * search engines that only handle a small set of accepted file types. |
| *@param localFile is the local file to check. |
| *@return true if the file is acceptable by the downstream connection. |
| */ |
| @Override |
| public boolean checkDocumentIndexable(File localFile) |
| throws ManifoldCFException, ServiceInterruption |
| { |
| return false; |
| } |
| |
| /** Pre-determine whether a document's length is acceptable downstream. This method is used |
| * to determine whether to fetch a document in the first place. |
| *@param length is the length of the document. |
| *@return true if the file is acceptable by the downstream connection. |
| */ |
| @Override |
| public boolean checkLengthIndexable(long length) |
| throws ManifoldCFException, ServiceInterruption |
| { |
| return false; |
| } |
| |
| /** Pre-determine whether a document's URL is acceptable downstream. This method is used |
| * to help filter out documents that cannot be indexed in advance. |
| *@param url is the URL of the document. |
| *@return true if the file is acceptable by the downstream connection. |
| */ |
| @Override |
| public boolean checkURLIndexable(String url) |
| throws ManifoldCFException, ServiceInterruption |
| { |
| return false; |
| } |
| |
| } |
| |
| /** The implementation of the IExistingVersions interface. |
| */ |
| protected static class ExistingVersions implements IExistingVersions |
| { |
| protected final Map<String,QueuedDocument> map; |
| protected final String lastOutputConnectionName; |
| |
| public ExistingVersions(String lastOutputConnectionName, List<QueuedDocument> list) |
| { |
| this.lastOutputConnectionName = lastOutputConnectionName; |
| this.map = new HashMap<String,QueuedDocument>(); |
| for (QueuedDocument qd : list) |
| { |
| map.put(qd.getDocumentDescription().getDocumentIdentifier(),qd); |
| } |
| } |
| |
| /** Retrieve an existing version string given a document identifier. |
| *@param documentIdentifier is the document identifier. |
| *@return the document version string, or null if the document was never previously indexed. |
| */ |
| @Override |
| public String getIndexedVersionString(String documentIdentifier) |
| throws ManifoldCFException |
| { |
| return getIndexedVersionString(documentIdentifier,null); |
| } |
| |
| /** Retrieve a component existing version string given a document identifier. |
| *@param documentIdentifier is the document identifier. |
| *@param componentIdentifier is the component identifier, if any. |
    *@return the document version string, or null if the document component was never previously indexed.
| */ |
| @Override |
| public String getIndexedVersionString(String documentIdentifier, String componentIdentifier) |
| throws ManifoldCFException |
| { |
| QueuedDocument qd = map.get(documentIdentifier); |
| DocumentIngestStatusSet status = qd.getLastIngestedStatus(lastOutputConnectionName); |
| if (status == null) |
| return null; |
| String componentIdentifierHash; |
| if (componentIdentifier == null) |
| componentIdentifierHash = null; |
| else |
| componentIdentifierHash = ManifoldCF.hash(componentIdentifier); |
| DocumentIngestStatus s = status.getComponent(componentIdentifierHash); |
| if (s == null) |
| return null; |
| return s.getDocumentVersion(); |
| } |
| |
| } |
| |
| /** The ingest logger class */ |
| protected static class OutputActivity extends CheckActivity implements IOutputActivity |
| { |
| |
| // Connection name |
| protected final String connectionName; |
| // Connection manager |
| protected final IRepositoryConnectionManager connMgr; |
| |
| /** Constructor */ |
| public OutputActivity(String connectionName, IRepositoryConnectionManager connMgr) |
| { |
| this.connectionName = connectionName; |
| this.connMgr = connMgr; |
| } |
| |
| /** Record time-stamped information about the activity of the output connector. |
| *@param startTime is either null or the time since the start of epoch in milliseconds (Jan 1, 1970). Every |
| * activity has an associated time; the startTime field records when the activity began. A null value |
| * indicates that the start time and the finishing time are the same. |
| *@param activityType is a string which is fully interpretable only in the context of the connector involved, which is |
| * used to categorize what kind of activity is being recorded. For example, a web connector might record a |
| * "fetch document" activity. Cannot be null. |
| *@param dataSize is the number of bytes of data involved in the activity, or null if not applicable. |
| *@param entityURI is a (possibly long) string which identifies the object involved in the history record. |
| * The interpretation of this field will differ from connector to connector. May be null. |
| *@param resultCode contains a terse description of the result of the activity. The description is limited in |
| * size to 255 characters, and can be interpreted only in the context of the current connector. May be null. |
| *@param resultDescription is a (possibly long) human-readable string which adds detail, if required, to the result |
| * described in the resultCode field. This field is not meant to be queried on. May be null. |
| */ |
| @Override |
| public void recordActivity(Long startTime, String activityType, Long dataSize, |
| String entityURI, String resultCode, String resultDescription) |
| throws ManifoldCFException |
| { |
| connMgr.recordHistory(connectionName,startTime,activityType,dataSize,entityURI,resultCode, |
| resultDescription,null); |
| } |
| |
| /** Qualify an access token appropriately, to match access tokens as returned by mod_aa. This method |
| * includes the authority name with the access token, if any, so that each authority may establish its own token space. |
| *@param authorityNameString is the name of the authority to use to qualify the access token. |
| *@param accessToken is the raw, repository access token. |
| *@return the properly qualified access token. |
| */ |
| @Override |
| public String qualifyAccessToken(String authorityNameString, String accessToken) |
| throws ManifoldCFException |
| { |
| if (authorityNameString == null) |
| return URLEncoder.encode(accessToken); |
| else |
| return URLEncoder.encode(authorityNameString) + ":" + URLEncoder.encode(accessToken); |
| } |
| |
| /** Send a document via the pipeline to the next output connection. |
| *@param documentURI is the document's URI. |
| *@param document is the document data to be processed (handed to the output data store). |
| *@return the document status (accepted or permanently rejected); return codes are listed in IPipelineConnector. |
| *@throws IOException only if there's an IO error reading the data from the document. |
| */ |
| @Override |
| public int sendDocument(String documentURI, RepositoryDocument document) |
| throws ManifoldCFException, ServiceInterruption, IOException |
| { |
| // No downstream connection at output connection level. |
| return IPipelineConnector.DOCUMENTSTATUS_REJECTED; |
| } |
| |
| /** Send NO document via the pipeline to the next output connection. This is equivalent |
| * to sending an empty document placeholder. |
| */ |
| @Override |
| public void noDocument() |
| throws ManifoldCFException, ServiceInterruption |
| { |
| } |
| |
| } |
| |
  /** Retry interval (5 minutes, in milliseconds) used when constructing service interruptions. */
  protected final static long interruptionRetryTime = 5L*60L*1000L;

  /** Convert an IOException encountered while reading document data into either a
  * ManifoldCFException (for thread interruption) or a ServiceInterruption whose retry
  * behavior is tuned to the kind of failure.
  *@param e is the exception to convert.
  *@param context is a short phrase describing the activity in progress, used in log messages.
  */
  protected static void handleIOException(IOException e, String context)
| throws ManifoldCFException, ServiceInterruption |
| { |
| if ((e instanceof InterruptedIOException) && (!(e instanceof java.net.SocketTimeoutException))) |
| throw new ManifoldCFException(e.getMessage(), ManifoldCFException.INTERRUPTED); |
| |
| long currentTime = System.currentTimeMillis(); |
| |
| if (e instanceof java.net.ConnectException) |
| { |
| // Server isn't up at all. Try for a brief time then give up. |
| String message = "Server could not be contacted during "+context+": "+e.getMessage(); |
| Logging.connectors.warn(message,e); |
| throw new ServiceInterruption(message, |
| e, |
| currentTime + interruptionRetryTime, |
| -1L, |
| 3, |
| true); |
| } |
| |
| if (e instanceof java.net.SocketTimeoutException) |
| { |
| String message2 = "Socket timeout exception during "+context+": "+e.getMessage(); |
| Logging.connectors.warn(message2,e); |
| throw new ServiceInterruption(message2, |
| e, |
| currentTime + interruptionRetryTime, |
| currentTime + 20L * 60000L, |
| -1, |
| false); |
| } |
| |
| if (e.getClass().getName().equals("java.net.SocketException")) |
| { |
| // In the past we would have treated this as a straight document rejection, and |
| // treated it in the same manner as a 400. The reasoning is that the server can |
| // perfectly legally send out a 400 and drop the connection immediately thereafter, |
      // making this a race condition.
| // However, Solr 4.0 (or the Jetty version that the example runs on) seems |
| // to have a bug where it drops the connection when two simultaneous documents come in |
| // at the same time. This is the final version of Solr 4.0 so we need to deal with |
| // this. |
| if (e.getMessage().toLowerCase(Locale.ROOT).indexOf("broken pipe") != -1 || |
| e.getMessage().toLowerCase(Locale.ROOT).indexOf("connection reset") != -1 || |
| e.getMessage().toLowerCase(Locale.ROOT).indexOf("target server failed to respond") != -1) |
| { |
| // Treat it as a service interruption, but with a limited number of retries. |
| // In that way we won't burden the user with a huge retry interval; it should |
| // give up fairly quickly, and yet NOT give up if the error was merely transient |
| String message = "Server dropped connection during "+context+": "+e.getMessage(); |
| Logging.connectors.warn(message,e); |
| throw new ServiceInterruption(message, |
| e, |
| currentTime + interruptionRetryTime, |
| -1L, |
| 3, |
| false); |
| } |
| |
| // Other socket exceptions are service interruptions - but if we keep getting them, it means |
| // that a socket timeout is probably set too low to accept this particular document. So |
| // we retry for a while, then skip the document. |
| String message2 = "Socket exception during "+context+": "+e.getMessage(); |
| Logging.connectors.warn(message2,e); |
| throw new ServiceInterruption(message2, |
| e, |
| currentTime + interruptionRetryTime, |
| currentTime + 20L * 60000L, |
| -1, |
| false); |
| } |
| |
| // Otherwise, no idea what the trouble is, so presume that retries might fix it. |
| String message3 = "IO exception during "+context+": "+e.getMessage(); |
| Logging.connectors.warn(message3,e); |
| throw new ServiceInterruption(message3, |
| e, |
| currentTime + interruptionRetryTime, |
| currentTime + 2L * 60L * 60000L, |
| -1, |
| true); |
| } |
| |
| } |