Pull up changes from CONNECTORS-818 branch
git-svn-id: https://svn.apache.org/repos/asf/manifoldcf/branches/CONNECTORS-818-2@1613116 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/build.xml b/build.xml
index c762675..71854b9 100644
--- a/build.xml
+++ b/build.xml
@@ -51,7 +51,6 @@
<echo message="Overlay -lib package, or run 'make-core-deps' target first"/>
</target>
-
<target name="build-site" depends="downloaded-check" if="downloaded">
<ant dir="site" target="all"/>
</target>
@@ -1432,6 +1431,17 @@
</antcall>
</target>
+ <target name="download-rabbitmq-client">
+ <mkdir dir="lib"/>
+ <antcall target="download-via-maven">
+ <param name="target" value="lib"/>
+ <param name="project-path" value="com/rabbitmq"/>
+ <param name="artifact-version" value="3.1.4"/>
+ <param name="artifact-name" value="amqp-client"/>
+ <param name="artifact-type" value="jar"/>
+ </antcall>
+ </target>
+
<target name="download-google-api-client">
<mkdir dir="lib"/>
<antcall target="download-via-maven">
@@ -1802,7 +1812,7 @@
</antcall>
</target>
- <target name="make-core-deps" depends="download-mongo-java-driver,download-jira-client,download-google-api-client,download-dropbox-client,download-solrj,download-zookeeper,download-httpcomponents,download-json,download-hsqldb,download-xerces,download-commons,download-elasticsearch-plugin,download-solr-plugins,download-sharepoint-plugins,download-jstl,download-xmlgraphics-commons,download-wstx-asl,download-xmlsec,download-xml-apis,download-wss4j,download-velocity,download-streambuffer,download-stax,download-servlet-api,download-xml-resolver,download-osgi,download-opensaml,download-mimepull,download-mail,download-log4j,download-junit,download-jaxws,download-glassfish,download-jaxb,download-tomcat,download-h2,download-h2-support,download-geronimo-specs,download-fop,download-derby,download-postgresql,download-axis,download-saaj,download-wsdl4j,download-castor,download-jetty,download-slf4j,download-xalan,download-activation,download-avalon-framework,download-poi,download-chemistry,download-ecj,download-hadoop,download-protobuf,download-tika,download-jackson">
+ <target name="make-core-deps" depends="download-mongo-java-driver,download-jira-client,download-google-api-client,download-dropbox-client,download-solrj,download-zookeeper,download-httpcomponents,download-json,download-hsqldb,download-xerces,download-commons,download-elasticsearch-plugin,download-solr-plugins,download-sharepoint-plugins,download-jstl,download-xmlgraphics-commons,download-wstx-asl,download-xmlsec,download-xml-apis,download-wss4j,download-velocity,download-streambuffer,download-stax,download-servlet-api,download-xml-resolver,download-osgi,download-opensaml,download-mimepull,download-mail,download-log4j,download-junit,download-jaxws,download-glassfish,download-jaxb,download-tomcat,download-h2,download-h2-support,download-geronimo-specs,download-fop,download-derby,download-postgresql,download-axis,download-saaj,download-wsdl4j,download-castor,download-jetty,download-slf4j,download-xalan,download-activation,download-avalon-framework,download-poi,download-chemistry,download-ecj,download-hadoop,download-protobuf,download-tika,download-jackson,download-rabbitmq-client">
<copy todir="lib">
<fileset dir="lib-license" includes="*.txt"/>
</copy>
diff --git a/connectors/pom.xml b/connectors/pom.xml
index 93540af..e693506 100644
--- a/connectors/pom.xml
+++ b/connectors/pom.xml
@@ -45,7 +45,6 @@
<module>nulloutput</module>
<module>nulltransformation</module>
<module>rss</module>
- <module>sharepoint</module>
<module>solr</module>
<module>webcrawler</module>
<module>cmis</module>
@@ -63,6 +62,7 @@
<module>forcedmetadata</module>
<module>tika</module>
<module>documentfilter</module>
+ <module>rabbitmq</module>
</modules>
</project>
diff --git a/connectors/rabbitmq/build.xml b/connectors/rabbitmq/build.xml
new file mode 100644
index 0000000..8bd69a5
--- /dev/null
+++ b/connectors/rabbitmq/build.xml
@@ -0,0 +1,56 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<project name="rabbitmq" default="all">
+
+ <property environment="env"/>
+ <condition property="mcf-dist" value="${env.MCFDISTPATH}">
+ <isset property="env.MCFDISTPATH"/>
+ </condition>
+ <property name="abs-dist" location="../../dist"/>
+ <condition property="mcf-dist" value="${abs-dist}">
+ <not>
+ <isset property="env.MCFDISTPATH"/>
+ </not>
+ </condition>
+
+ <import file="${mcf-dist}/connector-build.xml"/>
+
+ <path id="connector-classpath">
+ <path refid="mcf-connector-build.connector-classpath"/>
+ <fileset dir="../../lib">
+ <include name="amqp*.jar"/>
+ </fileset>
+ </path>
+
+ <target name="lib" depends="mcf-connector-build.lib,precompile-check" if="canBuild">
+ <mkdir dir="dist/lib"/>
+ <copy todir="dist/lib">
+ <fileset dir="../../lib">
+ <include name="amqp*.jar"/>
+ </fileset>
+ </copy>
+ </target>
+
+ <target name="deliver-connector" depends="mcf-connector-build.deliver-connector">
+ <!-- antcall target="general-add-repository-connector">
+ <param name="connector-label" value="RabbitMQ"/>
+ <param name="connector-class" value="org.apache.manifoldcf.???"/>
+ </antcall -->
+ </target>
+
+</project>
diff --git a/connectors/rabbitmq/connector/src/main/java/org/apache/manifoldcf/agents/output/rabbitmq/OutboundDocument.java b/connectors/rabbitmq/connector/src/main/java/org/apache/manifoldcf/agents/output/rabbitmq/OutboundDocument.java
new file mode 100644
index 0000000..f491994
--- /dev/null
+++ b/connectors/rabbitmq/connector/src/main/java/org/apache/manifoldcf/agents/output/rabbitmq/OutboundDocument.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.manifoldcf.agents.output.rabbitmq;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.StringWriter;
+import java.io.Writer;
+import java.util.Iterator;
+import org.apache.manifoldcf.agents.interfaces.RepositoryDocument;
+import org.apache.manifoldcf.core.common.Base64;
+import org.apache.manifoldcf.core.interfaces.ManifoldCFException;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+public class OutboundDocument {
+ protected static final String allowAttributeName = "allow_token_";
+ protected static final String denyAttributeName = "deny_token_";
+ protected static final String noSecurityToken = "__nosecurity__";
+ protected static final boolean useNullValue = false;
+ private RepositoryDocument document;
+ private InputStream inputStream;
+ private String documentURI;
+ public static enum Operation { ADD, UPDATE, DELETE };
+ Operation operation = Operation.ADD;
+
+ public OutboundDocument(RepositoryDocument document) {
+ this.document = document;
+ this.inputStream = document.getBinaryStream();
+ }
+
+ public OutboundDocument(String documentUri) {
+ this.documentURI = documentUri;
+ }
+
+ public OutboundDocument() {
+ }
+
+ public RepositoryDocument getDocument() {
+ return this.document;
+ }
+
+ public String getDocumentURI() {
+ return this.documentURI;
+ }
+
+ public void setOperation(Operation operation) {
+ this.operation = operation;
+ }
+
+
+ // TODO: write to Logstash format, or support
+ // a range of inputs.
+ public String writeTo(Writer out) throws JSONException, IOException,
+ ManifoldCFException {
+ JSONObject json = new JSONObject();
+
+ json.put("documentUri", this.documentURI);
+ json.put("operation", this.operation);
+
+ if (operation != Operation.DELETE) {
+ json.put("acl", this.document.getACL());
+ json.put("acl_deny", this.document.getDenyACL());
+ json.put("acl_share", this.document.getShareACL());
+ json.put("acl_share_deny", this.document.getShareDenyACL());
+
+ JSONObject fields = new JSONObject();
+ json.put("fields", fields);
+ Iterator i = this.document.getFields();
+ while (i.hasNext()) {
+ String fieldName = (String) i.next();
+ String[] fieldValues = this.document.getFieldAsStrings(fieldName);
+ fields.put(fieldName, fieldValues);
+ }
+
+ Base64 base64 = new Base64();
+ StringWriter outputWriter = new StringWriter();
+ // TODO: We can not, in general, assume we can
+ // fit the entire stream in memory.
+ base64.encodeStream(this.inputStream, outputWriter);
+
+
+ JSONObject file = new JSONObject();
+ file.put("content", outputWriter.toString());
+ file.put("name", this.document.getFileName());
+ outputWriter.close();
+ json.put("file", file);
+ }
+
+ json.write(out);
+ return json.toString();
+ }
+}
diff --git a/connectors/rabbitmq/connector/src/main/java/org/apache/manifoldcf/agents/output/rabbitmq/RabbitmqConfig.java b/connectors/rabbitmq/connector/src/main/java/org/apache/manifoldcf/agents/output/rabbitmq/RabbitmqConfig.java
new file mode 100644
index 0000000..a756e9a
--- /dev/null
+++ b/connectors/rabbitmq/connector/src/main/java/org/apache/manifoldcf/agents/output/rabbitmq/RabbitmqConfig.java
@@ -0,0 +1,140 @@
+/*
+ * Copyright 2013 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.manifoldcf.agents.output.rabbitmq;
+
+import org.apache.manifoldcf.core.interfaces.ConfigParams;
+import org.apache.manifoldcf.core.interfaces.IPostParameters;
+import org.apache.manifoldcf.crawler.system.Logging;
+
+/**
+ *
+ * @author Christian
+ */
+public class RabbitmqConfig {
+
+ public static final String hostParameter = "host";
+ public static final String queueParameter = "queue";
+ public static final String durableParameter = "durable";
+ public static final String autoDeleteParameter = "autodelete";
+ public static final String exclusiveParameter = "exclusive";
+ public static final String transactionParameter = "transaction";
+
+ private String queueName = "manifoldcf";
+ private String host = "localhost";
+ private int port;
+ private boolean durable = true;
+ private boolean exclusive = false;
+ private boolean autoDelete = false;
+ private boolean useTransactions = false;
+
+
+ RabbitmqConfig(ConfigParams configParams) {
+ this.host = configParams.getParameter(hostParameter);
+ this.queueName = configParams.getParameter(queueParameter);
+ extractAutoDeleteParameter(configParams);
+ extractExclusiveParameter(configParams);
+ extractDurableParameter(configParams);
+ }
+
+ public final static void contextToConfig(IPostParameters variableContext,
+ ConfigParams parameters)
+ {
+ String host = variableContext.getParameter(hostParameter);
+ if (host != null) {
+ parameters.setParameter(hostParameter, host);
+ }
+ String queue = variableContext.getParameter(queueParameter);
+ if (queue != null) {
+ parameters.setParameter(queueParameter, queue);
+ }
+ String _autodelete = variableContext.getParameter(autoDeleteParameter);
+ if (_autodelete != null) {
+ parameters.setParameter(autoDeleteParameter, _autodelete);
+ }
+ String _durable = variableContext.getParameter(durableParameter);
+ if (_durable != null) {
+ parameters.setParameter(durableParameter, _durable);
+ }
+ String _exclusive = variableContext.getParameter(_durable);
+ if (_exclusive != null) {
+ parameters.setParameter(exclusiveParameter, _exclusive);
+ }
+ }
+
+
+ private void extractAutoDeleteParameter(ConfigParams variableContext) {
+ String _active = variableContext.getParameter(autoDeleteParameter);
+ if(_active != null) {
+ this.autoDelete = Boolean.parseBoolean(_active);
+ Logging.connectors.debug("Channel parameter active set to " + this.autoDelete);
+ } else {
+ this.autoDelete = false;
+ Logging.connectors.debug("Channel parameter active parameter not set, defaults to " + this.autoDelete);
+ }
+ }
+
+ private void extractDurableParameter(ConfigParams variableContext) {
+ String _durable = variableContext.getParameter(durableParameter);
+ if(_durable != null) {
+ this.durable = Boolean.parseBoolean(_durable);
+ Logging.connectors.debug("Channel parameter durable set to " + this.durable);
+ } else {
+ this.autoDelete = true;
+ Logging.connectors.debug("Channel parameter durable parameter not set, defaults to " + this.durable);
+ }
+ }
+
+ private void extractExclusiveParameter(ConfigParams variableContext) {
+ String _exclusive = variableContext.getParameter(exclusiveParameter);
+ if(_exclusive != null) {
+ this.exclusive = Boolean.parseBoolean(_exclusive);
+ Logging.connectors.debug("Channel parameter exclusive set to " + this.exclusive);
+ } else {
+ this.exclusive = false;
+ Logging.connectors.debug("Channel parameter exclusive parameter not set, defaults to " + this.exclusive);
+ }
+ }
+
+
+ public String getQueueName() {
+ return queueName;
+ }
+
+ public String getHost() {
+ return host;
+ }
+
+ public int getPort() {
+ return port;
+ }
+
+ public boolean isDurable() {
+ return durable;
+ }
+
+ public boolean isExclusive() {
+ return exclusive;
+ }
+
+ public boolean isAutoDelete() {
+ return autoDelete;
+ }
+
+ public boolean isUseTransactions() {
+ return useTransactions;
+ }
+}
diff --git a/connectors/rabbitmq/connector/src/main/java/org/apache/manifoldcf/agents/output/rabbitmq/RabbitmqOutputConnector.java b/connectors/rabbitmq/connector/src/main/java/org/apache/manifoldcf/agents/output/rabbitmq/RabbitmqOutputConnector.java
new file mode 100644
index 0000000..ae77892
--- /dev/null
+++ b/connectors/rabbitmq/connector/src/main/java/org/apache/manifoldcf/agents/output/rabbitmq/RabbitmqOutputConnector.java
@@ -0,0 +1,308 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.manifoldcf.agents.output.rabbitmq;
+
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+import com.rabbitmq.client.MessageProperties;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import org.apache.manifoldcf.agents.interfaces.IOutputAddActivity;
+import org.apache.manifoldcf.agents.interfaces.IOutputNotifyActivity;
+import org.apache.manifoldcf.agents.interfaces.IOutputRemoveActivity;
+import org.apache.manifoldcf.agents.interfaces.OutputSpecification;
+import org.apache.manifoldcf.agents.interfaces.RepositoryDocument;
+import org.apache.manifoldcf.agents.interfaces.ServiceInterruption;
+import org.apache.manifoldcf.agents.output.BaseOutputConnector;
+import org.apache.manifoldcf.core.interfaces.ConfigParams;
+import org.apache.manifoldcf.core.interfaces.IHTTPOutput;
+import org.apache.manifoldcf.core.interfaces.IPostParameters;
+import org.apache.manifoldcf.core.interfaces.IThreadContext;
+import org.apache.manifoldcf.core.interfaces.ManifoldCFException;
+import org.apache.manifoldcf.crawler.system.Logging;
+import org.apache.manifoldcf.ui.util.Encoder;
+import org.json.JSONException;
+
+/**
+ *
+ * @author Christian Rieck, Comperio AS
+ */
+public class RabbitmqOutputConnector extends BaseOutputConnector {
+
+ RabbitmqConfig rabbitconfig;
+ public static final String INGEST_ACTIVITY = "document ingest";
+ public static final String REMOVE_ACTIVITY = "document deletion";
+
+
+ private ConnectionFactory factory = new ConnectionFactory();
+ private Connection connection = null;
+ private Channel channel = null;
+
+ @Override
+ public void connect(ConfigParams configParams) {
+ super.connect(configParams);
+ rabbitconfig = new RabbitmqConfig(configParams);
+ }
+
+ public void outputConfigurationHeader(IThreadContext threadContext,
+ IHTTPOutput out, Locale locale, ConfigParams parameters,
+ List<String> tabsArray) throws ManifoldCFException, IOException {
+ tabsArray.add("Settings");
+ }
+
+ public void outputConfigurationBody(IThreadContext threadContext,
+ IHTTPOutput out, Locale locale, ConfigParams parameters,
+ String tabName) throws ManifoldCFException, IOException {
+ // called when displaying editable configuration tables
+ // NPE when edited in use of rabbitconfig. need to parse?
+ System.out.println("outputConfigurationBody called");
+ if (rabbitconfig == null) {
+ rabbitconfig = new RabbitmqConfig(parameters);
+ }
+ if (tabName.equals("Settings")) {
+ out.print("<table class=\"displaytable\">\n"
+ + " <tr><td class=\"separator\" colspan=\"2\"><hr/></td></tr>\n"
+ + " <tr>\n"
+ + " <td class=\"description\"><nobr>Host</nobr></td><td class=\"value\">\n"
+ + " <input type=\"text\" name=\"host\" value=\""+rabbitconfig.getHost()+"\"/>\n"
+ + " </td>\n"
+ + " </tr>\n"
+ + " <tr>\n"
+ + " <td class=\"description\"><nobr>Queue</nobr></td><td class=\"value\">\n"
+ + " <input type=\"text\" name=\"queue\" value=\""+rabbitconfig.getQueueName()+"\"/>\n"
+ + " </td>\n"
+ + " </tr>\n"
+ + " <tr><td class=\"separator\" colspan=\"2\"><hr/></td></tr>\n"
+ + "</table>");
+ }
+ }
+
+ @Override
+ public boolean checkDocumentIndexable(String outputDescription,
+ File localFile) throws ManifoldCFException, ServiceInterruption {
+ // System.out.println(outputDescription);
+
+ return true;
+ }
+
+ @Override
+ public void viewConfiguration(IThreadContext threadContext,
+ IHTTPOutput out, Locale locale, ConfigParams parameters)
+ throws ManifoldCFException, IOException {
+ // called when viewing output connector, simple enough
+ System.out.println("viewConfiguration called");
+ if (rabbitconfig == null) {
+ rabbitconfig = new RabbitmqConfig(parameters);
+ }
+ String host = rabbitconfig.getHost();
+ String queueName = rabbitconfig.getQueueName();
+ out.print("<table class=\"displaytable\"> \n"
+ + " <tr> \n"
+ + " <td class=\"value\" colspan=\"3\"> \n"
+ + " <nobr>Directory = " + Encoder.bodyEscape(host) + "</nobr><br/>\n"
+ + " </td>\n"
+ + " </tr>\n"
+ + " <tr> \n"
+ + " <td class=\"value\" colspan=\"3\"> \n"
+ + " <nobr>Directory = " + Encoder.bodyEscape(queueName) + "</nobr><br/>\n"
+ + " </td>\n"
+ + " </tr>\n"
+ + "</table>");
+ }
+
+ @Override
+ public void viewSpecification(IHTTPOutput out, Locale locale,
+ OutputSpecification os) throws ManifoldCFException, IOException {
+ super.viewSpecification(out, locale, os);
+ }
+
+ @Override
+ public String processConfigurationPost(IThreadContext threadContext,
+ IPostParameters variableContext, ConfigParams parameters)
+ throws ManifoldCFException {
+ // called each time the tab changes when creating a new output connector, at least
+ RabbitmqConfig.contextToConfig(variableContext, parameters);
+ return null;
+ }
+
+
+ /**
+ * Remove a document using the connector. Note that the last
+ * outputDescription is included, since it may be necessary for the
+ * connector to use such information to know how to properly remove the
+ * document.
+ *
+ * @param documentURI is the URI of the document. The URI is presumed to be
+ * the unique identifier which the output data store will use to process and
+ * serve the document. This URI is constructed by the repository connector
+ * which fetches the document, and is thus universal across all output
+ * connectors.
+ * @param outputDescription is the last description string that was
+ * constructed for this document by the getOutputDescription() method above.
+ * @param activities is the handle to an object that the implementer of an
+ * output connector may use to perform operations, such as logging
+ * processing activity.
+ */
+ @Override
+ public void removeDocument(String documentURI, String outputDescription, IOutputRemoveActivity activities)
+ throws ManifoldCFException, ServiceInterruption {
+ if (Logging.connectors.isDebugEnabled()) {
+ Logging.connectors.debug("Deleting document: " + documentURI);
+ }
+ connectToRabbitInstance();
+
+ OutboundDocument rawDocument = new OutboundDocument(documentURI);
+ rawDocument.operation = OutboundDocument.Operation.DELETE;
+ try {
+ String bindingName = "";
+ String json = jsonSerialize(rawDocument);
+ byte[] bytes = json.getBytes();
+ channel.basicPublish(bindingName, rabbitconfig.getQueueName(), MessageProperties.PERSISTENT_BASIC, bytes);
+ activities.recordActivity(null, "deletion message sent", 0l, documentURI, "OK", "Deletion message sendt");
+ } catch (Exception e) {
+ Logging.connectors.error(
+ "Failed to push to rabbitmq (" + rabbitconfig.getHost() + "): ", e);
+ activities.recordActivity(null, "Failed to push to rabbitmq", 0l, documentURI, "ERROR", e.getMessage());
+ }
+
+ }
+
+ @Override
+ public int addOrReplaceDocument(String documentURI,
+ String outputDescription, RepositoryDocument document,
+ String authorityNameString, IOutputAddActivity activities)
+ throws ManifoldCFException, ServiceInterruption {
+ if (Logging.connectors.isDebugEnabled()) {
+ Logging.connectors.debug("New document: " + documentURI);
+ }
+ connectToRabbitInstance();
+ if (sendDocument(document, activities, documentURI)) {
+ return 1;
+ }
+
+ return 0;
+ }
+
+ private void connectToRabbitInstance() throws ManifoldCFException {
+ factory.setHost(rabbitconfig.getHost());
+
+ try {
+ if (connection == null || !connection.isOpen()) {
+ connection = factory.newConnection();
+ }
+ if (channel == null || !connection.isOpen()) {
+ channel = connection.createChannel();
+ }
+ // TODO: problems with shutting down client correctly.
+ // com.rabbitmq.client.AlreadyClosedException: clean connection shutdown; reason: Attempt to use closed channel
+ Map<java.lang.String,java.lang.Object> arguments = null;
+ channel.queueDeclare(rabbitconfig.getQueueName(),
+ rabbitconfig.isDurable(),
+ rabbitconfig.isExclusive(),
+ rabbitconfig.isAutoDelete(),
+ arguments);
+
+ } catch (IOException e1) {
+ Logging.connectors.error("Failed to initialize connection to rabbitmq, "+ e1.getMessage());
+ // TODO Log to activities?
+ throw new ManifoldCFException("Failed to initialize connection to rabbitmq", e1);
+ }
+ }
+
+ private boolean sendDocument(RepositoryDocument document, IOutputAddActivity activities, String documentURI) {
+ try {
+ String bindingName = "";
+ byte[] bytes = convertToRabbitDocument(document);
+ channel.basicPublish(bindingName, rabbitconfig.getQueueName(), MessageProperties.PERSISTENT_BASIC, bytes);
+ activities.recordActivity(null, "document ingest", new Long(document.getBinaryLength()), documentURI, "OK", null);
+ } catch (Exception e) {
+ Logging.connectors.error(
+ "Failed to push to rabbitmq (" + rabbitconfig.getHost() + "): ", e);
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public void noteJobComplete(IOutputNotifyActivity activities)
+ throws ManifoldCFException, ServiceInterruption {
+
+ try {
+ // channel and connection will be null if no new documents was found
+ // in the current job. addDocument() is not called and the
+ // initializations are not made.
+ if (channel != null && channel.isOpen()) {
+ channel.close();
+ }
+ if (connection != null && connection.isOpen()) {
+ connection.close();
+ }
+ } catch (IOException e) {
+ Logging.connectors.error("Could not close channel or connection to rabbitmq: " + e);
+ } catch (NullPointerException npe) {
+ Logging.connectors.error("Channel or connection was null", npe);
+ } catch (Exception e) {
+ Logging.connectors.error(e.getMessage(), e);
+ }
+ }
+
+
+ /** Pre-determine whether a document's length is indexable by this connector. This method is used by participating repository connectors
+ * to help filter out documents that are too long to be indexable.
+ *@param outputDescription is the document's output version.
+ *@param length is the length of the document.
+ *@return true if the file is indexable.
+ */
+ @Override
+ public boolean checkLengthIndexable(String outputDescription, long length)
+ throws ManifoldCFException, ServiceInterruption
+ {
+ // TODO: inspect outputDescription to parse of length, then check.
+ // Logging.connectors.error("Document length:, "+ length + " vs " + outputDescription);
+ return true;
+ }
+
+ private byte[] convertToRabbitDocument(RepositoryDocument document) throws IOException, JSONException,
+ ManifoldCFException {
+ if (Logging.connectors.isDebugEnabled()) {
+ Logging.connectors.debug("Atempting to serialize " + document.getFileName());
+ }
+ OutboundDocument rawDocument = new OutboundDocument(document);
+ String json = jsonSerialize(rawDocument);
+ return json.getBytes();
+ }
+
+ private String jsonSerialize(OutboundDocument outbound) throws IOException, JSONException,
+ ManifoldCFException{
+
+ StringWriter sw = new StringWriter();
+ BufferedWriter out = new BufferedWriter(sw);
+ String jsons = outbound.writeTo(out);
+
+ out.close();
+ sw.close();
+ return jsons;
+ }
+}
diff --git a/connectors/rabbitmq/connector/src/main/native2ascii/org/apache/manifoldcf/agents/output/rabbitmq/common_en_US.properties b/connectors/rabbitmq/connector/src/main/native2ascii/org/apache/manifoldcf/agents/output/rabbitmq/common_en_US.properties
new file mode 100644
index 0000000..9ead60d
--- /dev/null
+++ b/connectors/rabbitmq/connector/src/main/native2ascii/org/apache/manifoldcf/agents/output/rabbitmq/common_en_US.properties
@@ -0,0 +1,23 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+RabbitmqConnector.HostTabName=Output Path
+RabbitmqConnector.HostLabel=Root path:
+RabbitmqConnector.QueueLabel=Queue:
+RabbitmqConnector.DurableLabel=Durable:
+RabbitmqConnector.AutodeleteLabel=Auto Delete:
+RabbitmqConnector.ExclusiveLabel=Exclusive:
+RabbitmqConnector.HostCannotBeNull=Host cannot be null
+RabbitmqConnector.QueueNameCannotBeNull=Queue name cannot be null
diff --git a/connectors/rabbitmq/connector/src/main/native2ascii/org/apache/manifoldcf/agents/output/rabbitmq/common_ja_JP.properties b/connectors/rabbitmq/connector/src/main/native2ascii/org/apache/manifoldcf/agents/output/rabbitmq/common_ja_JP.properties
new file mode 100644
index 0000000..006a450
--- /dev/null
+++ b/connectors/rabbitmq/connector/src/main/native2ascii/org/apache/manifoldcf/agents/output/rabbitmq/common_ja_JP.properties
@@ -0,0 +1,18 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+FileConnector.PathTabName=出力パス
+FileConnector.RootPath=ルートパス
+FileConnector.RootPathCannotBeNull=Root path cannot be null
diff --git a/connectors/rabbitmq/connector/src/main/resources/org/apache/manifoldcf/agents/output/rabbitmq/viewConfiguration.html b/connectors/rabbitmq/connector/src/main/resources/org/apache/manifoldcf/agents/output/rabbitmq/viewConfiguration.html
new file mode 100644
index 0000000..51478d5
--- /dev/null
+++ b/connectors/rabbitmq/connector/src/main/resources/org/apache/manifoldcf/agents/output/rabbitmq/viewConfiguration.html
@@ -0,0 +1,42 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+RabbitmqConnector.HostTabName=RabbitMQ
+RabbitmqConnector.HostLabel=Host:
+RabbitmqConnector.QueueLabel=Queue:
+RabbitmqConnector.DurableLabel=Durable:
+RabbitmqConnector.AutodeleteLabel=Auto Delete:
+RabbitmqConnector.ExclusiveLabel=Exclusive:
+RabbitmqConnector.HostCannotBeNull=Host cannot be null
+RabbitmqConnector.QueueNameCannotBeNull=Queue name cannot be null
+
+
+<table class="displaytable">
+ <tr>
+ <td class="description"><nobr>$Encoder.bodyEscape($ResourceBundle.getString('RabbitmqConnector.HostLabel'))</nobr>
+ $Encoder.bodyEscape($ResourceBundle.getString('ElasticSearchConnector.URLColon'))</td>
+ <td class="value">$Encoder.bodyEscape($HOST)</td>
+ </tr>
+ <tr>
+ <td class="description"><nobr>$Encoder.bodyEscape($ResourceBundle.getString('RabbitmqConnector.QueueLabel'))</nobr></td>
+ <td class="value">$Encoder.bodyEscape($QUEUENAME)</td>
+ </tr>
+ <tr>
+ <td class="description"><nobr>$Encoder.bodyEscape($ResourceBundle.getString('RabbitmqConnector.DurableLabel'))</nobr></td>
+ <td class="value">$Encoder.bodyEscape($DURABLE)</td>
+ </tr>
+</table>
\ No newline at end of file
diff --git a/connectors/rabbitmq/pom.xml b/connectors/rabbitmq/pom.xml
new file mode 100644
index 0000000..867e2fd
--- /dev/null
+++ b/connectors/rabbitmq/pom.xml
@@ -0,0 +1,390 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <groupId>org.apache.manifoldcf</groupId>
+ <artifactId>mcf-connectors</artifactId>
+ <version>1.6-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <developers>
+ <developer>
+ <name>Christian Marshall Rieck</name>
+ <organization>Comperio</organization>
+ <organizationUrl>http://www.comperiosearch.com</organizationUrl>
+ <url />
+ </developer>
+ </developers>
+
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+ </properties>
+
+ <artifactId>mcf-rabbitmq-connector</artifactId>
+ <name>ManifoldCF - Connectors - RabbitMQ</name>
+
+ <build>
+ <defaultGoal>integration-test</defaultGoal>
+ <sourceDirectory>${basedir}/connector/src/main/java</sourceDirectory>
+ <testSourceDirectory>${basedir}/connector/src/test/java</testSourceDirectory>
+ <resources>
+ <resource>
+ <directory>${basedir}/connector/src/main/native2ascii</directory>
+ <includes>
+ <include>**/*.properties</include>
+ </includes>
+ </resource>
+ <resource>
+ <directory>${basedir}/connector/src/main/resources</directory>
+ <includes>
+ <include>**/*.html</include>
+ <include>**/*.js</include>
+ </includes>
+ </resource>
+ </resources>
+ <testResources>
+ <testResource>
+ <directory>${basedir}/connector/src/test/resources</directory>
+ </testResource>
+ </testResources>
+
+ <plugins>
+
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>native2ascii-maven-plugin</artifactId>
+ <version>1.0-alpha-1</version>
+ <configuration>
+ <dest>target/classes</dest>
+ <src>connector/src/main/native2ascii</src>
+ </configuration>
+ <executions>
+ <execution>
+ <id>native2ascii-utf8</id>
+ <goals>
+ <goal>native2ascii</goal>
+ </goals>
+ <configuration>
+ <encoding>UTF8</encoding>
+ <includes>**/*.properties</includes>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+
+ <!-- Test plugin configuration -->
+ <plugin>
+ <artifactId>maven-dependency-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>copy-war</id>
+ <phase>generate-resources</phase>
+ <goals>
+ <goal>copy</goal>
+ </goals>
+ <configuration>
+ <outputDirectory>target/dependency</outputDirectory>
+ <artifactItems>
+ <artifactItem>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>mcf-api-service</artifactId>
+ <version>${project.version}</version>
+ <type>war</type>
+ <overWrite>false</overWrite>
+ <destFileName>mcf-api-service.war</destFileName>
+ </artifactItem>
+ <artifactItem>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>mcf-authority-service</artifactId>
+ <version>${project.version}</version>
+ <type>war</type>
+ <overWrite>false</overWrite>
+ <destFileName>mcf-authority-service.war</destFileName>
+ </artifactItem>
+ <artifactItem>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>mcf-crawler-ui</artifactId>
+ <version>${project.version}</version>
+ <type>war</type>
+ <overWrite>false</overWrite>
+ <destFileName>mcf-crawler-ui.war</destFileName>
+ </artifactItem>
+ </artifactItems>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <excludes>
+ <exclude>**/*Postgresql*.java</exclude>
+ <exclude>**/*MySQL*.java</exclude>
+ </excludes>
+ <forkMode>always</forkMode>
+ <workingDirectory>target/test-output</workingDirectory>
+ </configuration>
+ </plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-failsafe-plugin</artifactId>
+ <version>2.12.3</version>
+ <configuration>
+ <skipTests>${skipITs}</skipTests>
+ <systemPropertyVariables>
+ <crawlerWarPath>../dependency/mcf-crawler-ui.war</crawlerWarPath>
+ <authorityserviceWarPath>../dependency/mcf-authority-service.war</authorityserviceWarPath>
+ <apiWarPath>../dependency/mcf-api-service.war</apiWarPath>
+ </systemPropertyVariables>
+ <excludes>
+ <exclude>**/*Postgresql*.java</exclude>
+ <exclude>**/*MySQL*.java</exclude>
+ </excludes>
+ <forkMode>always</forkMode>
+ <workingDirectory>target/test-output</workingDirectory>
+ </configuration>
+ <executions>
+ <execution>
+ <id>integration-test</id>
+ <goals>
+ <goal>integration-test</goal>
+ </goals>
+ </execution>
+ <execution>
+ <id>verify</id>
+ <goals>
+ <goal>verify</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+
+ </plugins>
+
+ </build>
+
+ <dependencies>
+
+ <!-- Main build dependencies -->
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>mcf-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>mcf-agents</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>mcf-pull-agent</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>mcf-ui-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.rabbitmq</groupId>
+ <artifactId>amqp-client</artifactId>
+ <version>3.1.4</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpclient</artifactId>
+ <version>${httpcomponent.httpclient.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>commons-logging</groupId>
+ <artifactId>commons-logging</artifactId>
+ <version>${commons-logging.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>commons-codec</groupId>
+ <artifactId>commons-codec</artifactId>
+ <version>${commons-codec.version}</version>
+ </dependency>
+
+ <!-- Testing dependencies -->
+
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>${junit.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>mcf-core</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>mcf-agents</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>mcf-pull-agent</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>postgresql</groupId>
+ <artifactId>postgresql</artifactId>
+ <version>${postgresql.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.hsqldb</groupId>
+ <artifactId>hsqldb</artifactId>
+ <version>${hsqldb.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.derby</groupId>
+ <artifactId>derby</artifactId>
+ <version>${derby.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>mysql</groupId>
+ <artifactId>mysql-connector-java</artifactId>
+ <version>${mysql.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>mcf-api-service</artifactId>
+ <version>${project.version}</version>
+ <type>war</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>mcf-authority-service</artifactId>
+ <version>${project.version}</version>
+ <type>war</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>mcf-crawler-ui</artifactId>
+ <version>${project.version}</version>
+ <type>war</type>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-server</artifactId>
+ <version>${jetty.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-util</artifactId>
+ <version>${jetty.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-webapp</artifactId>
+ <version>${jetty.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-servlet</artifactId>
+ <version>${jetty.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-http</artifactId>
+ <version>${jetty.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-io</artifactId>
+ <version>${jetty.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-security</artifactId>
+ <version>${jetty.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-continuation</artifactId>
+ <version>${jetty.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-xml</artifactId>
+ <version>${jetty.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>jsp-api-2.1-glassfish</artifactId>
+ <version>${glassfish.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>jsp-2.1-glassfish</artifactId>
+ <version>${glassfish.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <version>${slf4j.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-simple</artifactId>
+ <version>${slf4j.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ </dependencies>
+</project>