blob: 0a775e48d6fe4fb1de6ad8d36139868a882ca7c3 [file] [log] [blame]
/* $Id: JobNotificationThread.java 998081 2010-09-17 11:33:15Z kwright $ */
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.manifoldcf.crawler.system;
import org.apache.manifoldcf.core.interfaces.*;
import org.apache.manifoldcf.agents.interfaces.*;
import org.apache.manifoldcf.crawler.interfaces.*;
import org.apache.manifoldcf.crawler.system.Logging;
import java.util.*;
import java.lang.reflect.*;
/** This class represents the thread that notices jobs that have completed their "notify connector" phase, and resets them back to
* inactive.
*/
public class JobNotificationThread extends Thread
{
public static final String _rcsid = "@(#)$Id: JobNotificationThread.java 998081 2010-09-17 11:33:15Z kwright $";
/** Notification reset manager */
protected static NotificationResetManager resetManager = new NotificationResetManager();
/** Constructor.
*/
public JobNotificationThread()
throws ManifoldCFException
{
super();
setName("Job notification thread");
setDaemon(true);
}
public void run()
{
resetManager.registerMe();
try
{
// Create a thread context object.
IThreadContext threadContext = ThreadContextFactory.make();
IJobManager jobManager = JobManagerFactory.make(threadContext);
IOutputConnectionManager connectionManager = OutputConnectionManagerFactory.make(threadContext);
IRepositoryConnectionManager repositoryConnectionManager = RepositoryConnectionManagerFactory.make(threadContext);
// Loop
while (true)
{
// Do another try/catch around everything in the loop
try
{
// Before we begin, conditionally reset
resetManager.waitForReset(threadContext);
JobNotifyRecord[] jobsNeedingNotification = jobManager.getJobsReadyForInactivity();
try
{
HashMap connectionNames = new HashMap();
int k = 0;
while (k < jobsNeedingNotification.length)
{
JobNotifyRecord jsr = jobsNeedingNotification[k++];
Long jobID = jsr.getJobID();
IJobDescription job = jobManager.load(jobID,true);
if (job != null)
{
// Get the connection name
String repositoryConnectionName = job.getConnectionName();
String outputConnectionName = job.getOutputConnectionName();
OutputAndRepositoryConnection c = new OutputAndRepositoryConnection(outputConnectionName, repositoryConnectionName);
connectionNames.put(c,c);
}
}
// Attempt to notify the specified connections
HashMap notifiedConnections = new HashMap();
Iterator iter = connectionNames.keySet().iterator();
while (iter.hasNext())
{
OutputAndRepositoryConnection connections = (OutputAndRepositoryConnection)iter.next();
String outputConnectionName = connections.getOutputConnectionName();
String repositoryConnectionName = connections.getRepositoryConnectionName();
OutputNotifyActivity activity = new OutputNotifyActivity(repositoryConnectionName,repositoryConnectionManager,outputConnectionName);
IOutputConnection connection = connectionManager.load(outputConnectionName);
if (connection != null)
{
// Grab an appropriate connection instance
IOutputConnector connector = OutputConnectorFactory.grab(threadContext,connection.getClassName(),connection.getConfigParams(),connection.getMaxConnections());
if (connector != null)
{
try
{
// Do the notification itself
try
{
connector.noteJobComplete(activity);
}
catch (ServiceInterruption e)
{
Logging.threads.warn("Service interruption notifying connection - retrying: "+e.getMessage(),e);
continue;
}
catch (ManifoldCFException e)
{
if (e.getErrorCode() == ManifoldCFException.INTERRUPTED)
throw e;
if (e.getErrorCode() == ManifoldCFException.DATABASE_CONNECTION_ERROR)
throw e;
if (e.getErrorCode() == ManifoldCFException.SETUP_ERROR)
throw e;
// Nothing special; report the error and keep going.
Logging.threads.error(e.getMessage(),e);
}
notifiedConnections.put(connections,connections);
}
finally
{
OutputConnectorFactory.release(connector);
}
}
}
}
// Go through jobs again, and put the notified ones into the inactive state.
k = 0;
while (k < jobsNeedingNotification.length)
{
JobNotifyRecord jsr = jobsNeedingNotification[k++];
Long jobID = jsr.getJobID();
IJobDescription job = jobManager.load(jobID,true);
if (job != null)
{
// Get the connection name
String outputConnectionName = job.getOutputConnectionName();
String repositoryConnectionName = job.getConnectionName();
OutputAndRepositoryConnection c = new OutputAndRepositoryConnection(outputConnectionName, repositoryConnectionName);
if (notifiedConnections.get(c) != null)
{
// When done, put the job into the Inactive state. Otherwise, the notification will be retried until it succeeds.
jobManager.inactivateJob(jobID);
jsr.noteStarted();
}
}
}
}
finally
{
// Clean up all jobs that did not start
ManifoldCFException exception = null;
int i = 0;
while (i < jobsNeedingNotification.length)
{
JobNotifyRecord jsr = jobsNeedingNotification[i++];
if (!jsr.wasStarted())
{
// Clean up from failed start.
try
{
jobManager.resetNotifyJob(jsr.getJobID());
}
catch (ManifoldCFException e)
{
exception = e;
}
}
}
if (exception != null)
throw exception;
}
ManifoldCF.sleep(10000L);
}
catch (ManifoldCFException e)
{
if (e.getErrorCode() == ManifoldCFException.INTERRUPTED)
break;
if (e.getErrorCode() == ManifoldCFException.DATABASE_CONNECTION_ERROR)
{
resetManager.noteEvent();
Logging.threads.error("Job notification thread aborting and restarting due to database connection reset: "+e.getMessage(),e);
try
{
// Give the database a chance to catch up/wake up
ManifoldCF.sleep(10000L);
}
catch (InterruptedException se)
{
break;
}
continue;
}
// Log it, but keep the thread alive
Logging.threads.error("Exception tossed: "+e.getMessage(),e);
if (e.getErrorCode() == ManifoldCFException.SETUP_ERROR)
{
// Shut the whole system down!
System.exit(1);
}
}
catch (InterruptedException e)
{
// We're supposed to quit
break;
}
catch (OutOfMemoryError e)
{
System.err.println("agents process ran out of memory - shutting down");
e.printStackTrace(System.err);
System.exit(-200);
}
catch (Throwable e)
{
// A more severe error - but stay alive
Logging.threads.fatal("Error tossed: "+e.getMessage(),e);
}
}
}
catch (Throwable e)
{
// Severe error on initialization
System.err.println("agents process could not start - shutting down");
Logging.threads.fatal("JobNotificationThread initialization error tossed: "+e.getMessage(),e);
System.exit(-300);
}
}
/** Output connection/repository connection pair object */
protected static class OutputAndRepositoryConnection
{
protected String outputConnectionName;
protected String repositoryConnectionName;
public OutputAndRepositoryConnection(String outputConnectionName, String repositoryConnectionName)
{
this.outputConnectionName = outputConnectionName;
this.repositoryConnectionName = repositoryConnectionName;
}
public String getOutputConnectionName()
{
return outputConnectionName;
}
public String getRepositoryConnectionName()
{
return repositoryConnectionName;
}
public boolean equals(Object o)
{
if (!(o instanceof OutputAndRepositoryConnection))
return false;
OutputAndRepositoryConnection x = (OutputAndRepositoryConnection)o;
return this.outputConnectionName.equals(x.outputConnectionName) && this.repositoryConnectionName.equals(x.repositoryConnectionName);
}
public int hashCode()
{
return outputConnectionName.hashCode() + repositoryConnectionName.hashCode();
}
}
/** The ingest logger class */
protected static class OutputNotifyActivity implements IOutputNotifyActivity
{
// Connection name
protected String connectionName;
// Connection manager
protected IRepositoryConnectionManager connMgr;
// Output connection name
protected String outputConnectionName;
/** Constructor */
public OutputNotifyActivity(String connectionName, IRepositoryConnectionManager connMgr, String outputConnectionName)
{
this.connectionName = connectionName;
this.connMgr = connMgr;
this.outputConnectionName = outputConnectionName;
}
/** Record time-stamped information about the activity of the output connector.
*@param startTime is either null or the time since the start of epoch in milliseconds (Jan 1, 1970). Every
* activity has an associated time; the startTime field records when the activity began. A null value
* indicates that the start time and the finishing time are the same.
*@param activityType is a string which is fully interpretable only in the context of the connector involved, which is
* used to categorize what kind of activity is being recorded. For example, a web connector might record a
* "fetch document" activity. Cannot be null.
*@param dataSize is the number of bytes of data involved in the activity, or null if not applicable.
*@param entityURI is a (possibly long) string which identifies the object involved in the history record.
* The interpretation of this field will differ from connector to connector. May be null.
*@param resultCode contains a terse description of the result of the activity. The description is limited in
* size to 255 characters, and can be interpreted only in the context of the current connector. May be null.
*@param resultDescription is a (possibly long) human-readable string which adds detail, if required, to the result
* described in the resultCode field. This field is not meant to be queried on. May be null.
*/
public void recordActivity(Long startTime, String activityType, Long dataSize,
String entityURI, String resultCode, String resultDescription)
throws ManifoldCFException
{
connMgr.recordHistory(connectionName,startTime,ManifoldCF.qualifyOutputActivityName(activityType,outputConnectionName),dataSize,entityURI,resultCode,
resultDescription,null);
}
}
/** Class which handles reset for seeding thread pool (of which there's
* typically only one member). The reset action here
* is to move the status of jobs back from "seeding" to normal.
*/
protected static class NotificationResetManager extends ResetManager
{
/** Constructor. */
public NotificationResetManager()
{
super();
}
/** Reset */
protected void performResetLogic(IThreadContext tc)
throws ManifoldCFException
{
IJobManager jobManager = JobManagerFactory.make(tc);
jobManager.resetNotificationWorkerStatus();
}
/** Do the wakeup logic.
*/
protected void performWakeupLogic()
{
}
}
}