// blob: a1b6e352b26fd01a269b22b75da7f97566d5ef72 [file] [log] [blame]
/* $Id: SetPriorityThread.java 988245 2010-08-23 18:39:35Z kwright $ */
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.manifoldcf.crawler.system;
import org.apache.manifoldcf.core.interfaces.*;
import org.apache.manifoldcf.agents.interfaces.*;
import org.apache.manifoldcf.crawler.interfaces.*;
import org.apache.manifoldcf.crawler.system.Logging;
import java.util.*;
import java.lang.reflect.*;
/** This class describes a thread whose job it is to continuously reset document priorities, based on recent
* queuing activity. The goal is to evenly distribute queued documents across jobs and within classes of documents
* inside jobs.
* The way it works is for the thread to do some number of pending documents at a time. The documents are fetched in such
* a way as to get the ones that have been least recently assessed preferentially. The assessment process involves
* getting hold of a connector for the job that owns the document, and calculating the priority based on the recent
* history as maintained in the queueTracker object.
*/
public class SetPriorityThread extends Thread
{
  public static final String _rcsid = "@(#)$Id: SetPriorityThread.java 988245 2010-08-23 18:39:35Z kwright $";

  /** Number of documents budgeted per worker thread for one reprioritization cycle. */
  private static final int DOCUMENTS_PER_WORKER_THREAD = 10;
  /** Maximum number of documents requested from the job manager in a single call. */
  private static final int REPRIORITIZATION_BATCH_SIZE = 1000;
  /** How long to sleep, in milliseconds, once there is nothing left to reprioritize. */
  private static final long IDLE_SLEEP_MILLIS = 5000L;
  /** How long to sleep, in milliseconds, after a database connection error before retrying. */
  private static final long DATABASE_RETRY_SLEEP_MILLIS = 10000L;

  // Local data

  /** This is the number of documents per cycle; always at least 1. */
  protected final int cycleCount;
  /** The blocking documents object */
  protected final BlockingDocuments blockingDocuments;
  /** Process ID */
  protected final String processID;

  /** Constructor.
  * Sets the thread up as a daemon thread named "Set priority thread" running at maximum priority.
  *@param workerThreadCount is the number of worker threads; the per-cycle document budget is ten times
  *       this value, but never less than one.
  *@param blockingDocuments is the source of stuffer-identified documents that get reprioritized first.
  *@param processID is the process ID handed to the job manager when asking for documents to reprioritize.
  */
  public SetPriorityThread(int workerThreadCount, BlockingDocuments blockingDocuments, String processID)
    throws ManifoldCFException
  {
    super();
    this.blockingDocuments = blockingDocuments;
    this.processID = processID;
    // Never let the budget fall below one document.  With a budget of zero (or less) the inner loop of
    // run() would hit its "exceeded cycle count" exit on the very first pass, every time, so the thread
    // would spin at maximum priority without ever doing work or sleeping.
    cycleCount = Math.max(1, workerThreadCount * DOCUMENTS_PER_WORKER_THREAD);
    setName("Set priority thread");
    setDaemon(true);
    // This thread's priority is highest so that stuffer thread does not go wanting
    setPriority(MAX_PRIORITY);
  }

  /** Thread body.
  * Loops until interrupted.  Each pass of the outer loop clears the local job description and connection
  * maps and then reprioritizes documents until either cycleCount documents have been handled or no
  * documents are left (in which case the thread sleeps before starting the next pass).
  * A failure while setting up the thread, or an out-of-memory or setup error inside the loop, exits the
  * whole process; other errors inside the loop are logged and the loop carries on.
  */
  @Override
  public void run()
  {
    try
    {
      // Create a thread context object.
      IThreadContext threadContext = ThreadContextFactory.make();
      // NOTE(review): mgr is never referenced below.  The call is kept in case the factory has
      // initialization side effects - confirm before removing it.
      IRepositoryConnectionManager mgr = RepositoryConnectionManagerFactory.make(threadContext);
      IJobManager jobManager = JobManagerFactory.make(threadContext);

      Logging.threads.debug("Set priority thread coming up");

      // Job description map (local) - designed to improve performance.
      // Cleared and reloaded on every batch of documents.
      HashMap jobDescriptionMap = new HashMap();

      // Repository connection map (local) - designed to improve performance.
      // Cleared and reloaded on every batch of documents.
      HashMap connectionMap = new HashMap();

      // Loop
      while (true)
      {
        // Do another try/catch around everything in the loop
        try
        {
          if (Thread.currentThread().isInterrupted())
            throw new ManifoldCFException("Interrupted",ManifoldCFException.INTERRUPTED);

          Logging.threads.debug("Set priority thread woke up");

          // We're going to go through all eligible documents.  We will pick them up in
          // chunks however.

          // Note well:
          // The priority that is given to a document is based on what priorities have been handed out in the past.  It is based
          // on the documents that have been queued for action, what job they belong to, the throttling in effect for the job,
          // and what the connection says that the document's throttling bins are.
          // Periodically, however, QueueTracker deliberately destroys the history.  My thinking here is that we need to "start over"
          // periodically to guarantee that we respond appropriately to changes in the environment - specifically, starting and
          // stopping jobs, changing throttling parameters, etc.

          // Clear the job description map and connection map
          jobDescriptionMap.clear();
          connectionMap.clear();

          // Do up to cycleCount documents in a "block".  Beyond this number we reset everything and loop back around.
          // This allows everything to restart, and higher priority documents can be found again.
          int processedCount = 0;
          while (true)
          {
            if (Thread.currentThread().isInterrupted())
              throw new ManifoldCFException("Interrupted",ManifoldCFException.INTERRUPTED);

            if (processedCount >= cycleCount)
            {
              Logging.threads.debug("Done reprioritizing because exceeded cycle count");
              break;
            }

            // Cycle through the current list of stuffer-identified documents until we come to the end.  Reprioritize these
            // first.  NOTE: These documents will already have document priorities.
            DocumentDescription desc = blockingDocuments.getBlockingDocument();
            if (desc != null)
            {
              ManifoldCF.writeDocumentPriorities(threadContext,
                new DocumentDescription[]{desc},connectionMap,jobDescriptionMap);
              processedCount++;
              continue;
            }

            // Grab a list of document identifiers to set priority on.
            // We may well wind up calculating priority for documents that wind up having their
            // state changed before we can write back, but this is okay because update is only
            // going to be permitted for rows that still have the right state.
            DocumentDescription[] descs =
              jobManager.getNextNotYetProcessedReprioritizationDocuments(processID,REPRIORITIZATION_BATCH_SIZE);
            if (descs.length > 0)
            {
              ManifoldCF.writeDocumentPriorities(threadContext,
                descs,connectionMap,jobDescriptionMap);
              // A whole batch counts against the budget, so one batch may overshoot cycleCount.
              processedCount += descs.length;
              continue;
            }

            // Neither source had anything for us: rest a while, then start a fresh cycle.
            Logging.threads.debug("Done reprioritizing because no more documents to reprioritize");
            ManifoldCF.sleep(IDLE_SLEEP_MILLIS);
            break;
          }
        }
        catch (ManifoldCFException e)
        {
          if (e.getErrorCode() == ManifoldCFException.INTERRUPTED)
            break;

          if (e.getErrorCode() == ManifoldCFException.DATABASE_CONNECTION_ERROR)
          {
            Logging.threads.error("Set priority thread aborting and restarting due to database connection reset: "+e.getMessage(),e);
            try
            {
              // Give the database a chance to catch up/wake up
              ManifoldCF.sleep(DATABASE_RETRY_SLEEP_MILLIS);
            }
            catch (InterruptedException se)
            {
              // Interrupted while waiting: we're supposed to quit
              break;
            }
            continue;
          }

          // Log it, but keep the thread alive
          Logging.threads.error("Exception tossed: "+e.getMessage(),e);

          if (e.getErrorCode() == ManifoldCFException.SETUP_ERROR)
          {
            // A setup error is treated as unrecoverable: take the whole process down.
            System.exit(1);
          }
          // Any other error code falls through and the outer loop starts its next pass right away.
        }
        catch (InterruptedException e)
        {
          // We're supposed to quit
          break;
        }
        catch (OutOfMemoryError e)
        {
          // Reported straight to stderr rather than through the logger before shutting down.
          System.err.println("agents process ran out of memory - shutting down");
          e.printStackTrace(System.err);
          System.exit(-200);
        }
        catch (Throwable e)
        {
          // A more severe error - but stay alive
          Logging.threads.fatal("Error tossed: "+e.getMessage(),e);
        }
      }
    }
    catch (Throwable e)
    {
      // Severe error on initialization
      System.err.println("agents process could not start - shutting down");
      Logging.threads.fatal("SetPriorityThread initialization error tossed: "+e.getMessage(),e);
      System.exit(-300);
    }
  }

}