blob: b8a0f12450e45685aaab836842a0c9003b7b61df [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.manifoldcf.crawler.connectors.alfrescowebscript;
import org.alfresco.consulting.indexer.client.AlfrescoClient;
import org.alfresco.consulting.indexer.client.AlfrescoDownException;
import org.alfresco.consulting.indexer.client.AlfrescoResponse;
import org.alfresco.consulting.indexer.client.WebScriptsAlfrescoClient;
import org.apache.manifoldcf.agents.interfaces.RepositoryDocument;
import org.apache.manifoldcf.agents.interfaces.ServiceInterruption;
import org.apache.manifoldcf.core.interfaces.*;
import org.apache.manifoldcf.core.common.DateParser;
import org.apache.manifoldcf.crawler.connectors.BaseRepositoryConnector;
import org.apache.manifoldcf.crawler.interfaces.IExistingVersions;
import org.apache.manifoldcf.crawler.interfaces.IProcessActivity;
import org.apache.manifoldcf.crawler.interfaces.ISeedingActivity;
import org.apache.manifoldcf.crawler.system.Logging;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.InputStream;
import java.text.MessageFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.*;
public class AlfrescoConnector extends BaseRepositoryConnector {
private static final String ACTIVITY_FETCH = "fetch document";
private static final String[] activitiesList = new String[]{ACTIVITY_FETCH};
private AlfrescoClient alfrescoClient;
private Boolean enableDocumentProcessing = Boolean.TRUE;
private static final String CONTENT_URL_PROPERTY = "contentUrlPath";
private static final String AUTHORITIES_PROPERTY = "readableAuthorities";
private static final String MIMETYPE_PROPERTY = "mimetype";
private static final String SIZE_PROPERTY = "size";
private static final String MODIFIED_DATE_PROPERTY = "cm:modified";
private static final String CREATED_DATE_PROPERTY = "cm:created";
// Static Fields
private static final String FIELD_UUID = "uuid";
private static final String FIELD_NODEREF = "nodeRef";
private static final String FIELD_TYPE = "type";
private static final String FIELD_NAME = "name";
@Override
public int getConnectorModel() {
return MODEL_ADD_CHANGE_DELETE; // We return only incremental documents.
}
public void setClient(AlfrescoClient client) {
alfrescoClient = client;
}
@Override
public void connect(ConfigParams config) {
super.connect(config);
String protocol = getConfig(config, "protocol", "http");
String hostname = getConfig(config, "hostname", "localhost");
String endpoint = getConfig(config, "endpoint", "/alfresco/service");
String storeProtocol = getConfig(config, "storeprotocol", "workspace");
String storeId = getConfig(config, "storeid", "SpacesStore");
String username = getConfig(config, "username", null);
String password = getObfuscatedConfig(config, "password", null);
this.enableDocumentProcessing = new Boolean(getConfig(config, "enabledocumentprocessing", "false"));
alfrescoClient = new WebScriptsAlfrescoClient(protocol, hostname, endpoint,
storeProtocol, storeId, username, password);
}
private static String getConfig(ConfigParams config,
String parameter,
String defaultValue) {
final String protocol = config.getParameter(parameter);
if (protocol == null) {
return defaultValue;
}
return protocol;
}
private static String getObfuscatedConfig(ConfigParams config,
String parameter,
String defaultValue) {
final String protocol = config.getObfuscatedParameter(parameter);
if (protocol == null) {
return defaultValue;
}
return protocol;
}
@Override
public String check() throws ManifoldCFException {
try {
// We really want to do something more like fetching a document here...
alfrescoClient.fetchUserAuthorities("admin");
return super.check();
} catch (AlfrescoDownException e) {
if (Logging.connectors != null) {
Logging.connectors.warn(e.getMessage(), e);
}
return "Connection failed: " + e.getMessage();
}
}
@Override
public void disconnect() throws ManifoldCFException {
super.disconnect();
}
@Override
public String[] getActivitiesList() {
return activitiesList;
}
@Override
public int getMaxDocumentRequest() {
return 20;
}
@Override
public String addSeedDocuments(ISeedingActivity activities, Specification spec,
String lastSeedVersion, long seedTime, int jobMode) throws ManifoldCFException, ServiceInterruption {
try {
long lastTransactionId = 0;
long lastAclChangesetId = 0;
if(lastSeedVersion != null && !lastSeedVersion.isEmpty()) {
StringTokenizer tokenizer = new StringTokenizer(lastSeedVersion,"|");
if (tokenizer.countTokens() == 2) {
lastTransactionId = new Long(tokenizer.nextToken());
lastAclChangesetId = new Long(tokenizer.nextToken());
}
}
if (Logging.connectors != null && Logging.connectors.isDebugEnabled())
Logging.connectors.debug(MessageFormat.format("Starting from transaction id: {0} and acl changeset id: {1}", new Object[]{lastTransactionId, lastAclChangesetId}));
long transactionIdsProcessed;
long aclChangesetsProcessed;
do {
final AlfrescoResponse response = alfrescoClient.
fetchNodes(lastTransactionId,
lastAclChangesetId,
ConfigurationHandler.getFilters(spec));
int count = 0;
for (Map<String, Object> doc : response.getDocuments()) {
// String json = gson.toJson(doc);
// activities.addSeedDocument(json);
String uuid = doc.get("uuid").toString();
activities.addSeedDocument(uuid);
count++;
}
if (Logging.connectors != null && Logging.connectors.isDebugEnabled())
Logging.connectors.debug(MessageFormat.format("Fetched and added {0} seed documents", new Object[]{new Integer(count)}));
transactionIdsProcessed = response.getLastTransactionId() - lastTransactionId;
aclChangesetsProcessed = response.getLastAclChangesetId() - lastAclChangesetId;
lastTransactionId = response.getLastTransactionId();
lastAclChangesetId = response.getLastAclChangesetId();
if (Logging.connectors != null && Logging.connectors.isDebugEnabled())
Logging.connectors.debug(MessageFormat.format("transaction_id={0}, acl_changeset_id={1}", new Object[]{lastTransactionId, lastAclChangesetId}));
} while (transactionIdsProcessed > 0 || aclChangesetsProcessed > 0);
if (Logging.connectors != null && Logging.connectors.isDebugEnabled())
Logging.connectors.debug(MessageFormat.format("Recording {0} as last transaction id and {1} as last changeset id", new Object[]{lastTransactionId, lastAclChangesetId}));
return lastTransactionId + "|" + lastAclChangesetId;
} catch (AlfrescoDownException e) {
handleAlfrescoDownException(e,"seeding");
return null;
}
}
@Override
public void processDocuments(String[] documentIdentifiers, IExistingVersions statuses, Specification spec,
IProcessActivity activities, int jobMode, boolean usesDefaultAuthority)
throws ManifoldCFException, ServiceInterruption {
try {
for (String doc : documentIdentifiers) {
String nextVersion = statuses.getIndexedVersionString(doc);
// Calling again Alfresco API because Document's actions are lost from seeding method
AlfrescoResponse response = alfrescoClient.fetchNode(doc);
if(response.getDocumentList().isEmpty()){ // Not found seeded document. Could reflect an error in Alfresco
if (Logging.connectors != null)
Logging.connectors.warn(MessageFormat.format("Invalid Seeded Document from Alfresco with ID {0}", new Object[]{doc}));
activities.deleteDocument(doc);
continue;
}
Map<String, Object> map = response.getDocumentList().get(0); // Should be only one
if ((Boolean) map.get("deleted")) {
activities.deleteDocument(doc);
continue;
}
// From the map, get the things we know about
String uuid = doc;
String nodeRef = map.containsKey(FIELD_NODEREF) ? map.get(FIELD_NODEREF).toString() : "";
String type = map.containsKey(FIELD_TYPE) ? map.get(FIELD_TYPE).toString() : "";
String name = map.containsKey(FIELD_NAME) ? map.get(FIELD_NAME).toString() : "";
// Fetch document metadata
Map<String,Object> properties = alfrescoClient.fetchMetadata(uuid);
// Process various special fields
Object mdObject;
// Size
Long lSize = null;
mdObject = properties.get(SIZE_PROPERTY);
if (mdObject != null) {
String size = mdObject.toString();
lSize = new Long(size);
}
// Modified Date
Date modifiedDate = null;
mdObject = properties.get(MODIFIED_DATE_PROPERTY);
if (mdObject != null) {
modifiedDate = DateParser.parseISO8601Date(mdObject.toString());
}
// Created Date
Date createdDate = null;
mdObject = properties.get(CREATED_DATE_PROPERTY);
if (mdObject != null) {
createdDate = DateParser.parseISO8601Date(mdObject.toString());
}
// Establish the document version.
if (modifiedDate == null) {
activities.deleteDocument(doc);
continue;
}
String documentVersion = new Long(modifiedDate.getTime()).toString();
if(!activities.checkDocumentNeedsReindexing(doc, documentVersion))
continue;
String mimeType = null;
Object mimetypeObject = properties.get(MIMETYPE_PROPERTY);
if (mimetypeObject != null) {
mimeType = mimetypeObject.toString();
}
if (lSize != null && !activities.checkLengthIndexable(lSize.longValue())) {
activities.noDocument(doc, documentVersion);
continue;
}
if (mimeType != null && !activities.checkMimeTypeIndexable(mimeType)) {
activities.noDocument(doc, documentVersion);
continue;
}
RepositoryDocument rd = new RepositoryDocument();
rd.addField(FIELD_NODEREF, nodeRef);
rd.addField(FIELD_TYPE, type);
rd.setFileName(name);
if (modifiedDate != null)
rd.setModifiedDate(modifiedDate);
if (createdDate != null)
rd.setCreatedDate(createdDate);
for(String property : properties.keySet()) {
Object propertyValue = properties.get(property);
rd.addField(property,propertyValue.toString());
}
if (mimeType != null && !mimeType.isEmpty())
rd.setMimeType(mimeType);
// Indexing Permissions
@SuppressWarnings("unchecked")
List<String> permissions = (List<String>) properties.remove(AUTHORITIES_PROPERTY);
if(permissions != null){
rd.setSecurityACL(RepositoryDocument.SECURITY_TYPE_DOCUMENT,
permissions.toArray(new String[permissions.size()]));
}
// Document Binary Content
InputStream stream;
long length;
byte[] empty = new byte[0];
String contentUrlPath = (String) properties.get(CONTENT_URL_PROPERTY);
if (contentUrlPath != null && !contentUrlPath.isEmpty()) {
if (this.enableDocumentProcessing) {
if (lSize != null) {
stream = alfrescoClient.fetchContent(contentUrlPath);
length = lSize.longValue();
} else {
stream = new ByteArrayInputStream(empty);
length = 0L;
}
} else {
stream = new ByteArrayInputStream(empty);
length = 0L;
}
} else {
stream = new ByteArrayInputStream(empty);
length = 0L;
}
try {
rd.setBinary(stream, length);
if (Logging.connectors != null && Logging.connectors.isDebugEnabled())
Logging.connectors.debug(MessageFormat.format("Ingesting with id: {0}, URI {1} and rd {2}", new Object[]{uuid, nodeRef, rd.getFileName()}));
activities.ingestDocumentWithException(doc, documentVersion, nodeRef/*was uuid*/, rd);
} catch (IOException e) {
handleIOException(e,"reading stream");
} finally {
try {
stream.close();
} catch (IOException e) {
handleIOException(e,"closing stream");
}
}
}
} catch (AlfrescoDownException e) {
handleAlfrescoDownException(e,"processing");
}
}
protected final static long interruptionRetryTime = 5L*60L*1000L;
protected static void handleAlfrescoDownException(AlfrescoDownException e, String context)
throws ManifoldCFException, ServiceInterruption {
long currentTime = System.currentTimeMillis();
// Server doesn't appear to by up. Try for a brief time then give up.
String message = "Server appears down during "+context+": "+e.getMessage();
Logging.connectors.warn(message,e);
throw new ServiceInterruption(message,
e,
currentTime + interruptionRetryTime,
-1L,
3,
true);
}
protected static void handleIOException(IOException e, String context)
throws ManifoldCFException, ServiceInterruption
{
if ((e instanceof InterruptedIOException) && (!(e instanceof java.net.SocketTimeoutException)))
throw new ManifoldCFException(e.getMessage(), ManifoldCFException.INTERRUPTED);
long currentTime = System.currentTimeMillis();
if (e instanceof java.net.ConnectException)
{
// Server isn't up at all. Try for a brief time then give up.
String message = "Server could not be contacted during "+context+": "+e.getMessage();
Logging.connectors.warn(message,e);
throw new ServiceInterruption(message,
e,
currentTime + interruptionRetryTime,
-1L,
3,
true);
}
if (e instanceof java.net.SocketTimeoutException)
{
String message2 = "Socket timeout exception during "+context+": "+e.getMessage();
Logging.connectors.warn(message2,e);
throw new ServiceInterruption(message2,
e,
currentTime + interruptionRetryTime,
currentTime + 20L * 60000L,
-1,
false);
}
if (e.getClass().getName().equals("java.net.SocketException"))
{
// In the past we would have treated this as a straight document rejection, and
// treated it in the same manner as a 400. The reasoning is that the server can
// perfectly legally send out a 400 and drop the connection immediately thereafter,
// this a race condition.
// However, Solr 4.0 (or the Jetty version that the example runs on) seems
// to have a bug where it drops the connection when two simultaneous documents come in
// at the same time. This is the final version of Solr 4.0 so we need to deal with
// this.
if (e.getMessage().toLowerCase(Locale.ROOT).indexOf("broken pipe") != -1 ||
e.getMessage().toLowerCase(Locale.ROOT).indexOf("connection reset") != -1 ||
e.getMessage().toLowerCase(Locale.ROOT).indexOf("target server failed to respond") != -1)
{
// Treat it as a service interruption, but with a limited number of retries.
// In that way we won't burden the user with a huge retry interval; it should
// give up fairly quickly, and yet NOT give up if the error was merely transient
String message = "Server dropped connection during "+context+": "+e.getMessage();
Logging.connectors.warn(message,e);
throw new ServiceInterruption(message,
e,
currentTime + interruptionRetryTime,
-1L,
3,
false);
}
// Other socket exceptions are service interruptions - but if we keep getting them, it means
// that a socket timeout is probably set too low to accept this particular document. So
// we retry for a while, then skip the document.
String message2 = "Socket exception during "+context+": "+e.getMessage();
Logging.connectors.warn(message2,e);
throw new ServiceInterruption(message2,
e,
currentTime + interruptionRetryTime,
currentTime + 20L * 60000L,
-1,
false);
}
// Otherwise, no idea what the trouble is, so presume that retries might fix it.
String message3 = "IO exception during "+context+": "+e.getMessage();
Logging.connectors.warn(message3,e);
throw new ServiceInterruption(message3,
e,
currentTime + interruptionRetryTime,
currentTime + 2L * 60L * 60000L,
-1,
true);
}
@Override
public void outputConfigurationHeader(IThreadContext threadContext,
IHTTPOutput out, Locale locale, ConfigParams parameters,
List<String> tabsArray) throws ManifoldCFException, IOException {
ConfigurationHandler.outputConfigurationHeader(threadContext, out, locale,
parameters, tabsArray);
}
@Override
public void outputConfigurationBody(IThreadContext threadContext,
IHTTPOutput out, Locale locale, ConfigParams parameters, String tabName)
throws ManifoldCFException, IOException {
ConfigurationHandler.outputConfigurationBody(threadContext, out, locale,
parameters, tabName);
}
@Override
public String processConfigurationPost(IThreadContext threadContext,
IPostParameters variableContext, Locale locale, ConfigParams parameters)
throws ManifoldCFException {
return ConfigurationHandler.processConfigurationPost(threadContext,
variableContext, locale, parameters);
}
@Override
public void viewConfiguration(IThreadContext threadContext, IHTTPOutput out,
Locale locale, ConfigParams parameters) throws ManifoldCFException,
IOException {
ConfigurationHandler.viewConfiguration(threadContext, out, locale,
parameters);
}
@Override
public void outputSpecificationHeader(IHTTPOutput out, Locale locale, Specification os,
int connectionSequenceNumber, List<String> tabsArray)
throws ManifoldCFException, IOException {
ConfigurationHandler.outputSpecificationHeader(out, locale, os, connectionSequenceNumber, tabsArray);
}
@Override
public void outputSpecificationBody(IHTTPOutput out, Locale locale, Specification os,
int connectionSequenceNumber, int actualSequenceNumber, String tabName) throws ManifoldCFException, IOException {
ConfigurationHandler.outputSpecificationBody(out, locale, os, connectionSequenceNumber, actualSequenceNumber, tabName);
}
@Override
public String processSpecificationPost(IPostParameters variableContext, Locale locale, Specification os,
int connectionSequenceNumber) throws ManifoldCFException {
return ConfigurationHandler.processSpecificationPost(variableContext, locale, os, connectionSequenceNumber);
}
@Override
public void viewSpecification(IHTTPOutput out, Locale locale, Specification os,
int connectionSequenceNumber) throws ManifoldCFException, IOException {
ConfigurationHandler.viewSpecification(out, locale, os, connectionSequenceNumber);
}
}