Use database fields to permit multiple reprioritization threads to be active at the same time across the cluster.
git-svn-id: https://svn.apache.org/repos/asf/manifoldcf/branches/CONNECTORS-1100@1638083 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IJobManager.java b/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IJobManager.java
index e3455d9..3383464 100644
--- a/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IJobManager.java
+++ b/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IJobManager.java
@@ -225,10 +225,11 @@
/** Get a list of not-yet-processed documents to reprioritize. Documents in all jobs will be
* returned by this method. Up to n document descriptions will be returned.
+ *@param processID is the process that requests the reprioritization documents.
*@param n is the maximum number of document descriptions desired.
*@return the document descriptions.
*/
- public DocumentDescription[] getNextNotYetProcessedReprioritizationDocuments(int n)
+ public DocumentDescription[] getNextNotYetProcessedReprioritizationDocuments(String processID, int n)
throws ManifoldCFException;
/** Save a set of document priorities. In the case where a document was eligible to have its
diff --git a/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java b/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java
index 5e45330..c0c6f39 100644
--- a/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java
+++ b/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java
@@ -33,6 +33,7 @@
public static final String _rcsid = "@(#)$Id: JobManager.java 998576 2010-09-19 01:11:02Z kwright $";
protected static final String stufferLock = "_STUFFER_";
+ protected static final String reprioritizationLock = "_REPRIORITIZER_";
protected static final String deleteStufferLock = "_DELETESTUFFER_";
protected static final String expireStufferLock = "_EXPIRESTUFFER_";
protected static final String cleanStufferLock = "_CLEANSTUFFER_";
@@ -2064,47 +2065,106 @@
/** Get a list of not-yet-processed documents to reprioritize. Documents in all jobs will be
* returned by this method. Up to n document descriptions will be returned.
+ *@param processID is the process that requests the reprioritization documents.
*@param n is the maximum number of document descriptions desired.
*@return the document descriptions.
*/
@Override
- public DocumentDescription[] getNextNotYetProcessedReprioritizationDocuments(int n)
+ public DocumentDescription[] getNextNotYetProcessedReprioritizationDocuments(String processID, int n)
throws ManifoldCFException
{
- StringBuilder sb = new StringBuilder("SELECT ");
- ArrayList list = new ArrayList();
-
- // This query MUST return only documents that are in a pending state which belong to an active job!!!
-
- sb.append(jobQueue.idField).append(",")
- .append(jobQueue.docHashField).append(",")
- .append(jobQueue.docIDField).append(",")
- .append(jobQueue.jobIDField)
- .append(" FROM ").append(jobQueue.getTableName()).append(" WHERE ")
- .append(database.buildConjunctionClause(list,new ClauseDescription[]{
- new UnitaryClause(jobQueue.needPriorityField,jobQueue.needPriorityToString(true))})).append(" ")
- .append(database.constructOffsetLimitClause(0,n));
-
- // Analyze jobqueue tables unconditionally, since it's become much more sensitive in 8.3 than it used to be.
- //jobQueue.unconditionallyAnalyzeTables();
-
- //IResultSet set = database.performQuery(sb.toString(),list,null,null,n,null);
- IResultSet set = database.performQuery(sb.toString(),list,null,null);
-
- DocumentDescription[] rval = new DocumentDescription[set.getRowCount()];
-
- int i = 0;
- while (i < set.getRowCount())
+ // Retry loop - in case we get a deadlock despite our best efforts
+ while (true)
{
- IResultRow row = set.getRow(i);
- rval[i] =new DocumentDescription((Long)row.getValue(jobQueue.idField),
- (Long)row.getValue(jobQueue.jobIDField),
- (String)row.getValue(jobQueue.docHashField),
- (String)row.getValue(jobQueue.docIDField));
- i++;
- }
+ long sleepAmt = 0L;
- return rval;
+ // Write lock ensures that only one thread cluster-wide can be doing this at a given time, so FOR UPDATE is unneeded.
+ lockManager.enterWriteLock(reprioritizationLock);
+ try
+ {
+ // Start the transaction now
+ database.beginTransaction();
+ try
+ {
+
+ StringBuilder sb = new StringBuilder("SELECT ");
+ ArrayList list = new ArrayList();
+
+ // This query MUST return only documents that are in a pending state which belong to an active job!!!
+
+ sb.append(jobQueue.idField).append(",")
+ .append(jobQueue.docHashField).append(",")
+ .append(jobQueue.docIDField).append(",")
+ .append(jobQueue.jobIDField)
+ .append(" FROM ").append(jobQueue.getTableName()).append(" WHERE ")
+ .append(database.buildConjunctionClause(list,new ClauseDescription[]{
+ new UnitaryClause(jobQueue.needPriorityField,jobQueue.needPriorityToString(JobQueue.NEEDPRIORITY_TRUE))})).append(" ")
+ .append(database.constructOffsetLimitClause(0,n));
+
+ // Analyze jobqueue tables unconditionally, since it's become much more sensitive in 8.3 than it used to be.
+ //jobQueue.unconditionallyAnalyzeTables();
+
+ //IResultSet set = database.performQuery(sb.toString(),list,null,null,n,null);
+ IResultSet set = database.performQuery(sb.toString(),list,null,null);
+
+ DocumentDescription[] rval = new DocumentDescription[set.getRowCount()];
+ // To avoid deadlock, we want to update the document id hashes in order. This means reading into a structure I can sort by docid hash,
+ // before updating any rows in jobqueue.
+ String[] docIDHashes = new String[set.getRowCount()];
+ Map<String,DocumentDescription> storageMap = new HashMap<String,DocumentDescription>();
+
+ for (int i = 0 ; i < set.getRowCount() ; i++)
+ {
+ IResultRow row = set.getRow(i);
+ String docHash = (String)row.getValue(jobQueue.docHashField);
+ Long jobID = (Long)row.getValue(jobQueue.jobIDField);
+ rval[i] = new DocumentDescription((Long)row.getValue(jobQueue.idField),
+ jobID,
+ docHash,
+ (String)row.getValue(jobQueue.docIDField));
+ // Compute the doc ID hash
+ docIDHashes[i] = docHash + ":" + jobID;
+ storageMap.put(docIDHashes[i],rval[i]);
+ }
+
+ java.util.Arrays.sort(docIDHashes);
+ for (String docIDHash : docIDHashes)
+ {
+ DocumentDescription dd = storageMap.get(docIDHash);
+ jobQueue.markNeedPriorityInProgress(dd.getID(),processID);
+ }
+
+ database.performCommit();
+ return rval;
+ }
+ catch (ManifoldCFException e)
+ {
+ database.signalRollback();
+ if (e.getErrorCode() == e.DATABASE_TRANSACTION_ABORT)
+ {
+ if (Logging.perf.isDebugEnabled())
+ Logging.perf.debug("Aborted transaction finding documents for reprioritization: "+e.getMessage());
+ sleepAmt = getRandomAmount();
+ continue;
+ }
+ throw e;
+ }
+ catch (Error e)
+ {
+ database.signalRollback();
+ throw e;
+ }
+ finally
+ {
+ database.endTransaction();
+ }
+ }
+ finally
+ {
+ lockManager.leaveWriteLock(reprioritizationLock);
+ sleepFor(sleepAmt);
+ }
+ }
}
/** Save a set of document priorities. In the case where a document was eligible to have its
@@ -2159,8 +2219,8 @@
{
IResultRow row = set.getRow(0);
// Grab the needPriority value
- boolean needPriority = jobQueue.stringToNeedPriority((String)row.getValue(jobQueue.needPriorityField));
- if (needPriority)
+ int needPriority = jobQueue.stringToNeedPriority((String)row.getValue(jobQueue.needPriorityField));
+ if (needPriority == JobQueue.NEEDPRIORITY_INPROGRESS)
{
IPriorityCalculator priority = priorities[index];
jobQueue.writeDocPriority(dd.getID(),priority);
@@ -2882,8 +2942,8 @@
// To avoid deadlock, we want to update the document id hashes in order. This means reading into a structure I can sort by docid hash,
// before updating any rows in jobqueue.
String[] docIDHashes = new String[set.getRowCount()];
- Map storageMap = new HashMap();
- Map statusMap = new HashMap();
+ Map<String,DocumentDescription> storageMap = new HashMap<String,DocumentDescription>();
+ Map<String,Integer> statusMap = new HashMap<String,Integer>();
int i = 0;
while (i < set.getRowCount())
diff --git a/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobQueue.java b/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobQueue.java
index b79cbdd..377c837 100644
--- a/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobQueue.java
+++ b/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobQueue.java
@@ -45,6 +45,7 @@
* <tr><td>processid</td><td>VARCHAR(16)</td><td></td></tr>
* <tr><td>seedingprocessid</td><td>VARCHAR(16)</td><td></td></tr>
* <tr><td>needpriority</td><td>CHAR(1)</td><td></td></tr>
+ * <tr><td>needpriorityprocessid</td><td>VARCHAR(16)</td><td></td></tr>
* </table>
* <br><br>
*
@@ -77,6 +78,11 @@
public final static int ACTION_RESCAN = 0;
public final static int ACTION_REMOVE = 1;
+ // Need priority status
+ public final static int NEEDPRIORITY_FALSE = 0;
+ public final static int NEEDPRIORITY_INPROGRESS = 1;
+ public final static int NEEDPRIORITY_TRUE = 2;
+
// State descriptions are as follows:
// PENDING means a newly-added reference that has not been scanned before.
// ACTIVE means a newly-added reference that is being scanned for the first time.
@@ -118,15 +124,15 @@
public static final String processIDField = "processid";
public static final String seedingProcessIDField = "seedingprocessid";
public static final String needPriorityField = "needpriority";
+ public static final String needPriorityProcessIDField = "needpriorityprocessid";
public static final double noDocPriorityValue = 1e9;
public static final Double nullDocPriority = new Double(noDocPriorityValue + 1.0);
- protected static Map statusMap;
+ protected static final Map<String,Integer> statusMap = new HashMap<String,Integer>();
static
{
- statusMap = new HashMap();
statusMap.put("P",new Integer(STATUS_PENDING));
statusMap.put("A",new Integer(STATUS_ACTIVE));
statusMap.put("C",new Integer(STATUS_COMPLETE));
@@ -142,25 +148,32 @@
statusMap.put("H",new Integer(STATUS_HOPCOUNTREMOVED));
}
- protected static Map seedstatusMap;
+ protected static final Map<String,Integer> seedstatusMap = new HashMap<String,Integer>();
static
{
- seedstatusMap = new HashMap();
seedstatusMap.put("F",new Integer(SEEDSTATUS_NOTSEED));
seedstatusMap.put("S",new Integer(SEEDSTATUS_SEED));
seedstatusMap.put("N",new Integer(SEEDSTATUS_NEWSEED));
}
- protected static Map actionMap;
+ protected static final Map<String,Integer> actionMap = new HashMap<String,Integer>();
static
{
- actionMap = new HashMap();
actionMap.put("R",new Integer(ACTION_RESCAN));
actionMap.put("D",new Integer(ACTION_REMOVE));
}
+ protected static final Map<String,Integer> needPriorityMap = new HashMap<String,Integer>();
+
+ static
+ {
+ needPriorityMap.put("T",new Integer(NEEDPRIORITY_TRUE));
+ needPriorityMap.put("I",new Integer(NEEDPRIORITY_INPROGRESS));
+ needPriorityMap.put("F",new Integer(NEEDPRIORITY_FALSE));
+ }
+
/** Prerequisite event manager */
protected PrereqEventManager prereqEventManager;
@@ -209,6 +222,7 @@
map.put(processIDField,new ColumnDescription("VARCHAR(16)",false,true,null,null,false));
map.put(seedingProcessIDField,new ColumnDescription("VARCHAR(16)",false,true,null,null,false));
map.put(needPriorityField,new ColumnDescription("CHAR(1)",false,true,null,null,false));
+ map.put(needPriorityProcessIDField,new ColumnDescription("VARCHAR(16)",false,true,null,null,false));
performCreate(map,null);
}
else
@@ -343,12 +357,22 @@
public void restart(String processID)
throws ManifoldCFException
{
- // Map ACTIVE back to PENDING.
+ // Map NEEDPRIORITY_INPROGRESS back to NEEDPRIORITY_TRUE for rows whose needpriorityprocessid (set by markNeedPriorityInProgress) is this process
HashMap map = new HashMap();
+ ArrayList list = new ArrayList();
+ map.put(needPriorityField,needPriorityToString(NEEDPRIORITY_TRUE));
+ map.put(needPriorityProcessIDField,null);
+ String query = buildConjunctionClause(list,new ClauseDescription[]{
+ new UnitaryClause(needPriorityField,needPriorityToString(NEEDPRIORITY_INPROGRESS)),
+ new UnitaryClause(needPriorityProcessIDField,processID)});
+ performUpdate(map,"WHERE "+query,list,null);
+
+ // Map ACTIVE back to PENDING.
+ map.clear();
+ list.clear();
map.put(statusField,statusToString(STATUS_PENDING));
map.put(processIDField,null);
- ArrayList list = new ArrayList();
- String query = buildConjunctionClause(list,new ClauseDescription[]{
+ query = buildConjunctionClause(list,new ClauseDescription[]{
new MultiClause(statusField,new Object[]{
statusToString(STATUS_ACTIVE),
statusToString(STATUS_ACTIVENEEDRESCAN)}),
@@ -408,12 +432,21 @@
public void restart()
throws ManifoldCFException
{
- // Map ACTIVE back to PENDING.
+ // Map NEEDPRIORITY_INPROGRESS to NEEDPRIORITY_TRUE
HashMap map = new HashMap();
+ ArrayList list = new ArrayList();
+ map.put(needPriorityField,needPriorityToString(NEEDPRIORITY_TRUE));
+ map.put(needPriorityProcessIDField,null);
+ String query = buildConjunctionClause(list,new ClauseDescription[]{
+ new UnitaryClause(needPriorityField,needPriorityToString(NEEDPRIORITY_INPROGRESS))});
+ performUpdate(map,"WHERE "+query,list,null);
+
+ // Map ACTIVE back to PENDING.
+ map.clear();
+ list.clear();
map.put(statusField,statusToString(STATUS_PENDING));
map.put(processIDField,null);
- ArrayList list = new ArrayList();
- String query = buildConjunctionClause(list,new ClauseDescription[]{
+ query = buildConjunctionClause(list,new ClauseDescription[]{
new MultiClause(statusField,new Object[]{
statusToString(STATUS_ACTIVE),
statusToString(STATUS_ACTIVENEEDRESCAN)})});
@@ -733,7 +766,7 @@
// Map COMPLETE to PENDINGPURGATORY
HashMap map = new HashMap();
map.put(statusField,statusToString(STATUS_PENDINGPURGATORY));
- map.put(needPriorityField,needPriorityToString(true));
+ map.put(needPriorityField,needPriorityToString(NEEDPRIORITY_TRUE));
// Do not reset priorities here! They should all be blank at this point.
map.put(checkTimeField,new Long(0L));
map.put(checkActionField,actionToString(ACTION_RESCAN));
@@ -791,7 +824,7 @@
// Map COMPLETE to PENDINGPURGATORY.
HashMap map = new HashMap();
map.put(statusField,statusToString(STATUS_PENDINGPURGATORY));
- map.put(needPriorityField,needPriorityToString(true));
+ map.put(needPriorityField,needPriorityToString(NEEDPRIORITY_TRUE));
// Do not reset priorities here! They should all be blank at this point.
map.put(checkTimeField,new Long(0L));
map.put(checkActionField,actionToString(ACTION_RESCAN));
@@ -865,12 +898,27 @@
noteModifications(0,0,1);
}
+ /** Mark a row as having its priority calculation in progress, recording the process that claimed it. */
+ public void markNeedPriorityInProgress(Long rowID, String processID)
+ throws ManifoldCFException
+ {
+ HashMap map = new HashMap();
+ map.put(needPriorityField,needPriorityToString(NEEDPRIORITY_INPROGRESS));
+ map.put(needPriorityProcessIDField,processID);
+ ArrayList list = new ArrayList();
+ String query = buildConjunctionClause(list,new ClauseDescription[]{
+ new UnitaryClause(idField,rowID)});
+ performUpdate(map,"WHERE "+query,list,null);
+ noteModifications(0,1,0);
+ }
+
/** Write out a document priority */
public void writeDocPriority(Long rowID, IPriorityCalculator priority)
throws ManifoldCFException
{
HashMap map = new HashMap();
- map.put(needPriorityField,needPriorityToString(false));
+ map.put(needPriorityField,needPriorityToString(NEEDPRIORITY_FALSE));
+ map.put(needPriorityProcessIDField,null);
map.put(docPriorityField,new Double(priority.getDocumentPriority()));
ArrayList list = new ArrayList();
String query = buildConjunctionClause(list,new ClauseDescription[]{
@@ -884,7 +932,8 @@
throws ManifoldCFException
{
HashMap map = new HashMap();
- map.put(needPriorityField,needPriorityToString(false));
+ map.put(needPriorityField,needPriorityToString(NEEDPRIORITY_FALSE));
+ map.put(needPriorityProcessIDField,null);
map.put(docPriorityField,nullDocPriority);
ArrayList list = new ArrayList();
String query = buildConjunctionClause(list,new ClauseDescription[]{
@@ -900,7 +949,7 @@
throws ManifoldCFException
{
HashMap map = new HashMap();
- map.put(needPriorityField,needPriorityToString(true));
+ map.put(needPriorityField,needPriorityToString(NEEDPRIORITY_TRUE));
map.put(docPriorityField,nullDocPriority);
ArrayList list = new ArrayList();
String query = buildConjunctionClause(list,new ClauseDescription[]{
@@ -929,7 +978,7 @@
checkTimeValue = null;
// Remove document priority; we don't want to pollute the queue. See CONNECTORS-290.
map.put(docPriorityField,nullDocPriority);
- map.put(needPriorityField,needPriorityToString(false));
+ map.put(needPriorityField,needPriorityToString(NEEDPRIORITY_FALSE));
break;
case STATUS_ACTIVENEEDRESCAN:
case STATUS_ACTIVENEEDRESCANPURGATORY:
@@ -1241,7 +1290,7 @@
map.put(failCountField,null);
// Update the doc priority.
map.put(docPriorityField,new Double(desiredPriority.getDocumentPriority()));
- map.put(needPriorityField,needPriorityToString(false));
+ map.put(needPriorityField,needPriorityToString(NEEDPRIORITY_FALSE));
break;
case STATUS_PENDING:
@@ -1318,7 +1367,7 @@
map.put(seedingProcessIDField,processID);
// Set the document priority
map.put(docPriorityField,new Double(desiredDocPriority.getDocumentPriority()));
- map.put(needPriorityField,needPriorityToString(false));
+ map.put(needPriorityField,needPriorityToString(NEEDPRIORITY_FALSE));
performInsert(map,null);
prereqEventManager.addRows(recordID,prereqEvents);
noteModifications(1,0,0);
@@ -1517,7 +1566,7 @@
map.put(failCountField,null);
// Going into pending: set the docpriority.
map.put(docPriorityField,new Double(desiredPriority.getDocumentPriority()));
- map.put(needPriorityField,needPriorityToString(false));
+ map.put(needPriorityField,needPriorityToString(NEEDPRIORITY_FALSE));
break;
case STATUS_COMPLETE:
case STATUS_BEINGCLEANED:
@@ -1534,7 +1583,7 @@
map.put(failCountField,null);
// Going into pending: set the docpriority.
map.put(docPriorityField,new Double(desiredPriority.getDocumentPriority()));
- map.put(needPriorityField,needPriorityToString(false));
+ map.put(needPriorityField,needPriorityToString(NEEDPRIORITY_FALSE));
break;
}
return;
@@ -1563,7 +1612,7 @@
map.put(failCountField,null);
// Going into pending: set the docpriority.
map.put(docPriorityField,new Double(desiredPriority.getDocumentPriority()));
- map.put(needPriorityField,needPriorityToString(false));
+ map.put(needPriorityField,needPriorityToString(NEEDPRIORITY_FALSE));
break;
}
return;
@@ -1587,7 +1636,7 @@
map.put(failCountField,null);
// Going into pending: set the docpriority.
map.put(docPriorityField,new Double(desiredPriority.getDocumentPriority()));
- map.put(needPriorityField,needPriorityToString(false));
+ map.put(needPriorityField,needPriorityToString(NEEDPRIORITY_FALSE));
break;
}
return;
@@ -1643,7 +1692,7 @@
map.put(statusField,statusToString(STATUS_PENDING));
// Be sure to set the priority also
map.put(docPriorityField,new Double(desiredDocPriority.getDocumentPriority()));
- map.put(needPriorityField,needPriorityToString(false));
+ map.put(needPriorityField,needPriorityToString(NEEDPRIORITY_FALSE));
performInsert(map,null);
prereqEventManager.addRows(recordID,prereqEvents);
noteModifications(1,0,0);
@@ -1678,7 +1727,7 @@
{
if (x == null || x.length() == 0)
return SEEDSTATUS_NOTSEED;
- Integer y = (Integer)seedstatusMap.get(x);
+ Integer y = seedstatusMap.get(x);
if (y == null)
throw new ManifoldCFException("Unknown seed status code: "+x);
return y.intValue();
@@ -1689,7 +1738,7 @@
public static int stringToAction(String value)
throws ManifoldCFException
{
- Integer x = (Integer)actionMap.get(value);
+ Integer x = actionMap.get(value);
if (x == null)
throw new ManifoldCFException("Unknown action string: '"+value+"'");
return x.intValue();
@@ -1712,22 +1761,33 @@
/** Convert need priority value to boolean.
*/
- public static boolean stringToNeedPriority(String value)
+ public static int stringToNeedPriority(String value)
throws ManifoldCFException
{
- if (value != null && value.equals("T"))
- return true;
- return false;
+ if (value == null || value.length() == 0)
+ return NEEDPRIORITY_FALSE;
+ Integer x = needPriorityMap.get(value);
+ if (x == null)
+ throw new ManifoldCFException("Unknown needpriority string: '"+value+"'");
+ return x.intValue();
}
/** Convert boolean to need priority value.
*/
- public static String needPriorityToString(boolean value)
+ public static String needPriorityToString(int value)
throws ManifoldCFException
{
- if (value)
+ switch (value)
+ {
+ case NEEDPRIORITY_TRUE:
return "T";
- return "F";
+ case NEEDPRIORITY_FALSE:
+ return "F";
+ case NEEDPRIORITY_INPROGRESS:
+ return "I";
+ default:
+ throw new ManifoldCFException("Bad needpriority value: "+Integer.toString(value));
+ }
}
/** Convert status field value to integer.
@@ -1737,7 +1797,7 @@
public static int stringToStatus(String value)
throws ManifoldCFException
{
- Integer x = (Integer)statusMap.get(value);
+ Integer x = statusMap.get(value);
if (x == null)
throw new ManifoldCFException("Unknown status string: '"+value+"'");
return x.intValue();
diff --git a/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/SetPriorityThread.java b/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/SetPriorityThread.java
index a066937..a1b6e35 100644
--- a/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/SetPriorityThread.java
+++ b/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/SetPriorityThread.java
@@ -135,7 +135,7 @@
// We may well wind up calculating priority for documents that wind up having their
// state changed before we can write back, but this is okay because update is only
// going to be permitted for rows that still have the right state.
- DocumentDescription[] descs = jobManager.getNextNotYetProcessedReprioritizationDocuments(1000);
+ DocumentDescription[] descs = jobManager.getNextNotYetProcessedReprioritizationDocuments(processID,1000);
if (descs.length > 0)
{
ManifoldCF.writeDocumentPriorities(threadContext,