blob: 1e0518b6bcbc945b6da1ba79b9a163463dca3562 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nutch.exchange;
import org.apache.hadoop.conf.Configuration;
import org.apache.nutch.indexer.NutchDocument;
import org.apache.nutch.plugin.Extension;
import org.apache.nutch.plugin.ExtensionPoint;
import org.apache.nutch.plugin.PluginRepository;
import org.apache.nutch.plugin.PluginRuntimeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;
import org.xml.sax.InputSource;
import org.xml.sax.SAXException;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.*;
public class Exchanges {
private static final Logger LOG = LoggerFactory
.getLogger(MethodHandles.lookup().lookupClass());
private Map<String, ExchangeConfigRelation> exchanges;
private ExchangeConfig defaultExchangeConfig;
private boolean availableExchanges = true;
public Exchanges(Configuration conf) {
try {
ExtensionPoint point = PluginRepository.get(conf)
.getExtensionPoint(Exchange.X_POINT_ID);
if (point == null) {
throw new RuntimeException(Exchange.X_POINT_ID + " not found.");
}
HashMap<String, Extension> extensionMap = new HashMap<>();
for (Extension extension : point.getExtensions()) {
extensionMap.putIfAbsent(extension.getClazz(), extension);
}
exchanges = new HashMap<>();
ExchangeConfig[] exchangeConfigs = loadConfigurations(conf);
for (ExchangeConfig exchangeConfig : exchangeConfigs) {
final String clazz = exchangeConfig.getClazz();
// If was enabled in plugin.includes property
if (extensionMap.containsKey(clazz)) {
ExchangeConfigRelation exchangeConfigRelation = new ExchangeConfigRelation(
(Exchange) extensionMap.get(clazz).getExtensionInstance(),
exchangeConfig);
exchanges.put(exchangeConfig.getId(), exchangeConfigRelation);
}
}
if (exchanges.isEmpty() && defaultExchangeConfig == null) {
availableExchanges = false;
LOG.warn("No exchange was configured. The documents will be routed to all index writers.");
}
} catch (PluginRuntimeException e) {
throw new RuntimeException(e);
}
}
public boolean areAvailableExchanges() {
return availableExchanges;
}
/**
* Loads the configuration of each exchange.
*
* @param conf Nutch's configuration.
* @return An array with each exchange's configuration.
*/
private ExchangeConfig[] loadConfigurations(Configuration conf) {
String filename = conf.get("exchanges.exchanges.file",
"exchanges.xml");
InputSource inputSource = new InputSource(
conf.getConfResourceAsInputStream(filename));
final List<ExchangeConfig> configList = new LinkedList<>();
try {
DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
DocumentBuilder builder = factory.newDocumentBuilder();
Element rootElement = builder.parse(inputSource).getDocumentElement();
NodeList exchangeList = rootElement.getElementsByTagName("exchange");
for (int i = 0; i < exchangeList.getLength(); i++) {
Element element = (Element) exchangeList.item(i);
ExchangeConfig exchangeConfig = ExchangeConfig.getInstance(element);
if ("default".equals(exchangeConfig.getClazz())) {
this.defaultExchangeConfig = exchangeConfig;
continue;
}
configList.add(exchangeConfig);
}
} catch (SAXException | IOException | ParserConfigurationException e) {
LOG.error(e.toString());
}
return configList.toArray(new ExchangeConfig[0]);
}
/**
* Opens each configured exchange.
*/
public void open() {
exchanges.forEach(
(id, value) -> value.exchange.open(value.config.getParameters()));
}
/**
* Returns all the indexers where the document must be sent to.
*
* @param nutchDocument The document to process.
* @return Indexers.
*/
public String[] indexWriters(final NutchDocument nutchDocument) {
final Set<String> writersIDs = new HashSet<>();
exchanges.forEach((id, value) -> {
if (value.exchange.match(nutchDocument)) {
writersIDs.addAll(Arrays.asList(value.config.getWritersIDs()));
}
});
// Using the default exchange if it's activated and there is not index writers for this document yet.
if (defaultExchangeConfig != null && writersIDs.isEmpty()) {
return defaultExchangeConfig.getWritersIDs();
}
return writersIDs.toArray(new String[0]);
}
/**
* Wrapper for a single exchange and its configuration.
*/
private class ExchangeConfigRelation {
private final Exchange exchange;
private final ExchangeConfig config;
ExchangeConfigRelation(Exchange exchange, ExchangeConfig config) {
this.exchange = exchange;
this.config = config;
}
}
}