blob: 6aaea178df3114eb67bd8ef128a8fb1abce858a3 [file] [log] [blame]
/* $Id$ */
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.manifoldcf.agents.output.amazoncloudsearch;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.InterruptedIOException;
import java.io.StringReader;
import java.io.BufferedReader;
import java.io.Reader;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Locale;
import java.util.Set;
import java.util.HashSet;
import java.util.Date;
import org.apache.commons.io.input.ReaderInputStream;
import org.apache.commons.io.FilenameUtils;
import org.apache.http.Consts;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHost;
import org.apache.http.HttpResponse;
import org.apache.http.client.ClientProtocolException;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
import org.apache.http.entity.InputStreamEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.apache.manifoldcf.agents.interfaces.IOutputAddActivity;
import org.apache.manifoldcf.agents.interfaces.IOutputNotifyActivity;
import org.apache.manifoldcf.agents.interfaces.IOutputRemoveActivity;
import org.apache.manifoldcf.agents.interfaces.RepositoryDocument;
import org.apache.manifoldcf.agents.interfaces.ServiceInterruption;
import org.apache.manifoldcf.agents.output.BaseOutputConnector;
import org.apache.manifoldcf.agents.output.amazoncloudsearch.SDFModel.Document;
import org.apache.manifoldcf.core.interfaces.Specification;
import org.apache.manifoldcf.core.interfaces.ConfigParams;
import org.apache.manifoldcf.core.interfaces.ConfigurationNode;
import org.apache.manifoldcf.core.interfaces.DBInterfaceFactory;
import org.apache.manifoldcf.core.interfaces.IDBInterface;
import org.apache.manifoldcf.core.interfaces.ManifoldCFException;
import org.apache.manifoldcf.core.interfaces.IThreadContext;
import org.apache.manifoldcf.core.interfaces.IHTTPOutput;
import org.apache.manifoldcf.core.interfaces.IPostParameters;
import org.apache.manifoldcf.core.interfaces.IPasswordMapperActivity;
import org.apache.manifoldcf.core.interfaces.SpecificationNode;
import org.apache.manifoldcf.agents.system.ManifoldCF;
import org.apache.manifoldcf.agents.system.Logging;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.JsonToken;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.manifoldcf.core.jsongen.*;
public class AmazonCloudSearchConnector extends BaseOutputConnector {
/** Ingestion activity */
public final static String INGEST_ACTIVITY = "document ingest";
/** Document removal activity */
public final static String REMOVE_ACTIVITY = "document deletion";
/** Forward to the javascript to check the configuration parameters */
private static final String EDIT_CONFIGURATION_JS = "editConfiguration.js";
/** Forward to the HTML template to edit the configuration parameters */
private static final String EDIT_CONFIGURATION_HTML = "editConfiguration.html";
/** Forward to the HTML template to view the configuration parameters */
private static final String VIEW_CONFIGURATION_HTML = "viewConfiguration.html";
/** Forward to the javascript to check the specification parameters for the job */
private static final String EDIT_SPECIFICATION_JS = "editSpecification.js";
private static final String EDIT_SPECIFICATION_CONTENTS_HTML = "editSpecification_Contents.html";
private static final String VIEW_SPECIFICATION_HTML = "viewSpecification.html";
/** Local connection */
protected HttpPost poster = null;
// What we need for database keys
protected String serverHost = null;
protected String serverPath = null;
/** Document Chunk Manager */
private DocumentChunkManager documentChunkManager = null;
/** cloudsearch field name for file body text. */
private static final String FILE_BODY_TEXT_FIELDNAME = "f_bodytext";
/** Constructor.
*/
public AmazonCloudSearchConnector(){
}
/** Clear out any state information specific to a given thread.
* This method is called when this object is returned to the connection pool.
*/
@Override
public void clearThreadContext()
{
super.clearThreadContext();
documentChunkManager = null;
}
@Override
public void install(IThreadContext threadContext)
throws ManifoldCFException
{
IDBInterface mainDatabase = DBInterfaceFactory.make(threadContext,
ManifoldCF.getMasterDatabaseName(),
ManifoldCF.getMasterDatabaseUsername(),
ManifoldCF.getMasterDatabasePassword());
DocumentChunkManager dcmanager = new DocumentChunkManager(mainDatabase);
dcmanager.install();
}
@Override
public void deinstall(IThreadContext threadContext)
throws ManifoldCFException
{
IDBInterface mainDatabase = DBInterfaceFactory.make(threadContext,
ManifoldCF.getMasterDatabaseName(),
ManifoldCF.getMasterDatabaseUsername(),
ManifoldCF.getMasterDatabasePassword());
DocumentChunkManager dcmanager = new DocumentChunkManager(mainDatabase);
dcmanager.deinstall();
}
/** Return the list of activities that this connector supports (i.e. writes into the log).
*@return the list.
*/
@Override
public String[] getActivitiesList()
{
return new String[]{INGEST_ACTIVITY,REMOVE_ACTIVITY};
}
/** Connect.
*@param configParameters is the set of configuration parameters, which
* in this case describe the target appliance, basic auth configuration, etc. (This formerly came
* out of the ini file.)
*/
@Override
public void connect(ConfigParams configParameters)
{
super.connect(configParameters);
}
/** This method is called to assess whether to count this connector instance should
* actually be counted as being connected.
*@return true if the connector instance is actually connected.
*/
@Override
public boolean isConnected()
{
return poster != null;
}
/** Close the connection. Call this before discarding the connection.
*/
@Override
public void disconnect()
throws ManifoldCFException
{
serverHost = null;
serverPath = null;
poster = null;
super.disconnect();
}
/** Set up a session */
protected void getSession()
throws ManifoldCFException
{
if (documentChunkManager == null)
{
IDBInterface databaseHandle = DBInterfaceFactory.make(currentContext,
ManifoldCF.getMasterDatabaseName(),
ManifoldCF.getMasterDatabaseUsername(),
ManifoldCF.getMasterDatabasePassword());
documentChunkManager = new DocumentChunkManager(databaseHandle);
}
serverHost = params.getParameter(AmazonCloudSearchConfig.SERVER_HOST);
if (serverHost == null)
throw new ManifoldCFException("Server host parameter required");
serverPath = params.getParameter(AmazonCloudSearchConfig.SERVER_PATH);
if (serverPath == null)
throw new ManifoldCFException("Server path parameter required");
String proxyProtocol = params.getParameter(AmazonCloudSearchConfig.PROXY_PROTOCOL);
String proxyHost = params.getParameter(AmazonCloudSearchConfig.PROXY_HOST);
String proxyPort = params.getParameter(AmazonCloudSearchConfig.PROXY_PORT);
// Https is OK here without a custom trust store because we know we are talking to an Amazon instance, which has certs that
// are presumably non-custom.
String urlStr = "https://" + serverHost + serverPath;
poster = new HttpPost(urlStr);
//set proxy
if(proxyHost != null && proxyHost.length() > 0)
{
try
{
HttpHost proxy = new HttpHost(proxyHost, Integer.parseInt(proxyPort), proxyProtocol);
RequestConfig config = RequestConfig.custom().setProxy(proxy).build();
poster.setConfig(config);
}
catch (NumberFormatException e)
{
throw new ManifoldCFException("Number format exception: "+e.getMessage(),e);
}
}
poster.addHeader("Content-Type", "application/json");
}
/** Test the connection. Returns a string describing the connection integrity.
*@return the connection's status as a displayable string.
*/
@Override
public String check() throws ManifoldCFException {
try {
getSession();
String responsbody = postData(new ReaderInputStream(new StringReader("[]"),Consts.UTF_8));
String status = "";
try
{
status = getStatusFromJsonResponse(responsbody);
} catch (ManifoldCFException e)
{
Logging.ingest.debug(e);
return "Could not get status from response body. Check Access Policy setting of your domain of Amazon CloudSearch.: " + e.getMessage();
}
// check status message
String message = "";
if ("error".equals(status)) {
JsonParser parser = new JsonFactory().createJsonParser(responsbody);
while (parser.nextToken() != JsonToken.END_OBJECT) {
String name = parser.getCurrentName();
if ("errors".equalsIgnoreCase(name)) {
message = parseMessage(parser);
break;
}
}
}
if ("error".equalsIgnoreCase(status)
&& "batch must contain at least one operation".equals(message)) {
return "Connection working.";
}
return "Connection NOT working.";
} catch (ClientProtocolException e) {
Logging.ingest.debug(e);
return "Protocol exception: "+e.getMessage();
} catch (IOException e) {
Logging.ingest.debug(e);
return "IO exception: "+e.getMessage();
} catch (ServiceInterruption e) {
Logging.ingest.debug(e);
return "Transient exception: "+e.getMessage();
}
}
private String getStatusFromJsonResponse(String responsbody) throws ManifoldCFException {
try {
JsonParser parser = new JsonFactory().createJsonParser(responsbody);
while (parser.nextToken() != JsonToken.END_OBJECT)
{
String name = parser.getCurrentName();
if("status".equalsIgnoreCase(name)){
parser.nextToken();
return parser.getText();
}
}
} catch (JsonParseException e) {
throw new ManifoldCFException(e);
} catch (IOException e) {
throw new ManifoldCFException(e);
}
return null;
}
private String parseMessage(JsonParser parser) throws JsonParseException, IOException {
while(parser.nextToken() != JsonToken.END_ARRAY){
String name = parser.getCurrentName();
if("message".equalsIgnoreCase(name)){
parser.nextToken();
return parser.getText();
}
}
return null;
}
/** Get an output version string, given an output specification. The output version string is used to uniquely describe the pertinent details of
* the output specification and the configuration, to allow the Connector Framework to determine whether a document will need to be output again.
* Note that the contents of the document cannot be considered by this method, and that a different version string (defined in IRepositoryConnector)
* is used to describe the version of the actual document.
*
* This method presumes that the connector object has been configured, and it is thus able to communicate with the output data store should that be
* necessary.
*@param os is the current output specification for the job that is doing the crawling.
*@return a string, of unlimited length, which uniquely describes output configuration and specification in such a way that if two such strings are equal,
* the document will not need to be sent again to the output data store.
*/
@Override
public String getPipelineDescription(Specification os)
throws ManifoldCFException, ServiceInterruption
{
SpecPacker sp = new SpecPacker(os);
return sp.toPackedString();
}
/** Detect if a mime type is indexable or not. This method is used by participating repository connectors to pre-filter the number of
* unusable documents that will be passed to this output connector.
*@param outputDescription is the document's output version.
*@param mimeType is the mime type of the document.
*@return true if the mime type is indexable by this connector.
*/
public boolean checkMimeTypeIndexable(String outputDescription, String mimeType)
throws ManifoldCFException, ServiceInterruption
{
SpecPacker sp = new SpecPacker(outputDescription);
if (sp.checkMimeType(mimeType))
return super.checkMimeTypeIndexable(outputDescription, mimeType);
else
return false;
}
@Override
public boolean checkLengthIndexable(String outputDescription, long length)
throws ManifoldCFException, ServiceInterruption {
SpecPacker sp = new SpecPacker(outputDescription);
if (sp.checkLengthIndexable(length))
return super.checkLengthIndexable(outputDescription, length);
else
return false;
}
@Override
public boolean checkURLIndexable(String outputDescription, String url)
throws ManifoldCFException, ServiceInterruption {
SpecPacker sp = new SpecPacker(outputDescription);
if (sp.checkURLIndexable(url))
return super.checkURLIndexable(outputDescription, url);
else
return false;
}
/** Add (or replace) a document in the output data store using the connector.
* This method presumes that the connector object has been configured, and it is thus able to communicate with the output data store should that be
* necessary.
* The OutputSpecification is *not* provided to this method, because the goal is consistency, and if output is done it must be consistent with the
* output description, since that was what was partly used to determine if output should be taking place. So it may be necessary for this method to decode
* an output description string in order to determine what should be done.
*@param documentURI is the URI of the document. The URI is presumed to be the unique identifier which the output data store will use to process
* and serve the document. This URI is constructed by the repository connector which fetches the document, and is thus universal across all output connectors.
*@param outputDescription is the description string that was constructed for this document by the getOutputDescription() method.
*@param document is the document data to be processed (handed to the output data store).
*@param authorityNameString is the name of the authority responsible for authorizing any access tokens passed in with the repository document. May be null.
*@param activities is the handle to an object that the implementer of an output connector may use to perform operations, such as logging processing activity.
*@return the document status (accepted or permanently rejected).
*/
@Override
public int addOrReplaceDocumentWithException(String documentURI, String outputDescription, RepositoryDocument document, String authorityNameString, IOutputAddActivity activities)
throws ManifoldCFException, ServiceInterruption, IOException
{
// Establish a session
getSession();
SpecPacker sp = new SpecPacker(outputDescription);
String uid = ManifoldCF.hash(documentURI);
// Build a JSON generator
JSONObjectReader objectReader = new JSONObjectReader();
// Build the metadata field part
JSONObjectReader fieldReader = new JSONObjectReader();
// Add the type and ID
objectReader.addNameValuePair(new JSONNameValueReader(new JSONStringReader("id"),new JSONStringReader(uid)))
.addNameValuePair(new JSONNameValueReader(new JSONStringReader("type"),new JSONStringReader("add")))
.addNameValuePair(new JSONNameValueReader(new JSONStringReader("fields"),fieldReader));
// Populate the fields...
Iterator<String> itr = document.getFields();
while (itr.hasNext())
{
String fieldName = itr.next();
Object[] fieldValues = document.getField(fieldName);
JSONReader[] elements = new JSONReader[fieldValues.length];
if (fieldValues instanceof Reader[])
{
for (int i = 0; i < elements.length; i++)
{
elements[i] = new JSONStringReader((Reader)fieldValues[i]);
}
}
else if (fieldValues instanceof Date[])
{
for (int i = 0; i < elements.length; i++)
{
elements[i] = new JSONStringReader(((Date)fieldValues[i]).toString());
}
}
else if (fieldValues instanceof String[])
{
for (int i = 0; i < elements.length; i++)
{
elements[i] = new JSONStringReader((String)fieldValues[i]);
}
}
else
throw new IllegalStateException("Unexpected metadata type: "+fieldValues.getClass().getName());
fieldReader.addNameValuePair(new JSONNameValueReader(new JSONStringReader(fieldName),new JSONArrayReader(elements)));
}
// Add the primary content data in.
fieldReader.addNameValuePair(new JSONNameValueReader(new JSONStringReader(FILE_BODY_TEXT_FIELDNAME),
new JSONStringReader(new InputStreamReader(document.getBinaryStream(),Consts.UTF_8))));
documentChunkManager.recordDocument(uid, serverHost, serverPath, new ReaderInputStream(objectReader, Consts.UTF_8));
conditionallyFlushDocuments();
return DOCUMENTSTATUS_ACCEPTED;
}
/** Remove a document using the connector.
* Note that the last outputDescription is included, since it may be necessary for the connector to use such information to know how to properly remove the document.
*@param documentURI is the URI of the document. The URI is presumed to be the unique identifier which the output data store will use to process
* and serve the document. This URI is constructed by the repository connector which fetches the document, and is thus universal across all output connectors.
*@param outputDescription is the last description string that was constructed for this document by the getOutputDescription() method above.
*@param activities is the handle to an object that the implementer of an output connector may use to perform operations, such as logging processing activity.
*/
@Override
public void removeDocument(String documentURI, String outputDescription, IOutputRemoveActivity activities)
throws ManifoldCFException, ServiceInterruption
{
// Establish a session
getSession();
String uid = ManifoldCF.hash(documentURI);
// Build a JSON generator
JSONObjectReader objectReader = new JSONObjectReader();
// Add the type and ID
objectReader.addNameValuePair(new JSONNameValueReader(new JSONStringReader("id"),new JSONStringReader(uid)))
.addNameValuePair(new JSONNameValueReader(new JSONStringReader("type"),new JSONStringReader("delete")));
try
{
documentChunkManager.recordDocument(uid, serverHost, serverPath, new ReaderInputStream(objectReader, Consts.UTF_8));
}
catch (IOException e)
{
handleIOException(e);
}
conditionallyFlushDocuments();
}
@Override
public void noteJobComplete(IOutputNotifyActivity activities)
throws ManifoldCFException, ServiceInterruption {
getSession();
flushDocuments();
}
protected static final int CHUNK_SIZE = 1000;
protected void conditionallyFlushDocuments()
throws ManifoldCFException, ServiceInterruption
{
if (documentChunkManager.equalOrMoreThan(serverHost, serverPath, CHUNK_SIZE))
flushDocuments();
}
protected void flushDocuments()
throws ManifoldCFException, ServiceInterruption
{
Logging.ingest.info("AmazonCloudSearch: Starting flush to Amazon");
// Repeat until we are empty of cached stuff
int chunkNumber = 0;
while (true)
{
DocumentRecord[] records = documentChunkManager.readChunk(serverHost, serverPath, CHUNK_SIZE);
try
{
if (records.length == 0)
break;
// The records consist of up to 1000 individual input streams, which must be all concatenated together into the post
// To do that, we go into and out of Reader space once again...
JSONArrayReader arrayReader = new JSONArrayReader();
for (DocumentRecord dr : records)
{
arrayReader.addArrayElement(new JSONValueReader(new InputStreamReader(dr.getDataStream(),Consts.UTF_8)));
}
//post data..
String responsbody = postData(new ReaderInputStream(arrayReader,Consts.UTF_8));
// check status
String status = getStatusFromJsonResponse(responsbody);
if("success".equals(status))
{
Logging.ingest.info("AmazonCloudSearch: Successfully sent document chunk " + chunkNumber);
//remove documents from table..
documentChunkManager.deleteChunk(records);
}
else
{
Logging.ingest.error("AmazonCloudSearch: Error sending document chunk "+ chunkNumber+": "+ responsbody);
throw new ManifoldCFException("recieved error status from service after feeding document. response body : " + responsbody);
}
}
finally
{
Throwable exception = null;
for (DocumentRecord dr : records)
{
try
{
dr.close();
}
catch (Throwable e)
{
exception = e;
}
}
if (exception != null)
{
if (exception instanceof ManifoldCFException)
throw (ManifoldCFException)exception;
else if (exception instanceof Error)
throw (Error)exception;
else if (exception instanceof RuntimeException)
throw (RuntimeException)exception;
else
throw new RuntimeException("Unknown exception class thrown: "+exception.getClass().getName()+": "+exception.getMessage(),exception);
}
}
}
}
/**
* Fill in a Server tab configuration parameter map for calling a Velocity
* template.
*
* @param newMap is the map to fill in
* @param parameters is the current set of configuration parameters
*/
private static void fillInServerConfigurationMap(Map<String, Object> newMap, IPasswordMapperActivity mapper, ConfigParams parameters) {
String serverhost = parameters.getParameter(AmazonCloudSearchConfig.SERVER_HOST);
String serverpath = parameters.getParameter(AmazonCloudSearchConfig.SERVER_PATH);
String proxyprotocol = parameters.getParameter(AmazonCloudSearchConfig.PROXY_PROTOCOL);
String proxyhost = parameters.getParameter(AmazonCloudSearchConfig.PROXY_HOST);
String proxyport = parameters.getParameter(AmazonCloudSearchConfig.PROXY_PORT);
if (serverhost == null)
serverhost = AmazonCloudSearchConfig.SERVER_HOST_DEFAULT;
if (serverpath == null)
serverpath = AmazonCloudSearchConfig.SERVER_PATH_DEFAULT;
if (proxyprotocol == null)
proxyprotocol = AmazonCloudSearchConfig.PROXY_PROTOCOL_DEFAULT;
if (proxyhost == null)
proxyhost = AmazonCloudSearchConfig.PROXY_HOST_DEFAULT;
if (proxyport == null)
proxyport = AmazonCloudSearchConfig.PROXY_PORT_DEFAULT;
newMap.put("SERVERHOST", serverhost);
newMap.put("SERVERPATH", serverpath);
newMap.put("PROXYPROTOCOL", proxyprotocol);
newMap.put("PROXYHOST", proxyhost);
newMap.put("PROXYPORT", proxyport);
}
/**
* View configuration. This method is called in the body section of the
* connector's view configuration page. Its purpose is to present the
* connection information to the user. The coder can presume that the HTML
* that is output from this configuration will be within appropriate <html>
* and <body> tags.
*
* @param threadContext is the local thread context.
* @param out is the output to which any HTML should be sent.
* @param parameters are the configuration parameters, as they currently
* exist, for this connection being configured.
*/
@Override
public void viewConfiguration(IThreadContext threadContext, IHTTPOutput out,
Locale locale, ConfigParams parameters) throws ManifoldCFException, IOException {
Map<String, Object> paramMap = new HashMap<String, Object>();
// Fill in map from each tab
fillInServerConfigurationMap(paramMap, out, parameters);
Messages.outputResourceWithVelocity(out,locale,VIEW_CONFIGURATION_HTML,paramMap);
}
/**
*
* Output the configuration header section. This method is called in the
* head section of the connector's configuration page. Its purpose is to add
* the required tabs to the list, and to output any javascript methods that
* might be needed by the configuration editing HTML.
*
* @param threadContext is the local thread context.
* @param out is the output to which any HTML should be sent.
* @param parameters are the configuration parameters, as they currently
* exist, for this connection being configured.
* @param tabsArray is an array of tab names. Add to this array any tab
* names that are specific to the connector.
*/
@Override
public void outputConfigurationHeader(IThreadContext threadContext,
IHTTPOutput out, Locale locale, ConfigParams parameters, List<String> tabsArray)
throws ManifoldCFException, IOException {
// Add the Server tab
tabsArray.add(Messages.getString(locale, "AmazonCloudSearchOutputConnector.ServerTabName"));
// Map the parameters
Map<String, Object> paramMap = new HashMap<String, Object>();
// Fill in the parameters from each tab
fillInServerConfigurationMap(paramMap, out, parameters);
// Output the Javascript - only one Velocity template for all tabs
Messages.outputResourceWithVelocity(out,locale,EDIT_CONFIGURATION_JS,paramMap);
}
@Override
public void outputConfigurationBody(IThreadContext threadContext,
IHTTPOutput out, Locale locale, ConfigParams parameters, String tabName)
throws ManifoldCFException, IOException {
// Call the Velocity templates for each tab
Map<String, Object> paramMap = new HashMap<String, Object>();
// Set the tab name
paramMap.put("TABNAME", tabName);
// Fill in the parameters
fillInServerConfigurationMap(paramMap, out, parameters);
// Server tab
Messages.outputResourceWithVelocity(out,locale,EDIT_CONFIGURATION_HTML,paramMap);
}
/**
* Process a configuration post. This method is called at the start of the
* connector's configuration page, whenever there is a possibility that form
* data for a connection has been posted. Its purpose is to gather form
* information and modify the configuration parameters accordingly. The name
* of the posted form is "editconnection".
*
* @param threadContext is the local thread context.
* @param variableContext is the set of variables available from the post,
* including binary file post information.
* @param parameters are the configuration parameters, as they currently
* exist, for this connection being configured.
* @return null if all is well, or a string error message if there is an
* error that should prevent saving of the connection (and cause a
* redirection to an error page).
*
*/
@Override
public String processConfigurationPost(IThreadContext threadContext,
IPostParameters variableContext, ConfigParams parameters)
throws ManifoldCFException {
// Server tab parameters
String serverhost = variableContext.getParameter("serverhost");
if (serverhost != null)
parameters.setParameter(AmazonCloudSearchConfig.SERVER_HOST, serverhost);
String serverpath = variableContext.getParameter("serverpath");
if (serverpath != null)
parameters.setParameter(AmazonCloudSearchConfig.SERVER_PATH, serverpath);
String proxyprotocol = variableContext.getParameter("proxyprotocol");
if (proxyprotocol != null)
parameters.setParameter(AmazonCloudSearchConfig.PROXY_PROTOCOL, proxyprotocol);
String proxyhost = variableContext.getParameter("proxyhost");
if (proxyhost != null)
parameters.setParameter(AmazonCloudSearchConfig.PROXY_HOST, proxyhost);
String proxyport = variableContext.getParameter("proxyport");
if (proxyport != null)
parameters.setParameter(AmazonCloudSearchConfig.PROXY_PORT, proxyport);
return null;
}
private String postData(InputStream jsonData) throws ServiceInterruption, ManifoldCFException {
CloseableHttpClient httpclient = HttpClients.createDefault();
try {
poster.setEntity(new InputStreamEntity(jsonData));
HttpResponse res = httpclient.execute(poster);
HttpEntity resEntity = res.getEntity();
return EntityUtils.toString(resEntity);
} catch (ClientProtocolException e) {
throw new ManifoldCFException(e);
} catch (IOException e) {
handleIOException(e);
} finally {
try {
httpclient.close();
} catch (IOException e) {
//do nothing
}
}
return null;
}
private static void handleIOException(IOException e)
throws ManifoldCFException, ServiceInterruption {
if (!(e instanceof java.net.SocketTimeoutException)
&& (e instanceof InterruptedIOException)) {
throw new ManifoldCFException("Interrupted: " + e.getMessage(), e,
ManifoldCFException.INTERRUPTED);
}
Logging.ingest.warn(
"Amazon CloudSearch: IO exception: " + e.getMessage(), e);
long currentTime = System.currentTimeMillis();
throw new ServiceInterruption("IO exception: " + e.getMessage(), e,
currentTime + 300000L, currentTime + 3 * 60 * 60000L, -1, false);
}
protected static void fillInContentsSpecificationMap(Map<String,Object> paramMap, Specification os)
{
String maxFileSize = AmazonCloudSearchConfig.MAXLENGTH_DEFAULT;
String allowedMimeTypes = AmazonCloudSearchConfig.MIMETYPES_DEFAULT;
String allowedFileExtensions = AmazonCloudSearchConfig.EXTENSIONS_DEFAULT;
for (int i = 0; i < os.getChildCount(); i++)
{
SpecificationNode sn = os.getChild(i);
if (sn.getType().equals(AmazonCloudSearchConfig.NODE_MAXLENGTH))
maxFileSize = sn.getAttributeValue(AmazonCloudSearchConfig.ATTRIBUTE_VALUE);
else if (sn.getType().equals(AmazonCloudSearchConfig.NODE_MIMETYPES))
allowedMimeTypes = sn.getValue();
else if (sn.getType().equals(AmazonCloudSearchConfig.NODE_EXTENSIONS))
allowedFileExtensions = sn.getValue();
}
paramMap.put("MAXFILESIZE",maxFileSize);
paramMap.put("MIMETYPES",allowedMimeTypes);
paramMap.put("EXTENSIONS",allowedFileExtensions);
}
/** Obtain the name of the form check javascript method to call.
*@param connectionSequenceNumber is the unique number of this connection within the job.
*@return the name of the form check javascript method.
*/
@Override
public String getFormCheckJavascriptMethodName(int connectionSequenceNumber)
{
return "s"+connectionSequenceNumber+"_checkSpecification";
}
/** Obtain the name of the form presave check javascript method to call.
*@param connectionSequenceNumber is the unique number of this connection within the job.
*@return the name of the form presave check javascript method.
*/
@Override
public String getFormPresaveCheckJavascriptMethodName(int connectionSequenceNumber)
{
return "s"+connectionSequenceNumber+"_checkSpecificationForSave";
}
/** Output the specification header section.
* This method is called in the head section of a job page which has selected a pipeline connection of the current type. Its purpose is to add the required tabs
* to the list, and to output any javascript methods that might be needed by the job editing HTML.
*@param out is the output to which any HTML should be sent.
*@param locale is the preferred local of the output.
*@param os is the current pipeline specification for this connection.
*@param connectionSequenceNumber is the unique number of this connection within the job.
*@param tabsArray is an array of tab names. Add to this array any tab names that are specific to the connector.
*/
@Override
public void outputSpecificationHeader(IHTTPOutput out, Locale locale, Specification os,
int connectionSequenceNumber, List<String> tabsArray)
throws ManifoldCFException, IOException
{
Map<String, Object> paramMap = new HashMap<String, Object>();
paramMap.put("SEQNUM",Integer.toString(connectionSequenceNumber));
tabsArray.add(Messages.getString(locale, "AmazonCloudSearchOutputConnector.ContentsTabName"));
// Fill in the specification header map, using data from all tabs.
fillInContentsSpecificationMap(paramMap, os);
Messages.outputResourceWithVelocity(out,locale,EDIT_SPECIFICATION_JS,paramMap);
}
/** Output the specification body section.
* This method is called in the body section of a job page which has selected a pipeline connection of the current type. Its purpose is to present the required form elements for editing.
* The coder can presume that the HTML that is output from this configuration will be within appropriate <html>, <body>, and <form> tags. The name of the
* form is "editjob".
*@param out is the output to which any HTML should be sent.
*@param locale is the preferred local of the output.
*@param os is the current pipeline specification for this job.
*@param connectionSequenceNumber is the unique number of this connection within the job.
*@param actualSequenceNumber is the connection within the job that has currently been selected.
*@param tabName is the current tab name.
*/
@Override
public void outputSpecificationBody(IHTTPOutput out, Locale locale, Specification os,
int connectionSequenceNumber, int actualSequenceNumber, String tabName)
throws ManifoldCFException, IOException
{
Map<String, Object> paramMap = new HashMap<String, Object>();
// Set the tab name
paramMap.put("TABNAME", tabName);
paramMap.put("SEQNUM",Integer.toString(connectionSequenceNumber));
paramMap.put("SELECTEDNUM",Integer.toString(actualSequenceNumber));
// Fill in the field mapping tab data
fillInContentsSpecificationMap(paramMap, os);
Messages.outputResourceWithVelocity(out,locale,EDIT_SPECIFICATION_CONTENTS_HTML,paramMap);
}
/** Process a specification post.
* This method is called at the start of job's edit or view page, whenever there is a possibility that form data for a connection has been
* posted. Its purpose is to gather form information and modify the transformation specification accordingly.
* The name of the posted form is "editjob".
*@param variableContext contains the post data, including binary file-upload information.
*@param locale is the preferred local of the output.
*@param os is the current pipeline specification for this job.
*@param connectionSequenceNumber is the unique number of this connection within the job.
*@return null if all is well, or a string error message if there is an error that should prevent saving of the job (and cause a redirection to an error page).
*/
@Override
public String processSpecificationPost(IPostParameters variableContext, Locale locale, Specification os,
int connectionSequenceNumber)
throws ManifoldCFException {
String seqPrefix = "s"+connectionSequenceNumber+"_";
String x;
x = variableContext.getParameter(seqPrefix+"maxfilesize");
if (x != null)
{
int i = 0;
while (i < os.getChildCount())
{
SpecificationNode node = os.getChild(i);
if (node.getType().equals(AmazonCloudSearchConfig.NODE_MAXLENGTH))
os.removeChild(i);
else
i++;
}
SpecificationNode sn = new SpecificationNode(AmazonCloudSearchConfig.NODE_MAXLENGTH);
sn.setAttribute(AmazonCloudSearchConfig.ATTRIBUTE_VALUE,x);
os.addChild(os.getChildCount(),sn);
}
x = variableContext.getParameter(seqPrefix+"mimetypes");
if (x != null)
{
int i = 0;
while (i < os.getChildCount())
{
SpecificationNode node = os.getChild(i);
if (node.getType().equals(AmazonCloudSearchConfig.NODE_MIMETYPES))
os.removeChild(i);
else
i++;
}
SpecificationNode sn = new SpecificationNode(AmazonCloudSearchConfig.NODE_MIMETYPES);
sn.setValue(x);
os.addChild(os.getChildCount(),sn);
}
x = variableContext.getParameter(seqPrefix+"extensions");
if (x != null)
{
int i = 0;
while (i < os.getChildCount())
{
SpecificationNode node = os.getChild(i);
if (node.getType().equals(AmazonCloudSearchConfig.NODE_EXTENSIONS))
os.removeChild(i);
else
i++;
}
SpecificationNode sn = new SpecificationNode(AmazonCloudSearchConfig.NODE_EXTENSIONS);
sn.setValue(x);
os.addChild(os.getChildCount(),sn);
}
return null;
}
/** View specification.
* This method is called in the body section of a job's view page. Its purpose is to present the pipeline specification information to the user.
* The coder can presume that the HTML that is output from this configuration will be within appropriate <html> and <body> tags.
*@param out is the output to which any HTML should be sent.
*@param locale is the preferred local of the output.
*@param connectionSequenceNumber is the unique number of this connection within the job.
*@param os is the current pipeline specification for this job.
*/
@Override
public void viewSpecification(IHTTPOutput out, Locale locale, Specification os,
int connectionSequenceNumber)
throws ManifoldCFException, IOException
{
Map<String, Object> paramMap = new HashMap<String, Object>();
paramMap.put("SEQNUM",Integer.toString(connectionSequenceNumber));
// Fill in the map with data from all tabs
fillInContentsSpecificationMap(paramMap, os);
Messages.outputResourceWithVelocity(out,locale,VIEW_SPECIFICATION_HTML,paramMap);
}
protected static void fillSet(Set<String> set, String input) {
try
{
StringReader sr = new StringReader(input);
BufferedReader br = new BufferedReader(sr);
String line = null;
while ((line = br.readLine()) != null)
{
line = line.trim();
if (line.length() > 0)
set.add(line);
}
}
catch (IOException e)
{
// Should never happen
throw new RuntimeException("IO exception reading strings: "+e.getMessage(),e);
}
}
protected static class SpecPacker {
private final Set<String> extensions = new HashSet<String>();
private final Set<String> mimeTypes = new HashSet<String>();
private final Long lengthCutoff;
public SpecPacker(Specification os) {
Long lengthCutoff = null;
String extensions = null;
String mimeTypes = null;
for (int i = 0; i < os.getChildCount(); i++) {
SpecificationNode sn = os.getChild(i);
if (sn.getType().equals(AmazonCloudSearchConfig.NODE_MIMETYPES)) {
mimeTypes = sn.getValue();
} else if (sn.getType().equals(AmazonCloudSearchConfig.NODE_EXTENSIONS)) {
extensions = sn.getValue();
} else if (sn.getType().equals(AmazonCloudSearchConfig.NODE_MAXLENGTH)) {
String value = sn.getAttributeValue(AmazonCloudSearchConfig.ATTRIBUTE_VALUE);
lengthCutoff = new Long(value);
}
}
this.lengthCutoff = lengthCutoff;
fillSet(this.extensions, extensions);
fillSet(this.mimeTypes, mimeTypes);
}
public SpecPacker(String packedString) {
int index = 0;
// Max length
final StringBuilder sb = new StringBuilder();
if (packedString.length() > index) {
if (packedString.charAt(index++) == '+') {
index = unpack(sb,packedString,index,'+');
this.lengthCutoff = new Long(sb.toString());
} else
this.lengthCutoff = null;
} else
this.lengthCutoff = null;
// Mime types
final List<String> mimeBuffer = new ArrayList<String>();
index = unpackList(mimeBuffer,packedString,index,'+');
for (String mimeType : mimeBuffer) {
this.mimeTypes.add(mimeType);
}
// Extensions
final List<String> extensionsBuffer = new ArrayList<String>();
index = unpackList(extensionsBuffer,packedString,index,'+');
for (String extension : extensionsBuffer) {
this.extensions.add(extension);
}
}
public String toPackedString() {
StringBuilder sb = new StringBuilder();
int i;
// Max length
if (lengthCutoff == null)
sb.append('-');
else {
sb.append('+');
pack(sb,lengthCutoff.toString(),'+');
}
// Mime types
String[] mimeTypes = new String[this.mimeTypes.size()];
i = 0;
for (String mimeType : this.mimeTypes) {
mimeTypes[i++] = mimeType;
}
java.util.Arrays.sort(mimeTypes);
packList(sb,mimeTypes,'+');
// Extensions
String[] extensions = new String[this.extensions.size()];
i = 0;
for (String extension : this.extensions) {
extensions[i++] = extension;
}
java.util.Arrays.sort(extensions);
packList(sb,extensions,'+');
return sb.toString();
}
public boolean checkLengthIndexable(long length) {
if (lengthCutoff == null)
return true;
return (length <= lengthCutoff.longValue());
}
public boolean checkMimeType(String mimeType) {
if (mimeType == null)
mimeType = "application/unknown";
return mimeTypes.contains(mimeType);
}
public boolean checkURLIndexable(String url) {
String extension = FilenameUtils.getExtension(url);
if (extension == null || extension.length() == 0)
extension = ".";
return extensions.contains(extension);
}
}
}