/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ambari.server.actionmanager;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.persistence.EntityManager;
import org.apache.ambari.server.AmbariException;
import org.apache.ambari.server.ClusterNotFoundException;
import org.apache.ambari.server.Role;
import org.apache.ambari.server.RoleCommand;
import org.apache.ambari.server.ServiceComponentHostNotFoundException;
import org.apache.ambari.server.ServiceComponentNotFoundException;
import org.apache.ambari.server.agent.AgentCommand;
import org.apache.ambari.server.agent.CancelCommand;
import org.apache.ambari.server.agent.CommandReport;
import org.apache.ambari.server.agent.ExecutionCommand;
import org.apache.ambari.server.configuration.Configuration;
import org.apache.ambari.server.controller.HostsMap;
import org.apache.ambari.server.events.ActionFinalReportReceivedEvent;
import org.apache.ambari.server.events.jpa.EntityManagerCacheInvalidationEvent;
import org.apache.ambari.server.events.listeners.tasks.TaskStatusListener;
import org.apache.ambari.server.events.publishers.AgentCommandsPublisher;
import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
import org.apache.ambari.server.events.publishers.JPAEventPublisher;
import org.apache.ambari.server.metadata.RoleCommandOrder;
import org.apache.ambari.server.metadata.RoleCommandOrderProvider;
import org.apache.ambari.server.metadata.RoleCommandPair;
import org.apache.ambari.server.orm.dao.HostRoleCommandDAO;
import org.apache.ambari.server.orm.entities.HostRoleCommandEntity;
import org.apache.ambari.server.orm.entities.RequestEntity;
import org.apache.ambari.server.serveraction.ServerActionExecutor;
import org.apache.ambari.server.state.Cluster;
import org.apache.ambari.server.state.Clusters;
import org.apache.ambari.server.state.ConfigHelper;
import org.apache.ambari.server.state.Host;
import org.apache.ambari.server.state.HostState;
import org.apache.ambari.server.state.Service;
import org.apache.ambari.server.state.ServiceComponent;
import org.apache.ambari.server.state.ServiceComponentHost;
import org.apache.ambari.server.state.ServiceComponentHostEvent;
import org.apache.ambari.server.state.fsm.InvalidStateTransitionException;
import org.apache.ambari.server.state.svccomphost.ServiceComponentHostOpFailedEvent;
import org.apache.ambari.server.utils.StageUtils;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Function;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Multimap;
import com.google.common.eventbus.Subscribe;
import com.google.common.reflect.TypeToken;
import com.google.inject.Inject;
import com.google.inject.Provider;
import com.google.inject.Singleton;
import com.google.inject.name.Named;
import com.google.inject.persist.UnitOfWork;
/**
* This class encapsulates the action scheduler thread.
* The scheduler periodically examines the action database and determines
* whether any pending actions can be scheduled.
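*
* A minimal usage sketch (assuming the instance is obtained through Guice;
* the request id below is illustrative):
* <pre>{@code
* ActionScheduler scheduler = injector.getInstance(ActionScheduler.class);
* scheduler.start(); // spawns the "ambari-action-scheduler" thread
* scheduler.awake(); // force an immediate scheduling iteration
* scheduler.scheduleCancellingRequest(42L, "Aborted by user");
* scheduler.stop(); // interrupts the thread and stops the executor
* }</pre>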
*/
@Singleton
class ActionScheduler implements Runnable {
private static final Logger LOG = LoggerFactory.getLogger(ActionScheduler.class);
public static final String FAILED_TASK_ABORT_REASONING =
"Server considered task failed and automatically aborted it";
@Inject
private RoleCommandOrderProvider roleCommandOrderProvider;
@Inject
private UnitOfWork unitOfWork;
@Inject
private Clusters clusters;
@Inject
private AmbariEventPublisher ambariEventPublisher;
@Inject
private HostsMap hostsMap;
@Inject
private Configuration configuration;
@Inject
Provider<EntityManager> entityManagerProvider;
/**
* Used for turning instances of {@link HostRoleCommandEntity} into
* {@link HostRoleCommand}.
*/
@Inject
private HostRoleCommandFactory hostRoleCommandFactory;
/**
* Used for retrieving {@link HostRoleCommandEntity} instances which need to
* be cancelled.
*/
@Inject
private HostRoleCommandDAO hostRoleCommandDAO;
@Inject
private AgentCommandsPublisher agentCommandsPublisher;
/**
* The current thread's reference to the {@link EntityManager}.
*/
volatile EntityManager threadEntityManager;
private final long actionTimeout;
private final long sleepTime;
private volatile boolean shouldRun = true;
private Thread schedulerThread = null;
private final ActionDBAccessor db;
private short maxAttempts = 2;
private final JPAEventPublisher jpaPublisher;
private boolean taskTimeoutAdjustment = true;
private final Object wakeupSyncObject = new Object();
private final ServerActionExecutor serverActionExecutor;
private final Set<Long> requestsInProgress = new HashSet<>();
/**
* Contains request ids that have been scheduled for cancellation
* but have not been cancelled yet.
*/
private final Set<Long> requestsToBeCancelled =
Collections.newSetFromMap(new ConcurrentHashMap<Long, Boolean>());
/**
* Maps request IDs to the reason for cancelling the request.
* The map is NOT synchronized, so any access to it must synchronize on
* the requestsToBeCancelled object.
*/
private final Map<Long, String> requestCancelReasons =
new HashMap<>();
/**
* True if the scheduler should run again as soon as possible.
* This flag lets us skip the sleep when an awake() request arrives
* while a scheduler iteration is already running.
*/
private boolean activeAwakeRequest = false;
private AtomicBoolean taskStatusLoaded = new AtomicBoolean();
//Caches for stage data; clusterHostInfo is keyed by requestId, the stage
//param caches by "stageId-requestId"
private Cache<String, Map<String, Set<String>>> clusterHostInfoCache;
private Cache<String, Map<String, String>> commandParamsStageCache;
private Cache<String, Map<String, String>> hostParamsStageCache;
/**
* Guice-injected Constructor.
*
* @param sleepTime time, in milliseconds, between scheduler iterations
* @param actionTimeout time, in milliseconds, before a command is considered timed out
* @param db the action database accessor
* @param jpaPublisher the JPA event publisher this scheduler registers with
*/
@Inject
public ActionScheduler(@Named("schedulerSleeptime") long sleepTime,
@Named("actionTimeout") long actionTimeout, ActionDBAccessor db,
JPAEventPublisher jpaPublisher) {
this.sleepTime = sleepTime;
this.actionTimeout = actionTimeout;
this.db = db;
this.jpaPublisher = jpaPublisher;
this.jpaPublisher.register(this);
serverActionExecutor = new ServerActionExecutor(db, sleepTime);
initializeCaches();
}
/**
* Unit Test Constructor.
* @param sleepTimeMilliSec
* @param actionTimeoutMilliSec
* @param db
* @param fsmObject
* @param maxAttempts
* @param hostsMap
* @param unitOfWork
* @param ambariEventPublisher
* @param configuration
* @param entityManagerProvider
* @param hostRoleCommandDAO
* @param hostRoleCommandFactory
* @param roleCommandOrderProvider
*/
protected ActionScheduler(long sleepTimeMilliSec, long actionTimeoutMilliSec, ActionDBAccessor db,
Clusters fsmObject, int maxAttempts, HostsMap hostsMap,
UnitOfWork unitOfWork, AmbariEventPublisher ambariEventPublisher,
Configuration configuration, Provider<EntityManager> entityManagerProvider,
HostRoleCommandDAO hostRoleCommandDAO, HostRoleCommandFactory hostRoleCommandFactory,
RoleCommandOrderProvider roleCommandOrderProvider, AgentCommandsPublisher agentCommandsPublisher) {
sleepTime = sleepTimeMilliSec;
actionTimeout = actionTimeoutMilliSec;
this.db = db;
clusters = fsmObject;
this.maxAttempts = (short) maxAttempts;
this.hostsMap = hostsMap;
this.unitOfWork = unitOfWork;
this.ambariEventPublisher = ambariEventPublisher;
this.configuration = configuration;
this.entityManagerProvider = entityManagerProvider;
this.hostRoleCommandDAO = hostRoleCommandDAO;
this.hostRoleCommandFactory = hostRoleCommandFactory;
jpaPublisher = null;
this.roleCommandOrderProvider = roleCommandOrderProvider;
this.agentCommandsPublisher = agentCommandsPublisher;
serverActionExecutor = new ServerActionExecutor(db, sleepTime);
initializeCaches();
}
/**
* Unit Test Constructor.
*
* @param sleepTimeMilliSec
* @param actionTimeoutMilliSec
* @param db
* @param fsmObject
* @param maxAttempts
* @param hostsMap
* @param unitOfWork
* @param ambariEventPublisher
* @param configuration
* @param hostRoleCommandDAO
* @param hostRoleCommandFactory
*/
protected ActionScheduler(long sleepTimeMilliSec, long actionTimeoutMilliSec, ActionDBAccessor db,
Clusters fsmObject, int maxAttempts, HostsMap hostsMap,
UnitOfWork unitOfWork, AmbariEventPublisher ambariEventPublisher,
Configuration configuration, Provider<EntityManager> entityManagerProvider,
HostRoleCommandDAO hostRoleCommandDAO, HostRoleCommandFactory hostRoleCommandFactory,
AgentCommandsPublisher agentCommandsPublisher) {
this(sleepTimeMilliSec, actionTimeoutMilliSec, db, fsmObject, maxAttempts, hostsMap, unitOfWork,
ambariEventPublisher, configuration, entityManagerProvider, hostRoleCommandDAO, hostRoleCommandFactory,
null, agentCommandsPublisher);
}
/**
* Initializes the caches.
*/
private void initializeCaches() {
clusterHostInfoCache = CacheBuilder.newBuilder().
expireAfterAccess(5, TimeUnit.MINUTES).
build();
commandParamsStageCache = CacheBuilder.newBuilder().
expireAfterAccess(5, TimeUnit.MINUTES).
build();
hostParamsStageCache = CacheBuilder.newBuilder().
expireAfterAccess(5, TimeUnit.MINUTES).
build();
}
public void start() {
schedulerThread = new Thread(this, "ambari-action-scheduler");
schedulerThread.start();
// Start up the ServerActionExecutor. Since it is directly related to the ActionScheduler it
// should be started and stopped along with it.
serverActionExecutor.start();
}
public void stop() {
shouldRun = false;
schedulerThread.interrupt();
// Stop the ServerActionExecutor. Since it is directly related to the ActionScheduler it should
// be started and stopped along with it.
serverActionExecutor.stop();
}
/**
* Should be called from another thread when we want the scheduler to
* make a run as soon as possible (for example, to process desired configs
* of SCHs). This method is guaranteed to return quickly.
*/
public void awake() {
synchronized (wakeupSyncObject) {
activeAwakeRequest = true;
wakeupSyncObject.notify();
}
}
@Override
public void run() {
while (shouldRun) {
try {
synchronized (wakeupSyncObject) {
if (!activeAwakeRequest) {
wakeupSyncObject.wait(sleepTime);
}
activeAwakeRequest = false;
}
doWork();
} catch (InterruptedException ex) {
LOG.warn("Scheduler thread is interrupted going to stop", ex);
shouldRun = false;
} catch (Exception ex) {
LOG.warn("Exception received", ex);
requestsInProgress.clear();
} catch (Throwable t) {
LOG.warn("ERROR", t);
requestsInProgress.clear();
}
}
}
public void doWork() throws AmbariException {
try {
unitOfWork.begin();
// grab a reference to this UnitOfWork's EM
threadEntityManager = entityManagerProvider.get();
// The first thing to do is to abort requests that have been cancelled
processCancelledRequestsList();
// !!! getting the stages in progress could be a very expensive call due
// to the join being used; there's no need to make it if there are
// no commands in progress
if (db.getCommandsInProgressCount() == 0) {
// Nothing to do
if (LOG.isDebugEnabled()) {
LOG.debug("There are no stages currently in progress.");
}
return;
}
Set<Long> runningRequestIds = new HashSet<>();
List<Stage> firstStageInProgressPerRequest = db.getFirstStageInProgressPerRequest();
if (LOG.isDebugEnabled()) {
LOG.debug("Scheduler wakes up");
LOG.debug("Processing {} in progress stages", firstStageInProgressPerRequest.size());
}
publishInProgressTasks(firstStageInProgressPerRequest);
if (firstStageInProgressPerRequest.isEmpty()) {
// Nothing to do
if (LOG.isDebugEnabled()) {
LOG.debug("There are no stages currently in progress.");
}
return;
}
int i_stage = 0;
// get the range of requests in progress
long iLowestRequestIdInProgress = firstStageInProgressPerRequest.get(0).getRequestId();
long iHighestRequestIdInProgress = firstStageInProgressPerRequest.get(
firstStageInProgressPerRequest.size() - 1).getRequestId();
List<String> hostsWithPendingTasks = hostRoleCommandDAO.getHostsWithPendingTasks(
iLowestRequestIdInProgress, iHighestRequestIdInProgress);
// filter the stages in progress down to those which can be scheduled in
// parallel
List<Stage> stages = filterParallelPerHostStages(firstStageInProgressPerRequest);
boolean exclusiveRequestIsGoing = false;
// This loop relies on the fact that the order of stages in
// the list does not change between invocations
for (Stage stage : stages) {
// Check if we can process this stage in parallel with another stages
i_stage++;
long requestId = stage.getRequestId();
LOG.debug("==> STAGE_i = {}(requestId={},StageId={})", i_stage, requestId, stage.getStageId());
RequestEntity request = db.getRequestEntity(requestId);
if (request.isExclusive()) {
if (runningRequestIds.size() > 0) {
// As a result, we will wait until any previous stages are finished
LOG.debug("Stage requires exclusive execution, but other requests are already executing. Stopping for now");
break;
}
exclusiveRequestIsGoing = true;
}
if (runningRequestIds.contains(requestId)) {
// We don't want to process different stages from the same request in parallel
LOG.debug("==> We don't want to process different stages from the same request in parallel");
continue;
} else {
runningRequestIds.add(requestId);
if (!requestsInProgress.contains(requestId)) {
requestsInProgress.add(requestId);
db.startRequest(requestId);
}
}
// Commands that will be scheduled in current scheduler wakeup
List<ExecutionCommand> commandsToSchedule = new ArrayList<>();
Multimap<Long, AgentCommand> commandsToEnqueue = ArrayListMultimap.create();
Map<String, RoleStats> roleStats =
processInProgressStage(stage, commandsToSchedule, commandsToEnqueue);
// Check if stage is failed
boolean failed = false;
for (Map.Entry<String, RoleStats> entry : roleStats.entrySet()) {
String role = entry.getKey();
RoleStats stats = entry.getValue();
if (LOG.isDebugEnabled()) {
LOG.debug("Stats for role: {}, stats={}", role, stats);
}
// only fail the request if the role failed and the stage is not
// skippable
if (stats.isRoleFailed() && !stage.isSkippable()) {
LOG.warn("{} failed, request {} will be aborted", role, request.getRequestId());
failed = true;
break;
}
}
if (!failed) {
// Prior stage may have failed and it may need to fail the whole request
failed = hasPreviousStageFailed(stage);
}
if (failed) {
LOG.error("Operation completely failed, aborting request id: {}", stage.getRequestId());
cancelHostRoleCommands(stage.getOrderedHostRoleCommands(), FAILED_TASK_ABORT_REASONING);
abortOperationsForStage(stage);
return;
}
List<ExecutionCommand> commandsToStart = new ArrayList<>();
List<ExecutionCommand> commandsToUpdate = new ArrayList<>();
//Schedule what we have so far
for (ExecutionCommand cmd : commandsToSchedule) {
ConfigHelper.processHiddenAttribute(cmd.getConfigurations(), cmd.getConfigurationAttributes(), cmd.getRole(), false);
processHostRole(request, stage, cmd, commandsToStart, commandsToUpdate);
}
LOG.debug("==> Commands to start: {}", commandsToStart.size());
LOG.debug("==> Commands to update: {}", commandsToUpdate.size());
//Multimap is analogous to Map<Object, List<Object>> but avoids a nested loop
ListMultimap<String, ServiceComponentHostEvent> eventMap = formEventMap(stage, commandsToStart);
Map<ExecutionCommand, String> commandsToAbort = new HashMap<>();
if (!eventMap.isEmpty()) {
LOG.debug("==> processing {} serviceComponentHostEvents...", eventMap.size());
Cluster cluster = clusters.getCluster(stage.getClusterName());
if (cluster != null) {
Map<ServiceComponentHostEvent, String> failedEvents = cluster.processServiceComponentHostEvents(eventMap);
if (failedEvents.size() > 0) {
LOG.error("==> {} events failed.", failedEvents.size());
}
for (Iterator<ExecutionCommand> iterator = commandsToUpdate.iterator(); iterator.hasNext(); ) {
ExecutionCommand cmd = iterator.next();
for (ServiceComponentHostEvent event : failedEvents.keySet()) {
if (StringUtils.equals(event.getHostName(), cmd.getHostname()) &&
StringUtils.equals(event.getServiceComponentName(), cmd.getRole())) {
iterator.remove();
commandsToAbort.put(cmd, failedEvents.get(event));
break;
}
}
}
} else {
LOG.warn("There was events to process but cluster {} not found", stage.getClusterName());
}
}
LOG.debug("==> Scheduling {} tasks...", commandsToUpdate.size());
db.bulkHostRoleScheduled(stage, commandsToUpdate);
if (commandsToAbort.size() > 0) { // Code branch may be a bit slow, but is extremely rarely used
LOG.debug("==> Aborting {} tasks...", commandsToAbort.size());
// Build a list of HostRoleCommands
List<Long> taskIds = new ArrayList<>();
for (ExecutionCommand command : commandsToAbort.keySet()) {
taskIds.add(command.getTaskId());
}
Collection<HostRoleCommand> hostRoleCommands = db.getTasks(taskIds);
cancelHostRoleCommands(hostRoleCommands, FAILED_TASK_ABORT_REASONING);
db.bulkAbortHostRole(stage, commandsToAbort);
}
LOG.debug("==> Adding {} tasks to queue...", commandsToUpdate.size());
for (ExecutionCommand cmd : commandsToUpdate) {
// Do not queue up server actions; however if we encounter one, wake up the ServerActionExecutor
if (Role.AMBARI_SERVER_ACTION.name().equals(cmd.getRole())) {
serverActionExecutor.awake();
} else {
commandsToEnqueue.put(clusters.getHost(cmd.getHostname()).getHostId(), cmd);
}
}
if (!commandsToEnqueue.isEmpty()) {
agentCommandsPublisher.sendAgentCommand(commandsToEnqueue);
}
LOG.debug("==> Finished.");
if (!configuration.getParallelStageExecution()) { // If disabled
return;
}
if (exclusiveRequestIsGoing) {
// As a result, we will prevent any further stages from being executed
LOG.debug("Stage requires exclusive execution, skipping all executing any further stages");
break;
}
}
requestsInProgress.retainAll(runningRequestIds);
} finally {
LOG.debug("Scheduler finished work.");
unitOfWork.end();
}
}
/**
* Publishes an event to load {@link TaskStatusListener#activeTasksMap}, {@link TaskStatusListener#activeStageMap}
* and {@link TaskStatusListener#activeRequestMap} for all running requests, once during server startup.
* This is required because some tasks may have been in progress when the server was last stopped.
* @param stages list of stages
*/
private void publishInProgressTasks(List<Stage> stages) {
if (taskStatusLoaded.compareAndSet(false, true)) {
if (!stages.isEmpty()) {
Function<Stage, Long> transform = new Function<Stage, Long>() {
@Override
public Long apply(Stage stage) {
return stage.getRequestId();
}
};
Set<Long> runningRequestID = ImmutableSet.copyOf(Lists.transform(stages, transform));
List<HostRoleCommand> hostRoleCommands = db.getAllTasksByRequestIds(runningRequestID);
hostRoleCommandDAO.publishTaskCreateEvent(hostRoleCommands);
}
}
}
/**
* Returns filtered list of stages such that the returned list is an ordered
* list of stages that may be executed in parallel or in the order in which
* they are presented.
* <p/>
* The specified stages must be ordered by request ID and may only contain the
* next stage in progress per request (as returned by
* {@link ActionDBAccessor#getFirstStageInProgressPerRequest()}). This is
* because there is a requirement that within a request, no two stages may
* ever run in parallel.
* <p/>
* The following rules will be applied to the list:
* <ul>
* <li>Stages are filtered such that the first stage in the list (assumed to
* be the first pending stage from the earliest active request) has priority.
* </li>
* <li>No stage in any request may be executed before an earlier stage in the
* same request. This requirement is automatically covered by virtue of the
* supplied stages only being for the next stage in progress per request.</li>
* <li>A stage in a different request may be performed in parallel
* if-and-only-if the relevant hosts for the stage in the later request do
* not intersect with the union of hosts from (pending) stages in earlier
* requests.</li>
* </ul>
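* <p>
* For example (hypothetical hosts): if request 1's pending stage covers
* {hostA, hostB} and request 2's covers {hostB, hostC}, only request 1's
* stage is returned because the two share hostB, while a request 3 stage
* on {hostD} would still be included.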
*
* @param firstStageInProgressPerRequest
* the stages to process, one stage per request
* @return a list of stages that may be executed in parallel
*/
private List<Stage> filterParallelPerHostStages(List<Stage> firstStageInProgressPerRequest) {
// if there's only 1 stage in progress in 1 request, simply return that stage
if (firstStageInProgressPerRequest.size() == 1) {
return firstStageInProgressPerRequest;
}
List<Stage> retVal = new ArrayList<>();
// set the lower range (inclusive) of requests to limit the query a bit
// since there can be a LOT of commands
long lowerRequestIdInclusive = firstStageInProgressPerRequest.get(0).getRequestId();
// determine if this stage can be scheduled in parallel with the other
// stages from other requests
for (Stage stage : firstStageInProgressPerRequest) {
long requestId = stage.getRequestId();
if (LOG.isTraceEnabled()) {
LOG.trace("==> Processing stage: {}/{} ({}) for {}", requestId, stage.getStageId(), stage.getRequestContext());
}
boolean addStage = true;
// there are at least 2 requests in progress concurrently; determine which
// hosts are affected
HashSet<String> hostsInProgressForEarlierRequests = new HashSet<>(
hostRoleCommandDAO.getBlockingHostsForRequest(lowerRequestIdInclusive, requestId));
// Iterate over the relevant hosts for this stage to see if any intersect with the set of
// hosts needed for previous stages. If any intersection occurs, this stage may not be
// executed in parallel.
for (String host : stage.getHosts()) {
LOG.trace("===> Processing Host {}", host);
if (hostsInProgressForEarlierRequests.contains(host)) {
if (LOG.isTraceEnabled()) {
LOG.trace("===> Skipping stage since it utilizes at least one host that a previous stage requires: {}/{} ({})", stage.getRequestId(), stage.getStageId(), stage.getRequestContext());
}
addStage = false;
break;
}
}
// add the stage if no prior stage from an earlier request intersects the
// hosts in this stage
if (addStage) {
if (LOG.isTraceEnabled()) {
LOG.trace("===> Adding stage to return value: {}/{} ({})", stage.getRequestId(), stage.getStageId(), stage.getRequestContext());
}
retVal.add(stage);
}
}
return retVal;
}
private boolean hasPreviousStageFailed(Stage stage) {
boolean failed = false;
long prevStageId = stage.getStageId() - 1;
if (prevStageId >= 0) {
// Find previous stage instance
String actionId = StageUtils.getActionId(stage.getRequestId(), prevStageId);
Stage prevStage = db.getStage(actionId);
// If the previous stage is skippable then we shouldn't automatically fail the given stage
if (prevStage == null || prevStage.isSkippable()) {
return false;
}
Map<Role, Integer> hostCountsForRoles = new HashMap<>();
Map<Role, Integer> failedHostCountsForRoles = new HashMap<>();
for (String host : prevStage.getHostRoleCommands().keySet()) {
Map<String, HostRoleCommand> roleCommandMap = prevStage.getHostRoleCommands().get(host);
for (String role : roleCommandMap.keySet()) {
HostRoleCommand c = roleCommandMap.get(role);
if (hostCountsForRoles.get(c.getRole()) == null) {
hostCountsForRoles.put(c.getRole(), 0);
failedHostCountsForRoles.put(c.getRole(), 0);
}
int hostCount = hostCountsForRoles.get(c.getRole());
hostCountsForRoles.put(c.getRole(), hostCount + 1);
if (c.getStatus().isFailedAndNotSkippableState()) {
int failedHostCount = failedHostCountsForRoles.get(c.getRole());
failedHostCountsForRoles.put(c.getRole(), failedHostCount + 1);
}
}
}
for (Role role : hostCountsForRoles.keySet()) {
float failedHosts = failedHostCountsForRoles.get(role);
float totalHosts = hostCountsForRoles.get(role);
if (((totalHosts - failedHosts) / totalHosts) < prevStage.getSuccessFactor(role)) {
failed = true;
}
}
}
return failed;
}
/**
* This method processes command timeouts and retry attempts, and
* adds new (pending) execution commands to the commandsToSchedule list.
* @return the stats for the roles in the stage, which are used to determine
* whether the stage has succeeded or failed
*/
protected Map<String, RoleStats> processInProgressStage(Stage s, List<ExecutionCommand> commandsToSchedule,
Multimap<Long, AgentCommand> commandsToEnqueue) throws AmbariException {
LOG.debug("==> Collecting commands to schedule...");
// Map to track role status
Map<String, RoleStats> roleStats = initRoleStats(s);
long now = System.currentTimeMillis();
Set<RoleCommandPair> rolesCommandsInProgress = s.getHostRolesInProgress();
Cluster cluster = null;
if (null != s.getClusterName()) {
cluster = clusters.getCluster(s.getClusterName());
}
for (String host : s.getHosts()) {
List<ExecutionCommandWrapper> commandWrappers = s.getExecutionCommands(host);
Host hostObj = null;
try {
hostObj = clusters.getHost(host);
} catch (AmbariException e) {
LOG.debug("Host {} not found, stage is likely a server side action", host);
}
int i_my = 0;
LOG.trace("===>host={}", host);
for (ExecutionCommandWrapper wrapper : commandWrappers) {
ExecutionCommand c = wrapper.getExecutionCommand();
String roleStr = c.getRole();
HostRoleStatus status = s.getHostRoleStatus(host, roleStr);
i_my++;
if (LOG.isTraceEnabled()) {
LOG.trace("Host task {}) id = {} status = {} (role={}), roleCommand = {}", i_my, c.getTaskId(), status, roleStr, c.getRoleCommand());
}
boolean hostDeleted = false;
if (null != cluster) {
Service svc = null;
if (c.getServiceName() != null && !c.getServiceName().isEmpty()) {
svc = cluster.getService(c.getServiceName());
}
ServiceComponent svcComp = null;
Map<String, ServiceComponentHost> scHosts = null;
try {
if (svc != null) {
svcComp = svc.getServiceComponent(roleStr);
scHosts = svcComp.getServiceComponentHosts();
}
} catch (ServiceComponentNotFoundException scnex) {
String msg = String.format(
"%s is not a service component, assuming it's an action",
roleStr);
LOG.debug(msg);
}
hostDeleted = (scHosts != null && !scHosts.containsKey(host));
if (hostDeleted) {
String message = String.format(
"Host component information has not been found. Details:" +
"cluster=%s; host=%s; service=%s; component=%s; ",
c.getClusterName(), host,
svcComp == null ? "null" : svcComp.getServiceName(),
svcComp == null ? "null" : svcComp.getName());
LOG.warn(message);
}
}
//basic timeout for stage
long commandTimeout = actionTimeout;
if (taskTimeoutAdjustment) {
Map<String, String> commandParams = c.getCommandParams();
String timeoutKey = ExecutionCommand.KeyNames.COMMAND_TIMEOUT;
if (commandParams != null && commandParams.containsKey(timeoutKey)) {
String timeoutStr = commandParams.get(timeoutKey);
commandTimeout += Long.parseLong(timeoutStr) * 1000; // Converting to milliseconds
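// e.g. with actionTimeout = 600000 ms and COMMAND_TIMEOUT = "1800" (seconds;
// both values illustrative), the effective timeout is
// 600000 + 1800 * 1000 = 2400000 ms (40 minutes)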
} else {
LOG.error("Execution command has no timeout parameter" +
c);
}
}
// Check that service host component is not deleted
boolean isHostStateUnknown = false;
if (hostDeleted) {
String message = String.format(
"Host not found when trying to schedule an execution command. " +
"The most probable reason for that is that host or host component " +
"has been deleted recently. The command has been aborted and dequeued." +
"Execution command details: " +
"cmdId: %s; taskId: %s; roleCommand: %s",
c.getCommandId(), c.getTaskId(), c.getRoleCommand());
LOG.warn("Host {} has been detected as non-available. {}", host, message);
// Abort the command itself
// We don't need to send CANCEL_COMMANDs in this case
db.abortHostRole(host, s.getRequestId(), s.getStageId(), c.getRole(), message);
if (c.getRoleCommand().equals(RoleCommand.ACTIONEXECUTE)) {
processActionDeath(cluster.getClusterName(), c.getHostname(), roleStr);
}
status = HostRoleStatus.ABORTED;
} else if (timeOutActionNeeded(status, s, hostObj, roleStr, now, commandTimeout)
|| (isHostStateUnknown = isHostStateUnknown(s, hostObj, roleStr))) {
// Process command timeouts
if (s.getAttemptCount(host, roleStr) >= maxAttempts || isHostStateUnknown) {
LOG.warn("Host: {}, role: {}, actionId: {} expired and will be failed", host, roleStr,
s.getActionId());
// determine if the task should be auto skipped
boolean isSkipSupported = s.isAutoSkipOnFailureSupported();
HostRoleCommand hostRoleCommand = s.getHostRoleCommand(c.getTaskId());
if (isSkipSupported && null != hostRoleCommand) {
isSkipSupported = hostRoleCommand.isFailureAutoSkipped();
}
db.timeoutHostRole(host, s.getRequestId(), s.getStageId(), c.getRole(), isSkipSupported, isHostStateUnknown);
//Reinitialize status
status = s.getHostRoleStatus(host, roleStr);
if (null != cluster) {
if (!RoleCommand.CUSTOM_COMMAND.equals(c.getRoleCommand())
&& !RoleCommand.SERVICE_CHECK.equals(c.getRoleCommand())
&& !RoleCommand.ACTIONEXECUTE.equals(c.getRoleCommand())) {
//the commands above don't affect host component state (there is no in-progress state to transition from), so the transition would fail
transitionToFailedState(cluster.getClusterName(), c.getServiceName(), roleStr, host, now, false);
}
if (c.getRoleCommand().equals(RoleCommand.ACTIONEXECUTE)) {
processActionDeath(cluster.getClusterName(), c.getHostname(), roleStr);
}
}
// Dequeue command
LOG.info("Removing command from queue, host={}, commandId={} ", host, c.getCommandId());
} else {
cancelCommandOnTimeout(Collections.singletonList(s.getHostRoleCommand(host, roleStr)), commandsToEnqueue);
LOG.info("Host: {}, role: {}, actionId: {} timed out and will be rescheduled", host,
roleStr, s.getActionId());
// reschedule command
commandsToSchedule.add(c);
LOG.trace("===> commandsToSchedule(reschedule)={}", commandsToSchedule.size());
}
} else if (status.equals(HostRoleStatus.PENDING)) {
// in the case of a DEPENDENCY_ORDERED stage, a command can be scheduled only if all of its dependencies are
// already finished
if (CommandExecutionType.STAGE == s.getCommandExecutionType() ||
(CommandExecutionType.DEPENDENCY_ORDERED == s.getCommandExecutionType() &&
CommandExecutionType.DEPENDENCY_ORDERED == configuration.getStageExecutionType() &&
areCommandDependenciesFinished(c, s, rolesCommandsInProgress))) {
//Need to schedule first time
commandsToSchedule.add(c);
LOG.trace("===>commandsToSchedule(first_time)={}", commandsToSchedule.size());
}
}
updateRoleStats(status, roleStats.get(roleStr));
if (status == HostRoleStatus.FAILED) {
LOG.info("Role {} on host {} was failed", roleStr, host);
}
}
}
LOG.debug("Collected {} commands to schedule in this wakeup.", commandsToSchedule.size());
return roleStats;
}
/**
* Returns true if all command dependencies are already finished (not IN_PROGRESS states).
* @param command the command whose dependencies are checked
* @param stage the stage containing the command
* @param rolesCommandsInProgress the role/command pairs currently in progress
* @return true if none of the command's dependencies are still in progress
*/
private boolean areCommandDependenciesFinished(ExecutionCommand command, Stage stage, Set<RoleCommandPair>
rolesCommandsInProgress) {
boolean areCommandDependenciesFinished = true;
RoleCommandOrder rco = roleCommandOrderProvider.getRoleCommandOrder(stage.getClusterId());
if (rco != null) {
RoleCommandPair roleCommand = new
RoleCommandPair(Role.valueOf(command.getRole()), command.getRoleCommand());
Set<RoleCommandPair> roleCommandDependencies = rco.getDependencies().get(roleCommand);
// check if there are any dependencies IN_PROGRESS
if (roleCommandDependencies != null) {
// remove any reference to the same RoleCommand itself
roleCommandDependencies.remove(roleCommand);
if (CollectionUtils.containsAny(rolesCommandsInProgress, roleCommandDependencies)) {
areCommandDependenciesFinished = false;
}
}
}
return areCommandDependenciesFinished;
}
/**
* Generates an OPFailed event before aborting all operations in the stage
* @param stage
*/
private void abortOperationsForStage(Stage stage) {
long now = System.currentTimeMillis();
for (String hostName : stage.getHosts()) {
List<ExecutionCommandWrapper> commandWrappers =
stage.getExecutionCommands(hostName);
for(ExecutionCommandWrapper wrapper : commandWrappers) {
ExecutionCommand c = wrapper.getExecutionCommand();
transitionToFailedState(stage.getClusterName(), c.getServiceName(),
c.getRole(), hostName, now, true);
}
}
Collection<HostRoleCommandEntity> abortedOperations = db.abortOperation(stage.getRequestId());
for (HostRoleCommandEntity command: abortedOperations) {
if (command.getRoleCommand().equals(RoleCommand.ACTIONEXECUTE)) {
String clusterName = stage.getClusterName();
processActionDeath(clusterName, command.getHostName(), command.getRole().name());
}
}
}
/**
* Raises an OPFailed event for an SCH
* @param clusterName
* @param serviceName
* @param componentName
* @param hostname
* @param timestamp
*/
private void transitionToFailedState(String clusterName, String serviceName,
String componentName, String hostname,
long timestamp,
boolean ignoreTransitionException) {
try {
Cluster cluster = clusters.getCluster(clusterName);
ServiceComponentHostOpFailedEvent failedEvent =
new ServiceComponentHostOpFailedEvent(componentName,
hostname, timestamp);
if (serviceName != null && ! serviceName.isEmpty() &&
componentName != null && ! componentName.isEmpty()) {
Service svc = cluster.getService(serviceName);
ServiceComponent svcComp = svc.getServiceComponent(componentName);
ServiceComponentHost svcCompHost =
svcComp.getServiceComponentHost(hostname);
svcCompHost.handleEvent(failedEvent);
} else {
LOG.info("Service name is " + serviceName + ", component name is " + componentName +
"skipping sending ServiceComponentHostOpFailedEvent for " + componentName);
}
} catch (ServiceComponentNotFoundException scnex) {
LOG.debug("{} associated with service {} is not a service component, assuming it's an action.", componentName, serviceName);
} catch (ServiceComponentHostNotFoundException e) {
String msg = String.format("Service component host %s not found, " +
"unable to transition to failed state.", componentName);
LOG.warn(msg, e);
} catch (InvalidStateTransitionException e) {
if (ignoreTransitionException) {
LOG.debug("Unable to transition to failed state.", e);
} else {
LOG.warn("Unable to transition to failed state.", e);
}
} catch (AmbariException e) {
LOG.warn("Unable to transition to failed state.", e);
}
}
/**
* Populates a map of role_name to role_stats.
*/
private Map<String, RoleStats> initRoleStats(Stage s) {
// Meaning: how many hosts are affected by commands for each role
Map<Role, Integer> hostCountsForRoles = new HashMap<>();
// < role_name, rolestats >
Map<String, RoleStats> roleStats = new TreeMap<>();
for (String host : s.getHostRoleCommands().keySet()) {
Map<String, HostRoleCommand> roleCommandMap = s.getHostRoleCommands().get(host);
for (String role : roleCommandMap.keySet()) {
HostRoleCommand c = roleCommandMap.get(role);
if (hostCountsForRoles.get(c.getRole()) == null) {
hostCountsForRoles.put(c.getRole(), 0);
}
int val = hostCountsForRoles.get(c.getRole());
hostCountsForRoles.put(c.getRole(), val + 1);
}
}
for (Role r : hostCountsForRoles.keySet()) {
RoleStats stats = new RoleStats(hostCountsForRoles.get(r),
s.getSuccessFactor(r));
roleStats.put(r.name(), stats);
}
return roleStats;
}
/**
* Checks if ambari-agent was restarted during role command execution
* @param host the host with ambari-agent to check
* @param stage the stage
* @param role the role to check
* @return {@code true} if ambari-agent was restarted
*/
protected boolean wasAgentRestartedDuringOperation(Host host, Stage stage, String role) {
String hostName = host.getHostName();
long taskStartTime = stage.getHostRoleCommand(hostName, role).getStartTime();
long lastAgentStartTime = host.getLastAgentStartTime();
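// if the agent started after the task did, the in-flight task was lost in
// the restart and its result will never be reported back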
return taskStartTime > 0 && lastAgentStartTime > 0 && taskStartTime <= lastAgentStartTime;
}
/**
* Checks if timeout is required.
* @param status the status of the current role
* @param stage the stage
* @param host the host object; can be {@code null} for server-side tasks
* @param role the role
* @param currentTime the current time, in milliseconds
* @param taskTimeout the timeout window, in milliseconds, for the task
* @return {@code true} if timeout is needed
* @throws AmbariException
*/
protected boolean timeOutActionNeeded(HostRoleStatus status, Stage stage,
Host host, String role, long currentTime, long taskTimeout) throws
AmbariException {
if (( !status.equals(HostRoleStatus.QUEUED) ) &&
( ! status.equals(HostRoleStatus.IN_PROGRESS) )) {
return false;
}
// tasks are held in a variety of in-memory maps that require a hostname key
// host being null is ok - that means it's a server-side task
String hostName = (null == host) ? null : host.getHostName();
// If another command is in progress for this stage, do not time out this one
if (hasCommandInProgress(stage, hostName)
&& !status.equals(HostRoleStatus.IN_PROGRESS)) {
return false;
}
if (currentTime >= stage.getLastAttemptTime(hostName, role)
+ taskTimeout) {
return true;
}
return false;
}
private boolean isHostStateUnknown(Stage stage, Host host, String role) {
if (null != host &&
(host.getState().equals(HostState.HEARTBEAT_LOST) || wasAgentRestartedDuringOperation(host, stage, role))) {
LOG.debug("Abort action since agent is not heartbeating or agent was restarted.");
return true;
}
return false;
}
private boolean hasCommandInProgress(Stage stage, String host) {
List<ExecutionCommandWrapper> commandWrappers = stage.getExecutionCommands(host);
for (ExecutionCommandWrapper wrapper : commandWrappers) {
ExecutionCommand c = wrapper.getExecutionCommand();
String roleStr = c.getRole();
HostRoleStatus status = stage.getHostRoleStatus(host, roleStr);
if (status == HostRoleStatus.IN_PROGRESS) {
return true;
}
}
return false;
}
private ListMultimap<String, ServiceComponentHostEvent> formEventMap(Stage s, List<ExecutionCommand> commands) {
ListMultimap<String, ServiceComponentHostEvent> serviceEventMap = ArrayListMultimap.create();
for (ExecutionCommand cmd : commands) {
String hostname = cmd.getHostname();
String roleStr = cmd.getRole();
if (RoleCommand.ACTIONEXECUTE != cmd.getRoleCommand()) {
serviceEventMap.put(cmd.getServiceName(), s.getFsmEvent(hostname, roleStr).getEvent());
}
}
return serviceEventMap;
}
private void processHostRole(RequestEntity r, Stage s, ExecutionCommand cmd, List<ExecutionCommand> commandsToStart,
List<ExecutionCommand> commandsToUpdate)
throws AmbariException {
long now = System.currentTimeMillis();
String roleStr = cmd.getRole();
String hostname = cmd.getHostname();
// start time is -1 if host role command is not started yet
if (s.getStartTime(hostname, roleStr) < 0) {
commandsToStart.add(cmd);
s.setStartTime(hostname,roleStr, now);
s.setHostRoleStatus(hostname, roleStr, HostRoleStatus.QUEUED);
}
s.setLastAttemptTime(hostname, roleStr, now);
s.incrementAttemptCount(hostname, roleStr);
String requestPK = r.getRequestId().toString();
String stagePk = s.getStageId() + "-" + s.getRequestId();
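// note: clusterHostInfo is cached per request id, while the stage-level
// command/host params below are cached per "stageId-requestId" key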
// Try to get clusterHostInfo from cache
Map<String, Set<String>> clusterHostInfo = clusterHostInfoCache.getIfPresent(requestPK);
if (clusterHostInfo == null) {
Type type = new TypeToken<Map<String, Set<String>>>() {}.getType();
clusterHostInfo = StageUtils.getGson().fromJson(r.getClusterHostInfo(), type);
clusterHostInfoCache.put(requestPK, clusterHostInfo);
}
cmd.setClusterHostInfo(clusterHostInfo);
// Try to get commandParams from cache and merge them with command-level parameters
Map<String, String> commandParams = commandParamsStageCache.getIfPresent(stagePk);
if (commandParams == null){
Type type = new TypeToken<Map<String, String>>() {}.getType();
commandParams = StageUtils.getGson().fromJson(s.getCommandParamsStage(), type);
commandParamsStageCache.put(stagePk, commandParams);
}
Map<String, String> commandParamsCmd = cmd.getCommandParams();
commandParamsCmd.putAll(commandParams);
cmd.setCommandParams(commandParamsCmd);
try {
Cluster cluster = clusters.getCluster(s.getClusterName());
if (null != cluster) {
// Generate localComponents
for (ServiceComponentHost sch : cluster.getServiceComponentHosts(hostname)) {
cmd.getLocalComponents().add(sch.getServiceComponentName());
}
}
} catch (ClusterNotFoundException cnfe) {
// NOP
}
// Try to get hostParams from cache and merge them with command-level parameters
Map<String, String> hostParams = hostParamsStageCache.getIfPresent(stagePk);
if (hostParams == null) {
Type type = new TypeToken<Map<String, String>>() {}.getType();
hostParams = StageUtils.getGson().fromJson(s.getHostParamsStage(), type);
hostParamsStageCache.put(stagePk, hostParams);
}
Map<String, String> hostParamsCmd = cmd.getHostLevelParams();
hostParamsCmd.putAll(hostParams);
cmd.setHostLevelParams(hostParamsCmd);
// change the hostname in the command for the host itself
cmd.setHostname(hostsMap.getHostMap(hostname));
commandsToUpdate.add(cmd);
}
/**
* @param requestId the request to cancel on the next scheduler wake-up
* (if it is in a state that allows cancellation, e.g. QUEUED, PENDING, IN_PROGRESS)
* @param reason why the request is being cancelled
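* <pre>{@code
* // illustrative request id; the abort happens on the next doWork() pass
* actionScheduler.scheduleCancellingRequest(17L, "Aborted by administrator");
* }</pre>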
*/
public void scheduleCancellingRequest(long requestId, String reason) {
synchronized (requestsToBeCancelled) {
requestsToBeCancelled.add(requestId);
requestCancelReasons.put(requestId, reason);
}
}
/**
* Aborts all stages that belong to requests that are being cancelled
*/
private void processCancelledRequestsList() throws AmbariException {
synchronized (requestsToBeCancelled) {
// Now, cancel stages completely
for (Long requestId : requestsToBeCancelled) {
// only pull back entities that have not completed; pulling back all
// entities for the request can cause OOM problems on large requests,
// like those for upgrades
List<HostRoleCommandEntity> entitiesToDequeue = hostRoleCommandDAO.findByRequestIdAndStatuses(
requestId, HostRoleStatus.NOT_COMPLETED_STATUSES);
if (!entitiesToDequeue.isEmpty()) {
List<HostRoleCommand> tasksToDequeue = new ArrayList<>(entitiesToDequeue.size());
for (HostRoleCommandEntity hrcEntity : entitiesToDequeue) {
HostRoleCommand task = hostRoleCommandFactory.createExisting(hrcEntity);
tasksToDequeue.add(task);
}
String reason = requestCancelReasons.get(requestId);
cancelHostRoleCommands(tasksToDequeue, reason);
}
// abort any stages in progress that belong to this request; don't execute this for all stages since
// that could lead to OOM errors on large requests, like those for
// upgrades
List<Stage> stagesInProgress = db.getStagesInProgressForRequest(requestId);
for (Stage stageInProgress : stagesInProgress) {
abortOperationsForStage(stageInProgress);
}
}
requestsToBeCancelled.clear();
requestCancelReasons.clear();
}
}
/**
* Cancels host role commands (those that are not finished yet).
* Dequeues host role commands that have been added to the ActionQueue,
* and automatically generates CANCEL_COMMANDs, adding them to the ActionQueue
* for all hostRoleCommands that have already been sent to an agent for
* execution.
* @param hostRoleCommands a list of hostRoleCommands
* @param reason why the request is being cancelled
*/
void cancelHostRoleCommands(Collection<HostRoleCommand> hostRoleCommands, String reason) throws AmbariException {
for (HostRoleCommand hostRoleCommand : hostRoleCommands) {
// There are no server actions in actionQueue
if (!Role.AMBARI_SERVER_ACTION.equals(hostRoleCommand.getRole())) {
if (hostRoleCommand.getStatus() == HostRoleStatus.QUEUED ||
hostRoleCommand.getStatus() == HostRoleStatus.IN_PROGRESS) {
CancelCommand cancelCommand = new CancelCommand();
cancelCommand.setTargetTaskId(hostRoleCommand.getTaskId());
cancelCommand.setReason(reason);
agentCommandsPublisher.sendAgentCommand(hostRoleCommand.getHostId(), cancelCommand);
}
}
if (hostRoleCommand.getStatus().isHoldingState()) {
db.abortHostRole(hostRoleCommand.getHostName(),
hostRoleCommand.getRequestId(),
hostRoleCommand.getStageId(), hostRoleCommand.getRole().name());
}
// If host role is an Action, we have to send an event
if (hostRoleCommand.getRoleCommand().equals(RoleCommand.ACTIONEXECUTE)) {
String clusterName = hostRoleCommand.getExecutionCommandWrapper().getExecutionCommand().getClusterName();
processActionDeath(clusterName,
hostRoleCommand.getHostName(),
hostRoleCommand.getRole().name());
}
}
}
void cancelCommandOnTimeout(Collection<HostRoleCommand> hostRoleCommands, Multimap<Long, AgentCommand> commandsToEnqueue) {
for (HostRoleCommand hostRoleCommand : hostRoleCommands) {
// There are no server actions in actionQueue
if (!Role.AMBARI_SERVER_ACTION.equals(hostRoleCommand.getRole())) {
if (hostRoleCommand.getStatus() == HostRoleStatus.QUEUED ||
hostRoleCommand.getStatus() == HostRoleStatus.IN_PROGRESS) {
CancelCommand cancelCommand = new CancelCommand();
cancelCommand.setTargetTaskId(hostRoleCommand.getTaskId());
cancelCommand.setReason("Stage timeout");
commandsToEnqueue.put(hostRoleCommand.getHostId(), cancelCommand);
}
}
}
}
/**
* Attempts to process the kill/timeout/abort of an action and sends the
* appropriate event to all listeners
*/
private void processActionDeath(String clusterName,
String hostname,
String role) {
try {
// Usually clusterId is defined (except the awkward case when
// "Distribute repositories/install packages" action has been issued
// against a concrete host without binding to a cluster)
Long clusterId = clusterName != null ?
clusters.getCluster(clusterName).getClusterId() : null;
CommandReport report = new CommandReport();
report.setRole(role);
report.setStdOut("Action is dead");
report.setStdErr("Action is dead");
report.setStructuredOut("{}");
report.setExitCode(1);
report.setStatus(HostRoleStatus.ABORTED.toString());
ActionFinalReportReceivedEvent event = new ActionFinalReportReceivedEvent(
clusterId, hostname, report, true);
ambariEventPublisher.publish(event);
} catch (AmbariException e) {
LOG.error(String.format("Can not get cluster %s", clusterName), e);
}
}
private void updateRoleStats(HostRoleStatus status, RoleStats rs) {
switch (status) {
case COMPLETED:
rs.numSucceeded++;
break;
case FAILED:
rs.numFailed++;
break;
case QUEUED:
rs.numQueued++;
break;
case PENDING:
rs.numPending++;
break;
case TIMEDOUT:
rs.numTimedOut++;
break;
case ABORTED:
rs.numAborted++;
break;
case IN_PROGRESS:
rs.numInProgress++;
break;
case HOLDING:
case HOLDING_FAILED:
case HOLDING_TIMEDOUT:
rs.numHolding++;
break;
case SKIPPED_FAILED:
rs.numSkipped++;
break;
default:
LOG.error("Unknown status " + status.name());
}
}
public void setTaskTimeoutAdjustment(boolean val) {
taskTimeoutAdjustment = val;
}
ServerActionExecutor getServerActionExecutor() {
return serverActionExecutor;
}
/**
* Handles {@link EntityManagerCacheInvalidationEvent} instances and instructs
* the thread running this scheduler to evict instances from the
* {@link EntityManager}.
*
* @param event
* the event to handle (not {@code null}).
*/
@Subscribe
public void onEvent(EntityManagerCacheInvalidationEvent event) {
try {
if (null != threadEntityManager && threadEntityManager.isOpen()) {
threadEntityManager.clear();
}
} catch (Throwable throwable) {
LOG.error("Unable to clear the EntityManager for the scheduler thread", throwable);
}
}
static class RoleStats {
int numInProgress;
int numQueued = 0;
int numSucceeded = 0;
int numFailed = 0;
int numTimedOut = 0;
int numPending = 0;
int numAborted = 0;
int numHolding = 0;
int numSkipped = 0;
final int totalHosts;
final float successFactor;
RoleStats(int total, float successFactor) {
totalHosts = total;
this.successFactor = successFactor;
}
/**
* Returns true if enough hosts have succeeded for the role's success factor to be met.
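* For example, with successFactor = 0.5 and totalHosts = 3,
* minSuccessNeeded = ceil(0.5 * 3) = 2, so at least 2 hosts must succeed.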
*/
boolean isSuccessFactorMet() {
int minSuccessNeeded = (int) Math.ceil(successFactor * totalHosts);
return minSuccessNeeded <= numSucceeded;
}
private boolean isRoleInProgress() {
return numPending + numQueued + numInProgress + numHolding > 0;
}
/**
* Role failure means the role is no longer in progress and its
* success factor has not been met.
*/
boolean isRoleFailed() {
return !(isRoleInProgress() || isSuccessFactorMet());
}
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
builder.append("numQueued=").append(numQueued);
builder.append(", numInProgress=").append(numInProgress);
builder.append(", numSucceeded=").append(numSucceeded);
builder.append(", numFailed=").append(numFailed);
builder.append(", numTimedOut=").append(numTimedOut);
builder.append(", numPending=").append(numPending);
builder.append(", numAborted=").append(numAborted);
builder.append(", numSkipped=").append(numSkipped);
builder.append(", totalHosts=").append(totalHosts);
builder.append(", successFactor=").append(successFactor);
return builder.toString();
}
}
}