Use database fields to permit multiple reprioritization threads to be active at the same time across the cluster.

git-svn-id: https://svn.apache.org/repos/asf/manifoldcf/branches/CONNECTORS-1100@1638083 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IJobManager.java b/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IJobManager.java
index e3455d9..3383464 100644
--- a/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IJobManager.java
+++ b/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IJobManager.java
@@ -225,10 +225,11 @@
 
   /** Get a list of not-yet-processed documents to reprioritize.  Documents in all jobs will be
   * returned by this method.  Up to n document descriptions will be returned.
+  *@param processID is the process that requests the reprioritization documents.
   *@param n is the maximum number of document descriptions desired.
   *@return the document descriptions.
   */
-  public DocumentDescription[] getNextNotYetProcessedReprioritizationDocuments(int n)
+  public DocumentDescription[] getNextNotYetProcessedReprioritizationDocuments(String processID, int n)
     throws ManifoldCFException;
 
   /** Save a set of document priorities.  In the case where a document was eligible to have its
diff --git a/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java b/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java
index 5e45330..c0c6f39 100644
--- a/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java
+++ b/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java
@@ -33,6 +33,7 @@
   public static final String _rcsid = "@(#)$Id: JobManager.java 998576 2010-09-19 01:11:02Z kwright $";
 
   protected static final String stufferLock = "_STUFFER_";
+  protected static final String reprioritizationLock = "_REPRIORITIZER_";
   protected static final String deleteStufferLock = "_DELETESTUFFER_";
   protected static final String expireStufferLock = "_EXPIRESTUFFER_";
   protected static final String cleanStufferLock = "_CLEANSTUFFER_";
@@ -2064,47 +2065,106 @@
 
   /** Get a list of not-yet-processed documents to reprioritize.  Documents in all jobs will be
   * returned by this method.  Up to n document descriptions will be returned.
+  *@param processID is the process that requests the reprioritization documents.
   *@param n is the maximum number of document descriptions desired.
   *@return the document descriptions.
   */
   @Override
-  public DocumentDescription[] getNextNotYetProcessedReprioritizationDocuments(int n)
+  public DocumentDescription[] getNextNotYetProcessedReprioritizationDocuments(String processID, int n)
     throws ManifoldCFException
   {
-    StringBuilder sb = new StringBuilder("SELECT ");
-    ArrayList list = new ArrayList();
-
-    // This query MUST return only documents that are in a pending state which belong to an active job!!!
-
-    sb.append(jobQueue.idField).append(",")
-      .append(jobQueue.docHashField).append(",")
-      .append(jobQueue.docIDField).append(",")
-      .append(jobQueue.jobIDField)
-      .append(" FROM ").append(jobQueue.getTableName()).append(" WHERE ")
-      .append(database.buildConjunctionClause(list,new ClauseDescription[]{
-        new UnitaryClause(jobQueue.needPriorityField,jobQueue.needPriorityToString(true))})).append(" ")
-      .append(database.constructOffsetLimitClause(0,n));
-
-    // Analyze jobqueue tables unconditionally, since it's become much more sensitive in 8.3 than it used to be.
-    //jobQueue.unconditionallyAnalyzeTables();
-
-    //IResultSet set = database.performQuery(sb.toString(),list,null,null,n,null);
-    IResultSet set = database.performQuery(sb.toString(),list,null,null);
-
-    DocumentDescription[] rval = new DocumentDescription[set.getRowCount()];
-
-    int i = 0;
-    while (i < set.getRowCount())
+    // Retry loop - in case we get a deadlock despite our best efforts
+    while (true)
     {
-      IResultRow row = set.getRow(i);
-      rval[i] =new DocumentDescription((Long)row.getValue(jobQueue.idField),
-        (Long)row.getValue(jobQueue.jobIDField),
-        (String)row.getValue(jobQueue.docHashField),
-        (String)row.getValue(jobQueue.docIDField));
-      i++;
-    }
+      long sleepAmt = 0L;
 
-    return rval;
+      // Write lock ensures that only one thread cluster-wide can be doing this at a given time, so FOR UPDATE is unneeded.
+      lockManager.enterWriteLock(reprioritizationLock);
+      try
+      {
+        // Start the transaction now
+        database.beginTransaction();
+        try
+        {
+
+          StringBuilder sb = new StringBuilder("SELECT ");
+          ArrayList list = new ArrayList();
+
+          // This query MUST return only documents that are in a pending state which belong to an active job!!!
+
+          sb.append(jobQueue.idField).append(",")
+            .append(jobQueue.docHashField).append(",")
+            .append(jobQueue.docIDField).append(",")
+            .append(jobQueue.jobIDField)
+            .append(" FROM ").append(jobQueue.getTableName()).append(" WHERE ")
+            .append(database.buildConjunctionClause(list,new ClauseDescription[]{
+              new UnitaryClause(jobQueue.needPriorityField,jobQueue.needPriorityToString(JobQueue.NEEDPRIORITY_TRUE))})).append(" ")
+            .append(database.constructOffsetLimitClause(0,n));
+
+          // Analyze jobqueue tables unconditionally, since it's become much more sensitive in 8.3 than it used to be.
+          //jobQueue.unconditionallyAnalyzeTables();
+
+          //IResultSet set = database.performQuery(sb.toString(),list,null,null,n,null);
+          IResultSet set = database.performQuery(sb.toString(),list,null,null);
+
+          DocumentDescription[] rval = new DocumentDescription[set.getRowCount()];
+          // To avoid deadlock, we want to update the document id hashes in order.  This means reading into a structure I can sort by docid hash,
+          // before updating any rows in jobqueue.
+          String[] docIDHashes = new String[set.getRowCount()];
+          Map<String,DocumentDescription> storageMap = new HashMap<String,DocumentDescription>();
+
+          for (int i = 0 ; i < set.getRowCount() ; i++)
+          {
+            IResultRow row = set.getRow(i);
+            String docHash = (String)row.getValue(jobQueue.docHashField);
+            Long jobID = (Long)row.getValue(jobQueue.jobIDField);
+            rval[i] = new DocumentDescription((Long)row.getValue(jobQueue.idField),
+              jobID,
+              docHash,
+              (String)row.getValue(jobQueue.docIDField));
+            // Compute the doc ID hash
+            docIDHashes[i] = docHash + ":" + jobID;
+            storageMap.put(docIDHashes[i],rval[i]);
+          }
+          
+          java.util.Arrays.sort(docIDHashes);
+          for (String docIDHash : docIDHashes)
+          {
+            DocumentDescription dd = storageMap.get(docIDHash);
+            jobQueue.markNeedPriorityInProgress(dd.getID(),processID);
+          }
+
+          database.performCommit();
+          return rval;
+        }
+        catch (ManifoldCFException e)
+        {
+          database.signalRollback();
+          if (e.getErrorCode() == e.DATABASE_TRANSACTION_ABORT)
+          {
+            if (Logging.perf.isDebugEnabled())
+              Logging.perf.debug("Aborted transaction finding documents for reprioritization: "+e.getMessage());
+            sleepAmt = getRandomAmount();
+            continue;
+          }
+          throw e;
+        }
+        catch (Error e)
+        {
+          database.signalRollback();
+          throw e;
+        }
+        finally
+        {
+          database.endTransaction();
+        }
+      }
+      finally
+      {
+        lockManager.leaveWriteLock(reprioritizationLock);
+        sleepFor(sleepAmt);
+      }
+    }
   }
 
   /** Save a set of document priorities.  In the case where a document was eligible to have its
@@ -2159,8 +2219,8 @@
           {
             IResultRow row = set.getRow(0);
             // Grab the needPriority value
-            boolean needPriority = jobQueue.stringToNeedPriority((String)row.getValue(jobQueue.needPriorityField));
-            if (needPriority)
+            int needPriority = jobQueue.stringToNeedPriority((String)row.getValue(jobQueue.needPriorityField));
+            if (needPriority == JobQueue.NEEDPRIORITY_INPROGRESS)
             {
               IPriorityCalculator priority = priorities[index];
               jobQueue.writeDocPriority(dd.getID(),priority);
@@ -2882,8 +2942,8 @@
             // To avoid deadlock, we want to update the document id hashes in order.  This means reading into a structure I can sort by docid hash,
             // before updating any rows in jobqueue.
             String[] docIDHashes = new String[set.getRowCount()];
-            Map storageMap = new HashMap();
-            Map statusMap = new HashMap();
+            Map<String,DocumentDescription> storageMap = new HashMap<String,DocumentDescription>();
+            Map<String,Integer> statusMap = new HashMap<String,Integer>();
 
             int i = 0;
             while (i < set.getRowCount())
diff --git a/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobQueue.java b/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobQueue.java
index b79cbdd..377c837 100644
--- a/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobQueue.java
+++ b/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobQueue.java
@@ -45,6 +45,7 @@
  * <tr><td>processid</td><td>VARCHAR(16)</td><td></td></tr>
  * <tr><td>seedingprocessid</td><td>VARCHAR(16)</td><td></td></tr>
  * <tr><td>needpriority</td><td>CHAR(1)</td><td></td></tr>
+ * <tr><td>needpriorityprocessid</td><td>VARCHAR(16)</td><td></td></tr>
  * </table>
  * <br><br>
  * 
@@ -77,6 +78,11 @@
   public final static int ACTION_RESCAN = 0;
   public final static int ACTION_REMOVE = 1;
 
+  // Need priority status
+  public final static int NEEDPRIORITY_FALSE = 0;
+  public final static int NEEDPRIORITY_INPROGRESS = 1;
+  public final static int NEEDPRIORITY_TRUE = 2;
+  
   // State descriptions are as follows:
   // PENDING means a newly-added reference that has not been scanned before.
   // ACTIVE means a newly-added reference that is being scanned for the first time.
@@ -118,15 +124,15 @@
   public static final String processIDField = "processid";
   public static final String seedingProcessIDField = "seedingprocessid";
   public static final String needPriorityField = "needpriority";
+  public static final String needPriorityProcessIDField = "needpriorityprocessid";
   
   public static final double noDocPriorityValue = 1e9;
   public static final Double nullDocPriority = new Double(noDocPriorityValue + 1.0);
   
-  protected static Map statusMap;
+  protected static final Map<String,Integer> statusMap = new HashMap<String,Integer>();
 
   static
   {
-    statusMap = new HashMap();
     statusMap.put("P",new Integer(STATUS_PENDING));
     statusMap.put("A",new Integer(STATUS_ACTIVE));
     statusMap.put("C",new Integer(STATUS_COMPLETE));
@@ -142,25 +148,32 @@
     statusMap.put("H",new Integer(STATUS_HOPCOUNTREMOVED));
   }
 
-  protected static Map seedstatusMap;
+  protected static final Map<String,Integer> seedstatusMap = new HashMap<String,Integer>();
 
   static
   {
-    seedstatusMap = new HashMap();
     seedstatusMap.put("F",new Integer(SEEDSTATUS_NOTSEED));
     seedstatusMap.put("S",new Integer(SEEDSTATUS_SEED));
     seedstatusMap.put("N",new Integer(SEEDSTATUS_NEWSEED));
   }
 
-  protected static Map actionMap;
+  protected static final Map<String,Integer> actionMap = new HashMap<String,Integer>();
 
   static
   {
-    actionMap = new HashMap();
     actionMap.put("R",new Integer(ACTION_RESCAN));
     actionMap.put("D",new Integer(ACTION_REMOVE));
   }
 
+  protected static final Map<String,Integer> needPriorityMap = new HashMap<String,Integer>();
+  
+  static
+  {
+    needPriorityMap.put("T",new Integer(NEEDPRIORITY_TRUE));
+    needPriorityMap.put("I",new Integer(NEEDPRIORITY_INPROGRESS));
+    needPriorityMap.put("F",new Integer(NEEDPRIORITY_FALSE));
+  }
+  
   /** Prerequisite event manager */
   protected PrereqEventManager prereqEventManager;
 
@@ -209,6 +222,7 @@
         map.put(processIDField,new ColumnDescription("VARCHAR(16)",false,true,null,null,false));
         map.put(seedingProcessIDField,new ColumnDescription("VARCHAR(16)",false,true,null,null,false));
         map.put(needPriorityField,new ColumnDescription("CHAR(1)",false,true,null,null,false));
+        map.put(needPriorityProcessIDField,new ColumnDescription("VARCHAR(16)",false,true,null,null,false));
         performCreate(map,null);
       }
       else
@@ -343,12 +357,22 @@
   public void restart(String processID)
     throws ManifoldCFException
   {
-    // Map ACTIVE back to PENDING.
+    // Map NEEDPRIORITY_INPROGRESS back to NEEDPRIORITY_TRUE for rows claimed by this process (matched on needpriorityprocessid, the column markNeedPriorityInProgress sets)
     HashMap map = new HashMap();
+    ArrayList list = new ArrayList();
+    map.put(needPriorityField,needPriorityToString(NEEDPRIORITY_TRUE));
+    map.put(needPriorityProcessIDField,null);
+    String query = buildConjunctionClause(list,new ClauseDescription[]{
+      new UnitaryClause(needPriorityField,needPriorityToString(NEEDPRIORITY_INPROGRESS)),
+      new UnitaryClause(needPriorityProcessIDField,processID)});
+    performUpdate(map,"WHERE "+query,list,null);
+
+    // Map ACTIVE back to PENDING.
+    map.clear();
+    list.clear();
     map.put(statusField,statusToString(STATUS_PENDING));
     map.put(processIDField,null);
-    ArrayList list = new ArrayList();
-    String query = buildConjunctionClause(list,new ClauseDescription[]{
+    query = buildConjunctionClause(list,new ClauseDescription[]{
       new MultiClause(statusField,new Object[]{
         statusToString(STATUS_ACTIVE),
         statusToString(STATUS_ACTIVENEEDRESCAN)}),
@@ -408,12 +432,21 @@
   public void restart()
     throws ManifoldCFException
   {
-    // Map ACTIVE back to PENDING.
+    // Map NEEDPRIORITY_INPROGRESS to NEEDPRIORITY_TRUE
     HashMap map = new HashMap();
+    ArrayList list = new ArrayList();
+    map.put(needPriorityField,needPriorityToString(NEEDPRIORITY_TRUE));
+    map.put(needPriorityProcessIDField,null);
+    String query = buildConjunctionClause(list,new ClauseDescription[]{
+      new UnitaryClause(needPriorityField,needPriorityToString(NEEDPRIORITY_INPROGRESS))});
+    performUpdate(map,"WHERE "+query,list,null);
+
+    // Map ACTIVE back to PENDING.
+    map.clear();
+    list.clear();
     map.put(statusField,statusToString(STATUS_PENDING));
     map.put(processIDField,null);
-    ArrayList list = new ArrayList();
-    String query = buildConjunctionClause(list,new ClauseDescription[]{
+    query = buildConjunctionClause(list,new ClauseDescription[]{
       new MultiClause(statusField,new Object[]{
         statusToString(STATUS_ACTIVE),
         statusToString(STATUS_ACTIVENEEDRESCAN)})});
@@ -733,7 +766,7 @@
     // Map COMPLETE to PENDINGPURGATORY
     HashMap map = new HashMap();
     map.put(statusField,statusToString(STATUS_PENDINGPURGATORY));
-    map.put(needPriorityField,needPriorityToString(true));
+    map.put(needPriorityField,needPriorityToString(NEEDPRIORITY_TRUE));
     // Do not reset priorities here!  They should all be blank at this point.
     map.put(checkTimeField,new Long(0L));
     map.put(checkActionField,actionToString(ACTION_RESCAN));
@@ -791,7 +824,7 @@
     // Map COMPLETE to PENDINGPURGATORY.
     HashMap map = new HashMap();
     map.put(statusField,statusToString(STATUS_PENDINGPURGATORY));
-    map.put(needPriorityField,needPriorityToString(true));
+    map.put(needPriorityField,needPriorityToString(NEEDPRIORITY_TRUE));
     // Do not reset priorities here!  They should all be blank at this point.
     map.put(checkTimeField,new Long(0L));
     map.put(checkActionField,actionToString(ACTION_RESCAN));
@@ -865,12 +898,27 @@
     noteModifications(0,0,1);
   }
 
+  /** Prepare to calculate a document priority for a given row. */
+  public void markNeedPriorityInProgress(Long rowID, String processID)
+    throws ManifoldCFException
+  {
+    HashMap map = new HashMap();
+    map.put(needPriorityField,needPriorityToString(NEEDPRIORITY_INPROGRESS));
+    map.put(needPriorityProcessIDField,processID);
+    ArrayList list = new ArrayList();
+    String query = buildConjunctionClause(list,new ClauseDescription[]{
+      new UnitaryClause(idField,rowID)});
+    performUpdate(map,"WHERE "+query,list,null);
+    noteModifications(0,1,0);
+  }
+  
   /** Write out a document priority */
   public void writeDocPriority(Long rowID, IPriorityCalculator priority)
     throws ManifoldCFException
   {
     HashMap map = new HashMap();
-    map.put(needPriorityField,needPriorityToString(false));
+    map.put(needPriorityField,needPriorityToString(NEEDPRIORITY_FALSE));
+    map.put(needPriorityProcessIDField,null);
     map.put(docPriorityField,new Double(priority.getDocumentPriority()));
     ArrayList list = new ArrayList();
     String query = buildConjunctionClause(list,new ClauseDescription[]{
@@ -884,7 +932,8 @@
     throws ManifoldCFException
   {
     HashMap map = new HashMap();
-    map.put(needPriorityField,needPriorityToString(false));
+    map.put(needPriorityField,needPriorityToString(NEEDPRIORITY_FALSE));
+    map.put(needPriorityProcessIDField,null);
     map.put(docPriorityField,nullDocPriority);
     ArrayList list = new ArrayList();
     String query = buildConjunctionClause(list,new ClauseDescription[]{
@@ -900,7 +949,7 @@
     throws ManifoldCFException
   {
     HashMap map = new HashMap();
-    map.put(needPriorityField,needPriorityToString(true));
+    map.put(needPriorityField,needPriorityToString(NEEDPRIORITY_TRUE));
     map.put(docPriorityField,nullDocPriority);
     ArrayList list = new ArrayList();
     String query = buildConjunctionClause(list,new ClauseDescription[]{
@@ -929,7 +978,7 @@
       checkTimeValue = null;
       // Remove document priority; we don't want to pollute the queue.  See CONNECTORS-290.
       map.put(docPriorityField,nullDocPriority);
-      map.put(needPriorityField,needPriorityToString(false));
+      map.put(needPriorityField,needPriorityToString(NEEDPRIORITY_FALSE));
       break;
     case STATUS_ACTIVENEEDRESCAN:
     case STATUS_ACTIVENEEDRESCANPURGATORY:
@@ -1241,7 +1290,7 @@
       map.put(failCountField,null);
       // Update the doc priority.
       map.put(docPriorityField,new Double(desiredPriority.getDocumentPriority()));
-      map.put(needPriorityField,needPriorityToString(false));
+      map.put(needPriorityField,needPriorityToString(NEEDPRIORITY_FALSE));
       break;
 
     case STATUS_PENDING:
@@ -1318,7 +1367,7 @@
     map.put(seedingProcessIDField,processID);
     // Set the document priority
     map.put(docPriorityField,new Double(desiredDocPriority.getDocumentPriority()));
-    map.put(needPriorityField,needPriorityToString(false));
+    map.put(needPriorityField,needPriorityToString(NEEDPRIORITY_FALSE));
     performInsert(map,null);
     prereqEventManager.addRows(recordID,prereqEvents);
     noteModifications(1,0,0);
@@ -1517,7 +1566,7 @@
       map.put(failCountField,null);
       // Going into pending: set the docpriority.
       map.put(docPriorityField,new Double(desiredPriority.getDocumentPriority()));
-      map.put(needPriorityField,needPriorityToString(false));
+      map.put(needPriorityField,needPriorityToString(NEEDPRIORITY_FALSE));
       break;
     case STATUS_COMPLETE:
     case STATUS_BEINGCLEANED:
@@ -1534,7 +1583,7 @@
         map.put(failCountField,null);
         // Going into pending: set the docpriority.
         map.put(docPriorityField,new Double(desiredPriority.getDocumentPriority()));
-        map.put(needPriorityField,needPriorityToString(false));
+        map.put(needPriorityField,needPriorityToString(NEEDPRIORITY_FALSE));
         break;
       }
       return;
@@ -1563,7 +1612,7 @@
         map.put(failCountField,null);
         // Going into pending: set the docpriority.
         map.put(docPriorityField,new Double(desiredPriority.getDocumentPriority()));
-        map.put(needPriorityField,needPriorityToString(false));
+        map.put(needPriorityField,needPriorityToString(NEEDPRIORITY_FALSE));
         break;
       }
       return;
@@ -1587,7 +1636,7 @@
         map.put(failCountField,null);
         // Going into pending: set the docpriority.
         map.put(docPriorityField,new Double(desiredPriority.getDocumentPriority()));
-        map.put(needPriorityField,needPriorityToString(false));
+        map.put(needPriorityField,needPriorityToString(NEEDPRIORITY_FALSE));
         break;
       }
       return;
@@ -1643,7 +1692,7 @@
     map.put(statusField,statusToString(STATUS_PENDING));
     // Be sure to set the priority also
     map.put(docPriorityField,new Double(desiredDocPriority.getDocumentPriority()));
-    map.put(needPriorityField,needPriorityToString(false));
+    map.put(needPriorityField,needPriorityToString(NEEDPRIORITY_FALSE));
     performInsert(map,null);
     prereqEventManager.addRows(recordID,prereqEvents);
     noteModifications(1,0,0);
@@ -1678,7 +1727,7 @@
   {
     if (x == null || x.length() == 0)
       return SEEDSTATUS_NOTSEED;
-    Integer y = (Integer)seedstatusMap.get(x);
+    Integer y = seedstatusMap.get(x);
     if (y == null)
       throw new ManifoldCFException("Unknown seed status code: "+x);
     return y.intValue();
@@ -1689,7 +1738,7 @@
   public static int stringToAction(String value)
     throws ManifoldCFException
   {
-    Integer x = (Integer)actionMap.get(value);
+    Integer x = actionMap.get(value);
     if (x == null)
       throw new ManifoldCFException("Unknown action string: '"+value+"'");
     return x.intValue();
@@ -1712,22 +1761,33 @@
 
-  /** Convert need priority value to boolean.
+  /** Convert a need priority string value to its NEEDPRIORITY_* integer code.
   */
-  public static boolean stringToNeedPriority(String value)
+  public static int stringToNeedPriority(String value)
     throws ManifoldCFException
   {
-    if (value != null && value.equals("T"))
-      return true;
-    return false;
+    if (value == null || value.length() == 0)
+      return NEEDPRIORITY_FALSE;
+    Integer x = needPriorityMap.get(value);
+    if (x == null)
+      throw new ManifoldCFException("Unknown needpriority string: '"+value+"'");
+    return x.intValue();
   }
   
-  /** Convert boolean to need priority value.
+  /** Convert a NEEDPRIORITY_* integer code to its need priority string value.
   */
-  public static String needPriorityToString(boolean value)
+  public static String needPriorityToString(int value)
     throws ManifoldCFException
   {
-    if (value)
+    switch (value)
+    {
+    case NEEDPRIORITY_TRUE:
       return "T";
-    return "F";
+    case NEEDPRIORITY_FALSE:
+      return "F";
+    case NEEDPRIORITY_INPROGRESS:
+      return "I";
+    default:
+      throw new ManifoldCFException("Bad needpriority value: "+Integer.toString(value));
+    }
   }
   
   /** Convert status field value to integer.
@@ -1737,7 +1797,7 @@
   public static int stringToStatus(String value)
     throws ManifoldCFException
   {
-    Integer x = (Integer)statusMap.get(value);
+    Integer x = statusMap.get(value);
     if (x == null)
       throw new ManifoldCFException("Unknown status string: '"+value+"'");
     return x.intValue();
diff --git a/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/SetPriorityThread.java b/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/SetPriorityThread.java
index a066937..a1b6e35 100644
--- a/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/SetPriorityThread.java
+++ b/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/SetPriorityThread.java
@@ -135,7 +135,7 @@
             // We may well wind up calculating priority for documents that wind up having their
             // state changed before we can write back, but this is okay because update is only
             // going to be permitted for rows that still have the right state.
-            DocumentDescription[] descs = jobManager.getNextNotYetProcessedReprioritizationDocuments(1000);
+            DocumentDescription[] descs = jobManager.getNextNotYetProcessedReprioritizationDocuments(processID,1000);
             if (descs.length > 0)
             {
               ManifoldCF.writeDocumentPriorities(threadContext,