blob: ae77892752e2558323cc133cb6bee94b64072d34 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.manifoldcf.agents.output.rabbitmq;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
import java.io.BufferedWriter;
import java.io.File;
import java.io.IOException;
import java.io.StringWriter;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import org.apache.manifoldcf.agents.interfaces.IOutputAddActivity;
import org.apache.manifoldcf.agents.interfaces.IOutputNotifyActivity;
import org.apache.manifoldcf.agents.interfaces.IOutputRemoveActivity;
import org.apache.manifoldcf.agents.interfaces.OutputSpecification;
import org.apache.manifoldcf.agents.interfaces.RepositoryDocument;
import org.apache.manifoldcf.agents.interfaces.ServiceInterruption;
import org.apache.manifoldcf.agents.output.BaseOutputConnector;
import org.apache.manifoldcf.core.interfaces.ConfigParams;
import org.apache.manifoldcf.core.interfaces.IHTTPOutput;
import org.apache.manifoldcf.core.interfaces.IPostParameters;
import org.apache.manifoldcf.core.interfaces.IThreadContext;
import org.apache.manifoldcf.core.interfaces.ManifoldCFException;
import org.apache.manifoldcf.crawler.system.Logging;
import org.apache.manifoldcf.ui.util.Encoder;
import org.json.JSONException;
/**
 * ManifoldCF output connector that publishes ingested and deleted documents
 * as JSON messages to a RabbitMQ queue.
 *
 * <p>A single connection/channel pair is opened on demand by the first add or
 * remove call and is reused until the job completes
 * ({@link #noteJobComplete}) or the connector is disconnected.
 *
 * @author Christian Rieck, Comperio AS
 */
public class RabbitmqOutputConnector extends BaseOutputConnector {

  /** Configuration captured by the most recent {@link #connect(ConfigParams)} call. */
  RabbitmqConfig rabbitconfig;

  public static final String INGEST_ACTIVITY = "document ingest";
  public static final String REMOVE_ACTIVITY = "document deletion";

  /**
   * Exchange name passed to basicPublish. The empty name selects RabbitMQ's
   * default exchange; the queue name is passed as the routing key.
   */
  private static final String DEFAULT_EXCHANGE = "";

  /** Charset for outgoing JSON payloads, so output does not depend on the platform default. */
  private static final String MESSAGE_CHARSET = "UTF-8";

  private final ConnectionFactory factory = new ConnectionFactory();
  private Connection connection = null;
  private Channel channel = null;

  @Override
  public void connect(ConfigParams configParams) {
    super.connect(configParams);
    rabbitconfig = new RabbitmqConfig(configParams);
  }

  /**
   * Closes the RabbitMQ channel and connection (if open) before letting the
   * base class release its own state.
   */
  @Override
  public void disconnect() throws ManifoldCFException {
    closeRabbitResources();
    super.disconnect();
  }

  public void outputConfigurationHeader(IThreadContext threadContext,
      IHTTPOutput out, Locale locale, ConfigParams parameters,
      List<String> tabsArray) throws ManifoldCFException, IOException {
    tabsArray.add("Settings");
  }

  /**
   * Renders the editable "Settings" tab (host and queue inputs).
   *
   * <p>Values come from {@code parameters} (the configuration being edited),
   * not from the configuration this instance was connected with.
   */
  public void outputConfigurationBody(IThreadContext threadContext,
      IHTTPOutput out, Locale locale, ConfigParams parameters,
      String tabName) throws ManifoldCFException, IOException {
    if (Logging.connectors.isDebugEnabled()) {
      Logging.connectors.debug("outputConfigurationBody called");
    }
    if (!"Settings".equals(tabName)) {
      return;
    }
    RabbitmqConfig config = new RabbitmqConfig(parameters);
    out.print("<table class=\"displaytable\">\n"
        + " <tr><td class=\"separator\" colspan=\"2\"><hr/></td></tr>\n"
        + " <tr>\n"
        + " <td class=\"description\"><nobr>Host</nobr></td><td class=\"value\">\n"
        + " <input type=\"text\" name=\"host\" value=\"" + attributeValue(config.getHost()) + "\"/>\n"
        + " </td>\n"
        + " </tr>\n"
        + " <tr>\n"
        + " <td class=\"description\"><nobr>Queue</nobr></td><td class=\"value\">\n"
        + " <input type=\"text\" name=\"queue\" value=\"" + attributeValue(config.getQueueName()) + "\"/>\n"
        + " </td>\n"
        + " </tr>\n"
        + " <tr><td class=\"separator\" colspan=\"2\"><hr/></td></tr>\n"
        + "</table>");
  }

  @Override
  public boolean checkDocumentIndexable(String outputDescription,
      File localFile) throws ManifoldCFException, ServiceInterruption {
    return true;
  }

  /**
   * Renders the read-only summary (host and queue) of the given configuration.
   */
  @Override
  public void viewConfiguration(IThreadContext threadContext,
      IHTTPOutput out, Locale locale, ConfigParams parameters)
      throws ManifoldCFException, IOException {
    if (Logging.connectors.isDebugEnabled()) {
      Logging.connectors.debug("viewConfiguration called");
    }
    // Use the parameters being viewed, not the possibly unrelated connected config.
    RabbitmqConfig config = new RabbitmqConfig(parameters);
    out.print("<table class=\"displaytable\"> \n"
        + " <tr> \n"
        + " <td class=\"value\" colspan=\"3\"> \n"
        + " <nobr>Host = " + bodyValue(config.getHost()) + "</nobr><br/>\n"
        + " </td>\n"
        + " </tr>\n"
        + " <tr> \n"
        + " <td class=\"value\" colspan=\"3\"> \n"
        + " <nobr>Queue = " + bodyValue(config.getQueueName()) + "</nobr><br/>\n"
        + " </td>\n"
        + " </tr>\n"
        + "</table>");
  }

  @Override
  public void viewSpecification(IHTTPOutput out, Locale locale,
      OutputSpecification os) throws ManifoldCFException, IOException {
    super.viewSpecification(out, locale, os);
  }

  /**
   * Copies posted form values into {@code parameters} via
   * {@link RabbitmqConfig#contextToConfig}.
   *
   * @return always {@code null} (no error message)
   */
  @Override
  public String processConfigurationPost(IThreadContext threadContext,
      IPostParameters variableContext, ConfigParams parameters)
      throws ManifoldCFException {
    RabbitmqConfig.contextToConfig(variableContext, parameters);
    return null;
  }

  /**
   * Remove a document using the connector. Note that the last
   * outputDescription is included, since it may be necessary for the
   * connector to use such information to know how to properly remove the
   * document.
   *
   * <p>This implementation publishes a DELETE message for the document to the
   * configured queue. Publishing failures are logged and recorded as an
   * "ERROR" activity rather than thrown.
   *
   * @param documentURI is the URI of the document. The URI is presumed to be
   * the unique identifier which the output data store will use to process and
   * serve the document. This URI is constructed by the repository connector
   * which fetches the document, and is thus universal across all output
   * connectors.
   * @param outputDescription is the last description string that was
   * constructed for this document by the getOutputDescription() method above.
   * @param activities is the handle to an object that the implementer of an
   * output connector may use to perform operations, such as logging
   * processing activity.
   */
  @Override
  public void removeDocument(String documentURI, String outputDescription, IOutputRemoveActivity activities)
      throws ManifoldCFException, ServiceInterruption {
    if (Logging.connectors.isDebugEnabled()) {
      Logging.connectors.debug("Deleting document: " + documentURI);
    }
    connectToRabbitInstance();
    OutboundDocument rawDocument = new OutboundDocument(documentURI);
    rawDocument.operation = OutboundDocument.Operation.DELETE;
    try {
      byte[] bytes = jsonSerialize(rawDocument).getBytes(MESSAGE_CHARSET);
      channel.basicPublish(DEFAULT_EXCHANGE, rabbitconfig.getQueueName(), MessageProperties.PERSISTENT_BASIC, bytes);
      activities.recordActivity(null, "deletion message sent", 0L, documentURI, "OK", "Deletion message sendt");
    } catch (Exception e) {
      Logging.connectors.error(
          "Failed to push to rabbitmq (" + rabbitconfig.getHost() + "): ", e);
      activities.recordActivity(null, "Failed to push to rabbitmq", 0L, documentURI, "ERROR", e.getMessage());
    }
  }

  /**
   * Publishes the document to the configured queue.
   *
   * @return {@code DOCUMENTSTATUS_ACCEPTED} when the message was published,
   *         {@code DOCUMENTSTATUS_REJECTED} when publishing failed
   */
  @Override
  public int addOrReplaceDocument(String documentURI,
      String outputDescription, RepositoryDocument document,
      String authorityNameString, IOutputAddActivity activities)
      throws ManifoldCFException, ServiceInterruption {
    if (Logging.connectors.isDebugEnabled()) {
      Logging.connectors.debug("New document: " + documentURI);
    }
    connectToRabbitInstance();
    return publishDocument(document, activities, documentURI)
        ? DOCUMENTSTATUS_ACCEPTED
        : DOCUMENTSTATUS_REJECTED;
  }

  /**
   * Ensures an open connection and channel exist, then declares the
   * configured queue with the configured durable/exclusive/autoDelete flags.
   *
   * @throws ManifoldCFException if connecting, opening the channel, or
   *         declaring the queue fails with an IOException
   */
  private void connectToRabbitInstance() throws ManifoldCFException {
    factory.setHost(rabbitconfig.getHost());
    try {
      boolean connectionRenewed = false;
      if (connection == null || !connection.isOpen()) {
        connection = factory.newConnection();
        connectionRenewed = true;
      }
      // Check the channel's own state (not the connection's): after
      // noteJobComplete the old channel is closed and must not be reused, and
      // a channel from a replaced connection is never valid.
      if (connectionRenewed || channel == null || !channel.isOpen()) {
        channel = connection.createChannel();
      }
      Map<String, Object> arguments = null;
      channel.queueDeclare(rabbitconfig.getQueueName(),
          rabbitconfig.isDurable(),
          rabbitconfig.isExclusive(),
          rabbitconfig.isAutoDelete(),
          arguments);
    } catch (IOException e) {
      Logging.connectors.error("Failed to initialize connection to rabbitmq, " + e.getMessage(), e);
      throw new ManifoldCFException("Failed to initialize connection to rabbitmq", e);
    }
  }

  /**
   * Serializes and publishes one document, recording an ingest activity on
   * success.
   *
   * @return true if the message was published, false if any step failed
   *         (the failure is logged)
   */
  private boolean publishDocument(RepositoryDocument document, IOutputAddActivity activities, String documentURI) {
    try {
      byte[] bytes = convertToRabbitDocument(document);
      channel.basicPublish(DEFAULT_EXCHANGE, rabbitconfig.getQueueName(), MessageProperties.PERSISTENT_BASIC, bytes);
      activities.recordActivity(null, INGEST_ACTIVITY, Long.valueOf(document.getBinaryLength()), documentURI, "OK", null);
      return true;
    } catch (Exception e) {
      Logging.connectors.error(
          "Failed to push to rabbitmq (" + rabbitconfig.getHost() + "): ", e);
      return false;
    }
  }

  /**
   * Closes the channel and connection opened during the job.
   */
  @Override
  public void noteJobComplete(IOutputNotifyActivity activities)
      throws ManifoldCFException, ServiceInterruption {
    // channel and connection are still null if no document was added or
    // removed during this job, since they are only opened on demand.
    closeRabbitResources();
  }

  /** Pre-determine whether a document's length is indexable by this connector. This method is used by participating repository connectors
   * to help filter out documents that are too long to be indexable.
   *@param outputDescription is the document's output version.
   *@param length is the length of the document.
   *@return true if the file is indexable.
   */
  @Override
  public boolean checkLengthIndexable(String outputDescription, long length)
      throws ManifoldCFException, ServiceInterruption
  {
    // TODO: inspect outputDescription to parse of length, then check.
    return true;
  }

  /**
   * Closes the channel and then the connection, each independently, so a
   * failure closing one does not leak the other. Both fields are reset to
   * null so the next publish opens fresh resources. Failures are logged,
   * not thrown.
   */
  private void closeRabbitResources() {
    if (channel != null) {
      try {
        if (channel.isOpen()) {
          channel.close();
        }
      } catch (Exception e) {
        Logging.connectors.error("Could not close channel to rabbitmq: " + e.getMessage(), e);
      } finally {
        channel = null;
      }
    }
    if (connection != null) {
      try {
        if (connection.isOpen()) {
          connection.close();
        }
      } catch (Exception e) {
        Logging.connectors.error("Could not close connection to rabbitmq: " + e.getMessage(), e);
      } finally {
        connection = null;
      }
    }
  }

  /** Serializes the document to JSON and encodes it as UTF-8 bytes. */
  private byte[] convertToRabbitDocument(RepositoryDocument document) throws IOException, JSONException,
      ManifoldCFException {
    if (Logging.connectors.isDebugEnabled()) {
      Logging.connectors.debug("Atempting to serialize " + document.getFileName());
    }
    OutboundDocument rawDocument = new OutboundDocument(document);
    return jsonSerialize(rawDocument).getBytes(MESSAGE_CHARSET);
  }

  /** Returns the JSON string produced by {@code OutboundDocument.writeTo}. */
  private String jsonSerialize(OutboundDocument outbound) throws IOException, JSONException,
      ManifoldCFException {
    StringWriter sw = new StringWriter();
    BufferedWriter out = new BufferedWriter(sw);
    try {
      return outbound.writeTo(out);
    } finally {
      // Closing the BufferedWriter also closes the wrapped StringWriter.
      out.close();
    }
  }

  /** HTML-attribute-escapes a value, rendering null as an empty string. */
  private static String attributeValue(String value) {
    return value == null ? "" : Encoder.attributeEscape(value);
  }

  /** HTML-body-escapes a value, rendering null as an empty string. */
  private static String bodyValue(String value) {
    return value == null ? "" : Encoder.bodyEscape(value);
  }
}