blob: 0725fe6973c4946d018bde268c69ba385e7366ff [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.slider.server.services.docstore.utility;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.service.ServiceStateChangeListener;
import org.apache.hadoop.service.ServiceStateException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
/**
* This resembles the YARN CompositeService, except that it
* starts one service after another: it's init & start operations
* only work with one service
*/
public class SequenceService extends AbstractService implements Parent,
ServiceStateChangeListener {
private static final Logger log =
LoggerFactory.getLogger(SequenceService.class);
/**
* list of services
*/
private final List<Service> serviceList = new ArrayList<Service>();
/**
* The current service.
* Volatile -may change & so should be read into a
* local variable before working with
*/
private volatile Service currentService;
/*
the previous service -the last one that finished.
Null if one did not finish yet
*/
private volatile Service previousService;
/**
* Create a service sequence with the given list of services
* @param name service name
* @param offspring initial sequence
*/
public SequenceService(String name, Service...offspring) {
super(name);
for (Service service : offspring) {
addService(service);
}
}
/**
* Get the current service -which may be null
* @return service running
*/
public Service getCurrentService() {
return currentService;
}
public Service getPreviousService() {
return previousService;
}
/**
* When started
* @throws Exception
*/
@Override
protected void serviceStart() throws Exception {
startNextService();
}
@Override
protected void serviceStop() throws Exception {
//stop current service.
//this triggers a callback that is caught and ignored
Service current = currentService;
previousService = current;
currentService = null;
if (current != null) {
current.stop();
}
}
/**
* Start the next service in the list.
* Return false if there are no more services to run, or this
* service has stopped
* @return true if a service was started
* @throws RuntimeException from any init or start failure
* @throws ServiceStateException if this call is made before
* the service is started
*/
public synchronized boolean startNextService() {
if (isInState(STATE.STOPPED)) {
//downgrade to a failed
log.debug("Not starting next service -{} is stopped", this);
return false;
}
if (!isInState(STATE.STARTED)) {
//reject attempts to start a service too early
throw new ServiceStateException(
"Cannot start a child service when not started");
}
if (serviceList.isEmpty()) {
//nothing left to run
return false;
}
if (currentService != null && currentService.getFailureCause() != null) {
//did the last service fail? Is this caused by some premature callback?
log.debug("Not starting next service due to a failure of {}",
currentService);
return false;
}
//bear in mind that init & start can fail, which
//can trigger re-entrant calls into the state change listener.
//by setting the current service to null
//the start-next-service logic is skipped.
//now, what does that mean w.r.t exit states?
currentService = null;
Service head = serviceList.remove(0);
try {
head.init(getConfig());
head.registerServiceListener(this);
head.start();
} catch (RuntimeException e) {
noteFailure(e);
throw e;
}
//at this point the service must have explicitly started & not failed,
//else an exception would have been raised
currentService = head;
return true;
}
/**
* State change event relays service stop events to
* {@link #onServiceCompleted(Service)}. Subclasses can
* extend that with extra logic
* @param service the service that has changed.
*/
@Override
public void stateChanged(Service service) {
if (service == currentService && service.isInState(STATE.STOPPED)) {
onServiceCompleted(service);
}
}
/**
* handler for service completion: base class starts the next service
* @param service service that has completed
*/
protected synchronized void onServiceCompleted(Service service) {
log.info("Running service stopped: {}", service);
previousService = currentService;
//start the next service if we are not stopped ourselves
if (isInState(STATE.STARTED)) {
//did the service fail? if so: propagate
Throwable failureCause = service.getFailureCause();
if (failureCause != null) {
Exception e = SliderServiceUtils.convertToException(failureCause);
noteFailure(e);
stop();
}
//start the next service
boolean started;
try {
started = startNextService();
} catch (Exception e) {
//something went wrong here
noteFailure(e);
started = false;
}
if (!started) {
//no start because list is empty
//stop and expect the notification to go upstream
stop();
}
} else {
//not started, so just note that the current service
//has gone away
currentService = null;
}
}
/**
* Add the passed {@link Service} to the list of services managed by this
* {@link SequenceService}
* @param service the {@link Service} to be added
*/
@Override //Parent
public synchronized void addService(Service service) {
log.debug("Adding service {} ", service.getName());
synchronized (serviceList) {
serviceList.add(service);
}
}
/**
* Get an unmodifiable list of services
* @return a list of child services at the time of invocation -
* added services will not be picked up.
*/
@Override //Parent
public synchronized List<Service> getServices() {
return Collections.unmodifiableList(serviceList);
}
@Override // Object
public synchronized String toString() {
return super.toString() + "; current service " + currentService
+ "; queued service count=" + serviceList.size();
}
}