blob: 20b0880ff1321aa533be9b2f8c6214db788e1550 [file] [log] [blame]
/* $Id: FileConnector.java 995085 2010-09-08 15:13:38Z kwright $ */
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.manifoldcf.crawler.connectors.hdfs;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.manifoldcf.agents.interfaces.RepositoryDocument;
import org.apache.manifoldcf.agents.interfaces.ServiceInterruption;
import org.apache.manifoldcf.core.interfaces.*;
import org.apache.manifoldcf.crawler.connectors.hdfs.HDFSSession;
import org.apache.manifoldcf.crawler.connectors.hdfs.Messages;
import org.apache.manifoldcf.crawler.interfaces.*;
import org.apache.manifoldcf.crawler.system.Logging;
import org.apache.manifoldcf.core.common.XThreadInputStream;
import org.apache.manifoldcf.core.common.XThreadStringBuffer;
import org.apache.manifoldcf.core.extmimemap.ExtensionMimeMap;
import java.util.*;
import java.io.*;
import java.net.URI;
import java.net.URISyntaxException;
/** This is the "repository connector" for a file system. It's a relative of the share crawler, and should have
* comparable basic functionality, with the exception of the ability to use ActiveDirectory and look at other shares.
*/
public class HDFSRepositoryConnector extends org.apache.manifoldcf.crawler.connectors.BaseRepositoryConnector
{
public static final String _rcsid = "@(#)$Id: FileConnector.java 995085 2010-09-08 15:13:38Z kwright $";
// Activities that we know about
protected final static String ACTIVITY_READ = "read document";
// Relationships we know about
protected static final String RELATIONSHIP_CHILD = "child";
// Activities list
protected static final String[] activitiesList = new String[]{ACTIVITY_READ};
protected String nameNodeProtocol = null;
protected String nameNodeHost = null;
protected String nameNodePort = null;
protected String user = null;
protected HDFSSession session = null;
protected long lastSessionFetch = -1L;
protected static final long timeToRelease = 300000L;
/*
* Constructor.
*/
public HDFSRepositoryConnector()
{
}
/** Tell the world what model this connector uses for getDocumentIdentifiers().
* This must return a model value as specified above.
*@return the model type value.
*/
@Override
public int getConnectorModel()
{
return MODEL_CHAINED_ADD_CHANGE;
}
/** Return the list of relationship types that this connector recognizes.
*@return the list.
*/
@Override
public String[] getRelationshipTypes()
{
return new String[]{RELATIONSHIP_CHILD};
}
/** List the activities we might report on.
*/
@Override
public String[] getActivitiesList()
{
return activitiesList;
}
/** For any given document, list the bins that it is a member of.
*/
@Override
public String[] getBinNames(String documentIdentifier)
{
return new String[]{"HDFS"};
}
/**
* Get the maximum number of documents to amalgamate together into one
* batch, for this connector.
*
* @return the maximum number. 0 indicates "unlimited".
*/
@Override
public int getMaxDocumentRequest() {
return 1;
}
/* (non-Javadoc)
* @see org.apache.manifoldcf.core.connector.BaseConnector#connect(org.apache.manifoldcf.core.interfaces.ConfigParams)
*/
@Override
public void connect(ConfigParams configParams) {
super.connect(configParams);
nameNodeProtocol = configParams.getParameter("namenodeprotocol");
if (nameNodeProtocol == null)
nameNodeProtocol = "hdfs";
nameNodeHost = configParams.getParameter("namenodehost");
nameNodePort = configParams.getParameter("namenodeport");
user = configParams.getParameter("user");
}
/* (non-Javadoc)
* @see org.apache.manifoldcf.core.connector.BaseConnector#disconnect()
*/
@Override
public void disconnect() throws ManifoldCFException {
closeSession();
user = null;
nameNodeProtocol = null;
nameNodeHost = null;
nameNodePort = null;
super.disconnect();
}
/**
* Set up a session
*/
protected HDFSSession getSession() throws ManifoldCFException, ServiceInterruption {
if (session == null) {
if (StringUtils.isEmpty(nameNodeProtocol)) {
throw new ManifoldCFException("Parameter namenodeprotocol required but not set");
}
if (Logging.connectors.isDebugEnabled()) {
Logging.connectors.debug("HDFS: NameNodeProtocol = '" + nameNodeProtocol + "'");
}
if (StringUtils.isEmpty(nameNodeHost)) {
throw new ManifoldCFException("Parameter namenodehost required but not set");
}
if (Logging.connectors.isDebugEnabled()) {
Logging.connectors.debug("HDFS: NameNodeHost = '" + nameNodeHost + "'");
}
if (StringUtils.isEmpty(nameNodePort)) {
throw new ManifoldCFException("Parameter namenodeport required but not set");
}
if (Logging.connectors.isDebugEnabled()) {
Logging.connectors.debug("HDFS: NameNodePort = '" + nameNodePort + "'");
}
if (StringUtils.isEmpty(user)) {
throw new ManifoldCFException("Parameter user required but not set");
}
if (Logging.connectors.isDebugEnabled()) {
Logging.connectors.debug("HDFS: User = '" + user + "'");
}
String nameNode = nameNodeProtocol+"://"+nameNodeHost+":"+nameNodePort;
GetSessionThread t = new GetSessionThread(nameNode,user);
try {
t.start();
t.finishUp();
} catch (InterruptedException e) {
t.interrupt();
throw new ManifoldCFException("Interrupted: " + e.getMessage(), e, ManifoldCFException.INTERRUPTED);
} catch (java.net.SocketTimeoutException e) {
handleIOException(e);
} catch (InterruptedIOException e) {
t.interrupt();
handleIOException(e);
} catch (URISyntaxException e) {
handleURISyntaxException(e);
} catch (IOException e) {
handleIOException(e);
}
session = t.getResult();
}
lastSessionFetch = System.currentTimeMillis();
return session;
}
/**
* Test the connection. Returns a string describing the connection
* integrity.
*
* @return the connection's status as a displayable string.
*/
@Override
public String check() throws ManifoldCFException {
try {
checkConnection();
return super.check();
} catch (ServiceInterruption e) {
return "Connection temporarily failed: " + e.getMessage();
} catch (ManifoldCFException e) {
return "Connection failed: " + e.getMessage();
}
}
/** This method is called to assess whether to count this connector instance should
* actually be counted as being connected.
*@return true if the connector instance is actually connected.
*/
@Override
public boolean isConnected()
{
return session != null;
}
/**
* @throws ManifoldCFException
*/
@Override
public void poll() throws ManifoldCFException {
if (lastSessionFetch == -1L) {
return;
}
long currentTime = System.currentTimeMillis();
if (currentTime >= lastSessionFetch + timeToRelease) {
closeSession();
}
}
protected void closeSession()
throws ManifoldCFException {
if (session != null) {
try {
// This can in theory throw an IOException, so it is possible it is doing socket
// communication. In practice, it's unlikely that there's any real IO, so I'm
// NOT putting it in a background thread for now.
session.close();
} catch (InterruptedIOException e) {
throw new ManifoldCFException(e.getMessage(),e,ManifoldCFException.INTERRUPTED);
} catch (IOException e) {
Logging.connectors.warn("HDFS: Error closing connection: "+e.getMessage(),e);
// Eat the exception
} finally {
session = null;
lastSessionFetch = -1L;
}
}
}
/**
* Queue "seed" documents. Seed documents are the starting places for
* crawling activity. Documents are seeded when this method calls
* appropriate methods in the passed in ISeedingActivity object.
*
* This method can choose to find repository changes that happen only during
* the specified time interval. The seeds recorded by this method will be
* viewed by the framework based on what the getConnectorModel() method
* returns.
*
* It is not a big problem if the connector chooses to create more seeds
* than are strictly necessary; it is merely a question of overall work
* required.
*
* The times passed to this method may be interpreted for greatest
* efficiency. The time ranges any given job uses with this connector will
* not overlap, but will proceed starting at 0 and going to the "current
* time", each time the job is run. For continuous crawling jobs, this
* method will be called once, when the job starts, and at various periodic
* intervals as the job executes.
*
* When a job's specification is changed, the framework automatically resets
* the seeding start time to 0. The seeding start time may also be set to 0
* on each job run, depending on the connector model returned by
* getConnectorModel().
*
* Note that it is always ok to send MORE documents rather than less to this
* method.
*
* @param activities is the interface this method should use to perform
* whatever framework actions are desired.
* @param spec is a document specification (that comes from the job).
* @param startTime is the beginning of the time range to consider,
* inclusive.
* @param endTime is the end of the time range to consider, exclusive.
* @param jobMode is an integer describing how the job is being run, whether
* continuous or once-only.
*/
@Override
public void addSeedDocuments(ISeedingActivity activities,
DocumentSpecification spec, long startTime, long endTime, int jobMode)
throws ManifoldCFException, ServiceInterruption {
String path = StringUtils.EMPTY;
int i = 0;
while (i < spec.getChildCount()) {
SpecificationNode sn = spec.getChild(i);
if (sn.getType().equals("startpoint")) {
path = sn.getAttributeValue("path");
FileStatus fileStatus = getObject(new Path(path));
if (fileStatus.isDirectory()) {
activities.addSeedDocument(fileStatus.getPath().toUri().toString());
}
}
i++;
}
}
/** Get document versions given an array of document identifiers.
* This method is called for EVERY document that is considered. It is therefore important to perform
* as little work as possible here.
* The connector will be connected before this method can be called.
*@param documentIdentifiers is the array of local document identifiers, as understood by this connector.
*@param oldVersions is the corresponding array of version strings that have been saved for the document identifiers.
* A null value indicates that this is a first-time fetch, while an empty string indicates that the previous document
* had an empty version string.
*@param activities is the interface this method should use to perform whatever framework actions are desired.
*@param spec is the current document specification for the current job. If there is a dependency on this
* specification, then the version string should include the pertinent data, so that reingestion will occur
* when the specification changes. This is primarily useful for metadata.
*@param jobMode is an integer describing how the job is being run, whether continuous or once-only.
*@param usesDefaultAuthority will be true only if the authority in use for these documents is the default one.
*@return the corresponding version strings, with null in the places where the document no longer exists.
* Empty version strings indicate that there is no versioning ability for the corresponding document, and the document
* will always be processed.
*/
public String[] getDocumentVersions(String[] documentIdentifiers, String[] oldVersions, IVersionActivity activities,
DocumentSpecification spec, int jobMode, boolean usesDefaultAuthority)
throws ManifoldCFException, ServiceInterruption
{
String[] rval = new String[documentIdentifiers.length];
for (int i = 0; i < rval.length; i++) {
String documentIdentifier = documentIdentifiers[i];
FileStatus fileStatus = getObject(new Path(documentIdentifier));
if (fileStatus != null) {
if (fileStatus.isDirectory()) {
long lastModified = fileStatus.getModificationTime();
rval[i] = new Long(lastModified).toString();
} else {
long fileLength = fileStatus.getLen();
if (activities.checkLengthIndexable(fileLength)) {
long lastModified = fileStatus.getModificationTime();
StringBuilder sb = new StringBuilder();
// Check if the path is to be converted. We record that info in the version string so that we'll reindex documents whose
// URI's change.
String nameNode = nameNodeProtocol + "://" + nameNodeHost + ":" + nameNodePort;
String convertPath = findConvertPath(nameNode, spec, fileStatus.getPath());
if (convertPath != null)
{
// Record the path.
sb.append("+");
pack(sb,convertPath,'+');
}
else
sb.append("-");
sb.append(new Long(lastModified).toString()).append(":").append(new Long(fileLength).toString());
rval[i] = sb.toString();
} else {
rval[i] = null;
}
}
} else {
rval[i] = null;
}
}
return rval;
}
/** Process a set of documents.
* This is the method that should cause each document to be fetched, processed, and the results either added
* to the queue of documents for the current job, and/or entered into the incremental ingestion manager.
* The document specification allows this class to filter what is done based on the job.
*@param documentIdentifiers is the set of document identifiers to process.
*@param activities is the interface this method should use to queue up new document references
* and ingest documents.
*@param spec is the document specification.
*@param scanOnly is an array corresponding to the document identifiers. It is set to true to indicate when the processing
* should only find other references, and should not actually call the ingestion methods.
*/
@Override
public void processDocuments(String[] documentIdentifiers, String[] versions, IProcessActivity activities, DocumentSpecification spec, boolean[] scanOnly)
throws ManifoldCFException, ServiceInterruption {
for (int i = 0; i < documentIdentifiers.length; i++) {
String version = versions[i];
String documentIdentifier = documentIdentifiers[i];
if (Logging.connectors.isDebugEnabled()) {
Logging.connectors.debug("HDFS: Processing document identifier '" + documentIdentifier + "'");
}
FileStatus fileStatus = getObject(new Path(documentIdentifier));
if (fileStatus == null) {
// It is no longer there , so delete right away
activities.deleteDocument(documentIdentifier);
continue;
}
if (fileStatus.isDirectory()) {
/*
* Queue up stuff for directory
*/
String entityReference = documentIdentifier;
FileStatus[] fileStatuses = getChildren(fileStatus.getPath());
if (fileStatuses == null) {
// Directory was deleted, so remove
activities.deleteDocument(documentIdentifier);
continue;
}
for (int j = 0; j < fileStatuses.length; j++) {
FileStatus fs = fileStatuses[j++];
String canonicalPath = fs.getPath().toString();
if (checkInclude(session.getUri().toString(),fs,canonicalPath,spec)) {
activities.addDocumentReference(canonicalPath,documentIdentifier,RELATIONSHIP_CHILD);
}
}
} else {
if (scanOnly[i])
continue;
if (!checkIngest(session.getUri().toString(),fileStatus,spec))
continue;
// Get the WGet conversion path out of the version string
String convertPath = null;
if (version.length() > 0 && version.startsWith("+"))
{
StringBuilder unpack = new StringBuilder();
unpack(unpack, version, 1, '+');
convertPath = unpack.toString();
}
// It is a file to be indexed.
// Prepare the metadata part of RepositoryDocument
RepositoryDocument data = new RepositoryDocument();
data.setFileName(fileStatus.getPath().getName());
data.setMimeType(mapExtensionToMimeType(fileStatus.getPath().getName()));
data.setModifiedDate(new Date(fileStatus.getModificationTime()));
String uri;
if (convertPath != null) {
uri = convertToWGETURI(convertPath);
} else {
uri = fileStatus.getPath().toUri().toString();
}
data.addField("uri",uri);
// We will record document fetch as an activity
long startTime = System.currentTimeMillis();
String errorCode = "FAILED";
String errorDesc = StringUtils.EMPTY;
long fileSize = 0;
try {
BackgroundStreamThread t = new BackgroundStreamThread(getSession(),new Path(documentIdentifier));
try {
t.start();
boolean wasInterrupted = false;
try {
InputStream is = t.getSafeInputStream();
try {
data.setBinary(is, fileSize);
activities.ingestDocumentWithException(documentIdentifier,version,uri,data);
} finally {
is.close();
}
} catch (java.net.SocketTimeoutException e) {
throw e;
} catch (InterruptedIOException e) {
wasInterrupted = true;
throw e;
} catch (ManifoldCFException e) {
if (e.getErrorCode() == ManifoldCFException.INTERRUPTED) {
wasInterrupted = true;
}
throw e;
} finally {
if (!wasInterrupted) {
// This does a join
t.finishUp();
}
}
// No errors. Record the fact that we made it.
errorCode = "OK";
// Length we did in bytes
fileSize = fileStatus.getLen();
} catch (InterruptedException e) {
// We were interrupted out of the join, most likely. Before we abandon the thread,
// send a courtesy interrupt.
t.interrupt();
throw new ManifoldCFException("Interrupted: " + e.getMessage(), e, ManifoldCFException.INTERRUPTED);
} catch (java.net.SocketTimeoutException e) {
errorCode = "IO ERROR";
errorDesc = e.getMessage();
handleIOException(e);
} catch (InterruptedIOException e) {
t.interrupt();
throw new ManifoldCFException("Interrupted: " + e.getMessage(), e, ManifoldCFException.INTERRUPTED);
} catch (IOException e) {
errorCode = "IO ERROR";
errorDesc = e.getMessage();
handleIOException(e);
}
} finally {
activities.recordActivity(new Long(startTime),ACTIVITY_READ,new Long(fileSize),documentIdentifier,errorCode,errorDesc,null);
}
}
}
}
// UI support methods.
//
// These support methods come in two varieties. The first bunch is involved in setting up connection configuration information. The second bunch
// is involved in presenting and editing document specification information for a job. The two kinds of methods are accordingly treated differently,
// in that the first bunch cannot assume that the current connector object is connected, while the second bunch can. That is why the first bunch
// receives a thread context argument for all UI methods, while the second bunch does not need one (since it has already been applied via the connect()
// method, above).
/** Output the configuration header section.
* This method is called in the head section of the connector's configuration page. Its purpose is to add the required tabs to the list, and to output any
* javascript methods that might be needed by the configuration editing HTML.
*@param threadContext is the local thread context.
*@param out is the output to which any HTML should be sent.
*@param parameters are the configuration parameters, as they currently exist, for this connection being configured.
*@param tabsArray is an array of tab names. Add to this array any tab names that are specific to the connector.
*/
@Override
public void outputConfigurationHeader(IThreadContext threadContext, IHTTPOutput out, Locale locale, ConfigParams parameters, List<String> tabsArray) throws ManifoldCFException, IOException
{
tabsArray.add(Messages.getString(locale,"HDFSRepositoryConnector.ServerTabName"));
out.print(
"<script type=\"text/javascript\">\n"+
"<!--\n"+
"function checkConfigForSave()\n"+
"{\n"+
" if (editconnection.namenodehost.value == \"\")\n"+
" {\n"+
" alert(\""+Messages.getBodyJavascriptString(locale,"HDFSRepositoryConnector.NameNodeHostCannotBeNull")+"\");\n"+
" SelectTab(\""+Messages.getBodyJavascriptString(locale,"HDFSRepositoryConnector.ServerTabName")+"\");\n"+
" editconnection.namenodehost.focus();\n"+
" return false;\n"+
" }\n"+
" if (editconnection.namenodeport.value == \"\")\n"+
" {\n"+
" alert(\""+Messages.getBodyJavascriptString(locale,"HDFSRepositoryConnector.NameNodePortCannotBeNull")+"\");\n"+
" SelectTab(\""+Messages.getBodyJavascriptString(locale,"HDFSRepositoryConnector.ServerTabName")+"\");\n"+
" editconnection.namenodeport.focus();\n"+
" return false;\n"+
" }\n"+
" if (!isInteger(editconnection.namenodeport.value))\n"+
" {\n"+
" alert(\""+Messages.getBodyJavascriptString(locale,"HDFSRepositoryConnector.NameNodePortMustBeAnInteger")+"\");\n"+
" SelectTab(\""+Messages.getBodyJavascriptString(locale,"HDFSRepositoryConnector.ServerTabName")+"\");\n"+
" editconnection.namenodeport.focus();\n"+
" return false;\n"+
" }\n"+
" if (editconnection.user.value == \"\")\n"+
" {\n"+
" alert(\""+Messages.getBodyJavascriptString(locale,"HDFSRepositoryConnector.UserCannotBeNull")+"\");\n"+
" SelectTab(\""+Messages.getBodyJavascriptString(locale,"HDFSRepositoryConnector.ServerTabName")+"\");\n"+
" editconnection.user.focus();\n"+
" return false;\n"+
" }\n"+
" return true;\n"+
"}\n"+
"\n"+
"//-->\n"+
"</script>\n"
);
}
/** Output the configuration body section.
* This method is called in the body section of the connector's configuration page. Its purpose is to present the required form elements for editing.
* The coder can presume that the HTML that is output from this configuration will be within appropriate <html>, <body>, and <form> tags. The name of the
* form is "editconnection".
*@param threadContext is the local thread context.
*@param out is the output to which any HTML should be sent.
*@param parameters are the configuration parameters, as they currently exist, for this connection being configured.
*@param tabName is the current tab name.
*/
public void outputConfigurationBody(IThreadContext threadContext, IHTTPOutput out, Locale locale, ConfigParams parameters, String tabName)
throws ManifoldCFException, IOException
{
String nameNodeProtocol = parameters.getParameter("namenodeprotocol");
if (nameNodeProtocol == null) {
nameNodeProtocol = "hdfs";
}
String nameNodeHost = parameters.getParameter("namenodehost");
if (nameNodeHost == null) {
nameNodeHost = "localhost";
}
String nameNodePort = parameters.getParameter("namenodeport");
if (nameNodePort == null) {
nameNodePort = "9000";
}
String user = parameters.getParameter("user");
if (user == null) {
user = "";
}
if (tabName.equals(Messages.getString(locale,"HDFSRepositoryConnector.ServerTabName")))
{
out.print(
"<table class=\"displaytable\">\n"+
" <tr>\n"+
" <td class=\"description\"><nobr>" + Messages.getBodyString(locale,"HDFSRepositoryConnector.NameNodeProtocol") + "</nobr></td>\n"+
" <td class=\"value\">\n"+
" <select name=\"namenodeprotocol\" size=\"2\">\n"+
" <option value=\"file\"" + (nameNodeProtocol.equals("file")?" selected=\"true\"":"") + ">file</option>\n"+
" <option value=\"ftp\"" + (nameNodeProtocol.equals("ftp")?" selected=\"true\"":"") + ">ftp</option>\n"+
" <option value=\"har\"" + (nameNodeProtocol.equals("har")?" selected=\"true\"":"") + ">har</option>\n"+
" <option value=\"hdfs\"" + (nameNodeProtocol.equals("hdfs")?" selected=\"true\"":"") + ">hdfs</option>\n"+
" <option value=\"s3\"" + (nameNodeProtocol.equals("s3")?" selected=\"true\"":"") + ">s3</option>\n"+
" <option value=\"s3n\"" + (nameNodeProtocol.equals("s3n")?" selected=\"true\"":"") + ">s3n</option>\n"+
" <option value=\"viewfs\"" + (nameNodeProtocol.equals("viewfs")?" selected=\"true\"":"") + ">viewfs</option>\n"+
" </select>\n"+
" </td>\n"+
" </tr>\n"+
" <tr>\n"+
" <td class=\"description\"><nobr>" + Messages.getBodyString(locale,"HDFSRepositoryConnector.NameNodeHost") + "</nobr></td>\n"+
" <td class=\"value\">\n"+
" <input name=\"namenodehost\" type=\"text\" size=\"32\" value=\""+org.apache.manifoldcf.ui.util.Encoder.attributeEscape(nameNodeHost)+"\"/>\n"+
" </td>\n"+
" </tr>\n"+
" <tr>\n"+
" <td class=\"description\"><nobr>" + Messages.getBodyString(locale,"HDFSRepositoryConnector.NameNodePort") + "</nobr></td>\n"+
" <td class=\"value\">\n"+
" <input name=\"namenodeport\" type=\"text\" size=\"5\" value=\""+org.apache.manifoldcf.ui.util.Encoder.attributeEscape(nameNodePort)+"\"/>\n"+
" </td>\n"+
" </tr>\n"+
" <tr>\n"+
" <td class=\"description\"><nobr>" + Messages.getBodyString(locale,"HDFSRepositoryConnector.User") + "</nobr></td>\n"+
" <td class=\"value\">\n"+
" <input name=\"user\" type=\"text\" size=\"32\" value=\""+org.apache.manifoldcf.ui.util.Encoder.attributeEscape(user)+"\"/>\n"+
" </td>\n"+
" </tr>\n"+
"</table>\n"
);
}
else
{
// Server tab hiddens
out.print(
"<input type=\"hidden\" name=\"namenodeprotocol\" value=\""+org.apache.manifoldcf.ui.util.Encoder.attributeEscape(nameNodeProtocol)+"\"/>\n"+
"<input type=\"hidden\" name=\"namenodehost\" value=\""+org.apache.manifoldcf.ui.util.Encoder.attributeEscape(nameNodeHost)+"\"/>\n"+
"<input type=\"hidden\" name=\"namenodeport\" value=\""+org.apache.manifoldcf.ui.util.Encoder.attributeEscape(nameNodePort)+"\"/>\n"+
"<input type=\"hidden\" name=\"user\" value=\""+org.apache.manifoldcf.ui.util.Encoder.attributeEscape(user)+"\"/>\n"
);
}
}
/** Process a configuration post.
* This method is called at the start of the connector's configuration page, whenever there is a possibility that form data for a connection has been
* posted. Its purpose is to gather form information and modify the configuration parameters accordingly.
* The name of the posted form is "editconnection".
*@param threadContext is the local thread context.
*@param variableContext is the set of variables available from the post, including binary file post information.
*@param parameters are the configuration parameters, as they currently exist, for this connection being configured.
*@return null if all is well, or a string error message if there is an error that should prevent saving of the connection (and cause a redirection to an error page).
*/
@Override
public String processConfigurationPost(IThreadContext threadContext, IPostParameters variableContext, ConfigParams parameters)
throws ManifoldCFException
{
String nameNodeProtocol = variableContext.getParameter("namenodeprotocol");
if (nameNodeProtocol != null) {
parameters.setParameter("namenodeprotocol", nameNodeProtocol);
}
String nameNodeHost = variableContext.getParameter("namenodehost");
if (nameNodeHost != null) {
parameters.setParameter("namenodehost", nameNodeHost);
}
String nameNodePort = variableContext.getParameter("namenodeport");
if (nameNodePort != null) {
parameters.setParameter("namenodeport", nameNodePort);
}
String user = variableContext.getParameter("user");
if (user != null) {
parameters.setParameter("user", user);
}
return null;
}
/** View configuration.
* This method is called in the body section of the connector's view configuration page. Its purpose is to present the connection information to the user.
* The coder can presume that the HTML that is output from this configuration will be within appropriate <html> and <body> tags.
*@param threadContext is the local thread context.
*@param out is the output to which any HTML should be sent.
*@param parameters are the configuration parameters, as they currently exist, for this connection being configured.
*/
@Override
public void viewConfiguration(IThreadContext threadContext, IHTTPOutput out, Locale locale, ConfigParams parameters)
throws ManifoldCFException, IOException
{
String nameNodeProtocol = parameters.getParameter("namenodeprotocol");
if (nameNodeProtocol == null)
nameNodeProtocol = "hdfs";
String nameNodeHost = parameters.getParameter("namenodehost");
String nameNodePort = parameters.getParameter("namenodeport");
String user = parameters.getParameter("user");
out.print(
"<table class=\"displaytable\">\n"+
" <tr>\n"+
" <td class=\"description\"><nobr>" + Messages.getBodyString(locale,"HDFSRepositoryConnector.NameNodeProtocol") + "</nobr></td>\n"+
" <td class=\"value\">\n"+org.apache.manifoldcf.ui.util.Encoder.attributeEscape(nameNodeProtocol)+"</td>\n"+
" </tr>\n"+
" <tr>\n"+
" <td class=\"description\"><nobr>" + Messages.getBodyString(locale,"HDFSRepositoryConnector.NameNodeHost") + "</nobr></td>\n"+
" <td class=\"value\">\n"+org.apache.manifoldcf.ui.util.Encoder.attributeEscape(nameNodeHost)+"</td>\n"+
" </tr>\n"+
" <tr>\n"+
" <td class=\"description\"><nobr>" + Messages.getBodyString(locale,"HDFSRepositoryConnector.NameNodePort") + "</nobr></td>\n"+
" <td class=\"value\">\n"+org.apache.manifoldcf.ui.util.Encoder.attributeEscape(nameNodePort)+"</td>\n"+
" </tr>\n"+
" <tr>\n"+
" <td class=\"description\"><nobr>" + Messages.getBodyString(locale,"HDFSRepositoryConnector.User") + "</nobr></td>\n"+
" <td class=\"value\">\n"+org.apache.manifoldcf.ui.util.Encoder.attributeEscape(user)+"</td>\n"+
" </tr>\n"+
"</table>\n"
);
}
/** Output the specification header section.
* This method is called in the head section of a job page which has selected a repository connection of the current type. Its purpose is to add the required tabs
* to the list, and to output any javascript methods that might be needed by the job editing HTML.
*@param out is the output to which any HTML should be sent.
*@param ds is the current document specification for this job.
*@param tabsArray is an array of tab names. Add to this array any tab names that are specific to the connector.
*/
@Override
public void outputSpecificationHeader(IHTTPOutput out, Locale locale, DocumentSpecification ds, List<String> tabsArray)
throws ManifoldCFException, IOException
{
tabsArray.add(Messages.getString(locale,"HDFSRepositoryConnector.Paths"));
out.print(
"<script type=\"text/javascript\">\n"+
"<!--\n"+
"function checkSpecification()\n"+
"{\n"+
" // Does nothing right now.\n"+
" return true;\n"+
"}\n"+
"\n"+
"function SpecOp(n, opValue, anchorvalue)\n"+
"{\n"+
" eval(\"editjob.\"+n+\".value = \\\"\"+opValue+\"\\\"\");\n"+
" postFormSetAnchor(anchorvalue);\n"+
"}\n"+
"//-->\n"+
"</script>\n"
);
}
/** Output the specification body section.
* This method is called in the body section of a job page which has selected a repository connection of the current type. Its purpose is to present the required form elements for editing.
* The coder can presume that the HTML that is output from this configuration will be within appropriate <html>, <body>, and <form> tags. The name of the
* form is "editjob".
*@param out is the output to which any HTML should be sent.
*@param ds is the current document specification for this job.
*@param tabName is the current tab name.
*/
@Override
public void outputSpecificationBody(IHTTPOutput out, Locale locale, DocumentSpecification ds, String tabName)
throws ManifoldCFException, IOException
{
int i;
int k;
// Paths tab
if (tabName.equals(Messages.getString(locale,"HDFSRepositoryConnector.Paths")))
{
out.print(
"<table class=\"displaytable\">\n"+
" <tr><td class=\"separator\" colspan=\"3\"><hr/></td></tr>\n"+
" <tr>\n"+
" <td class=\"description\"><nobr>" + Messages.getBodyString(locale,"HDFSRepositoryConnector.Paths2") + "</nobr></td>\n"+
" <td class=\"boxcell\">\n"+
" <table class=\"formtable\">\n"+
" <tr class=\"formheaderrow\">\n"+
" <td class=\"formcolumnheader\"></td>\n"+
" <td class=\"formcolumnheader\"><nobr>" + Messages.getBodyString(locale,"HDFSRepositoryConnector.RootPath") + "</nobr></td>\n"+
" <td class=\"formcolumnheader\"><nobr>" + Messages.getBodyString(locale,"HDFSRepositoryConnector.ConvertToURI") + "<br/>" + Messages.getBodyString(locale,"HDFSRepositoryConnector.ConvertToURIExample")+ "</nobr></td>\n"+
" <td class=\"formcolumnheader\"><nobr>" + Messages.getBodyString(locale,"HDFSRepositoryConnector.Rules") + "</nobr></td>\n"+
" </tr>\n"
);
i = 0;
k = 0;
while (i < ds.getChildCount())
{
SpecificationNode sn = ds.getChild(i++);
if (sn.getType().equals("startpoint"))
{
String pathDescription = "_"+Integer.toString(k);
String pathOpName = "specop"+pathDescription;
String path = sn.getAttributeValue("path");
String convertToURIString = sn.getAttributeValue("converttouri");
boolean convertToURI = false;
if (convertToURIString != null && convertToURIString.equals("true"))
convertToURI = true;
out.print(
" <tr class=\""+(((k % 2)==0)?"evenformrow":"oddformrow")+"\">\n"+
" <td class=\"formcolumncell\">\n"+
" <input type=\"hidden\" name=\""+pathOpName+"\" value=\"\"/>\n"+
" <input type=\"hidden\" name=\""+"specpath"+pathDescription+"\" value=\""+org.apache.manifoldcf.ui.util.Encoder.attributeEscape(sn.getAttributeValue("path"))+"\"/>\n"+
" <a name=\""+"path_"+Integer.toString(k)+"\">\n"+
" <input type=\"button\" value=\"" + Messages.getAttributeString(locale,"HDFSRepositoryConnector.Delete") + "\" onClick='Javascript:SpecOp(\""+pathOpName+"\",\"Delete\",\"path_"+Integer.toString(k)+"\")' alt=\""+Messages.getAttributeString(locale,"HDFSRepositoryConnector.DeletePath")+Integer.toString(k)+"\"/>\n"+
" </a>\n"+
" </td>\n"+
" <td class=\"formcolumncell\">\n"+
" <nobr>\n"+
" "+org.apache.manifoldcf.ui.util.Encoder.bodyEscape(path)+" \n"+
" </nobr>\n"+
" </td>\n"+
" <td class=\"formcolumncell\">\n"+
" <input type=\"hidden\" name=\"converttouri"+pathDescription+"\" value=\""+(convertToURI?"true":"false")+"\">\n"+
" <nobr>\n"+
" "+(convertToURI?Messages.getBodyString(locale,"HDFSRepositoryConnector.Yes"):Messages.getBodyString(locale,"HDFSRepositoryConnector.No"))+" \n"+
" </nobr>\n"+
" </td>\n"+
" <td class=\"boxcell\">\n"+
" <input type=\"hidden\" name=\""+"specchildcount"+pathDescription+"\" value=\""+Integer.toString(sn.getChildCount())+"\"/>\n"+
" <table class=\"formtable\">\n"+
" <tr class=\"formheaderrow\">\n"+
" <td class=\"formcolumnheader\"></td>\n"+
" <td class=\"formcolumnheader\"><nobr>" + Messages.getBodyString(locale,"HDFSRepositoryConnector.IncludeExclude") + "</nobr></td>\n"+
" <td class=\"formcolumnheader\"><nobr>" + Messages.getBodyString(locale,"HDFSRepositoryConnector.FileDirectory") + "</nobr></td>\n"+
" <td class=\"formcolumnheader\"><nobr>" + Messages.getBodyString(locale,"HDFSRepositoryConnector.Match") + "</nobr></td>\n"+
" </tr>\n"
);
int j = 0;
while (j < sn.getChildCount())
{
SpecificationNode excludeNode = sn.getChild(j);
String instanceDescription = "_"+Integer.toString(k)+"_"+Integer.toString(j);
String instanceOpName = "specop" + instanceDescription;
String nodeFlavor = excludeNode.getType();
String nodeType = excludeNode.getAttributeValue("type");
String nodeMatch = excludeNode.getAttributeValue("match");
out.print(
" <tr class=\"evenformrow\">\n"+
" <td class=\"formcolumncell\">\n"+
" <nobr>\n"+
" <input type=\"button\" value=\"" + Messages.getAttributeString(locale,"HDFSRepositoryConnector.InsertHere") + "\" onClick='Javascript:SpecOp(\"specop"+instanceDescription+"\",\"Insert Here\",\"match_"+Integer.toString(k)+"_"+Integer.toString(j+1)+"\")' alt=\""+Messages.getAttributeString(locale,"HDFSRepositoryConnector.InsertNewMatchForPath")+Integer.toString(k)+" before position #"+Integer.toString(j)+"\"/>\n"+
" </nobr>\n"+
" </td>\n"+
" <td class=\"formcolumncell\">\n"+
" <nobr>\n"+
" <select name=\""+"specflavor"+instanceDescription+"\">\n"+
" <option value=\"include\">" + Messages.getBodyString(locale,"HDFSRepositoryConnector.include") + "</option>\n"+
" <option value=\"exclude\">" + Messages.getBodyString(locale,"HDFSRepositoryConnector.exclude") + "</option>\n"+
" </select>\n"+
" </nobr>\n"+
" </td>\n"+
" <td class=\"formcolumncell\">\n"+
" <nobr>\n"+
" <select name=\""+"spectype"+instanceDescription+"\">\n"+
" <option value=\"file\">" + Messages.getBodyString(locale,"HDFSRepositoryConnector.File") + "</option>\n"+
" <option value=\"directory\">" + Messages.getBodyString(locale,"HDFSRepositoryConnector.Directory") + "</option>\n"+
" </select>\n"+
" </nobr>\n"+
" </td>\n"+
" <td class=\"formcolumncell\">\n"+
" <nobr>\n"+
" <input type=\"text\" size=\"10\" name=\""+"specmatch"+instanceDescription+"\" value=\"\"/>\n"+
" </nobr>\n"+
" </td>\n"+
" </tr>\n"+
" <tr class=\"oddformrow\">\n"+
" <td class=\"formcolumncell\">\n"+
" <nobr>\n"+
" <input type=\"hidden\" name=\""+"specop"+instanceDescription+"\" value=\"\"/>\n"+
" <input type=\"hidden\" name=\""+"specfl"+instanceDescription+"\" value=\""+nodeFlavor+"\"/>\n"+
" <input type=\"hidden\" name=\""+"specty"+instanceDescription+"\" value=\""+nodeType+"\"/>\n"+
" <input type=\"hidden\" name=\""+"specma"+instanceDescription+"\" value=\""+org.apache.manifoldcf.ui.util.Encoder.attributeEscape(nodeMatch)+"\"/>\n"+
" <a name=\""+"match_"+Integer.toString(k)+"_"+Integer.toString(j)+"\">\n"+
" <input type=\"button\" value=\"" + Messages.getAttributeString(locale,"HDFSRepositoryConnector.Delete") + "\" onClick='Javascript:SpecOp(\"specop"+instanceDescription+"\",\"Delete\",\"match_"+Integer.toString(k)+"_"+Integer.toString(j)+"\")' alt=\""+Messages.getAttributeString(locale,"HDFSRepositoryConnector.DeletePath")+Integer.toString(k)+", match spec #"+Integer.toString(j)+"\"/>\n"+
" </a>\n"+
" </nobr>\n"+
" </td>\n"+
" <td class=\"formcolumncell\">\n"+
" <nobr>\n"+
" "+nodeFlavor+"\n"+
" </nobr>\n"+
" </td>\n"+
" <td class=\"formcolumncell\">\n"+
" <nobr>\n"+
" "+nodeType+"\n"+
" </nobr>\n"+
" </td>\n"+
" <td class=\"formcolumncell\">\n"+
" <nobr>\n"+
" "+org.apache.manifoldcf.ui.util.Encoder.bodyEscape(nodeMatch)+"\n"+
" </nobr>\n"+
" </td>\n"+
" </tr>\n"
);
j++;
}
if (j == 0)
{
out.print(
" <tr class=\"formrow\"><td class=\"formcolumnmessage\" colspan=\"4\">" + Messages.getBodyString(locale,"HDFSRepositoryConnector.NoRulesDefined") + "</td></tr>\n"
);
}
out.print(
" <tr class=\"formrow\"><td class=\"lightseparator\" colspan=\"4\"><hr/></td></tr>\n"+
" <tr class=\"formrow\">\n"+
" <td class=\"formcolumncell\">\n"+
" <a name=\""+"match_"+Integer.toString(k)+"_"+Integer.toString(j)+"\">\n"+
" <input type=\"button\" value=\"" + Messages.getAttributeString(locale,"HDFSRepositoryConnector.Add") + "\" onClick='Javascript:SpecOp(\""+pathOpName+"\",\"Add\",\"match_"+Integer.toString(k)+"_"+Integer.toString(j+1)+"\")' alt=\""+Messages.getAttributeString(locale,"HDFSRepositoryConnector.AddNewMatchForPath")+Integer.toString(k)+"\"/>\n"+
" </a>\n"+
" </td>\n"+
" <td class=\"formcolumncell\">\n"+
" <nobr>\n"+
" <select name=\""+"specflavor"+pathDescription+"\">\n"+
" <option value=\"include\">" + Messages.getBodyString(locale,"HDFSRepositoryConnector.include") + "</option>\n"+
" <option value=\"exclude\">" + Messages.getBodyString(locale,"HDFSRepositoryConnector.exclude") + "</option>\n"+
" </select>\n"+
" </nobr>\n"+
" </td>\n"+
" <td class=\"formcolumncell\">\n"+
" <nobr>\n"+
" <select name=\""+"spectype"+pathDescription+"\">\n"+
" <option value=\"file\">" + Messages.getBodyString(locale,"HDFSRepositoryConnector.File") + "</option>\n"+
" <option value=\"directory\">" + Messages.getBodyString(locale,"HDFSRepositoryConnector.Directory") + "</option>\n"+
" </select>\n"+
" </nobr>\n"+
" </td>\n"+
" <td class=\"formcolumncell\">\n"+
" <nobr>\n"+
" <input type=\"text\" size=\"10\" name=\""+"specmatch"+pathDescription+"\" value=\"\"/>\n"+
" </nobr>\n"+
" </td>\n"+
" </tr>\n"+
" </table>\n"+
" </td>\n"+
" </tr>\n"
);
k++;
}
}
if (k == 0)
{
out.print(
" <tr class=\"formrow\"><td class=\"formcolumnmessage\" colspan=\"4\">" + Messages.getBodyString(locale,"HDFSRepositoryConnector.NoDocumentsSpecified") + "</td></tr>\n"
);
}
out.print(
" <tr class=\"formrow\"><td class=\"lightseparator\" colspan=\"4\"><hr/></td></tr>\n"+
" <tr class=\"formrow\">\n"+
" <td class=\"formcolumncell\">\n"+
" <nobr>\n"+
" <a name=\""+"path_"+Integer.toString(k)+"\">\n"+
" <input type=\"button\" value=\"" + Messages.getAttributeString(locale,"HDFSRepositoryConnector.Add") + "\" onClick='Javascript:SpecOp(\"specop\",\"Add\",\"path_"+Integer.toString(i+1)+"\")' alt=\"" + Messages.getAttributeString(locale,"HDFSRepositoryConnector.AddNewPath") + "\"/>\n"+
" <input type=\"hidden\" name=\"pathcount\" value=\""+Integer.toString(k)+"\"/>\n"+
" <input type=\"hidden\" name=\"specop\" value=\"\"/>\n"+
" </a>\n"+
" </nobr>\n"+
" </td>\n"+
" <td class=\"formcolumncell\">\n"+
" <nobr>\n"+
" <input type=\"text\" size=\"30\" name=\"specpath\" value=\"\"/>\n"+
" </nobr>\n"+
" </td>\n"+
" <td class=\"formcolumncell\">\n"+
" <nobr>\n"+
" <input name=\"converttouri\" type=\"checkbox\" value=\"true\"/>\n"+
" </nobr>\n"+
" </td>\n"+
" <td class=\"formcolumncell\">\n"+
" </td>\n"+
" </tr>\n"+
" </table>\n"+
" </td>\n"+
" </tr>\n"+
"</table>\n"
);
}
else
{
i = 0;
k = 0;
while (i < ds.getChildCount())
{
SpecificationNode sn = ds.getChild(i++);
if (sn.getType().equals("startpoint"))
{
String pathDescription = "_"+Integer.toString(k);
String path = sn.getAttributeValue("path");
String convertToURIString = sn.getAttributeValue("converttouri");
boolean convertToURI = false;
if (convertToURIString != null && convertToURIString.equals("true"))
convertToURI = true;
out.print(
"<input type=\"hidden\" name=\"specpath"+pathDescription+"\" value=\""+org.apache.manifoldcf.ui.util.Encoder.attributeEscape(path)+"\"/>\n"+
"<input type=\"hidden\" name=\"converttouri"+pathDescription+"\" value=\""+(convertToURI?"true":"false")+"\">\n"+
"<input type=\"hidden\" name=\"specchildcount"+pathDescription+"\" value=\""+Integer.toString(sn.getChildCount())+"\"/>\n"
);
int j = 0;
while (j < sn.getChildCount())
{
SpecificationNode excludeNode = sn.getChild(j);
String instanceDescription = "_"+Integer.toString(k)+"_"+Integer.toString(j);
String nodeFlavor = excludeNode.getType();
String nodeType = excludeNode.getAttributeValue("type");
String nodeMatch = excludeNode.getAttributeValue("match");
out.print(
"<input type=\"hidden\" name=\"specfl"+instanceDescription+"\" value=\""+nodeFlavor+"\"/>\n"+
"<input type=\"hidden\" name=\"specty"+instanceDescription+"\" value=\""+nodeType+"\"/>\n"+
"<input type=\"hidden\" name=\"specma"+instanceDescription+"\" value=\""+org.apache.manifoldcf.ui.util.Encoder.attributeEscape(nodeMatch)+"\"/>\n"
);
j++;
}
k++;
}
}
out.print(
"<input type=\"hidden\" name=\"pathcount\" value=\""+Integer.toString(k)+"\"/>\n"
);
}
}
/** Process a specification post.
* This method is called at the start of job's edit or view page, whenever there is a possibility that form data for a connection has been
* posted. Its purpose is to gather form information and modify the document specification accordingly.
* The name of the posted form is "editjob".
*@param variableContext contains the post data, including binary file-upload information.
*@param ds is the current document specification for this job.
*@return null if all is well, or a string error message if there is an error that should prevent saving of the job (and cause a redirection to an error page).
*/
@Override
public String processSpecificationPost(IPostParameters variableContext, Locale locale, DocumentSpecification ds)
throws ManifoldCFException
{
String x = variableContext.getParameter("pathcount");
if (x != null)
{
ds.clearChildren();
// Find out how many children were sent
int pathCount = Integer.parseInt(x);
// Gather up these
int i = 0;
int k = 0;
while (i < pathCount)
{
String pathDescription = "_"+Integer.toString(i);
String pathOpName = "specop"+pathDescription;
x = variableContext.getParameter(pathOpName);
if (x != null && x.equals("Delete"))
{
// Skip to the next
i++;
continue;
}
// Path inserts won't happen until the very end
String path = variableContext.getParameter("specpath"+pathDescription);
String convertToURI = variableContext.getParameter("converttouri"+pathDescription);
SpecificationNode node = new SpecificationNode("startpoint");
node.setAttribute("path",path);
if (convertToURI != null)
node.setAttribute("converttouri",convertToURI);
// Now, get the number of children
String y = variableContext.getParameter("specchildcount"+pathDescription);
int childCount = Integer.parseInt(y);
int j = 0;
int w = 0;
while (j < childCount)
{
String instanceDescription = "_"+Integer.toString(i)+"_"+Integer.toString(j);
// Look for an insert or a delete at this point
String instanceOp = "specop"+instanceDescription;
String z = variableContext.getParameter(instanceOp);
String flavor;
String type;
String match;
SpecificationNode sn;
if (z != null && z.equals("Delete"))
{
// Process the deletion as we gather
j++;
continue;
}
if (z != null && z.equals("Insert Here"))
{
// Process the insertion as we gather.
flavor = variableContext.getParameter("specflavor"+instanceDescription);
type = variableContext.getParameter("spectype"+instanceDescription);
match = variableContext.getParameter("specmatch"+instanceDescription);
sn = new SpecificationNode(flavor);
sn.setAttribute("type",type);
sn.setAttribute("match",match);
node.addChild(w++,sn);
}
flavor = variableContext.getParameter("specfl"+instanceDescription);
type = variableContext.getParameter("specty"+instanceDescription);
match = variableContext.getParameter("specma"+instanceDescription);
sn = new SpecificationNode(flavor);
sn.setAttribute("type",type);
sn.setAttribute("match",match);
node.addChild(w++,sn);
j++;
}
if (x != null && x.equals("Add"))
{
// Process adds to the end of the rules in-line
String match = variableContext.getParameter("specmatch"+pathDescription);
String type = variableContext.getParameter("spectype"+pathDescription);
String flavor = variableContext.getParameter("specflavor"+pathDescription);
SpecificationNode sn = new SpecificationNode(flavor);
sn.setAttribute("type",type);
sn.setAttribute("match",match);
node.addChild(w,sn);
}
ds.addChild(k++,node);
i++;
}
// See if there's a global add operation
String op = variableContext.getParameter("specop");
if (op != null && op.equals("Add"))
{
String path = variableContext.getParameter("specpath");
String convertToURI = variableContext.getParameter("converttouri");
SpecificationNode node = new SpecificationNode("startpoint");
node.setAttribute("path",path);
if (convertToURI != null)
node.setAttribute("converttouri",convertToURI);
// Now add in the defaults; these will be "include all directories" and "include all files".
SpecificationNode sn = new SpecificationNode("include");
sn.setAttribute("type","file");
sn.setAttribute("match","*");
node.addChild(node.getChildCount(),sn);
sn = new SpecificationNode("include");
sn.setAttribute("type","directory");
sn.setAttribute("match","*");
node.addChild(node.getChildCount(),sn);
ds.addChild(k,node);
}
}
/*
* "filepathtouri"
*/
String filepathtouri = variableContext.getParameter("filepathtouri");
if (filepathtouri != null) {
SpecificationNode sn;
int i = 0;
while (i < ds.getChildCount()) {
if (ds.getChild(i).getType().equals("filepathtouri")) {
ds.removeChild(i);
} else {
i++;
}
}
sn = new SpecificationNode("filepathtouri");
sn.setValue(filepathtouri);
ds.addChild(ds.getChildCount(),sn);
}
return null;
}
/** View specification.
* This method is called in the body section of a job's view page. Its purpose is to present the document specification information to the user.
* The coder can presume that the HTML that is output from this configuration will be within appropriate <html> and <body> tags.
*@param out is the output to which any HTML should be sent.
*@param ds is the current document specification for this job.
*/
@Override
public void viewSpecification(IHTTPOutput out, Locale locale, DocumentSpecification ds)
throws ManifoldCFException, IOException
{
out.print(
"<table class=\"displaytable\">\n"+
" <tr>\n"+
" <td class=\"description\">" + Messages.getAttributeString(locale,"HDFSRepositoryConnector.Paths2") + "</td>\n"+
" <td class=\"boxcell\">\n"+
" <table class=\"formtable\">\n"+
" <tr class=\"formheaderrow\">\n"+
" <td class=\"formcolumnheader\"><nobr>" + Messages.getBodyString(locale,"HDFSRepositoryConnector.RootPath") + "</nobr></td>\n"+
" <td class=\"formcolumnheader\"><nobr>" + Messages.getBodyString(locale,"HDFSRepositoryConnector.ConvertToURI") + "<br/>" + Messages.getBodyString(locale,"HDFSRepositoryConnector.ConvertToURIExample")+ "</nobr></td>\n"+
" <td class=\"formcolumnheader\"><nobr>" + Messages.getBodyString(locale,"HDFSRepositoryConnector.Rules") + "</nobr></td>\n"+
" </tr>\n"
);
int k = 0;
for (int i = 0; i < ds.getChildCount(); i++)
{
SpecificationNode sn = ds.getChild(i);
if (sn.getType().equals("startpoint"))
{
String path = sn.getAttributeValue("path");
String convertToURIString = sn.getAttributeValue("converttouri");
boolean convertToURI = false;
if (convertToURIString != null && convertToURIString.equals("true"))
convertToURI = true;
out.print(
" <tr class=\""+(((k % 2)==0)?"evenformrow":"oddformrow")+"\">\n"+
" <td class=\"formcolumncell\">\n"+
" <nobr>\n"+
" "+org.apache.manifoldcf.ui.util.Encoder.bodyEscape(path)+" \n"+
" </nobr>\n"+
" </td>\n"+
" <td class=\"formcolumncell\">\n"+
" <nobr>\n"+
" "+(convertToURI?Messages.getBodyString(locale,"HDFSRepositoryConnector.Yes"):Messages.getBodyString(locale,"HDFSRepositoryConnector.No"))+" \n"+
" </nobr>\n"+
" </td>\n"+
" <td class=\"boxcell\">\n"+
" <table class=\"formtable\">\n"+
" <tr class=\"formheaderrow\">\n"+
" <td class=\"formcolumnheader\"><nobr>" + Messages.getBodyString(locale,"HDFSRepositoryConnector.IncludeExclude") + "</nobr></td>\n"+
" <td class=\"formcolumnheader\"><nobr>" + Messages.getBodyString(locale,"HDFSRepositoryConnector.FileDirectory") + "</nobr></td>\n"+
" <td class=\"formcolumnheader\"><nobr>" + Messages.getBodyString(locale,"HDFSRepositoryConnector.Match") + "</nobr></td>\n"+
" </tr>\n"
);
int l = 0;
for (int j = 0; j < sn.getChildCount(); j++)
{
SpecificationNode excludeNode = sn.getChild(j);
String nodeFlavor = excludeNode.getType();
String nodeType = excludeNode.getAttributeValue("type");
String nodeMatch = excludeNode.getAttributeValue("match");
out.print(
" <tr class=\""+(((l % 2)==0)?"evenformrow":"oddformrow")+"\">\n"+
" <td class=\"formcolumncell\">\n"+
" <nobr>\n"+
" "+nodeFlavor+"\n"+
" </nobr>\n"+
" </td>\n"+
" <td class=\"formcolumncell\">\n"+
" <nobr>\n"+
" "+nodeType+"\n"+
" </nobr>\n"+
" </td>\n"+
" <td class=\"formcolumncell\">\n"+
" <nobr>\n"+
" "+org.apache.manifoldcf.ui.util.Encoder.bodyEscape(nodeMatch)+"\n"+
" </nobr>\n"+
" </td>\n"+
" </tr>\n"
);
l++;
}
if (l == 0)
{
out.print(
" <tr><td class=\"formcolumnmessage\" colspan=\"3\">" + Messages.getBodyString(locale,"HDFSRepositoryConnector.NoRulesDefined") + "</td></tr>\n"
);
}
out.print(
" </table>\n"+
" </td>\n"
);
out.print(
" </tr>\n"
);
k++;
}
}
if (k == 0)
{
out.print(
" <tr><td class=\"formcolumnmessage\" colspan=\"3\">" + Messages.getBodyString(locale,"HDFSRepositoryConnector.NoDocumentsSpecified") + "</td></tr>\n"
);
}
out.print(
" </table>\n"+
" </td>\n"+
" </tr>\n"
);
out.print(
"</table>\n"
);
}
// Protected static methods
/** Convert a path to an HDFS wget URI. The URI is the URI that will be the unique key from
* the search index, and will be presented to the user as part of the search results.
*@param filePath is the document filePath.
*@param repositoryPath is the document repositoryPath.
*@return the document uri.
*/
protected static String convertToWGETURI(String path)
throws ManifoldCFException
{
//
// Note well: This MUST be a legal URI!!!
try
{
StringBuffer sb = new StringBuffer();
String[] tmp = path.split("/", 3);
String scheme = "";
String host = "";
String other = "";
if (tmp.length >= 1)
scheme = tmp[0];
else
scheme = "hdfs";
if (tmp.length >= 2)
host = tmp[1];
else
host = "localhost:9000";
if (tmp.length >= 3)
other = "/" + tmp[2];
else
other = "/";
return new URI(scheme + "://" + host + other).toURL().toString();
}
catch (java.net.MalformedURLException e)
{
throw new ManifoldCFException("Bad url: "+e.getMessage(),e);
}
catch (URISyntaxException e)
{
throw new ManifoldCFException("Bad url: "+e.getMessage(),e);
}
}
/** This method finds the part of the path that should be converted to a URI.
* Returns null if the path should not be converted.
*@param spec is the document specification.
*@param documentIdentifier is the document identifier.
*@return the part of the path to be converted, or null.
*/
protected static String findConvertPath(String nameNode, DocumentSpecification spec, Path theFile)
{
String fullpath = theFile.toString();
for (int j = 0; j < spec.getChildCount(); j++)
{
SpecificationNode sn = spec.getChild(j);
if (sn.getType().equals("startpoint"))
{
String path = sn.getAttributeValue("path");
String convertToURI = sn.getAttributeValue("converttouri");
if (path.length() > 0 && convertToURI != null && convertToURI.equals("true"))
{
path = nameNode + path;
if (!path.endsWith("/"))
path += "/";
if (fullpath.startsWith(path))
return fullpath.substring(path.length());
}
}
}
return null;
}
/** Map an extension to a mime type */
protected static String mapExtensionToMimeType(String fileName)
{
int slashIndex = fileName.lastIndexOf("/");
if (slashIndex != -1)
fileName = fileName.substring(slashIndex+1);
int dotIndex = fileName.lastIndexOf(".");
if (dotIndex == -1)
return null;
return ExtensionMimeMap.mapToMimeType(fileName.substring(dotIndex+1).toLowerCase(java.util.Locale.ROOT));
}
/** Check if a file or directory should be included, given a document specification.
*@param fileName is the canonical file name.
*@param documentSpecification is the specification.
*@return true if it should be included.
*/
protected static boolean checkInclude(String nameNode, FileStatus fileStatus, String fileName, DocumentSpecification documentSpecification)
throws ManifoldCFException
{
if (Logging.connectors.isDebugEnabled())
{
Logging.connectors.debug("Checking whether to include file '"+fileName+"'");
}
String pathPart;
String filePart;
if (fileStatus.isDirectory())
{
pathPart = fileName;
filePart = null;
}
else
{
pathPart = fileStatus.getPath().getParent().toString();
filePart = fileStatus.getPath().getName();
}
// Scan until we match a startpoint
int i = 0;
while (i < documentSpecification.getChildCount())
{
SpecificationNode sn = documentSpecification.getChild(i++);
if (sn.getType().equals("startpoint"))
{
String path = null;
try {
path = new URI(nameNode).resolve(sn.getAttributeValue("path")).toString();
} catch (URISyntaxException e) {
e.printStackTrace();
}
if (Logging.connectors.isDebugEnabled())
{
Logging.connectors.debug("Checking path '"+path+"' against canonical '"+pathPart+"'");
}
// Compare with filename
int matchEnd = matchSubPath(path,pathPart);
if (matchEnd == -1)
{
if (Logging.connectors.isDebugEnabled())
{
Logging.connectors.debug("Match check '"+path+"' against canonical '"+pathPart+"' failed");
}
continue;
}
// matchEnd is the start of the rest of the path (after the match) in fileName.
// We need to walk through the rules and see whether it's in or out.
int j = 0;
while (j < sn.getChildCount())
{
SpecificationNode node = sn.getChild(j++);
String flavor = node.getType();
String match = node.getAttributeValue("match");
String type = node.getAttributeValue("type");
// If type is "file", then our match string is against the filePart.
// If filePart is null, then this rule is simply skipped.
String sourceMatch;
int sourceIndex;
if (type.equals("file"))
{
if (filePart == null)
{
continue;
}
sourceMatch = filePart;
sourceIndex = 0;
}
else
{
if (filePart != null)
{
continue;
}
sourceMatch = pathPart;
sourceIndex = matchEnd;
}
if (flavor.equals("include"))
{
if (checkMatch(sourceMatch,sourceIndex,match))
{
return true;
}
}
else if (flavor.equals("exclude"))
{
if (checkMatch(sourceMatch,sourceIndex,match))
{
return false;
}
}
}
}
}
if (Logging.connectors.isDebugEnabled())
{
Logging.connectors.debug("Not including '"+fileName+"' because no matching rules");
}
return false;
}
/** Check if a file should be ingested, given a document specification. It is presumed that
* documents that do not pass checkInclude() will be checked with this method.
*@param file is the file.
*@param documentSpecification is the specification.
*/
protected static boolean checkIngest(String nameNode, FileStatus fileStatus, DocumentSpecification documentSpecification)
throws ManifoldCFException
{
// Since the only exclusions at this point are not based on file contents, this is a no-op.
// MHL
return true;
}
/** Match a sub-path. The sub-path must match the complete starting part of the full path, in a path
* sense. The returned value should point into the file name beyond the end of the matched path, or
* be -1 if there is no match.
*@param subPath is the sub path.
*@param fullPath is the full path.
*@return the index of the start of the remaining part of the full path, or -1.
*/
protected static int matchSubPath(String subPath, String fullPath)
{
if (subPath.length() > fullPath.length())
return -1;
if (fullPath.startsWith(subPath) == false)
return -1;
int rval = subPath.length();
if (fullPath.length() == rval)
return rval;
char x = fullPath.charAt(rval);
if (x == Path.SEPARATOR_CHAR)
rval++;
return rval;
}
/** Check a match between two strings with wildcards.
*@param sourceMatch is the expanded string (no wildcards)
*@param sourceIndex is the starting point in the expanded string.
*@param match is the wildcard-based string.
*@return true if there is a match.
*/
protected static boolean checkMatch(String sourceMatch, int sourceIndex, String match)
{
// Note: The java regex stuff looks pretty heavyweight for this purpose.
// I've opted to try and do a simple recursive version myself, which is not compiled.
// Basically, the match proceeds by recursive descent through the string, so that all *'s cause
// recursion.
boolean caseSensitive = true;
return processCheck(caseSensitive, sourceMatch, sourceIndex, match, 0);
}
/** Recursive worker method for checkMatch. Returns 'true' if there is a path that consumes both
* strings in their entirety in a matched way.
*@param caseSensitive is true if file names are case sensitive.
*@param sourceMatch is the source string (w/o wildcards)
*@param sourceIndex is the current point in the source string.
*@param match is the match string (w/wildcards)
*@param matchIndex is the current point in the match string.
*@return true if there is a match.
*/
protected static boolean processCheck(boolean caseSensitive, String sourceMatch, int sourceIndex,
String match, int matchIndex)
{
// Logging.connectors.debug("Matching '"+sourceMatch+"' position "+Integer.toString(sourceIndex)+
// " against '"+match+"' position "+Integer.toString(matchIndex));
// Match up through the next * we encounter
while (true)
{
// If we've reached the end, it's a match.
if (sourceMatch.length() == sourceIndex && match.length() == matchIndex)
return true;
// If one has reached the end but the other hasn't, no match
if (match.length() == matchIndex)
return false;
if (sourceMatch.length() == sourceIndex)
{
if (match.charAt(matchIndex) != '*')
return false;
matchIndex++;
continue;
}
char x = sourceMatch.charAt(sourceIndex);
char y = match.charAt(matchIndex);
if (!caseSensitive)
{
if (x >= 'A' && x <= 'Z')
x -= 'A'-'a';
if (y >= 'A' && y <= 'Z')
y -= 'A'-'a';
}
if (y == '*')
{
// Wildcard!
// We will recurse at this point.
// Basically, we want to combine the results for leaving the "*" in the match string
// at this point and advancing the source index, with skipping the "*" and leaving the source
// string alone.
return processCheck(caseSensitive,sourceMatch,sourceIndex+1,match,matchIndex) ||
processCheck(caseSensitive,sourceMatch,sourceIndex,match,matchIndex+1);
}
if (y == '?' || x == y)
{
sourceIndex++;
matchIndex++;
}
else
return false;
}
}
/**
* @param e
* @throws ManifoldCFException
* @throws ServiceInterruption
*/
private static void handleIOException(IOException e) throws ManifoldCFException, ServiceInterruption {
if (!(e instanceof java.net.SocketTimeoutException) && (e instanceof InterruptedIOException)) {
throw new ManifoldCFException("Interrupted: " + e.getMessage(), e, ManifoldCFException.INTERRUPTED);
}
Logging.connectors.warn("HDFS: IO exception: "+e.getMessage(),e);
long currentTime = System.currentTimeMillis();
throw new ServiceInterruption("IO exception: "+e.getMessage(), e, currentTime + 300000L, currentTime + 3 * 60 * 60000L,-1,false);
}
/**
* @param e
* @throws ManifoldCFException
* @throws ServiceInterruption
*/
private static void handleURISyntaxException(URISyntaxException e) throws ManifoldCFException, ServiceInterruption {
// Permanent problem
Logging.connectors.error("HDFS: Bad namenode specification: "+e.getMessage(), e);
throw new ManifoldCFException("Bad namenode specification: "+e.getMessage(), e);
}
protected static class CheckConnectionThread extends Thread {
protected final HDFSSession session;
protected Throwable exception = null;
public CheckConnectionThread(HDFSSession session) {
super();
this.session = session;
setDaemon(true);
}
public void run() {
try {
session.getRepositoryInfo();
} catch (Throwable e) {
this.exception = e;
}
}
public void finishUp() throws InterruptedException, IOException {
join();
Throwable thr = exception;
if (thr != null) {
if (thr instanceof IOException) {
throw (IOException) thr;
} else if (thr instanceof RuntimeException) {
throw (RuntimeException) thr;
} else {
throw (Error) thr;
}
}
}
}
/**
* @throws ManifoldCFException
* @throws ServiceInterruption
*/
protected void checkConnection() throws ManifoldCFException, ServiceInterruption {
CheckConnectionThread t = new CheckConnectionThread(getSession());
try {
t.start();
t.finishUp();
return;
} catch (InterruptedException e) {
t.interrupt();
throw new ManifoldCFException("Interrupted: " + e.getMessage(), e, ManifoldCFException.INTERRUPTED);
} catch (java.net.SocketTimeoutException e) {
handleIOException(e);
} catch (InterruptedIOException e) {
t.interrupt();
throw new ManifoldCFException("Interrupted: " + e.getMessage(), e, ManifoldCFException.INTERRUPTED);
} catch (IOException e) {
handleIOException(e);
}
}
protected static class GetSessionThread extends Thread {
protected final String nameNode;
protected final String user;
protected Throwable exception = null;
protected HDFSSession session;
public GetSessionThread(String nameNode, String user) {
super();
this.nameNode = nameNode;
this.user = user;
setDaemon(true);
}
public void run() {
try {
// Create a session
session = new HDFSSession(nameNode, user);
} catch (Throwable e) {
this.exception = e;
}
}
public void finishUp()
throws InterruptedException, IOException, URISyntaxException {
join();
Throwable thr = exception;
if (thr != null) {
if (thr instanceof IOException) {
throw (IOException) thr;
} else if (thr instanceof URISyntaxException) {
throw (URISyntaxException) thr;
} else if (thr instanceof RuntimeException) {
throw (RuntimeException) thr;
} else {
throw (Error) thr;
}
}
}
public HDFSSession getResult() {
return session;
}
}
protected FileStatus[] getChildren(Path path)
throws ManifoldCFException, ServiceInterruption {
GetChildrenThread t = new GetChildrenThread(getSession(), path);
try {
t.start();
t.finishUp();
return t.getResult();
} catch (InterruptedException e) {
t.interrupt();
throw new ManifoldCFException("Interrupted: " + e.getMessage(), e, ManifoldCFException.INTERRUPTED);
} catch (java.net.SocketTimeoutException e) {
handleIOException(e);
} catch (InterruptedIOException e) {
t.interrupt();
handleIOException(e);
} catch (IOException e) {
handleIOException(e);
}
return null;
}
protected class GetChildrenThread extends Thread {
protected Throwable exception = null;
protected FileStatus[] result = null;
protected final HDFSSession session;
protected final Path path;
public GetChildrenThread(HDFSSession session, Path path) {
super();
this.session = session;
this.path = path;
setDaemon(true);
}
@Override
public void run() {
try {
result = session.listStatus(path);
} catch (Throwable e) {
this.exception = e;
}
}
public void finishUp() throws InterruptedException, IOException {
join();
Throwable thr = exception;
if (thr != null) {
if (thr instanceof RuntimeException) {
throw (RuntimeException) thr;
} else if (thr instanceof Error) {
throw (Error) thr;
} else if (thr instanceof IOException) {
throw (IOException) thr;
} else {
throw new RuntimeException("Unhandled exception of type: "+thr.getClass().getName(),thr);
}
}
}
public FileStatus[] getResult() {
return result;
}
}
protected FileStatus getObject(Path path)
throws ManifoldCFException, ServiceInterruption {
GetObjectThread objt = new GetObjectThread(getSession(),path);
try {
objt.start();
objt.finishUp();
return objt.getResponse();
} catch (InterruptedException e) {
objt.interrupt();
throw new ManifoldCFException("Interrupted: " + e.getMessage(), e, ManifoldCFException.INTERRUPTED);
} catch (java.net.SocketTimeoutException e) {
handleIOException(e);
} catch (InterruptedIOException e) {
objt.interrupt();
handleIOException(e);
} catch (IOException e) {
handleIOException(e);
}
return null;
}
protected static class GetObjectThread extends Thread {
protected final HDFSSession session;
protected final Path nodeId;
protected Throwable exception = null;
protected FileStatus response = null;
public GetObjectThread(HDFSSession session, Path nodeId) {
super();
setDaemon(true);
this.session = session;
this.nodeId = nodeId;
}
public void run() {
try {
response = session.getObject(nodeId);
} catch (Throwable e) {
this.exception = e;
}
}
public void finishUp() throws InterruptedException, IOException {
join();
Throwable thr = exception;
if (thr != null) {
if (thr instanceof RuntimeException) {
throw (RuntimeException) thr;
} else if (thr instanceof Error) {
throw (Error) thr;
} else if (thr instanceof IOException) {
throw (IOException) thr;
} else {
throw new RuntimeException("Unhandled exception of type: "+thr.getClass().getName(),thr);
}
}
}
public FileStatus getResponse() {
return response;
}
}
protected static class BackgroundStreamThread extends Thread
{
protected final HDFSSession session;
protected final Path nodeId;
protected boolean abortThread = false;
protected Throwable responseException = null;
protected InputStream sourceStream = null;
protected XThreadInputStream threadStream = null;
public BackgroundStreamThread(HDFSSession session, Path nodeId)
{
super();
setDaemon(true);
this.session = session;
this.nodeId = nodeId;
}
public void run()
{
try {
try {
synchronized (this) {
if (!abortThread) {
sourceStream = session.getFSDataInputStream(nodeId);
threadStream = new XThreadInputStream(sourceStream);
this.notifyAll();
}
}
if (threadStream != null)
{
// Stuff the content until we are done
threadStream.stuffQueue();
}
} finally {
if (sourceStream != null) {
sourceStream.close();
}
}
} catch (Throwable e) {
responseException = e;
}
}
public InputStream getSafeInputStream() throws InterruptedException, IOException
{
// Must wait until stream is created, or until we note an exception was thrown.
while (true)
{
synchronized (this)
{
if (responseException != null) {
throw new IllegalStateException("Check for response before getting stream");
}
checkException(responseException);
if (threadStream != null) {
return threadStream;
}
wait();
}
}
}
public void finishUp() throws InterruptedException, IOException
{
// This will be called during the finally
// block in the case where all is well (and
// the stream completed) and in the case where
// there were exceptions.
synchronized (this) {
if (threadStream != null) {
threadStream.abort();
}
abortThread = true;
}
join();
checkException(responseException);
}
protected synchronized void checkException(Throwable exception) throws IOException
{
if (exception != null)
{
Throwable e = exception;
if (e instanceof IOException) {
throw (IOException)e;
} else if (e instanceof RuntimeException) {
throw (RuntimeException)e;
} else if (e instanceof Error) {
throw (Error)e;
} else {
throw new RuntimeException("Unhandled exception of type: "+e.getClass().getName(),e);
}
}
}
}
}