| /* $Id: IncrementalIngester.java 988245 2010-08-23 18:39:35Z kwright $ */ |
| |
| /** |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.manifoldcf.agents.incrementalingest; |
| |
| import org.apache.manifoldcf.core.interfaces.*; |
| import org.apache.manifoldcf.agents.interfaces.*; |
| import org.apache.manifoldcf.agents.system.Logging; |
| import org.apache.manifoldcf.agents.system.ManifoldCF; |
| import java.util.*; |
| import java.io.*; |
| |
| /** Incremental ingestion API implementation. |
| * This class is responsible for keeping track of what has been sent where, and also the corresponding version of |
| * each document so indexed. The space over which this takes place is defined by the individual output connection - that is, the output connection |
| * seems to "remember" what documents were handed to it. |
| * |
| * A secondary purpose of this module is to provide a mapping between the key by which a document is described internally (by an |
| * identifier hash, plus the name of an identifier space), and the way the document is identified in the output space (by the name of an |
| * output connection, plus a URI which is considered local to that output connection space). |
| * |
| * <br><br> |
| * <b>ingeststatus</b> |
| * <table border="1" cellpadding="3" cellspacing="0"> |
| * <tr class="TableHeadingColor"> |
| * <th>Field</th><th>Type</th><th>Description </th> |
| * <tr><td>id</td><td>BIGINT</td><td>Primary Key</td></tr> |
| * <tr><td>connectionname</td><td>VARCHAR(32)</td><td>Reference:outputconnections.connectionname</td></tr> |
| * <tr><td>dockey</td><td>VARCHAR(73)</td><td></td></tr> |
| * <tr><td>componenthash</td><td>VARCHAR(40)</td><td></td></tr> |
| * <tr><td>docuri</td><td>LONGTEXT</td><td></td></tr> |
| * <tr><td>urihash</td><td>VARCHAR(40)</td><td></td></tr> |
| * <tr><td>lastversion</td><td>LONGTEXT</td><td></td></tr> |
| * <tr><td>lastoutputversion</td><td>LONGTEXT</td><td></td></tr> |
| * <tr><td>lasttransformationversion</td><td>LONGTEXT</td><td></td></tr> |
| * <tr><td>changecount</td><td>BIGINT</td><td></td></tr> |
| * <tr><td>firstingest</td><td>BIGINT</td><td></td></tr> |
| * <tr><td>lastingest</td><td>BIGINT</td><td></td></tr> |
| * <tr><td>authorityname</td><td>VARCHAR(32)</td><td></td></tr> |
| * </table> |
| * <br><br> |
| * |
| */ |
| public class IncrementalIngester extends org.apache.manifoldcf.core.database.BaseTable implements IIncrementalIngester |
| { |
  public static final String _rcsid = "@(#)$Id: IncrementalIngester.java 988245 2010-08-23 18:39:35Z kwright $";

  // Fields of the "ingeststatus" table (see the class javadoc for the full schema).
  // Primary key.
  protected final static String idField = "id";
  // Output connection name; foreign key to outputconnections.connectionname.
  protected final static String outputConnNameField = "connectionname";
  // Document key: identifier space name plus identifier hash (see makeKey()).
  protected final static String docKeyField = "dockey";
  // Hashed component identifier, if any (nullable).
  protected final static String componentHashField = "componenthash";
  // URI of the document in the output space; null means the version was recorded without ingesting.
  protected final static String docURIField = "docuri";
  // Hash of the document URI, used for indexed lookup (URIs can exceed index column limits).
  protected final static String uriHashField = "urihash";
  // Last recorded document version string.
  protected final static String lastVersionField = "lastversion";
  // Last recorded output-connector version string.
  protected final static String lastOutputVersionField = "lastoutputversion";
  // Last recorded packed transformation-pipeline version string.
  protected final static String lastTransformationVersionField = "lasttransformationversion";
  // Number of times the document has changed.
  protected final static String changeCountField = "changecount";
  // Time (ms since epoch) of the first ingestion.
  protected final static String firstIngestField = "firstingest";
  // Time (ms since epoch) of the most recent ingestion.
  protected final static String lastIngestField = "lastingest";
  // Name of the authority associated with the document, if any (nullable).
  protected final static String authorityNameField = "authorityname";

  // Thread context.
  protected final IThreadContext threadContext;
  // Lock manager.
  protected final ILockManager lockManager;
  // Output connection manager
  protected final IOutputConnectionManager connectionManager;
  // Output connector pool manager
  protected final IOutputConnectorPool outputConnectorPool;
  // Transformation connector pool manager
  protected final ITransformationConnectorPool transformationConnectorPool;
| |
| /** Constructor. |
| */ |
| public IncrementalIngester(IThreadContext threadContext, IDBInterface database) |
| throws ManifoldCFException |
| { |
| super(database,"ingeststatus"); |
| this.threadContext = threadContext; |
| lockManager = LockManagerFactory.make(threadContext); |
| connectionManager = OutputConnectionManagerFactory.make(threadContext); |
| outputConnectorPool = OutputConnectorPoolFactory.make(threadContext); |
| transformationConnectorPool = TransformationConnectorPoolFactory.make(threadContext); |
| } |
| |
  /** Install the incremental ingestion manager.
  * Creates the "ingeststatus" table if it does not yet exist, applies any
  * schema upgrades, and reconciles the table's indexes against the expected set.
  * This method is idempotent and safe to call on every startup.
  */
  @Override
  public void install()
    throws ManifoldCFException
  {
    // The output-connection table/column are referenced by a foreign key below.
    String outputConnectionTableName = connectionManager.getTableName();
    String outputConnectionNameField = connectionManager.getConnectionNameColumn();

    // We always include an outer loop, because some upgrade conditions require retries.
    while (true)
    {
      // Postgresql has a limitation on the number of characters that can be indexed in a column.  So we use hashes instead.
      Map existing = getTableSchema(null,null);
      if (existing == null)
      {
        // Table does not exist yet; build the full column map and create it.
        HashMap map = new HashMap();
        map.put(idField,new ColumnDescription("BIGINT",true,false,null,null,false));
        map.put(outputConnNameField,new ColumnDescription("VARCHAR(32)",false,false,outputConnectionTableName,outputConnectionNameField,false));
        map.put(docKeyField,new ColumnDescription("VARCHAR(73)",false,false,null,null,false));
        map.put(componentHashField,new ColumnDescription("VARCHAR(40)",false,true,null,null,false));
        // The document URI field, if null, indicates that the document was not actually ingested!
        // This happens when a connector wishes to keep track of a version string, but not actually ingest the doc.
        map.put(docURIField,new ColumnDescription("LONGTEXT",false,true,null,null,false));
        map.put(uriHashField,new ColumnDescription("VARCHAR(40)",false,true,null,null,false));
        map.put(lastVersionField,new ColumnDescription("LONGTEXT",false,true,null,null,false));
        map.put(lastOutputVersionField,new ColumnDescription("LONGTEXT",false,true,null,null,false));
        map.put(lastTransformationVersionField,new ColumnDescription("LONGTEXT",false,true,null,null,false));
        map.put(changeCountField,new ColumnDescription("BIGINT",false,false,null,null,false));
        map.put(firstIngestField,new ColumnDescription("BIGINT",false,false,null,null,false));
        map.put(lastIngestField,new ColumnDescription("BIGINT",false,false,null,null,false));
        map.put(authorityNameField,new ColumnDescription("VARCHAR(32)",false,true,null,null,false));
        performCreate(map,null);
      }
      else
      {
        // Upgrades from 2.0 onward go here
      }

      // Now, do indexes.  These are the indexes we expect to exist.
      IndexDescription keyIndex = new IndexDescription(true,new String[]{docKeyField,outputConnNameField,componentHashField});
      IndexDescription uriHashIndex = new IndexDescription(false,new String[]{uriHashField,outputConnNameField});
      IndexDescription outputConnIndex = new IndexDescription(false,new String[]{outputConnNameField});

      // Get rid of indexes that shouldn't be there; null out the descriptions of those that already exist.
      Map indexes = getTableIndexes(null,null);
      Iterator iter = indexes.keySet().iterator();
      while (iter.hasNext())
      {
        String indexName = (String)iter.next();
        IndexDescription id = (IndexDescription)indexes.get(indexName);

        if (keyIndex != null && id.equals(keyIndex))
          keyIndex = null;
        else if (uriHashIndex != null && id.equals(uriHashIndex))
          uriHashIndex = null;
        else if (outputConnIndex != null && id.equals(outputConnIndex))
          outputConnIndex = null;
        else if (indexName.indexOf("_pkey") == -1)
          // This index shouldn't be here; drop it.  (Primary-key indexes are left alone.)
          performRemoveIndex(indexName);
      }

      // Add the expected indexes that weren't found above.
      if (uriHashIndex != null)
        performAddIndex(null,uriHashIndex);

      if (keyIndex != null)
        performAddIndex(null,keyIndex);

      if (outputConnIndex != null)
        performAddIndex(null,outputConnIndex);

      // All done; break out of loop
      break;
    }

  }
| |
  /** Uninstall the incremental ingestion manager.
  * Drops the "ingeststatus" table entirely.
  */
  @Override
  public void deinstall()
    throws ManifoldCFException
  {
    performDrop(null);
  }
| |
  /** Flush all knowledge of what was ingested before.
  * Deletes every row of the table; afterwards all documents look as though they
  * were never indexed.
  */
  @Override
  public void clearAll()
    throws ManifoldCFException
  {
    // An empty WHERE clause deletes every row.
    performDelete("",null,null);
  }
| |
| /** From a pipeline specification, get the name of the output connection that will be indexed last |
| * in the pipeline. |
| *@param pipelineSpecificationBasic is the basic pipeline specification. |
| *@return the last indexed output connection name. |
| */ |
| @Override |
| public String getLastIndexedOutputConnectionName(IPipelineSpecificationBasic pipelineSpecificationBasic) |
| { |
| // It's always the last in the sequence. |
| int count = pipelineSpecificationBasic.getOutputCount(); |
| if (count == 0) |
| return null; |
| return pipelineSpecificationBasic.getStageConnectionName(pipelineSpecificationBasic.getOutputStage(count-1)); |
| } |
| |
| /** From a pipeline specification, get the name of the output connection that will be indexed first |
| * in the pipeline. |
| *@param pipelineSpecificationBasic is the basic pipeline specification. |
| *@return the first indexed output connection name. |
| */ |
| @Override |
| public String getFirstIndexedOutputConnectionName(IPipelineSpecificationBasic pipelineSpecificationBasic) |
| { |
| if (pipelineSpecificationBasic.getOutputCount() == 0) |
| return null; |
| return pipelineSpecificationBasic.getStageConnectionName(pipelineSpecificationBasic.getOutputStage(0)); |
| } |
| |
| /** Check if a date is indexable. |
| *@param pipelineSpecification is the IPipelineSpecification object for this pipeline. |
| *@param date is the date to check. |
| *@param activity are the activities available to this method. |
| *@return true if the mimeType is indexable. |
| */ |
| @Override |
| public boolean checkDateIndexable( |
| IPipelineSpecification pipelineSpecification, |
| Date date, |
| IOutputCheckActivity activity) |
| throws ManifoldCFException, ServiceInterruption |
| { |
| PipelineObject pipeline = pipelineGrab(pipelineSpecification); |
| if (pipeline == null) |
| // A connector is not installed; treat this as a service interruption. |
| throw new ServiceInterruption("One or more connectors are not installed",0L); |
| try |
| { |
| return pipeline.checkDateIndexable(date,activity); |
| } |
| finally |
| { |
| pipeline.release(); |
| } |
| } |
| |
| /** Check if a mime type is indexable. |
| *@param pipelineSpecification is the IPipelineSpecification object for this pipeline. |
| *@param mimeType is the mime type to check. |
| *@param activity are the activities available to this method. |
| *@return true if the mimeType is indexable. |
| */ |
| @Override |
| public boolean checkMimeTypeIndexable( |
| IPipelineSpecification pipelineSpecification, |
| String mimeType, |
| IOutputCheckActivity activity) |
| throws ManifoldCFException, ServiceInterruption |
| { |
| PipelineObject pipeline = pipelineGrab(pipelineSpecification); |
| if (pipeline == null) |
| // A connector is not installed; treat this as a service interruption. |
| throw new ServiceInterruption("One or more connectors are not installed",0L); |
| try |
| { |
| return pipeline.checkMimeTypeIndexable(mimeType,activity); |
| } |
| finally |
| { |
| pipeline.release(); |
| } |
| } |
| |
| /** Check if a file is indexable. |
| *@param pipelineSpecification is the IPipelineSpecification object for this pipeline. |
| *@param localFile is the local file to check. |
| *@param activity are the activities available to this method. |
| *@return true if the local file is indexable. |
| */ |
| @Override |
| public boolean checkDocumentIndexable( |
| IPipelineSpecification pipelineSpecification, |
| File localFile, |
| IOutputCheckActivity activity) |
| throws ManifoldCFException, ServiceInterruption |
| { |
| PipelineObject pipeline = pipelineGrab(pipelineSpecification); |
| if (pipeline == null) |
| // A connector is not installed; treat this as a service interruption. |
| throw new ServiceInterruption("One or more connectors are not installed",0L); |
| try |
| { |
| return pipeline.checkDocumentIndexable(localFile,activity); |
| } |
| finally |
| { |
| pipeline.release(); |
| } |
| } |
| |
| /** Pre-determine whether a document's length is indexable by this connector. This method is used by participating repository connectors |
| * to help filter out documents that are too long to be indexable. |
| *@param pipelineSpecification is the IPipelineSpecification object for this pipeline. |
| *@param length is the length of the document. |
| *@param activity are the activities available to this method. |
| *@return true if the file is indexable. |
| */ |
| @Override |
| public boolean checkLengthIndexable( |
| IPipelineSpecification pipelineSpecification, |
| long length, |
| IOutputCheckActivity activity) |
| throws ManifoldCFException, ServiceInterruption |
| { |
| PipelineObject pipeline = pipelineGrab(pipelineSpecification); |
| if (pipeline == null) |
| // A connector is not installed; treat this as a service interruption. |
| throw new ServiceInterruption("One or more connectors are not installed",0L); |
| try |
| { |
| return pipeline.checkLengthIndexable(length,activity); |
| } |
| finally |
| { |
| pipeline.release(); |
| } |
| } |
| |
| /** Pre-determine whether a document's URL is indexable by this connector. This method is used by participating repository connectors |
| * to help filter out documents that not indexable. |
| *@param pipelineSpecification is the IPipelineSpecification object for this pipeline. |
| *@param url is the url of the document. |
| *@param activity are the activities available to this method. |
| *@return true if the file is indexable. |
| */ |
| @Override |
| public boolean checkURLIndexable( |
| IPipelineSpecification pipelineSpecification, |
| String url, |
| IOutputCheckActivity activity) |
| throws ManifoldCFException, ServiceInterruption |
| { |
| PipelineObject pipeline = pipelineGrab(pipelineSpecification); |
| if (pipeline == null) |
| // A connector is not installed; treat this as a service interruption. |
| throw new ServiceInterruption("One or more connectors are not installed",0L); |
| try |
| { |
| return pipeline.checkURLIndexable(url,activity); |
| } |
| finally |
| { |
| pipeline.release(); |
| } |
| } |
| |
| /** Grab the entire pipeline. |
| *@param transformationConnections - the transformation connections, in order |
| *@param outputConnection - the output connection |
| *@param transformationDescriptionStrings - the array of description strings for transformations |
| *@param outputDescriptionString - the output description string |
| *@return the pipeline description, or null if any part of the pipeline cannot be grabbed. |
| */ |
| protected PipelineObjectWithVersions pipelineGrabWithVersions(IPipelineSpecificationWithVersions pipelineConnections) |
| throws ManifoldCFException |
| { |
| // Pick up all needed transformation connectors |
| ITransformationConnector[] transformationConnectors = transformationConnectorPool.grabMultiple(pipelineConnections.getTransformationConnectionNames(),pipelineConnections.getTransformationConnections()); |
| for (ITransformationConnector c : transformationConnectors) |
| { |
| if (c == null) |
| { |
| transformationConnectorPool.releaseMultiple(pipelineConnections.getTransformationConnections(),transformationConnectors); |
| return null; |
| } |
| } |
| |
| // Pick up all needed output connectors. If this fails we have to release the transformation connectors. |
| try |
| { |
| IOutputConnector[] outputConnectors = outputConnectorPool.grabMultiple(pipelineConnections.getOutputConnectionNames(),pipelineConnections.getOutputConnections()); |
| for (IOutputConnector c : outputConnectors) |
| { |
| if (c == null) |
| { |
| outputConnectorPool.releaseMultiple(pipelineConnections.getOutputConnections(),outputConnectors); |
| transformationConnectorPool.releaseMultiple(pipelineConnections.getTransformationConnections(),transformationConnectors); |
| return null; |
| } |
| } |
| return new PipelineObjectWithVersions(pipelineConnections,transformationConnectors,outputConnectors); |
| } |
| catch (Throwable e) |
| { |
| transformationConnectorPool.releaseMultiple(pipelineConnections.getTransformationConnections(),transformationConnectors); |
| if (e instanceof ManifoldCFException) |
| throw (ManifoldCFException)e; |
| else if (e instanceof RuntimeException) |
| throw (RuntimeException)e; |
| else if (e instanceof Error) |
| throw (Error)e; |
| else |
| throw new RuntimeException("Unexpected exception type: "+e.getClass().getName()+": "+e.getMessage(),e); |
| } |
| } |
| |
| /** Grab the entire pipeline. |
| *@param transformationConnections - the transformation connections, in order |
| *@param outputConnection - the output connection |
| *@param transformationDescriptionStrings - the array of description strings for transformations |
| *@param outputDescriptionString - the output description string |
| *@return the pipeline description, or null if any part of the pipeline cannot be grabbed. |
| */ |
| protected PipelineObject pipelineGrab(IPipelineSpecification pipelineConnections) |
| throws ManifoldCFException |
| { |
| // Pick up all needed transformation connectors |
| ITransformationConnector[] transformationConnectors = transformationConnectorPool.grabMultiple(pipelineConnections.getTransformationConnectionNames(),pipelineConnections.getTransformationConnections()); |
| for (ITransformationConnector c : transformationConnectors) |
| { |
| if (c == null) |
| { |
| transformationConnectorPool.releaseMultiple(pipelineConnections.getTransformationConnections(),transformationConnectors); |
| return null; |
| } |
| } |
| |
| // Pick up all needed output connectors. If this fails we have to release the transformation connectors. |
| try |
| { |
| IOutputConnector[] outputConnectors = outputConnectorPool.grabMultiple(pipelineConnections.getOutputConnectionNames(),pipelineConnections.getOutputConnections()); |
| for (IOutputConnector c : outputConnectors) |
| { |
| if (c == null) |
| { |
| outputConnectorPool.releaseMultiple(pipelineConnections.getOutputConnections(),outputConnectors); |
| transformationConnectorPool.releaseMultiple(pipelineConnections.getTransformationConnections(),transformationConnectors); |
| return null; |
| } |
| } |
| return new PipelineObject(pipelineConnections,transformationConnectors,outputConnectors); |
| } |
| catch (Throwable e) |
| { |
| transformationConnectorPool.releaseMultiple(pipelineConnections.getTransformationConnections(),transformationConnectors); |
| if (e instanceof ManifoldCFException) |
| throw (ManifoldCFException)e; |
| else if (e instanceof RuntimeException) |
| throw (RuntimeException)e; |
| else if (e instanceof Error) |
| throw (Error)e; |
| else |
| throw new RuntimeException("Unexpected exception type: "+e.getClass().getName()+": "+e.getMessage(),e); |
| } |
| } |
| |
| /** Get an output version string for a document. |
| *@param outputConnection is the output connection associated with this action. |
| *@param spec is the output specification. |
| *@return the description string. |
| */ |
| @Override |
| public VersionContext getOutputDescription(IOutputConnection outputConnection, Specification spec) |
| throws ManifoldCFException, ServiceInterruption |
| { |
| IOutputConnector connector = outputConnectorPool.grab(outputConnection); |
| if (connector == null) |
| // The connector is not installed; treat this as a service interruption. |
| throw new ServiceInterruption("Output connector not installed",0L); |
| try |
| { |
| return connector.getPipelineDescription(spec); |
| } |
| finally |
| { |
| outputConnectorPool.release(outputConnection,connector); |
| } |
| |
| } |
| |
| /** Get transformation version string for a document. |
| *@param transformationConnection is the transformation connection associated with this action. |
| *@param spec is the transformation specification. |
| *@return the description string. |
| */ |
| @Override |
| public VersionContext getTransformationDescription(ITransformationConnection transformationConnection, Specification spec) |
| throws ManifoldCFException, ServiceInterruption |
| { |
| ITransformationConnector connector = transformationConnectorPool.grab(transformationConnection); |
| if (connector == null) |
| // The connector is not installed; treat this as a service interruption. |
| throw new ServiceInterruption("Transformation connector not installed",0L); |
| try |
| { |
| return connector.getPipelineDescription(spec); |
| } |
| finally |
| { |
| transformationConnectorPool.release(transformationConnection,connector); |
| } |
| } |
| |
| /** Determine whether we need to fetch or refetch a document. |
| * Pass in information including the pipeline specification with existing version info, plus new document and parameter version strings. |
| * If no outputs need to be updated, then this method will return false. If any outputs need updating, then true is returned. |
| *@param pipelineSpecificationWithVersions is the pipeline specification including new version info for all transformation and output |
| * connections. |
| *@param newDocumentVersion is the newly-determined document version. |
| *@param newAuthorityNameString is the newly-determined authority name. |
| *@return true if the document needs to be refetched. |
| */ |
| @Override |
| public boolean checkFetchDocument( |
| IPipelineSpecificationWithVersions pipelineSpecificationWithVersions, |
| String newDocumentVersion, |
| String newAuthorityNameString) |
| { |
| if (newAuthorityNameString == null) |
| newAuthorityNameString = ""; |
| // Cycle through the outputs |
| for (int i = 0; i < pipelineSpecificationWithVersions.getOutputCount(); i++) |
| { |
| int stage = pipelineSpecificationWithVersions.getOutputStage(i); |
| String oldDocumentVersion = pipelineSpecificationWithVersions.getOutputDocumentVersionString(i); |
| String oldOutputVersion = pipelineSpecificationWithVersions.getOutputVersionString(i); |
| String oldAuthorityName = pipelineSpecificationWithVersions.getAuthorityNameString(i); |
| // If it looks like we never indexed this output before, we need to do it now. |
| if (oldDocumentVersion == null) |
| return true; |
| // Look first at the version strings that aren't pipeline dependent |
| if (!oldDocumentVersion.equals(newDocumentVersion) || |
| !oldAuthorityName.equals(newAuthorityNameString) || |
| !oldOutputVersion.equals(pipelineSpecificationWithVersions.getStageDescriptionString(stage).getVersionString())) |
| return true; |
| |
| // Everything matches so far. Next step is to compute a transformation path an corresponding version string. |
| String newTransformationVersion = computePackedTransformationVersion(pipelineSpecificationWithVersions,stage); |
| if (!pipelineSpecificationWithVersions.getOutputTransformationVersionString(i).equals(newTransformationVersion)) |
| return true; |
| } |
| // Everything matches, so no reindexing is needed. |
| return false; |
| } |
| |
| /** Compute a transformation version given a pipeline specification and starting output stage. |
| *@param pipelineSpecification is the pipeline specification. |
| *@param stage is the stage number of the output stage. |
| *@return the transformation version string, which will be a composite of all the transformations applied. |
| */ |
| protected static String computePackedTransformationVersion(IPipelineSpecification pipelineSpecification, int stage) |
| { |
| // First, count the stages we need to represent |
| int stageCount = 0; |
| int currentStage = stage; |
| while (true) |
| { |
| int newStage = pipelineSpecification.getStageParent(currentStage); |
| if (newStage == -1) |
| break; |
| stageCount++; |
| currentStage = newStage; |
| } |
| // Doesn't matter how we pack it; I've chosen to do it in reverse for convenience |
| String[] stageNames = new String[stageCount]; |
| String[] stageDescriptions = new String[stageCount]; |
| stageCount = 0; |
| currentStage = stage; |
| while (true) |
| { |
| int newStage = pipelineSpecification.getStageParent(currentStage); |
| if (newStage == -1) |
| break; |
| stageNames[stageCount] = pipelineSpecification.getStageConnectionName(newStage); |
| stageDescriptions[stageCount] = pipelineSpecification.getStageDescriptionString(newStage).getVersionString(); |
| stageCount++; |
| currentStage = newStage; |
| } |
| // Finally, do the packing. |
| StringBuilder sb = new StringBuilder(); |
| packList(sb,stageNames,'+'); |
| packList(sb,stageDescriptions,'!'); |
| return sb.toString(); |
| } |
| |
| protected static void packList(StringBuilder output, String[] values, char delimiter) |
| { |
| pack(output,Integer.toString(values.length),delimiter); |
| int i = 0; |
| while (i < values.length) |
| { |
| pack(output,values[i++],delimiter); |
| } |
| } |
| |
| protected static void pack(StringBuilder sb, String value, char delim) |
| { |
| for (int i = 0; i < value.length(); i++) |
| { |
| char x = value.charAt(i); |
| if (x == delim || x == '\\') |
| { |
| sb.append('\\'); |
| } |
| sb.append(x); |
| } |
| sb.append(delim); |
| } |
| |
| /** Record a document version, but don't ingest it. |
| * The purpose of this method is to update document version information without reindexing the document. |
| *@param pipelineSpecificationBasic is the basic pipeline specification needed. |
| *@param identifierClass is the name of the space in which the identifier hash should be interpreted. |
| *@param identifierHash is the hashed document identifier. |
| *@param componentHash is the hashed component identifier, if any. |
| *@param documentVersion is the document version. |
| *@param recordTime is the time at which the recording took place, in milliseconds since epoch. |
| */ |
| @Override |
| public void documentRecord( |
| IPipelineSpecificationBasic pipelineSpecificationBasic, |
| String identifierClass, String identifierHash, String componentHash, |
| String documentVersion, long recordTime) |
| throws ManifoldCFException |
| { |
| // This method is called when a connector decides that the last indexed version of the document is in fact just fine, |
| // but the document version information should be updated. |
| // The code pathway is therefore similar to that of document indexing, EXCEPT that no indexing will ever |
| // take place. This has some interesting side effects. For example: |
| // (1) In the case of a document collision with another job using the same repository connection, the last document |
| // indexed cannot be changed. Updating the version string for the document would therefore be misleading. This |
| // case should be detected and prevented from occurring, by refusing to perform the update. |
| // On the other hand, only one thread at a time can be processing the document at a given time, and therefore |
| // since the connector detected "no change", we are safe to presume we can just update the version info. |
| // (2) In the case of a URL conflict with another job, since nothing changes and no new URL is recorded, no cleanup |
| // of conflicting records sharing the same URL should be needed. |
| |
| String docKey = makeKey(identifierClass,identifierHash); |
| |
| String[] outputConnectionNames = extractOutputConnectionNames(pipelineSpecificationBasic); |
| |
| if (Logging.ingest.isDebugEnabled()) |
| { |
| Logging.ingest.debug("Recording document '"+docKey+"' component hash "+((componentHash==null)?"(None)":("'"+componentHash+"'"))+" for output connections '"+outputConnectionNames+"'"); |
| } |
| |
| for (int k = 0; k < outputConnectionNames.length; k++) |
| { |
| String outputConnectionName = outputConnectionNames[k]; |
| |
| // If we get here, it means we are noting that the document was examined, but that no change was required. This is signaled |
| // to noteDocumentIngest by having the null documentURI. |
| noteDocumentIngest(outputConnectionName,docKey,componentHash,documentVersion,null,null,null,recordTime,null,null); |
| } |
| } |
| |
| /** Remove a document from specified indexes, just as if an empty document |
| * was indexed, and record the necessary version information. |
| * This method is conceptually similar to documentIngest(), but does not actually take |
| * a document or allow it to be transformed. If there is a document already |
| * indexed, it is removed from the index. |
| *@param pipelineSpecificationWithVersions is the pipeline specification with already-fetched output versioning information. |
| *@param identifierClass is the name of the space in which the identifier hash should be interpreted. |
| *@param identifierHash is the hashed document identifier. |
| *@param componentHash is the hashed component identifier, if any. |
| *@param documentVersion is the document version. |
| *@param authorityName is the name of the authority associated with the document, if any. |
| *@param recordTime is the time at which the recording took place, in milliseconds since epoch. |
| *@param activities is an object providing a set of methods that the implementer can use to perform the operation. |
| */ |
| @Override |
| public void documentNoData( |
| IPipelineSpecificationWithVersions pipelineSpecificationWithVersions, |
| String identifierClass, String identifierHash, String componentHash, |
| String documentVersion, |
| String authorityName, |
| long recordTime, |
| IOutputActivity activities) |
| throws ManifoldCFException, ServiceInterruption |
| { |
| String docKey = makeKey(identifierClass,identifierHash); |
| |
| if (Logging.ingest.isDebugEnabled()) |
| { |
| Logging.ingest.debug("Logging empty document '"+docKey+"' component hash "+((componentHash==null)?"(None)":("'"+componentHash+"'"))+" into output connections '"+extractOutputConnectionNames(pipelineSpecificationWithVersions)+"'"); |
| } |
| |
| // Set up a pipeline |
| PipelineObjectWithVersions pipeline = pipelineGrabWithVersions(pipelineSpecificationWithVersions); |
| if (pipeline == null) |
| // A connector is not installed; treat this as a service interruption. |
| throw new ServiceInterruption("Pipeline connector not installed",0L); |
| try |
| { |
| pipeline.noDocument(docKey,componentHash,documentVersion,authorityName,activities,recordTime); |
| } |
| finally |
| { |
| pipeline.release(); |
| } |
| } |
| |
| /** Ingest a document. |
| * This ingests the document, and notes it. If this is a repeat ingestion of the document, this |
| * method also REMOVES ALL OLD METADATA. When complete, the index will contain only the metadata |
| * described by the RepositoryDocument object passed to this method. |
| * ServiceInterruption is thrown if the document ingestion must be rescheduled. |
| *@param pipelineSpecificationWithVersions is the pipeline specification with already-fetched output versioning information. |
| *@param identifierClass is the name of the space in which the identifier hash should be interpreted. |
| *@param identifierHash is the hashed document identifier. |
| *@param componentHash is the hashed component identifier, if any. |
| *@param documentVersion is the document version. |
| *@param authorityName is the name of the authority associated with the document, if any. |
| *@param data is the document data. The data is closed after ingestion is complete. |
| *@param ingestTime is the time at which the ingestion took place, in milliseconds since epoch. |
| *@param documentURI is the URI of the document, which will be used as the key of the document in the index. |
| *@param activities is an object providing a set of methods that the implementer can use to perform the operation. |
| *@return true if the ingest was ok, false if the ingest is illegal (and should not be repeated). |
| *@throws IOException only if data stream throws an IOException. |
| */ |
| @Override |
| public boolean documentIngest( |
| IPipelineSpecificationWithVersions pipelineSpecificationWithVersions, |
| String identifierClass, String identifierHash, String componentHash, |
| String documentVersion, |
| String authorityName, |
| RepositoryDocument data, |
| long ingestTime, String documentURI, |
| IOutputActivity activities) |
| throws ManifoldCFException, ServiceInterruption, IOException |
| { |
| String docKey = makeKey(identifierClass,identifierHash); |
| |
| if (Logging.ingest.isDebugEnabled()) |
| { |
| Logging.ingest.debug("Ingesting document '"+docKey+"' component hash "+((componentHash==null)?"(None)":("'"+componentHash+"'"))+" into output connections '"+extractOutputConnectionNames(pipelineSpecificationWithVersions)+"'"); |
| } |
| |
| // Set indexing date |
| data.setIndexingDate(new Date()); |
| |
| // Set up a pipeline |
| PipelineObjectWithVersions pipeline = pipelineGrabWithVersions(pipelineSpecificationWithVersions); |
| if (pipeline == null) |
| // A connector is not installed; treat this as a service interruption. |
| throw new ServiceInterruption("Pipeline connector not installed",0L); |
| try |
| { |
| return pipeline.addOrReplaceDocumentWithException(docKey,componentHash,documentURI,data,documentVersion,authorityName,activities,ingestTime) == IPipelineConnector.DOCUMENTSTATUS_ACCEPTED; |
| } |
| finally |
| { |
| pipeline.release(); |
| } |
| } |
| |
| /** Remove a document component from the search engine index. |
| *@param pipelineConnections is the pipeline specification. |
| *@param identifierClass is the name of the space in which the identifier hash should be interpreted. |
| *@param identifierHash is the hash of the id of the document. |
| *@param componentHash is the hashed component identifier, if any. |
| *@param activities is the object to use to log the details of the ingestion attempt. May be null. |
| */ |
| @Override |
| public void documentRemove( |
| IPipelineConnections pipelineConnections, |
| String identifierClass, String identifierHash, String componentHash, |
| IOutputRemoveActivity activities) |
| throws ManifoldCFException, ServiceInterruption |
| { |
| documentRemoveMultiple(pipelineConnections, |
| new String[]{identifierClass}, |
| new String[]{identifierHash}, |
| componentHash, |
| activities); |
| } |
| |
| protected static String[] extractOutputConnectionNames(IPipelineSpecificationBasic pipelineSpecificationBasic) |
| { |
| String[] rval = new String[pipelineSpecificationBasic.getOutputCount()]; |
| for (int i = 0; i < rval.length; i++) |
| { |
| rval[i] = pipelineSpecificationBasic.getStageConnectionName(pipelineSpecificationBasic.getOutputStage(i)); |
| } |
| return rval; |
| } |
| |
  /** Note the fact that we checked a document (and found that it did not need to be ingested, because the
  * versions agreed).
  * This updates the last-ingest timestamp for all matching rows, across all output connections
  * named in the pipeline specification, inside a single transaction.
  *@param pipelineSpecificationBasic is a pipeline specification.
  *@param identifierClasses are the names of the spaces in which the identifier hashes should be interpreted.
  *@param identifierHashes are the set of document identifier hashes.
  *@param checkTime is the time at which the check took place, in milliseconds since epoch.
  */
  @Override
  public void documentCheckMultiple(
    IPipelineSpecificationBasic pipelineSpecificationBasic,
    String[] identifierClasses, String[] identifierHashes,
    long checkTime)
    throws ManifoldCFException
  {
    // Extract output connection names from pipeline spec
    String[] outputConnectionNames = extractOutputConnectionNames(pipelineSpecificationBasic);
    beginTransaction();
    try
    {
      int maxClauses;

      // Deduplicate the document keys first, so each key is queried at most once.
      Set<String> docIDValues = new HashSet<String>();
      for (int j = 0; j < identifierHashes.length; j++)
      {
        String docDBString = makeKey(identifierClasses[j],identifierHashes[j]);
        docIDValues.add(docDBString);
      }

      // Now, perform n queries, each of them no larger the maxInClause in length.
      // Create a list of row id's from this.
      Set<Long> rowIDSet = new HashSet<Long>();
      Iterator<String> iter = docIDValues.iterator();
      int j = 0;
      List<String> list = new ArrayList<String>();
      maxClauses = maxClausesRowIdsForDocIds(outputConnectionNames);
      while (iter.hasNext())
      {
        // Flush a full chunk before adding the next key.
        if (j == maxClauses)
        {
          findRowIdsForDocIds(outputConnectionNames,rowIDSet,list);
          list.clear();
          j = 0;
        }
        list.add(iter.next());
        j++;
      }

      // Flush any partial final chunk.
      if (j > 0)
        findRowIdsForDocIds(outputConnectionNames,rowIDSet,list);

      // Now, break row id's into chunks too; submit one chunk at a time
      j = 0;
      List<Long> list2 = new ArrayList<Long>();
      Iterator<Long> iter2 = rowIDSet.iterator();
      maxClauses = maxClausesUpdateRowIds();
      while (iter2.hasNext())
      {
        if (j == maxClauses)
        {
          updateRowIds(list2,checkTime);
          list2.clear();
          j = 0;
        }
        list2.add(iter2.next());
        j++;
      }

      // Flush any partial final chunk of row ids.
      if (j > 0)
        updateRowIds(list2,checkTime);
    }
    catch (ManifoldCFException e)
    {
      signalRollback();
      throw e;
    }
    catch (Error e)
    {
      signalRollback();
      throw e;
    }
    finally
    {
      endTransaction();
    }
  }
| |
| /** Note the fact that we checked a document (and found that it did not need to be ingested, because the |
| * versions agreed). |
| *@param pipelineSpecificationBasic is a basic pipeline specification. |
| *@param identifierClass is the name of the space in which the identifier hash should be interpreted. |
| *@param identifierHash is the hashed document identifier. |
| *@param checkTime is the time at which the check took place, in milliseconds since epoch. |
| */ |
| @Override |
| public void documentCheck( |
| IPipelineSpecificationBasic pipelineSpecificationBasic, |
| String identifierClass, String identifierHash, |
| long checkTime) |
| throws ManifoldCFException |
| { |
| documentCheckMultiple(pipelineSpecificationBasic,new String[]{identifierClass},new String[]{identifierHash},checkTime); |
| } |
| |
| /** Calculate the number of clauses. |
| */ |
| protected int maxClausesUpdateRowIds() |
| { |
| return findConjunctionClauseMax(new ClauseDescription[]{}); |
| } |
| |
| /** Update a chunk of row ids. |
| */ |
| protected void updateRowIds(List<Long> list, long checkTime) |
| throws ManifoldCFException |
| { |
| ArrayList newList = new ArrayList(); |
| String query = buildConjunctionClause(newList,new ClauseDescription[]{ |
| new MultiClause(idField,list)}); |
| |
| HashMap map = new HashMap(); |
| map.put(lastIngestField,new Long(checkTime)); |
| performUpdate(map,"WHERE "+query,newList,null); |
| } |
| |
| |
| /** Delete multiple documents from the search engine index. |
| *@param pipelineConnections are the pipeline specifications associated with the documents. |
| *@param identifierClasses are the names of the spaces in which the identifier hashes should be interpreted. |
| *@param identifierHashes is tha array of document identifier hashes if the documents. |
| *@param activities is the object to use to log the details of the ingestion attempt. May be null. |
| */ |
| @Override |
| public void documentDeleteMultiple( |
| IPipelineConnections[] pipelineConnections, |
| String[] identifierClasses, String[] identifierHashes, |
| IOutputRemoveActivity activities) |
| throws ManifoldCFException, ServiceInterruption |
| { |
| // Segregate request by pipeline spec instance address. Not perfect but works in the |
| // environment it is used it. |
| Map<IPipelineConnections,List<Integer>> keyMap = new HashMap<IPipelineConnections,List<Integer>>(); |
| for (int i = 0; i < pipelineConnections.length; i++) |
| { |
| IPipelineConnections spec = pipelineConnections[i]; |
| List<Integer> list = keyMap.get(spec); |
| if (list == null) |
| { |
| list = new ArrayList<Integer>(); |
| keyMap.put(spec,list); |
| } |
| list.add(new Integer(i)); |
| } |
| |
| // Create the return array. |
| Iterator<IPipelineConnections> iter = keyMap.keySet().iterator(); |
| while (iter.hasNext()) |
| { |
| IPipelineConnections spec = iter.next(); |
| List<Integer> list = keyMap.get(spec); |
| String[] localIdentifierClasses = new String[list.size()]; |
| String[] localIdentifierHashes = new String[list.size()]; |
| for (int i = 0; i < localIdentifierClasses.length; i++) |
| { |
| int index = list.get(i).intValue(); |
| localIdentifierClasses[i] = identifierClasses[index]; |
| localIdentifierHashes[i] = identifierHashes[index]; |
| } |
| documentDeleteMultiple(spec,localIdentifierClasses,localIdentifierHashes,activities); |
| } |
| } |
| |
| protected static String createURILockName(String outputConnectionName, String uriHash) |
| { |
| // The lock name needs to be constrained to some acceptably small number in order to avoid |
| // a lot of zookeeper locks. See CONNECTORS-1123. |
| int hashCode = outputConnectionName.hashCode() + uriHash.hashCode(); |
| hashCode &= 0xffff; |
| return "URILOCK-"+hashCode; |
| } |
| |
  /** Delete multiple documents from the search engine index.
  * For each output connection in the pipeline, this looks up the currently-indexed URIs for
  * the documents, removes them from the index under per-URI critical-section locks, and then
  * cleans up the corresponding tracking rows in a transaction.
  *@param pipelineConnections is the pipeline specification.
  *@param identifierClasses are the names of the spaces in which the identifier hashes should be interpreted.
  *@param identifierHashes is the array of document identifier hashes of the documents.
  *@param originalActivities is the object to use to log the details of the ingestion attempt.  May be null.
  */
  @Override
  public void documentDeleteMultiple(
    IPipelineConnections pipelineConnections,
    String[] identifierClasses, String[] identifierHashes,
    IOutputRemoveActivity originalActivities)
    throws ManifoldCFException, ServiceInterruption
  {
    String[] outputConnectionNames = pipelineConnections.getOutputConnectionNames();
    IOutputConnection[] outputConnections = pipelineConnections.getOutputConnections();

    // No transactions here, so we can cycle through the connection names one at a time
    for (int z = 0; z < outputConnectionNames.length; z++)
    {
      String outputConnectionName = outputConnectionNames[z];
      IOutputConnection connection = outputConnections[z];

      // Wrap the activities afresh for each connection so the logged connection name is correct.
      IOutputRemoveActivity activities = new OutputRemoveActivitiesWrapper(originalActivities,outputConnectionName);

      if (Logging.ingest.isDebugEnabled())
      {
        for (int i = 0; i < identifierHashes.length; i++)
        {
          Logging.ingest.debug("Request to delete document '"+makeKey(identifierClasses[i],identifierHashes[i])+"' from output connection '"+outputConnectionName+"'");
        }
      }

      // No transactions.  Time for the operation may exceed transaction timeout.

      // Obtain the current URIs of all of these.
      List<DeleteInfo> uris = getDocumentURIMultiple(outputConnectionName,identifierClasses,identifierHashes);

      // Grab critical section locks so that we can't attempt to ingest at the same time we are deleting.
      // (This guarantees that when this operation is complete the database reflects reality.)
      // First pass: count the entries that actually have a URI in the index.
      int validURIcount = 0;
      for (DeleteInfo uri : uris)
      {
        if (uri.getURI() != null)
          validURIcount++;
      }
      String[] lockArray = new String[validURIcount];
      String[] validURIArray = new String[validURIcount];
      String[] validURIHashArray = new String[validURIcount];
      // Second pass: fill the parallel arrays of URIs, URI hashes, and lock names.
      validURIcount = 0;
      for (DeleteInfo uri : uris)
      {
        if (uri.getURI() != null)
        {
          validURIArray[validURIcount] = uri.getURI();
          validURIHashArray[validURIcount] = uri.getURIHash();
          lockArray[validURIcount] = createURILockName(outputConnectionName,validURIHashArray[validURIcount]);
          validURIcount++;
        }
      }

      lockManager.enterLocks(null,null,lockArray);
      try
      {
        // Fetch the document URIs for the listed documents
        for (DeleteInfo uri : uris)
        {
          if (uri.getURI() != null)
          {
            removeDocument(connection,uri.getURI(),uri.getOutputVersion(),activities);
          }
        }

        // Now, get rid of all rows that match the given uris.
        // Do the queries together, then the deletes
        beginTransaction();
        try
        {
          // The basic process is this:
          // 1) Come up with a set of urihash values
          // 2) Find the matching, corresponding id values
          // 3) Delete the rows corresponding to the id values, in sequence

          // Process (1 & 2) has to be broken down into chunks that contain the maximum
          // number of doc hash values each.  We need to avoid repeating doc hash values,
          // so the first step is to come up with ALL the doc hash values before looping
          // over them.

          int maxClauses;

          // Find all the documents that match this set of URIs
          Set<String> docURIHashValues = new HashSet<String>();
          Set<String> docURIValues = new HashSet<String>();
          for (String docDBString : validURIArray)
          {
            docURIValues.add(docDBString);
          }
          for (String docDBString : validURIHashArray)
          {
            docURIHashValues.add(docDBString);
          }

          // Now, perform n queries, each of them no larger the maxInClause in length.
          // Create a list of row id's from this.
          Set<Long> rowIDSet = new HashSet<Long>();
          Iterator<String> iter = docURIHashValues.iterator();
          int j = 0;
          List<String> hashList = new ArrayList<String>();
          maxClauses = maxClausesRowIdsForURIs(outputConnectionName);
          while (iter.hasNext())
          {
            // Flush a full chunk before adding the next hash.
            if (j == maxClauses)
            {
              findRowIdsForURIs(outputConnectionName,rowIDSet,docURIValues,hashList);
              hashList.clear();
              j = 0;
            }
            hashList.add(iter.next());
            j++;
          }

          // Flush any partial final chunk.
          if (j > 0)
            findRowIdsForURIs(outputConnectionName,rowIDSet,docURIValues,hashList);

          // Next, go through the list of row IDs, and delete them in chunks
          j = 0;
          List<Long> list = new ArrayList<Long>();
          Iterator<Long> iter2 = rowIDSet.iterator();
          maxClauses = maxClausesDeleteRowIds();
          while (iter2.hasNext())
          {
            if (j == maxClauses)
            {
              deleteRowIds(list);
              list.clear();
              j = 0;
            }
            list.add(iter2.next());
            j++;
          }

          if (j > 0)
            deleteRowIds(list);

          // Now, find the set of documents that remain that match the document identifiers.
          Set<String> docIdValues = new HashSet<String>();
          for (int i = 0; i < identifierHashes.length; i++)
          {
            String docDBString = makeKey(identifierClasses[i],identifierHashes[i]);
            docIdValues.add(docDBString);
          }

          // Now, perform n queries, each of them no larger the maxInClause in length.
          // Create a list of row id's from this.
          rowIDSet.clear();
          iter = docIdValues.iterator();
          j = 0;
          List<String> list2 = new ArrayList<String>();
          maxClauses = maxClausesRowIdsForDocIds(outputConnectionName);
          while (iter.hasNext())
          {
            if (j == maxClauses)
            {
              findRowIdsForDocIds(outputConnectionName,rowIDSet,list2);
              list2.clear();
              j = 0;
            }
            list2.add(iter.next());
            j++;
          }

          if (j > 0)
            findRowIdsForDocIds(outputConnectionName,rowIDSet,list2);

          // Next, go through the list of row IDs, and delete them in chunks
          j = 0;
          list.clear();
          iter2 = rowIDSet.iterator();
          maxClauses = maxClausesDeleteRowIds();
          while (iter2.hasNext())
          {
            if (j == maxClauses)
            {
              deleteRowIds(list);
              list.clear();
              j = 0;
            }
            list.add(iter2.next());
            j++;
          }

          if (j > 0)
            deleteRowIds(list);

        }
        catch (ManifoldCFException e)
        {
          signalRollback();
          throw e;
        }
        catch (Error e)
        {
          signalRollback();
          throw e;
        }
        finally
        {
          endTransaction();
        }
      }
      finally
      {
        // Release the critical-section locks no matter what happened.
        lockManager.leaveLocks(null,null,lockArray);
      }
    }
  }
| |
| /** Remove multiple document components from the search engine index. |
| *@param pipelineConnections is the pipeline specification. |
| *@param identifierClasses are the names of the spaces in which the identifier hash should be interpreted. |
| *@param identifierHashes are the hashes of the ids of the documents. |
| *@param componentHash is the hashed component identifier, if any. |
| *@param activities is the object to use to log the details of the ingestion attempt. May be null. |
| */ |
| @Override |
| public void documentRemoveMultiple( |
| IPipelineConnections pipelineConnections, |
| String[] identifierClasses, String[] identifierHashes, String componentHash, |
| IOutputRemoveActivity activities) |
| throws ManifoldCFException, ServiceInterruption |
| { |
| String[] outputConnectionNames = pipelineConnections.getOutputConnectionNames(); |
| IOutputConnection[] outputConnections = pipelineConnections.getOutputConnections(); |
| |
| // No transactions here, so we can cycle through the connection names one at a time |
| for (int z = 0; z < outputConnectionNames.length; z++) |
| { |
| String outputConnectionName = outputConnectionNames[z]; |
| IOutputConnection connection = outputConnections[z]; |
| |
| activities = new OutputRemoveActivitiesWrapper(activities,outputConnectionName); |
| |
| if (Logging.ingest.isDebugEnabled()) |
| { |
| for (int i = 0; i < identifierHashes.length; i++) |
| { |
| Logging.ingest.debug("Request to remove document '"+makeKey(identifierClasses[i],identifierHashes[i])+"' component hash "+((componentHash==null)?"(None)":("'"+componentHash+"'"))+" from output connection '"+outputConnectionName+"'"); |
| } |
| } |
| |
| // No transactions. Time for the operation may exceed transaction timeout. |
| |
| // Obtain the current URIs of all of these. |
| List<DeleteInfo> uris = getDocumentURIMultiple(outputConnectionName,identifierClasses,identifierHashes,componentHash); |
| |
| // Grab critical section locks so that we can't attempt to ingest at the same time we are deleting. |
| // (This guarantees that when this operation is complete the database reflects reality.) |
| int validURIcount = 0; |
| for (DeleteInfo uri : uris) |
| { |
| if (uri.getURI() != null) |
| validURIcount++; |
| } |
| String[] lockArray = new String[validURIcount]; |
| String[] validURIArray = new String[validURIcount]; |
| String[] validURIHashArray = new String[validURIcount]; |
| validURIcount = 0; |
| for (DeleteInfo uri : uris) |
| { |
| if (uri.getURI() != null) |
| { |
| validURIArray[validURIcount] = uri.getURI(); |
| validURIHashArray[validURIcount] = uri.getURIHash(); |
| lockArray[validURIcount] = createURILockName(outputConnectionName,validURIHashArray[validURIcount]); |
| validURIcount++; |
| } |
| } |
| |
| lockManager.enterLocks(null,null,lockArray); |
| try |
| { |
| // Fetch the document URIs for the listed documents |
| for (DeleteInfo uri : uris) |
| { |
| if (uri.getURI() != null) |
| removeDocument(connection,uri.getURI(),uri.getOutputVersion(),activities); |
| } |
| |
| // Now, get rid of all rows that match the given uris. |
| // Do the queries together, then the deletes |
| beginTransaction(); |
| try |
| { |
| // The basic process is this: |
| // 1) Come up with a set of urihash values |
| // 2) Find the matching, corresponding id values |
| // 3) Delete the rows corresponding to the id values, in sequence |
| |
| // Process (1 & 2) has to be broken down into chunks that contain the maximum |
| // number of doc hash values each. We need to avoid repeating doc hash values, |
| // so the first step is to come up with ALL the doc hash values before looping |
| // over them. |
| |
| int maxClauses; |
| |
| // Find all the documents that match this set of URIs |
| Set<String> docURIHashValues = new HashSet<String>(); |
| Set<String> docURIValues = new HashSet<String>(); |
| for (String docDBString : validURIArray) |
| { |
| docURIValues.add(docDBString); |
| } |
| for (String docDBString : validURIHashArray) |
| { |
| docURIHashValues.add(docDBString); |
| } |
| |
| // Now, perform n queries, each of them no larger the maxInClause in length. |
| // Create a list of row id's from this. |
| Set<Long> rowIDSet = new HashSet<Long>(); |
| Iterator<String> iter = docURIHashValues.iterator(); |
| int j = 0; |
| List<String> hashList = new ArrayList<String>(); |
| maxClauses = maxClausesRowIdsForURIs(outputConnectionName); |
| while (iter.hasNext()) |
| { |
| if (j == maxClauses) |
| { |
| findRowIdsForURIs(outputConnectionName,rowIDSet,docURIValues,hashList); |
| hashList.clear(); |
| j = 0; |
| } |
| hashList.add(iter.next()); |
| j++; |
| } |
| |
| if (j > 0) |
| findRowIdsForURIs(outputConnectionName,rowIDSet,docURIValues,hashList); |
| |
| // Next, go through the list of row IDs, and delete them in chunks |
| j = 0; |
| List<Long> list = new ArrayList<Long>(); |
| Iterator<Long> iter2 = rowIDSet.iterator(); |
| maxClauses = maxClausesDeleteRowIds(); |
| while (iter2.hasNext()) |
| { |
| if (j == maxClauses) |
| { |
| deleteRowIds(list); |
| list.clear(); |
| j = 0; |
| } |
| list.add(iter2.next()); |
| j++; |
| } |
| |
| if (j > 0) |
| deleteRowIds(list); |
| |
| // Now, find the set of documents that remain that match the document identifiers. |
| Set<String> docIdValues = new HashSet<String>(); |
| for (int i = 0; i < identifierHashes.length; i++) |
| { |
| String docDBString = makeKey(identifierClasses[i],identifierHashes[i]); |
| docIdValues.add(docDBString); |
| } |
| |
| // Now, perform n queries, each of them no larger the maxInClause in length. |
| // Create a list of row id's from this. |
| rowIDSet.clear(); |
| iter = docIdValues.iterator(); |
| j = 0; |
| List<String> list2 = new ArrayList<String>(); |
| maxClauses = maxClausesRowIdsForDocIds(outputConnectionName,componentHash); |
| while (iter.hasNext()) |
| { |
| if (j == maxClauses) |
| { |
| findRowIdsForDocIds(outputConnectionName,rowIDSet,list2,componentHash); |
| list2.clear(); |
| j = 0; |
| } |
| list2.add(iter.next()); |
| j++; |
| } |
| |
| if (j > 0) |
| findRowIdsForDocIds(outputConnectionName,rowIDSet,list2,componentHash); |
| |
| // Next, go through the list of row IDs, and delete them in chunks |
| j = 0; |
| list.clear(); |
| iter2 = rowIDSet.iterator(); |
| maxClauses = maxClausesDeleteRowIds(); |
| while (iter2.hasNext()) |
| { |
| if (j == maxClauses) |
| { |
| deleteRowIds(list); |
| list.clear(); |
| j = 0; |
| } |
| list.add(iter2.next()); |
| j++; |
| } |
| |
| if (j > 0) |
| deleteRowIds(list); |
| |
| } |
| catch (ManifoldCFException e) |
| { |
| signalRollback(); |
| throw e; |
| } |
| catch (Error e) |
| { |
| signalRollback(); |
| throw e; |
| } |
| finally |
| { |
| endTransaction(); |
| } |
| } |
| finally |
| { |
| lockManager.leaveLocks(null,null,lockArray); |
| } |
| } |
| } |
| |
| /** Calculate the clauses. |
| */ |
| protected int maxClausesRowIdsForURIs(String outputConnectionName) |
| { |
| return findConjunctionClauseMax(new ClauseDescription[]{ |
| new UnitaryClause(outputConnNameField,outputConnectionName)}); |
| } |
| |
| /** Given values and parameters corresponding to a set of hash values, add corresponding |
| * table row id's to the output map. |
| */ |
| protected void findRowIdsForURIs(String outputConnectionName, Set<Long> rowIDSet, Set<String> uris, List<String> hashParamValues) |
| throws ManifoldCFException |
| { |
| ArrayList list = new ArrayList(); |
| String query = buildConjunctionClause(list,new ClauseDescription[]{ |
| new MultiClause(uriHashField,hashParamValues), |
| new UnitaryClause(outputConnNameField,outputConnectionName)}); |
| |
| IResultSet set = performQuery("SELECT "+idField+","+docURIField+" FROM "+ |
| getTableName()+" WHERE "+query,list,null,null); |
| |
| for (int i = 0; i < set.getRowCount(); i++) |
| { |
| IResultRow row = set.getRow(i); |
| String docURI = (String)row.getValue(docURIField); |
| if (docURI != null && docURI.length() > 0) |
| { |
| if (uris.contains(docURI)) |
| { |
| Long rowID = (Long)row.getValue(idField); |
| rowIDSet.add(rowID); |
| } |
| } |
| } |
| } |
| |
| /** Calculate the maximum number of doc ids we should use. |
| */ |
| protected int maxClausesRowIdsForDocIds(String outputConnectionName) |
| { |
| return findConjunctionClauseMax(new ClauseDescription[]{ |
| new UnitaryClause(outputConnNameField,outputConnectionName)}); |
| } |
| |
| /** Calculate the maximum number of doc ids we should use. |
| */ |
| protected int maxClausesRowIdsForDocIds(String outputConnectionName, String componentHash) |
| { |
| return findConjunctionClauseMax(new ClauseDescription[]{ |
| new UnitaryClause(outputConnNameField,outputConnectionName), |
| (componentHash == null || componentHash.length() == 0)?new NullCheckClause(componentHashField,true):new UnitaryClause(componentHashField,componentHash)}); |
| } |
| |
| /** Calculate the maximum number of doc ids we should use. |
| */ |
| protected int maxClausesRowIdsForDocIds(String[] outputConnectionNames) |
| { |
| return findConjunctionClauseMax(new ClauseDescription[]{ |
| new MultiClause(outputConnNameField,outputConnectionNames)}); |
| } |
| |
| /** Given values and parameters corresponding to a set of hash values, add corresponding |
| * table row id's to the output map. |
| */ |
| protected void findRowIdsForDocIds(String outputConnectionName, Set<Long> rowIDSet, List<String> paramValues) |
| throws ManifoldCFException |
| { |
| ArrayList list = new ArrayList(); |
| String query = buildConjunctionClause(list,new ClauseDescription[]{ |
| new MultiClause(docKeyField,paramValues), |
| new UnitaryClause(outputConnNameField,outputConnectionName)}); |
| |
| IResultSet set = performQuery("SELECT "+idField+" FROM "+ |
| getTableName()+" WHERE "+query,list,null,null); |
| |
| for (int i = 0; i < set.getRowCount(); i++) |
| { |
| IResultRow row = set.getRow(i); |
| Long rowID = (Long)row.getValue(idField); |
| rowIDSet.add(rowID); |
| } |
| } |
| |
| /** Given values and parameters corresponding to a set of hash values, add corresponding |
| * table row id's to the output map. |
| */ |
| protected void findRowIdsForDocIds(String outputConnectionName, Set<Long> rowIDSet, List<String> paramValues, String componentHash) |
| throws ManifoldCFException |
| { |
| ArrayList list = new ArrayList(); |
| String query = buildConjunctionClause(list,new ClauseDescription[]{ |
| new MultiClause(docKeyField,paramValues), |
| new UnitaryClause(outputConnNameField,outputConnectionName), |
| (componentHash==null || componentHash.length() == 0)?new NullCheckClause(componentHashField,true):new UnitaryClause(componentHashField,componentHash)}); |
| |
| IResultSet set = performQuery("SELECT "+idField+" FROM "+ |
| getTableName()+" WHERE "+query,list,null,null); |
| |
| for (int i = 0; i < set.getRowCount(); i++) |
| { |
| IResultRow row = set.getRow(i); |
| Long rowID = (Long)row.getValue(idField); |
| rowIDSet.add(rowID); |
| } |
| } |
| |
| /** Given values and parameters corresponding to a set of hash values, add corresponding |
| * table row id's to the output map. |
| */ |
| protected void findRowIdsForDocIds(String[] outputConnectionNames, Set<Long> rowIDSet, List<String> paramValues) |
| throws ManifoldCFException |
| { |
| ArrayList list = new ArrayList(); |
| String query = buildConjunctionClause(list,new ClauseDescription[]{ |
| new MultiClause(docKeyField,paramValues), |
| new MultiClause(outputConnNameField,outputConnectionNames)}); |
| |
| IResultSet set = performQuery("SELECT "+idField+" FROM "+ |
| getTableName()+" WHERE "+query,list,null,null); |
| |
| for (int i = 0; i < set.getRowCount(); i++) |
| { |
| IResultRow row = set.getRow(i); |
| Long rowID = (Long)row.getValue(idField); |
| rowIDSet.add(rowID); |
| } |
| } |
| |
| /** Calculate the maximum number of clauses. |
| */ |
| protected int maxClausesDeleteRowIds() |
| { |
| return findConjunctionClauseMax(new ClauseDescription[]{}); |
| } |
| |
| /** Delete a chunk of row ids. |
| */ |
| protected void deleteRowIds(List<Long> list) |
| throws ManifoldCFException |
| { |
| ArrayList newList = new ArrayList(); |
| String query = buildConjunctionClause(newList,new ClauseDescription[]{ |
| new MultiClause(idField,list)}); |
| performDelete("WHERE "+query,newList,null); |
| } |
| |
| /** Delete a document from the search engine index. |
| *@param pipelineConnections is the pipeline specification. |
| *@param identifierClass is the name of the space in which the identifier hash should be interpreted. |
| *@param identifierHash is the hash of the id of the document. |
| *@param activities is the object to use to log the details of the ingestion attempt. May be null. |
| */ |
| @Override |
| public void documentDelete( |
| IPipelineConnections pipelineConnections, |
| String identifierClass, String identifierHash, |
| IOutputRemoveActivity activities) |
| throws ManifoldCFException, ServiceInterruption |
| { |
| documentDeleteMultiple(pipelineConnections,new String[]{identifierClass},new String[]{identifierHash},activities); |
| } |
| |
| /** Find out what URIs a SET of document URIs are currently ingested. |
| *@param identifierHashes is the array of document id's to check. |
| *@return the array of current document uri's. Null returned for identifiers |
| * that don't exist in the index. |
| */ |
| protected List<DeleteInfo> getDocumentURIMultiple(String outputConnectionName, String[] identifierClasses, String[] identifierHashes) |
| throws ManifoldCFException |
| { |
| List<DeleteInfo> rval = new ArrayList<DeleteInfo>(); |
| beginTransaction(); |
| try |
| { |
| List<String> list = new ArrayList<String>(); |
| int maxCount = maxClauseDocumentURIChunk(outputConnectionName); |
| int j = 0; |
| for (int i = 0; i < identifierHashes.length; i++) |
| { |
| if (j == maxCount) |
| { |
| getDocumentURIChunk(rval,outputConnectionName,list); |
| j = 0; |
| list.clear(); |
| } |
| list.add(makeKey(identifierClasses[i],identifierHashes[i])); |
| j++; |
| } |
| if (j > 0) |
| getDocumentURIChunk(rval,outputConnectionName,list); |
| return rval; |
| } |
| catch (ManifoldCFException e) |
| { |
| signalRollback(); |
| throw e; |
| } |
| catch (Error e) |
| { |
| signalRollback(); |
| throw e; |
| } |
| finally |
| { |
| endTransaction(); |
| } |
| } |
| |
| /** Find out what URIs a SET of document URIs are currently ingested. |
| *@param outputConnectionName is the output connection name. |
| *@param identifierClasses is the array of identifier classes. |
| *@param identifierHashes is the array of document id's to check. |
| *@param componentHash is the component hash to check. |
| *@return the array of current document uri's. Null returned for identifiers |
| * that don't exist in the index. |
| */ |
| protected List<DeleteInfo> getDocumentURIMultiple(String outputConnectionName, String[] identifierClasses, String[] identifierHashes, String componentHash) |
| throws ManifoldCFException |
| { |
| List<DeleteInfo> rval = new ArrayList<DeleteInfo>(); |
| beginTransaction(); |
| try |
| { |
| List<String> list = new ArrayList<String>(); |
| int maxCount = maxClauseDocumentURIChunk(outputConnectionName,componentHash); |
| int j = 0; |
| for (int i = 0; i < identifierHashes.length; i++) |
| { |
| if (j == maxCount) |
| { |
| getDocumentURIChunk(rval,outputConnectionName,list,componentHash); |
| j = 0; |
| list.clear(); |
| } |
| list.add(makeKey(identifierClasses[i],identifierHashes[i])); |
| j++; |
| } |
| if (j > 0) |
| getDocumentURIChunk(rval,outputConnectionName,list,componentHash); |
| return rval; |
| } |
| catch (ManifoldCFException e) |
| { |
| signalRollback(); |
| throw e; |
| } |
| catch (Error e) |
| { |
| signalRollback(); |
| throw e; |
| } |
| finally |
| { |
| endTransaction(); |
| } |
| } |
| |
| /** Look up ingestion data for a set of documents. |
| *@param rval is a map of output key to document data, in no particular order, which will be loaded with all matching results. |
| *@param pipelineSpecificationBasics are the pipeline specifications corresponding to the identifier classes and hashes. |
| *@param identifierClasses are the names of the spaces in which the identifier hashes should be interpreted. |
| *@param identifierHashes is the array of document identifier hashes to look up. |
| */ |
| @Override |
| public void getPipelineDocumentIngestDataMultiple( |
| IngestStatuses rval, |
| IPipelineSpecificationBasic[] pipelineSpecificationBasics, |
| String[] identifierClasses, String[] identifierHashes) |
| throws ManifoldCFException |
| { |
| // Organize by pipeline spec. |
| Map<IPipelineSpecificationBasic,List<Integer>> keyMap = new HashMap<IPipelineSpecificationBasic,List<Integer>>(); |
| for (int i = 0; i < pipelineSpecificationBasics.length; i++) |
| { |
| IPipelineSpecificationBasic spec = pipelineSpecificationBasics[i]; |
| List<Integer> list = keyMap.get(spec); |
| if (list == null) |
| { |
| list = new ArrayList<Integer>(); |
| keyMap.put(spec,list); |
| } |
| list.add(new Integer(i)); |
| } |
| |
| // Create the return array. |
| Iterator<IPipelineSpecificationBasic> iter = keyMap.keySet().iterator(); |
| while (iter.hasNext()) |
| { |
| IPipelineSpecificationBasic spec = iter.next(); |
| List<Integer> list = keyMap.get(spec); |
| String[] localIdentifierClasses = new String[list.size()]; |
| String[] localIdentifierHashes = new String[list.size()]; |
| for (int i = 0; i < localIdentifierClasses.length; i++) |
| { |
| int index = list.get(i).intValue(); |
| localIdentifierClasses[i] = identifierClasses[index]; |
| localIdentifierHashes[i] = identifierHashes[index]; |
| } |
| getPipelineDocumentIngestDataMultiple(rval,spec,localIdentifierClasses,localIdentifierHashes); |
| } |
| } |
| |
| /** Look up ingestion data for a SET of documents. |
| *@param rval is a map of output key to document data, in no particular order, which will be loaded with all matching results. |
| *@param pipelineSpecificationBasic is the pipeline specification for all documents. |
| *@param identifierClasses are the names of the spaces in which the identifier hashes should be interpreted. |
| *@param identifierHashes is the array of document identifier hashes to look up. |
| */ |
| @Override |
| public void getPipelineDocumentIngestDataMultiple( |
| IngestStatuses rval, |
| IPipelineSpecificationBasic pipelineSpecificationBasic, |
| String[] identifierClasses, String[] identifierHashes) |
| throws ManifoldCFException |
| { |
| String[] outputConnectionNames = extractOutputConnectionNames(pipelineSpecificationBasic); |
| |
| // Build a map, so we can convert an identifier into an array index. |
| Map<String,Integer> indexMap = new HashMap<String,Integer>(); |
| for (int i = 0; i < identifierHashes.length; i++) |
| { |
| indexMap.put(makeKey(identifierClasses[i],identifierHashes[i]),new Integer(i)); |
| } |
| |
| beginTransaction(); |
| try |
| { |
| List<String> list = new ArrayList<String>(); |
| int maxCount = maxClausePipelineDocumentIngestDataChunk(outputConnectionNames); |
| int j = 0; |
| Iterator<String> iter = indexMap.keySet().iterator(); |
| while (iter.hasNext()) |
| { |
| if (j == maxCount) |
| { |
| getPipelineDocumentIngestDataChunk(rval,indexMap,outputConnectionNames,list,identifierClasses,identifierHashes); |
| j = 0; |
| list.clear(); |
| } |
| list.add(iter.next()); |
| j++; |
| } |
| if (j > 0) |
| getPipelineDocumentIngestDataChunk(rval,indexMap,outputConnectionNames,list,identifierClasses,identifierHashes); |
| } |
| catch (ManifoldCFException e) |
| { |
| signalRollback(); |
| throw e; |
| } |
| catch (Error e) |
| { |
| signalRollback(); |
| throw e; |
| } |
| finally |
| { |
| endTransaction(); |
| } |
| |
| } |
| |
| /** Get a chunk of document ingest data records. |
| *@param rval is the document ingest status array where the data should be put. |
| *@param map is the map from id to index. |
| *@param clause is the in clause for the query. |
| *@param list is the parameter list for the query. |
| */ |
| protected void getPipelineDocumentIngestDataChunk(IngestStatuses rval, Map<String,Integer> map, String[] outputConnectionNames, List<String> list, |
| String[] identifierClasses, String[] identifierHashes) |
| throws ManifoldCFException |
| { |
| ArrayList newList = new ArrayList(); |
| String query = buildConjunctionClause(newList,new ClauseDescription[]{ |
| new MultiClause(docKeyField,list), |
| new MultiClause(outputConnNameField,outputConnectionNames)}); |
| |
| // Get the primary records associated with this hash value |
| IResultSet set = performQuery("SELECT "+idField+","+outputConnNameField+","+docKeyField+","+componentHashField+","+lastVersionField+","+lastOutputVersionField+","+authorityNameField+","+lastTransformationVersionField+ |
| " FROM "+getTableName()+" WHERE "+query,newList,null,null); |
| |
| // Now, go through the original request once more, this time building the result |
| for (int i = 0; i < set.getRowCount(); i++) |
| { |
| IResultRow row = set.getRow(i); |
| String docHash = row.getValue(docKeyField).toString(); |
| Integer position = map.get(docHash); |
| if (position != null) |
| { |
| Long id = (Long)row.getValue(idField); |
| String outputConnectionName = (String)row.getValue(outputConnNameField); |
| String componentHash = (String)row.getValue(componentHashField); |
| String lastVersion = (String)row.getValue(lastVersionField); |
| if (lastVersion == null) |
| lastVersion = ""; |
| String lastTransformationVersion = (String)row.getValue(lastTransformationVersionField); |
| if (lastTransformationVersion == null) |
| lastTransformationVersion = ""; |
| String lastOutputVersion = (String)row.getValue(lastOutputVersionField); |
| if (lastOutputVersion == null) |
| lastOutputVersion = ""; |
| String authorityName = (String)row.getValue(authorityNameField); |
| if (authorityName == null) |
| authorityName = ""; |
| int indexValue = position.intValue(); |
| rval.addStatus(identifierClasses[indexValue],identifierHashes[indexValue],outputConnectionName, |
| componentHash,new DocumentIngestStatus(lastVersion,lastTransformationVersion,lastOutputVersion,authorityName)); |
| } |
| } |
| } |
| |
| /** Look up ingestion data for a document. |
| *@param rval is a map of output key to document data, in no particular order, which will be loaded with all matching results. |
| *@param pipelineSpecificationBasic is the pipeline specification for the document. |
| *@param identifierClass is the name of the space in which the identifier hash should be interpreted. |
| *@param identifierHash is the hash of the id of the document. |
| */ |
| @Override |
| public void getPipelineDocumentIngestData( |
| IngestStatuses rval, |
| IPipelineSpecificationBasic pipelineSpecificationBasic, |
| String identifierClass, String identifierHash) |
| throws ManifoldCFException |
| { |
| getPipelineDocumentIngestDataMultiple(rval,pipelineSpecificationBasic, |
| new String[]{identifierClass},new String[]{identifierHash}); |
| } |
| |
| /** Calculate the average time interval between changes for a document. |
| * This is based on the data gathered for the document. |
| *@param pipelineSpecificationBasic is the basic pipeline specification. |
| *@param identifierClasses are the names of the spaces in which the identifier hashes should be interpreted. |
| *@param identifierHashes is the hashes of the ids of the documents. |
| *@return the number of milliseconds between changes, or 0 if this cannot be calculated. |
| */ |
| @Override |
| public long[] getDocumentUpdateIntervalMultiple( |
| IPipelineSpecificationBasic pipelineSpecificationBasic, |
| String[] identifierClasses, String[] identifierHashes) |
| throws ManifoldCFException |
| { |
| // Get the output connection names |
| String[] outputConnectionNames = extractOutputConnectionNames(pipelineSpecificationBasic); |
| |
| // Do these all at once!! |
| // First, create a return array |
| long[] rval = new long[identifierHashes.length]; |
| // Also create a map from identifier to return index. |
| Map<String,Integer> returnMap = new HashMap<String,Integer>(); |
| // Finally, need the set of hash codes |
| Set<String> idCodes = new HashSet<String>(); |
| for (int j = 0; j < identifierHashes.length; j++) |
| { |
| String key = makeKey(identifierClasses[j],identifierHashes[j]); |
| rval[j] = Long.MAX_VALUE; |
| returnMap.put(key,new Integer(j)); |
| idCodes.add(key); |
| } |
| |
| // Get the chunk size |
| int maxClause = maxClauseGetIntervals(outputConnectionNames); |
| |
| // Loop through the hash codes |
| Iterator<String> iter = idCodes.iterator(); |
| List<String> list = new ArrayList<String>(); |
| int j = 0; |
| while (iter.hasNext()) |
| { |
| if (j == maxClause) |
| { |
| getIntervals(rval,outputConnectionNames,list,returnMap); |
| list.clear(); |
| j = 0; |
| } |
| |
| list.add(iter.next()); |
| j++; |
| } |
| |
| if (j > 0) |
| getIntervals(rval,outputConnectionNames,list,returnMap); |
| |
| for (int i = 0; i < rval.length; i++) |
| { |
| if (rval[i] == Long.MAX_VALUE) |
| rval[i] = 0; |
| } |
| |
| return rval; |
| |
| } |
| |
| /** Calculate the average time interval between changes for a document. |
| * This is based on the data gathered for the document. |
| *@param pipelineSpecificationBasic is the basic pipeline specification. |
| *@param identifierClass is the name of the space in which the identifier hash should be interpreted. |
| *@param identifierHash is the hash of the id of the document. |
| *@return the number of milliseconds between changes, or 0 if this cannot be calculated. |
| */ |
| @Override |
| public long getDocumentUpdateInterval( |
| IPipelineSpecificationBasic pipelineSpecificationBasic, |
| String identifierClass, String identifierHash) |
| throws ManifoldCFException |
| { |
| return getDocumentUpdateIntervalMultiple( |
| pipelineSpecificationBasic, |
| new String[]{identifierClass},new String[]{identifierHash})[0]; |
| } |
| |
| /** Calculate the number of clauses. |
| */ |
| protected int maxClauseGetIntervals(String[] outputConnectionNames) |
| { |
| return findConjunctionClauseMax(new ClauseDescription[]{ |
| new MultiClause(outputConnNameField,outputConnectionNames)}); |
| } |
| |
| /** Query for and calculate the interval for a bunch of hashcodes. |
| *@param rval is the array to stuff calculated return values into. |
| *@param list is the list of parameters. |
| *@param queryPart is the part of the query pertaining to the list of hashcodes |
| *@param returnMap is a mapping from document id to rval index. |
| */ |
| protected void getIntervals(long[] rval, String[] outputConnectionNames, List<String> list, Map<String,Integer> returnMap) |
| throws ManifoldCFException |
| { |
| ArrayList newList = new ArrayList(); |
| String query = buildConjunctionClause(newList,new ClauseDescription[]{ |
| new MultiClause(docKeyField,list), |
| new MultiClause(outputConnNameField,outputConnectionNames)}); |
| |
| IResultSet set = performQuery("SELECT "+docKeyField+","+changeCountField+","+firstIngestField+","+lastIngestField+ |
| " FROM "+getTableName()+" WHERE "+query,newList,null,null); |
| |
| for (int i = 0; i < set.getRowCount(); i++) |
| { |
| IResultRow row = set.getRow(i); |
| String docHash = (String)row.getValue(docKeyField); |
| Integer index = (Integer)returnMap.get(docHash); |
| if (index != null) |
| { |
| // Calculate the return value |
| long changeCount = ((Long)row.getValue(changeCountField)).longValue(); |
| long firstIngest = ((Long)row.getValue(firstIngestField)).longValue(); |
| long lastIngest = ((Long)row.getValue(lastIngestField)).longValue(); |
| int indexValue = index.intValue(); |
| long newValue = (long)(((double)(lastIngest-firstIngest))/(double)changeCount); |
| if (newValue < rval[indexValue]) |
| rval[indexValue] = newValue; |
| } |
| } |
| } |
| |
| /** Reset all documents belonging to a specific output connection, because we've got information that |
| * that system has been reconfigured. This will force all such documents to be reindexed the next time |
| * they are checked. |
| *@param outputConnection is the output connection associated with this action. |
| */ |
| @Override |
| public void resetOutputConnection(IOutputConnection outputConnection) |
| throws ManifoldCFException |
| { |
| if (outputConnection == null) |
| return; |
| |
| // We're not going to blow away the records, but we are going to set their versions to mean, "reindex required" |
| HashMap map = new HashMap(); |
| map.put(lastVersionField,null); |
| ArrayList list = new ArrayList(); |
| String query = buildConjunctionClause(list,new ClauseDescription[]{ |
| new UnitaryClause(outputConnNameField,outputConnection.getName())}); |
| |
| performUpdate(map,"WHERE "+query,list,null); |
| } |
| |
| /** Remove all knowledge of an output index from the system. This is appropriate |
| * when the output index no longer exists and you wish to delete the associated job. |
| *@param outputConnection is the output connection associated with this action. |
| */ |
| @Override |
| public void removeOutputConnection(IOutputConnection outputConnection) |
| throws ManifoldCFException |
| { |
| if (outputConnection == null) |
| return; |
| |
| ArrayList list = new ArrayList(); |
| String query = buildConjunctionClause(list,new ClauseDescription[]{ |
| new UnitaryClause(outputConnNameField,outputConnection.getName())}); |
| |
| performDelete("WHERE "+query,list,null); |
| |
| // Notify the output connection of the removal of all the records for the connection |
| IOutputConnector connector = outputConnectorPool.grab(outputConnection); |
| if (connector == null) |
| return; |
| try |
| { |
| connector.noteAllRecordsRemoved(); |
| } |
| finally |
| { |
| outputConnectorPool.release(outputConnection,connector); |
| } |
| |
| } |
| |
| /** Note the ingestion of a document, or the "update" of a document. |
| *@param outputConnectionName is the name of the output connection. |
| *@param docKey is the key string describing the document. |
| *@param componentHash is the component identifier hash for this document. |
| *@param documentVersion is a string describing the new version of the document. |
| *@param transformationVersion is a string describing all current transformations for the document. |
| *@param outputVersion is the version string calculated for the output connection. |
| *@param authorityNameString is the name of the relevant authority connection. |
| *@param ingestTime is the time at which the ingestion took place, in milliseconds since epoch. |
| *@param documentURI is the uri the document can be accessed at, or null (which signals that we are to record the version, but no |
| * ingestion took place). |
| *@param documentURIHash is the hash of the document uri. |
| */ |
| protected void noteDocumentIngest(String outputConnectionName, |
| String docKey, String componentHash, String documentVersion, String transformationVersion, |
| String outputVersion, |
| String authorityNameString, |
| long ingestTime, String documentURI, String documentURIHash) |
| throws ManifoldCFException |
| { |
| HashMap map = new HashMap(); |
| while (true) |
| { |
| // The table can have at most one row per URI, for non-null URIs. It can also have at most one row per document identifier. |
| // However, for null URI's, multiple rows are allowed. Null URIs have a special meaning, which is that |
| // the document was not actually ingested. |
| |
| // To make sure the constraints are enforced, we cannot simply look for the row and insert one if not found. This is because |
| // postgresql does not cause a lock to be created on rows that don't yet exist, so multiple transactions of the kind described |
| // can lead to multiple rows with the same key. Instead, we *could* lock the whole table down, but that would interfere with |
| // parallelism. The lowest-impact approach is to make sure an index constraint is in place, and first attempt to do an INSERT. |
| // That attempt will fail if a record already exists. Then, an update can be attempted. |
| // |
| // In the situation where the INSERT fails, the current transaction is aborted and a new transaction must be performed. |
| // This means that it is impossible to structure things so that the UPDATE is guaranteed to succeed. So, on the event of an |
| // INSERT failure, the UPDATE is tried, but if that fails too, then the INSERT is tried again. This should also handle the |
| // case where a DELETE in another transaction removes the database row before it can be UPDATEd. |
| // |
| // If the UPDATE does not appear to modify any rows, this is also a signal that the INSERT must be retried. |
| // |
| |
| // Try the update first. Typically this succeeds except in the case where a doc is indexed for the first time. |
| map.clear(); |
| if (componentHash != null) |
| map.put(componentHashField,componentHash); |
| map.put(lastVersionField,documentVersion); |
| map.put(lastTransformationVersionField,transformationVersion); |
| map.put(lastOutputVersionField,outputVersion); |
| map.put(lastIngestField,new Long(ingestTime)); |
| if (documentURI != null) |
| { |
| map.put(docURIField,documentURI); |
| map.put(uriHashField,documentURIHash); |
| } |
| if (authorityNameString != null) |
| map.put(authorityNameField,authorityNameString); |
| else |
| map.put(authorityNameField,""); |
| |
| // Transaction abort due to deadlock should be retried here. |
| while (true) |
| { |
| long sleepAmt = 0L; |
| |
| beginTransaction(); |
| try |
| { |
| // Look for existing row. |
| ArrayList list = new ArrayList(); |
| String query = buildConjunctionClause(list,new ClauseDescription[]{ |
| new UnitaryClause(docKeyField,docKey), |
| new UnitaryClause(outputConnNameField,outputConnectionName), |
| ((componentHash==null)?new NullCheckClause(componentHashField,true):new UnitaryClause(componentHashField,componentHash))}); |
| IResultSet set = performQuery("SELECT "+idField+","+changeCountField+" FROM "+getTableName()+" WHERE "+ |
| query+" FOR UPDATE",list,null,null); |
| IResultRow row = null; |
| if (set.getRowCount() > 0) |
| row = set.getRow(0); |
| |
| if (row != null) |
| { |
| // Update the record |
| list.clear(); |
| query = buildConjunctionClause(list,new ClauseDescription[]{ |
| new UnitaryClause(idField,row.getValue(idField))}); |
| long changeCount = ((Long)row.getValue(changeCountField)).longValue(); |
| changeCount++; |
| map.put(changeCountField,new Long(changeCount)); |
| performUpdate(map,"WHERE "+query,list,null); |
| // Update successful! |
| performCommit(); |
| return; |
| } |
| |
| // Update failed to find a matching record, so try the insert |
| break; |
| } |
| catch (ManifoldCFException e) |
| { |
| signalRollback(); |
| if (e.getErrorCode() == e.DATABASE_TRANSACTION_ABORT) |
| { |
| if (Logging.perf.isDebugEnabled()) |
| Logging.perf.debug("Aborted transaction noting ingestion: "+e.getMessage()); |
| sleepAmt = getSleepAmt(); |
| continue; |
| } |
| |
| throw e; |
| } |
| catch (Error e) |
| { |
| signalRollback(); |
| throw e; |
| } |
| finally |
| { |
| endTransaction(); |
| sleepFor(sleepAmt); |
| } |
| } |
| |
| // Set up for insert |
| map.clear(); |
| if (componentHash != null) |
| map.put(componentHashField,componentHash); |
| map.put(lastVersionField,documentVersion); |
| map.put(lastTransformationVersionField,transformationVersion); |
| map.put(lastOutputVersionField,outputVersion); |
| map.put(lastIngestField,new Long(ingestTime)); |
| if (documentURI != null) |
| { |
| map.put(docURIField,documentURI); |
| map.put(uriHashField,documentURIHash); |
| } |
| if (authorityNameString != null) |
| map.put(authorityNameField,authorityNameString); |
| else |
| map.put(authorityNameField,""); |
| |
| Long id = new Long(IDFactory.make(threadContext)); |
| map.put(idField,id); |
| map.put(outputConnNameField,outputConnectionName); |
| map.put(docKeyField,docKey); |
| map.put(changeCountField,new Long(1)); |
| map.put(firstIngestField,map.get(lastIngestField)); |
| beginTransaction(); |
| try |
| { |
| performInsert(map,null); |
| noteModifications(1,0,0); |
| performCommit(); |
| return; |
| } |
| catch (ManifoldCFException e) |
| { |
| signalRollback(); |
| // If this is simply a constraint violation, we just want to fall through and try the update! |
| if (e.getErrorCode() != ManifoldCFException.DATABASE_TRANSACTION_ABORT) |
| throw e; |
| // Otherwise, exit transaction and fall through to 'update' attempt |
| } |
| catch (Error e) |
| { |
| signalRollback(); |
| throw e; |
| } |
| finally |
| { |
| endTransaction(); |
| } |
| |
| // Insert must have failed. Attempt an update. |
| } |
| } |
| |
| /** Calculate how many clauses at a time |
| */ |
| protected int maxClauseDocumentURIChunk(String outputConnectionName) |
| { |
| return findConjunctionClauseMax(new ClauseDescription[]{ |
| new UnitaryClause(outputConnNameField,outputConnectionName)}); |
| } |
| |
| /** Get a chunk of document uris. |
| *@param rval is the string array where the uris should be put. |
| *@param map is the map from id to index. |
| *@param clause is the in clause for the query. |
| *@param list are the doc keys for the query. |
| */ |
| protected void getDocumentURIChunk(List<DeleteInfo> rval, String outputConnectionName, |
| List<String> list) |
| throws ManifoldCFException |
| { |
| ArrayList newList = new ArrayList(); |
| String query = buildConjunctionClause(newList,new ClauseDescription[]{ |
| new MultiClause(docKeyField,list), |
| new UnitaryClause(outputConnNameField,outputConnectionName)}); |
| |
| IResultSet set = performQuery("SELECT "+docKeyField+","+docURIField+","+uriHashField+","+lastOutputVersionField+" FROM "+getTableName()+" WHERE "+ |
| query,newList,null,null); |
| |
| // Go through list and put into buckets. |
| for (int i = 0; i < set.getRowCount(); i++) |
| { |
| IResultRow row = set.getRow(i); |
| //String docHash = row.getValue(docKeyField).toString(); |
| String lastURI = (String)row.getValue(docURIField); |
| if (lastURI != null && lastURI.length() == 0) |
| lastURI = null; |
| String lastURIHash = (String)row.getValue(uriHashField); |
| if (lastURIHash != null && lastURIHash.length() == 0) |
| lastURIHash = null; |
| String lastOutputVersion = (String)row.getValue(lastOutputVersionField); |
| rval.add(new DeleteInfo(lastURI,lastURIHash,lastOutputVersion)); |
| } |
| } |
| |
| /** Calculate how many clauses at a time |
| */ |
| protected int maxClauseDocumentURIChunk(String outputConnectionName, String componentHash) |
| { |
| return findConjunctionClauseMax(new ClauseDescription[]{ |
| new UnitaryClause(outputConnNameField,outputConnectionName), |
| ((componentHash==null)?new NullCheckClause(componentHashField,true):new UnitaryClause(componentHashField,componentHash))}); |
| } |
| |
| /** Get a chunk of document uris. |
| *@param rval is the string array where the uris should be put. |
| *@param map is the map from id to index. |
| *@param clause is the in clause for the query. |
| *@param list are the doc keys for the query. |
| *@param componentHash is the component hash, if any, for the query. |
| */ |
| protected void getDocumentURIChunk(List<DeleteInfo> rval, String outputConnectionName, |
| List<String> list, String componentHash) |
| throws ManifoldCFException |
| { |
| ArrayList newList = new ArrayList(); |
| String query = buildConjunctionClause(newList,new ClauseDescription[]{ |
| new MultiClause(docKeyField,list), |
| new UnitaryClause(outputConnNameField,outputConnectionName), |
| ((componentHash==null)?new NullCheckClause(componentHashField,true):new UnitaryClause(componentHashField,componentHash))}); |
| |
| IResultSet set = performQuery("SELECT "+docKeyField+","+docURIField+","+uriHashField+","+lastOutputVersionField+" FROM "+getTableName()+" WHERE "+ |
| query,newList,null,null); |
| |
| // Go through list and put into buckets. |
| for (int i = 0; i < set.getRowCount(); i++) |
| { |
| IResultRow row = set.getRow(i); |
| //String docHash = row.getValue(docKeyField).toString(); |
| String lastURI = (String)row.getValue(docURIField); |
| if (lastURI != null && lastURI.length() == 0) |
| lastURI = null; |
| String lastURIHash = (String)row.getValue(uriHashField); |
| if (lastURIHash != null && lastURIHash.length() == 0) |
| lastURIHash = null; |
| String lastOutputVersion = (String)row.getValue(lastOutputVersionField); |
| rval.add(new DeleteInfo(lastURI,lastURIHash,lastOutputVersion)); |
| } |
| } |
| |
| /** Count the clauses |
| */ |
| protected int maxClauseDocumentIngestDataChunk(String outputConnectionName) |
| { |
| return findConjunctionClauseMax(new ClauseDescription[]{ |
| new UnitaryClause(outputConnNameField,outputConnectionName)}); |
| } |
| |
| /** Count the clauses |
| */ |
| protected int maxClausePipelineDocumentIngestDataChunk(String[] outputConnectionNames) |
| { |
| return findConjunctionClauseMax(new ClauseDescription[]{ |
| new MultiClause(outputConnNameField,outputConnectionNames)}); |
| } |
| |
| |
| // Protected methods |
| |
| /** Remove document, using the specified output connection, via the standard pool. |
| */ |
| protected void removeDocument(IOutputConnection connection, String documentURI, String outputDescription, IOutputRemoveActivity activities) |
| throws ManifoldCFException, ServiceInterruption |
| { |
| IOutputConnector connector = outputConnectorPool.grab(connection); |
| if (connector == null) |
| // The connector is not installed; treat this as a service interruption. |
| throw new ServiceInterruption("Output connector not installed",0L); |
| try |
| { |
| connector.removeDocument(documentURI,outputDescription,activities); |
| } |
| finally |
| { |
| outputConnectorPool.release(connection,connector); |
| } |
| } |
| |
| /** Make a key from a document class and a hash */ |
| protected static String makeKey(String documentClass, String documentHash) |
| { |
| return documentClass + ":" + documentHash; |
| } |
| |
| /** This class contains the information necessary to delete a document */ |
| protected static class DeleteInfo |
| { |
| protected String uriValue; |
| protected String uriHashValue; |
| protected String outputVersion; |
| |
| public DeleteInfo(String uriValue, String uriHashValue, String outputVersion) |
| { |
| this.uriValue = uriValue; |
| this.uriHashValue = uriHashValue; |
| this.outputVersion = outputVersion; |
| } |
| |
| public String getURI() |
| { |
| return uriValue; |
| } |
| |
| public String getURIHash() |
| { |
| return uriHashValue; |
| } |
| |
| public String getOutputVersion() |
| { |
| return outputVersion; |
| } |
| } |
| |
| /** Wrapper class for add activity. This handles conversion of output connector activity logging to |
| * qualified activity names */ |
| protected static class OutputRecordingActivity implements IOutputHistoryActivity |
| { |
| protected final IOutputHistoryActivity activityProvider; |
| protected final String outputConnectionName; |
| |
| public OutputRecordingActivity(IOutputHistoryActivity activityProvider, String outputConnectionName) |
| { |
| this.activityProvider = activityProvider; |
| this.outputConnectionName = outputConnectionName; |
| } |
| |
| /** Record time-stamped information about the activity of the output connector. |
| *@param startTime is either null or the time since the start of epoch in milliseconds (Jan 1, 1970). Every |
| * activity has an associated time; the startTime field records when the activity began. A null value |
| * indicates that the start time and the finishing time are the same. |
| *@param activityType is a string which is fully interpretable only in the context of the connector involved, which is |
| * used to categorize what kind of activity is being recorded. For example, a web connector might record a |
| * "fetch document" activity. Cannot be null. |
| *@param dataSize is the number of bytes of data involved in the activity, or null if not applicable. |
| *@param entityURI is a (possibly long) string which identifies the object involved in the history record. |
| * The interpretation of this field will differ from connector to connector. May be null. |
| *@param resultCode contains a terse description of the result of the activity. The description is limited in |
| * size to 255 characters, and can be interpreted only in the context of the current connector. May be null. |
| *@param resultDescription is a (possibly long) human-readable string which adds detail, if required, to the result |
| * described in the resultCode field. This field is not meant to be queried on. May be null. |
| */ |
| @Override |
| public void recordActivity(Long startTime, String activityType, Long dataSize, |
| String entityURI, String resultCode, String resultDescription) |
| throws ManifoldCFException |
| { |
| activityProvider.recordActivity(startTime,ManifoldCF.qualifyOutputActivityName(activityType,outputConnectionName), |
| dataSize,entityURI,resultCode,resultDescription); |
| } |
| |
| } |
| |
| /** Wrapper class for add activity. This handles conversion of transformation connector activity logging to |
| * qualified activity names */ |
| protected static class TransformationRecordingActivity implements IOutputHistoryActivity |
| { |
| protected final IOutputHistoryActivity activityProvider; |
| protected final String transformationConnectionName; |
| |
| public TransformationRecordingActivity(IOutputHistoryActivity activityProvider, String transformationConnectionName) |
| { |
| this.activityProvider = activityProvider; |
| this.transformationConnectionName = transformationConnectionName; |
| } |
| |
| /** Record time-stamped information about the activity of the output connector. |
| *@param startTime is either null or the time since the start of epoch in milliseconds (Jan 1, 1970). Every |
| * activity has an associated time; the startTime field records when the activity began. A null value |
| * indicates that the start time and the finishing time are the same. |
| *@param activityType is a string which is fully interpretable only in the context of the connector involved, which is |
| * used to categorize what kind of activity is being recorded. For example, a web connector might record a |
| * "fetch document" activity. Cannot be null. |
| *@param dataSize is the number of bytes of data involved in the activity, or null if not applicable. |
| *@param entityURI is a (possibly long) string which identifies the object involved in the history record. |
| * The interpretation of this field will differ from connector to connector. May be null. |
| *@param resultCode contains a terse description of the result of the activity. The description is limited in |
| * size to 255 characters, and can be interpreted only in the context of the current connector. May be null. |
| *@param resultDescription is a (possibly long) human-readable string which adds detail, if required, to the result |
| * described in the resultCode field. This field is not meant to be queried on. May be null. |
| */ |
| @Override |
| public void recordActivity(Long startTime, String activityType, Long dataSize, |
| String entityURI, String resultCode, String resultDescription) |
| throws ManifoldCFException |
| { |
| activityProvider.recordActivity(startTime,ManifoldCF.qualifyTransformationActivityName(activityType,transformationConnectionName), |
| dataSize,entityURI,resultCode,resultDescription); |
| } |
| |
| } |
| |
| protected static class OutputRemoveActivitiesWrapper extends OutputRecordingActivity implements IOutputRemoveActivity |
| { |
| protected final IOutputRemoveActivity removeActivities; |
| |
| public OutputRemoveActivitiesWrapper(IOutputRemoveActivity removeActivities, String outputConnectionName) |
| { |
| super(removeActivities,outputConnectionName); |
| this.removeActivities = removeActivities; |
| } |
| |
| } |
| |
  /** Wrapper around an IOutputAddActivity which also records history activity against a
  * specific output connection name (via OutputRecordingActivity).  All indexability checks,
  * access-token qualification, and document-sending calls are delegated to the wrapped object.
  */
  protected static class OutputAddActivitiesWrapper extends OutputRecordingActivity implements IOutputAddActivity
  {
    // Underlying add-activity object that all calls delegate to.
    protected final IOutputAddActivity addActivities;

    /** Constructor.
    *@param addActivities is the underlying add-activity object to delegate to.
    *@param outputConnectionName is the output connection name used to scope recorded history.
    */
    public OutputAddActivitiesWrapper(IOutputAddActivity addActivities, String outputConnectionName)
    {
      super(addActivities,outputConnectionName);
      this.addActivities = addActivities;
    }

    /** Qualify an access token appropriately, to match access tokens as returned by mod_aa.  This method
    * includes the authority name with the access token, if any, so that each authority may establish its own token space.
    *@param authorityNameString is the name of the authority to use to qualify the access token.
    *@param accessToken is the raw, repository access token.
    *@return the properly qualified access token.
    */
    @Override
    public String qualifyAccessToken(String authorityNameString, String accessToken)
      throws ManifoldCFException
    {
      return addActivities.qualifyAccessToken(authorityNameString,accessToken);
    }

    /** Send a document via the pipeline to the next output connection.
    *@param documentURI is the document's URI.
    *@param document is the document data to be processed (handed to the output data store).
    *@return the document status (accepted or permanently rejected); return codes are listed in IPipelineConnector.
    *@throws IOException only if there's an IO error reading the data from the document.
    */
    @Override
    public int sendDocument(String documentURI, RepositoryDocument document)
      throws ManifoldCFException, ServiceInterruption, IOException
    {
      return addActivities.sendDocument(documentURI,document);
    }

    /** Send NO document via the pipeline to the next output connection.  This is equivalent
    * to sending an empty document placeholder.
    */
    @Override
    public void noDocument()
      throws ManifoldCFException, ServiceInterruption
    {
      addActivities.noDocument();
    }

    /** Detect if a date is acceptable downstream or not.  This method is used to determine whether it makes sense to fetch a document
    * in the first place.
    *@param date is the date of the document.
    *@return true if the date can be accepted by the downstream connection.
    */
    @Override
    public boolean checkDateIndexable(Date date)
      throws ManifoldCFException, ServiceInterruption
    {
      return addActivities.checkDateIndexable(date);
    }

    /** Detect if a mime type is acceptable downstream or not.  This method is used to determine whether it makes sense to fetch a document
    * in the first place.
    *@param mimeType is the mime type of the document.
    *@return true if the mime type can be accepted by the downstream connection.
    */
    @Override
    public boolean checkMimeTypeIndexable(String mimeType)
      throws ManifoldCFException, ServiceInterruption
    {
      return addActivities.checkMimeTypeIndexable(mimeType);
    }

    /** Pre-determine whether a document (passed here as a File object) is acceptable downstream.  This method is
    * used to determine whether a document needs to be actually transferred.  This hook is provided mainly to support
    * search engines that only handle a small set of accepted file types.
    *@param localFile is the local file to check.
    *@return true if the file is acceptable by the downstream connection.
    */
    @Override
    public boolean checkDocumentIndexable(File localFile)
      throws ManifoldCFException, ServiceInterruption
    {
      return addActivities.checkDocumentIndexable(localFile);
    }

    /** Pre-determine whether a document's length is acceptable downstream.  This method is used
    * to determine whether to fetch a document in the first place.
    *@param length is the length of the document.
    *@return true if the file is acceptable by the downstream connection.
    */
    @Override
    public boolean checkLengthIndexable(long length)
      throws ManifoldCFException, ServiceInterruption
    {
      return addActivities.checkLengthIndexable(length);
    }

    /** Pre-determine whether a document's URL is acceptable downstream.  This method is used
    * to help filter out documents that cannot be indexed in advance.
    *@param url is the URL of the document.
    *@return true if the file is acceptable by the downstream connection.
    */
    @Override
    public boolean checkURLIndexable(String url)
      throws ManifoldCFException, ServiceInterruption
    {
      return addActivities.checkURLIndexable(url);
    }

  }
| |
| protected static class OutputActivitiesWrapper extends OutputAddActivitiesWrapper implements IOutputActivity |
| { |
| protected final IOutputActivity activities; |
| |
| public OutputActivitiesWrapper(IOutputActivity activities, String outputConnectionName) |
| { |
| super(activities,outputConnectionName); |
| this.activities = activities; |
| } |
| } |
| |
| protected class PipelineObject |
| { |
| public final IPipelineSpecification pipelineConnections; |
| public final IOutputConnector[] outputConnectors; |
| public final ITransformationConnector[] transformationConnectors; |
| |
| public PipelineObject( |
| IPipelineSpecification pipelineConnections, |
| ITransformationConnector[] transformationConnectors, |
| IOutputConnector[] outputConnectors) |
| { |
| this.pipelineConnections = pipelineConnections; |
| this.transformationConnectors = transformationConnectors; |
| this.outputConnectors = outputConnectors; |
| } |
| |
| public boolean checkDateIndexable(Date date, IOutputCheckActivity finalActivity) |
| throws ManifoldCFException, ServiceInterruption |
| { |
| PipelineCheckFanout entryPoint = buildCheckPipeline(finalActivity); |
| return entryPoint.checkDateIndexable(date); |
| } |
| |
| public boolean checkMimeTypeIndexable(String mimeType, IOutputCheckActivity finalActivity) |
| throws ManifoldCFException, ServiceInterruption |
| { |
| PipelineCheckFanout entryPoint = buildCheckPipeline(finalActivity); |
| return entryPoint.checkMimeTypeIndexable(mimeType); |
| } |
| |
| public boolean checkDocumentIndexable(File localFile, IOutputCheckActivity finalActivity) |
| throws ManifoldCFException, ServiceInterruption |
| { |
| PipelineCheckFanout entryPoint = buildCheckPipeline(finalActivity); |
| return entryPoint.checkDocumentIndexable(localFile); |
| } |
| |
| public boolean checkLengthIndexable(long length, IOutputCheckActivity finalActivity) |
| throws ManifoldCFException, ServiceInterruption |
| { |
| PipelineCheckFanout entryPoint = buildCheckPipeline(finalActivity); |
| return entryPoint.checkLengthIndexable(length); |
| } |
| |
| public boolean checkURLIndexable(String uri, IOutputCheckActivity finalActivity) |
| throws ManifoldCFException, ServiceInterruption |
| { |
| PipelineCheckFanout entryPoint = buildCheckPipeline(finalActivity); |
| return entryPoint.checkURLIndexable(uri); |
| } |
| |
| public void release() |
| throws ManifoldCFException |
| { |
| outputConnectorPool.releaseMultiple(pipelineConnections.getOutputConnections(),outputConnectors); |
| transformationConnectorPool.releaseMultiple(pipelineConnections.getTransformationConnections(),transformationConnectors); |
| } |
| |
| protected PipelineCheckFanout buildCheckPipeline(IOutputCheckActivity finalActivity) |
| { |
| // Algorithm for building a pipeline: |
| // (1) We start with the set of final output connection stages, and build an entry point for each one. That's our "current set". |
| // (2) We cycle through the "current set". For each member, we attempt to go upstream a level. |
| // (3) Before we can build the pipeline activity class for the next upstream stage, we need to have present ALL of the children that share that |
| // parent. If we don't have that yet, we throw the stage back into the list. |
| // (4) We continue until there is one stage left that has no parent, and that's what we return. |
| |
| // Create the current set |
| Map<Integer,PipelineCheckEntryPoint> currentSet = new HashMap<Integer,PipelineCheckEntryPoint>(); |
| // First, locate all the output stages, and enter them into the set |
| int count = pipelineConnections.getOutputCount(); |
| for (int i = 0; i < count; i++) |
| { |
| int outputStage = pipelineConnections.getOutputStage(i); |
| PipelineCheckEntryPoint outputStageEntryPoint = new PipelineCheckEntryPoint( |
| outputConnectors[pipelineConnections.getOutputConnectionIndex(outputStage).intValue()], |
| pipelineConnections.getStageDescriptionString(outputStage),finalActivity); |
| currentSet.put(new Integer(outputStage), outputStageEntryPoint); |
| } |
| // Cycle through the "current set" |
| while (true) |
| { |
| int parent = -1; |
| int[] siblings = null; |
| for (Integer outputStage : currentSet.keySet()) |
| { |
| parent = pipelineConnections.getStageParent(outputStage.intValue()); |
| // Look up the children |
| siblings = pipelineConnections.getStageChildren(parent); |
| // Are all the siblings in the current set yet? If not, we can't proceed with this entry. |
| boolean skipToNext = false; |
| for (int sibling : siblings) |
| { |
| if (currentSet.get(new Integer(sibling)) == null) |
| { |
| skipToNext = true; |
| break; |
| } |
| } |
| if (skipToNext) |
| { |
| siblings = null; |
| continue; |
| } |
| // All siblings are present! |
| break; |
| } |
| |
| // Siblings will be set if there's a stage we can do. If not, we're done, but this should already have been detected. |
| if (siblings == null) |
| throw new IllegalStateException("Not at root but can't progress"); |
| |
| PipelineCheckEntryPoint[] siblingEntryPoints = new PipelineCheckEntryPoint[siblings.length]; |
| for (int j = 0; j < siblings.length; j++) |
| { |
| siblingEntryPoints[j] = currentSet.remove(new Integer(siblings[j])); |
| } |
| // Wrap the entry points in a fan-out class, which has pipe connector-like methods that fire across all the connectors. |
| PipelineCheckFanout pcf = new PipelineCheckFanout(siblingEntryPoints); |
| if (parent == -1) |
| return pcf; |
| PipelineCheckEntryPoint newEntry = new PipelineCheckEntryPoint( |
| transformationConnectors[pipelineConnections.getTransformationConnectionIndex(parent).intValue()], |
| pipelineConnections.getStageDescriptionString(parent),pcf); |
| currentSet.put(new Integer(parent), newEntry); |
| } |
| } |
| } |
| |
| protected class PipelineObjectWithVersions extends PipelineObject |
| { |
| protected final IPipelineSpecificationWithVersions pipelineSpecificationWithVersions; |
| |
| public PipelineObjectWithVersions( |
| IPipelineSpecificationWithVersions pipelineSpecificationWithVersions, |
| ITransformationConnector[] transformationConnectors, |
| IOutputConnector[] outputConnectors) |
| { |
| super(pipelineSpecificationWithVersions,transformationConnectors,outputConnectors); |
| this.pipelineSpecificationWithVersions = pipelineSpecificationWithVersions; |
| } |
| |
| public int addOrReplaceDocumentWithException(String docKey, String componentHash, String documentURI, RepositoryDocument document, String newDocumentVersion, String authorityNameString, IOutputActivity finalActivity, long ingestTime) |
| throws ManifoldCFException, ServiceInterruption, IOException |
| { |
| PipelineAddFanout entryPoint = buildAddPipeline(finalActivity,newDocumentVersion,authorityNameString,ingestTime,docKey,componentHash); |
| return entryPoint.sendDocument(documentURI,document); |
| } |
| |
| public void noDocument(String docKey, String componentHash, String newDocumentVersion, String authorityNameString, IOutputActivity finalActivity, long ingestTime) |
| throws ManifoldCFException, ServiceInterruption |
| { |
| PipelineAddFanout entryPoint = buildAddPipeline(finalActivity,newDocumentVersion,authorityNameString,ingestTime,docKey,componentHash); |
| entryPoint.noDocument(); |
| } |
| |
| protected PipelineAddFanout buildAddPipeline(IOutputActivity finalActivity, |
| String newDocumentVersion, String newAuthorityNameString, |
| long ingestTime, String docKey, String componentHash) |
| { |
| // Algorithm for building a pipeline: |
| // (1) We start with the set of final output connection stages, and build an entry point for each one. That's our "current set". |
| // (2) We cycle through the "current set". For each member, we attempt to go upstream a level. |
| // (3) Before we can build the pipeline activity class for the next upstream stage, we need to have present ALL of the children that share that |
| // parent. If we don't have that yet, we throw the stage back into the list. |
| // (4) We continue until there is one stage left that has no parent, and that's what we return. |
| |
| // Create the current set |
| Map<Integer,PipelineAddEntryPoint> currentSet = new HashMap<Integer,PipelineAddEntryPoint>(); |
| // First, locate all the output stages, and enter them into the set |
| IPipelineSpecificationWithVersions fullSpec = pipelineSpecificationWithVersions; |
| |
| int outputCount = fullSpec.getOutputCount(); |
| for (int i = 0; i < outputCount; i++) |
| { |
| int outputStage = fullSpec.getOutputStage(i); |
| |
| // Compute whether we need to reindex this record to this output or not, based on spec. |
| String oldDocumentVersion = fullSpec.getOutputDocumentVersionString(i); |
| String oldOutputVersion = fullSpec.getOutputVersionString(i); |
| String oldTransformationVersion = fullSpec.getOutputTransformationVersionString(i); |
| String oldAuthorityName = fullSpec.getAuthorityNameString(i); |
| |
| // Compute the transformation version string. Must always be computed if we're going to reindex, since we save it. |
| String newTransformationVersion = computePackedTransformationVersion(fullSpec,outputStage); |
| |
| boolean needToReindex = (oldDocumentVersion == null); |
| if (needToReindex == false) |
| { |
| // We need a way to signal that a document has no valid version string. |
| // That way is when the new document version string is empty. |
| needToReindex = (newDocumentVersion.length() == 0 || |
| !oldDocumentVersion.equals(newDocumentVersion) || |
| !oldOutputVersion.equals(fullSpec.getStageDescriptionString(outputStage).getVersionString()) || |
| !oldAuthorityName.equals((newAuthorityNameString==null)?"":newAuthorityNameString)); |
| } |
| if (needToReindex == false) |
| { |
| needToReindex = (!oldTransformationVersion.equals(newTransformationVersion)); |
| } |
| |
| int connectionIndex = fullSpec.getOutputConnectionIndex(outputStage).intValue(); |
| PipelineAddEntryPoint outputStageEntryPoint = new OutputAddEntryPoint( |
| outputConnectors[connectionIndex], |
| fullSpec.getStageDescriptionString(outputStage), |
| new OutputActivitiesWrapper(finalActivity,fullSpec.getStageConnectionName(outputStage)), |
| needToReindex, |
| fullSpec.getStageConnectionName(outputStage), |
| newTransformationVersion, |
| ingestTime, |
| newDocumentVersion, |
| docKey, |
| componentHash, |
| newAuthorityNameString); |
| currentSet.put(new Integer(outputStage), outputStageEntryPoint); |
| } |
| // Cycle through the "current set" |
| while (true) |
| { |
| int parent = -1; |
| int[] siblings = null; |
| for (Integer outputStage : currentSet.keySet()) |
| { |
| parent = fullSpec.getStageParent(outputStage.intValue()); |
| // Look up the children |
| siblings = fullSpec.getStageChildren(parent); |
| // Are all the siblings in the current set yet? If not, we can't proceed with this entry. |
| boolean skipToNext = false; |
| for (int sibling : siblings) |
| { |
| if (currentSet.get(new Integer(sibling)) == null) |
| { |
| skipToNext = true; |
| break; |
| } |
| } |
| if (skipToNext) |
| { |
| siblings = null; |
| continue; |
| } |
| // All siblings are present! |
| break; |
| } |
| |
| // Siblings will be set if there's a stage we can do. If not, we're done, but this should already have been detected. |
| if (siblings == null) |
| throw new IllegalStateException("Not at root but can't progress"); |
| |
| PipelineAddEntryPoint[] siblingEntryPoints = new PipelineAddEntryPoint[siblings.length]; |
| for (int j = 0; j < siblings.length; j++) |
| { |
| siblingEntryPoints[j] = currentSet.remove(new Integer(siblings[j])); |
| } |
| // Wrap the entry points in a fan-out class, which has pipe connector-like methods that fire across all the connectors. |
| PipelineAddFanout pcf = new PipelineAddFanout(siblingEntryPoints, |
| (parent==-1)?null:new TransformationRecordingActivity(finalActivity, |
| fullSpec.getStageConnectionName(parent)), |
| finalActivity); |
| if (parent == -1) |
| return pcf; |
| PipelineAddEntryPoint newEntry = new PipelineAddEntryPoint( |
| transformationConnectors[fullSpec.getTransformationConnectionIndex(parent).intValue()], |
| fullSpec.getStageDescriptionString(parent),newAuthorityNameString,pcf,pcf.checkNeedToReindex()); |
| currentSet.put(new Integer(parent), newEntry); |
| } |
| |
| } |
| |
| } |
| |
| /** This class describes the entry stage of multiple siblings in a check pipeline. |
| */ |
| public static class PipelineCheckFanout implements IOutputCheckActivity |
| { |
| protected final PipelineCheckEntryPoint[] entryPoints; |
| |
| public PipelineCheckFanout(PipelineCheckEntryPoint[] entryPoints) |
| { |
| this.entryPoints = entryPoints; |
| } |
| |
| @Override |
| public boolean checkDateIndexable(Date date) |
| throws ManifoldCFException, ServiceInterruption |
| { |
| // OR all results |
| for (PipelineCheckEntryPoint p : entryPoints) |
| { |
| if (p.checkDateIndexable(date)) |
| return true; |
| } |
| return false; |
| } |
| |
| @Override |
| public boolean checkMimeTypeIndexable(String mimeType) |
| throws ManifoldCFException, ServiceInterruption |
| { |
| // OR all results |
| for (PipelineCheckEntryPoint p : entryPoints) |
| { |
| if (p.checkMimeTypeIndexable(mimeType)) |
| return true; |
| } |
| return false; |
| } |
| |
| @Override |
| public boolean checkDocumentIndexable(File localFile) |
| throws ManifoldCFException, ServiceInterruption |
| { |
| // OR all results |
| for (PipelineCheckEntryPoint p : entryPoints) |
| { |
| if (p.checkDocumentIndexable(localFile)) |
| return true; |
| } |
| return false; |
| } |
| |
| @Override |
| public boolean checkLengthIndexable(long length) |
| throws ManifoldCFException, ServiceInterruption |
| { |
| // OR all results |
| for (PipelineCheckEntryPoint p : entryPoints) |
| { |
| if (p.checkLengthIndexable(length)) |
| return true; |
| } |
| return false; |
| } |
| |
| @Override |
| public boolean checkURLIndexable(String uri) |
| throws ManifoldCFException, ServiceInterruption |
| { |
| // OR all results |
| for (PipelineCheckEntryPoint p : entryPoints) |
| { |
| if (p.checkURLIndexable(uri)) |
| return true; |
| } |
| return false; |
| } |
| } |
| |
| /** This class describes the entry stage of a check pipeline. |
| */ |
| public static class PipelineCheckEntryPoint |
| { |
| protected final IPipelineConnector pipelineConnector; |
| protected final VersionContext pipelineDescriptionString; |
| protected final IOutputCheckActivity checkActivity; |
| |
| public PipelineCheckEntryPoint( |
| IPipelineConnector pipelineConnector, |
| VersionContext pipelineDescriptionString, |
| IOutputCheckActivity checkActivity) |
| { |
| this.pipelineConnector= pipelineConnector; |
| this.pipelineDescriptionString = pipelineDescriptionString; |
| this.checkActivity = checkActivity; |
| } |
| |
| public boolean checkDateIndexable(Date date) |
| throws ManifoldCFException, ServiceInterruption |
| { |
| return pipelineConnector.checkDateIndexable(pipelineDescriptionString,date,checkActivity); |
| } |
| |
| public boolean checkMimeTypeIndexable(String mimeType) |
| throws ManifoldCFException, ServiceInterruption |
| { |
| return pipelineConnector.checkMimeTypeIndexable(pipelineDescriptionString,mimeType,checkActivity); |
| } |
| |
| public boolean checkDocumentIndexable(File localFile) |
| throws ManifoldCFException, ServiceInterruption |
| { |
| return pipelineConnector.checkDocumentIndexable(pipelineDescriptionString,localFile,checkActivity); |
| } |
| |
| public boolean checkLengthIndexable(long length) |
| throws ManifoldCFException, ServiceInterruption |
| { |
| return pipelineConnector.checkLengthIndexable(pipelineDescriptionString,length,checkActivity); |
| } |
| |
| public boolean checkURLIndexable(String uri) |
| throws ManifoldCFException, ServiceInterruption |
| { |
| return pipelineConnector.checkURLIndexable(pipelineDescriptionString,uri,checkActivity); |
| } |
| |
| } |
| |
  /** This class describes the entry stage of multiple siblings in an add pipeline.
  * Indexability checks OR across all siblings; document sends go to every ACTIVE sibling;
  * history recording and token qualification vector directly to the final activity objects.
  */
  public static class PipelineAddFanout implements IOutputAddActivity
  {
    // Sibling entry points this fan-out covers.
    protected final PipelineAddEntryPoint[] entryPoints;
    // Where history records are written (not pipelined; see recordActivity below).
    protected final IOutputHistoryActivity finalHistoryActivity;
    // Where access-token qualification is performed (not pipelined; see qualifyAccessToken below).
    protected final IOutputQualifyActivity finalQualifyActivity;

    /** Constructor.
    *@param entryPoints are the sibling entry points to fan out across.
    *@param finalHistoryActivity is the history-recording target, or null at the root.
    *@param finalQualifyActivity is the token-qualification target.
    */
    public PipelineAddFanout(PipelineAddEntryPoint[] entryPoints, IOutputHistoryActivity finalHistoryActivity,
      IOutputQualifyActivity finalQualifyActivity)
    {
      this.entryPoints = entryPoints;
      this.finalHistoryActivity = finalHistoryActivity;
      this.finalQualifyActivity = finalQualifyActivity;
    }

    /** Determine whether any downstream branch still requires reindexing.
    *@return true if at least one entry point is active.
    */
    public boolean checkNeedToReindex()
    {
      // Look at the entry points, and make sure they're not all disabled.
      for (PipelineAddEntryPoint p : entryPoints)
      {
        if (p.isActive())
          return true;
      }
      return false;
    }

    /** Check whether any sibling accepts the document date. */
    @Override
    public boolean checkDateIndexable(Date date)
      throws ManifoldCFException, ServiceInterruption
    {
      // OR all results
      for (PipelineAddEntryPoint p : entryPoints)
      {
        if (p.checkDateIndexable(date))
          return true;
      }
      return false;
    }

    /** Check whether any sibling accepts the mime type. */
    @Override
    public boolean checkMimeTypeIndexable(String mimeType)
      throws ManifoldCFException, ServiceInterruption
    {
      // OR all results
      for (PipelineAddEntryPoint p : entryPoints)
      {
        if (p.checkMimeTypeIndexable(mimeType))
          return true;
      }
      return false;
    }

    /** Check whether any sibling accepts the document (as a local file). */
    @Override
    public boolean checkDocumentIndexable(File localFile)
      throws ManifoldCFException, ServiceInterruption
    {
      // OR all results
      for (PipelineAddEntryPoint p : entryPoints)
      {
        if (p.checkDocumentIndexable(localFile))
          return true;
      }
      return false;
    }

    /** Check whether any sibling accepts the document length. */
    @Override
    public boolean checkLengthIndexable(long length)
      throws ManifoldCFException, ServiceInterruption
    {
      // OR all results
      for (PipelineAddEntryPoint p : entryPoints)
      {
        if (p.checkLengthIndexable(length))
          return true;
      }
      return false;
    }

    /** Check whether any sibling accepts the document URL. */
    @Override
    public boolean checkURLIndexable(String uri)
      throws ManifoldCFException, ServiceInterruption
    {
      // OR all results
      for (PipelineAddEntryPoint p : entryPoints)
      {
        if (p.checkURLIndexable(uri))
          return true;
      }
      return false;
    }

    /** Send a document via the pipeline to the next output connection.
    *@param documentURI is the document's URI.
    *@param document is the document data to be processed (handed to the output data store).
    *@return the document status (accepted or permanently rejected); return codes are listed in IPipelineConnector.
    *@throws IOException only if there's an IO error reading the data from the document.
    */
    @Override
    public int sendDocument(String documentURI, RepositoryDocument document)
      throws ManifoldCFException, ServiceInterruption, IOException
    {
      // First, count the number of active entry points.
      int activeCount = 0;
      for (PipelineAddEntryPoint p : entryPoints)
      {
        if (p.isActive())
          activeCount++;
      }
      if (activeCount <= 1)
      {
        // No need to copy anything.
        // At most one active recipient, so the original document object can be handed over directly.
        int rval = IPipelineConnector.DOCUMENTSTATUS_REJECTED;
        for (PipelineAddEntryPoint p : entryPoints)
        {
          if (!p.isActive())
            continue;
          if (p.addOrReplaceDocumentWithException(documentURI,document) == IPipelineConnector.DOCUMENTSTATUS_ACCEPTED)
            rval = IPipelineConnector.DOCUMENTSTATUS_ACCEPTED;
        }
        return rval;
      }
      else
      {
        // Create a RepositoryDocumentFactory, which we'll need to clean up at the end.
        // NOTE(review): the factory presumably exists so a fresh RepositoryDocument can be produced
        // per active recipient (document streams would otherwise be consumed by the first one) —
        // confirm against RepositoryDocumentFactory's documentation.
        RepositoryDocumentFactory factory = new RepositoryDocumentFactory(document);
        try
        {
          // If any of them accept the document, we return "accept".
          int rval = IPipelineConnector.DOCUMENTSTATUS_REJECTED;
          for (PipelineAddEntryPoint p : entryPoints)
          {
            if (!p.isActive())
              continue;
            if (p.addOrReplaceDocumentWithException(documentURI,factory.createDocument()) == IPipelineConnector.DOCUMENTSTATUS_ACCEPTED)
              rval = IPipelineConnector.DOCUMENTSTATUS_ACCEPTED;
          }
          return rval;
        }
        finally
        {
          factory.close();
        }
      }
    }

    /** Send NO document via the pipeline to the next output connection.  This is equivalent
    * to sending an empty document placeholder.
    */
    @Override
    public void noDocument()
      throws ManifoldCFException, ServiceInterruption
    {
      for (PipelineAddEntryPoint p : entryPoints)
      {
        if (p.isActive())
        {
          // Invoke the addEntryPoint method for handling "noDocument"
          p.noDocument();
        }
      }
    }

    /** Qualify an access token appropriately, to match access tokens as returned by mod_aa.  This method
    * includes the authority name with the access token, if any, so that each authority may establish its own token space.
    *@param authorityNameString is the name of the authority to use to qualify the access token.
    *@param accessToken is the raw, repository access token.
    *@return the properly qualified access token.
    */
    @Override
    public String qualifyAccessToken(String authorityNameString, String accessToken)
      throws ManifoldCFException
    {
      // This functionality does not need to be staged; we just want to vector through to the final stage directly.
      return finalQualifyActivity.qualifyAccessToken(authorityNameString,accessToken);
    }

    /** Record time-stamped information about the activity of the output connector.
    *@param startTime is either null or the time since the start of epoch in milliseconds (Jan 1, 1970).  Every
    *       activity has an associated time; the startTime field records when the activity began.  A null value
    *       indicates that the start time and the finishing time are the same.
    *@param activityType is a string which is fully interpretable only in the context of the connector involved, which is
    *       used to categorize what kind of activity is being recorded.  For example, a web connector might record a
    *       "fetch document" activity.  Cannot be null.
    *@param dataSize is the number of bytes of data involved in the activity, or null if not applicable.
    *@param entityURI is a (possibly long) string which identifies the object involved in the history record.
    *       The interpretation of this field will differ from connector to connector.  May be null.
    *@param resultCode contains a terse description of the result of the activity.  The description is limited in
    *       size to 255 characters, and can be interpreted only in the context of the current connector.  May be null.
    *@param resultDescription is a (possibly long) human-readable string which adds detail, if required, to the result
    *       described in the resultCode field.  This field is not meant to be queried on.  May be null.
    */
    @Override
    public void recordActivity(Long startTime, String activityType, Long dataSize,
      String entityURI, String resultCode, String resultDescription)
      throws ManifoldCFException
    {
      // Each stage of the pipeline uses a specific activity for recording history, but it's not fundamentally
      // pipelined
      finalHistoryActivity.recordActivity(startTime,activityType,dataSize,entityURI,resultCode,resultDescription);
    }
  }
| |
| /** This class describes the entry stage of an add pipeline. |
| */ |
| public static class PipelineAddEntryPoint |
| { |
| protected final IPipelineConnector pipelineConnector; |
| protected final VersionContext pipelineDescriptionString; |
| protected final String authorityNameString; |
| protected final IOutputAddActivity addActivity; |
| protected final boolean isActive; |
| |
| public PipelineAddEntryPoint(IPipelineConnector pipelineConnector, |
| VersionContext pipelineDescriptionString, |
| String authorityNameString, |
| IOutputAddActivity addActivity, |
| boolean isActive) |
| { |
| this.pipelineConnector = pipelineConnector; |
| this.pipelineDescriptionString = pipelineDescriptionString; |
| this.authorityNameString = authorityNameString; |
| this.addActivity = addActivity; |
| this.isActive = isActive; |
| } |
| |
| public boolean isActive() |
| { |
| return isActive; |
| } |
| |
| public boolean checkDateIndexable(Date date) |
| throws ManifoldCFException, ServiceInterruption |
| { |
| return pipelineConnector.checkDateIndexable(pipelineDescriptionString,date,addActivity); |
| } |
| |
| public boolean checkMimeTypeIndexable(String mimeType) |
| throws ManifoldCFException, ServiceInterruption |
| { |
| return pipelineConnector.checkMimeTypeIndexable(pipelineDescriptionString,mimeType,addActivity); |
| } |
| |
| public boolean checkDocumentIndexable(File localFile) |
| throws ManifoldCFException, ServiceInterruption |
| { |
| return pipelineConnector.checkDocumentIndexable(pipelineDescriptionString,localFile,addActivity); |
| } |
| |
| public boolean checkLengthIndexable(long length) |
| throws ManifoldCFException, ServiceInterruption |
| { |
| return pipelineConnector.checkLengthIndexable(pipelineDescriptionString,length,addActivity); |
| } |
| |
| public boolean checkURLIndexable(String uri) |
| throws ManifoldCFException, ServiceInterruption |
| { |
| return pipelineConnector.checkURLIndexable(pipelineDescriptionString,uri,addActivity); |
| } |
| |
| public int addOrReplaceDocumentWithException(String documentURI, RepositoryDocument document) |
| throws ManifoldCFException, ServiceInterruption, IOException |
| { |
| // If the transformation connector doesn't do what it should, compensate! |
| MonitoredAddActivityWrapper wrapper = new MonitoredAddActivityWrapper(addActivity); |
| int rval = pipelineConnector.addOrReplaceDocumentWithException( |
| documentURI,pipelineDescriptionString, |
| document,authorityNameString,wrapper); |
| // The wrapper detects activity by the connector, so if we don't see either sendDocument() or |
| // noDocument(), we issue noDocument() ourselves. If the connector was an output connector, |
| // this will wind up being a no-op, but otherwise it will guarantee that recording takes place. |
| if (!wrapper.wasDocumentActedUpon()) |
| addActivity.noDocument(); |
| return rval; |
| } |
| |
| public void noDocument() |
| throws ManifoldCFException, ServiceInterruption |
| { |
| // Call the addActivity method for handling no document |
| addActivity.noDocument(); |
| } |
| } |
| |
| public class OutputAddEntryPoint extends PipelineAddEntryPoint |
| { |
| protected final IOutputConnector outputConnector; |
| protected final String outputConnectionName; |
| protected final String transformationVersion; |
| protected final long ingestTime; |
| protected final String documentVersion; |
| protected final String docKey; |
| protected final String componentHash; |
| protected final IOutputActivity activity; |
| |
    /** Constructor.
    *@param outputConnector is the output connector for this final pipeline stage.
    *@param outputDescriptionString is the output stage's version/description context.
    *@param activity is the output activity object used for recording and qualification.
    *@param isActive is true if the document needs to be (re)indexed to this output.
    *@param outputConnectionName is the name of the output connection.
    *@param transformationVersion is the packed transformation version string to record.
    *@param ingestTime is the ingestion timestamp in milliseconds.
    *@param documentVersion is the new document version string.
    *@param docKey is the document's identifying key.
    *@param componentHash is the component hash, or null/empty if none.
    *@param authorityNameString is the governing authority name, or null.
    */
    public OutputAddEntryPoint(IOutputConnector outputConnector,
      VersionContext outputDescriptionString,
      IOutputActivity activity,
      boolean isActive,
      String outputConnectionName,
      String transformationVersion,
      long ingestTime,
      String documentVersion,
      String docKey,
      String componentHash,
      String authorityNameString)
    {
      super(outputConnector,outputDescriptionString,authorityNameString,activity,isActive);
      this.outputConnector = outputConnector;
      this.outputConnectionName = outputConnectionName;
      this.transformationVersion = transformationVersion;
      this.ingestTime = ingestTime;
      this.documentVersion = documentVersion;
      this.docKey = docKey;
      this.componentHash = componentHash;
      this.activity = activity;
    }
| |
| @Override |
| public void noDocument() |
| throws ManifoldCFException, ServiceInterruption |
| { |
| try |
| { |
| addOrReplaceDocumentWithException(null,null); |
| } |
| catch (IOException e) |
| { |
| throw new RuntimeException("Unexpected IOException: "+e.getMessage(),e); |
| } |
| } |
| |
| @Override |
| public int addOrReplaceDocumentWithException(String documentURI, RepositoryDocument document) |
| throws ManifoldCFException, ServiceInterruption, IOException |
| { |
| // No transactions; not safe because post may take too much time |
| |
| // First, calculate a document uri hash value |
| String documentURIHash = null; |
| if (documentURI != null) |
| documentURIHash = ManifoldCF.hash(documentURI); |
| |
| String oldURI = null; |
| String oldURIHash = null; |
| String oldOutputVersion = null; |
| |
| // Repeat if needed |
| while (true) |
| { |
| long sleepAmt = 0L; |
| try |
| { |
| // See what uri was used before for this doc, if any |
| ArrayList list = new ArrayList(); |
| String query = buildConjunctionClause(list,new ClauseDescription[]{ |
| new UnitaryClause(docKeyField,docKey), |
| new UnitaryClause(outputConnNameField,outputConnectionName), |
| (componentHash == null || componentHash.length() == 0)?new NullCheckClause(componentHashField,true):new UnitaryClause(componentHashField,componentHash)}); |
| |
| IResultSet set = performQuery("SELECT "+docURIField+","+uriHashField+","+lastOutputVersionField+" FROM "+getTableName()+ |
| " WHERE "+query,list,null,null); |
| |
| if (set.getRowCount() > 0) |
| { |
| IResultRow row = set.getRow(0); |
| oldURI = (String)row.getValue(docURIField); |
| oldURIHash = (String)row.getValue(uriHashField); |
| oldOutputVersion = (String)row.getValue(lastOutputVersionField); |
| } |
| |
| break; |
| } |
| catch (ManifoldCFException e) |
| { |
| // Look for deadlock and retry if so |
| if (e.getErrorCode() == e.DATABASE_TRANSACTION_ABORT) |
| { |
| if (Logging.perf.isDebugEnabled()) |
| Logging.perf.debug("Aborted select looking for status: "+e.getMessage()); |
| sleepAmt = getSleepAmt(); |
| continue; |
| } |
| throw e; |
| } |
| finally |
| { |
| sleepFor(sleepAmt); |
| } |
| } |
| |
| // If uri hashes collide, then we must be sure to eliminate only the *correct* records from the table, or we will leave |
| // dangling documents around. So, all uri searches and comparisons MUST compare the actual uri as well. |
| |
| // But, since we need to insure that any given URI is only worked on by one thread at a time, use critical sections |
| // to block the rare case that multiple threads try to work on the same URI. |
| |
| String[] lockArray = computeLockArray(documentURIHash,oldURIHash,outputConnectionName); |
| lockManager.enterLocks(null,null,lockArray); |
| try |
| { |
| |
| ArrayList list = new ArrayList(); |
| |
| if (oldURI != null && (documentURI == null || !oldURI.equals(documentURI))) |
| { |
| // Delete all records from the database that match the old URI, except for THIS record. |
| list.clear(); |
| String query = buildConjunctionClause(list,new ClauseDescription[]{ |
| new UnitaryClause(uriHashField,"=",oldURIHash), |
| new UnitaryClause(outputConnNameField,outputConnectionName)}); |
| list.add(docKey); |
| performDelete("WHERE "+query+" AND "+docKeyField+"!=?",list,null); |
| outputConnector.removeDocument(oldURI,oldOutputVersion,activity); |
| } |
| |
| if (documentURI != null) |
| { |
| // Get rid of all records that match the NEW uri, except for this record. |
| list.clear(); |
| String query = buildConjunctionClause(list,new ClauseDescription[]{ |
| new UnitaryClause(uriHashField,"=",documentURIHash), |
| new UnitaryClause(outputConnNameField,outputConnectionName)}); |
| list.add(docKey); |
| performDelete("WHERE "+query+" AND "+ docKeyField+"!=?",list,null); |
| |
| // Now, we know we are ready for the ingest. |
| |
| // Here are the cases: |
| // 1) There was a service interruption before the upload started. |
| // (In that case, we don't need to log anything, just reschedule). |
| // 2) There was a service interruption after the document was transmitted. |
| // (In that case, we should presume that the document was ingested, but |
| // reschedule another import anyway.) |
| // 3) Everything went OK |
| // (need to log the ingestion.) |
| // 4) Everything went OK, but we were told we have an illegal document. |
| // (We note the ingestion because if we don't we will be forced to repeat ourselves. |
| // In theory, document doesn't need to be deleted, but there is no way to signal |
| // that at the moment.) |
| |
| // Note an ingestion before we actually try it. |
| // This is a marker that says "something is there"; it has an empty version, which indicates |
| // that we don't know anything about it. That means it will be reingested when the |
| // next version comes along, and will be deleted if called for also. |
| noteDocumentIngest(outputConnectionName,docKey,componentHash,null,null,null,null,ingestTime,documentURI,documentURIHash); |
| int result = super.addOrReplaceDocumentWithException(documentURI, document); |
| noteDocumentIngest(outputConnectionName,docKey,componentHash,documentVersion,transformationVersion,pipelineDescriptionString.getVersionString(),authorityNameString,ingestTime,documentURI,documentURIHash); |
| return result; |
| } |
| |
| // If we get here, it means we are noting that the document was examined, but that no change was required. This is signaled |
| // to noteDocumentIngest by having the null documentURI. |
| noteDocumentIngest(outputConnectionName,docKey,componentHash,documentVersion,transformationVersion,pipelineDescriptionString.getVersionString(),authorityNameString,ingestTime,null,null); |
| return IPipelineConnector.DOCUMENTSTATUS_ACCEPTED; |
| } |
| finally |
| { |
| lockManager.leaveLocks(null,null,lockArray); |
| } |
| } |
| } |
| |
| protected static String[] computeLockArray(String documentURIHash, String oldURIHash, String outputConnectionName) |
| { |
| int uriCount = 0; |
| if (documentURIHash != null) |
| uriCount++; |
| if (oldURIHash != null && (documentURIHash == null || !documentURIHash.equals(oldURIHash))) |
| uriCount++; |
| String[] lockArray = new String[uriCount]; |
| uriCount = 0; |
| if (documentURIHash != null) |
| lockArray[uriCount++] = createURILockName(outputConnectionName,documentURIHash); |
| if (oldURIHash != null && (documentURIHash == null || !documentURIHash.equals(oldURIHash))) |
| lockArray[uriCount++] = createURILockName(outputConnectionName,oldURIHash); |
| return lockArray; |
| } |
| |
| /** This class passes everything through, and monitors what happens so that the |
| * framework can compensate for any transformation connector coding errors. |
| */ |
| protected static class MonitoredAddActivityWrapper implements IOutputAddActivity |
| { |
| protected final IOutputAddActivity activities; |
| |
| protected boolean documentProcessed = false; |
| |
| public MonitoredAddActivityWrapper(IOutputAddActivity activities) |
| { |
| this.activities = activities; |
| } |
| |
| public boolean wasDocumentActedUpon() |
| { |
| return documentProcessed; |
| } |
| |
| /** Send a document via the pipeline to the next output connection. |
| *@param documentURI is the document's URI. |
| *@param document is the document data to be processed (handed to the output data store). |
| *@return the document status (accepted or permanently rejected); return codes are listed in IPipelineConnector. |
| *@throws IOException only if there's an IO error reading the data from the document. |
| */ |
| @Override |
| public int sendDocument(String documentURI, RepositoryDocument document) |
| throws ManifoldCFException, ServiceInterruption, IOException |
| { |
| if (documentProcessed) |
| throw new IllegalStateException("Document cannot have multiple dispositions"); |
| int rval = activities.sendDocument(documentURI,document); |
| documentProcessed = true; |
| return rval; |
| } |
| |
| /** Send NO document via the pipeline to the next output connection. This is equivalent |
| * to sending an empty document placeholder. |
| */ |
| @Override |
| public void noDocument() |
| throws ManifoldCFException, ServiceInterruption |
| { |
| if (documentProcessed) |
| throw new IllegalStateException("Document cannot have multiple dispositions"); |
| activities.noDocument(); |
| documentProcessed = true; |
| } |
| |
| /** Qualify an access token appropriately, to match access tokens as returned by mod_aa. This method |
| * includes the authority name with the access token, if any, so that each authority may establish its own token space. |
| *@param authorityNameString is the name of the authority to use to qualify the access token. |
| *@param accessToken is the raw, repository access token. |
| *@return the properly qualified access token. |
| */ |
| @Override |
| public String qualifyAccessToken(String authorityNameString, String accessToken) |
| throws ManifoldCFException |
| { |
| return activities.qualifyAccessToken(authorityNameString,accessToken); |
| } |
| |
| /** Record time-stamped information about the activity of the output connector. |
| *@param startTime is either null or the time since the start of epoch in milliseconds (Jan 1, 1970). Every |
| * activity has an associated time; the startTime field records when the activity began. A null value |
| * indicates that the start time and the finishing time are the same. |
| *@param activityType is a string which is fully interpretable only in the context of the connector involved, which is |
| * used to categorize what kind of activity is being recorded. For example, a web connector might record a |
| * "fetch document" activity. Cannot be null. |
| *@param dataSize is the number of bytes of data involved in the activity, or null if not applicable. |
| *@param entityURI is a (possibly long) string which identifies the object involved in the history record. |
| * The interpretation of this field will differ from connector to connector. May be null. |
| *@param resultCode contains a terse description of the result of the activity. The description is limited in |
| * size to 255 characters, and can be interpreted only in the context of the current connector. May be null. |
| *@param resultDescription is a (possibly long) human-readable string which adds detail, if required, to the result |
| * described in the resultCode field. This field is not meant to be queried on. May be null. |
| */ |
| @Override |
| public void recordActivity(Long startTime, String activityType, Long dataSize, |
| String entityURI, String resultCode, String resultDescription) |
| throws ManifoldCFException |
| { |
| activities.recordActivity(startTime,activityType,dataSize,entityURI,resultCode,resultDescription); |
| } |
| |
| /** Detect if a date is acceptable downstream or not. This method is used to determine whether it makes sense to fetch a document |
| * in the first place. |
| *@param date is the date of the document. |
| *@return true if the document described by the date can be accepted by the downstream connection. |
| */ |
| @Override |
| public boolean checkDateIndexable(Date date) |
| throws ManifoldCFException, ServiceInterruption |
| { |
| return activities.checkDateIndexable(date); |
| } |
| |
| /** Detect if a mime type is acceptable downstream or not. This method is used to determine whether it makes sense to fetch a document |
| * in the first place. |
| *@param mimeType is the mime type of the document. |
| *@return true if the mime type can be accepted by the downstream connection. |
| */ |
| @Override |
| public boolean checkMimeTypeIndexable(String mimeType) |
| throws ManifoldCFException, ServiceInterruption |
| { |
| return activities.checkMimeTypeIndexable(mimeType); |
| } |
| |
| /** Pre-determine whether a document (passed here as a File object) is acceptable downstream. This method is |
| * used to determine whether a document needs to be actually transferred. This hook is provided mainly to support |
| * search engines that only handle a small set of accepted file types. |
| *@param localFile is the local file to check. |
| *@return true if the file is acceptable by the downstream connection. |
| */ |
| @Override |
| public boolean checkDocumentIndexable(File localFile) |
| throws ManifoldCFException, ServiceInterruption |
| { |
| return activities.checkDocumentIndexable(localFile); |
| } |
| |
| /** Pre-determine whether a document's length is acceptable downstream. This method is used |
| * to determine whether to fetch a document in the first place. |
| *@param length is the length of the document. |
| *@return true if the file is acceptable by the downstream connection. |
| */ |
| @Override |
| public boolean checkLengthIndexable(long length) |
| throws ManifoldCFException, ServiceInterruption |
| { |
| return activities.checkLengthIndexable(length); |
| } |
| |
| /** Pre-determine whether a document's URL is acceptable downstream. This method is used |
| * to help filter out documents that cannot be indexed in advance. |
| *@param url is the URL of the document. |
| *@return true if the file is acceptable by the downstream connection. |
| */ |
| @Override |
| public boolean checkURLIndexable(String url) |
| throws ManifoldCFException, ServiceInterruption |
| { |
| return activities.checkURLIndexable(url); |
| } |
| |
| } |
| |
| } |