blob: 033569cc17f137720b87ff30d50bea3aeed7951b [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.service.monitor;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.service.ServiceContext;
import org.apache.hadoop.yarn.service.component.Component;
import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;
import org.apache.hadoop.yarn.service.conf.YarnServiceConf;
import org.apache.hadoop.yarn.service.component.ComponentEvent;
import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEvent;
import org.apache.hadoop.yarn.service.component.ComponentState;
import org.apache.hadoop.yarn.service.monitor.probe.ProbeStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import static org.apache.hadoop.yarn.service.component.instance.ComponentInstanceState.STARTED;
import static org.apache.hadoop.yarn.service.component.ComponentEventType.FLEX;
import static org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType.BECOME_NOT_READY;
import static org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType.BECOME_READY;
import static org.apache.hadoop.yarn.service.component.instance.ComponentInstanceState.READY;
import static org.apache.hadoop.yarn.service.conf.YarnServiceConf.CONTAINER_FAILURE_WINDOW;
import static org.apache.hadoop.yarn.service.conf.YarnServiceConf.DEFAULT_CONTAINER_FAILURE_WINDOW;
import static org.apache.hadoop.yarn.service.conf.YarnServiceConf.DEFAULT_READINESS_CHECK_INTERVAL;
import static org.apache.hadoop.yarn.service.conf.YarnServiceConf.READINESS_CHECK_INTERVAL;
/**
 * Background monitor for a YARN service.
 *
 * <p>Once started it runs two periodic tasks on a single-threaded
 * {@link ScheduledExecutorService}:
 * <ul>
 *   <li>{@link ReadinessChecker}: pings every live component instance, moves
 *   it STARTED -&gt; READY on a successful probe and READY -&gt; NOT_READY on a
 *   failed one, then asks every component still in
 *   {@link ComponentState#INIT} whose dependencies are ready to flex up to
 *   the container count in its spec.</li>
 *   <li>{@link ContainerFailureReset}: resets the failure count of every
 *   component once per configured failure window.</li>
 * </ul>
 *
 * <p>Both tasks catch and log runtime failures of individual instances or
 * components. An exception escaping a task scheduled with
 * {@code scheduleAtFixedRate} would otherwise suppress all of its later
 * executions.
 */
public class ServiceMonitor extends AbstractService {

  private static final Logger LOG =
      LoggerFactory.getLogger(ServiceMonitor.class);

  /**
   * Executor running the periodic tasks; created in {@link #serviceInit}.
   * NOTE(review): visibility kept public as in the original; code outside
   * this file may reference it.
   */
  public ScheduledExecutorService executorService;

  /** Live instances obtained from the scheduler, keyed by container id. */
  private final Map<ContainerId, ComponentInstance> liveInstances;
  private final ServiceContext context;
  /** Configuration handed to {@link #serviceInit}; read in serviceStart. */
  private Configuration conf;

  /**
   * Creates the monitor.
   *
   * @param name service name passed to {@link AbstractService}
   * @param context service context; its scheduler supplies the live-instance
   *     map and the component set that the periodic tasks iterate over
   */
  public ServiceMonitor(String name, ServiceContext context) {
    super(name);
    // Keeps the scheduler's map reference rather than a copy; presumably a
    // live, concurrently updated view — confirm in ServiceScheduler.
    liveInstances = context.scheduler.getLiveInstances();
    this.context = context;
  }

  /**
   * Creates the single-threaded scheduler and remembers {@code conf} for use
   * in {@link #serviceStart()}.
   */
  @Override
  public void serviceInit(Configuration conf) throws Exception {
    executorService = Executors.newScheduledThreadPool(1);
    this.conf = conf;
    super.serviceInit(conf);
  }

  /**
   * Schedules the readiness checker and the failure-count reset. Each task
   * first runs after one full interval and then at that fixed rate.
   * Intervals are in seconds.
   */
  @Override
  public void serviceStart() throws Exception {
    // Resolved from the service's own configuration and the AM-level conf;
    // presumably the service-level value wins — confirm in
    // YarnServiceConf.getLong.
    long readinessCheckInterval = YarnServiceConf
        .getLong(READINESS_CHECK_INTERVAL, DEFAULT_READINESS_CHECK_INTERVAL,
            context.service.getConfiguration(), conf);
    executorService
        .scheduleAtFixedRate(new ReadinessChecker(), readinessCheckInterval,
            readinessCheckInterval, TimeUnit.SECONDS);

    // NOTE(review): the default window is believed to be 6 hours expressed
    // in seconds — confirm DEFAULT_CONTAINER_FAILURE_WINDOW in YarnServiceConf.
    long failureResetInterval = YarnServiceConf
        .getLong(CONTAINER_FAILURE_WINDOW, DEFAULT_CONTAINER_FAILURE_WINDOW,
            context.service.getConfiguration(), conf);
    executorService
        .scheduleAtFixedRate(new ContainerFailureReset(), failureResetInterval,
            failureResetInterval, TimeUnit.SECONDS);
  }

  /**
   * Stops the periodic tasks by calling {@code shutdownNow()} on the
   * executor, if it was created. It does not wait for a running task to
   * finish.
   */
  @Override
  public void serviceStop() throws Exception {
    if (executorService != null) {
      executorService.shutdownNow();
    }
    super.serviceStop();
  }

  /**
   * Periodic task: updates instance readiness from probe results and ramps
   * up components whose dependencies have become ready.
   */
  private class ReadinessChecker implements Runnable {
    @Override
    public void run() {
      for (Map.Entry<ContainerId, ComponentInstance> entry : liveInstances
          .entrySet()) {
        try {
          checkInstanceReadiness(entry.getKey(), entry.getValue());
        } catch (RuntimeException e) {
          // Must not escape run(): the executor would suppress all later
          // executions of this task. Isolate the failure to this instance.
          LOG.warn("Readiness check threw for container {}", entry.getKey(),
              e);
        }
      }
      rampUpComponentsWithReadyDependencies();
    }

    /** Probes one instance and fires BECOME_READY / BECOME_NOT_READY. */
    private void checkInstanceReadiness(ContainerId containerId,
        ComponentInstance instance) {
      ProbeStatus status = instance.ping();
      if (status.isSuccess()) {
        if (instance.getState() == STARTED) {
          LOG.info("Readiness check succeeded for {}: {}",
              instance.getCompInstanceName(), status);
          // handle() is invoked directly on this thread, so the state is
          // updated synchronously rather than via an event dispatcher.
          instance.handle(
              new ComponentInstanceEvent(containerId, BECOME_READY));
        }
      } else {
        LOG.info("Readiness check failed for {}: {}",
            instance.getCompInstanceName(), status);
        if (instance.getState() == READY) {
          instance.handle(
              new ComponentInstanceEvent(containerId, BECOME_NOT_READY));
        }
      }
    }

    /**
     * Sends a FLEX event (desired = spec container count) to every component
     * still in INIT whose dependencies are ready.
     */
    private void rampUpComponentsWithReadyDependencies() {
      for (Component component : context.scheduler.getAllComponents()
          .values()) {
        try {
          if (component.getState() == ComponentState.INIT && component
              .areDependenciesReady()) {
            LOG.info("[COMPONENT {}]: Dependencies satisfied, ramping up.",
                component.getName());
            ComponentEvent event = new ComponentEvent(component.getName(), FLEX)
                .setDesired(component.getComponentSpec().getNumberOfContainers());
            component.handle(event);
          }
        } catch (RuntimeException e) {
          // Keep the periodic task alive and continue with other components.
          LOG.warn("[COMPONENT {}]: Dependency check / ramp-up failed",
              component.getName(), e);
        }
      }
    }
  }

  /** Periodic task: resets every component's failure count. */
  private class ContainerFailureReset implements Runnable {
    @Override
    public void run() {
      for (Component component : context.scheduler.getAllComponents()
          .values()) {
        try {
          component.resetCompFailureCount();
        } catch (RuntimeException e) {
          // An escaping exception would cancel all future resets.
          LOG.warn("[COMPONENT {}]: Failed to reset failure count",
              component.getName(), e);
        }
      }
    }
  }
}