blob: 9a8d708f3cde83a2b388b29072f9c4039a9a1aed [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ambari.server.actionmanager;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import org.apache.ambari.server.AmbariException;
import org.apache.ambari.server.Role;
import org.apache.ambari.server.ServiceComponentNotFoundException;
import org.apache.ambari.server.agent.ActionQueue;
import org.apache.ambari.server.agent.CommandReport;
import org.apache.ambari.server.agent.ExecutionCommand;
import org.apache.ambari.server.controller.HostsMap;
import org.apache.ambari.server.serveraction.ServerAction;
import org.apache.ambari.server.serveraction.ServerActionManager;
import org.apache.ambari.server.state.Cluster;
import org.apache.ambari.server.state.Clusters;
import org.apache.ambari.server.state.Host;
import org.apache.ambari.server.state.HostState;
import org.apache.ambari.server.state.Service;
import org.apache.ambari.server.state.ServiceComponent;
import org.apache.ambari.server.state.ServiceComponentHost;
import org.apache.ambari.server.state.fsm.InvalidStateTransitionException;
import org.apache.ambari.server.state.svccomphost.ServiceComponentHostOpFailedEvent;
import org.apache.ambari.server.utils.StageUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.reflect.TypeToken;
import com.google.inject.persist.UnitOfWork;
/**
* This class encapsulates the action scheduler thread.
* The action scheduler periodically inspects the action database and
* determines whether there is an action that can be scheduled.
*/
class ActionScheduler implements Runnable {
/** Logger for the scheduler; constant for the lifetime of the class. */
private static final Logger LOG = LoggerFactory.getLogger(ActionScheduler.class);
/** Base task timeout in milliseconds, as passed to the constructor. */
private final long actionTimeout;
/** Maximum time in milliseconds the scheduler loop waits between two iterations. */
private final long sleepTime;
/** Unit of work that is begun and ended around every scheduling pass. */
private final UnitOfWork unitOfWork;
/** Loop flag of the scheduler thread; cleared to make the loop exit. */
private volatile boolean shouldRun = true;
/** Thread running the scheduler loop; {@code null} until the scheduler is started. */
private Thread schedulerThread = null;
/** Accessor for the action database (stages, host role commands). */
private final ActionDBAccessor db;
/** Attempt count at which a timed out host role is expired instead of being rescheduled. */
private final short maxAttempts;
/** Queue through which commands are handed over to the hosts. */
private final ActionQueue actionQueue;
/** Entry point used to look up clusters and hosts. */
private final Clusters fsmObject;
/** When true, the task timeout of the stage is added to the base action timeout. */
private boolean taskTimeoutAdjustment = true;
/** Used to translate a host name before it is written into a scheduled command. */
private final HostsMap hostsMap;
/** Monitor on which the scheduler thread waits and on which wake-up requests are signalled. */
private final Object wakeupSyncObject = new Object();
/** Executes commands whose role is the Ambari server action role. */
private final ServerActionManager serverActionManager;
/**
 * True if the scheduler should run ASAP.
 * This flag avoids sleeping when an awake() request arrives
 * while a scheduler iteration is still running.
 * Accessed only while holding {@link #wakeupSyncObject}.
 */
private boolean activeAwakeRequest = false;
/** Cache of deserialized cluster host info, filled lazily while host roles are scheduled. */
private final Cache<Long, Map<String, List<String>>> clusterHostInfoCache;
/**
 * Creates a scheduler. No thread is created here; see {@link #start()}.
 *
 * @param sleepTimeMilliSec     maximum wait between two scheduler iterations, in milliseconds
 * @param actionTimeoutMilliSec base timeout of a task, in milliseconds
 * @param db                    accessor for the action database
 * @param actionQueue           queue used to hand commands over to hosts
 * @param fsmObject             clusters/hosts lookup
 * @param maxAttempts           attempt limit for a host role; narrowed to {@code short}
 * @param hostsMap              host name mapping applied to scheduled commands
 * @param serverActionManager   executor for server-side actions
 * @param unitOfWork            unit of work wrapped around every scheduling pass
 */
public ActionScheduler(long sleepTimeMilliSec, long actionTimeoutMilliSec,
    ActionDBAccessor db, ActionQueue actionQueue, Clusters fsmObject,
    int maxAttempts, HostsMap hostsMap, ServerActionManager serverActionManager, UnitOfWork unitOfWork) {
  // Timing and retry configuration.
  sleepTime = sleepTimeMilliSec;
  actionTimeout = actionTimeoutMilliSec;
  this.maxAttempts = (short) maxAttempts;

  // Collaborators.
  this.db = db;
  this.actionQueue = actionQueue;
  this.fsmObject = fsmObject;
  this.hostsMap = hostsMap;
  this.serverActionManager = serverActionManager;
  this.unitOfWork = unitOfWork;

  // Entries are dropped five minutes after they were last accessed.
  clusterHostInfoCache = CacheBuilder.newBuilder()
      .expireAfterAccess(5, TimeUnit.MINUTES)
      .build();
}
/**
 * Creates a new thread that runs this scheduler, remembers it and starts it.
 */
public void start() {
  Thread worker = new Thread(this);
  schedulerThread = worker;
  worker.start();
}
/**
 * Asks the scheduler loop to exit and interrupts the scheduler thread so
 * that a pending wait is cut short.
 * Safe to call before {@link #start()}: in that case only the run flag is
 * cleared, because there is no thread to interrupt yet.
 */
public void stop() {
  shouldRun = false;
  // Read the field once; it stays null until start() has been called.
  Thread worker = schedulerThread;
  if (worker != null) {
    worker.interrupt();
  }
}
/**
 * Requests an immediate scheduler iteration. Intended to be called from a
 * thread other than the scheduler thread (for example, to process desired
 * configs of SCHs).
 * The request is recorded in a flag so that it is not lost when it arrives
 * while an iteration is already running; a scheduler thread that is
 * currently waiting is notified.
 */
public void awake() {
  synchronized (wakeupSyncObject) {
    // Record the request first, then wake up a waiting scheduler thread.
    activeAwakeRequest = true;
    wakeupSyncObject.notify();
  }
}
/**
 * Scheduler loop. Each iteration waits up to {@code sleepTime} milliseconds
 * (skipping the wait when an awake request is pending), clears the awake
 * request and performs one scheduling pass via {@link #doWork()}.
 * An interruption ends the loop; any other failure is logged and the loop
 * continues with the next iteration.
 */
@Override
public void run() {
  while (shouldRun) {
    try {
      synchronized (wakeupSyncObject) {
        if (!activeAwakeRequest) {
          wakeupSyncObject.wait(sleepTime);
        }
        activeAwakeRequest = false;
      }
      doWork();
    } catch (InterruptedException ex) {
      LOG.warn("Scheduler thread is interrupted going to stop", ex);
      shouldRun = false;
      // Catching InterruptedException clears the interrupt status; restore it
      // so that whoever owns the running thread can still observe the interrupt.
      Thread.currentThread().interrupt();
    } catch (Exception ex) {
      LOG.warn("Exception received", ex);
    } catch (Throwable t) {
      LOG.warn("ERROR", t);
    }
  }
}
/**
 * Performs one scheduling pass inside a unit of work.
 * For every stage in progress the role statistics are computed; if a role
 * has failed, or the previous stage of the request has failed, the whole
 * request is aborted and the pass ends. Otherwise the collected commands are
 * scheduled, and the pass only moves on to the next stage when every role of
 * the current stage has met its success factor.
 *
 * @throws AmbariException if processing a stage or scheduling a host role fails
 */
public void doWork() throws AmbariException {
  try {
    unitOfWork.begin();
    List<Stage> inProgress = db.getStagesInProgress();
    if (LOG.isDebugEnabled()) {
      LOG.debug("Scheduler wakes up");
    }
    if (inProgress == null || inProgress.isEmpty()) {
      // Nothing to do
      if (LOG.isDebugEnabled()) {
        LOG.debug("No stage in progress..nothing to do");
      }
      return;
    }

    for (Stage stage : inProgress) {
      List<ExecutionCommand> toSchedule = new ArrayList<ExecutionCommand>();
      Map<String, RoleStats> statsByRole = processInProgressStage(stage, toSchedule);

      // Has any role of this stage failed?
      boolean roleFailed = false;
      for (Map.Entry<String, RoleStats> entry : statsByRole.entrySet()) {
        RoleStats stats = entry.getValue();
        if (LOG.isDebugEnabled()) {
          LOG.debug("Stats for role:" + entry.getKey() + ", stats=" + stats);
        }
        if (stats.isRoleFailed()) {
          roleFailed = true;
          break;
        }
      }

      // A prior stage may have failed, in which case the whole request fails too.
      if (roleFailed || hasPreviousStageFailed(stage)) {
        LOG.warn("Operation completely failed, aborting request id:"
            + stage.getRequestId());
        db.abortOperation(stage.getRequestId());
        return;
      }

      // Schedule what we have so far
      for (ExecutionCommand cmd : toSchedule) {
        if (Role.valueOf(cmd.getRole()).equals(Role.AMBARI_SERVER_ACTION)) {
          runServerAction(stage, cmd);
        } else {
          try {
            scheduleHostRole(stage, cmd);
          } catch (InvalidStateTransitionException e) {
            LOG.warn("Could not schedule host role " + cmd.toString(), e);
            db.abortHostRole(cmd.getHostname(), stage.getRequestId(), stage.getStageId(), cmd.getRole());
          }
        }
      }

      // Only continue with the next stage when every role met its success factor.
      for (RoleStats stats : statsByRole.values()) {
        if (!stats.isSuccessFactorMet()) {
          return;
        }
      }
    }
  } finally {
    unitOfWork.end();
  }
}

/**
 * Marks a server-side command as queued, executes it through the server
 * action manager and reports the outcome (success, or failure when an
 * AmbariException is raised) to the action database.
 */
private void runServerAction(Stage stage, ExecutionCommand cmd) {
  try {
    long now = System.currentTimeMillis();
    String hostName = cmd.getHostname();
    String roleName = cmd.getRole().toString();
    stage.setStartTime(hostName, roleName, now);
    stage.setLastAttemptTime(hostName, roleName, now);
    stage.incrementAttemptCount(hostName, roleName);
    stage.setHostRoleStatus(hostName, roleName, HostRoleStatus.QUEUED);
    db.hostRoleScheduled(stage, hostName, roleName);
    String actionName = cmd.getRoleParams().get(ServerAction.ACTION_NAME);
    this.serverActionManager.executeAction(actionName, cmd.getCommandParams());
    reportServerActionSuccess(stage, cmd);
  } catch (AmbariException e) {
    LOG.warn("Could not execute server action " + cmd.toString(), e);
    reportServerActionFailure(stage, cmd, e.getMessage());
  }
}
/**
 * Tells whether the stage preceding the given one (stage id minus one, same
 * request) did not reach the success factor for at least one of its roles.
 *
 * @param stage the stage whose predecessor is inspected
 * @return true if some role of the previous stage has a success ratio below
 *         its success factor; false otherwise, including when the previous
 *         stage id is not positive or the previous stage cannot be found
 */
private boolean hasPreviousStageFailed(Stage stage) {
  long previousId = stage.getStageId() - 1;
  if (previousId <= 0) {
    return false;
  }

  Stage previous = null;
  for (Stage candidate : db.getAllStages(stage.getRequestId())) {
    if (candidate.getStageId() == previousId) {
      previous = candidate;
      break;
    }
  }
  // It may be null for test scenarios
  if (previous == null) {
    return false;
  }

  // Per role: number of hosts, and number of hosts in a failed state.
  Map<Role, Integer> totals = new HashMap<Role, Integer>();
  Map<Role, Integer> failures = new HashMap<Role, Integer>();
  for (Map<String, HostRoleCommand> commandsByRole : previous.getHostRoleCommands().values()) {
    for (HostRoleCommand command : commandsByRole.values()) {
      Role role = command.getRole();

      Integer seen = totals.get(role);
      totals.put(role, seen == null ? 1 : seen + 1);

      Integer seenFailed = failures.get(role);
      int failedCount = seenFailed == null ? 0 : seenFailed;
      if (command.getStatus().isFailedState()) {
        failedCount++;
      }
      failures.put(role, failedCount);
    }
  }

  boolean failed = false;
  for (Map.Entry<Role, Integer> entry : totals.entrySet()) {
    Role role = entry.getKey();
    float failedHosts = failures.get(role);
    float totalHosts = entry.getValue();
    if (((totalHosts - failedHosts) / totalHosts) < previous.getSuccessFactor(role)) {
      failed = true;
    }
  }
  return failed;
}
/**
 * Stores a COMPLETED report (exit code 0, empty stderr) for the given
 * server-side command in the action database.
 *
 * @param stage the stage the command belongs to
 * @param cmd   the command that was executed
 */
private void reportServerActionSuccess(Stage stage, ExecutionCommand cmd) {
  CommandReport success = new CommandReport();
  success.setExitCode(0);
  success.setStatus(HostRoleStatus.COMPLETED.toString());
  success.setStdErr("");
  success.setStdOut("Server action succeeded");
  db.updateHostRoleState(
      cmd.getHostname(),
      stage.getRequestId(),
      stage.getStageId(),
      cmd.getRole().toString(),
      success);
}
/**
 * Stores a FAILED report (exit code 1) for the given server-side command in
 * the action database.
 *
 * @param stage   the stage the command belongs to
 * @param cmd     the command that failed
 * @param message text recorded as the stderr of the report
 */
private void reportServerActionFailure(Stage stage, ExecutionCommand cmd, String message) {
  CommandReport failure = new CommandReport();
  failure.setExitCode(1);
  failure.setStatus(HostRoleStatus.FAILED.toString());
  failure.setStdErr(message);
  failure.setStdOut("Server action failed");
  db.updateHostRoleState(
      cmd.getHostname(),
      stage.getRequestId(),
      stage.getStageId(),
      cmd.getRole().toString(),
      failure);
}
/**
 * Walks over all commands of all hosts of a stage, handles timed out host
 * roles and collects the commands that have to be (re)scheduled.
 * A timed out host role is rescheduled while its attempt count is below
 * {@code maxAttempts}; otherwise it is marked as timed out in the database,
 * its component is sent a failure event and the command is dequeued.
 * Host roles that are still PENDING are collected for their first scheduling.
 *
 * @param s the stage to process
 * @param commandsToSchedule output list; commands to schedule are appended to it
 * @return Stats for the roles in the stage. It is used to determine whether stage
 * has succeeded or failed.
 * @throws AmbariException if cluster, host or timeout information cannot be obtained
 */
private Map<String, RoleStats> processInProgressStage(Stage s,
    List<ExecutionCommand> commandsToSchedule) throws AmbariException {
  // Map to track role status
  Map<String, RoleStats> statsByRole = initRoleStats(s);
  long now = System.currentTimeMillis();
  long taskTimeout = taskTimeoutAdjustment
      ? actionTimeout + s.getTaskTimeout()
      : actionTimeout;

  for (String host : s.getHosts()) {
    List<ExecutionCommandWrapper> wrappers = s.getExecutionCommands(host);
    Cluster cluster = fsmObject.getCluster(s.getClusterName());
    Host hostObj = fsmObject.getHost(host);

    for (ExecutionCommandWrapper wrapper : wrappers) {
      ExecutionCommand command = wrapper.getExecutionCommand();
      String roleStr = command.getRole().toString();
      HostRoleStatus status = s.getHostRoleStatus(host, roleStr);

      boolean timedOut = timeOutActionNeeded(status, s, hostObj, roleStr, now,
          taskTimeout);
      if (!timedOut) {
        if (status.equals(HostRoleStatus.PENDING)) {
          // Need to schedule first time
          commandsToSchedule.add(command);
        }
      } else {
        LOG.info("Host:" + host + ", role:" + roleStr + ", actionId:"
            + s.getActionId() + " timed out");
        if (s.getAttemptCount(host, roleStr) < maxAttempts) {
          // Attempts left: try again.
          commandsToSchedule.add(command);
        } else {
          LOG.warn("Host:" + host + ", role:" + roleStr + ", actionId:"
              + s.getActionId() + " expired");
          db.timeoutHostRole(host, s.getRequestId(), s.getStageId(), command.getRole());
          // Reinitialize status
          status = s.getHostRoleStatus(host, roleStr);
          notifyComponentOfTimeout(cluster, command, host, roleStr, now);
          // Dequeue command
          actionQueue.dequeue(host, command.getCommandId());
        }
      }
      this.updateRoleStats(status, statsByRole.get(roleStr));
    }
  }
  return statsByRole;
}

/**
 * Sends an operation-failed event to the service component host the expired
 * command was targeting. Lookup and state transition problems are logged
 * and not propagated.
 */
private void notifyComponentOfTimeout(Cluster cluster, ExecutionCommand command,
    String host, String roleStr, long now) {
  ServiceComponentHostOpFailedEvent timeoutEvent =
      new ServiceComponentHostOpFailedEvent(roleStr, host, now);
  try {
    cluster.getService(command.getServiceName())
        .getServiceComponent(roleStr)
        .getServiceComponentHost(host)
        .handleEvent(timeoutEvent);
    LOG.warn("Operation timed out. Role: " + roleStr + ", host: " + host);
  } catch (ServiceComponentNotFoundException scnex) {
    LOG.debug("Not a service component, assuming its an action. Details: "
        + scnex.getMessage());
  } catch (InvalidStateTransitionException e) {
    LOG.info("Transition failed for host: " + host + ", role: "
        + roleStr, e);
  } catch (AmbariException ex) {
    LOG.warn("Invalid live state", ex);
  }
}
/**
 * Builds an empty statistics object for every role present in the stage.
 *
 * @param s the stage whose host role commands are counted
 * @return map from role name to a RoleStats initialised with the number of
 *         host role commands of that role and the success factor of the role
 */
private Map<String, RoleStats> initRoleStats(Stage s) {
  // Count how many host role commands exist per role.
  Map<Role, Integer> hostsPerRole = new HashMap<Role, Integer>();
  for (Map<String, HostRoleCommand> commandsByRole : s.getHostRoleCommands().values()) {
    for (HostRoleCommand command : commandsByRole.values()) {
      Role role = command.getRole();
      Integer seen = hostsPerRole.get(role);
      hostsPerRole.put(role, seen == null ? 1 : seen + 1);
    }
  }

  Map<String, RoleStats> statsByRole = new TreeMap<String, RoleStats>();
  for (Map.Entry<Role, Integer> entry : hostsPerRole.entrySet()) {
    Role role = entry.getKey();
    statsByRole.put(role.toString(),
        new RoleStats(entry.getValue(), s.getSuccessFactor(role)));
  }
  return statsByRole;
}
/**
 * Decides whether a host role has to be treated as timed out.
 * Only host roles that are QUEUED or IN_PROGRESS can time out. Such a role
 * times out when the host is in HEARTBEAT_LOST state, or when more than
 * {@code taskTimeout} milliseconds have passed since its last attempt.
 *
 * @param status      current status of the host role
 * @param stage       stage holding the last attempt time
 * @param host        host the role runs on
 * @param role        role name
 * @param currentTime current time in milliseconds
 * @param taskTimeout timeout in milliseconds
 * @return true if timeout handling is needed
 */
private boolean timeOutActionNeeded(HostRoleStatus status, Stage stage,
    Host host, String role, long currentTime, long taskTimeout) throws
    AmbariException {
  boolean canTimeOut = status.equals(HostRoleStatus.QUEUED)
      || status.equals(HostRoleStatus.IN_PROGRESS);
  if (!canTimeOut) {
    return false;
  }
  // Fast fail task if host state is unknown
  if (host.getState().equals(HostState.HEARTBEAT_LOST)) {
    LOG.debug("Timing out action since agent is not heartbeating.");
    return true;
  }
  long deadline = stage.getLastAttemptTime(host.getHostName(), role) + taskTimeout;
  return currentTime > deadline;
}
/**
 * Schedules one command of a stage on its host.
 * On the first attempt (start time still negative) the FSM event of the
 * stage is delivered to the service component host and the host role is
 * marked QUEUED. On every attempt the last attempt time and attempt count
 * are updated, the command receives the mapped host name and the cluster
 * host info, is put on the action queue and recorded in the database.
 *
 * @param s   the stage the command belongs to
 * @param cmd the command to schedule
 * @throws InvalidStateTransitionException if the component rejects the FSM event
 * @throws AmbariException if the FSM lookup fails for a reason other than
 *         the role not being a service component
 */
private void scheduleHostRole(Stage s, ExecutionCommand cmd)
    throws InvalidStateTransitionException, AmbariException {
  long now = System.currentTimeMillis();
  String roleStr = cmd.getRole().toString();
  String hostname = cmd.getHostname();
  if (s.getStartTime(hostname, roleStr) < 0) {
    try {
      Cluster c = fsmObject.getCluster(s.getClusterName());
      Service svc = c.getService(cmd.getServiceName());
      ServiceComponent svcComp = svc.getServiceComponent(roleStr);
      ServiceComponentHost svcCompHost =
          svcComp.getServiceComponentHost(hostname);
      svcCompHost.handleEvent(s.getFsmEvent(hostname, roleStr).getEvent());
    } catch (ServiceComponentNotFoundException scnex) {
      LOG.info("Not a service component, assuming its an action", scnex);
    } catch (InvalidStateTransitionException e) {
      LOG.info(
          "Transition failed for host: " + hostname + ", role: "
              + roleStr, e);
      throw e;
    } catch (AmbariException e) {
      LOG.warn("Exception in fsm: " + hostname + ", role: " + roleStr,
          e);
      throw e;
    }
    s.setStartTime(hostname, roleStr, now);
    s.setHostRoleStatus(hostname, roleStr, HostRoleStatus.QUEUED);
  }
  s.setLastAttemptTime(hostname, roleStr, now);
  s.incrementAttemptCount(hostname, roleStr);
  if (LOG.isDebugEnabled()) {
    // Guarded: building the message serializes the whole command.
    LOG.debug("Scheduling command: " + cmd.toString() + " for host: " + hostname);
  }
  /** change the hostname in the command for the host itself **/
  cmd.setHostname(hostsMap.getHostMap(hostname));

  // Try to get clusterHostInfo from cache. The key combines request id and
  // stage id: a stage id alone is only meaningful within its request, so
  // keying by it would hand one request's host info to another request.
  long cacheKey = clusterHostInfoCacheKey(s.getRequestId(), s.getStageId());
  Map<String, List<String>> clusterHostInfo = clusterHostInfoCache.getIfPresent(cacheKey);
  if (clusterHostInfo == null) {
    Type type = new TypeToken<Map<String, List<String>>>() {}.getType();
    clusterHostInfo = StageUtils.getGson().fromJson(s.getClusterHostInfo(), type);
    clusterHostInfoCache.put(cacheKey, clusterHostInfo);
  }
  cmd.setClusterHostInfo(clusterHostInfo);
  actionQueue.enqueue(hostname, cmd);
  db.hostRoleScheduled(s, hostname, roleStr);
}

/**
 * Packs a request id and a stage id into a single cache key: the request id
 * occupies the upper 32 bits, the stage id the lower 32 bits. Keys are
 * distinct as long as both ids fit into 32 bits.
 */
private static long clusterHostInfoCacheKey(long requestId, long stageId) {
  return (requestId << 32) | (stageId & 0xFFFFFFFFL);
}
/**
 * Increments the counter of the given statistics object that corresponds to
 * the host role status. A status without a matching counter is logged as an
 * error and leaves the statistics unchanged.
 *
 * @param status status of one host role
 * @param rs     statistics of the role the host role belongs to
 */
private void updateRoleStats(HostRoleStatus status, RoleStats rs) {
  switch (status) {
    case COMPLETED:
      rs.numSucceeded += 1;
      return;
    case FAILED:
      rs.numFailed += 1;
      return;
    case QUEUED:
      rs.numQueued += 1;
      return;
    case PENDING:
      rs.numPending += 1;
      return;
    case TIMEDOUT:
      rs.numTimedOut += 1;
      return;
    case ABORTED:
      rs.numAborted += 1;
      return;
    case IN_PROGRESS:
      rs.numInProgress += 1;
      return;
    default:
      break;
  }
  // Reached only for a status that has no counter.
  LOG.error("Unknown status " + status.name());
}
/**
 * Switches the task timeout adjustment on or off. When on, the task timeout
 * of a stage is added to the base action timeout while the stage is processed.
 *
 * @param val true to enable the adjustment
 */
public void setTaskTimeoutAdjustment(boolean val) {
  taskTimeoutAdjustment = val;
}
/**
 * Counters describing, for one role of a stage, how many hosts are in each
 * state, together with the total number of hosts and the fraction of hosts
 * that must succeed for the role to count as successful.
 */
static class RoleStats {
  int numInProgress;
  int numQueued = 0;
  int numSucceeded = 0;
  int numFailed = 0;
  int numTimedOut = 0;
  int numPending = 0;
  int numAborted = 0;
  final int totalHosts;
  final float successFactor;

  /**
   * @param total         total number of hosts of the role
   * @param successFactor fraction of hosts that has to succeed
   */
  RoleStats(int total, float successFactor) {
    this.totalHosts = total;
    this.successFactor = successFactor;
  }

  /**
   * Role successful means enough hosts have succeeded: the number of
   * succeeded hosts is at least the success factor times the total number
   * of hosts, rounded up.
   */
  boolean isSuccessFactorMet() {
    int minSuccessNeeded = (int) Math.ceil(successFactor * totalHosts);
    return minSuccessNeeded <= numSucceeded;
  }

  /** True while at least one host is pending, queued or in progress. */
  private boolean isRoleInProgress() {
    return (numPending + numQueued + numInProgress) > 0;
  }

  /**
   * Role failure means role is no longer in progress and success factor is
   * not met.
   */
  boolean isRoleFailed() {
    return !isRoleInProgress() && !isSuccessFactorMet();
  }

  @Override
  public String toString() {
    return "numQueued=" + numQueued
        + ", numInProgress=" + numInProgress
        + ", numSucceeded=" + numSucceeded
        + ", numFailed=" + numFailed
        + ", numTimedOut=" + numTimedOut
        + ", numPending=" + numPending
        + ", numAborted=" + numAborted
        + ", totalHosts=" + totalHosts
        + ", successFactor=" + successFactor;
  }
}
}