| /* $Id: ElasticSearchConnector.java 1299512 2012-03-12 00:58:38Z piergiorgio $ */ |
| |
| /** |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.manifoldcf.agents.output.elasticsearch; |
| |
| import java.io.File; |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.util.List; |
| import java.util.Locale; |
| import java.util.Map; |
| |
| import org.apache.http.conn.ClientConnectionManager; |
| import org.apache.http.impl.conn.PoolingClientConnectionManager; |
| import org.apache.http.client.HttpClient; |
| import org.apache.http.impl.client.DefaultHttpClient; |
| |
| import org.apache.commons.io.FilenameUtils; |
| import org.apache.manifoldcf.agents.interfaces.IOutputAddActivity; |
| import org.apache.manifoldcf.agents.interfaces.IOutputNotifyActivity; |
| import org.apache.manifoldcf.agents.interfaces.IOutputRemoveActivity; |
| import org.apache.manifoldcf.agents.interfaces.OutputSpecification; |
| import org.apache.manifoldcf.agents.interfaces.RepositoryDocument; |
| import org.apache.manifoldcf.agents.interfaces.ServiceInterruption; |
| import org.apache.manifoldcf.agents.output.BaseOutputConnector; |
| import org.apache.manifoldcf.agents.output.elasticsearch.ElasticSearchAction.CommandEnum; |
| import org.apache.manifoldcf.agents.output.elasticsearch.ElasticSearchConnection.Result; |
| import org.apache.manifoldcf.core.interfaces.ConfigParams; |
| import org.apache.manifoldcf.core.interfaces.ConfigurationNode; |
| import org.apache.manifoldcf.core.interfaces.IHTTPOutput; |
| import org.apache.manifoldcf.core.interfaces.IPostParameters; |
| import org.apache.manifoldcf.core.interfaces.IThreadContext; |
| import org.apache.manifoldcf.core.interfaces.ManifoldCFException; |
| import org.apache.manifoldcf.core.interfaces.SpecificationNode; |
| import org.json.JSONException; |
| import org.json.JSONObject; |
| |
| /** |
| * This is the "output connector" for elasticsearch. |
| * |
| * @author Luca Stancapiano |
| */ |
| public class ElasticSearchConnector extends BaseOutputConnector |
| { |
| |
| private final static String ELASTICSEARCH_INDEXATION_ACTIVITY = "Indexation"; |
| private final static String ELASTICSEARCH_DELETION_ACTIVITY = "Deletion"; |
| private final static String ELASTICSEARCH_OPTIMIZE_ACTIVITY = "Optimize"; |
| |
| private final static String[] ELASTICSEARCH_ACTIVITIES = |
| { ELASTICSEARCH_INDEXATION_ACTIVITY, ELASTICSEARCH_DELETION_ACTIVITY, |
| ELASTICSEARCH_OPTIMIZE_ACTIVITY }; |
| |
| private final static String ELASTICSEARCH_TAB_ELASTICSEARCH = "ElasticSearchConnector.ElasticSearch"; |
| private final static String ELASTICSEARCH_TAB_PARAMETERS = "ElasticSearchConnector.Parameters"; |
| |
| /** Forward to the javascript to check the configuration parameters */ |
| private static final String EDIT_CONFIG_HEADER_FORWARD = "editConfiguration.js"; |
| |
| /** Forward to the HTML template to edit the configuration parameters */ |
| private static final String EDIT_CONFIG_FORWARD_PARAMETERS = "editConfiguration_Parameters.html"; |
| |
| /** Forward to the HTML template to view the configuration parameters */ |
| private static final String VIEW_CONFIG_FORWARD = "viewConfiguration.html"; |
| |
| /** Forward to the javascript to check the specification parameters for the job */ |
| private static final String EDIT_SPEC_HEADER_FORWARD = "editSpecification.js"; |
| |
| /** Forward to the template to edit the configuration parameters for the job */ |
| private static final String EDIT_SPEC_FORWARD_ELASTICSEARCH = "editSpecification_ElasticSearch.html"; |
| |
| /** Forward to the template to view the specification parameters for the job */ |
| private static final String VIEW_SPEC_FORWARD = "viewSpecification.html"; |
| |
| |
| private ClientConnectionManager connectionManager = null; |
| private HttpClient client = null; |
| |
/**
 * Creates a connector instance. No HTTP resources are allocated here;
 * they are created in {@link #connect(ConfigParams)}.
 */
public ElasticSearchConnector()
{
  super();
}
| |
| @Override |
| public void connect(ConfigParams configParams) |
| { |
| super.connect(configParams); |
| PoolingClientConnectionManager localConnectionManager = new PoolingClientConnectionManager(); |
| localConnectionManager.setMaxTotal(1); |
| connectionManager = localConnectionManager; |
| DefaultHttpClient localClient = new DefaultHttpClient(connectionManager); |
| client = localClient; |
| } |
| |
| @Override |
| public void disconnect() |
| throws ManifoldCFException |
| { |
| super.disconnect(); |
| connectionManager.shutdown(); |
| connectionManager = null; |
| client = null; |
| } |
| |
| @Override |
| public void poll() |
| throws ManifoldCFException |
| { |
| super.poll(); |
| // Free idle connections in the pool. |
| // MHL |
| } |
| |
| @Override |
| public String[] getActivitiesList() |
| { |
| return ELASTICSEARCH_ACTIVITIES; |
| } |
| |
| /** Read the content of a resource, replace the variable ${PARAMNAME} with the |
| * value and copy it to the out. |
| * |
| * @param resName |
| * @param out |
| * @throws ManifoldCFException */ |
| private static void outputResource(String resName, IHTTPOutput out, |
| Locale locale, ElasticSearchParam params, String tabName) throws ManifoldCFException |
| { |
| Map<String,String> paramMap = null; |
| if (params != null) { |
| paramMap = params.buildMap(); |
| if (tabName != null) { |
| paramMap.put("TabName", tabName); |
| } |
| } |
| Messages.outputResourceWithVelocity(out, locale, resName, paramMap, true); |
| } |
| |
| @Override |
| public void outputConfigurationHeader(IThreadContext threadContext, |
| IHTTPOutput out, Locale locale, ConfigParams parameters, |
| List<String> tabsArray) throws ManifoldCFException, IOException |
| { |
| super.outputConfigurationHeader(threadContext, out, locale, parameters, |
| tabsArray); |
| tabsArray.add(Messages.getString(locale, ELASTICSEARCH_TAB_PARAMETERS)); |
| outputResource(EDIT_CONFIG_HEADER_FORWARD, out, locale, null, null); |
| } |
| |
| @Override |
| public void outputConfigurationBody(IThreadContext threadContext, |
| IHTTPOutput out, Locale locale, ConfigParams parameters, String tabName) |
| throws ManifoldCFException, IOException |
| { |
| super.outputConfigurationBody(threadContext, out, locale, parameters, |
| tabName); |
| ElasticSearchConfig config = this.getConfigParameters(parameters); |
| outputResource(EDIT_CONFIG_FORWARD_PARAMETERS, out, locale, config, tabName); |
| } |
| |
| @Override |
| public void outputSpecificationHeader(IHTTPOutput out, Locale locale, |
| OutputSpecification os, List<String> tabsArray) |
| throws ManifoldCFException, IOException |
| { |
| super.outputSpecificationHeader(out, locale, os, tabsArray); |
| tabsArray.add(Messages.getString(locale, ELASTICSEARCH_TAB_ELASTICSEARCH)); |
| outputResource(EDIT_SPEC_HEADER_FORWARD, out, locale, null, null); |
| } |
| |
| final private SpecificationNode getSpecNode(OutputSpecification os) |
| { |
| int l = os.getChildCount(); |
| for (int i = 0; i < l; i++) |
| { |
| SpecificationNode node = os.getChild(i); |
| if (ElasticSearchSpecs.ELASTICSEARCH_SPECS_NODE.equals(node.getType())) |
| { |
| return node; |
| } |
| } |
| return null; |
| } |
| |
| @Override |
| public void outputSpecificationBody(IHTTPOutput out, Locale locale, |
| OutputSpecification os, String tabName) throws ManifoldCFException, |
| IOException |
| { |
| super.outputSpecificationBody(out, locale, os, tabName); |
| ElasticSearchSpecs specs = getSpecParameters(os); |
| outputResource(EDIT_SPEC_FORWARD_ELASTICSEARCH, out, locale, specs, tabName); |
| } |
| |
| @Override |
| public String processSpecificationPost(IPostParameters variableContext, |
| Locale locale, OutputSpecification os) throws ManifoldCFException |
| { |
| ConfigurationNode specNode = getSpecNode(os); |
| boolean bAdd = (specNode == null); |
| if (bAdd) |
| { |
| specNode = new SpecificationNode( |
| ElasticSearchSpecs.ELASTICSEARCH_SPECS_NODE); |
| } |
| ElasticSearchSpecs.contextToSpecNode(variableContext, specNode); |
| if (bAdd) |
| os.addChild(os.getChildCount(), specNode); |
| return null; |
| } |
| |
| /** Build a Set of ElasticSearch parameters. If configParams is null, |
| * getConfiguration() is used. |
| * |
| * @param configParams */ |
| final private ElasticSearchConfig getConfigParameters( |
| ConfigParams configParams) |
| { |
| if (configParams == null) |
| configParams = getConfiguration(); |
| return new ElasticSearchConfig(configParams); |
| } |
| |
| final private ElasticSearchSpecs getSpecParameters(OutputSpecification os) |
| throws ManifoldCFException |
| { |
| return new ElasticSearchSpecs(getSpecNode(os)); |
| } |
| |
| final private ElasticSearchSpecs getSpecsCache(String outputDescription) |
| throws ManifoldCFException |
| { |
| try |
| { |
| return new ElasticSearchSpecs(new JSONObject(outputDescription)); |
| } catch (JSONException e) |
| { |
| throw new ManifoldCFException(e); |
| } |
| } |
| |
| @Override |
| public String getOutputDescription(OutputSpecification os) |
| throws ManifoldCFException |
| { |
| ElasticSearchSpecs specs = new ElasticSearchSpecs(getSpecNode(os)); |
| return specs.toJson().toString(); |
| } |
| |
| @Override |
| public boolean checkLengthIndexable(String outputDescription, long length) |
| throws ManifoldCFException, ServiceInterruption |
| { |
| ElasticSearchSpecs specs = getSpecsCache(outputDescription); |
| long maxFileSize = specs.getMaxFileSize(); |
| if (length > maxFileSize) |
| return false; |
| return super.checkLengthIndexable(outputDescription, length); |
| } |
| |
| @Override |
| public boolean checkDocumentIndexable(String outputDescription, File localFile) |
| throws ManifoldCFException, ServiceInterruption |
| { |
| ElasticSearchSpecs specs = getSpecsCache(outputDescription); |
| return specs |
| .checkExtension(FilenameUtils.getExtension(localFile.getName())); |
| } |
| |
| @Override |
| public boolean checkMimeTypeIndexable(String outputDescription, |
| String mimeType) throws ManifoldCFException, ServiceInterruption |
| { |
| ElasticSearchSpecs specs = getSpecsCache(outputDescription); |
| return specs.checkMimeType(mimeType); |
| } |
| |
| @Override |
| public void viewConfiguration(IThreadContext threadContext, IHTTPOutput out, |
| Locale locale, ConfigParams parameters) throws ManifoldCFException, |
| IOException |
| { |
| outputResource(VIEW_CONFIG_FORWARD, out, locale, |
| getConfigParameters(parameters), null); |
| } |
| |
| @Override |
| public void viewSpecification(IHTTPOutput out, Locale locale, |
| OutputSpecification os) throws ManifoldCFException, IOException |
| { |
| outputResource(VIEW_SPEC_FORWARD, out, locale, getSpecParameters(os), null); |
| } |
| |
| @Override |
| public String processConfigurationPost(IThreadContext threadContext, |
| IPostParameters variableContext, ConfigParams parameters) |
| throws ManifoldCFException |
| { |
| ElasticSearchConfig.contextToConfig(variableContext, parameters); |
| return null; |
| } |
| |
| @Override |
| public int addOrReplaceDocument(String documentURI, String outputDescription, |
| RepositoryDocument document, String authorityNameString, |
| IOutputAddActivity activities) throws ManifoldCFException, |
| ServiceInterruption |
| { |
| ElasticSearchConfig config = getConfigParameters(null); |
| InputStream inputStream = document.getBinaryStream(); |
| long startTime = System.currentTimeMillis(); |
| ElasticSearchIndex oi = new ElasticSearchIndex(client, documentURI, |
| document, inputStream, config); |
| activities.recordActivity(startTime, ELASTICSEARCH_INDEXATION_ACTIVITY, |
| document.getBinaryLength(), documentURI, oi.getResult().name(), |
| oi.getResultDescription()); |
| if (oi.getResult() != Result.OK) |
| return DOCUMENTSTATUS_REJECTED; |
| return DOCUMENTSTATUS_ACCEPTED; |
| } |
| |
| @Override |
| public void removeDocument(String documentURI, String outputDescription, |
| IOutputRemoveActivity activities) throws ManifoldCFException, |
| ServiceInterruption |
| { |
| long startTime = System.currentTimeMillis(); |
| ElasticSearchDelete od = new ElasticSearchDelete(client, documentURI, |
| getConfigParameters(null)); |
| activities.recordActivity(startTime, ELASTICSEARCH_DELETION_ACTIVITY, null, |
| documentURI, od.getResult().name(), od.getResultDescription()); |
| } |
| |
| @Override |
| public String check() throws ManifoldCFException |
| { |
| ElasticSearchAction oss = new ElasticSearchAction(client, CommandEnum._status, |
| getConfigParameters(null), true); |
| String resultName = oss.getResult().name(); |
| if (resultName.equals("OK")) |
| return super.check(); |
| return resultName + " " + oss.getResultDescription(); |
| } |
| |
| @Override |
| public void noteJobComplete(IOutputNotifyActivity activities) |
| throws ManifoldCFException, ServiceInterruption |
| { |
| long startTime = System.currentTimeMillis(); |
| ElasticSearchAction oo = new ElasticSearchAction(client, CommandEnum._optimize, |
| getConfigParameters(null), false); |
| activities.recordActivity(startTime, ELASTICSEARCH_OPTIMIZE_ACTIVITY, null, |
| oo.getCallUrlSnippet(), oo.getResult().name(), |
| oo.getResultDescription()); |
| } |
| |
| } |