Flush as we go, not all at the end.

git-svn-id: https://svn.apache.org/repos/asf/manifoldcf/branches/CONNECTORS-954@1604108 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/AmazonCloudSearchConnector.java b/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/AmazonCloudSearchConnector.java
index 366a1c7..6aaea17 100644
--- a/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/AmazonCloudSearchConnector.java
+++ b/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/AmazonCloudSearchConnector.java
@@ -68,8 +68,8 @@
 import org.apache.manifoldcf.core.interfaces.IPostParameters;
 import org.apache.manifoldcf.core.interfaces.IPasswordMapperActivity;
 import org.apache.manifoldcf.core.interfaces.SpecificationNode;
-import org.apache.manifoldcf.core.system.ManifoldCF;
-import org.apache.manifoldcf.crawler.system.Logging;
+import org.apache.manifoldcf.agents.system.ManifoldCF;
+import org.apache.manifoldcf.agents.system.Logging;
 
 import com.fasterxml.jackson.core.JsonFactory;
 import com.fasterxml.jackson.core.JsonParseException;
@@ -261,7 +261,7 @@
         status = getStatusFromJsonResponse(responsbody);
       } catch (ManifoldCFException e)
       {
-        Logging.connectors.debug(e);
+        Logging.ingest.debug(e);
         return "Could not get status from response body. Check Access Policy setting of your domain of Amazon CloudSearch.: " + e.getMessage();
       }
           
@@ -284,13 +284,13 @@
       return "Connection NOT working.";
       
     } catch (ClientProtocolException e) {
-      Logging.connectors.debug(e);
+      Logging.ingest.debug(e);
       return "Protocol exception: "+e.getMessage();
     } catch (IOException e) {
-      Logging.connectors.debug(e);
+      Logging.ingest.debug(e);
       return "IO exception: "+e.getMessage();
     } catch (ServiceInterruption e) {
-      Logging.connectors.debug(e);
+      Logging.ingest.debug(e);
       return "Transient exception: "+e.getMessage();
     }
   }
@@ -453,7 +453,7 @@
       new JSONStringReader(new InputStreamReader(document.getBinaryStream(),Consts.UTF_8))));
     
     documentChunkManager.recordDocument(uid, serverHost, serverPath, new ReaderInputStream(objectReader, Consts.UTF_8));
-    
+    conditionallyFlushDocuments();
     return DOCUMENTSTATUS_ACCEPTED;
   }
   
@@ -487,18 +487,35 @@
     {
       handleIOException(e);
     }
+    conditionallyFlushDocuments();
   }
   
   @Override
   public void noteJobComplete(IOutputNotifyActivity activities)
       throws ManifoldCFException, ServiceInterruption {
     getSession();
-    
+    flushDocuments(); // job is complete: unconditionally send whatever is still cached, regardless of CHUNK_SIZE
+  }
+  
+  protected static final int CHUNK_SIZE = 1000; // documents requested per readChunk call, and the stored-document count that triggers a conditional flush
+  /** Flush only when documentChunkManager reports at least CHUNK_SIZE documents for serverHost/serverPath; otherwise do nothing. */
+  protected void conditionallyFlushDocuments()
+    throws ManifoldCFException, ServiceInterruption
+  {
+    if (documentChunkManager.equalOrMoreThan(serverHost, serverPath, CHUNK_SIZE))
+      flushDocuments();
+  }
+  
+  protected void flushDocuments()
+    throws ManifoldCFException, ServiceInterruption
+  {
+    Logging.ingest.info("AmazonCloudSearch: Starting flush to Amazon");
+
     // Repeat until we are empty of cached stuff
     int chunkNumber = 0;
     while (true)
     {
-      DocumentRecord[] records = documentChunkManager.readChunk(serverHost, serverPath, 1000);
+      DocumentRecord[] records = documentChunkManager.readChunk(serverHost, serverPath, CHUNK_SIZE);
       try
       {
         if (records.length == 0)
@@ -517,13 +534,13 @@
         String status = getStatusFromJsonResponse(responsbody);
         if("success".equals(status))
         {
-          Logging.connectors.info("AmazonCloudSearch: Successfully sent document chunk " + chunkNumber);
+          Logging.ingest.info("AmazonCloudSearch: Successfully sent document chunk " + chunkNumber);
           //remove documents from table..
           documentChunkManager.deleteChunk(records);
         }
         else
         {
-          Logging.connectors.error("AmazonCloudSearch: Error sending document chunk "+ chunkNumber+": "+ responsbody);
+          Logging.ingest.error("AmazonCloudSearch: Error sending document chunk "+ chunkNumber+": "+ responsbody);
           throw new ManifoldCFException("recieved error status from service after feeding document. response body : " + responsbody);
         }
       }
@@ -730,7 +747,7 @@
       throw new ManifoldCFException("Interrupted: " + e.getMessage(), e,
           ManifoldCFException.INTERRUPTED);
     }
-    Logging.connectors.warn(
+    Logging.ingest.warn(
         "Amazon CloudSearch: IO exception: " + e.getMessage(), e);
     long currentTime = System.currentTimeMillis();
     throw new ServiceInterruption("IO exception: " + e.getMessage(), e,
diff --git a/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/DocumentChunkManager.java b/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/DocumentChunkManager.java
index 031e379..c3d6bd4 100644
--- a/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/DocumentChunkManager.java
+++ b/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/DocumentChunkManager.java
@@ -210,6 +210,29 @@
     }
   }
   
+  /** Determine whether at least maximumNumber rows exist for the given host and path (count taken over UID_FIELD).
+  * NOTE(review): the offset/limit clause is attached to a count query; it presumably limits result rows, not the rows counted -- confirm. */
+  public boolean equalOrMoreThan(String host, String path, int maximumNumber)
+    throws ManifoldCFException
+  {
+    ArrayList params = new ArrayList();
+    String query = buildConjunctionClause(params,new ClauseDescription[]{
+      new UnitaryClause(HOST_FIELD,host),
+      new UnitaryClause(PATH_FIELD,path)});
+    IResultSet set = performQuery("SELECT "+constructCountClause(UID_FIELD)+" AS countval FROM "+getTableName()+" WHERE "+query+" "+constructOffsetLimitClause(0,maximumNumber),params,null,null);
+    long count;
+    if (set.getRowCount() > 0)
+    {
+      IResultRow row = set.getRow(0);
+      Long countVal = (Long)row.getValue("countval"); // assumes the count column is returned as a Long -- TODO confirm for all supported databases
+      count = countVal.longValue();
+    }
+    else
+      count = 0L; // no result row at all: treat as zero documents
+    
+    return count >= maximumNumber;
+  }
+  
   /** Read a chunk of documents.
   */
   public DocumentRecord[] readChunk(String host, String path, int maximumNumber)