blob: 0ae288dcb435247679c9d89a9e5da08f8b92bbf9 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.raid;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
import org.w3c.dom.Text;
import org.xml.sax.SAXException;
import org.apache.hadoop.raid.protocol.PolicyInfo;
import org.apache.hadoop.raid.protocol.PolicyList;
/**
* Loads the RAID policy configuration XML file into memory and, when enabled,
* re-reads it from a background thread after the file changes.
*/
class ConfigManager {
/** Logger shared by the configuration manager. */
public static final Log LOG = LogFactory.getLog(
    "org.apache.hadoop.raid.ConfigManager");
/** Default time to wait between checks of the config file, in milliseconds. */
public static final long RELOAD_INTERVAL = 10 * 1000;
/**
 * Default time to wait between successive runs of all policies
 * (presumably milliseconds, like the other intervals here - confirm
 * against the consumer of getPeriodicity()).
 */
public static final long RESCAN_INTERVAL = 3600 * 1000;
/**
 * Default for raid.har.partfile.size (presumably bytes, i.e. 10 GiB -
 * confirm against the consumer of getHarPartfileSize()). The leading
 * operand is a long so the whole product is evaluated in 64-bit arithmetic.
 */
public static final long HAR_PARTFILE_SIZE = 10L * 1024 * 1024 * 1024;
/** Default for raid.distraid.max.jobs. */
public static final int DISTRAID_MAX_JOBS = 10;
/** Default for raid.distraid.max.files. */
public static final int DISTRAID_MAX_FILES = 10000;
/**
 * Time to wait, in milliseconds, after the config file has been modified
 * before reloading it (this is done to prevent loading a file that hasn't
 * been fully written).
 */
public static final long RELOAD_WAIT = 5 * 1000;
private Configuration conf; // Hadoop configuration
private String configFileName; // Path to config XML file (raid.config.file)
private long lastReloadAttempt; // Last time we tried to reload the config file
private long lastSuccessfulReload; // Last time we successfully reloaded config
// True after a failed reload attempt; used so that repeated failures are
// logged only once until a reload succeeds again.
private boolean lastReloadAttemptFailed = false;
// Milliseconds between checks of the config file (raid.config.reload.interval)
private long reloadInterval = RELOAD_INTERVAL;
private long periodicity; // time between runs of all policies
private long harPartfileSize; // value of raid.har.partfile.size
private int maxJobsPerPolicy; // Max no. of jobs running simultaneously for
// a policy.
private int maxFilesPerJob; // Max no. of files raided by a job.
// Whether startReload() launches the background reload thread
// (raid.config.reload).
private boolean doReload;
// Background reload thread; null until startReload() creates it and again
// after stopReload().
private Thread reloadThread;
// Loop condition of the reload thread: set to true by the constructor and
// to false by stopReload().
private volatile boolean running = false;
// Collection of all configured policies.
Collection<PolicyList> allPolicies = new ArrayList<PolicyList>();
/**
 * Creates the manager, reads its settings from the Hadoop configuration and
 * performs the initial load of the policy file.
 *
 * Settings read from {@code conf}: raid.config.file (required),
 * raid.config.reload, raid.config.reload.interval,
 * raid.policy.rescan.interval, raid.har.partfile.size,
 * raid.distraid.max.jobs and raid.distraid.max.files.
 *
 * @param conf the Hadoop configuration to read settings from.
 * @throws IOException if raid.config.file is not set or the file cannot be read.
 * @throws SAXException if the config file is malformed.
 * @throws RaidConfigurationException if configuration entries are invalid.
 * @throws ClassNotFoundException propagated from {@link #reloadConfigs()}.
 * @throws ParserConfigurationException if the XML parser is misconfigured.
 */
public ConfigManager(Configuration conf) throws IOException, SAXException,
    RaidConfigurationException, ClassNotFoundException, ParserConfigurationException {
  this.conf = conf;
  this.configFileName = conf.get("raid.config.file");
  this.doReload = conf.getBoolean("raid.config.reload", true);
  this.reloadInterval = conf.getLong("raid.config.reload.interval", RELOAD_INTERVAL);
  this.periodicity = conf.getLong("raid.policy.rescan.interval", RESCAN_INTERVAL);
  this.harPartfileSize = conf.getLong("raid.har.partfile.size", HAR_PARTFILE_SIZE);
  this.maxJobsPerPolicy = conf.getInt("raid.distraid.max.jobs",
      DISTRAID_MAX_JOBS);
  this.maxFilesPerJob = conf.getInt("raid.distraid.max.files",
      DISTRAID_MAX_FILES);
  if (configFileName == null) {
    String msg = "No raid.config.file given in conf - " +
        "the Hadoop Raid utility cannot run. Aborting....";
    LOG.warn(msg);
    throw new IOException(msg);
  }
  // Take the timestamp BEFORE parsing, as reloadConfigsIfNecessary() does.
  // If the file is rewritten while the initial load is in progress, its
  // modification time is then newer than lastSuccessfulReload and the
  // change is picked up by the next periodic check instead of being lost.
  long loadStarted = RaidNode.now();
  reloadConfigs();
  lastSuccessfulReload = loadStarted;
  lastReloadAttempt = RaidNode.now();
  running = true;
}
/**
 * Re-reads the config file when it is due for a check and has changed.
 *
 * A check happens at most once per reload interval. The file is reloaded
 * only if it was modified after the last successful reload and has been
 * left untouched for more than RELOAD_WAIT. Any failure is logged (once per
 * run of consecutive failures) and the existing configuration is kept.
 *
 * @return true if the file was reloaded, false otherwise.
 */
public synchronized boolean reloadConfigsIfNecessary() {
  final long now = RaidNode.now();
  if (now <= lastReloadAttempt + reloadInterval) {
    // Checked too recently.
    return false;
  }
  lastReloadAttempt = now;
  try {
    final long modifiedAt = new File(configFileName).lastModified();
    final boolean changedSinceLoad = modifiedAt > lastSuccessfulReload;
    final boolean settled = now > modifiedAt + RELOAD_WAIT;
    if (changedSinceLoad && settled) {
      reloadConfigs();
      lastSuccessfulReload = now;
      lastReloadAttemptFailed = false;
      return true;
    }
  } catch (Exception failure) {
    // Report only the first failure of a streak to avoid flooding the log.
    if (!lastReloadAttemptFailed) {
      LOG.error("Failed to reload config file - " +
          "will use existing configuration.", failure);
    }
    lastReloadAttemptFailed = true;
  }
  return false;
}
/**
 * Parses the config file and, only if the whole file was parsed
 * successfully, replaces the in-memory policy lists with its contents.
 *
 * The file is an XML document of the following form (element names are
 * matched case-insensitively):
 *
 * <pre>{@code
 * <configuration>
 *   <srcPath prefix="hdfs://hadoop.myhost.com:9000/user/warehouse/u_full/*">
 *     <policy name="RaidScanWeekly">
 *       <destPath> hdfs://dfsname.myhost.com:9000/archive/</destPath>
 *       <parentPolicy> RaidScanMonthly</parentPolicy>
 *       <property>
 *         <name>targetReplication</name>
 *         <value>2</value>
 *         <description> after RAIDing, decrease the replication factor of
 *           the file to this value.
 *         </description>
 *       </property>
 *       <property>
 *         <name>metaReplication</name>
 *         <value>2</value>
 *         <description> the replication factor of the RAID meta file
 *         </description>
 *       </property>
 *       <property>
 *         <name>stripeLength</name>
 *         <value>10</value>
 *         <description> the number of blocks to RAID together
 *         </description>
 *       </property>
 *     </policy>
 *   </srcPath>
 * </configuration>
 * }</pre>
 *
 * Children of the root other than srcPath are ignored. A srcPath with an
 * empty prefix contributes no policy list, but its policies are still
 * parsed and can be named as parentPolicy by policies that appear later in
 * the file. Unrecognized children of a policy are logged and ignored.
 *
 * @throws IOException if the config file cannot be read.
 * @throws RaidConfigurationException if the file does not exist or
 *         configuration entries are invalid.
 * @throws ClassNotFoundException if user-defined policy classes cannot be loaded
 * @throws ParserConfigurationException if XML parser is misconfigured.
 * @throws SAXException if config file is malformed.
 */
void reloadConfigs() throws IOException, ParserConfigurationException,
    SAXException, ClassNotFoundException, RaidConfigurationException {
  if (configFileName == null) {
    return;
  }
  File file = new File(configFileName);
  if (!file.exists()) {
    throw new RaidConfigurationException("Configuration file " + configFileName +
        " does not exist.");
  }

  // Collect the new policy lists in a local list and publish them only after
  // the entire file has been parsed, so a bad file leaves the current
  // configuration untouched.
  List<PolicyList> all = new ArrayList<PolicyList>();

  // Read and parse the configuration file.
  // allow include files in configuration file
  DocumentBuilderFactory docBuilderFactory = DocumentBuilderFactory.newInstance();
  docBuilderFactory.setIgnoringComments(true);
  docBuilderFactory.setNamespaceAware(true);
  try {
    docBuilderFactory.setXIncludeAware(true);
  } catch (UnsupportedOperationException e) {
    LOG.error("Failed to set setXIncludeAware(true) for raid parser "
        + docBuilderFactory + ":" + e, e);
  }
  // A routine reload is not an error condition.
  LOG.info("Reloading config file " + file);
  DocumentBuilder builder = docBuilderFactory.newDocumentBuilder();
  Document doc = builder.parse(file);
  Element root = doc.getDocumentElement();
  if (!"configuration".equalsIgnoreCase(root.getTagName())) {
    throw new RaidConfigurationException("Bad configuration file: " +
        "top-level element not <configuration>");
  }

  // Policies seen so far, by name, so later policies can inherit from them.
  Map<String, PolicyInfo> existingPolicies =
      new HashMap<String, PolicyInfo>();

  // loop through all the configured source paths.
  NodeList elements = root.getChildNodes();
  for (int i = 0; i < elements.getLength(); i++) {
    Node node = elements.item(i);
    if (!(node instanceof Element)) {
      continue;
    }
    Element element = (Element) node;
    if (!"srcPath".equalsIgnoreCase(element.getTagName())) {
      continue;
    }
    String srcPathPrefix = element.getAttribute("prefix");
    PolicyList policyList = null;
    if (srcPathPrefix != null && srcPathPrefix.length() != 0) {
      // Empty srcPath will have no effect but policies will be processed
      // This allow us to define some "abstract" policies
      policyList = new PolicyList();
      all.add(policyList);
      policyList.setSrcPath(conf, srcPathPrefix);
    }

    // loop through all the policies for this source path
    NodeList policies = element.getChildNodes();
    for (int j = 0; j < policies.getLength(); j++) {
      Node child = policies.item(j);
      if (!(child instanceof Element)) {
        continue;
      }
      Element policy = (Element) child;
      if (!"policy".equalsIgnoreCase(policy.getTagName())) {
        throw new RaidConfigurationException("Bad configuration file: " +
            "Expecting <policy> for srcPath " + srcPathPrefix);
      }
      String policyName = policy.getAttribute("name");
      PolicyInfo pinfo =
          parsePolicy(policy, policyName, srcPathPrefix, existingPolicies);
      if (policyList != null) {
        policyList.add(pinfo);
      }
      existingPolicies.put(policyName, pinfo);
    } // done with all policies for this srcpath
  } // done with all srcPaths
  setAllPolicies(all);
}

/**
 * Builds the PolicyInfo described by one policy element, merging it on top
 * of its parent policy when a known parentPolicy is named.
 */
private PolicyInfo parsePolicy(Element policy, String policyName,
    String srcPathPrefix, Map<String, PolicyInfo> existingPolicies)
    throws IOException, ParserConfigurationException, SAXException,
    ClassNotFoundException, RaidConfigurationException {
  PolicyInfo curr = new PolicyInfo(policyName, conf);
  if (srcPathPrefix != null && srcPathPrefix.length() > 0) {
    curr.setSrcPath(srcPathPrefix);
  }

  // loop through all the properties of this policy
  PolicyInfo parent = null;
  NodeList properties = policy.getChildNodes();
  for (int k = 0; k < properties.getLength(); k++) {
    Node node = properties.item(k);
    if (!(node instanceof Element)) {
      continue;
    }
    Element property = (Element) node;
    String propertyName = property.getTagName();
    if ("erasureCode".equalsIgnoreCase(propertyName)) {
      String text = requiredText(property);
      LOG.info(policyName + ".erasureCode = " + text);
      curr.setErasureCode(text);
    } else if ("description".equalsIgnoreCase(propertyName)) {
      curr.setDescription(requiredText(property));
    } else if ("parentPolicy".equalsIgnoreCase(propertyName)) {
      String parentName = requiredText(property);
      parent = existingPolicies.get(parentName);
      if (parent == null) {
        // Only policies defined earlier in the file can be inherited from.
        LOG.warn("Policy " + policyName + " names unknown parentPolicy " +
            parentName + ". Ignoring.");
      }
    } else if ("property".equalsIgnoreCase(propertyName)) {
      applyProperty(property, policyName, curr);
    } else {
      LOG.warn("Found bad property " + propertyName +
          " for srcPath " + srcPathPrefix +
          " policy name " + policyName +
          ". Ignoring.");
    }
  } // done with all properties of this policy

  if (parent == null) {
    return curr;
  }
  // Start from the parent's settings, then lay this policy's own on top.
  PolicyInfo merged = new PolicyInfo(policyName, conf);
  merged.copyFrom(parent);
  merged.copyFrom(curr);
  return merged;
}

/**
 * Reads the name and value children of one property element and, when both
 * are present, sets that property on the given policy.
 */
private void applyProperty(Element property, String policyName,
    PolicyInfo target)
    throws IOException, ParserConfigurationException, SAXException,
    ClassNotFoundException, RaidConfigurationException {
  String pname = null;
  String pvalue = null;
  NodeList items = property.getChildNodes();
  for (int l = 0; l < items.getLength(); l++) {
    Node node = items.item(l);
    if (!(node instanceof Element)) {
      continue;
    }
    Element item = (Element) node;
    String itemName = item.getTagName();
    if ("name".equalsIgnoreCase(itemName)) {
      pname = requiredText(item);
    } else if ("value".equalsIgnoreCase(itemName)) {
      pvalue = requiredText(item);
    }
  }
  if (pname != null && pvalue != null) {
    LOG.info(policyName + "." + pname + " = " + pvalue);
    target.setProperty(pname, pvalue);
  }
}

/**
 * Returns the trimmed text that starts the given element, or reports a
 * configuration error when the element is empty or starts with markup.
 */
private static String requiredText(Element element)
    throws RaidConfigurationException {
  Node first = element.getFirstChild();
  if (!(first instanceof Text)) {
    throw new RaidConfigurationException("Bad configuration file: <" +
        element.getTagName() + "> must contain text");
  }
  return ((Text) first).getData().trim();
}
/**
 * Returns the time between successive runs of all policies, as configured
 * by raid.policy.rescan.interval (default RESCAN_INTERVAL).
 */
public synchronized long getPeriodicity() {
  return this.periodicity;
}
/**
 * Returns the value configured by raid.har.partfile.size
 * (default HAR_PARTFILE_SIZE).
 */
public synchronized long getHarPartfileSize() {
  return this.harPartfileSize;
}
/**
 * Returns the value configured by raid.distraid.max.jobs
 * (default DISTRAID_MAX_JOBS).
 */
public synchronized int getMaxJobsPerPolicy() {
  return this.maxJobsPerPolicy;
}
/**
 * Returns the value configured by raid.distraid.max.files
 * (default DISTRAID_MAX_FILES).
 */
public synchronized int getMaxFilesPerJob() {
  return this.maxFilesPerJob;
}
/**
 * Get a collection of all policies.
 *
 * @return a new list holding the currently configured policy lists; adding
 *         to or removing from it does not affect this manager, but the
 *         PolicyList elements themselves are shared, not copied.
 */
public synchronized Collection<PolicyList> getAllPolicies() {
  // Parameterized (not raw) so the copy is type-checked by the compiler.
  return new ArrayList<PolicyList>(allPolicies);
}
/**
 * Replaces the collection of all policies with the given one. The
 * collection is stored as-is, not copied.
 *
 * @param value the new collection of policy lists.
 */
protected synchronized void setAllPolicies(Collection<PolicyList> value) {
  allPolicies = value;
}
/**
 * Starts the background thread that periodically reloads the config file.
 * Does nothing when reloading is disabled (raid.config.reload is false).
 */
void startReload() {
  if (!doReload) {
    return;
  }
  reloadThread = new UpdateThread();
  reloadThread.start();
}
/**
 * Stops the background thread that reloads the config file and waits for
 * it to finish. Does nothing if no reload thread exists.
 *
 * @throws InterruptedException if the calling thread is interrupted while
 *         waiting for the reload thread to terminate.
 */
void stopReload() throws InterruptedException {
  if (reloadThread == null) {
    return;
  }
  // Clear the loop flag first, then interrupt to cut the sleep short.
  running = false;
  reloadThread.interrupt();
  reloadThread.join();
  reloadThread = null;
}
/**
* A thread which reloads the config file.
*/
private class UpdateThread extends Thread {
private UpdateThread() {
super("Raid update thread");
}
public void run() {
while (running) {
try {
Thread.sleep(reloadInterval);
reloadConfigsIfNecessary();
} catch (InterruptedException e) {
// do nothing
} catch (Exception e) {
LOG.error("Failed to reload config file ", e);
}
}
}
}
}