// blob: 2a1140f7519beac9d4229a65a360640600e5750d [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.service;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.conf.Configuration;
import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This is the base implementation class for services.
*/
@Public
@Evolving
public abstract class AbstractService implements Service {
private static final Logger LOG =
LoggerFactory.getLogger(AbstractService.class);
/**
* Service name.
*/
private final String name;
/** service state */
private final ServiceStateModel stateModel;
/**
* Service start time. Will be zero until the service is started.
*/
private long startTime;
/**
* The configuration. Will be null until the service is initialized.
*/
private volatile Configuration config;
/**
* List of state change listeners; it is final to ensure
* that it will never be null.
*/
private final ServiceOperations.ServiceListeners listeners
= new ServiceOperations.ServiceListeners();
/**
* Static listeners to all events across all services
*/
private static ServiceOperations.ServiceListeners globalListeners
= new ServiceOperations.ServiceListeners();
/**
* The cause of any failure -will be null.
* if a service did not stop due to a failure.
*/
private Exception failureCause;
/**
* the state in which the service was when it failed.
* Only valid when the service is stopped due to a failure
*/
private STATE failureState = null;
/**
* object used to co-ordinate {@link #waitForServiceToStop(long)}
* across threads.
*/
private final AtomicBoolean terminationNotification =
new AtomicBoolean(false);
/**
* History of lifecycle transitions
*/
private final List<LifecycleEvent> lifecycleHistory
= new ArrayList<LifecycleEvent>(5);
/**
* Map of blocking dependencies
*/
private final Map<String,String> blockerMap = new HashMap<String, String>();
private final Object stateChangeLock = new Object();
/**
* Construct the service.
* @param name service name
*/
public AbstractService(String name) {
this.name = name;
stateModel = new ServiceStateModel(name);
}
@Override
public final STATE getServiceState() {
return stateModel.getState();
}
@Override
public final synchronized Throwable getFailureCause() {
return failureCause;
}
@Override
public synchronized STATE getFailureState() {
return failureState;
}
/**
* Set the configuration for this service.
* This method is called during {@link #init(Configuration)}
* and should only be needed if for some reason a service implementation
* needs to override that initial setting -for example replacing
* it with a new subclass of {@link Configuration}
* @param conf new configuration.
*/
protected void setConfig(Configuration conf) {
this.config = conf;
}
/**
* {@inheritDoc}
* This invokes {@link #serviceInit}
* @param conf the configuration of the service. This must not be null
* @throws ServiceStateException if the configuration was null,
* the state change not permitted, or something else went wrong
*/
@Override
public void init(Configuration conf) {
if (conf == null) {
throw new ServiceStateException("Cannot initialize service "
+ getName() + ": null configuration");
}
if (isInState(STATE.INITED)) {
return;
}
synchronized (stateChangeLock) {
if (enterState(STATE.INITED) != STATE.INITED) {
setConfig(conf);
try {
serviceInit(config);
if (isInState(STATE.INITED)) {
//if the service ended up here during init,
//notify the listeners
notifyListeners();
}
} catch (Exception e) {
noteFailure(e);
ServiceOperations.stopQuietly(LOG, this);
throw ServiceStateException.convert(e);
}
}
}
}
/**
* {@inheritDoc}
* @throws ServiceStateException if the current service state does not permit
* this action
*/
@Override
public void start() {
if (isInState(STATE.STARTED)) {
return;
}
//enter the started state
synchronized (stateChangeLock) {
if (stateModel.enterState(STATE.STARTED) != STATE.STARTED) {
try {
startTime = System.currentTimeMillis();
serviceStart();
if (isInState(STATE.STARTED)) {
//if the service started (and isn't now in a later state), notify
if (LOG.isDebugEnabled()) {
LOG.debug("Service " + getName() + " is started");
}
notifyListeners();
}
} catch (Exception e) {
noteFailure(e);
ServiceOperations.stopQuietly(LOG, this);
throw ServiceStateException.convert(e);
}
}
}
}
/**
* {@inheritDoc}
*/
@Override
public void stop() {
if (isInState(STATE.STOPPED)) {
return;
}
synchronized (stateChangeLock) {
if (enterState(STATE.STOPPED) != STATE.STOPPED) {
try {
serviceStop();
} catch (Exception e) {
//stop-time exceptions are logged if they are the first one,
noteFailure(e);
throw ServiceStateException.convert(e);
} finally {
//report that the service has terminated
terminationNotification.set(true);
synchronized (terminationNotification) {
terminationNotification.notifyAll();
}
//notify anything listening for events
notifyListeners();
}
} else {
//already stopped: note it
if (LOG.isDebugEnabled()) {
LOG.debug("Ignoring re-entrant call to stop()");
}
}
}
}
/**
* Relay to {@link #stop()}
* @throws IOException
*/
@Override
public final void close() throws IOException {
stop();
}
/**
* Failure handling: record the exception
* that triggered it -if there was not one already.
* Services are free to call this themselves.
* @param exception the exception
*/
protected final void noteFailure(Exception exception) {
if (LOG.isDebugEnabled()) {
LOG.debug("noteFailure " + exception, (Throwable) null);
}
if (exception == null) {
//make sure failure logic doesn't itself cause problems
return;
}
//record the failure details, and log it
synchronized (this) {
if (failureCause == null) {
failureCause = exception;
failureState = getServiceState();
LOG.info("Service " + getName()
+ " failed in state " + failureState
+ "; cause: " + exception,
exception);
}
}
}
@Override
public final boolean waitForServiceToStop(long timeout) {
boolean completed = terminationNotification.get();
while (!completed) {
try {
synchronized(terminationNotification) {
terminationNotification.wait(timeout);
}
// here there has been a timeout, the object has terminated,
// or there has been a spurious wakeup (which we ignore)
completed = true;
} catch (InterruptedException e) {
// interrupted; have another look at the flag
completed = terminationNotification.get();
}
}
return terminationNotification.get();
}
/* ===================================================================== */
/* Override Points */
/* ===================================================================== */
/**
* All initialization code needed by a service.
*
* This method will only ever be called once during the lifecycle of
* a specific service instance.
*
* Implementations do not need to be synchronized as the logic
* in {@link #init(Configuration)} prevents re-entrancy.
*
* The base implementation checks to see if the subclass has created
* a new configuration instance, and if so, updates the base class value
* @param conf configuration
* @throws Exception on a failure -these will be caught,
* possibly wrapped, and wil; trigger a service stop
*/
protected void serviceInit(Configuration conf) throws Exception {
if (conf != config) {
LOG.debug("Config has been overridden during init");
setConfig(conf);
}
}
/**
* Actions called during the INITED to STARTED transition.
*
* This method will only ever be called once during the lifecycle of
* a specific service instance.
*
* Implementations do not need to be synchronized as the logic
* in {@link #start()} prevents re-entrancy.
*
* @throws Exception if needed -these will be caught,
* wrapped, and trigger a service stop
*/
protected void serviceStart() throws Exception {
}
/**
* Actions called during the transition to the STOPPED state.
*
* This method will only ever be called once during the lifecycle of
* a specific service instance.
*
* Implementations do not need to be synchronized as the logic
* in {@link #stop()} prevents re-entrancy.
*
* Implementations MUST write this to be robust against failures, including
* checks for null references -and for the first failure to not stop other
* attempts to shut down parts of the service.
*
* @throws Exception if needed -these will be caught and logged.
*/
protected void serviceStop() throws Exception {
}
@Override
public void registerServiceListener(ServiceStateChangeListener l) {
listeners.add(l);
}
@Override
public void unregisterServiceListener(ServiceStateChangeListener l) {
listeners.remove(l);
}
/**
* Register a global listener, which receives notifications
* from the state change events of all services in the JVM
* @param l listener
*/
public static void registerGlobalListener(ServiceStateChangeListener l) {
globalListeners.add(l);
}
/**
* unregister a global listener.
* @param l listener to unregister
* @return true if the listener was found (and then deleted)
*/
public static boolean unregisterGlobalListener(ServiceStateChangeListener l) {
return globalListeners.remove(l);
}
/**
* Package-scoped method for testing -resets the global listener list
*/
@VisibleForTesting
static void resetGlobalListeners() {
globalListeners.reset();
}
@Override
public String getName() {
return name;
}
@Override
public Configuration getConfig() {
return config;
}
@Override
public long getStartTime() {
return startTime;
}
/**
* Notify local and global listeners of state changes.
* Exceptions raised by listeners are NOT passed up.
*/
private void notifyListeners() {
try {
listeners.notifyListeners(this);
globalListeners.notifyListeners(this);
} catch (Throwable e) {
LOG.warn("Exception while notifying listeners of " + this + ": " + e,
e);
}
}
/**
* Add a state change event to the lifecycle history
*/
private void recordLifecycleEvent() {
LifecycleEvent event = new LifecycleEvent();
event.time = System.currentTimeMillis();
event.state = getServiceState();
lifecycleHistory.add(event);
}
@Override
public synchronized List<LifecycleEvent> getLifecycleHistory() {
return new ArrayList<LifecycleEvent>(lifecycleHistory);
}
/**
* Enter a state; record this via {@link #recordLifecycleEvent}
* and log at the info level.
* @param newState the proposed new state
* @return the original state
* it wasn't already in that state, and the state model permits state re-entrancy.
*/
private STATE enterState(STATE newState) {
assert stateModel != null : "null state in " + name + " " + this.getClass();
STATE oldState = stateModel.enterState(newState);
if (oldState != newState) {
if (LOG.isDebugEnabled()) {
LOG.debug(
"Service: " + getName() + " entered state " + getServiceState());
}
recordLifecycleEvent();
}
return oldState;
}
@Override
public final boolean isInState(Service.STATE expected) {
return stateModel.isInState(expected);
}
@Override
public String toString() {
return "Service " + name + " in state " + stateModel;
}
/**
* Put a blocker to the blocker map -replacing any
* with the same name.
* @param name blocker name
* @param details any specifics on the block. This must be non-null.
*/
protected void putBlocker(String name, String details) {
synchronized (blockerMap) {
blockerMap.put(name, details);
}
}
/**
* Remove a blocker from the blocker map -
* this is a no-op if the blocker is not present
* @param name the name of the blocker
*/
public void removeBlocker(String name) {
synchronized (blockerMap) {
blockerMap.remove(name);
}
}
@Override
public Map<String, String> getBlockers() {
synchronized (blockerMap) {
Map<String, String> map = new HashMap<String, String>(blockerMap);
return map;
}
}
}