blob: 146c6d844f222bfd30ae2604f067d4edb062fa4b [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.conf;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.conf.ReconfigurationUtil.PropertyChange;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
/**
* Utility base class for implementing the Reconfigurable interface.
*
* Subclasses should override reconfigurePropertyImpl to change individual
* properties and getReconfigurableProperties to get all properties that
* can be changed at run time.
*/
public abstract class ReconfigurableBase
    extends Configured implements Reconfigurable {

  private static final Logger LOG =
      LoggerFactory.getLogger(ReconfigurableBase.class);

  // Replaceable through setReconfigurationUtil() for testing purposes.
  private ReconfigurationUtil reconfigurationUtil = new ReconfigurationUtil();

  /** Background thread to reload configuration. */
  private Thread reconfigThread = null;

  /** Cleared by shutdownReconfigurationTask(); no new task starts afterwards. */
  private volatile boolean shouldRun = true;

  /** Guards reconfigThread, startTime, endTime and status. */
  private final Object reconfigLock = new Object();

  /**
   * The timestamp when the <code>reconfigThread</code> starts.
   */
  private long startTime = 0;

  /**
   * The timestamp when the <code>reconfigThread</code> finishes.
   */
  private long endTime = 0;

  /**
   * A map of {@code <changed property, error message>}. If the error message
   * is present, it describes the error that occurred while applying that
   * particular change. Otherwise, the change has been applied successfully.
   */
  private Map<PropertyChange, Optional<String>> status = null;

  /**
   * Construct a ReconfigurableBase.
   */
  public ReconfigurableBase() {
    super(new Configuration());
  }

  /**
   * Construct a ReconfigurableBase with the {@link Configuration}
   * conf.
   *
   * @param conf the configuration to use; a fresh {@link Configuration} is
   *             created when this is null.
   */
  public ReconfigurableBase(Configuration conf) {
    super((conf == null) ? new Configuration() : conf);
  }

  /**
   * Replace the {@link ReconfigurationUtil} used to compute property changes.
   *
   * @param ru the replacement utility; must not be null.
   */
  @VisibleForTesting
  public void setReconfigurationUtil(ReconfigurationUtil ru) {
    reconfigurationUtil = Preconditions.checkNotNull(ru);
  }

  /**
   * Create a new configuration.
   */
  protected abstract Configuration getNewConf();

  /**
   * Compute the property changes between two configurations by delegating to
   * the current {@link ReconfigurationUtil}.
   *
   * @param newConf the configuration to move to.
   * @param oldConf the configuration currently in effect.
   * @return the changes reported by the reconfiguration utility.
   */
  @VisibleForTesting
  public Collection<PropertyChange> getChangedProperties(
      Configuration newConf, Configuration oldConf) {
    return reconfigurationUtil.parseChangedProperties(newConf, oldConf);
  }

  /**
   * Build a non-null description of a failed property change. Prefers the
   * message of the underlying cause, then the exception's own message, and
   * finally its string form, so that a failure is never recorded with an
   * absent error message (which would read as success).
   */
  private static String describeFailure(ReconfigurationException e) {
    final Throwable cause = e.getCause();
    String message = (cause == null) ? null : cause.getMessage();
    if (message == null) {
      message = e.getMessage();
    }
    if (message == null) {
      message = e.toString();
    }
    return message;
  }

  /**
   * A background thread to apply configuration changes.
   */
  private static class ReconfigurationThread extends Thread {
    private final ReconfigurableBase parent;

    ReconfigurationThread(ReconfigurableBase base) {
      this.parent = base;
    }

    // See {@link ReconfigurationServlet#applyChanges}
    @Override
    public void run() {
      LOG.info("Starting reconfiguration task.");
      final Map<PropertyChange, Optional<String>> results = Maps.newHashMap();
      try {
        final Configuration oldConf = parent.getConf();
        final Configuration newConf = parent.getNewConf();
        final Collection<PropertyChange> changes =
            parent.getChangedProperties(newConf, oldConf);
        final ConfigRedactor oldRedactor = new ConfigRedactor(oldConf);
        final ConfigRedactor newRedactor = new ConfigRedactor(newConf);

        for (PropertyChange change : changes) {
          final String oldValRedacted =
              oldRedactor.redact(change.prop, change.oldVal);
          final String newValRedacted =
              newRedactor.redact(change.prop, change.newVal);

          if (!parent.isPropertyReconfigurable(change.prop)) {
            // Skipped changes are logged but not recorded in the status map.
            LOG.info(
                "Property {} is not configurable: old value: {}, new value: {}",
                change.prop, oldValRedacted, newValRedacted);
            continue;
          }

          LOG.info("Change property: {} from \"{}\" to \"{}\".",
              change.prop,
              (change.oldVal == null) ? "<default>" : oldValRedacted,
              (change.newVal == null) ? "<default>" : newValRedacted);

          String errorMessage = null;
          try {
            final String effectiveValue =
                parent.reconfigurePropertyImpl(change.prop, change.newVal);
            if (change.newVal != null) {
              oldConf.set(change.prop, effectiveValue);
            } else {
              oldConf.unset(change.prop);
            }
          } catch (ReconfigurationException e) {
            // The exception may have been created without a cause, so do not
            // dereference getCause() unconditionally.
            errorMessage = describeFailure(e);
          }
          results.put(change, Optional.fromNullable(errorMessage));
        }
      } finally {
        // Always publish the outcome and clear the running marker, even when
        // an unchecked exception escapes; otherwise every later call to
        // startReconfigurationTask() would report a task as still running.
        synchronized (parent.reconfigLock) {
          parent.endTime = Time.now();
          parent.status = Collections.unmodifiableMap(results);
          parent.reconfigThread = null;
        }
      }
    }
  }

  /**
   * Start a reconfiguration task to reload configuration in background.
   *
   * @throws IOException if the task has been shut down or another
   *                     reconfiguration task is still running.
   */
  public void startReconfigurationTask() throws IOException {
    synchronized (reconfigLock) {
      if (!shouldRun) {
        String errorMessage = "The server is stopped.";
        LOG.warn(errorMessage);
        throw new IOException(errorMessage);
      }
      if (reconfigThread != null) {
        String errorMessage = "Another reconfiguration task is running.";
        LOG.warn(errorMessage);
        throw new IOException(errorMessage);
      }
      reconfigThread = new ReconfigurationThread(this);
      reconfigThread.setDaemon(true);
      reconfigThread.setName("Reconfiguration Task");
      reconfigThread.start();
      // The worker publishes endTime under reconfigLock, which is still held
      // here, so startTime is always recorded first.
      startTime = Time.now();
    }
  }

  /**
   * Report the state of the most recent reconfiguration task.
   *
   * @return while a task is running, a status carrying only the start time
   *         (end time 0 and a null result map); otherwise the start time,
   *         end time and per-property results of the last finished task.
   */
  public ReconfigurationTaskStatus getReconfigurationTaskStatus() {
    synchronized (reconfigLock) {
      if (reconfigThread != null) {
        return new ReconfigurationTaskStatus(startTime, 0, null);
      }
      return new ReconfigurationTaskStatus(startTime, endTime, status);
    }
  }

  /**
   * Prevent further reconfiguration tasks from starting and wait for the
   * running one, if any, to finish. If the calling thread is interrupted
   * while waiting, the wait is abandoned and the interrupt status is restored.
   */
  public void shutdownReconfigurationTask() {
    Thread tempThread;
    synchronized (reconfigLock) {
      shouldRun = false;
      if (reconfigThread == null) {
        return;
      }
      tempThread = reconfigThread;
      reconfigThread = null;
    }

    // Join outside the lock: the worker needs reconfigLock to finish.
    try {
      tempThread.join();
    } catch (InterruptedException e) {
      // Do not swallow the interrupt; let the caller observe it.
      Thread.currentThread().interrupt();
    }
  }

  /**
   * {@inheritDoc}
   *
   * This method makes the change to this object's {@link Configuration}
   * and calls reconfigurePropertyImpl to update internal data structures.
   * This method cannot be overridden; subclasses should instead override
   * reconfigurePropertyImpl.
   */
  @Override
  public final void reconfigureProperty(String property, String newVal)
      throws ReconfigurationException {
    if (!isPropertyReconfigurable(property)) {
      throw new ReconfigurationException(property, newVal,
          getConf().get(property));
    }

    // Redact the value the same way the background task does, so sensitive
    // settings are not written to the log in clear text.
    LOG.info("changing property {} to {}", property,
        new ConfigRedactor(getConf()).redact(property, newVal));
    synchronized (getConf()) {
      // NOTE(review): the result is discarded; presumably this read is kept
      // to make the configuration load its properties before the change is
      // applied -- confirm before removing.
      getConf().get(property);
      String effectiveValue = reconfigurePropertyImpl(property, newVal);
      if (newVal != null) {
        getConf().set(property, effectiveValue);
      } else {
        getConf().unset(property);
      }
    }
  }

  /**
   * {@inheritDoc}
   *
   * Subclasses must override this.
   */
  @Override
  public abstract Collection<String> getReconfigurableProperties();

  /**
   * {@inheritDoc}
   *
   * Subclasses may wish to override this with a more efficient implementation.
   */
  @Override
  public boolean isPropertyReconfigurable(String property) {
    return getReconfigurableProperties().contains(property);
  }

  /**
   * Change a configuration property.
   *
   * Subclasses must override this. This method applies the change to
   * all internal data structures derived from the configuration property
   * that is being changed. If this object owns other Reconfigurable objects
   * reconfigureProperty should be called recursively to make sure that
   * the configuration of these objects is updated.
   *
   * @param property Name of the property that is being reconfigured.
   * @param newVal Proposed new value of the property.
   * @return Effective new value of the property. This may be different from
   *         newVal.
   *
   * @throws ReconfigurationException if there was an error applying newVal.
   */
  protected abstract String reconfigurePropertyImpl(
      String property, String newVal) throws ReconfigurationException;
}