blob: a6dfd7c28354bad8ae05faea4fcf025df1caf79d [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.monitor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
public class SchedulingMonitor extends AbstractService {
private final SchedulingEditPolicy scheduleEditPolicy;
private static final Logger LOG =
LoggerFactory.getLogger(SchedulingMonitor.class);
// ScheduledExecutorService which schedules the PreemptionChecker to run
// periodically.
private ScheduledExecutorService ses;
private ScheduledFuture<?> handler;
private volatile boolean stopped;
private long monitorInterval;
private RMContext rmContext;
public SchedulingMonitor(RMContext rmContext,
SchedulingEditPolicy scheduleEditPolicy) {
super("SchedulingMonitor (" + scheduleEditPolicy.getPolicyName() + ")");
this.scheduleEditPolicy = scheduleEditPolicy;
this.rmContext = rmContext;
}
@VisibleForTesting
public synchronized SchedulingEditPolicy getSchedulingEditPolicy() {
return scheduleEditPolicy;
}
public void serviceInit(Configuration conf) throws Exception {
LOG.info("Initializing SchedulingMonitor=" + getName());
scheduleEditPolicy.init(conf, rmContext, rmContext.getScheduler());
this.monitorInterval = scheduleEditPolicy.getMonitoringInterval();
super.serviceInit(conf);
}
@Override
public void serviceStart() throws Exception {
LOG.info("Starting SchedulingMonitor=" + getName());
assert !stopped : "starting when already stopped";
ses = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName(getName());
return t;
}
});
schedulePreemptionChecker();
super.serviceStart();
}
private void schedulePreemptionChecker() {
handler = ses.scheduleAtFixedRate(new PolicyInvoker(),
0, monitorInterval, TimeUnit.MILLISECONDS);
}
@Override
public void serviceStop() throws Exception {
stopped = true;
if (handler != null) {
LOG.info("Stop " + getName());
handler.cancel(true);
ses.shutdown();
}
super.serviceStop();
}
@VisibleForTesting
public void invokePolicy(){
scheduleEditPolicy.editSchedule();
}
private class PolicyInvoker implements Runnable {
@Override
public void run() {
try {
if (monitorInterval != scheduleEditPolicy.getMonitoringInterval()) {
handler.cancel(true);
monitorInterval = scheduleEditPolicy.getMonitoringInterval();
schedulePreemptionChecker();
} else {
invokePolicy();
}
} catch (Throwable t) {
// The preemption monitor does not alter structures nor do structures
// persist across invocations. Therefore, log, skip, and retry.
LOG.error("Exception raised while executing preemption"
+ " checker, skip this run..., exception=", t);
}
}
}
}