/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ambari.server.actionmanager;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import org.apache.ambari.server.AmbariException;
import org.apache.ambari.server.Role;
import org.apache.ambari.server.RoleCommand;
import org.apache.ambari.server.ServiceComponentHostNotFoundException;
import org.apache.ambari.server.ServiceComponentNotFoundException;
import org.apache.ambari.server.agent.ActionQueue;
import org.apache.ambari.server.agent.AgentCommand.AgentCommandType;
import org.apache.ambari.server.agent.CancelCommand;
import org.apache.ambari.server.agent.ExecutionCommand;
import org.apache.ambari.server.configuration.Configuration;
import org.apache.ambari.server.controller.HostsMap;
import org.apache.ambari.server.events.ActionFinalReportReceivedEvent;
import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
import org.apache.ambari.server.orm.entities.RequestEntity;
import org.apache.ambari.server.serveraction.ServerActionExecutor;
import org.apache.ambari.server.state.Cluster;
import org.apache.ambari.server.state.Clusters;
import org.apache.ambari.server.state.Host;
import org.apache.ambari.server.state.HostState;
import org.apache.ambari.server.state.Service;
import org.apache.ambari.server.state.ServiceComponent;
import org.apache.ambari.server.state.ServiceComponentHost;
import org.apache.ambari.server.state.ServiceComponentHostEvent;
import org.apache.ambari.server.state.fsm.InvalidStateTransitionException;
import org.apache.ambari.server.state.svccomphost.ServiceComponentHostOpFailedEvent;
import org.apache.ambari.server.utils.StageUtils;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ListMultimap;
import com.google.common.reflect.TypeToken;
import com.google.inject.persist.UnitOfWork;
/**
* This class encapsulates the action scheduler thread.
* The scheduler periodically examines the action database and determines
* whether there are actions that can be scheduled.
*/
class ActionScheduler implements Runnable {
private static final Logger LOG = LoggerFactory.getLogger(ActionScheduler.class);
public static final String FAILED_TASK_ABORT_REASONING =
"Server considered task failed and automatically aborted it";
private final long actionTimeout;
private final long sleepTime;
private final UnitOfWork unitOfWork;
private volatile boolean shouldRun = true;
private Thread schedulerThread = null;
private final ActionDBAccessor db;
private final short maxAttempts;
private final ActionQueue actionQueue;
private final Clusters clusters;
private final AmbariEventPublisher ambariEventPublisher;
private boolean taskTimeoutAdjustment = true;
private final HostsMap hostsMap;
private final Object wakeupSyncObject = new Object();
private final ServerActionExecutor serverActionExecutor;
private final Configuration configuration;
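/**
* Request IDs for which the scheduler has already called db.startRequest
* and which are still being tracked as in progress
*/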
private final Set<Long> requestsInProgress = new HashSet<Long>();
/**
* Contains request ids that have been scheduled to be cancelled,
* but are not cancelled yet
*/
private final Set<Long> requestsToBeCancelled =
Collections.newSetFromMap(new ConcurrentHashMap<Long, Boolean>());
/**
* Maps request IDs to the reason for cancelling the request.
* The map is NOT synchronized, so any access to it should synchronize on
* the requestsToBeCancelled object
*/
private final Map<Long, String> requestCancelReasons =
new HashMap<Long, String>();
/**
* true if the scheduler should run ASAP.
* We need this flag to avoid sleeping when an awake() request is received
* while a scheduler iteration is already running.
*/
private boolean activeAwakeRequest = false;
// Cache for clusterHostInfo; key is "stageId-requestId"
private Cache<String, Map<String, Set<String>>> clusterHostInfoCache;
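// Cache for stage-level command params; same "stageId-requestId" key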
private Cache<String, Map<String, String>> commandParamsStageCache;
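// Cache for stage-level host params; same "stageId-requestId" key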
private Cache<String, Map<String, String>> hostParamsStageCache;
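/**
* Creates a new action scheduler.
*
* @param sleepTimeMilliSec time to sleep between scheduler iterations, in milliseconds
* @param actionTimeoutMilliSec base timeout for a scheduled action, in milliseconds
* @param db the action database accessor
* @param actionQueue the queue used to deliver commands to agents
* @param clusters the cluster topology accessor
* @param maxAttempts maximum number of schedule attempts before a task is expired
* @param hostsMap mapping of hostnames used when dispatching commands
* @param unitOfWork the persistence unit of work wrapping each iteration
* @param ambariEventPublisher publisher for action-related events
* @param configuration the server configuration
*/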
public ActionScheduler(long sleepTimeMilliSec, long actionTimeoutMilliSec,
ActionDBAccessor db, ActionQueue actionQueue, Clusters clusters,
int maxAttempts, HostsMap hostsMap,
UnitOfWork unitOfWork, AmbariEventPublisher ambariEventPublisher,
Configuration configuration) {
sleepTime = sleepTimeMilliSec;
this.hostsMap = hostsMap;
actionTimeout = actionTimeoutMilliSec;
this.db = db;
this.actionQueue = actionQueue;
this.clusters = clusters;
this.ambariEventPublisher = ambariEventPublisher;
this.maxAttempts = (short) maxAttempts;
serverActionExecutor = new ServerActionExecutor(db, sleepTimeMilliSec);
this.unitOfWork = unitOfWork;
clusterHostInfoCache = CacheBuilder.newBuilder().
expireAfterAccess(5, TimeUnit.MINUTES).
build();
commandParamsStageCache = CacheBuilder.newBuilder().
expireAfterAccess(5, TimeUnit.MINUTES).
build();
hostParamsStageCache = CacheBuilder.newBuilder().
expireAfterAccess(5, TimeUnit.MINUTES).
build();
this.configuration = configuration;
}
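/**
* Starts the scheduler thread and its associated ServerActionExecutor.
*/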
public void start() {
schedulerThread = new Thread(this, "ambari-action-scheduler");
schedulerThread.start();
// Start up the ServerActionExecutor. Since it is directly related to the ActionScheduler it
// should be started and stopped along with it.
serverActionExecutor.start();
}
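/**
* Stops the scheduler thread and its associated ServerActionExecutor.
*/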
public void stop() {
shouldRun = false;
schedulerThread.interrupt();
// Stop the ServerActionExecutor. Since it is directly related to the ActionScheduler it should
// be started and stopped along with it.
serverActionExecutor.stop();
}
/**
* Should be called from another thread when we want the scheduler to
* make a run ASAP (for example, to process desired configs of SCHs).
* The method is guaranteed to return quickly.
*/
public void awake() {
synchronized (wakeupSyncObject) {
activeAwakeRequest = true;
wakeupSyncObject.notify();
}
}
@Override
public void run() {
while (shouldRun) {
try {
synchronized (wakeupSyncObject) {
if (!activeAwakeRequest) {
wakeupSyncObject.wait(sleepTime);
}
activeAwakeRequest = false;
}
doWork();
} catch (InterruptedException ex) {
LOG.warn("Scheduler thread is interrupted going to stop", ex);
shouldRun = false;
} catch (Exception ex) {
LOG.warn("Exception received", ex);
requestsInProgress.clear();
} catch (Throwable t) {
LOG.warn("ERROR", t);
requestsInProgress.clear();
}
}
}
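/**
* Performs a single scheduler iteration: processes cancelled requests,
* fetches the stages currently in progress, filters them for parallel
* execution, and schedules, updates, or aborts their commands as needed.
*
* @throws AmbariException if an error occurs while processing stages
*/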
public void doWork() throws AmbariException {
try {
unitOfWork.begin();
// The first thing to do is to abort requests that are cancelled
processCancelledRequestsList();
// !!! getting the stages in progress could be a very expensive call due
// to the join being used; there's no need to make it if there are
// no commands in progress
if (db.getCommandsInProgressCount() == 0) {
// Nothing to do
if (LOG.isDebugEnabled()) {
LOG.debug("There are no stages currently in progress.");
}
actionQueue.updateListOfHostsWithPendingTask(null);
return;
}
Set<Long> runningRequestIds = new HashSet<Long>();
List<Stage> stages = db.getStagesInProgress();
if (LOG.isDebugEnabled()) {
LOG.debug("Scheduler wakes up");
LOG.debug("Processing {} in progress stages ", stages.size());
}
if (stages.isEmpty()) {
// Nothing to do
if (LOG.isDebugEnabled()) {
LOG.debug("There are no stages currently in progress.");
}
actionQueue.updateListOfHostsWithPendingTask(null);
return;
}
int i_stage = 0;
HashSet<String> hostsWithTasks = getListOfHostsWithPendingTask(stages);
actionQueue.updateListOfHostsWithPendingTask(hostsWithTasks);
stages = filterParallelPerHostStages(stages);
// At this point, stages is a filtered list
boolean exclusiveRequestIsGoing = false;
// This loop greatly depends on the fact that order of stages in
// a list does not change between invocations
for (Stage stage : stages) {
// Check if we can process this stage in parallel with other stages
i_stage ++;
long requestId = stage.getRequestId();
LOG.debug("==> STAGE_i = " + i_stage + "(requestId=" + requestId + ",StageId=" + stage.getStageId() + ")");
RequestEntity request = db.getRequestEntity(requestId);
if (request.isExclusive()) {
if (!runningRequestIds.isEmpty()) {
// As a result, we will wait until any previous stages are finished
LOG.debug("Stage requires exclusive execution, but other requests are already executing. Stopping for now");
break;
}
exclusiveRequestIsGoing = true;
}
if (runningRequestIds.contains(requestId)) {
// We don't want to process different stages from the same request in parallel
LOG.debug("==> We don't want to process different stages from the same request in parallel" );
continue;
} else {
runningRequestIds.add(requestId);
if (!requestsInProgress.contains(requestId)) {
requestsInProgress.add(requestId);
db.startRequest(requestId);
}
}
// Commands that will be scheduled in the current scheduler wakeup
List<ExecutionCommand> commandsToSchedule = new ArrayList<ExecutionCommand>();
Map<String, RoleStats> roleStats = processInProgressStage(stage, commandsToSchedule);
// Check if stage is failed
boolean failed = false;
for (Map.Entry<String, RoleStats> entry : roleStats.entrySet()) {
String role = entry.getKey();
RoleStats stats = entry.getValue();
if (LOG.isDebugEnabled()) {
LOG.debug("Stats for role: {}, stats={}", role, stats);
}
// only fail the request if the role failed and the stage is not
// skippable
if (stats.isRoleFailed() && !stage.isSkippable()) {
LOG.warn("{} failed, request {} will be aborted", role, request.getRequestId());
failed = true;
break;
}
}
if (!failed) {
// Prior stage may have failed and it may need to fail the whole request
failed = hasPreviousStageFailed(stage);
}
if (failed) {
LOG.warn("Operation completely failed, aborting request id: {}", stage.getRequestId());
cancelHostRoleCommands(stage.getOrderedHostRoleCommands(), FAILED_TASK_ABORT_REASONING);
abortOperationsForStage(stage);
return;
}
List<ExecutionCommand> commandsToStart = new ArrayList<ExecutionCommand>();
List<ExecutionCommand> commandsToUpdate = new ArrayList<ExecutionCommand>();
//Schedule what we have so far
for (ExecutionCommand cmd : commandsToSchedule) {
// Hack - Remove passwords from configs
if ((cmd.getRole().equals(Role.HIVE_CLIENT.toString()) ||
cmd.getRole().equals(Role.WEBHCAT_SERVER.toString()) ||
cmd.getRole().equals(Role.HCAT.toString())) &&
cmd.getConfigurations().containsKey(Configuration.HIVE_CONFIG_TAG)) {
cmd.getConfigurations().get(Configuration.HIVE_CONFIG_TAG).remove(Configuration.HIVE_METASTORE_PASSWORD_PROPERTY);
}
processHostRole(stage, cmd, commandsToStart, commandsToUpdate);
}
LOG.debug("==> Commands to start: {}", commandsToStart.size());
LOG.debug("==> Commands to update: {}", commandsToUpdate.size());
// Multimap is analogous to Map<Object, List<Object>> but avoids a nested loop
ListMultimap<String, ServiceComponentHostEvent> eventMap = formEventMap(stage, commandsToStart);
Map<ExecutionCommand, String> commandsToAbort = new HashMap<ExecutionCommand, String>();
if (!eventMap.isEmpty()) {
LOG.debug("==> processing {} serviceComponentHostEvents...", eventMap.size());
Cluster cluster = clusters.getCluster(stage.getClusterName());
if (cluster != null) {
Map<ServiceComponentHostEvent, String> failedEvents = cluster.processServiceComponentHostEvents(eventMap);
if (failedEvents.size() > 0) {
LOG.error("==> {} events failed.", failedEvents.size());
}
for (Iterator<ExecutionCommand> iterator = commandsToUpdate.iterator(); iterator.hasNext(); ) {
ExecutionCommand cmd = iterator.next();
for (ServiceComponentHostEvent event : failedEvents.keySet()) {
if (StringUtils.equals(event.getHostName(), cmd.getHostname()) &&
StringUtils.equals(event.getServiceComponentName(), cmd.getRole())) {
iterator.remove();
commandsToAbort.put(cmd, failedEvents.get(event));
break;
}
}
}
} else {
LOG.warn("There was events to process but cluster {} not found", stage.getClusterName());
}
}
LOG.debug("==> Scheduling {} tasks...", commandsToUpdate.size());
db.bulkHostRoleScheduled(stage, commandsToUpdate);
if (commandsToAbort.size() > 0) { // Code branch may be a bit slow, but is extremely rarely used
LOG.debug("==> Aborting {} tasks...", commandsToAbort.size());
// Build a list of HostRoleCommands
List<Long> taskIds = new ArrayList<Long>();
for (ExecutionCommand command : commandsToAbort.keySet()) {
taskIds.add(command.getTaskId());
}
Collection<HostRoleCommand> hostRoleCommands = db.getTasks(taskIds);
cancelHostRoleCommands(hostRoleCommands, FAILED_TASK_ABORT_REASONING);
db.bulkAbortHostRole(stage, commandsToAbort);
}
LOG.debug("==> Adding {} tasks to queue...", commandsToUpdate.size());
for (ExecutionCommand cmd : commandsToUpdate) {
// Do not queue up server actions; however if we encounter one, wake up the ServerActionExecutor
if (Role.AMBARI_SERVER_ACTION.name().equals(cmd.getRole())) {
serverActionExecutor.awake();
} else {
actionQueue.enqueue(cmd.getHostname(), cmd);
}
}
LOG.debug("==> Finished.");
if (!configuration.getParallelStageExecution()) { // If parallel stage execution is disabled
return;
}
if (exclusiveRequestIsGoing) {
// As a result, we will prevent any further stages from being executed
LOG.debug("Stage requires exclusive execution, skipping all executing any further stages");
break;
}
}
requestsInProgress.retainAll(runningRequestIds);
} finally {
LOG.debug("Scheduler finished work.");
unitOfWork.end();
}
}
/**
* Returns the set of hosts that have a task assigned in the given stages
*
* @param stages the stages to inspect
*
* @return the hosts with at least one pending task
*/
private HashSet<String> getListOfHostsWithPendingTask(List<Stage> stages) {
HashSet<String> hostsWithTasks = new HashSet<String>();
for (Stage s : stages) {
hostsWithTasks.addAll(s.getHosts());
}
return hostsWithTasks;
}
/**
* Returns filtered list of stages such that the returned list is an ordered list of stages that may
* be executed in parallel or in the order in which they are presented
* <p/>
* Assumption: the list of stages supplied as input are ordered by request id and then stage id.
* <p/>
* Rules:
* <ul>
* <li>
* Stages are filtered such that the first stage in the list (assumed to be the first pending
* stage from the earliest active request) has priority
* </li>
* <li>
* No stage in any request may be executed before an earlier stage in the same request
* </li>
* <li>
* Stages in different requests may be performed in parallel if the relevant hosts for the
* stage in the later requests do not intersect with the union of hosts from (pending) stages
* in earlier requests
* </li>
* </ul>
*
* @param stages the stages to process
* @return a list of stages that may be executed in parallel
*/
private List<Stage> filterParallelPerHostStages(List<Stage> stages) {
List<Stage> retVal = new ArrayList<Stage>();
Set<String> affectedHosts = new HashSet<String>();
Set<Long> affectedRequests = new HashSet<Long>();
for (Stage s : stages) {
long requestId = s.getRequestId();
if (LOG.isTraceEnabled()) {
LOG.trace("==> Processing stage: {}/{} ({}) for {}", requestId, s.getStageId(), s.getRequestContext());
}
boolean addStage = true;
// Iterate over the relevant hosts for this stage to see if any intersect with the set of
// hosts needed for previous stages. If any intersection occurs, this stage may not be
// executed in parallel.
for (String host : s.getHosts()) {
LOG.trace("===> Processing Host {}", host);
if (affectedHosts.contains(host)) {
if (LOG.isTraceEnabled()) {
LOG.trace("===> Skipping stage since it utilizes at least one host that a previous stage requires: {}/{} ({})", s.getRequestId(), s.getStageId(), s.getRequestContext());
}
addStage &= false;
} else {
if (!Stage.INTERNAL_HOSTNAME.equalsIgnoreCase(host) && !isStageHasBackgroundCommandsOnly(s, host)) {
LOG.trace("====> Adding host to affected hosts: {}", host);
affectedHosts.add(host);
}
addStage &= true;
}
}
// If this stage is for a request that we have already processed, then it cannot execute in
// parallel since only one stage per request may execute at a time. The first time we encounter
// a request id, it will be for the first pending stage for that request, so it is a candidate
// for execution at this time - if the previous test for host intersection succeeds.
if (affectedRequests.contains(requestId)) {
if (LOG.isTraceEnabled()) {
LOG.trace("===> Skipping stage since the request it is in has been processed already: {}/{} ({})", s.getRequestId(), s.getStageId(), s.getRequestContext());
}
addStage = false;
} else {
if (LOG.isTraceEnabled()) {
LOG.trace("====> Adding request to affected requests: {}", requestId);
}
affectedRequests.add(requestId);
addStage &= true;
}
// If both tests pass - the stage is the first pending stage in its request and the hosts
// required in the stage do not intersect with hosts from stages that should occur before this -
// then add it to the list of stages that may be executed in parallel.
if (addStage) {
if (LOG.isTraceEnabled()) {
LOG.trace("===> Adding stage to return value: {}/{} ({})", s.getRequestId(), s.getStageId(), s.getRequestContext());
}
retVal.add(s);
}
}
return retVal;
}
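/**
* Returns true if every execution command for the given host in the given
* stage is a background command.
*/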
private boolean isStageHasBackgroundCommandsOnly(Stage s, String host) {
for (ExecutionCommandWrapper c : s.getExecutionCommands(host)) {
if (c.getExecutionCommand().getCommandType() != AgentCommandType.BACKGROUND_EXECUTION_COMMAND) {
return false;
}
}
return true;
}
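/**
* Returns true if the previous stage of the given stage's request failed,
* i.e. some role in it did not meet its success factor. Returns false if
* there is no previous stage or the previous stage is skippable.
*/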
private boolean hasPreviousStageFailed(Stage stage) {
boolean failed = false;
long prevStageId = stage.getStageId() - 1;
if (prevStageId > 0) {
// Find previous stage instance
String actionId = StageUtils.getActionId(stage.getRequestId(), prevStageId);
Stage prevStage = db.getStage(actionId);
// If the previous stage is skippable then we shouldn't automatically fail the given stage
if (prevStage == null || prevStage.isSkippable()) {
return false;
}
Map<Role, Integer> hostCountsForRoles = new HashMap<Role, Integer>();
Map<Role, Integer> failedHostCountsForRoles = new HashMap<Role, Integer>();
for (String host : prevStage.getHostRoleCommands().keySet()) {
Map<String, HostRoleCommand> roleCommandMap = prevStage.getHostRoleCommands().get(host);
for (String role : roleCommandMap.keySet()) {
HostRoleCommand c = roleCommandMap.get(role);
if (hostCountsForRoles.get(c.getRole()) == null) {
hostCountsForRoles.put(c.getRole(), 0);
failedHostCountsForRoles.put(c.getRole(), 0);
}
int hostCount = hostCountsForRoles.get(c.getRole());
hostCountsForRoles.put(c.getRole(), hostCount + 1);
if (c.getStatus().isFailedState()) {
int failedHostCount = failedHostCountsForRoles.get(c.getRole());
failedHostCountsForRoles.put(c.getRole(), failedHostCount + 1);
}
}
}
for (Role role : hostCountsForRoles.keySet()) {
float failedHosts = failedHostCountsForRoles.get(role);
float totalHosts = hostCountsForRoles.get(role);
if (((totalHosts - failedHosts) / totalHosts) < prevStage.getSuccessFactor(role)) {
failed = true;
}
}
}
return failed;
}
/**
* This method processes command timeouts and retry attempts, and
* adds new (pending) execution commands to commandsToSchedule list.
*
* @return the stats for the roles in the stage which are used to determine
* whether stage has succeeded or failed
*/
private Map<String, RoleStats> processInProgressStage(Stage s,
List<ExecutionCommand> commandsToSchedule) throws AmbariException {
LOG.debug("==> Collecting commands to schedule...");
// Map to track role status
Map<String, RoleStats> roleStats = initRoleStats(s);
long now = System.currentTimeMillis();
Cluster cluster = null;
if (null != s.getClusterName()) {
cluster = clusters.getCluster(s.getClusterName());
}
for (String host : s.getHosts()) {
List<ExecutionCommandWrapper> commandWrappers = s.getExecutionCommands(host);
Host hostObj = null;
try {
hostObj = clusters.getHost(host);
} catch (AmbariException e) {
LOG.debug("Host {} not found, stage is likely a server side action", host);
}
int i_my = 0;
LOG.trace("===>host=" + host);
for(ExecutionCommandWrapper wrapper : commandWrappers) {
ExecutionCommand c = wrapper.getExecutionCommand();
String roleStr = c.getRole();
HostRoleStatus status = s.getHostRoleStatus(host, roleStr);
i_my ++;
if (LOG.isTraceEnabled()) {
LOG.trace("Host task " + i_my + ") id = " + c.getTaskId() + " status = " + status.toString() +
" (role=" + roleStr + "), roleCommand = "+ c.getRoleCommand());
}
boolean hostDeleted = false;
if (null != cluster) {
Service svc = null;
if (c.getServiceName() != null && !c.getServiceName().isEmpty()) {
svc = cluster.getService(c.getServiceName());
}
ServiceComponent svcComp = null;
Map<String, ServiceComponentHost> scHosts = null;
try {
if (svc != null) {
svcComp = svc.getServiceComponent(roleStr);
scHosts = svcComp.getServiceComponentHosts();
}
} catch (ServiceComponentNotFoundException scnex) {
LOG.debug("{} is not a service component, assuming it's an action", roleStr);
}
hostDeleted = (scHosts != null && !scHosts.containsKey(host));
if (hostDeleted) {
String message = String.format(
"Host component information has not been found. Details:" +
"cluster=%s; host=%s; service=%s; component=%s; ",
c.getClusterName(), host,
svcComp == null ? "null" : svcComp.getServiceName(),
svcComp == null ? "null" : svcComp.getName());
LOG.warn(message);
}
}
// Basic timeout for the stage
long commandTimeout = actionTimeout;
if (taskTimeoutAdjustment) {
Map<String, String> commandParams = c.getCommandParams();
String timeoutKey = ExecutionCommand.KeyNames.COMMAND_TIMEOUT;
if (commandParams != null && commandParams.containsKey(timeoutKey)) {
String timeoutStr = commandParams.get(timeoutKey);
commandTimeout += Long.parseLong(timeoutStr) * 1000; // Converting to milliseconds
} else {
LOG.error("Execution command has no timeout parameter" +
c.toString());
}
}
// Check that service host component is not deleted
if (hostDeleted) {
String message = String.format(
"Host not found when trying to schedule an execution command. " +
"The most probable reason for that is that host or host component " +
"has been deleted recently. The command has been aborted and dequeued." +
"Execution command details: " +
"cmdId: %s; taskId: %s; roleCommand: %s",
c.getCommandId(), c.getTaskId(), c.getRoleCommand());
LOG.warn("Host {} has been detected as non-available. {}", host, message);
// Abort the command itself
// We don't need to send CANCEL_COMMANDs in this case
db.abortHostRole(host, s.getRequestId(), s.getStageId(), c.getRole(), message);
if (c.getRoleCommand().equals(RoleCommand.ACTIONEXECUTE)) {
processActionDeath(cluster.getClusterName(), c.getHostname(), roleStr);
}
status = HostRoleStatus.ABORTED;
} else if (timeOutActionNeeded(status, s, hostObj, roleStr, now, commandTimeout)) {
// Process command timeouts
LOG.info("Host:" + host + ", role:" + roleStr + ", actionId:" + s.getActionId() + " timed out");
if (s.getAttemptCount(host, roleStr) >= maxAttempts) {
LOG.warn("Host:" + host + ", role:" + roleStr + ", actionId:" + s.getActionId() + " expired");
db.timeoutHostRole(host, s.getRequestId(), s.getStageId(), c.getRole());
//Reinitialize status
status = s.getHostRoleStatus(host, roleStr);
if (null != cluster) {
transitionToFailedState(cluster.getClusterName(), c.getServiceName(), roleStr, host, now, false);
if (c.getRoleCommand().equals(RoleCommand.ACTIONEXECUTE)) {
processActionDeath(cluster.getClusterName(), c.getHostname(), roleStr);
}
}
// Dequeue command
LOG.info("Removing command from queue, host={}, commandId={} ", host, c.getCommandId());
actionQueue.dequeue(host, c.getCommandId());
} else {
// reschedule command
commandsToSchedule.add(c);
LOG.trace("===> commandsToSchedule(reschedule)=" + commandsToSchedule.size());
}
} else if (status.equals(HostRoleStatus.PENDING)) {
// Needs to be scheduled for the first time
commandsToSchedule.add(c);
LOG.trace("===>commandsToSchedule(first_time)=" + commandsToSchedule.size());
}
updateRoleStats(status, roleStats.get(roleStr));
}
}
LOG.debug("Collected {} commands to schedule in this wakeup.", commandsToSchedule.size());
return roleStats;
}
/**
* Generates an OpFailed event before aborting all operations in the stage
* @param stage the stage whose operations are being aborted
*/
private void abortOperationsForStage(Stage stage) {
long now = System.currentTimeMillis();
for (String hostName : stage.getHosts()) {
List<ExecutionCommandWrapper> commandWrappers =
stage.getExecutionCommands(hostName);
for(ExecutionCommandWrapper wrapper : commandWrappers) {
ExecutionCommand c = wrapper.getExecutionCommand();
transitionToFailedState(stage.getClusterName(), c.getServiceName(),
c.getRole(), hostName, now, true);
if (c.getRoleCommand().equals(RoleCommand.ACTIONEXECUTE)) {
String clusterName = c.getClusterName();
processActionDeath(clusterName,
c.getHostname(),
c.getRole());
}
}
}
db.abortOperation(stage.getRequestId());
}
/**
* Raises an OpFailed event for a service component host (SCH)
* @param clusterName the name of the cluster
* @param serviceName the name of the service
* @param componentName the name of the component
* @param hostname the host on which the component runs
* @param timestamp the time of the failure
* @param ignoreTransitionException if true, invalid state transitions are logged at debug level instead of warn
*/
private void transitionToFailedState(String clusterName, String serviceName,
String componentName, String hostname,
long timestamp,
boolean ignoreTransitionException) {
try {
Cluster cluster = clusters.getCluster(clusterName);
ServiceComponentHostOpFailedEvent failedEvent =
new ServiceComponentHostOpFailedEvent(componentName,
hostname, timestamp);
if (serviceName != null && ! serviceName.isEmpty() &&
componentName != null && ! componentName.isEmpty()) {
Service svc = cluster.getService(serviceName);
ServiceComponent svcComp = svc.getServiceComponent(componentName);
ServiceComponentHost svcCompHost =
svcComp.getServiceComponentHost(hostname);
svcCompHost.handleEvent(failedEvent);
} else {
LOG.info("Service name is " + serviceName + ", component name is " + componentName +
"skipping sending ServiceComponentHostOpFailedEvent for " + componentName);
}
} catch (ServiceComponentNotFoundException scnex) {
LOG.debug(componentName + " associated with service " + serviceName +
" is not a service component, assuming it's an action.");
} catch (ServiceComponentHostNotFoundException e) {
String msg = String.format("Service component host %s not found, " +
"unable to transition to failed state.", componentName);
LOG.warn(msg, e);
} catch (InvalidStateTransitionException e) {
if (ignoreTransitionException) {
LOG.debug("Unable to transition to failed state.", e);
} else {
LOG.warn("Unable to transition to failed state.", e);
}
} catch (AmbariException e) {
LOG.warn("Unable to transition to failed state.", e);
}
}
/**
* Populates a map of role name to role statistics for the given stage.
*/
private Map<String, RoleStats> initRoleStats(Stage s) {
// Meaning: how many hosts are affected by commands for each role
Map<Role, Integer> hostCountsForRoles = new HashMap<Role, Integer>();
// Maps role name to RoleStats
Map<String, RoleStats> roleStats = new TreeMap<String, RoleStats>();
for (String host : s.getHostRoleCommands().keySet()) {
Map<String, HostRoleCommand> roleCommandMap = s.getHostRoleCommands().get(host);
for (String role : roleCommandMap.keySet()) {
HostRoleCommand c = roleCommandMap.get(role);
if (hostCountsForRoles.get(c.getRole()) == null) {
hostCountsForRoles.put(c.getRole(), 0);
}
int val = hostCountsForRoles.get(c.getRole());
hostCountsForRoles.put(c.getRole(), val + 1);
}
}
for (Role r : hostCountsForRoles.keySet()) {
RoleStats stats = new RoleStats(hostCountsForRoles.get(r),
s.getSuccessFactor(r));
roleStats.put(r.name(), stats);
}
return roleStats;
}
/**
* Checks whether the task needs to be timed out.
* @param status the status of the current role
* @param stage the stage
* @param host the host object; can be {@code null} for server-side tasks
* @param role the role
* @param currentTime the current time in milliseconds
* @param taskTimeout the timeout to apply to the task, in milliseconds
* @return {@code true} if the task should be timed out
* @throws AmbariException
*/
private boolean timeOutActionNeeded(HostRoleStatus status, Stage stage,
Host host, String role, long currentTime, long taskTimeout) throws
AmbariException {
if (!status.equals(HostRoleStatus.QUEUED) &&
!status.equals(HostRoleStatus.IN_PROGRESS)) {
return false;
}
// Fast fail the task if the agent has stopped heartbeating
if (null != host && host.getState().equals(HostState.HEARTBEAT_LOST)) {
LOG.debug("Timing out action since agent is not heartbeating.");
return true;
}
// tasks are held in a variety of in-memory maps that require a hostname key
// host being null is ok - that means it's a server-side task
String hostName = (null == host) ? null : host.getHostName();
// If another command is in progress for this stage, do not time this one out
if (hasCommandInProgress(stage, hostName)
&& !status.equals(HostRoleStatus.IN_PROGRESS)) {
return false;
}
return currentTime >= stage.getLastAttemptTime(hostName, role) + taskTimeout;
}
/**
* Determines if at least one task for a given hostname in the specified stage is in progress.
* <p/>
* If the specified hostname is <code>null</code>, the Ambari Server host is assumed.
* See {@link Stage#getSafeHost(String)}.
*
* @param stage a stage
* @param hostname a host name, if null the Ambari Server host is assumed
* @return true if at least one task for the given hostname in the specified stage is in progress; otherwise false
* @see Stage#getExecutionCommands(String)
* @see Stage#getHostRoleStatus(String, String)
*/
private boolean hasCommandInProgress(Stage stage, String hostname) {
List<ExecutionCommandWrapper> commandWrappers = stage.getExecutionCommands(hostname);
for (ExecutionCommandWrapper wrapper : commandWrappers) {
ExecutionCommand c = wrapper.getExecutionCommand();
String roleStr = c.getRole();
HostRoleStatus status = stage.getHostRoleStatus(hostname, roleStr);
if (status == HostRoleStatus.IN_PROGRESS) {
return true;
}
}
return false;
}
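/**
* Builds a multimap of service name to the FSM events for the commands
* being started. ACTIONEXECUTE commands are excluded.
*/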
private ListMultimap<String, ServiceComponentHostEvent> formEventMap(Stage s, List<ExecutionCommand> commands) {
ListMultimap<String, ServiceComponentHostEvent> serviceEventMap = ArrayListMultimap.create();
for (ExecutionCommand cmd : commands) {
String hostname = cmd.getHostname();
String roleStr = cmd.getRole();
if (RoleCommand.ACTIONEXECUTE != cmd.getRoleCommand()) {
serviceEventMap.put(cmd.getServiceName(), s.getFsmEvent(hostname, roleStr).getEvent());
}
}
return serviceEventMap;
}
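/**
* Prepares a single execution command for dispatch: records the start and
* last attempt times on the stage, maps the hostname, and fills in cluster
* host info, command params, and host params from the per-stage caches.
*/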
private void processHostRole(Stage s, ExecutionCommand cmd, List<ExecutionCommand> commandsToStart,
List<ExecutionCommand> commandsToUpdate)
throws AmbariException {
long now = System.currentTimeMillis();
String roleStr = cmd.getRole();
String hostname = cmd.getHostname();
// start time is -1 if host role command is not started yet
if (s.getStartTime(hostname, roleStr) < 0) {
commandsToStart.add(cmd);
s.setStartTime(hostname,roleStr, now);
s.setHostRoleStatus(hostname, roleStr, HostRoleStatus.QUEUED);
}
s.setLastAttemptTime(hostname, roleStr, now);
s.incrementAttemptCount(hostname, roleStr);
// Change the hostname in the command to the mapped hostname for this host
cmd.setHostname(hostsMap.getHostMap(hostname));
//Try to get clusterHostInfo from cache
String stagePk = s.getStageId() + "-" + s.getRequestId();
Map<String, Set<String>> clusterHostInfo = clusterHostInfoCache.getIfPresent(stagePk);
if (clusterHostInfo == null) {
Type type = new TypeToken<Map<String, Set<String>>>() {}.getType();
clusterHostInfo = StageUtils.getGson().fromJson(s.getClusterHostInfo(), type);
clusterHostInfoCache.put(stagePk, clusterHostInfo);
}
cmd.setClusterHostInfo(clusterHostInfo);
//Try to get commandParams from cache and merge them with command-level parameters
Map<String, String> commandParams = commandParamsStageCache.getIfPresent(stagePk);
if (commandParams == null){
Type type = new TypeToken<Map<String, String>>() {}.getType();
commandParams = StageUtils.getGson().fromJson(s.getCommandParamsStage(), type);
commandParamsStageCache.put(stagePk, commandParams);
}
Map<String, String> commandParamsCmd = cmd.getCommandParams();
commandParamsCmd.putAll(commandParams);
cmd.setCommandParams(commandParamsCmd);
//Try to get hostParams from cache and merge them with command-level parameters
Map<String, String> hostParams = hostParamsStageCache.getIfPresent(stagePk);
if (hostParams == null) {
Type type = new TypeToken<Map<String, String>>() {}.getType();
hostParams = StageUtils.getGson().fromJson(s.getHostParamsStage(), type);
hostParamsStageCache.put(stagePk, hostParams);
}
Map<String, String> hostParamsCmd = cmd.getHostLevelParams();
hostParamsCmd.putAll(hostParams);
cmd.setHostLevelParams(hostParamsCmd);
commandsToUpdate.add(cmd);
}
/**
* @param requestId the request to cancel on the next scheduler wake-up
* (if it is in a state that allows cancellation, e.g. QUEUED, PENDING, IN_PROGRESS)
* @param reason why the request is being cancelled
*/
public void scheduleCancellingRequest(long requestId, String reason) {
synchronized (requestsToBeCancelled) {
requestsToBeCancelled.add(requestId);
requestCancelReasons.put(requestId, reason);
}
}
/**
* Aborts all stages that belong to requests that are being cancelled
*/
private void processCancelledRequestsList() {
synchronized (requestsToBeCancelled) {
// Now, cancel stages completely
for (Long requestId : requestsToBeCancelled) {
List<HostRoleCommand> tasksToDequeue = db.getRequestTasks(requestId);
String reason = requestCancelReasons.get(requestId);
cancelHostRoleCommands(tasksToDequeue, reason);
List<Stage> stages = db.getAllStages(requestId);
for (Stage stage : stages) {
abortOperationsForStage(stage);
}
}
requestsToBeCancelled.clear();
requestCancelReasons.clear();
}
}
/**
* Cancels host role commands (those that are not finished yet).
* Dequeues host role commands that have been added to the ActionQueue,
* and automatically generates CANCEL_COMMANDs and adds them to the
* ActionQueue for all hostRoleCommands that have already been sent to an
* agent for execution.
* @param hostRoleCommands a list of hostRoleCommands
* @param reason why the request is being cancelled
*/
void cancelHostRoleCommands(Collection<HostRoleCommand> hostRoleCommands, String reason) {
for (HostRoleCommand hostRoleCommand : hostRoleCommands) {
if (hostRoleCommand.getStatus() == HostRoleStatus.QUEUED) {
// Dequeue all tasks that have been already scheduled for sending to agent
actionQueue.dequeue(hostRoleCommand.getHostName(),
hostRoleCommand.getExecutionCommandWrapper().
getExecutionCommand().getCommandId());
}
if (hostRoleCommand.getStatus() == HostRoleStatus.QUEUED ||
hostRoleCommand.getStatus() == HostRoleStatus.IN_PROGRESS) {
CancelCommand cancelCommand = new CancelCommand();
cancelCommand.setTargetTaskId(hostRoleCommand.getTaskId());
cancelCommand.setReason(reason);
actionQueue.enqueue(hostRoleCommand.getHostName(), cancelCommand);
}
if (hostRoleCommand.getStatus().isHoldingState()) {
db.abortHostRole(hostRoleCommand.getHostName(),
hostRoleCommand.getRequestId(),
hostRoleCommand.getStageId(), hostRoleCommand.getRole().name());
}
// If host role is an Action, we have to send an event
if (hostRoleCommand.getRoleCommand().equals(RoleCommand.ACTIONEXECUTE)) {
String clusterName = hostRoleCommand.getExecutionCommandWrapper().getExecutionCommand().getClusterName();
processActionDeath(clusterName,
hostRoleCommand.getHostName(),
hostRoleCommand.getRole().name());
}
}
}
/**
* Attempts to process the kill/timeout/abort of an action and sends the
* appropriate event to all listeners
*/
private void processActionDeath(String clusterName,
String hostname,
String role) {
try {
// Usually clusterId is defined (except the awkward case when
// "Distribute repositories/install packages" action has been issued
// against a concrete host without binding to a cluster)
Long clusterId = clusterName != null ?
clusters.getCluster(clusterName).getClusterId() : null;
ActionFinalReportReceivedEvent event = new ActionFinalReportReceivedEvent(
clusterId, hostname, null,
role);
ambariEventPublisher.publish(event);
} catch (AmbariException e) {
LOG.error(String.format("Can not get cluster %s", clusterName), e);
}
}
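/**
* Increments the counter in the given RoleStats that matches the reported
* host role status.
*/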
private void updateRoleStats(HostRoleStatus status, RoleStats rs) {
switch (status) {
case COMPLETED:
rs.numSucceeded++;
break;
case FAILED:
rs.numFailed++;
break;
case QUEUED:
rs.numQueued++;
break;
case PENDING:
rs.numPending++;
break;
case TIMEDOUT:
rs.numTimedOut++;
break;
case ABORTED:
rs.numAborted++;
break;
case IN_PROGRESS:
rs.numInProgress++;
break;
case HOLDING:
case HOLDING_FAILED:
case HOLDING_TIMEDOUT:
rs.numHolding++;
break;
case SKIPPED_FAILED:
rs.numSkipped++;
break;
default:
LOG.error("Unknown status " + status.name());
}
}
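/**
* Enables or disables extending the base action timeout with the
* per-command timeout from the command parameters.
*/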
public void setTaskTimeoutAdjustment(boolean val) {
taskTimeoutAdjustment = val;
}
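/**
* Returns the executor used to run server-side actions.
*/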
ServerActionExecutor getServerActionExecutor() {
return serverActionExecutor;
}
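/**
* Tracks per-role task counts for a stage and decides whether the role
* has met its success factor or has failed.
*/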
static class RoleStats {
int numInProgress = 0;
int numQueued = 0;
int numSucceeded = 0;
int numFailed = 0;
int numTimedOut = 0;
int numPending = 0;
int numAborted = 0;
int numHolding = 0;
int numSkipped = 0;
final int totalHosts;
final float successFactor;
RoleStats(int total, float successFactor) {
totalHosts = total;
this.successFactor = successFactor;
}
/**
* Determines whether enough hosts have succeeded for the role to meet
* its success factor.
*/
boolean isSuccessFactorMet() {
int minSuccessNeeded = (int) Math.ceil(successFactor * totalHosts);
return minSuccessNeeded <= numSucceeded;
}
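/**
* A role is still in progress while any of its tasks are pending, queued,
* running, or holding.
*/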
private boolean isRoleInProgress() {
return numPending + numQueued + numInProgress + numHolding > 0;
}
/**
* Role failure means role is no longer in progress and success factor is
* not met.
*/
boolean isRoleFailed() {
return !(isRoleInProgress() || isSuccessFactorMet());
}
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
builder.append("numQueued=").append(numQueued);
builder.append(", numInProgress=").append(numInProgress);
builder.append(", numSucceeded=").append(numSucceeded);
builder.append(", numFailed=").append(numFailed);
builder.append(", numTimedOut=").append(numTimedOut);
builder.append(", numPending=").append(numPending);
builder.append(", numAborted=").append(numAborted);
builder.append(", numSkipped=").append(numSkipped);
builder.append(", totalHosts=").append(totalHosts);
builder.append(", successFactor=").append(successFactor);
return builder.toString();
}
}
}