Flush as we go, not all at the end.
git-svn-id: https://svn.apache.org/repos/asf/manifoldcf/branches/CONNECTORS-954@1604108 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/AmazonCloudSearchConnector.java b/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/AmazonCloudSearchConnector.java
index 366a1c7..6aaea17 100644
--- a/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/AmazonCloudSearchConnector.java
+++ b/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/AmazonCloudSearchConnector.java
@@ -68,8 +68,8 @@
import org.apache.manifoldcf.core.interfaces.IPostParameters;
import org.apache.manifoldcf.core.interfaces.IPasswordMapperActivity;
import org.apache.manifoldcf.core.interfaces.SpecificationNode;
-import org.apache.manifoldcf.core.system.ManifoldCF;
-import org.apache.manifoldcf.crawler.system.Logging;
+import org.apache.manifoldcf.agents.system.ManifoldCF;
+import org.apache.manifoldcf.agents.system.Logging;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParseException;
@@ -261,7 +261,7 @@
status = getStatusFromJsonResponse(responsbody);
} catch (ManifoldCFException e)
{
- Logging.connectors.debug(e);
+ Logging.ingest.debug(e);
return "Could not get status from response body. Check Access Policy setting of your domain of Amazon CloudSearch.: " + e.getMessage();
}
@@ -284,13 +284,13 @@
return "Connection NOT working.";
} catch (ClientProtocolException e) {
- Logging.connectors.debug(e);
+ Logging.ingest.debug(e);
return "Protocol exception: "+e.getMessage();
} catch (IOException e) {
- Logging.connectors.debug(e);
+ Logging.ingest.debug(e);
return "IO exception: "+e.getMessage();
} catch (ServiceInterruption e) {
- Logging.connectors.debug(e);
+ Logging.ingest.debug(e);
return "Transient exception: "+e.getMessage();
}
}
@@ -453,7 +453,7 @@
new JSONStringReader(new InputStreamReader(document.getBinaryStream(),Consts.UTF_8))));
documentChunkManager.recordDocument(uid, serverHost, serverPath, new ReaderInputStream(objectReader, Consts.UTF_8));
-
+ conditionallyFlushDocuments();
return DOCUMENTSTATUS_ACCEPTED;
}
@@ -487,18 +487,35 @@
{
handleIOException(e);
}
+ conditionallyFlushDocuments();
}
@Override
public void noteJobComplete(IOutputNotifyActivity activities)
throws ManifoldCFException, ServiceInterruption {
getSession();
-
+ flushDocuments(); // unconditional flush: send whatever is still cached, regardless of how many documents are waiting
+ }
+
+ protected static final int CHUNK_SIZE = 1000; // chunk size passed to readChunk() by flushDocuments(); also the threshold checked by conditionallyFlushDocuments()
+
+ /** Flush cached documents only when enough of them have accumulated.
+ * Asks the document chunk manager whether at least CHUNK_SIZE documents are
+ * recorded for the current serverHost/serverPath and, if so, calls flushDocuments().
+ * Otherwise does nothing.
+ */
+ protected void conditionallyFlushDocuments()
+   throws ManifoldCFException, ServiceInterruption
+ {
+   final boolean chunkReady = documentChunkManager.equalOrMoreThan(serverHost, serverPath, CHUNK_SIZE);
+   if (!chunkReady)
+   {
+     // Not enough documents queued yet; keep accumulating.
+     return;
+   }
+   flushDocuments();
+ }
+
+ protected void flushDocuments()
+ throws ManifoldCFException, ServiceInterruption
+ {
+ Logging.ingest.info("AmazonCloudSearch: Starting flush to Amazon");
+
// Repeat until we are empty of cached stuff
int chunkNumber = 0;
while (true)
{
- DocumentRecord[] records = documentChunkManager.readChunk(serverHost, serverPath, 1000);
+ DocumentRecord[] records = documentChunkManager.readChunk(serverHost, serverPath, CHUNK_SIZE);
try
{
if (records.length == 0)
@@ -517,13 +534,13 @@
String status = getStatusFromJsonResponse(responsbody);
if("success".equals(status))
{
- Logging.connectors.info("AmazonCloudSearch: Successfully sent document chunk " + chunkNumber);
+ Logging.ingest.info("AmazonCloudSearch: Successfully sent document chunk " + chunkNumber);
//remove documents from table..
documentChunkManager.deleteChunk(records);
}
else
{
- Logging.connectors.error("AmazonCloudSearch: Error sending document chunk "+ chunkNumber+": "+ responsbody);
+ Logging.ingest.error("AmazonCloudSearch: Error sending document chunk "+ chunkNumber+": "+ responsbody);
throw new ManifoldCFException("recieved error status from service after feeding document. response body : " + responsbody);
}
}
@@ -730,7 +747,7 @@
throw new ManifoldCFException("Interrupted: " + e.getMessage(), e,
ManifoldCFException.INTERRUPTED);
}
- Logging.connectors.warn(
+ Logging.ingest.warn(
"Amazon CloudSearch: IO exception: " + e.getMessage(), e);
long currentTime = System.currentTimeMillis();
throw new ServiceInterruption("IO exception: " + e.getMessage(), e,
diff --git a/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/DocumentChunkManager.java b/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/DocumentChunkManager.java
index 031e379..c3d6bd4 100644
--- a/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/DocumentChunkManager.java
+++ b/connectors/amazoncloudsearch/connector/src/main/java/org/apache/manifoldcf/agents/output/amazoncloudsearch/DocumentChunkManager.java
@@ -210,6 +210,29 @@
}
}
+ /** Determine whether at least a given number of documents are recorded for a host/path pair.
+ * Runs a count query over the rows whose host and path fields match the arguments and
+ * compares the reported count against the requested number.
+ *@param host is the host value to match.
+ *@param path is the path value to match.
+ *@param maximumNumber is the number of documents to compare the count against.
+ *@return true if the count is greater than or equal to maximumNumber. When the query
+ * returns no rows the count is treated as zero.
+ */
+ public boolean equalOrMoreThan(String host, String path, int maximumNumber)
+   throws ManifoldCFException
+ {
+   ArrayList params = new ArrayList();
+   String whereClause = buildConjunctionClause(params,new ClauseDescription[]{
+     new UnitaryClause(HOST_FIELD,host),
+     new UnitaryClause(PATH_FIELD,path)});
+   // NOTE(review): the offset/limit clause is appended to an aggregate query that yields a
+   // single row, so it presumably does not bound the number of rows counted -- confirm.
+   IResultSet countResult = performQuery("SELECT "+constructCountClause(UID_FIELD)+" AS countval FROM "+getTableName()+" WHERE "+whereClause+" "+constructOffsetLimitClause(0,maximumNumber),params,null,null);
+
+   // No rows back means a count of zero.
+   if (countResult.getRowCount() <= 0)
+     return 0L >= maximumNumber;
+
+   Long countVal = (Long)countResult.getRow(0).getValue("countval");
+   return countVal.longValue() >= maximumNumber;
+ }
+
+
/** Read a chunk of documents.
*/
public DocumentRecord[] readChunk(String host, String path, int maximumNumber)