/* $Id: JobQueue.java 988245 2010-08-23 18:39:35Z kwright $ */
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.manifoldcf.crawler.jobs;
import org.apache.manifoldcf.core.interfaces.*;
import org.apache.manifoldcf.crawler.interfaces.*;
import org.apache.manifoldcf.crawler.system.Logging;
import org.apache.manifoldcf.crawler.system.ManifoldCF;
import java.util.*;
/** This is the job queue manager class. It is responsible for managing the jobqueue database table.
*
* <br><br>
* <b>jobqueue</b>
* <table border="1" cellpadding="3" cellspacing="0">
* <tr class="TableHeadingColor">
* <th>Field</th><th>Type</th><th>Description</th></tr>
* <tr><td>id</td><td>BIGINT</td><td>Primary Key</td></tr>
* <tr><td>jobid</td><td>BIGINT</td><td>Reference:jobs.id</td></tr>
* <tr><td>dochash</td><td>VARCHAR(40)</td><td></td></tr>
* <tr><td>docid</td><td>LONGTEXT</td><td></td></tr>
* <tr><td>checktime</td><td>BIGINT</td><td></td></tr>
* <tr><td>failtime</td><td>BIGINT</td><td></td></tr>
* <tr><td>failcount</td><td>BIGINT</td><td></td></tr>
* <tr><td>status</td><td>CHAR(1)</td><td></td></tr>
* <tr><td>isseed</td><td>CHAR(1)</td><td></td></tr>
* <tr><td>docpriority</td><td>FLOAT</td><td></td></tr>
* <tr><td>priorityset</td><td>BIGINT</td><td></td></tr>
* <tr><td>checkaction</td><td>CHAR(1)</td><td></td></tr>
* </table>
* <br><br>
*
*/
public class JobQueue extends org.apache.manifoldcf.core.database.BaseTable
{
public static final String _rcsid = "@(#)$Id: JobQueue.java 988245 2010-08-23 18:39:35Z kwright $";
// Seeding status values
public final static int SEEDSTATUS_NOTSEED = 0;
public final static int SEEDSTATUS_SEED = 1;
public final static int SEEDSTATUS_NEWSEED = 2;
// Status values
public final static int STATUS_PENDING = 0;
public final static int STATUS_ACTIVE = 1;
public final static int STATUS_COMPLETE = 2;
public final static int STATUS_UNCHANGED = 3;
public final static int STATUS_PENDINGPURGATORY = 4;
public final static int STATUS_ACTIVEPURGATORY = 5;
public final static int STATUS_PURGATORY = 6;
public final static int STATUS_BEINGDELETED = 7;
public final static int STATUS_ACTIVENEEDRESCAN = 8;
public final static int STATUS_ACTIVENEEDRESCANPURGATORY = 9;
public final static int STATUS_BEINGCLEANED = 10;
public final static int STATUS_ELIGIBLEFORDELETE = 11;
public final static int STATUS_HOPCOUNTREMOVED = 12;
// Action values
public final static int ACTION_RESCAN = 0;
public final static int ACTION_REMOVE = 1;
// State descriptions are as follows:
// PENDING means a newly-added reference that has not been scanned before.
// ACTIVE means a newly-added reference that is being scanned for the first time.
// COMPLETE means a reference that has already been scanned (and does not need to be
// scanned again for this job session).
// PURGATORY means a reference that was complete before, which means it will need to be deleted if
// it isn't included in this job session.
// PENDINGPURGATORY means a reference that was complete before, but which has been rediscovered in
// this job session, but hasn't been scanned yet
// ACTIVEPURGATORY means a reference that was PENDINGPURGATORY before, and has been picked up by a
// thread for processing
//
// PENDINGPURGATORY and ACTIVEPURGATORY exist in order to allow the system to properly recover from
// an aborted job. On recovery, PENDING and ACTIVE records are deleted (since they were never
// completed), while PENDINGPURGATORY and ACTIVEPURGATORY records are retained but get marked as PURGATORY.
//
// BEINGDELETED means that the document is queued because the owning job is being
// deleted. It exists so that jobs that are active can avoid processing a document until the cleanup
// activity is done.
//
// BEINGCLEANED means that the document is queued because the owning job is in the SHUTTINGDOWN
// state, and the document was never encountered during the crawl.
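//
// Several remaining states are used elsewhere in this class:
// UNCHANGED is set by preparePartialScan() for previously COMPLETE documents, so that discovery
// can requeue them; ELIGIBLEFORDELETE marks documents queued for removal because the owning job
// is being deleted; HOPCOUNTREMOVED marks documents dropped due to hopcount restrictions; and
// ACTIVENEEDRESCAN/ACTIVENEEDRESCANPURGATORY are ACTIVE variants that are requeued (as
// PENDINGPURGATORY) when processing completes.
//
// A rough lifecycle sketch, pieced together from the transitions coded below:
//
//   (discovery)       PENDING -> ACTIVE -> COMPLETE
//   (restart/recrawl) COMPLETE -> PENDINGPURGATORY or PURGATORY -> ACTIVEPURGATORY -> COMPLETE
//   (change seen)     ACTIVE[PURGATORY] -> ACTIVENEEDRESCAN[PURGATORY] -> PENDINGPURGATORY
//   (job deletion)    * -> ELIGIBLEFORDELETE -> BEINGDELETED -> (row deleted)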
// Field names
public static final String idField = "id";
public static final String jobIDField = "jobid";
public static final String docHashField = "dochash";
public static final String docIDField = "docid";
public static final String checkTimeField = "checktime";
public static final String statusField = "status";
public static final String failTimeField = "failtime";
public static final String failCountField = "failcount";
public static final String isSeedField = "isseed";
public static final String docPriorityField = "docpriority";
public static final String prioritySetField = "priorityset";
public static final String checkActionField = "checkaction";
public static final double noDocPriorityValue = 1e9;
public static final Double nullDocPriority = new Double(noDocPriorityValue + 1.0);
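// (The sentinel just above noDocPriorityValue stands in for NULL doc priorities, so that NULL
// values don't slow down queries; see the upgrade path in install(), which bashes NULLs to it.)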
protected static Map statusMap;
static
{
statusMap = new HashMap();
statusMap.put("P",new Integer(STATUS_PENDING));
statusMap.put("A",new Integer(STATUS_ACTIVE));
statusMap.put("C",new Integer(STATUS_COMPLETE));
statusMap.put("U",new Integer(STATUS_UNCHANGED));
statusMap.put("G",new Integer(STATUS_PENDINGPURGATORY));
statusMap.put("F",new Integer(STATUS_ACTIVEPURGATORY));
statusMap.put("Z",new Integer(STATUS_PURGATORY));
statusMap.put("E",new Integer(STATUS_ELIGIBLEFORDELETE));
statusMap.put("D",new Integer(STATUS_BEINGDELETED));
statusMap.put("a",new Integer(STATUS_ACTIVENEEDRESCAN));
statusMap.put("f",new Integer(STATUS_ACTIVENEEDRESCANPURGATORY));
statusMap.put("d",new Integer(STATUS_BEINGCLEANED));
statusMap.put("H",new Integer(STATUS_HOPCOUNTREMOVED));
}
protected static Map seedstatusMap;
static
{
seedstatusMap = new HashMap();
seedstatusMap.put("F",new Integer(SEEDSTATUS_NOTSEED));
seedstatusMap.put("S",new Integer(SEEDSTATUS_SEED));
seedstatusMap.put("N",new Integer(SEEDSTATUS_NEWSEED));
}
protected static Map actionMap;
static
{
actionMap = new HashMap();
actionMap.put("R",new Integer(ACTION_RESCAN));
actionMap.put("D",new Integer(ACTION_REMOVE));
}
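// Example round trip (a minimal sketch; these maps simply invert the xxxToString() methods below):
//
//   String code = JobQueue.statusToString(JobQueue.STATUS_PENDING); // "P"
//   int status = JobQueue.stringToStatus(code);                     // STATUS_PENDING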
/** Prerequisite event manager */
protected PrereqEventManager prereqEventManager;
/** Thread context */
protected IThreadContext threadContext;
/** Cached getNextDocuments order-by index hint */
protected String getNextDocumentsIndexHint = null;
/** Constructor.
*@param tc is the thread context.
*@param database is the database handle.
*/
public JobQueue(IThreadContext tc, IDBInterface database)
throws ManifoldCFException
{
super(database,"jobqueue");
this.threadContext = tc;
prereqEventManager = new PrereqEventManager(database);
}
/** Install or upgrade.
*@param jobsTable is the name of the owning jobs table.
*@param jobsColumn is the name of the jobs table's primary key column.
*/
public void install(String jobsTable, String jobsColumn)
throws ManifoldCFException
{
// Standard practice to use outer loop to allow retry in case of upgrade.
while (true)
{
// Handle schema
Map existing = getTableSchema(null,null);
if (existing == null)
{
HashMap map = new HashMap();
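// The ColumnDescription arguments below appear to be, in order: SQL type, is-primary-key,
// is-nullable, referenced table, referenced column, and cascade-delete -- an assumption
// inferred from how the id and jobid columns are declared.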
map.put(idField,new ColumnDescription("BIGINT",true,false,null,null,false));
map.put(jobIDField,new ColumnDescription("BIGINT",false,false,jobsTable,jobsColumn,false));
// this is the local document identifier.
map.put(docHashField,new ColumnDescription("VARCHAR(40)",false,false,null,null,false));
map.put(docIDField,new ColumnDescription("LONGTEXT",false,false,null,null,false));
map.put(checkTimeField,new ColumnDescription("BIGINT",false,true,null,null,false));
map.put(failTimeField,new ColumnDescription("BIGINT",false,true,null,null,false));
map.put(failCountField,new ColumnDescription("BIGINT",false,true,null,null,false));
map.put(statusField,new ColumnDescription("CHAR(1)",false,false,null,null,false));
map.put(isSeedField,new ColumnDescription("CHAR(1)",false,true,null,null,false));
map.put(docPriorityField,new ColumnDescription("FLOAT",false,true,null,null,false));
map.put(prioritySetField,new ColumnDescription("BIGINT",false,true,null,null,false));
map.put(checkActionField,new ColumnDescription("CHAR(1)",false,true,null,null,false));
performCreate(map,null);
}
else
{
// Upgrade; null docpriority fields bashed to 'infinity', so they don't slow down MySQL
Map map = new HashMap();
map.put(docPriorityField,nullDocPriority);
performUpdate(map,"WHERE "+docPriorityField+" IS NULL",null,null);
}
// Secondary table installation
prereqEventManager.install(getTableName(),idField);
// Handle indexes
IndexDescription uniqueIndex = new IndexDescription(true,new String[]{docHashField,jobIDField});
IndexDescription jobStatusIndex = new IndexDescription(false,new String[]{jobIDField,statusField});
IndexDescription jobSeedIndex = new IndexDescription(false,new String[]{isSeedField,jobIDField});
IndexDescription failTimeIndex = new IndexDescription(false,new String[]{failTimeField,jobIDField});
IndexDescription actionTimeStatusIndex = new IndexDescription(false,new String[]{statusField,checkActionField,checkTimeField});
IndexDescription prioritysetStatusIndex = new IndexDescription(false,new String[]{statusField,prioritySetField});
// No evidence that the extra fields help at all, for any database...
IndexDescription docpriorityIndex = new IndexDescription(false,new String[]{docPriorityField,statusField,checkActionField,checkTimeField});
// Get rid of unused indexes
Map indexes = getTableIndexes(null,null);
Iterator iter = indexes.keySet().iterator();
while (iter.hasNext())
{
String indexName = (String)iter.next();
IndexDescription id = (IndexDescription)indexes.get(indexName);
if (uniqueIndex != null && id.equals(uniqueIndex))
uniqueIndex = null;
else if (jobStatusIndex != null && id.equals(jobStatusIndex))
jobStatusIndex = null;
else if (jobSeedIndex != null && id.equals(jobSeedIndex))
jobSeedIndex = null;
else if (failTimeIndex != null && id.equals(failTimeIndex))
failTimeIndex = null;
else if (actionTimeStatusIndex != null && id.equals(actionTimeStatusIndex))
actionTimeStatusIndex = null;
else if (prioritysetStatusIndex != null && id.equals(prioritysetStatusIndex))
prioritysetStatusIndex = null;
else if (docpriorityIndex != null && id.equals(docpriorityIndex))
docpriorityIndex = null;
else if (indexName.indexOf("_pkey") == -1)
// This index shouldn't be here; drop it
performRemoveIndex(indexName);
}
// Build missing indexes
if (jobStatusIndex != null)
performAddIndex(null,jobStatusIndex);
if (jobSeedIndex != null)
performAddIndex(null,jobSeedIndex);
if (failTimeIndex != null)
performAddIndex(null,failTimeIndex);
if (actionTimeStatusIndex != null)
performAddIndex(null,actionTimeStatusIndex);
if (prioritysetStatusIndex != null)
performAddIndex(null,prioritysetStatusIndex);
if (docpriorityIndex != null)
performAddIndex(null,docpriorityIndex);
if (uniqueIndex != null)
performAddIndex(null,uniqueIndex);
break;
}
}
/** Get the 'getNextDocuments' index hint.
*/
public String getGetNextDocumentsIndexHint()
throws ManifoldCFException
{
if (getNextDocumentsIndexHint == null)
{
// Figure out what index it is
getNextDocumentsIndexHint = getDBInterface().constructIndexHintClause(getTableName(),
new IndexDescription(false,new String[]{docPriorityField,statusField,checkActionField,checkTimeField}));
}
return getNextDocumentsIndexHint;
}
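// A hypothetical caller sketch (the real consumer lives in the job manager): the returned hint
// clause, which may be empty for most databases, would be spliced directly into the query text:
//
//   String hint = jobQueue.getGetNextDocumentsIndexHint();
//   String sql = "SELECT ... FROM " + jobQueue.getTableName() + " " + hint + " WHERE ...";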
/** Analyze job tables due to major event */
public void unconditionallyAnalyzeTables()
throws ManifoldCFException
{
long startTime = System.currentTimeMillis();
Logging.perf.debug("Beginning to analyze jobqueue table");
analyzeTable();
Logging.perf.debug("Done analyzing jobqueue table in "+new Long(System.currentTimeMillis()-startTime)+" ms");
}
/** Uninstall.
*/
public void deinstall()
throws ManifoldCFException
{
beginTransaction();
try
{
prereqEventManager.deinstall();
performDrop(null);
}
catch (ManifoldCFException e)
{
signalRollback();
throw e;
}
catch (Error e)
{
signalRollback();
throw e;
}
finally
{
endTransaction();
}
}
/** Restart.
* This method should be called at initial startup time. It resets the status of all documents to something
* reasonable, so the jobs can be restarted and work properly to completion.
*/
public void restart()
throws ManifoldCFException
{
// Map ACTIVE back to PENDING.
HashMap map = new HashMap();
map.put(statusField,statusToString(STATUS_PENDING));
ArrayList list = new ArrayList();
String query = buildConjunctionClause(list,new ClauseDescription[]{
new MultiClause(statusField,new Object[]{
statusToString(STATUS_ACTIVE),
statusToString(STATUS_ACTIVENEEDRESCAN)})});
performUpdate(map,"WHERE "+query,list,null);
// Map ACTIVEPURGATORY to PENDINGPURGATORY
map.put(statusField,statusToString(STATUS_PENDINGPURGATORY));
list.clear();
query = buildConjunctionClause(list,new ClauseDescription[]{
new MultiClause(statusField,new Object[]{
statusToString(STATUS_ACTIVEPURGATORY),
statusToString(STATUS_ACTIVENEEDRESCANPURGATORY)})});
performUpdate(map,"WHERE "+query,list,null);
// Map BEINGDELETED to ELIGIBLEFORDELETE
map.put(statusField,statusToString(STATUS_ELIGIBLEFORDELETE));
map.put(checkTimeField,new Long(0L));
list.clear();
query = buildConjunctionClause(list,new ClauseDescription[]{
new UnitaryClause(statusField,statusToString(STATUS_BEINGDELETED))});
performUpdate(map,"WHERE "+query,list,null);
// Map BEINGCLEANED to PURGATORY
map.put(statusField,statusToString(STATUS_PURGATORY));
map.put(checkTimeField,new Long(0L));
list.clear();
query = buildConjunctionClause(list,new ClauseDescription[]{
new UnitaryClause(statusField,statusToString(STATUS_BEINGCLEANED))});
performUpdate(map,"WHERE "+query,list,null);
// Map newseed fields to seed
map.clear();
map.put(isSeedField,seedstatusToString(SEEDSTATUS_SEED));
list.clear();
query = buildConjunctionClause(list,new ClauseDescription[]{
new UnitaryClause(isSeedField,seedstatusToString(SEEDSTATUS_NEWSEED))});
performUpdate(map,"WHERE "+query,list,null);
// Clear out all failtime fields (since we obviously haven't been retrying whilst we were not
// running)
map.clear();
map.put(failTimeField,null);
list.clear();
query = buildConjunctionClause(list,new ClauseDescription[]{
new NullCheckClause(failTimeField,false)});
performUpdate(map,"WHERE "+query,list,null);
// Reindex the jobqueue table, since we've probably made lots of bad tuples doing the above operations.
reindexTable();
unconditionallyAnalyzeTables();
}
/** Flip all records for a job that have status HOPCOUNTREMOVED back to PENDING.
* NOTE: These records still need to be actually scheduled, so the update below is not sufficient by itself.
*/
public void reactivateHopcountRemovedRecords(Long jobID)
throws ManifoldCFException
{
Map map = new HashMap();
// Map HOPCOUNTREMOVED to PENDING
map.put(statusField,statusToString(STATUS_PENDING));
map.put(checkTimeField,new Long(0L));
ArrayList list = new ArrayList();
String query = buildConjunctionClause(list,new ClauseDescription[]{
new UnitaryClause(jobIDField,jobID),
new UnitaryClause(statusField,statusToString(STATUS_HOPCOUNTREMOVED))});
performUpdate(map,"WHERE "+query,list,null);
}
/** Delete all records for a job that have status HOPCOUNTREMOVED.
*/
public void deleteHopcountRemovedRecords(Long jobID)
throws ManifoldCFException
{
ArrayList list = new ArrayList();
String query = buildConjunctionClause(list,new ClauseDescription[]{
new UnitaryClause(jobIDField,jobID),
new UnitaryClause(statusField,statusToString(STATUS_HOPCOUNTREMOVED))});
performDelete("WHERE "+query,list,null);
}
/** Clear the failtimes for all documents associated with a job.
* This method is called when the system detects that a significant delaying event has occurred,
* and therefore the "failure clock" needs to be reset.
*@param jobID is the job identifier.
*/
public void clearFailTimes(Long jobID)
throws ManifoldCFException
{
Map map = new HashMap();
map.put(failTimeField,null);
ArrayList list = new ArrayList();
String query = buildConjunctionClause(list,new ClauseDescription[]{
new NullCheckClause(failTimeField,false),
new UnitaryClause(jobIDField,jobID)});
performUpdate(map,"WHERE "+query,list,null);
}
/** Reset as part of restoring document worker threads.
* This will get called if something went wrong that could have screwed up the
* status of a worker thread. The threads all die/end, and this method
* resets any active documents back to the right state (waiting for stuffing).
*/
public void resetDocumentWorkerStatus()
throws ManifoldCFException
{
// Map ACTIVE back to PENDING.
HashMap map = new HashMap();
map.put(statusField,statusToString(STATUS_PENDING));
ArrayList list = new ArrayList();
String query = buildConjunctionClause(list,new ClauseDescription[]{
new MultiClause(statusField,new Object[]{
statusToString(STATUS_ACTIVE),
statusToString(STATUS_ACTIVENEEDRESCAN)})});
performUpdate(map,"WHERE "+query,list,null);
// Map ACTIVEPURGATORY to PENDINGPURGATORY
map.put(statusField,statusToString(STATUS_PENDINGPURGATORY));
list.clear();
query = buildConjunctionClause(list,new ClauseDescription[]{
new MultiClause(statusField,new Object[]{
statusToString(STATUS_ACTIVEPURGATORY),
statusToString(STATUS_ACTIVENEEDRESCANPURGATORY)})});
performUpdate(map,"WHERE "+query,list,null);
}
/** Reset doc delete worker status.
*/
public void resetDocDeleteWorkerStatus()
throws ManifoldCFException
{
HashMap map = new HashMap();
// Map BEINGDELETED to ELIGIBLEFORDELETE
map.put(statusField,statusToString(STATUS_ELIGIBLEFORDELETE));
map.put(checkTimeField,new Long(0L));
ArrayList list = new ArrayList();
String query = buildConjunctionClause(list,new ClauseDescription[]{
new UnitaryClause(statusField,statusToString(STATUS_BEINGDELETED))});
performUpdate(map,"WHERE "+query,list,null);
}
/** Reset doc cleaning worker status.
*/
public void resetDocCleanupWorkerStatus()
throws ManifoldCFException
{
HashMap map = new HashMap();
// Map BEINGCLEANED to PURGATORY
map.put(statusField,statusToString(STATUS_PURGATORY));
map.put(checkTimeField,new Long(0L));
ArrayList list = new ArrayList();
String query = buildConjunctionClause(list,new ClauseDescription[]{
new UnitaryClause(statusField,statusToString(STATUS_BEINGCLEANED))});
performUpdate(map,"WHERE "+query,list,null);
}
/** Prepare for a job delete pass. This will not be called
* unless the job is in an INACTIVE state.
* Does the following:
* (1) Deletes PENDING and HOPCOUNTREMOVED entries
* (2) Maps PENDINGPURGATORY, PURGATORY, UNCHANGED, and COMPLETE entries to ELIGIBLEFORDELETE
*@param jobID is the job identifier.
*/
public void prepareDeleteScan(Long jobID)
throws ManifoldCFException
{
// Delete PENDING entries
ArrayList list = new ArrayList();
String query = buildConjunctionClause(list,new ClauseDescription[]{
new UnitaryClause("t0."+jobIDField,jobID),
new MultiClause("t0."+statusField,new Object[]{
statusToString(STATUS_PENDING),
statusToString(STATUS_HOPCOUNTREMOVED)})});
// Clean out prereqevents table first
prereqEventManager.deleteRows(getTableName()+" t0","t0."+idField,query,list);
list.clear();
query = buildConjunctionClause(list,new ClauseDescription[]{
new UnitaryClause(jobIDField,jobID),
new MultiClause(statusField,new Object[]{
statusToString(STATUS_PENDING),
statusToString(STATUS_HOPCOUNTREMOVED)})});
performDelete("WHERE "+query,list,null);
// Turn PENDINGPURGATORY, PURGATORY, UNCHANGED, and COMPLETE into ELIGIBLEFORDELETE.
HashMap map = new HashMap();
map.put(statusField,statusToString(STATUS_ELIGIBLEFORDELETE));
map.put(checkTimeField,new Long(0L));
map.put(checkActionField,null);
map.put(failTimeField,null);
map.put(failCountField,null);
list.clear();
query = buildConjunctionClause(list,new ClauseDescription[]{
new UnitaryClause(jobIDField,jobID),
new MultiClause(statusField,new Object[]{
statusToString(STATUS_PENDINGPURGATORY),
statusToString(STATUS_COMPLETE),
statusToString(STATUS_UNCHANGED),
statusToString(STATUS_PURGATORY)})});
performUpdate(map,"WHERE "+query,list,null);
// Not accurate, but best we can do without overhead
noteModifications(0,2,0);
// Do an analyze, otherwise our plans are going to be crap right off the bat
unconditionallyAnalyzeTables();
}
/** Prepare for a "full scan" job. This will not be called
* unless the job is in the "INACTIVE" state.
* This does the following:
* (1) get rid of all PENDING entries.
* (2) map PENDINGPURGATORY entries to PURGATORY.
* (3) map COMPLETE and UNCHANGED entries to PURGATORY.
*@param jobID is the job identifier.
*/
public void prepareFullScan(Long jobID)
throws ManifoldCFException
{
// Delete PENDING entries
ArrayList list = new ArrayList();
list.add(jobID);
list.add(statusToString(STATUS_PENDING));
// Clean out prereqevents table first
prereqEventManager.deleteRows(getTableName()+" t0","t0."+idField,"t0."+jobIDField+"=? AND t0."+statusField+"=?",list);
list.clear();
String query = buildConjunctionClause(list,new ClauseDescription[]{
new UnitaryClause(jobIDField,jobID),
new UnitaryClause(statusField,statusToString(STATUS_PENDING))});
performDelete("WHERE "+query,list,null);
// Turn PENDINGPURGATORY, UNCHANGED, and COMPLETE into PURGATORY.
HashMap map = new HashMap();
map.put(statusField,statusToString(STATUS_PURGATORY));
map.put(checkTimeField,new Long(0L));
map.put(checkActionField,null);
map.put(failTimeField,null);
map.put(failCountField,null);
// Do not reset priorities. This means, of course, that they may be out of date - but they are probably more accurate in their current form
// than being set back to some arbitrary value.
// The alternative, which would be to reprioritize all the documents at this point, is somewhat attractive, but let's see if we can get away
// without it for now.
list.clear();
query = buildConjunctionClause(list,new ClauseDescription[]{
new UnitaryClause(jobIDField,jobID),
new MultiClause(statusField,new Object[]{
statusToString(STATUS_PENDINGPURGATORY),
statusToString(STATUS_UNCHANGED),
statusToString(STATUS_COMPLETE)})});
performUpdate(map,"WHERE "+query,list,null);
// Not accurate, but best we can do without overhead
noteModifications(0,2,0);
// Do an analyze, otherwise our plans are going to be crap right off the bat
unconditionallyAnalyzeTables();
}
/** Prepare for a "partial" job. This is called ONLY when the job is inactive.
*
* This method maps all COMPLETE entries to UNCHANGED. The purpose is to
* allow discovery to find the documents that need to be processed. If they were
* marked as COMPLETE, that would stop them from being queued.
*@param jobID is the job identifier.
*/
public void preparePartialScan(Long jobID)
throws ManifoldCFException
{
// Map COMPLETE to UNCHANGED.
HashMap map = new HashMap();
map.put(statusField,statusToString(STATUS_UNCHANGED));
// Do not reset priorities here! They should all be blank at this point.
map.put(checkTimeField,new Long(0L));
map.put(checkActionField,actionToString(ACTION_RESCAN));
map.put(failTimeField,null);
map.put(failCountField,null);
ArrayList list = new ArrayList();
String query = buildConjunctionClause(list,new ClauseDescription[]{
new UnitaryClause(jobIDField,jobID),
new UnitaryClause(statusField,statusToString(STATUS_COMPLETE))});
performUpdate(map,"WHERE "+query,list,null);
noteModifications(0,1,0);
// Do an analyze, otherwise our plans are going to be crap right off the bat
unconditionallyAnalyzeTables();
}
/** Prepare for an "incremental" job. This is called ONLY when the job is inactive;
* that is, there should be no ACTIVE or ACTIVEPURGATORY entries at all.
*
* The preparation for starting an incremental job is to requeue all documents that are
* currently in the system that are marked "COMPLETE" or "UNCHANGED". These get marked as "PENDINGPURGATORY",
* since the idea is to queue them in such a way that we know they were ingested before.
*@param jobID is the job identifier.
*/
public void prepareIncrementalScan(Long jobID)
throws ManifoldCFException
{
// Map COMPLETE to PENDINGPURGATORY.
HashMap map = new HashMap();
map.put(statusField,statusToString(STATUS_PENDINGPURGATORY));
// Do not reset priorities here! They should all be blank at this point.
map.put(checkTimeField,new Long(0L));
map.put(checkActionField,actionToString(ACTION_RESCAN));
map.put(failTimeField,null);
map.put(failCountField,null);
ArrayList list = new ArrayList();
String query = buildConjunctionClause(list,new ClauseDescription[]{
new UnitaryClause(jobIDField,jobID),
new MultiClause(statusField,new Object[]{
statusToString(STATUS_COMPLETE),
statusToString(STATUS_UNCHANGED)})});
performUpdate(map,"WHERE "+query,list,null);
noteModifications(0,1,0);
// Do an analyze, otherwise our plans are going to be crap right off the bat
unconditionallyAnalyzeTables();
}
/** Delete ingested document identifiers (as part of deleting the owning job).
* The number of identifiers specified is guaranteed to be less than the maxInClauseCount
* for the database.
*@param identifiers is the set of document identifiers.
*/
public void deleteIngestedDocumentIdentifiers(DocumentDescription[] identifiers)
throws ManifoldCFException
{
ArrayList list = new ArrayList();
int i = 0;
while (i < identifiers.length)
{
list.add(identifiers[i].getID());
i++;
}
doDeletes(list);
noteModifications(0,0,identifiers.length);
}
/** Check if there are any outstanding active documents for a job */
public boolean checkJobBusy(Long jobID)
throws ManifoldCFException
{
ArrayList list = new ArrayList();
String query = buildConjunctionClause(list,new ClauseDescription[]{
new UnitaryClause(jobIDField,jobID),
new MultiClause(statusField,new Object[]{
statusToString(STATUS_ACTIVE),
statusToString(STATUS_ACTIVEPURGATORY),
statusToString(STATUS_ACTIVENEEDRESCAN),
statusToString(STATUS_ACTIVENEEDRESCANPURGATORY)})});
IResultSet set = performQuery("SELECT "+docHashField+" FROM "+getTableName()+
" WHERE "+query+" "+constructOffsetLimitClause(0,1),list,null,null,1);
return set.getRowCount() > 0;
}
/** For a job deletion: Delete all records for a job.
*@param jobID is the job identifier.
*/
public void deleteAllJobRecords(Long jobID)
throws ManifoldCFException
{
ArrayList list = new ArrayList();
list.add(jobID);
// Clean out prereqevents table first
prereqEventManager.deleteRows(getTableName()+" t0","t0."+idField,"t0."+jobIDField+"=?",list);
list.clear();
String query = buildConjunctionClause(list,new ClauseDescription[]{
new UnitaryClause(jobIDField,jobID)});
performDelete("WHERE "+query,list,null);
noteModifications(0,0,1);
}
/** Write out a document priority */
public void writeDocPriority(long currentTime, Long rowID, double priority)
throws ManifoldCFException
{
HashMap map = new HashMap();
map.put(prioritySetField,new Long(currentTime));
map.put(docPriorityField,new Double(priority));
ArrayList list = new ArrayList();
String query = buildConjunctionClause(list,new ClauseDescription[]{
new UnitaryClause(idField,rowID)});
performUpdate(map,"WHERE "+query,list,null);
noteModifications(0,1,0);
}
/** Clear all document priorities for a job */
public void clearDocPriorities(Long jobID)
throws ManifoldCFException
{
HashMap map = new HashMap();
map.put(prioritySetField,null);
map.put(docPriorityField,nullDocPriority);
ArrayList list = new ArrayList();
String query = buildConjunctionClause(list,new ClauseDescription[]{
new UnitaryClause(jobIDField,jobID)});
performUpdate(map,"WHERE "+query,list,null);
noteModifications(0,1,0);
}
/** Set the "completed" status for a record.
*/
public void updateCompletedRecord(Long recID, int currentStatus)
throws ManifoldCFException
{
HashMap map = new HashMap();
int newStatus;
String actionFieldValue;
Long checkTimeValue;
switch (currentStatus)
{
case STATUS_ACTIVE:
case STATUS_ACTIVEPURGATORY:
newStatus = STATUS_COMPLETE;
actionFieldValue = null;
checkTimeValue = null;
// Remove document priority; we don't want to pollute the queue. See CONNECTORS-290.
map.put(docPriorityField,nullDocPriority);
map.put(prioritySetField,null);
break;
case STATUS_ACTIVENEEDRESCAN:
case STATUS_ACTIVENEEDRESCANPURGATORY:
newStatus = STATUS_PENDINGPURGATORY;
actionFieldValue = actionToString(ACTION_RESCAN);
checkTimeValue = new Long(0L);
// Leave doc priority unchanged.
break;
default:
throw new ManifoldCFException("Unexpected jobqueue status - record id "+recID.toString()+", expecting active status, saw "+Integer.toString(currentStatus));
}
map.put(statusField,statusToString(newStatus));
map.put(checkTimeField,checkTimeValue);
map.put(checkActionField,actionFieldValue);
map.put(failTimeField,null);
map.put(failCountField,null);
ArrayList list = new ArrayList();
String query = buildConjunctionClause(list,new ClauseDescription[]{
new UnitaryClause(idField,recID)});
performUpdate(map,"WHERE "+query,list,null);
}
/** Either mark a record as hopcountremoved, or set status to "rescan", depending on the
* record's state.
*/
public boolean updateOrHopcountRemoveRecord(Long recID, int currentStatus)
throws ManifoldCFException
{
HashMap map = new HashMap();
int newStatus;
String actionFieldValue;
Long checkTimeValue;
boolean rval;
switch (currentStatus)
{
case STATUS_ACTIVE:
case STATUS_ACTIVEPURGATORY:
// Mark as hopcountremove
newStatus = STATUS_HOPCOUNTREMOVED;
actionFieldValue = actionToString(ACTION_RESCAN);
checkTimeValue = new Long(0L);
rval = true;
break;
case STATUS_ACTIVENEEDRESCAN:
case STATUS_ACTIVENEEDRESCANPURGATORY:
newStatus = STATUS_PENDINGPURGATORY;
actionFieldValue = actionToString(ACTION_RESCAN);
checkTimeValue = new Long(0L);
rval = false;
// Leave doc priority unchanged.
break;
default:
throw new ManifoldCFException("Unexpected jobqueue status - record id "+recID.toString()+", expecting active status, saw "+Integer.toString(currentStatus));
}
map.put(statusField,statusToString(newStatus));
map.put(checkTimeField,checkTimeValue);
map.put(checkActionField,actionFieldValue);
map.put(failTimeField,null);
map.put(failCountField,null);
ArrayList list = new ArrayList();
String query = buildConjunctionClause(list,new ClauseDescription[]{
new UnitaryClause(idField,recID)});
performUpdate(map,"WHERE "+query,list,null);
return rval;
}
/** Set the status to active on a record, leaving alone priority or check time.
*@param id is the job queue id.
*@param currentStatus is the current status
*/
public void updateActiveRecord(Long id, int currentStatus)
throws ManifoldCFException
{
int newStatus;
switch (currentStatus)
{
case STATUS_PENDING:
newStatus = STATUS_ACTIVE;
break;
case STATUS_PENDINGPURGATORY:
newStatus = STATUS_ACTIVEPURGATORY;
break;
default:
throw new ManifoldCFException("Unexpected status value for jobqueue record "+id.toString()+"; got "+Integer.toString(currentStatus));
}
HashMap map = new HashMap();
map.put(statusField,statusToString(newStatus));
ArrayList list = new ArrayList();
String query = buildConjunctionClause(list,new ClauseDescription[]{
new UnitaryClause(idField,id)});
performUpdate(map,"WHERE "+query,list,null);
noteModifications(0,1,0);
}
/** Set the status on a record, including check time and priority.
* The status set MUST be a PENDING or PENDINGPURGATORY status.
*@param id is the job queue id.
*@param status is the desired status
*@param checkTime is the check time.
*/
public void setStatus(Long id, int status,
Long checkTime, int action, long failTime, int failCount)
throws ManifoldCFException
{
HashMap map = new HashMap();
map.put(statusField,statusToString(status));
map.put(checkTimeField,checkTime);
map.put(checkActionField,actionToString(action));
if (failTime == -1L)
map.put(failTimeField,null);
else
map.put(failTimeField,new Long(failTime));
if (failCount == -1)
map.put(failCountField,null);
else
map.put(failCountField,new Long(failCount));
// This does not need to set docPriorityField, because we want to preserve whatever
// priority was in place from before.
ArrayList list = new ArrayList();
String query = buildConjunctionClause(list,new ClauseDescription[]{
new UnitaryClause(idField,id)});
performUpdate(map,"WHERE "+query,list,null);
noteModifications(0,1,0);
}
/** Set the status of a document to "being deleted".
*/
public void setDeletingStatus(Long id)
throws ManifoldCFException
{
HashMap map = new HashMap();
map.put(statusField,statusToString(STATUS_BEINGDELETED));
ArrayList list = new ArrayList();
String query = buildConjunctionClause(list,new ClauseDescription[]{
new UnitaryClause(idField,id)});
performUpdate(map,"WHERE "+query,list,null);
noteModifications(0,1,0);
}
/** Set the status of a document to be "no longer deleting" */
public void setUndeletingStatus(Long id, long checkTime)
throws ManifoldCFException
{
HashMap map = new HashMap();
map.put(statusField,statusToString(STATUS_ELIGIBLEFORDELETE));
map.put(checkTimeField,new Long(checkTime));
map.put(checkActionField,null);
map.put(failTimeField,null);
map.put(failCountField,null);
ArrayList list = new ArrayList();
String query = buildConjunctionClause(list,new ClauseDescription[]{
new UnitaryClause(idField,id)});
performUpdate(map,"WHERE "+query,list,null);
noteModifications(0,1,0);
}
/** Set the status of a document to "being cleaned".
*/
public void setCleaningStatus(Long id)
throws ManifoldCFException
{
HashMap map = new HashMap();
map.put(statusField,statusToString(STATUS_BEINGCLEANED));
ArrayList list = new ArrayList();
String query = buildConjunctionClause(list,new ClauseDescription[]{
new UnitaryClause(idField,id)});
performUpdate(map,"WHERE "+query,list,null);
noteModifications(0,1,0);
}
/** Set the status of a document to be "no longer cleaning" */
public void setUncleaningStatus(Long id, long checkTime)
throws ManifoldCFException
{
HashMap map = new HashMap();
map.put(statusField,statusToString(STATUS_PURGATORY));
map.put(checkTimeField,new Long(checkTime));
map.put(checkActionField,null);
map.put(failTimeField,null);
map.put(failCountField,null);
ArrayList list = new ArrayList();
String query = buildConjunctionClause(list,new ClauseDescription[]{
new UnitaryClause(idField,id)});
performUpdate(map,"WHERE "+query,list,null);
noteModifications(0,1,0);
}
/** Remove multiple records entirely.
*@param ids is the set of job queue id's
*/
public void deleteRecordMultiple(Long[] ids)
throws ManifoldCFException
{
// Delete in chunks
int maxClause = maxClauseDoDeletes();
ArrayList list = new ArrayList();
int j = 0;
int i = 0;
while (i < ids.length)
{
if (j == maxClause)
{
doDeletes(list);
list.clear();
j = 0;
}
list.add(ids[i++]);
j++;
}
if (j > 0)
doDeletes(list);
noteModifications(0,0,ids.length);
}
/** Calculate the number of deletes we can do at once.
*/
protected int maxClauseDoDeletes()
{
return findConjunctionClauseMax(new ClauseDescription[]{});
}
/** Do a batch of deletes.
*/
protected void doDeletes(ArrayList list)
throws ManifoldCFException
{
// Clean out prereqevents table first
prereqEventManager.deleteRows(list);
ArrayList newList = new ArrayList();
String query = buildConjunctionClause(newList,new ClauseDescription[]{
new MultiClause(idField,list)});
performDelete("WHERE "+query,newList,null);
}
/** Remove a record entirely.
*@param id is the job queue id.
*/
public void deleteRecord(Long id)
throws ManifoldCFException
{
deleteRecordMultiple(new Long[]{id});
}
/** Update an existing record (as the result of an initial add).
* The record is presumed to exist and have been locked, via "FOR UPDATE".
*/
public boolean updateExistingRecordInitial(Long recordID, int currentStatus, Long checkTimeValue,
long desiredExecuteTime, long currentTime, double desiredPriority, String[] prereqEvents)
throws ManifoldCFException
{
// The general rule here is:
// If doesn't exist, make a PENDING entry.
// If PENDING, keep it as PENDING.
// If COMPLETE, make a PENDING entry.
// If PURGATORY, make a PENDINGPURGATORY entry.
// Leave everything else alone and do nothing.
boolean rval = false;
HashMap map = new HashMap();
switch (currentStatus)
{
case STATUS_ACTIVE:
case STATUS_ACTIVEPURGATORY:
case STATUS_ACTIVENEEDRESCAN:
case STATUS_ACTIVENEEDRESCANPURGATORY:
case STATUS_BEINGCLEANED:
// These are all the active states. Being in this state implies that a thread may be working on the document. We
// must not interrupt it.
// Initial adds never bring along any carrydown info, so we should be satisfied as long as the record exists.
break;
case STATUS_COMPLETE:
case STATUS_UNCHANGED:
case STATUS_PURGATORY:
// Set the status and time both
map.put(statusField,statusToString(STATUS_PENDINGPURGATORY));
if (desiredExecuteTime == -1L)
map.put(checkTimeField,new Long(0L));
else
map.put(checkTimeField,new Long(desiredExecuteTime));
map.put(checkActionField,actionToString(ACTION_RESCAN));
map.put(failTimeField,null);
map.put(failCountField,null);
// Update the doc priority.
map.put(docPriorityField,new Double(desiredPriority));
map.put(prioritySetField,new Long(currentTime));
rval = true;
break;
case STATUS_PENDING:
// Bump up the schedule if called for
Long cv = checkTimeValue;
if (cv != null)
{
long currentExecuteTime = cv.longValue();
if (desiredExecuteTime == -1L || currentExecuteTime <= desiredExecuteTime)
{
break;
}
}
else
{
if (desiredExecuteTime == -1L)
{
break;
}
}
map.put(checkTimeField,new Long(desiredExecuteTime));
map.put(checkActionField,actionToString(ACTION_RESCAN));
map.put(failTimeField,null);
map.put(failCountField,null);
// The existing doc priority field should be preserved.
break;
case STATUS_PENDINGPURGATORY:
// In this case we presume that the reason we are in this state is due to adaptive crawling or retry, so DON'T bump up the schedule!
// The existing doc priority field should also be preserved.
break;
default:
break;
}
map.put(isSeedField,seedstatusToString(SEEDSTATUS_NEWSEED));
// Delete any existing prereqevent entries first
prereqEventManager.deleteRows(recordID);
ArrayList list = new ArrayList();
String query = buildConjunctionClause(list,new ClauseDescription[]{
new UnitaryClause(idField,recordID)});
performUpdate(map,"WHERE "+query,list,null);
// Insert prereqevent entries, if any
prereqEventManager.addRows(recordID,prereqEvents);
noteModifications(0,1,0);
return rval;
}
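// Hedged usage sketch: the caller is expected to have locked the row beforehand with something like
//
//   SELECT id,status,checktime FROM jobqueue WHERE dochash=? AND jobid=? FOR UPDATE
//
// and to dispatch on the returned status -- calling updateExistingRecordInitial() when a row
// exists, and insertNewRecordInitial() otherwise.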
/** Insert a new record into the jobqueue table (as part of adding an initial reference).
*
*@param jobID is the job identifier.
*@param docHash is the hash of the local document identifier.
*@param docID is the local document identifier.
*/
public void insertNewRecordInitial(Long jobID, String docHash, String docID, double desiredDocPriority,
long desiredExecuteTime, long currentTime, String[] prereqEvents)
throws ManifoldCFException
{
// No prerequisites should be possible at this point.
HashMap map = new HashMap();
Long recordID = new Long(IDFactory.make(threadContext));
map.put(idField,recordID);
if (desiredExecuteTime == -1L)
map.put(checkTimeField,new Long(0L));
else
map.put(checkTimeField,new Long(desiredExecuteTime));
map.put(checkActionField,actionToString(ACTION_RESCAN));
map.put(jobIDField,jobID);
map.put(docHashField,docHash);
map.put(docIDField,docID);
map.put(statusField,statusToString(STATUS_PENDING));
map.put(isSeedField,seedstatusToString(SEEDSTATUS_NEWSEED));
// Set the document priority
map.put(docPriorityField,new Double(desiredDocPriority));
map.put(prioritySetField,new Long(currentTime));
performInsert(map,null);
prereqEventManager.addRows(recordID,prereqEvents);
noteModifications(1,0,0);
}
/** Note the remaining documents that do NOT need to be queued. These are noted so that the
* doneDocumentsInitial() method does not wrongly clean up seeds from previous runs.
*/
*/
public void addRemainingDocumentsInitial(Long jobID, String[] docIDHashes)
throws ManifoldCFException
{
if (docIDHashes.length == 0)
return;
// Basically, all we want to do is move the documents whose status is still SEED
// to become NEWSEED.
HashMap inSet = new HashMap();
int j = 0;
while (j < docIDHashes.length)
{
String docIDHash = docIDHashes[j++];
inSet.put(docIDHash,docIDHash);
}
HashMap idMap = new HashMap();
int k = 0;
// To avoid deadlock, use 1 instead of something larger. The docIDs are presumed to arrive in sorted order.
int maxClause = 1;
ArrayList list = new ArrayList();
j = 0;
while (j < docIDHashes.length)
{
String docIDHash = docIDHashes[j++];
if (k == maxClause)
{
processRemainingDocuments(idMap,jobID,list,inSet);
k = 0;
list.clear();
}
list.add(docIDHash);
k++;
}
if (k > 0)
processRemainingDocuments(idMap,jobID,list,inSet);
// We have a set of id's. Process those in bulk.
k = 0;
list.clear();
maxClause = maxClauseUpdateRemainingDocuments();
Iterator idValues = idMap.keySet().iterator();
while (idValues.hasNext())
{
if (k == maxClause)
{
updateRemainingDocuments(list);
k = 0;
list.clear();
}
list.add(idValues.next());
k++;
}
if (k > 0)
updateRemainingDocuments(list);
noteModifications(0,docIDHashes.length,0);
}
/** Calculate max */
protected int maxClauseProcessRemainingDocuments(Long jobID)
{
return findConjunctionClauseMax(new ClauseDescription[]{
new UnitaryClause(jobIDField,jobID)});
}
/** Process the specified set of documents. */
protected void processRemainingDocuments(Map idMap, Long jobID, ArrayList list, Map inSet)
throws ManifoldCFException
{
ArrayList newList = new ArrayList();
String query = buildConjunctionClause(newList,new ClauseDescription[]{
new MultiClause(docHashField,list),
new UnitaryClause(jobIDField,jobID)});
newList.add(seedstatusToString(SEEDSTATUS_SEED));
IResultSet set = performQuery("SELECT "+idField+","+docHashField+" FROM "+getTableName()+
" WHERE "+query+" AND "+isSeedField+"=? FOR UPDATE",newList,null,null);
int i = 0;
while (i < set.getRowCount())
{
IResultRow row = set.getRow(i++);
String docIDHash = (String)row.getValue(docHashField);
if (inSet.get(docIDHash) != null)
{
Long idValue = (Long)row.getValue(idField);
idMap.put(idValue,idValue);
}
}
}
/** Get the maximum count */
protected int maxClauseUpdateRemainingDocuments()
{
return findConjunctionClauseMax(new ClauseDescription[]{});
}
/** Update the specified set of documents to be "NEWSEED" */
protected void updateRemainingDocuments(ArrayList list)
throws ManifoldCFException
{
HashMap map = new HashMap();
map.put(isSeedField,seedstatusToString(SEEDSTATUS_NEWSEED));
ArrayList newList = new ArrayList();
String query = buildConjunctionClause(newList,new ClauseDescription[]{
new MultiClause(idField,list)});
performUpdate(map,"WHERE "+query,newList,null);
}
/** Complete the initial set of documents. This method converts the seeding statuses for the
* job to their steady-state values.
* SEEDSTATUS_SEED becomes SEEDSTATUS_NOTSEED, and SEEDSTATUS_NEWSEED becomes
* SEEDSTATUS_SEED. If the seeding was partial, then all previous seeds are preserved as such.
*@param jobID is the job identifier.
*@param isPartial is true if the passed list of seeds is not complete.
*/
public void doneDocumentsInitial(Long jobID, boolean isPartial)
throws ManifoldCFException
{
ArrayList list = new ArrayList();
String query;
HashMap map = new HashMap();
if (!isPartial)
{
query = buildConjunctionClause(list,new ClauseDescription[]{
new UnitaryClause(isSeedField,seedstatusToString(SEEDSTATUS_SEED)),
new UnitaryClause(jobIDField,jobID)});
map.put(isSeedField,seedstatusToString(SEEDSTATUS_NOTSEED));
performUpdate(map,"WHERE "+query,list,null);
list.clear();
}
query = buildConjunctionClause(list,new ClauseDescription[]{
new UnitaryClause(isSeedField,seedstatusToString(SEEDSTATUS_NEWSEED)),
new UnitaryClause(jobIDField,jobID)});
map.put(isSeedField,seedstatusToString(SEEDSTATUS_SEED));
performUpdate(map,"WHERE "+query,list,null);
}
/** Get all the current seeds.
* Returns the seed document identifiers for a job.
*@param jobID is the job identifier.
*@return the document identifier hashes that are currently considered to be seeds.
*/
public String[] getAllSeeds(Long jobID)
throws ManifoldCFException
{
ArrayList list = new ArrayList();
String query = buildConjunctionClause(list,new ClauseDescription[]{
new UnitaryClause(isSeedField,seedstatusToString(SEEDSTATUS_SEED)),
new UnitaryClause(jobIDField,jobID)});
IResultSet set = performQuery("SELECT "+docHashField+" FROM "+getTableName()+" WHERE "+query,
list,null,null);
String[] rval = new String[set.getRowCount()];
int i = 0;
while (i < rval.length)
{
IResultRow row = set.getRow(i);
rval[i++] = (String)row.getValue(docHashField);
}
return rval;
}
/** Update an existing record (as the result of a reference add).
* The record is presumed to exist and have been locked, via "FOR UPDATE".
*@return true if the document priority slot has been retained, false if freed.
*/
public boolean updateExistingRecord(Long recordID, int currentStatus, Long checkTimeValue,
long desiredExecuteTime, long currentTime, boolean otherChangesSeen,
double desiredPriority, String[] prereqEvents)
throws ManifoldCFException
{
boolean rval = false;
HashMap map = new HashMap();
switch (currentStatus)
{
case STATUS_PURGATORY:
case STATUS_UNCHANGED:
// Set the status and time both
map.put(statusField,statusToString(STATUS_PENDINGPURGATORY));
map.put(checkTimeField,new Long(desiredExecuteTime));
map.put(checkActionField,actionToString(ACTION_RESCAN));
map.put(failTimeField,null);
map.put(failCountField,null);
// Going into pending: set the docpriority.
map.put(docPriorityField,new Double(desiredPriority));
map.put(prioritySetField,new Long(currentTime));
rval = true;
break;
case STATUS_COMPLETE:
case STATUS_BEINGCLEANED:
// Requeue the document for processing, if there have been other changes.
if (otherChangesSeen)
{
// The document has been processed before, so it has to go into PENDINGPURGATORY.
// Set the status and time both
map.put(statusField,statusToString(STATUS_PENDINGPURGATORY));
map.put(checkTimeField,new Long(desiredExecuteTime));
map.put(checkActionField,actionToString(ACTION_RESCAN));
map.put(failTimeField,null);
map.put(failCountField,null);
// Going into pending: set the docpriority.
map.put(docPriorityField,new Double(desiredPriority));
map.put(prioritySetField,new Long(currentTime));
rval = true;
break;
}
return rval;
case STATUS_ACTIVENEEDRESCAN:
case STATUS_ACTIVENEEDRESCANPURGATORY:
// Document is in the queue, but already needs a rescan for prior reasons.
// We're done.
return rval;
case STATUS_ACTIVE:
// Document is in the queue.
// The problem here is that we have no idea when the document is actually being worked on; we only find out when the document is actually *done*.
// Any update to the carrydown information may therefore be too late for the current processing cycle.
// Given that being the case, the "proper" thing to do is to requeue the document when the processing is completed, so that we can guarantee
// reprocessing will take place.
// Additional document states must therefore be added to represent the situation:
// (an ACTIVE or ACTIVEPURGATORY equivalent, but where the document is requeued upon completion rather than put into COMPLETE).
if (otherChangesSeen)
{
// Flip the state to the new one, and set the document priority at this time too - it will be preserved when the
// processing is completed.
map.put(statusField,statusToString(STATUS_ACTIVENEEDRESCAN));
map.put(checkTimeField,new Long(desiredExecuteTime));
map.put(checkActionField,actionToString(ACTION_RESCAN));
map.put(failTimeField,null);
map.put(failCountField,null);
// Going into pending: set the docpriority.
map.put(docPriorityField,new Double(desiredPriority));
map.put(prioritySetField,new Long(currentTime));
rval = true;
break;
}
return rval;
case STATUS_ACTIVEPURGATORY:
// Document is in the queue.
// The problem here is that we have no idea when the document is actually being worked on; we only find out when the document is actually *done*.
// Any update to the carrydown information may therefore be too late for the current processing cycle.
// Given that being the case, the "proper" thing to do is to requeue the document when the processing is completed, so that we can guarantee
// reprocessing will take place.
// Additional document states must therefore be added to represent the situation:
// (an ACTIVE or ACTIVEPURGATORY equivalent, but where the document is requeued upon completion rather than put into COMPLETE).
if (otherChangesSeen)
{
// Flip the state to the new one, and set the document priority at this time too - it will be preserved when the
// processing is completed.
map.put(statusField,statusToString(STATUS_ACTIVENEEDRESCANPURGATORY));
map.put(checkTimeField,new Long(desiredExecuteTime));
map.put(checkActionField,actionToString(ACTION_RESCAN));
map.put(failTimeField,null);
map.put(failCountField,null);
// Going into pending: set the docpriority.
map.put(docPriorityField,new Double(desiredPriority));
map.put(prioritySetField,new Long(currentTime));
rval = true;
break;
}
return rval;
case STATUS_PENDING:
// Document is already waiting to be processed.
// Bump up the schedule, if called for. Otherwise, just leave it alone.
Long cv = checkTimeValue;
if (cv != null)
{
long currentExecuteTime = cv.longValue();
if (currentExecuteTime <= desiredExecuteTime)
return rval;
}
map.put(checkTimeField,new Long(desiredExecuteTime));
map.put(checkActionField,actionToString(ACTION_RESCAN));
map.put(failTimeField,null);
map.put(failCountField,null);
// Leave doc priority alone
break;
case STATUS_PENDINGPURGATORY:
// This is just like PENDING except we know that the document was processed at least once before.
// In this case we presume that the schedule was already set for adaptive or retry reasons, so DON'T change the schedule or activity
// Also, leave doc priority alone
// Fall through...
default:
return rval;
}
prereqEventManager.deleteRows(recordID);
ArrayList list = new ArrayList();
String query = buildConjunctionClause(list,new ClauseDescription[]{
new UnitaryClause(idField,recordID)});
performUpdate(map,"WHERE "+query,list,null);
prereqEventManager.addRows(recordID,prereqEvents);
noteModifications(0,1,0);
return rval;
}
/** Insert a new record into the jobqueue table (as part of adding a child reference).
*
*/
public void insertNewRecord(Long jobID, String docIDHash, String docID, double desiredDocPriority, long desiredExecuteTime,
long currentTime, String[] prereqEvents)
throws ManifoldCFException
{
HashMap map = new HashMap();
Long recordID = new Long(IDFactory.make(threadContext));
map.put(idField,recordID);
map.put(checkTimeField,new Long(desiredExecuteTime));
map.put(checkActionField,actionToString(ACTION_RESCAN));
map.put(jobIDField,jobID);
map.put(docHashField,docIDHash);
map.put(docIDField,docID);
map.put(statusField,statusToString(STATUS_PENDING));
// Be sure to set the priority also
map.put(docPriorityField,new Double(desiredDocPriority));
map.put(prioritySetField,new Long(currentTime));
performInsert(map,null);
prereqEventManager.addRows(recordID,prereqEvents);
noteModifications(1,0,0);
}
// Methods to convert status strings to integers and back
/** Convert seedstatus value to a string.
*/
public static String seedstatusToString(int status)
throws ManifoldCFException
{
switch (status)
{
case SEEDSTATUS_NOTSEED:
return "F";
case SEEDSTATUS_SEED:
return "S";
case SEEDSTATUS_NEWSEED:
return "N";
default:
throw new ManifoldCFException("Invalid seed status: "+Integer.toString(status));
}
}
/** Convert seedstatus field value to an int.
*/
public static int stringToSeedstatus(String x)
throws ManifoldCFException
{
if (x == null || x.length() == 0)
return SEEDSTATUS_NOTSEED;
Integer y = (Integer)seedstatusMap.get(x);
if (y == null)
throw new ManifoldCFException("Unknown seed status code: "+x);
return y.intValue();
}
/** Convert action field value to integer.
*/
public static int stringToAction(String value)
throws ManifoldCFException
{
Integer x = (Integer)actionMap.get(value);
if (x == null)
throw new ManifoldCFException("Unknown action string: '"+value+"'");
return x.intValue();
}
/** Convert integer to action string */
public static String actionToString(int action)
throws ManifoldCFException
{
switch (action)
{
case ACTION_RESCAN:
return "R";
case ACTION_REMOVE:
return "D";
default:
throw new ManifoldCFException("Bad action value: "+Integer.toString(action));
}
}
/** Convert status field value to integer.
*@param value is the string.
*@return the integer.
*/
public static int stringToStatus(String value)
throws ManifoldCFException
{
Integer x = (Integer)statusMap.get(value);
if (x == null)
throw new ManifoldCFException("Unknown status string: '"+value+"'");
return x.intValue();
}
/** Convert status to string.
*@param status is the status value.
*@return the database string.
*/
public static String statusToString(int status)
throws ManifoldCFException
{
switch (status)
{
case STATUS_PENDING:
return "P";
case STATUS_ACTIVE:
return "A";
case STATUS_COMPLETE:
return "C";
case STATUS_UNCHANGED:
return "U";
case STATUS_PENDINGPURGATORY:
return "G";
case STATUS_ACTIVEPURGATORY:
return "F";
case STATUS_PURGATORY:
return "Z";
case STATUS_ELIGIBLEFORDELETE:
return "E";
case STATUS_BEINGDELETED:
return "D";
case STATUS_ACTIVENEEDRESCAN:
return "a";
case STATUS_ACTIVENEEDRESCANPURGATORY:
return "f";
case STATUS_BEINGCLEANED:
return "d";
case STATUS_HOPCOUNTREMOVED:
return "H";
default:
throw new ManifoldCFException("Bad status value: "+Integer.toString(status));
}
}
/** Get a hash value from a document id string. This will convert the string into something that can fit in the
* 40-character dochash column (currently via ManifoldCF.hash()).
*@param documentIdentifier is the input document id string.
*@return the hash code.
*/
public static String getHashCode(String documentIdentifier)
throws ManifoldCFException
{
return ManifoldCF.hash(documentIdentifier);
}
// This class filters an ordered resultset to return only the duplicates
protected static class DuplicateFinder implements ILimitChecker
{
protected Long prevJobID = null;
protected String prevDocIDHash = null;
public DuplicateFinder()
{
}
/** See if this class can be legitimately compared against another of
* the same type.
*@return true if comparisons will ever return "true".
*/
public boolean doesCompareWork()
{
return false;
}
/** Create a duplicate of this class instance. All current state should be preserved.
* NOTE: Since doesCompareWork() returns false, queries using this limit checker cannot
* be cached, and therefore duplicate() is never called from the query executor.
*@return the duplicate.
*/
public ILimitChecker duplicate()
{
DuplicateFinder df = new DuplicateFinder();
df.prevJobID = prevJobID;
df.prevDocIDHash = prevDocIDHash;
return df;
}
/** Find the hashcode for this class. This will only ever be used if
* doesCompareWork() returns true.
*@return the hashcode.
*/
public int hashCode()
{
return 0;
}
/** Compare two objects and see if equal. This will only ever be used
* if doesCompareWork() returns true.
*@param object is the object to compare against.
*@return true if equal.
*/
public boolean equals(Object object)
{
return false;
}
/** See if a result row should be included in the final result set.
*@param row is the result row to check.
*@return true if it should be included, false otherwise.
*/
public boolean checkInclude(IResultRow row)
throws ManifoldCFException
{
Long jobID = (Long)row.getValue(jobIDField);
String docIDHash = (String)row.getValue(docHashField);
// If this is a duplicate, we want to keep it!
if (prevJobID != null && jobID.equals(prevJobID) && docIDHash.equals(prevDocIDHash))
return true;
prevJobID = jobID;
prevDocIDHash = docIDHash;
return false;
}
/** See if we should examine another row.
*@return true if we need to keep going, or false if we are done.
*/
public boolean checkContinue()
throws ManifoldCFException
{
return true;
}
}
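// Hedged usage sketch for DuplicateFinder (no caller appears in this class): a query ordered by
// (jobid,dochash) would pass an instance as the ILimitChecker argument, so that only the second
// and subsequent rows of each duplicate group are returned:
//
//   IResultSet duplicates = performQuery(
//     "SELECT "+jobIDField+","+docHashField+" FROM "+getTableName()+
//     " ORDER BY "+jobIDField+","+docHashField,
//     null,null,new DuplicateFinder());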
}