/* $Id: IncrementalIngester.java 988245 2010-08-23 18:39:35Z kwright $ */
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.manifoldcf.agents.incrementalingest;
import org.apache.manifoldcf.core.interfaces.*;
import org.apache.manifoldcf.agents.interfaces.*;
import org.apache.manifoldcf.agents.system.Logging;
import org.apache.manifoldcf.agents.system.ManifoldCF;
import java.util.*;
import java.io.*;
/** Incremental ingestion API implementation.
* This class is responsible for keeping track of what has been sent where, and also the corresponding version of
* each document so indexed. The space over which this takes place is defined by the individual output connection - that is, each output connection
* effectively "remembers" which documents were handed to it.
*
* A secondary purpose of this module is to provide a mapping between the key by which a document is described internally (by an
* identifier hash, plus the name of an identifier space), and the way the document is identified in the output space (by the name of an
* output connection, plus a URI which is considered local to that output connection space).
*
* <br><br>
* <b>ingeststatus</b>
* <table border="1" cellpadding="3" cellspacing="0">
* <tr class="TableHeadingColor">
* <th>Field</th><th>Type</th><th>Description&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;</th></tr>
* <tr><td>id</td><td>BIGINT</td><td>Primary Key</td></tr>
* <tr><td>connectionname</td><td>VARCHAR(32)</td><td>Reference:outputconnections.connectionname</td></tr>
* <tr><td>dockey</td><td>VARCHAR(73)</td><td></td></tr>
* <tr><td>docuri</td><td>LONGTEXT</td><td></td></tr>
* <tr><td>urihash</td><td>VARCHAR(40)</td><td></td></tr>
* <tr><td>lastversion</td><td>LONGTEXT</td><td></td></tr>
* <tr><td>lastoutputversion</td><td>LONGTEXT</td><td></td></tr>
* <tr><td>forcedparams</td><td>LONGTEXT</td><td></td></tr>
* <tr><td>changecount</td><td>BIGINT</td><td></td></tr>
* <tr><td>firstingest</td><td>BIGINT</td><td></td></tr>
* <tr><td>lastingest</td><td>BIGINT</td><td></td></tr>
* <tr><td>authorityname</td><td>VARCHAR(32)</td><td></td></tr>
* </table>
* <br><br>
*
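* A minimal usage sketch follows. It is illustrative only: it assumes the ingester is obtained through the
* standard agents factory (IncrementalIngesterFactory), and that the surrounding repository connector already
* holds the identifier class/hash, version string, RepositoryDocument, URI, and activity objects shown.
* <pre>{@code
* IIncrementalIngester ingester = IncrementalIngesterFactory.make(threadContext);
* String outputDescription = ingester.getOutputDescription(outputConnectionName, outputSpecification);
* boolean accepted = ingester.documentIngest(outputConnectionName,
*   identifierClass, identifierHash, documentVersion, outputDescription,
*   authorityName, repositoryDocument, System.currentTimeMillis(), documentURI, activities);
* // Later, when the document disappears from the repository:
* ingester.documentDelete(outputConnectionName, identifierClass, identifierHash, removeActivities);
* }</pre>
*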
*/
public class IncrementalIngester extends org.apache.manifoldcf.core.database.BaseTable implements IIncrementalIngester
{
public static final String _rcsid = "@(#)$Id: IncrementalIngester.java 988245 2010-08-23 18:39:35Z kwright $";
// Fields
protected final static String idField = "id";
protected final static String outputConnNameField = "connectionname";
protected final static String docKeyField = "dockey";
protected final static String docURIField = "docuri";
protected final static String uriHashField = "urihash";
protected final static String lastVersionField = "lastversion";
protected final static String lastOutputVersionField = "lastoutputversion";
protected final static String forcedParamsField = "forcedparams";
protected final static String changeCountField = "changecount";
protected final static String firstIngestField = "firstingest";
protected final static String lastIngestField = "lastingest";
protected final static String authorityNameField = "authorityname";
// Thread context.
protected IThreadContext threadContext;
// Lock manager.
protected ILockManager lockManager;
// Output connection manager
protected IOutputConnectionManager connectionManager;
/** Constructor.
*/
public IncrementalIngester(IThreadContext threadContext, IDBInterface database)
throws ManifoldCFException
{
super(database,"ingeststatus");
this.threadContext = threadContext;
lockManager = LockManagerFactory.make(threadContext);
connectionManager = OutputConnectionManagerFactory.make(threadContext);
}
/** Install the incremental ingestion manager.
*/
public void install()
throws ManifoldCFException
{
String outputConnectionTableName = connectionManager.getTableName();
String outputConnectionNameField = connectionManager.getConnectionNameColumn();
// We always include an outer loop, because some upgrade conditions require retries.
while (true)
{
// Postgresql has a limitation on the number of characters that can be indexed in a column. So we use hashes instead.
Map existing = getTableSchema(null,null);
if (existing == null)
{
HashMap map = new HashMap();
map.put(idField,new ColumnDescription("BIGINT",true,false,null,null,false));
map.put(outputConnNameField,new ColumnDescription("VARCHAR(32)",false,false,outputConnectionTableName,outputConnectionNameField,false));
map.put(docKeyField,new ColumnDescription("VARCHAR(73)",false,false,null,null,false));
// The document URI field, if null, indicates that the document was not actually ingested!
// This happens when a connector wishes to keep track of a version string, but not actually ingest the doc.
map.put(docURIField,new ColumnDescription("LONGTEXT",false,true,null,null,false));
map.put(uriHashField,new ColumnDescription("VARCHAR(40)",false,true,null,null,false));
map.put(lastVersionField,new ColumnDescription("LONGTEXT",false,true,null,null,false));
map.put(lastOutputVersionField,new ColumnDescription("LONGTEXT",false,true,null,null,false));
map.put(forcedParamsField,new ColumnDescription("LONGTEXT",false,true,null,null,false));
map.put(changeCountField,new ColumnDescription("BIGINT",false,false,null,null,false));
map.put(firstIngestField,new ColumnDescription("BIGINT",false,false,null,null,false));
map.put(lastIngestField,new ColumnDescription("BIGINT",false,false,null,null,false));
map.put(authorityNameField,new ColumnDescription("VARCHAR(32)",false,true,null,null,false));
performCreate(map,null);
}
else
{
// Schema upgrade from 1.1 to 1.2
ColumnDescription cd = (ColumnDescription)existing.get(forcedParamsField);
if (cd == null)
{
Map<String,ColumnDescription> addMap = new HashMap<String,ColumnDescription>();
addMap.put(forcedParamsField,new ColumnDescription("LONGTEXT",false,true,null,null,false));
performAlter(addMap,null,null,null);
}
}
// Now, do indexes
IndexDescription keyIndex = new IndexDescription(true,new String[]{docKeyField,outputConnNameField});
IndexDescription uriHashIndex = new IndexDescription(false,new String[]{uriHashField,outputConnNameField});
IndexDescription outputConnIndex = new IndexDescription(false,new String[]{outputConnNameField});
// Get rid of indexes that shouldn't be there
Map indexes = getTableIndexes(null,null);
Iterator iter = indexes.keySet().iterator();
while (iter.hasNext())
{
String indexName = (String)iter.next();
IndexDescription id = (IndexDescription)indexes.get(indexName);
if (keyIndex != null && id.equals(keyIndex))
keyIndex = null;
else if (uriHashIndex != null && id.equals(uriHashIndex))
uriHashIndex = null;
else if (outputConnIndex != null && id.equals(outputConnIndex))
outputConnIndex = null;
else if (indexName.indexOf("_pkey") == -1)
// This index shouldn't be here; drop it
performRemoveIndex(indexName);
}
// Add the ones we didn't find
if (uriHashIndex != null)
performAddIndex(null,uriHashIndex);
if (keyIndex != null)
performAddIndex(null,keyIndex);
if (outputConnIndex != null)
performAddIndex(null,outputConnIndex);
// All done; break out of loop
break;
}
}
/** Uninstall the incremental ingestion manager.
*/
public void deinstall()
throws ManifoldCFException
{
performDrop(null);
}
/** Flush all knowledge of what was ingested before.
*/
@Override
public void clearAll()
throws ManifoldCFException
{
performDelete("",null,null);
}
/** Check if a mime type is indexable.
*@param outputConnectionName is the name of the output connection associated with this action.
*@param outputDescription is the output description string.
*@param mimeType is the mime type to check.
*@return true if the mimeType is indexable.
*/
@Override
public boolean checkMimeTypeIndexable(String outputConnectionName, String outputDescription, String mimeType)
throws ManifoldCFException, ServiceInterruption
{
IOutputConnection connection = connectionManager.load(outputConnectionName);
IOutputConnector connector = OutputConnectorFactory.grab(threadContext,connection.getClassName(),connection.getConfigParams(),connection.getMaxConnections());
if (connector == null)
// The connector is not installed; treat this as a service interruption.
throw new ServiceInterruption("Output connector not installed",0L);
try
{
return connector.checkMimeTypeIndexable(outputDescription,mimeType);
}
finally
{
OutputConnectorFactory.release(connector);
}
}
/** Check if a file is indexable.
*@param outputConnectionName is the name of the output connection associated with this action.
*@param outputDescription is the output description string.
*@param localFile is the local file to check.
*@return true if the local file is indexable.
*/
@Override
public boolean checkDocumentIndexable(String outputConnectionName, String outputDescription, File localFile)
throws ManifoldCFException, ServiceInterruption
{
IOutputConnection connection = connectionManager.load(outputConnectionName);
IOutputConnector connector = OutputConnectorFactory.grab(threadContext,connection.getClassName(),connection.getConfigParams(),connection.getMaxConnections());
if (connector == null)
// The connector is not installed; treat this as a service interruption.
throw new ServiceInterruption("Output connector not installed",0L);
try
{
return connector.checkDocumentIndexable(outputDescription,localFile);
}
finally
{
OutputConnectorFactory.release(connector);
}
}
/** Pre-determine whether a document's length is indexable by this connector. This method is used by participating repository connectors
* to help filter out documents that are too long to be indexable.
*@param outputConnectionName is the name of the output connection associated with this action.
*@param outputDescription is the output description string.
*@param length is the length of the document.
*@return true if a document of the given length is indexable.
*/
@Override
public boolean checkLengthIndexable(String outputConnectionName, String outputDescription, long length)
throws ManifoldCFException, ServiceInterruption
{
IOutputConnection connection = connectionManager.load(outputConnectionName);
IOutputConnector connector = OutputConnectorFactory.grab(threadContext,connection.getClassName(),connection.getConfigParams(),connection.getMaxConnections());
if (connector == null)
// The connector is not installed; treat this as a service interruption.
throw new ServiceInterruption("Output connector not installed",0L);
try
{
return connector.checkLengthIndexable(outputDescription,length);
}
finally
{
OutputConnectorFactory.release(connector);
}
}
/** Pre-determine whether a document's URL is indexable by this connector. This method is used by participating repository connectors
* to help filter out documents that are not indexable.
*@param outputConnectionName is the name of the output connection associated with this action.
*@param outputDescription is the output description string.
*@param url is the url of the document.
*@return true if the URL is indexable.
*/
@Override
public boolean checkURLIndexable(String outputConnectionName, String outputDescription, String url)
throws ManifoldCFException, ServiceInterruption
{
IOutputConnection connection = connectionManager.load(outputConnectionName);
IOutputConnector connector = OutputConnectorFactory.grab(threadContext,connection.getClassName(),connection.getConfigParams(),connection.getMaxConnections());
if (connector == null)
// The connector is not installed; treat this as a service interruption.
throw new ServiceInterruption("Output connector not installed",0L);
try
{
return connector.checkURLIndexable(outputDescription,url);
}
finally
{
OutputConnectorFactory.release(connector);
}
}
/** Get an output version string (the output description) for a given output specification.
*@param outputConnectionName is the name of the output connection associated with this action.
*@param spec is the output specification.
*@return the description string.
*/
@Override
public String getOutputDescription(String outputConnectionName, OutputSpecification spec)
throws ManifoldCFException, ServiceInterruption
{
IOutputConnection connection = connectionManager.load(outputConnectionName);
IOutputConnector connector = OutputConnectorFactory.grab(threadContext,connection.getClassName(),connection.getConfigParams(),connection.getMaxConnections());
if (connector == null)
// The connector is not installed; treat this as a service interruption.
throw new ServiceInterruption("Output connector not installed",0L);
try
{
return connector.getOutputDescription(spec);
}
finally
{
OutputConnectorFactory.release(connector);
}
}
/** Record a document version, but don't ingest it.
* The purpose of this method is to keep track of the frequency at which ingestion "attempts" take place.
* ServiceInterruption is thrown if this action must be rescheduled.
*@param outputConnectionName is the name of the output connection associated with this action.
*@param identifierClass is the name of the space in which the identifier hash should be interpreted.
*@param identifierHash is the hashed document identifier.
*@param documentVersion is the document version.
*@param recordTime is the time at which the recording took place, in milliseconds since epoch.
*@param activities is the object used in case a document needs to be removed from the output index as the result of this operation.
*/
@Override
public void documentRecord(String outputConnectionName,
String identifierClass, String identifierHash,
String documentVersion,
long recordTime, IOutputActivity activities)
throws ManifoldCFException, ServiceInterruption
{
IOutputConnection connection = connectionManager.load(outputConnectionName);
String docKey = makeKey(identifierClass,identifierHash);
if (Logging.ingest.isDebugEnabled())
{
Logging.ingest.debug("Recording document '"+docKey+"' for output connection '"+outputConnectionName+"'");
}
performIngestion(connection,docKey,documentVersion,null,null,null,null,recordTime,null,activities);
}
/** Ingest a document.
* This ingests the document, and notes it. If this is a repeat ingestion of the document, this
* method also REMOVES ALL OLD METADATA. When complete, the index will contain only the metadata
* described by the RepositoryDocument object passed to this method.
* ServiceInterruption is thrown if the document ingestion must be rescheduled.
*@param outputConnectionName is the name of the output connection associated with this action.
*@param identifierClass is the name of the space in which the identifier hash should be interpreted.
*@param identifierHash is the hashed document identifier.
*@param documentVersion is the document version.
*@param outputVersion is the output version string constructed from the output specification by the output connector.
*@param authorityName is the name of the authority associated with the document, if any.
*@param data is the document data. The data is closed after ingestion is complete.
*@param ingestTime is the time at which the ingestion took place, in milliseconds since epoch.
*@param documentURI is the URI of the document, which will be used as the key of the document in the index.
*@param activities is an object providing a set of methods that the implementer can use to perform the operation.
*@return true if the ingest was ok, false if the ingest is illegal (and should not be repeated).
*/
@Override
public boolean documentIngest(String outputConnectionName,
String identifierClass, String identifierHash,
String documentVersion,
String outputVersion,
String authorityName,
RepositoryDocument data,
long ingestTime, String documentURI,
IOutputActivity activities)
throws ManifoldCFException, ServiceInterruption
{
return documentIngest(outputConnectionName,
identifierClass,
identifierHash,
documentVersion,
outputVersion,
null,
authorityName,
data,
ingestTime,
documentURI,
activities);
}
/** Ingest a document.
* This ingests the document, and notes it. If this is a repeat ingestion of the document, this
* method also REMOVES ALL OLD METADATA. When complete, the index will contain only the metadata
* described by the RepositoryDocument object passed to this method.
* ServiceInterruption is thrown if the document ingestion must be rescheduled.
*@param outputConnectionName is the name of the output connection associated with this action.
*@param identifierClass is the name of the space in which the identifier hash should be interpreted.
*@param identifierHash is the hashed document identifier.
*@param documentVersion is the document version.
*@param outputVersion is the output version string constructed from the output specification by the output connector.
*@param parameterVersion is the forced parameter version.
*@param authorityName is the name of the authority associated with the document, if any.
*@param data is the document data. The data is closed after ingestion is complete.
*@param ingestTime is the time at which the ingestion took place, in milliseconds since epoch.
*@param documentURI is the URI of the document, which will be used as the key of the document in the index.
*@param activities is an object providing a set of methods that the implementer can use to perform the operation.
*@return true if the ingest was ok, false if the ingest is illegal (and should not be repeated).
*/
public boolean documentIngest(String outputConnectionName,
String identifierClass, String identifierHash,
String documentVersion,
String outputVersion,
String parameterVersion,
String authorityName,
RepositoryDocument data,
long ingestTime, String documentURI,
IOutputActivity activities)
throws ManifoldCFException, ServiceInterruption
{
IOutputConnection connection = connectionManager.load(outputConnectionName);
String docKey = makeKey(identifierClass,identifierHash);
if (Logging.ingest.isDebugEnabled())
{
Logging.ingest.debug("Ingesting document '"+docKey+"' into output connection '"+outputConnectionName+"'");
}
return performIngestion(connection,docKey,documentVersion,outputVersion,parameterVersion,authorityName,
data,ingestTime,documentURI,activities);
}
/** Do the actual ingestion, or just record it if there's nothing to ingest. */
protected boolean performIngestion(IOutputConnection connection,
String docKey, String documentVersion, String outputVersion, String parameterVersion,
String authorityNameString,
RepositoryDocument data,
long ingestTime, String documentURI,
IOutputActivity activities)
throws ManifoldCFException, ServiceInterruption
{
// No transactions; not safe because post may take too much time
// First, calculate a document uri hash value
String documentURIHash = null;
if (documentURI != null)
documentURIHash = ManifoldCF.hash(documentURI);
String oldURI = null;
String oldURIHash = null;
String oldOutputVersion = null;
while (true)
{
long sleepAmt = 0L;
try
{
// See what uri was used before for this doc, if any
ArrayList list = new ArrayList();
String query = buildConjunctionClause(list,new ClauseDescription[]{
new UnitaryClause(docKeyField,docKey),
new UnitaryClause(outputConnNameField,connection.getName())});
IResultSet set = performQuery("SELECT "+docURIField+","+uriHashField+","+lastOutputVersionField+" FROM "+getTableName()+
" WHERE "+query,list,null,null);
if (set.getRowCount() > 0)
{
IResultRow row = set.getRow(0);
oldURI = (String)row.getValue(docURIField);
oldURIHash = (String)row.getValue(uriHashField);
oldOutputVersion = (String)row.getValue(lastOutputVersionField);
}
break;
}
catch (ManifoldCFException e)
{
// Look for deadlock and retry if so
if (e.getErrorCode() == ManifoldCFException.DATABASE_TRANSACTION_ABORT)
{
if (Logging.perf.isDebugEnabled())
Logging.perf.debug("Aborted select looking for status: "+e.getMessage());
sleepAmt = getSleepAmt();
continue;
}
throw e;
}
finally
{
sleepFor(sleepAmt);
}
}
// If uri hashes collide, then we must be sure to eliminate only the *correct* records from the table, or we will leave
// dangling documents around. So, all uri searches and comparisons MUST compare the actual uri as well.
// But, since we need to ensure that any given URI is only worked on by one thread at a time, use critical sections
// to block the rare case that multiple threads try to work on the same URI.
int uriCount = 0;
if (documentURI != null)
uriCount++;
if (oldURI != null && (documentURI == null || !documentURI.equals(oldURI)))
uriCount++;
String[] lockArray = new String[uriCount];
uriCount = 0;
if (documentURI != null)
lockArray[uriCount++] = connection.getName()+":"+documentURI;
if (oldURI != null && (documentURI == null || !documentURI.equals(oldURI)))
lockArray[uriCount++] = connection.getName()+":"+oldURI;
lockManager.enterCriticalSections(null,null,lockArray);
try
{
ArrayList list = new ArrayList();
if (oldURI != null && (documentURI == null || !oldURI.equals(documentURI)))
{
// Delete all records from the database that match the old URI, except for THIS record.
list.clear();
String query = buildConjunctionClause(list,new ClauseDescription[]{
new UnitaryClause(uriHashField,"=",oldURIHash),
new UnitaryClause(outputConnNameField,"=",connection.getName())});
list.add(docKey);
performDelete("WHERE "+query+" AND "+docKeyField+"!=?",list,null);
removeDocument(connection,oldURI,oldOutputVersion,activities);
}
if (documentURI != null)
{
// Get rid of all records that match the NEW uri, except for this record.
list.clear();
String query = buildConjunctionClause(list,new ClauseDescription[]{
new UnitaryClause(uriHashField,"=",documentURIHash),
new UnitaryClause(outputConnNameField,"=",connection.getName())});
list.add(docKey);
performDelete("WHERE "+query+" AND "+ docKeyField+"!=?",list,null);
}
// Now, we know we are ready for the ingest.
if (documentURI != null)
{
// Here are the cases:
// 1) There was a service interruption before the upload started.
// (In that case, we don't need to log anything, just reschedule).
// 2) There was a service interruption after the document was transmitted.
// (In that case, we should presume that the document was ingested, but
// reschedule another import anyway.)
// 3) Everything went OK
// (need to log the ingestion.)
// 4) Everything went OK, but we were told we have an illegal document.
// (We note the ingestion because if we don't we will be forced to repeat ourselves.
// In theory, document doesn't need to be deleted, but there is no way to signal
// that at the moment.)
// Note an ingestion before we actually try it.
// This is a marker that says "something is there"; it has an empty version, which indicates
// that we don't know anything about it. That means it will be reingested when the
// next version comes along, and will be deleted if called for also.
noteDocumentIngest(connection.getName(),docKey,null,null,null,null,ingestTime,documentURI,documentURIHash);
int result = addOrReplaceDocument(connection,documentURI,outputVersion,data,authorityNameString,activities);
noteDocumentIngest(connection.getName(),docKey,documentVersion,outputVersion,parameterVersion,authorityNameString,ingestTime,documentURI,documentURIHash);
return result == IOutputConnector.DOCUMENTSTATUS_ACCEPTED;
}
// If we get here, it means we are noting that the document was examined, but that no change was required. This is signaled
// to noteDocumentIngest by passing a null documentURI.
noteDocumentIngest(connection.getName(),docKey,documentVersion,outputVersion,parameterVersion,authorityNameString,ingestTime,null,null);
return true;
}
finally
{
lockManager.leaveCriticalSections(null,null,lockArray);
}
}
/** Note the fact that we checked a set of documents (and found that they did not need to be ingested, because the
* versions agreed).
*@param outputConnectionName is the name of the output connection associated with this action.
*@param identifierClasses are the names of the spaces in which the identifier hashes should be interpreted.
*@param identifierHashes are the set of document identifier hashes.
*@param checkTime is the time at which the check took place, in milliseconds since epoch.
*/
@Override
public void documentCheckMultiple(String outputConnectionName,
String[] identifierClasses, String[] identifierHashes,
long checkTime)
throws ManifoldCFException
{
beginTransaction();
try
{
int maxClauses;
HashMap docIDValues = new HashMap();
int j = 0;
while (j < identifierHashes.length)
{
String docDBString = makeKey(identifierClasses[j],identifierHashes[j]);
docIDValues.put(docDBString,docDBString);
j++;
}
// Now, perform n queries, each of them no larger than maxInClause in length.
// Create a list of row id's from this.
HashMap rowIDSet = new HashMap();
Iterator iter = docIDValues.keySet().iterator();
j = 0;
ArrayList list = new ArrayList();
maxClauses = maxClausesRowIdsForDocIds(outputConnectionName);
while (iter.hasNext())
{
if (j == maxClauses)
{
findRowIdsForDocIds(outputConnectionName,rowIDSet,list);
list.clear();
j = 0;
}
list.add(iter.next());
j++;
}
if (j > 0)
findRowIdsForDocIds(outputConnectionName,rowIDSet,list);
// Now, break row id's into chunks too; submit one chunk at a time
j = 0;
list.clear();
iter = rowIDSet.keySet().iterator();
maxClauses = maxClausesUpdateRowIds();
while (iter.hasNext())
{
if (j == maxClauses)
{
updateRowIds(list,checkTime);
list.clear();
j = 0;
}
list.add(iter.next());
j++;
}
if (j > 0)
updateRowIds(list,checkTime);
}
catch (ManifoldCFException e)
{
signalRollback();
throw e;
}
catch (Error e)
{
signalRollback();
throw e;
}
finally
{
endTransaction();
}
}
/** Note the fact that we checked a document (and found that it did not need to be ingested, because the
* versions agreed).
*@param outputConnectionName is the name of the output connection associated with this action.
*@param identifierClass is the name of the space in which the identifier hash should be interpreted.
*@param identifierHash is the hashed document identifier.
*@param checkTime is the time at which the check took place, in milliseconds since epoch.
*/
public void documentCheck(String outputConnectionName,
String identifierClass, String identifierHash,
long checkTime)
throws ManifoldCFException
{
documentCheckMultiple(outputConnectionName,new String[]{identifierClass},new String[]{identifierHash},checkTime);
}
/** Calculate the number of clauses.
*/
protected int maxClausesUpdateRowIds()
{
return findConjunctionClauseMax(new ClauseDescription[]{});
}
/** Update a chunk of row ids.
*/
protected void updateRowIds(ArrayList list, long checkTime)
throws ManifoldCFException
{
ArrayList newList = new ArrayList();
String query = buildConjunctionClause(newList,new ClauseDescription[]{
new MultiClause(idField,list)});
HashMap map = new HashMap();
map.put(lastIngestField,new Long(checkTime));
performUpdate(map,"WHERE "+query,newList,null);
}
/** Delete multiple documents from the search engine index.
*@param outputConnectionNames are the names of the output connections associated with this action.
*@param identifierClasses are the names of the spaces in which the identifier hashes should be interpreted.
*@param identifierHashes is the array of document identifier hashes of the documents.
*@param activities is the object to use to log the details of the ingestion attempt. May be null.
*/
@Override
public void documentDeleteMultiple(String[] outputConnectionNames,
String[] identifierClasses, String[] identifierHashes,
IOutputRemoveActivity activities)
throws ManifoldCFException, ServiceInterruption
{
// Segregate request by connection names
HashMap keyMap = new HashMap();
int i = 0;
while (i < outputConnectionNames.length)
{
String outputConnectionName = outputConnectionNames[i];
ArrayList list = (ArrayList)keyMap.get(outputConnectionName);
if (list == null)
{
list = new ArrayList();
keyMap.put(outputConnectionName,list);
}
list.add(new Integer(i));
i++;
}
// Now, process the documents for each output connection, one connection at a time.
Iterator iter = keyMap.keySet().iterator();
while (iter.hasNext())
{
String outputConnectionName = (String)iter.next();
ArrayList list = (ArrayList)keyMap.get(outputConnectionName);
String[] localIdentifierClasses = new String[list.size()];
String[] localIdentifierHashes = new String[list.size()];
i = 0;
while (i < localIdentifierClasses.length)
{
int index = ((Integer)list.get(i)).intValue();
localIdentifierClasses[i] = identifierClasses[index];
localIdentifierHashes[i] = identifierHashes[index];
i++;
}
documentDeleteMultiple(outputConnectionName,localIdentifierClasses,localIdentifierHashes,activities);
}
}
/** Delete multiple documents from the search engine index.
*@param outputConnectionName is the name of the output connection associated with this action.
*@param identifierClasses are the names of the spaces in which the identifier hashes should be interpreted.
*@param identifierHashes is the array of document identifier hashes of the documents.
*@param activities is the object to use to log the details of the ingestion attempt. May be null.
*/
@Override
public void documentDeleteMultiple(String outputConnectionName,
String[] identifierClasses, String[] identifierHashes,
IOutputRemoveActivity activities)
throws ManifoldCFException, ServiceInterruption
{
IOutputConnection connection = connectionManager.load(outputConnectionName);
if (Logging.ingest.isDebugEnabled())
{
int i = 0;
while (i < identifierHashes.length)
{
Logging.ingest.debug("Request to delete document '"+makeKey(identifierClasses[i],identifierHashes[i])+"' from output connection '"+outputConnectionName+"'");
i++;
}
}
// No transactions. Time for the operation may exceed transaction timeout.
// Obtain the current URIs of all of these.
DeleteInfo[] uris = getDocumentURIMultiple(outputConnectionName,identifierClasses,identifierHashes);
// Grab critical section locks so that we can't attempt to ingest at the same time we are deleting.
// (This guarantees that when this operation is complete the database reflects reality.)
int validURIcount = 0;
int i = 0;
while (i < uris.length)
{
if (uris[i] != null && uris[i].getURI() != null)
validURIcount++;
i++;
}
String[] lockArray = new String[validURIcount];
String[] validURIArray = new String[validURIcount];
validURIcount = 0;
i = 0;
while (i < uris.length)
{
if (uris[i] != null && uris[i].getURI() != null)
{
validURIArray[validURIcount] = uris[i].getURI();
lockArray[validURIcount] = outputConnectionName+":"+validURIArray[validURIcount];
validURIcount++;
}
i++;
}
lockManager.enterCriticalSections(null,null,lockArray);
try
{
// Remove the documents from the index, using the URIs fetched above
int j = 0;
while (j < uris.length)
{
if (uris[j] != null && uris[j].getURI() != null)
removeDocument(connection,uris[j].getURI(),uris[j].getOutputVersion(),activities);
j++;
}
// Now, get rid of all rows that match the given uris.
// Do the queries together, then the deletes
beginTransaction();
try
{
// The basic process is this:
// 1) Come up with a set of urihash values
// 2) Find the matching, corresponding id values
// 3) Delete the rows corresponding to the id values, in sequence
// Process (1 & 2) has to be broken down into chunks that contain the maximum
// number of doc hash values each. We need to avoid repeating doc hash values,
// so the first step is to come up with ALL the doc hash values before looping
// over them.
int maxClauses;
// Find all the documents that match this set of URIs
HashMap docURIHashValues = new HashMap();
HashMap docURIValues = new HashMap();
j = 0;
while (j < validURIArray.length)
{
String docDBString = validURIArray[j++];
String docDBHashString = ManifoldCF.hash(docDBString);
docURIValues.put(docDBString,docDBString);
docURIHashValues.put(docDBHashString,docDBHashString);
}
// Now, perform n queries, each of them no larger than maxInClause in length.
// Create a list of row id's from this.
HashMap rowIDSet = new HashMap();
Iterator iter = docURIHashValues.keySet().iterator();
j = 0;
ArrayList hashList = new ArrayList();
maxClauses = maxClausesRowIdsForURIs(outputConnectionName);
while (iter.hasNext())
{
if (j == maxClauses)
{
findRowIdsForURIs(outputConnectionName,rowIDSet,docURIValues,hashList);
hashList.clear();
j = 0;
}
hashList.add(iter.next());
j++;
}
if (j > 0)
findRowIdsForURIs(outputConnectionName,rowIDSet,docURIValues,hashList);
// Next, go through the list of row IDs, and delete them in chunks
j = 0;
ArrayList list = new ArrayList();
iter = rowIDSet.keySet().iterator();
maxClauses = maxClausesDeleteRowIds();
while (iter.hasNext())
{
if (j == maxClauses)
{
deleteRowIds(list);
list.clear();
j = 0;
}
list.add(iter.next());
j++;
}
if (j > 0)
deleteRowIds(list);
// Now, find the set of documents that remain that match the document identifiers.
HashMap docIdValues = new HashMap();
j = 0;
while (j < identifierHashes.length)
{
String docDBString = makeKey(identifierClasses[j],identifierHashes[j]);
docIdValues.put(docDBString,docDBString);
j++;
}
// Now, perform n queries, each of them no larger than maxInClause in length.
// Create a list of row id's from this.
rowIDSet.clear();
iter = docIdValues.keySet().iterator();
j = 0;
list.clear();
maxClauses = maxClausesRowIdsForDocIds(outputConnectionName);
while (iter.hasNext())
{
if (j == maxClauses)
{
findRowIdsForDocIds(outputConnectionName,rowIDSet,list);
list.clear();
j = 0;
}
list.add(iter.next());
j++;
}
if (j > 0)
findRowIdsForDocIds(outputConnectionName,rowIDSet,list);
// Next, go through the list of row IDs, and delete them in chunks
j = 0;
list.clear();
iter = rowIDSet.keySet().iterator();
maxClauses = maxClausesDeleteRowIds();
while (iter.hasNext())
{
if (j == maxClauses)
{
deleteRowIds(list);
list.clear();
j = 0;
}
list.add(iter.next());
j++;
}
if (j > 0)
deleteRowIds(list);
}
catch (ManifoldCFException e)
{
signalRollback();
throw e;
}
catch (Error e)
{
signalRollback();
throw e;
}
finally
{
endTransaction();
}
}
finally
{
lockManager.leaveCriticalSections(null,null,lockArray);
}
}
/** Calculate the maximum number of clauses.
*/
protected int maxClausesRowIdsForURIs(String outputConnectionName)
{
return findConjunctionClauseMax(new ClauseDescription[]{
new UnitaryClause(outputConnNameField,outputConnectionName)});
}
/** Given values and parameters corresponding to a set of hash values, add corresponding
* table row id's to the output map.
*/
protected void findRowIdsForURIs(String outputConnectionName, HashMap rowIDSet, HashMap uris, ArrayList hashParamValues)
throws ManifoldCFException
{
ArrayList list = new ArrayList();
String query = buildConjunctionClause(list,new ClauseDescription[]{
new MultiClause(uriHashField,hashParamValues),
new UnitaryClause(outputConnNameField,outputConnectionName)});
IResultSet set = performQuery("SELECT "+idField+","+docURIField+" FROM "+
getTableName()+" WHERE "+query,list,null,null);
int i = 0;
while (i < set.getRowCount())
{
IResultRow row = set.getRow(i++);
String docURI = (String)row.getValue(docURIField);
if (docURI != null && docURI.length() > 0)
{
if (uris.get(docURI) != null)
{
Long rowID = (Long)row.getValue(idField);
rowIDSet.put(rowID,rowID);
}
}
}
}
/** Calculate the maximum number of doc ids we should use.
*/
protected int maxClausesRowIdsForDocIds(String outputConnectionName)
{
return findConjunctionClauseMax(new ClauseDescription[]{
new UnitaryClause(outputConnNameField,outputConnectionName)});
}
/** Given values and parameters corresponding to a set of document key values, add corresponding
* table row id's to the output map.
*/
protected void findRowIdsForDocIds(String outputConnectionName, HashMap rowIDSet, ArrayList paramValues)
throws ManifoldCFException
{
ArrayList list = new ArrayList();
String query = buildConjunctionClause(list,new ClauseDescription[]{
new MultiClause(docKeyField,paramValues),
new UnitaryClause(outputConnNameField,outputConnectionName)});
IResultSet set = performQuery("SELECT "+idField+" FROM "+
getTableName()+" WHERE "+query,list,null,null);
int i = 0;
while (i < set.getRowCount())
{
IResultRow row = set.getRow(i++);
Long rowID = (Long)row.getValue(idField);
rowIDSet.put(rowID,rowID);
}
}
/** Calculate the maximum number of clauses.
*/
protected int maxClausesDeleteRowIds()
{
return findConjunctionClauseMax(new ClauseDescription[]{});
}
/** Delete a chunk of row ids.
*/
protected void deleteRowIds(ArrayList list)
throws ManifoldCFException
{
ArrayList newList = new ArrayList();
String query = buildConjunctionClause(newList,new ClauseDescription[]{
new MultiClause(idField,list)});
performDelete("WHERE "+query,newList,null);
}
/** Delete a document from the search engine index.
*@param outputConnectionName is the name of the output connection associated with this action.
*@param identifierClass is the name of the space in which the identifier hash should be interpreted.
*@param identifierHash is the hash of the id of the document.
*@param activities is the object to use to log the details of the ingestion attempt. May be null.
*/
@Override
public void documentDelete(String outputConnectionName,
String identifierClass, String identifierHash,
IOutputRemoveActivity activities)
throws ManifoldCFException, ServiceInterruption
{
documentDeleteMultiple(outputConnectionName,new String[]{identifierClass},new String[]{identifierHash},activities);
}
/** Find out under what URIs a SET of documents is currently ingested.
*@param identifierHashes is the array of document id's to check.
*@return the array of current document uri's. Null returned for identifiers
* that don't exist in the index.
*/
protected DeleteInfo[] getDocumentURIMultiple(String outputConnectionName, String[] identifierClasses, String[] identifierHashes)
throws ManifoldCFException
{
DeleteInfo[] rval = new DeleteInfo[identifierHashes.length];
HashMap map = new HashMap();
int i = 0;
while (i < identifierHashes.length)
{
map.put(makeKey(identifierClasses[i],identifierHashes[i]),new Integer(i));
rval[i] = null;
i++;
}
beginTransaction();
try
{
ArrayList list = new ArrayList();
int maxCount = maxClauseDocumentURIChunk(outputConnectionName);
int j = 0;
Iterator iter = map.keySet().iterator();
while (iter.hasNext())
{
if (j == maxCount)
{
getDocumentURIChunk(rval,map,outputConnectionName,list);
j = 0;
list.clear();
}
list.add(iter.next());
j++;
}
if (j > 0)
getDocumentURIChunk(rval,map,outputConnectionName,list);
return rval;
}
catch (ManifoldCFException e)
{
signalRollback();
throw e;
}
catch (Error e)
{
signalRollback();
throw e;
}
finally
{
endTransaction();
}
}
/** Look up ingestion data for a SET of documents.
*@param outputConnectionNames are the names of the output connections associated with this action.
*@param identifierClasses are the names of the spaces in which the identifier hashes should be interpreted.
*@param identifierHashes is the array of document identifier hashes to look up.
*@return the array of document data. Null will come back for any identifier that doesn't
* exist in the index.
*/
@Override
public DocumentIngestStatus[] getDocumentIngestDataMultiple(String[] outputConnectionNames,
String[] identifierClasses, String[] identifierHashes)
throws ManifoldCFException
{
// Segregate request by connection names
HashMap keyMap = new HashMap();
int i = 0;
while (i < outputConnectionNames.length)
{
String outputConnectionName = outputConnectionNames[i];
ArrayList list = (ArrayList)keyMap.get(outputConnectionName);
if (list == null)
{
list = new ArrayList();
keyMap.put(outputConnectionName,list);
}
list.add(new Integer(i));
i++;
}
// Create the return array.
DocumentIngestStatus[] rval = new DocumentIngestStatus[outputConnectionNames.length];
Iterator iter = keyMap.keySet().iterator();
while (iter.hasNext())
{
String outputConnectionName = (String)iter.next();
ArrayList list = (ArrayList)keyMap.get(outputConnectionName);
String[] localIdentifierClasses = new String[list.size()];
String[] localIdentifierHashes = new String[list.size()];
i = 0;
while (i < localIdentifierClasses.length)
{
int index = ((Integer)list.get(i)).intValue();
localIdentifierClasses[i] = identifierClasses[index];
localIdentifierHashes[i] = identifierHashes[index];
i++;
}
DocumentIngestStatus[] localRval = getDocumentIngestDataMultiple(outputConnectionName,localIdentifierClasses,localIdentifierHashes);
i = 0;
while (i < localRval.length)
{
int index = ((Integer)list.get(i)).intValue();
rval[index] = localRval[i];
i++;
}
}
return rval;
}
/** Look up ingestion data for a SET of documents.
*@param outputConnectionName is the name of the output connection associated with this action.
*@param identifierClasses are the names of the spaces in which the identifier hashes should be interpreted.
*@param identifierHashes is the array of document identifier hashes to look up.
*@return the array of document data. Null will come back for any identifier that doesn't
* exist in the index.
*/
@Override
public DocumentIngestStatus[] getDocumentIngestDataMultiple(String outputConnectionName,
String[] identifierClasses, String[] identifierHashes)
throws ManifoldCFException
{
// Build the return array
DocumentIngestStatus[] rval = new DocumentIngestStatus[identifierHashes.length];
// Build a map, so we can convert an identifier into an array index.
HashMap indexMap = new HashMap();
int i = 0;
while (i < identifierHashes.length)
{
indexMap.put(makeKey(identifierClasses[i],identifierHashes[i]),new Integer(i));
rval[i] = null;
i++;
}
beginTransaction();
try
{
ArrayList list = new ArrayList();
int maxCount = maxClauseDocumentIngestDataChunk(outputConnectionName);
int j = 0;
Iterator iter = indexMap.keySet().iterator();
while (iter.hasNext())
{
if (j == maxCount)
{
getDocumentIngestDataChunk(rval,indexMap,outputConnectionName,list);
j = 0;
list.clear();
}
list.add(iter.next());
j++;
}
if (j > 0)
getDocumentIngestDataChunk(rval,indexMap,outputConnectionName,list);
return rval;
}
catch (ManifoldCFException e)
{
signalRollback();
throw e;
}
catch (Error e)
{
signalRollback();
throw e;
}
finally
{
endTransaction();
}
}
/** Look up ingestion data for a document.
*@param outputConnectionName is the name of the output connection associated with this action.
*@param identifierClass is the name of the space in which the identifier hash should be interpreted.
*@param identifierHash is the hash of the id of the document.
*@return the current document's ingestion data, or null if the document is not currently ingested.
*/
@Override
public DocumentIngestStatus getDocumentIngestData(String outputConnectionName,
String identifierClass, String identifierHash)
throws ManifoldCFException
{
return getDocumentIngestDataMultiple(outputConnectionName,new String[]{identifierClass},new String[]{identifierHash})[0];
}
/** Calculate the average time interval between changes for a document.
* This is based on the data gathered for the document.
*@param outputConnectionName is the name of the output connection associated with this action.
*@param identifierClass is the name of the space in which the identifier hash should be interpreted.
*@param identifierHash is the hash of the id of the document.
*@return the number of milliseconds between changes, or 0 if this cannot be calculated.
*/
@Override
public long getDocumentUpdateInterval(String outputConnectionName,
String identifierClass, String identifierHash)
throws ManifoldCFException
{
return getDocumentUpdateIntervalMultiple(outputConnectionName,new String[]{identifierClass},new String[]{identifierHash})[0];
}
/** Calculate the average time interval between changes for a document.
* This is based on the data gathered for the document.
*@param outputConnectionName is the name of the output connection associated with this action.
*@param identifierClasses are the names of the spaces in which the identifier hashes should be interpreted.
*@param identifierHashes are the hashes of the ids of the documents.
*@return the number of milliseconds between changes, or 0 if this cannot be calculated.
*/
@Override
public long[] getDocumentUpdateIntervalMultiple(String outputConnectionName,
String[] identifierClasses, String[] identifierHashes)
throws ManifoldCFException
{
// Do these all at once!!
// First, create a return array
long[] rval = new long[identifierHashes.length];
// Also create a map from identifier to return index.
HashMap returnMap = new HashMap();
// Finally, need the set of hash codes
HashMap idCodes = new HashMap();
int j = 0;
while (j < identifierHashes.length)
{
String key = makeKey(identifierClasses[j],identifierHashes[j]);
rval[j] = 0L;
returnMap.put(key,new Integer(j));
idCodes.put(key,key);
j++;
}
// Get the chunk size
int maxClause = maxClauseGetIntervals(outputConnectionName);
// Loop through the hash codes
Iterator iter = idCodes.keySet().iterator();
ArrayList list = new ArrayList();
j = 0;
while (iter.hasNext())
{
if (j == maxClause)
{
getIntervals(rval,outputConnectionName,list,returnMap);
list.clear();
j = 0;
}
list.add(iter.next());
j++;
}
if (j > 0)
getIntervals(rval,outputConnectionName,list,returnMap);
return rval;
}
/** Calculate the number of clauses.
*/
protected int maxClauseGetIntervals(String outputConnectionName)
{
return findConjunctionClauseMax(new ClauseDescription[]{
new UnitaryClause(outputConnNameField,outputConnectionName)});
}
/** Query for and calculate the interval for a bunch of hashcodes.
*@param rval is the array to stuff calculated return values into.
*@param outputConnectionName is the name of the output connection associated with this action.
*@param list is the list of document key parameters for this chunk.
*@param returnMap is a mapping from document id to rval index.
*/
protected void getIntervals(long[] rval, String outputConnectionName, ArrayList list, HashMap returnMap)
throws ManifoldCFException
{
ArrayList newList = new ArrayList();
String query = buildConjunctionClause(newList,new ClauseDescription[]{
new MultiClause(docKeyField,list),
new UnitaryClause(outputConnNameField,outputConnectionName)});
IResultSet set = performQuery("SELECT "+docKeyField+","+changeCountField+","+firstIngestField+","+lastIngestField+
" FROM "+getTableName()+" WHERE "+query,newList,null,null);
int i = 0;
while (i < set.getRowCount())
{
IResultRow row = set.getRow(i++);
String docHash = (String)row.getValue(docKeyField);
Integer index = (Integer)returnMap.get(docHash);
if (index != null)
{
// Calculate the return value
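// Average interval = (lastIngest - firstIngest) / changeCount: the elapsed time between the first and
// most recent ingestion, divided by the recorded change count for this row.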
long changeCount = ((Long)row.getValue(changeCountField)).longValue();
long firstIngest = ((Long)row.getValue(firstIngestField)).longValue();
long lastIngest = ((Long)row.getValue(lastIngestField)).longValue();
rval[index.intValue()] = (long)(((double)(lastIngest-firstIngest))/(double)changeCount);
}
}
}
/** Reset all documents belonging to a specific output connection, because we've got information that
* that system has been reconfigured. This will force all such documents to be reindexed the next time
* they are checked.
*@param outputConnectionName is the name of the output connection associated with this action.
*/
public void resetOutputConnection(String outputConnectionName)
throws ManifoldCFException
{
// We're not going to blow away the records, but we are going to set their versions to mean, "reindex required"
HashMap map = new HashMap();
map.put(lastVersionField,null);
ArrayList list = new ArrayList();
String query = buildConjunctionClause(list,new ClauseDescription[]{
new UnitaryClause(outputConnNameField,outputConnectionName)});
performUpdate(map,"WHERE "+query,list,null);
}
/** Note the ingestion of a document, or the "update" of a document.
*@param outputConnectionName is the name of the output connection.
*@param docKey is the key string describing the document.
*@param documentVersion is a string describing the new version of the document.
*@param outputVersion is the version string calculated for the output connection.
*@param packedForcedParameters is the string we use to determine differences in packed parameters.
*@param authorityNameString is the name of the relevant authority connection.
*@param ingestTime is the time at which the ingestion took place, in milliseconds since epoch.
*@param documentURI is the uri the document can be accessed at, or null (which signals that we are to record the version, but no
* ingestion took place).
*@param documentURIHash is the hash of the document uri.
*/
protected void noteDocumentIngest(String outputConnectionName,
String docKey, String documentVersion,
String outputVersion, String packedForcedParameters,
String authorityNameString,
long ingestTime, String documentURI, String documentURIHash)
throws ManifoldCFException
{
HashMap map = new HashMap();
while (true)
{
// The table can have at most one row per URI, for non-null URIs. It can also have at most one row per document identifier.
// However, for null URI's, multiple rows are allowed. Null URIs have a special meaning, which is that
// the document was not actually ingested.
// To make sure the constraints are enforced, we cannot simply look for the row and insert one if not found. This is because
// postgresql does not cause a lock to be created on rows that don't yet exist, so multiple transactions of the kind described
// can lead to multiple rows with the same key. Instead, we *could* lock the whole table down, but that would interfere with
// parallelism. The lowest-impact approach is to make sure an index constraint is in place, and first attempt to do an INSERT.
// That attempt will fail if a record already exists. Then, an update can be attempted.
//
// In the situation where the INSERT fails, the current transaction is aborted and a new transaction must be performed.
// This means that it is impossible to structure things so that the UPDATE is guaranteed to succeed. So, on the event of an
// INSERT failure, the UPDATE is tried, but if that fails too, then the INSERT is tried again. This should also handle the
// case where a DELETE in another transaction removes the database row before it can be UPDATEd.
//
// If the UPDATE does not appear to modify any rows, this is also a signal that the INSERT must be retried.
//
// Try the update first. Typically this succeeds except in the case where a doc is indexed for the first time.
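// In outline, the flow implemented below is:
//   loop:
//     in one transaction, SELECT ... FOR UPDATE the row and, if found, UPDATE it and return
//       (deadlock aborts are retried);
//     otherwise, in a new transaction, attempt the INSERT; if it succeeds, return;
//     if the INSERT aborts (e.g. because the unique key now exists), go around the loop and try the UPDATE again.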
map.clear();
map.put(lastVersionField,documentVersion);
map.put(lastOutputVersionField,outputVersion);
map.put(forcedParamsField,packedForcedParameters);
map.put(lastIngestField,new Long(ingestTime));
if (documentURI != null)
{
map.put(docURIField,documentURI);
map.put(uriHashField,documentURIHash);
}
if (authorityNameString != null)
map.put(authorityNameField,authorityNameString);
else
map.put(authorityNameField,"");
// Transaction abort due to deadlock should be retried here.
while (true)
{
long sleepAmt = 0L;
beginTransaction();
try
{
// Look for existing row.
ArrayList list = new ArrayList();
String query = buildConjunctionClause(list,new ClauseDescription[]{
new UnitaryClause(docKeyField,docKey),
new UnitaryClause(outputConnNameField,outputConnectionName)});
IResultSet set = performQuery("SELECT "+idField+","+changeCountField+" FROM "+getTableName()+" WHERE "+
query+" FOR UPDATE",list,null,null);
IResultRow row = null;
if (set.getRowCount() > 0)
row = set.getRow(0);
if (row != null)
{
// Update the record
list.clear();
query = buildConjunctionClause(list,new ClauseDescription[]{
new UnitaryClause(idField,row.getValue(idField))});
long changeCount = ((Long)row.getValue(changeCountField)).longValue();
changeCount++;
map.put(changeCountField,new Long(changeCount));
performUpdate(map,"WHERE "+query,list,null);
// Update successful!
performCommit();
return;
}
// Update failed to find a matching record, so try the insert
break;
}
catch (ManifoldCFException e)
{
signalRollback();
if (e.getErrorCode() == ManifoldCFException.DATABASE_TRANSACTION_ABORT)
{
if (Logging.perf.isDebugEnabled())
Logging.perf.debug("Aborted transaction noting ingestion: "+e.getMessage());
sleepAmt = getSleepAmt();
continue;
}
throw e;
}
catch (Error e)
{
signalRollback();
throw e;
}
finally
{
endTransaction();
sleepFor(sleepAmt);
}
}
// Set up for insert
map.clear();
map.put(lastVersionField,documentVersion);
map.put(lastOutputVersionField,outputVersion);
map.put(forcedParamsField,packedForcedParameters);
map.put(lastIngestField,new Long(ingestTime));
if (documentURI != null)
{
map.put(docURIField,documentURI);
map.put(uriHashField,documentURIHash);
}
if (authorityNameString != null)
map.put(authorityNameField,authorityNameString);
else
map.put(authorityNameField,"");
Long id = new Long(IDFactory.make(threadContext));
map.put(idField,id);
map.put(outputConnNameField,outputConnectionName);
map.put(docKeyField,docKey);
map.put(changeCountField,new Long(1));
map.put(firstIngestField,map.get(lastIngestField));
beginTransaction();
try
{
performInsert(map,null);
noteModifications(1,0,0);
performCommit();
return;
}
catch (ManifoldCFException e)
{
signalRollback();
// If this is simply a constraint violation, we just want to fall through and try the update!
if (e.getErrorCode() != ManifoldCFException.DATABASE_TRANSACTION_ABORT)
throw e;
// Otherwise, exit transaction and fall through to 'update' attempt
}
catch (Error e)
{
signalRollback();
throw e;
}
finally
{
endTransaction();
}
// Insert must have failed. Attempt an update.
}
}
/** Calculate how many clauses to use at a time.
*/
protected int maxClauseDocumentURIChunk(String outputConnectionName)
{
return findConjunctionClauseMax(new ClauseDescription[]{
new UnitaryClause(outputConnNameField,outputConnectionName)});
}
/** Get a chunk of document uris.
*@param rval is the string array where the uris should be put.
*@param map is the map from id to index.
*@param outputConnectionName is the name of the output connection associated with this action.
*@param list is the parameter list for the query.
*/
protected void getDocumentURIChunk(DeleteInfo[] rval, Map map, String outputConnectionName, ArrayList list)
throws ManifoldCFException
{
ArrayList newList = new ArrayList();
String query = buildConjunctionClause(newList,new ClauseDescription[]{
new MultiClause(docKeyField,list),
new UnitaryClause(outputConnNameField,outputConnectionName)});
IResultSet set = performQuery("SELECT "+docKeyField+","+docURIField+","+lastOutputVersionField+" FROM "+getTableName()+" WHERE "+
query,newList,null,null);
// Go through list and put into buckets.
int i = 0;
while (i < set.getRowCount())
{
IResultRow row = set.getRow(i++);
String docHash = row.getValue(docKeyField).toString();
Integer position = (Integer)map.get(docHash);
if (position != null)
{
String lastURI = (String)row.getValue(docURIField);
if (lastURI != null && lastURI.length() == 0)
lastURI = null;
String lastOutputVersion = (String)row.getValue(lastOutputVersionField);
rval[position.intValue()] = new DeleteInfo(lastURI,lastOutputVersion);
}
}
}
/** Calculate the maximum number of clauses.
*/
protected int maxClauseDocumentIngestDataChunk(String outputConnectionName)
{
return findConjunctionClauseMax(new ClauseDescription[]{
new UnitaryClause(outputConnNameField,outputConnectionName)});
}
/** Get a chunk of document ingest data records.
*@param rval is the document ingest status array where the data should be put.
*@param map is the map from id to index.
*@param outputConnectionName is the name of the output connection associated with this action.
*@param list is the parameter list for the query.
*/
protected void getDocumentIngestDataChunk(DocumentIngestStatus[] rval, Map map, String outputConnectionName, ArrayList list)
throws ManifoldCFException
{
ArrayList newList = new ArrayList();
String query = buildConjunctionClause(newList,new ClauseDescription[]{
new MultiClause(docKeyField,list),
new UnitaryClause(outputConnNameField,outputConnectionName)});
// Get the primary records associated with this hash value
IResultSet set = performQuery("SELECT "+idField+","+docKeyField+","+lastVersionField+","+lastOutputVersionField+","+authorityNameField+","+forcedParamsField+
" FROM "+getTableName()+" WHERE "+query,newList,null,null);
// Now, go through the original request once more, this time building the result
int i = 0;
while (i < set.getRowCount())
{
IResultRow row = set.getRow(i++);
String docHash = row.getValue(docKeyField).toString();
Integer position = (Integer)map.get(docHash);
if (position != null)
{
Long id = (Long)row.getValue(idField);
String lastVersion = (String)row.getValue(lastVersionField);
String lastOutputVersion = (String)row.getValue(lastOutputVersionField);
String authorityName = (String)row.getValue(authorityNameField);
String paramVersion = (String)row.getValue(forcedParamsField);
rval[position.intValue()] = new DocumentIngestStatus(lastVersion,lastOutputVersion,authorityName,paramVersion);
}
}
}
// Protected methods
/** Add or replace document, using the specified output connection, via the standard pool.
*/
protected int addOrReplaceDocument(IOutputConnection connection, String documentURI, String outputDescription,
RepositoryDocument document, String authorityNameString,
IOutputAddActivity activities)
throws ManifoldCFException, ServiceInterruption
{
IOutputConnector connector = OutputConnectorFactory.grab(threadContext,connection.getClassName(),connection.getConfigParams(),connection.getMaxConnections());
if (connector == null)
// The connector is not installed; treat this as a service interruption.
throw new ServiceInterruption("Output connector not installed",0L);
try
{
return connector.addOrReplaceDocument(documentURI,outputDescription,document,authorityNameString,activities);
}
finally
{
OutputConnectorFactory.release(connector);
}
}
/** Remove document, using the specified output connection, via the standard pool.
*/
protected void removeDocument(IOutputConnection connection, String documentURI, String outputDescription, IOutputRemoveActivity activities)
throws ManifoldCFException, ServiceInterruption
{
IOutputConnector connector = OutputConnectorFactory.grab(threadContext,connection.getClassName(),connection.getConfigParams(),connection.getMaxConnections());
if (connector == null)
// The connector is not installed; treat this as a service interruption.
throw new ServiceInterruption("Output connector not installed",0L);
try
{
connector.removeDocument(documentURI,outputDescription,activities);
}
finally
{
OutputConnectorFactory.release(connector);
}
}
/** Make a key from a document class and a hash */
protected static String makeKey(String documentClass, String documentHash)
{
return documentClass + ":" + documentHash;
}
/** This class contains the information necessary to delete a document */
protected static class DeleteInfo
{
protected String uriValue;
protected String outputVersion;
public DeleteInfo(String uriValue, String outputVersion)
{
this.uriValue = uriValue;
this.outputVersion = outputVersion;
}
public String getURI()
{
return uriValue;
}
public String getOutputVersion()
{
return outputVersion;
}
}
}