// blob: e52408c96695c8fa297c3692d4bfff65bdf6b1d9 [file] [log] [blame]
/* $Id$ */
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.manifoldcf.crawler.system;
import org.apache.manifoldcf.core.interfaces.*;
import org.apache.manifoldcf.agents.interfaces.*;
import org.apache.manifoldcf.crawler.interfaces.*;
import org.apache.manifoldcf.crawler.system.Logging;
import java.util.*;
import java.lang.reflect.*;
/** This class looks for documents that need to be deleted (as part of an end-of-job cleanup), and
* queues them up for the various document cleanup threads to take care of.
* To do this, this thread performs a query which returns a chunk of results, then queues those
* results. The individual document delete threads will be waiting on the queue.
* Once the queue is full enough, the thread then sleeps until the delete queue is empty again.
*/
public class DocumentCleanupStufferThread extends Thread
{
public static final String _rcsid = "@(#)$Id$";
// Local data
/** This is a reference to the static main document queue */
protected final DocumentCleanupQueue documentCleanupQueue;
/** This is the reset manager */
protected final DocCleanupResetManager resetManager;
/** This is the number of entries we want to stuff at any one time. */
protected final int n;
/** Process ID */
protected final String processID;
/** Constructor.
*@param documentCleanupQueue is the document queue we'll be stuffing.
*@param n is the maximum number of threads that will be doing delete processing.
*/
public DocumentCleanupStufferThread(DocumentCleanupQueue documentCleanupQueue, int n, DocCleanupResetManager resetManager, String processID)
throws ManifoldCFException
{
super();
this.documentCleanupQueue = documentCleanupQueue;
this.n = n;
this.resetManager = resetManager;
this.processID = processID;
setName("Document cleanup stuffer thread");
setDaemon(true);
}
public void run()
{
resetManager.registerMe();
try
{
// Create a thread context object.
IThreadContext threadContext = ThreadContextFactory.make();
IRepositoryConnectionManager mgr = RepositoryConnectionManagerFactory.make(threadContext);
IJobManager jobManager = JobManagerFactory.make(threadContext);
ArrayList docList = new ArrayList();
IDBInterface database = DBInterfaceFactory.make(threadContext,
ManifoldCF.getMasterDatabaseName(),
ManifoldCF.getMasterDatabaseUsername(),
ManifoldCF.getMasterDatabasePassword());
int deleteChunkSize = database.getMaxInClause();
// Loop
while (true)
{
// Do another try/catch around everything in the loop
try
{
resetManager.waitForReset(threadContext);
// Wait until the delete queue is "empty" (meaning that some delete threads
// can run out of work if we don't act).
if (documentCleanupQueue.checkIfEmpty(n) == false)
{
ManifoldCF.sleep(100L);
continue;
}
Logging.threads.debug("Document cleanup stuffer thread woke up");
long currentTime = System.currentTimeMillis();
// Get a single chunk at a time (but keep going until everything is stuffed)
// This method will set the status of the documents in question
// to "beingcleaned".
DocumentSetAndFlags documentsToClean = jobManager.getNextCleanableDocuments(processID,deleteChunkSize,currentTime);
DocumentDescription[] descs = documentsToClean.getDocumentSet();
boolean[] removeFromIndex = documentsToClean.getFlags();
// If there are no chunks at all, then we can sleep for a while.
// The theory is that we need to allow stuff to accumulate.
if (descs.length == 0)
{
Logging.threads.debug("Document cleanup stuffer thread found nothing to do");
ManifoldCF.sleep(1000L); // 1 second
continue;
}
if (Logging.threads.isDebugEnabled())
Logging.threads.debug("Document cleanup stuffer thread found "+Integer.toString(descs.length)+" documents");
// Do the stuffing. Each set must be segregated by job, since we need the job ID in the doc set.
Map jobMap = new HashMap();
int k = 0;
while (k < descs.length)
{
CleanupQueuedDocument x = new CleanupQueuedDocument(descs[k],removeFromIndex[k]);
Long jobID = descs[k].getJobID();
List y = (List)jobMap.get(jobID);
if (y == null)
{
y = new ArrayList();
jobMap.put(jobID,y);
}
y.add(x);
k++;
}
Iterator iter = jobMap.keySet().iterator();
while (iter.hasNext())
{
Long jobID = (Long)iter.next();
IJobDescription jobDescription = jobManager.load(jobID,true);
List y = (List)jobMap.get(jobID);
CleanupQueuedDocument[] docDescs = new CleanupQueuedDocument[y.size()];
k = 0;
while (k < docDescs.length)
{
docDescs[k] = (CleanupQueuedDocument)y.get(k);
k++;
}
DocumentCleanupSet set = new DocumentCleanupSet(docDescs,jobDescription);
documentCleanupQueue.addDocuments(set);
}
// If we don't wait here, the other threads don't have a chance to queue anything else up.
yield();
}
catch (ManifoldCFException e)
{
if (e.getErrorCode() == ManifoldCFException.INTERRUPTED)
break;
if (e.getErrorCode() == ManifoldCFException.DATABASE_CONNECTION_ERROR)
{
resetManager.noteEvent();
Logging.threads.error("Cleanup stuffer thread aborting and restarting due to database connection reset",e);
try
{
// Give the database a chance to catch up/wake up
ManifoldCF.sleep(10000L);
}
catch (InterruptedException se)
{
break;
}
continue;
}
// Log it, but keep the thread alive
Logging.threads.error("Exception tossed: "+e.getMessage(),e);
if (e.getErrorCode() == ManifoldCFException.SETUP_ERROR)
{
// Shut the whole system down!
System.exit(1);
}
}
catch (InterruptedException e)
{
// We're supposed to quit
break;
}
catch (OutOfMemoryError e)
{
System.err.println("agents process ran out of memory - shutting down");
e.printStackTrace(System.err);
System.exit(-200);
}
catch (Throwable e)
{
// A more severe error - but stay alive
Logging.threads.fatal("Error tossed: "+e.getMessage(),e);
}
}
}
catch (Throwable e)
{
// Severe error on initialization
System.err.println("agents process could not start - shutting down");
Logging.threads.fatal("DocumentCleanupStufferThread initialization error tossed: "+e.getMessage(),e);
System.exit(-300);
}
}
}