| /* $Id: ElasticSearchConnector.java 1299512 2012-03-12 00:58:38Z piergiorgio $ */ |
| |
| /** |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.manifoldcf.agents.output.elasticsearch; |
| |
| //import java.io.File; |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.util.List; |
| import java.util.Locale; |
| import java.util.Map; |
| import java.util.Iterator; |
| import java.util.HashMap; |
| //import java.util.concurrent.TimeUnit; |
| |
| import org.apache.http.conn.HttpClientConnectionManager; |
| import org.apache.http.client.HttpClient; |
| import org.apache.http.impl.conn.PoolingHttpClientConnectionManager; |
| //import org.apache.http.impl.client.HttpClientBuilder; |
| import org.apache.http.protocol.HttpRequestExecutor; |
| import org.apache.http.impl.client.HttpClients; |
| import org.apache.http.client.config.RequestConfig; |
| import org.apache.http.client.CredentialsProvider; |
| import org.apache.http.impl.client.BasicCredentialsProvider; |
| import org.apache.http.config.SocketConfig; |
| import org.apache.http.config.RegistryBuilder; |
| import org.apache.http.conn.socket.ConnectionSocketFactory; |
| import org.apache.http.conn.ssl.SSLConnectionSocketFactory; |
| import org.apache.http.conn.socket.PlainConnectionSocketFactory; |
| import org.apache.http.conn.ssl.NoopHostnameVerifier; |
| import org.apache.http.auth.Credentials; |
| import org.apache.http.auth.UsernamePasswordCredentials; |
| import org.apache.http.auth.AuthScope; |
| //import org.apache.http.client.HttpRequestRetryHandler; |
| //import org.apache.http.protocol.HttpContext; |
| |
| //import org.apache.commons.io.FilenameUtils; |
| import org.apache.manifoldcf.agents.interfaces.IOutputAddActivity; |
| //import org.apache.manifoldcf.agents.interfaces.IOutputNotifyActivity; |
| import org.apache.manifoldcf.agents.interfaces.IOutputRemoveActivity; |
| //import org.apache.manifoldcf.agents.interfaces.IOutputCheckActivity; |
| import org.apache.manifoldcf.agents.interfaces.RepositoryDocument; |
| import org.apache.manifoldcf.agents.interfaces.ServiceInterruption; |
| import org.apache.manifoldcf.agents.output.BaseOutputConnector; |
| import org.apache.manifoldcf.agents.output.elasticsearch.ElasticSearchAction.CommandEnum; |
| import org.apache.manifoldcf.agents.output.elasticsearch.ElasticSearchConnection.Result; |
| import org.apache.manifoldcf.core.interfaces.Specification; |
| import org.apache.manifoldcf.core.interfaces.ConfigParams; |
| import org.apache.manifoldcf.core.interfaces.IHTTPOutput; |
| import org.apache.manifoldcf.core.interfaces.IPostParameters; |
| import org.apache.manifoldcf.core.interfaces.IThreadContext; |
| import org.apache.manifoldcf.core.interfaces.ManifoldCFException; |
| import org.apache.manifoldcf.core.system.ManifoldCF; |
| import org.apache.manifoldcf.core.interfaces.VersionContext; |
| import org.apache.manifoldcf.connectorcommon.interfaces.IKeystoreManager; |
| import org.apache.manifoldcf.connectorcommon.common.InterruptibleSocketFactory; |
| |
| |
| /** |
| * This is the "output connector" for elasticsearch. |
| * |
| * @author Luca Stancapiano |
| */ |
| public class ElasticSearchConnector extends BaseOutputConnector |
| { |
| |
| private final static String ELASTICSEARCH_INDEXATION_ACTIVITY = "Indexation"; |
| private final static String ELASTICSEARCH_DELETION_ACTIVITY = "Deletion"; |
| |
| private final static String[] ELASTICSEARCH_ACTIVITIES = |
| { ELASTICSEARCH_INDEXATION_ACTIVITY, ELASTICSEARCH_DELETION_ACTIVITY }; |
| |
| private final static String ELASTICSEARCH_TAB_SERVER = "ElasticSearchConnector.Server"; |
| private final static String ELASTICSEARCH_TAB_PARAMETERS = "ElasticSearchConnector.Parameters"; |
| |
| /** Forward to the javascript to check the configuration parameters */ |
| private static final String EDIT_CONFIG_HEADER_FORWARD = "editConfiguration.js"; |
| |
| /** Forward to the HTML template to edit the configuration parameters */ |
| private static final String EDIT_CONFIG_FORWARD_SERVER = "editConfiguration_Server.html"; |
| private static final String EDIT_CONFIG_FORWARD_PARAMETERS = "editConfiguration_Parameters.html"; |
| |
| /** Forward to the HTML template to view the configuration parameters */ |
| private static final String VIEW_CONFIG_FORWARD = "viewConfiguration.html"; |
| |
| /** Connection expiration interval */ |
| private static final long EXPIRATION_INTERVAL = 60000L; |
| |
| /** ID length is limited, or you get the HTTP 400 error */ |
| private static final int maxIdLength = 512; |
| |
| private HttpClientConnectionManager connectionManager = null; |
| private HttpClient client = null; |
| private long expirationTime = -1L; |
| |
| public ElasticSearchConnector() |
| { |
| } |
| |
| @Override |
| public void connect(ConfigParams configParams) |
| { |
| super.connect(configParams); |
| } |
| |
| protected HttpClient getSession() |
| throws ManifoldCFException |
| { |
| if (client == null) |
| { |
| int socketTimeout = 900000; |
| int connectionTimeout = 60000; |
| |
| // Load configuration from parameters |
| final ElasticSearchConfig config = new ElasticSearchConfig(params); |
| final IKeystoreManager keystoreManager = config.getSSLKeystore(); |
| final String userName = config.getUserName(); |
| final String password = config.getPassword(); |
| |
| final Credentials credentials; |
| if (userName != null && userName.length() > 0) |
| credentials = new UsernamePasswordCredentials(userName, password); |
| else |
| credentials = null; |
| |
| // Set up ingest ssl if indicated |
| SSLConnectionSocketFactory myFactory = null; |
| if (keystoreManager != null) |
| { |
| myFactory = new SSLConnectionSocketFactory(new InterruptibleSocketFactory(keystoreManager.getSecureSocketFactory(), connectionTimeout), |
| NoopHostnameVerifier.INSTANCE); |
| } |
| else |
| { |
| myFactory = SSLConnectionSocketFactory.getSocketFactory(); |
| } |
| |
| // Set up connection manager |
| PoolingHttpClientConnectionManager poolingConnectionManager = new PoolingHttpClientConnectionManager(RegistryBuilder.<ConnectionSocketFactory>create() |
| .register("http", PlainConnectionSocketFactory.getSocketFactory()) |
| .register("https", myFactory) |
| .build()); |
| poolingConnectionManager.setDefaultMaxPerRoute(1); |
| poolingConnectionManager.setValidateAfterInactivity(2000); |
| poolingConnectionManager.setDefaultSocketConfig(SocketConfig.custom() |
| .setTcpNoDelay(true) |
| .setSoTimeout(socketTimeout) |
| .build()); |
| connectionManager = poolingConnectionManager; |
| |
| CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); |
| if (credentials != null) |
| { |
| credentialsProvider.setCredentials(AuthScope.ANY, credentials); |
| } |
| |
| RequestConfig.Builder requestBuilder = RequestConfig.custom() |
| .setCircularRedirectsAllowed(true) |
| .setSocketTimeout(socketTimeout) |
| .setExpectContinueEnabled(true) |
| .setConnectTimeout(connectionTimeout) |
| .setConnectionRequestTimeout(socketTimeout); |
| |
| client = HttpClients.custom() |
| .setConnectionManager(connectionManager) |
| .setMaxConnTotal(1) |
| .disableAutomaticRetries() |
| .setDefaultRequestConfig(requestBuilder.build()) |
| .setDefaultCredentialsProvider(credentialsProvider) |
| .setRequestExecutor(new HttpRequestExecutor(socketTimeout)) |
| .build(); |
| |
| |
| } |
| expirationTime = System.currentTimeMillis() + EXPIRATION_INTERVAL; |
| return client; |
| } |
| |
| protected void closeSession() |
| { |
| if (connectionManager != null) |
| { |
| connectionManager.shutdown(); |
| connectionManager = null; |
| } |
| client = null; |
| expirationTime = -1L; |
| } |
| |
| /** Create a hashed URI string for ID according to ElasticSearch parameters. |
| * @param domumentURI is the URI of the document. |
| * @return hashed URI. |
| */ |
| protected static String compressDocumentURI(String documentURI) |
| throws ManifoldCFException |
| { |
| // If the ID is too long, we must do things to reduce its size. This involves hashing, but |
| // for backwards compatibility we only do it if it is too long. |
| if (documentURI.length() <= maxIdLength) |
| return documentURI; |
| return ManifoldCF.hash(documentURI); |
| } |
| |
| /** This method is called to assess whether to count this connector instance should |
| * actually be counted as being connected. |
| *@return true if the connector instance is actually connected. |
| */ |
| @Override |
| public boolean isConnected() |
| { |
| return connectionManager != null; |
| } |
| |
| @Override |
| public void disconnect() |
| throws ManifoldCFException |
| { |
| super.disconnect(); |
| closeSession(); |
| } |
| |
| |
| @Override |
| public void poll() |
| throws ManifoldCFException |
| { |
| super.poll(); |
| if (connectionManager != null) |
| { |
| if (System.currentTimeMillis() > expirationTime) |
| { |
| closeSession(); |
| } |
| } |
| } |
| |
| @Override |
| public String[] getActivitiesList() |
| { |
| return ELASTICSEARCH_ACTIVITIES; |
| } |
| |
| /** Read the content of a resource, replace the variable ${PARAMNAME} with the |
| * value and copy it to the out. |
| * |
| * @param resName |
| * @param out |
| * @throws ManifoldCFException */ |
| private static void outputResource(String resName, IHTTPOutput out, |
| Locale locale, ElasticSearchParam params, |
| String tabName, Integer sequenceNumber, Integer currentSequenceNumber) throws ManifoldCFException |
| { |
| Map<String,Object> paramMap = null; |
| if (params != null) { |
| paramMap = params.buildMap(out); |
| if (tabName != null) { |
| paramMap.put("TabName", tabName); |
| } |
| if (currentSequenceNumber != null) |
| paramMap.put("SelectedNum",currentSequenceNumber.toString()); |
| } |
| else |
| { |
| paramMap = new HashMap<String,Object>(); |
| } |
| if (sequenceNumber != null) |
| paramMap.put("SeqNum",sequenceNumber.toString()); |
| |
| Messages.outputResourceWithVelocity(out, locale, resName, paramMap); |
| } |
| |
| @Override |
| public void outputConfigurationHeader(IThreadContext threadContext, |
| IHTTPOutput out, Locale locale, ConfigParams parameters, |
| List<String> tabsArray) throws ManifoldCFException, IOException |
| { |
| super.outputConfigurationHeader(threadContext, out, locale, parameters, |
| tabsArray); |
| tabsArray.add(Messages.getString(locale, ELASTICSEARCH_TAB_SERVER)); |
| tabsArray.add(Messages.getString(locale, ELASTICSEARCH_TAB_PARAMETERS)); |
| outputResource(EDIT_CONFIG_HEADER_FORWARD, out, locale, null, null, null, null); |
| } |
| |
| @Override |
| public void outputConfigurationBody(IThreadContext threadContext, |
| IHTTPOutput out, Locale locale, ConfigParams parameters, String tabName) |
| throws ManifoldCFException, IOException |
| { |
| try { |
| super.outputConfigurationBody(threadContext, out, locale, parameters, |
| tabName); |
| ElasticSearchConfig config = this.getConfigParameters(parameters); |
| outputResource(EDIT_CONFIG_FORWARD_SERVER, out, locale, config, tabName, null, null); |
| outputResource(EDIT_CONFIG_FORWARD_PARAMETERS, out, locale, config, tabName, null, null); |
| } catch (Exception e) { |
| e.printStackTrace(); |
| } |
| } |
| |
| /** Build a Set of ElasticSearch parameters. If configParams is null, |
| * getConfiguration() is used. |
| * |
| * @param configParams */ |
| final private ElasticSearchConfig getConfigParameters( |
| ConfigParams configParams) |
| { |
| if (configParams == null) |
| configParams = getConfiguration(); |
| return new ElasticSearchConfig(configParams); |
| } |
| |
| @Override |
| public VersionContext getPipelineDescription(Specification os) |
| throws ManifoldCFException |
| { |
| return new VersionContext("",params,os); |
| } |
| |
| @Override |
| public void viewConfiguration(IThreadContext threadContext, IHTTPOutput out, |
| Locale locale, ConfigParams parameters) throws ManifoldCFException, |
| IOException |
| { |
| outputResource(VIEW_CONFIG_FORWARD, out, locale, |
| getConfigParameters(parameters), null, null, null); |
| } |
| |
| @Override |
| public String processConfigurationPost(IThreadContext threadContext, |
| IPostParameters variableContext, ConfigParams parameters) |
| throws ManifoldCFException |
| { |
| return ElasticSearchConfig.contextToConfig(threadContext, variableContext, parameters); |
| } |
| |
| /** Convert an unqualified ACL to qualified form. |
| * @param acl is the initial, unqualified ACL. |
| * @param authorityNameString is the name of the governing authority for this document's acls, or null if none. |
| * @param activities is the activities object, so we can report what's happening. |
| * @return the modified ACL. |
| */ |
| protected static String[] convertACL(String[] acl, String authorityNameString, IOutputAddActivity activities) |
| throws ManifoldCFException |
| { |
| if (acl != null) |
| { |
| String[] rval = new String[acl.length]; |
| int i = 0; |
| while (i < rval.length) |
| { |
| rval[i] = activities.qualifyAccessToken(authorityNameString,acl[i]); |
| i++; |
| } |
| return rval; |
| } |
| return new String[0]; |
| } |
| |
| /** Add (or replace) a document in the output data store using the connector. |
| * This method presumes that the connector object has been configured, and it is thus able to communicate with the output data store should that be |
| * necessary. |
| *@param documentURI is the URI of the document. The URI is presumed to be the unique identifier which the output data store will use to process |
| * and serve the document. This URI is constructed by the repository connector which fetches the document, and is thus universal across all output connectors. |
| *@param pipelineDescription includes the description string that was constructed for this document by the getOutputDescription() method. |
| *@param document is the document data to be processed (handed to the output data store). |
| *@param authorityNameString is the name of the authority responsible for authorizing any access tokens passed in with the repository document. May be null. |
| *@param activities is the handle to an object that the implementer of a pipeline connector may use to perform operations, such as logging processing activity, |
| * or sending a modified document to the next stage in the pipeline. |
| *@return the document status (accepted or permanently rejected). |
| *@throws IOException only if there's a stream error reading the document data. |
| */ |
| @Override |
| public int addOrReplaceDocumentWithException(String documentURI, VersionContext pipelineDescription, |
| RepositoryDocument document, String authorityNameString, |
| IOutputAddActivity activities) throws ManifoldCFException, |
| ServiceInterruption, IOException |
| { |
| String compressedDocumentURI = compressDocumentURI(documentURI); |
| HttpClient client = getSession(); |
| ElasticSearchConfig config = getConfigParameters(null); |
| |
| InputStream inputStream = document.getBinaryStream(); |
| // For ES, we have to have fixed fields only; nothing else is possible b/c we don't have |
| // default field values. |
| String[] acls = null; |
| String[] denyAcls = null; |
| String[] shareAcls = null; |
| String[] shareDenyAcls = null; |
| String[] parentAcls = null; |
| String[] parentDenyAcls = null; |
| Iterator<String> a = document.securityTypesIterator(); |
| while (a.hasNext()) |
| { |
| String securityType = a.next(); |
| String[] convertedAcls = convertACL(document.getSecurityACL(securityType),authorityNameString,activities); |
| String[] convertedDenyAcls = convertACL(document.getSecurityDenyACL(securityType),authorityNameString,activities); |
| if (securityType.equals(RepositoryDocument.SECURITY_TYPE_DOCUMENT)) |
| { |
| acls = convertedAcls; |
| denyAcls = convertedDenyAcls; |
| } |
| else if (securityType.equals(RepositoryDocument.SECURITY_TYPE_SHARE)) |
| { |
| shareAcls = convertedAcls; |
| shareDenyAcls = convertedDenyAcls; |
| } |
| else if (securityType.equals(RepositoryDocument.SECURITY_TYPE_PARENT)) |
| { |
| parentAcls = convertedAcls; |
| parentDenyAcls = convertedDenyAcls; |
| } |
| else |
| { |
| // Don't know how to deal with it |
| activities.recordActivity(null, ELASTICSEARCH_INDEXATION_ACTIVITY, document.getBinaryLength(), documentURI, activities.UNKNOWN_SECURITY, "Rejected document that has security info which ElasticSearch does not recognize: '"+ securityType + "'"); |
| return DOCUMENTSTATUS_REJECTED; |
| } |
| } |
| |
| long startTime = System.currentTimeMillis(); |
| ElasticSearchIndex oi = new ElasticSearchIndex(client, config); |
| try |
| { |
| oi.execute(compressedDocumentURI, document, inputStream, acls, denyAcls, shareAcls, shareDenyAcls, parentAcls, parentDenyAcls, documentURI); |
| if (oi.getResult() != Result.OK) |
| return DOCUMENTSTATUS_REJECTED; |
| return DOCUMENTSTATUS_ACCEPTED; |
| } |
| finally |
| { |
| activities.recordActivity(startTime, ELASTICSEARCH_INDEXATION_ACTIVITY, |
| document.getBinaryLength(), documentURI, oi.getResultCode(), oi.getResultDescription()); |
| } |
| } |
| |
| @Override |
| public void removeDocument(String documentURI, String outputDescription, |
| IOutputRemoveActivity activities) throws ManifoldCFException, |
| ServiceInterruption |
| { |
| String compressedDocumentURI = compressDocumentURI(documentURI); |
| HttpClient client = getSession(); |
| long startTime = System.currentTimeMillis(); |
| ElasticSearchDelete od = new ElasticSearchDelete(client, getConfigParameters(null)); |
| try |
| { |
| od.execute(compressedDocumentURI); |
| } |
| finally |
| { |
| activities.recordActivity(startTime, ELASTICSEARCH_DELETION_ACTIVITY, null, |
| documentURI, od.getResultCode(), od.getResultDescription()); |
| } |
| } |
| |
| @Override |
| public String check() throws ManifoldCFException |
| { |
| HttpClient client = getSession(); |
| ElasticSearchAction oss = new ElasticSearchAction(client, getConfigParameters(null)); |
| try |
| { |
| oss.executeGET(CommandEnum._stats, true); |
| String resultName = oss.getResult().name(); |
| if (resultName.equals("OK")) |
| return super.check(); |
| return resultName + " " + oss.getResultDescription(); |
| } |
| catch (ServiceInterruption e) |
| { |
| return "Transient exception: "+e.getMessage(); |
| } |
| } |
| |
| } |