// blob: b70888c47a632ac1317b5d816b96c13a1197418a [file] [log] [blame]
/* $Id$ */
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.manifoldcf.crawler.system;
import org.apache.manifoldcf.core.interfaces.*;
import org.apache.manifoldcf.agents.interfaces.*;
import org.apache.manifoldcf.crawler.interfaces.*;
import org.apache.manifoldcf.crawler.system.Logging;
import java.util.*;
import java.lang.reflect.*;
/** This class represents the start delete thread. This thread's job is to detect when a job is
* ready for deletion, initialize it, and put it into the DELETING state.
*/
public class StartDeleteThread extends Thread
{
public static final String _rcsid = "@(#)$Id$";
/** Delete startup reset manager */
protected static DeleteStartupResetManager resetManager = new DeleteStartupResetManager();
/** Constructor.
*/
public StartDeleteThread()
throws ManifoldCFException
{
super();
setName("Delete startup thread");
setDaemon(true);
}
public void run()
{
resetManager.registerMe();
try
{
// Create a thread context object.
IThreadContext threadContext = ThreadContextFactory.make();
IJobManager jobManager = JobManagerFactory.make(threadContext);
IRepositoryConnectionManager connectionMgr = RepositoryConnectionManagerFactory.make(threadContext);
IDBInterface database = DBInterfaceFactory.make(threadContext,
ManifoldCF.getMasterDatabaseName(),
ManifoldCF.getMasterDatabaseUsername(),
ManifoldCF.getMasterDatabasePassword());
// Loop
while (true)
{
// Do another try/catch around everything in the loop
try
{
// Before we begin, conditionally reset
resetManager.waitForReset(threadContext);
// Accumulate the wait before doing the next check.
// We start with 10 seconds, which is the maximum. If there's a service request
// that's faster than that, we'll adjust the time downward.
long waitTime = 10000L;
if (Logging.threads.isDebugEnabled())
Logging.threads.debug("Checking for deleting jobs");
// See if there are any starting jobs.
// Note: Since this following call changes the job state, we must be careful to reset it on any kind of failure.
JobDeleteRecord[] deleteJobs = jobManager.getJobsReadyForDelete();
try
{
if (deleteJobs.length == 0)
{
ManifoldCF.sleep(waitTime);
continue;
}
if (Logging.threads.isDebugEnabled())
Logging.threads.debug("Found "+Integer.toString(deleteJobs.length)+" jobs ready to be deleted");
long currentTime = System.currentTimeMillis();
// Loop through jobs
int i = 0;
while (i < deleteJobs.length)
{
JobDeleteRecord jsr = deleteJobs[i++];
Long jobID = jsr.getJobID();
try
{
jobManager.prepareDeleteScan(jobID);
// Start deleting this job!
jobManager.noteJobDeleteStarted(jobID,currentTime);
jsr.noteStarted();
}
catch (ManifoldCFException e)
{
if (e.getErrorCode() == ManifoldCFException.INTERRUPTED)
throw new InterruptedException();
if (e.getErrorCode() == ManifoldCFException.DATABASE_CONNECTION_ERROR)
throw e;
// We cannot abort the delete startup, but if we fall through, we'll put the job back into
// the state whence it came. So, fall through.
Logging.threads.error("Exception tossed: "+e.getMessage(),e);
}
}
}
finally
{
// Clean up all jobs that did not start
ManifoldCFException exception = null;
int i = 0;
while (i < deleteJobs.length)
{
JobDeleteRecord jsr = deleteJobs[i++];
if (!jsr.wasStarted())
{
// Clean up from failed start.
try
{
jobManager.resetStartDeleteJob(jsr.getJobID());
}
catch (ManifoldCFException e)
{
exception = e;
}
}
}
if (exception != null)
throw exception;
}
// Sleep for the retry interval.
ManifoldCF.sleep(waitTime);
}
catch (ManifoldCFException e)
{
if (e.getErrorCode() == ManifoldCFException.INTERRUPTED)
break;
if (e.getErrorCode() == ManifoldCFException.DATABASE_CONNECTION_ERROR)
{
resetManager.noteEvent();
Logging.threads.error("Start delete thread aborting and restarting due to database connection reset: "+e.getMessage(),e);
try
{
// Give the database a chance to catch up/wake up
ManifoldCF.sleep(10000L);
}
catch (InterruptedException se)
{
break;
}
continue;
}
// Log it, but keep the thread alive
Logging.threads.error("Exception tossed: "+e.getMessage(),e);
if (e.getErrorCode() == ManifoldCFException.SETUP_ERROR)
{
// Shut the whole system down!
System.exit(1);
}
}
catch (InterruptedException e)
{
// We're supposed to quit
break;
}
catch (OutOfMemoryError e)
{
System.err.println("agents process ran out of memory - shutting down");
e.printStackTrace(System.err);
System.exit(-200);
}
catch (Throwable e)
{
// A more severe error - but stay alive
Logging.threads.fatal("Error tossed: "+e.getMessage(),e);
}
}
}
catch (Throwable e)
{
// Severe error on initialization
System.err.println("agents process could not start - shutting down");
Logging.threads.fatal("StartDeleteThread initialization error tossed: "+e.getMessage(),e);
System.exit(-300);
}
}
/** Class which handles reset for seeding thread pool (of which there's
* typically only one member). The reset action here
* is to move the status of jobs back from "seeding" to normal.
*/
protected static class DeleteStartupResetManager extends ResetManager
{
/** Constructor. */
public DeleteStartupResetManager()
{
super();
}
/** Reset */
protected void performResetLogic(IThreadContext tc)
throws ManifoldCFException
{
IJobManager jobManager = JobManagerFactory.make(tc);
jobManager.resetDeleteStartupWorkerStatus();
}
/** Do the wakeup logic.
*/
protected void performWakeupLogic()
{
}
}
}