blob: 1a055b3170709b73d8924c6590e095b84ab0d87a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ambari.server.actionmanager;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.ambari.annotations.TransactionalLock;
import org.apache.ambari.annotations.TransactionalLock.LockArea;
import org.apache.ambari.annotations.TransactionalLock.LockType;
import org.apache.ambari.server.AmbariException;
import org.apache.ambari.server.Role;
import org.apache.ambari.server.agent.CommandReport;
import org.apache.ambari.server.agent.ExecutionCommand;
import org.apache.ambari.server.audit.AuditLogger;
import org.apache.ambari.server.audit.event.AuditEvent;
import org.apache.ambari.server.audit.event.OperationStatusAuditEvent;
import org.apache.ambari.server.audit.event.TaskStatusAuditEvent;
import org.apache.ambari.server.configuration.Configuration;
import org.apache.ambari.server.controller.internal.CalculatedStatus;
import org.apache.ambari.server.events.HostsRemovedEvent;
import org.apache.ambari.server.events.RequestFinishedEvent;
import org.apache.ambari.server.events.RequestUpdateEvent;
import org.apache.ambari.server.events.TaskCreateEvent;
import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
import org.apache.ambari.server.events.publishers.STOMPUpdatePublisher;
import org.apache.ambari.server.events.publishers.TaskEventPublisher;
import org.apache.ambari.server.orm.dao.ClusterDAO;
import org.apache.ambari.server.orm.dao.ExecutionCommandDAO;
import org.apache.ambari.server.orm.dao.HostDAO;
import org.apache.ambari.server.orm.dao.HostRoleCommandDAO;
import org.apache.ambari.server.orm.dao.RequestDAO;
import org.apache.ambari.server.orm.dao.RequestScheduleDAO;
import org.apache.ambari.server.orm.dao.RoleSuccessCriteriaDAO;
import org.apache.ambari.server.orm.dao.StageDAO;
import org.apache.ambari.server.orm.entities.ClusterEntity;
import org.apache.ambari.server.orm.entities.ExecutionCommandEntity;
import org.apache.ambari.server.orm.entities.HostEntity;
import org.apache.ambari.server.orm.entities.HostRoleCommandEntity;
import org.apache.ambari.server.orm.entities.RequestEntity;
import org.apache.ambari.server.orm.entities.RequestScheduleEntity;
import org.apache.ambari.server.orm.entities.RoleSuccessCriteriaEntity;
import org.apache.ambari.server.orm.entities.StageEntity;
import org.apache.ambari.server.security.authorization.AuthorizationHelper;
import org.apache.ambari.server.state.Clusters;
import org.apache.ambari.server.state.Host;
import org.apache.ambari.server.topology.TopologyManager;
import org.apache.ambari.server.utils.StageUtils;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.eventbus.Subscribe;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.google.inject.name.Named;
import com.google.inject.persist.Transactional;
@Singleton
public class ActionDBAccessorImpl implements ActionDBAccessor {
/** Logger for this class. */
private static final Logger LOG = LoggerFactory.getLogger(ActionDBAccessorImpl.class);
/**
* Value of {@code stageDAO.getLastRequestId()} captured once in {@link #init()} and
* returned by {@link #getLastPersistedRequestIdWhenInitialized()}.
*/
private long requestId;
// Collaborators below are supplied through field injection (com.google.inject.Inject).
@Inject
ClusterDAO clusterDAO;
@Inject
HostDAO hostDAO;
@Inject
RequestDAO requestDAO;
@Inject
StageDAO stageDAO;
@Inject
HostRoleCommandDAO hostRoleCommandDAO;
@Inject
ExecutionCommandDAO executionCommandDAO;
@Inject
RoleSuccessCriteriaDAO roleSuccessCriteriaDAO;
@Inject
StageFactory stageFactory;
@Inject
RequestFactory requestFactory;
@Inject
HostRoleCommandFactory hostRoleCommandFactory;
@Inject
Clusters clusters;
@Inject
RequestScheduleDAO requestScheduleDAO;
@Inject
Configuration configuration;
@Inject
AmbariEventPublisher ambariEventPublisher;
@Inject
TaskEventPublisher taskEventPublisher;
@Inject
AuditLogger auditLogger;
@Inject
STOMPUpdatePublisher STOMPUpdatePublisher;
@Inject
TopologyManager topologyManager;
/**
* Cache for the audit log. It stores a {@link RequestDetails} object for every request.
* {@link RequestDetails} contains the previous status of the request and a map of tasks.
* Each task is keyed by a {@link RequestDetails.Component} and its value is the previous status of the task.
* {@link RequestDetails.Component} contains the component name and the host name.
* Entries expire 60 minutes after their last access.
*/
private Cache<Long,RequestDetails> auditlogRequestCache = CacheBuilder.newBuilder().expireAfterAccess(60, TimeUnit.MINUTES).concurrencyLevel(4).build();
/**
* Cache of host role commands keyed by task id; built in the constructor and filled by
* {@link #getTasks(Collection)}.
*/
private Cache<Long, HostRoleCommand> hostRoleCommandCache;
/**
* Soft size limit for {@link #hostRoleCommandCache}; it may be exceeded in order to store
* all tasks that belong to one request.
*/
private long cacheLimit; //may be exceeded to store tasks from one request
/**
 * Builds the accessor, creates the host role command cache (entries expire five minutes
 * after their last access) and registers this instance with the given event publisher.
 *
 * @param cacheLimit     soft upper bound on the number of cached host role commands
 * @param eventPublisher publisher this instance is registered with
 */
@Inject
public ActionDBAccessorImpl(@Named("executionCommandCacheSize") long cacheLimit,
    AmbariEventPublisher eventPublisher) {
  hostRoleCommandCache = CacheBuilder.newBuilder()
      .expireAfterAccess(5, TimeUnit.MINUTES)
      .build();
  this.cacheLimit = cacheLimit;

  eventPublisher.register(this);
}
/**
 * Injection-time initializer: remembers the last request id reported by the stage DAO.
 */
@Inject
void init() {
  this.requestId = stageDAO.getLastRequestId();
}
/**
 * Looks up a stage by its action id.
 *
 * @param actionId action id handed to the stage DAO
 * @return the stage created from the persisted entity, or {@code null} when the DAO
 *         returns no entity
 * @see org.apache.ambari.server.actionmanager.ActionDBAccessor#getStage(java.lang.String)
 */
@Override
public Stage getStage(String actionId) {
  StageEntity persisted = stageDAO.findByActionId(actionId);
  if (persisted == null) {
    return null;
  }
  return stageFactory.createExisting(persisted);
}
/**
 * Returns every stage that belongs to the given request.
 *
 * @param requestId id of the request whose stages are loaded
 * @return one {@link Stage} per stage entity found for the request, in the order the DAO
 *         returned the entities
 * @see org.apache.ambari.server.actionmanager.ActionDBAccessor#getAllStages(long)
 */
@Override
public List<Stage> getAllStages(long requestId) {
  // Reuse the shared entity-to-stage conversion instead of repeating the loop here.
  return getStagesForEntities(stageDAO.findByRequestId(requestId));
}
/**
 * Fetches the persisted request entity by primary key.
 *
 * @param requestId id of the request
 * @return whatever the request DAO returns for that key
 */
@Override
public RequestEntity getRequestEntity(long requestId) {
  RequestEntity found = requestDAO.findByPK(requestId);
  return found;
}
/**
 * Loads a request and wraps it in its domain object.
 *
 * @param requestId id of the request
 * @return the request built from the persisted entity, or {@code null} when no entity exists
 */
@Override
public Request getRequest(long requestId) {
  RequestEntity persisted = getRequestEntity(requestId);
  return (persisted == null) ? null : requestFactory.createExisting(persisted);
}
/**
 * Marks every command of the request that is in one of
 * {@link HostRoleStatus#SCHEDULED_STATES} as {@code ABORTED}, then ends the request.
 * <p>
 * Only the commands that actually need aborting are loaded; loading all commands of a
 * large request (such as an upgrade) can cause out-of-memory problems.
 *
 * @param requestId id of the request to abort
 * @return the result of merging the aborted commands, or an empty collection when no
 *         command needed to be aborted
 */
@Override
public Collection<HostRoleCommandEntity> abortOperation(long requestId) {
  final long abortTime = System.currentTimeMillis();

  List<HostRoleCommandEntity> toAbort = hostRoleCommandDAO.findByRequestIdAndStatuses(
      requestId, HostRoleStatus.SCHEDULED_STATES);

  for (HostRoleCommandEntity hrc : toAbort) {
    hrc.setStatus(HostRoleStatus.ABORTED);
    hrc.setEndTime(abortTime);
    LOG.info("Aborting command. Hostname " + hrc.getHostName()
        + " role " + hrc.getRole()
        + " requestId " + hrc.getRequestId()
        + " taskId " + hrc.getTaskId()
        + " stageId " + hrc.getStageId());
    auditLog(hrc, requestId);
  }

  // skip the merge entirely when nothing was changed
  Collection<HostRoleCommandEntity> merged;
  if (toAbort.isEmpty()) {
    merged = Collections.emptyList();
  } else {
    merged = hostRoleCommandDAO.mergeAll(toAbort);
  }

  endRequest(requestId);
  return merged;
}
/**
 * Times out the commands of a host/role with skipping disabled and the host not treated
 * as being in an unknown state.
 *
 * @param host      host name
 * @param requestId id of the request
 * @param stageId   id of the stage
 * @param role      role name
 */
@Override
public void timeoutHostRole(String host, long requestId, long stageId,
    String role) {
  final boolean skipSupported = false;
  final boolean hostUnknownState = false;
  timeoutHostRole(host, requestId, stageId, role, skipSupported, hostUnknownState);
}
/**
 * Moves the commands matching host, request, stage and role into their timed-out state,
 * stamps the end time, and ends the request if all of its tasks are completed.
 * <p>
 * Status chosen per command, in priority order: {@code SKIPPED_FAILED} when
 * {@code skipSupported}; {@code HOLDING_TIMEDOUT} when the command allows retry;
 * {@code ABORTED} when {@code hostUnknownState}; otherwise {@code TIMEDOUT}.
 *
 * @param host             host name
 * @param requestId        id of the request
 * @param stageId          id of the stage
 * @param role             role name
 * @param skipSupported    whether the failure should be recorded as skipped
 * @param hostUnknownState whether the host is in an unknown state
 */
@Override
public void timeoutHostRole(String host, long requestId, long stageId, String role,
    boolean skipSupported, boolean hostUnknownState) {
  final long endTime = System.currentTimeMillis();
  List<HostRoleCommandEntity> matching =
      hostRoleCommandDAO.findByHostRole(host, requestId, stageId, role);

  for (HostRoleCommandEntity hrc : matching) {
    HostRoleStatus newStatus;
    if (skipSupported) {
      newStatus = HostRoleStatus.SKIPPED_FAILED;
    } else if (hrc.isRetryAllowed()) {
      newStatus = HostRoleStatus.HOLDING_TIMEDOUT;
    } else if (hostUnknownState) {
      newStatus = HostRoleStatus.ABORTED;
    } else {
      newStatus = HostRoleStatus.TIMEDOUT;
    }
    hrc.setStatus(newStatus);
    hrc.setEndTime(endTime);
    auditLog(hrc, requestId);
  }

  // nothing to merge when no command matched
  if (!matching.isEmpty()) {
    hostRoleCommandDAO.mergeAll(matching);
  }
  endRequestIfCompleted(requestId);
}
/**
 * Returns the stages of a request that have commands in one of
 * {@link HostRoleStatus#IN_PROGRESS_STATUSES}.
 *
 * @param requestId id of the request
 * @return the matching stages
 */
@Override
public List<Stage> getStagesInProgressForRequest(Long requestId) {
  return getStagesForEntities(
      stageDAO.findByRequestIdAndCommandStatuses(requestId, HostRoleStatus.IN_PROGRESS_STATUSES));
}
/**
 * Returns the stages the stage DAO reports as the first stage per request for the
 * statuses in {@link HostRoleStatus#IN_PROGRESS_STATUSES}.
 *
 * @return one {@link Stage} per entity returned by the DAO, in DAO order
 */
@Override
public List<Stage> getFirstStageInProgressPerRequest() {
  // Reuse the shared entity-to-stage conversion instead of repeating the loop here.
  return getStagesForEntities(
      stageDAO.findFirstStageByStatus(HostRoleStatus.IN_PROGRESS_STATUSES));
}
/**
 * Converts persisted stage entities into {@link Stage} objects, preserving order.
 *
 * @param stageEntities entities to convert
 * @return a new list with one stage per entity
 */
private List<Stage> getStagesForEntities(List<StageEntity> stageEntities) {
  final List<Stage> converted = new ArrayList<>(stageEntities.size());
  for (StageEntity entity : stageEntities) {
    Stage existing = stageFactory.createExisting(entity);
    converted.add(existing);
  }
  return converted;
}
/**
 * Counts the commands whose status is one of {@link HostRoleStatus#IN_PROGRESS_STATUSES}.
 *
 * @return the count as an {@code int}, or {@code 0} when the DAO returns {@code null}
 */
@Override
public int getCommandsInProgressCount() {
  Number inProgress = hostRoleCommandDAO.getCountByStatus(HostRoleStatus.IN_PROGRESS_STATUSES);
  return (inProgress == null) ? 0 : inProgress.intValue();
}
/**
* Persists a new request together with its stages, host role commands, execution
* commands and role success criteria, then publishes a {@link TaskCreateEvent} and,
* when the request belongs to a cluster, a {@link RequestUpdateEvent} over STOMP.
* <p>
* The statement order below matters: entities are created before they are referenced
* and merged after their relationships have been set.
*
* @param request the request to persist
* @throws AmbariException if a command refers to a host id that is not in the database
*/
@Override
@Transactional
@TransactionalLock(lockArea = LockArea.HRC_STATUS_CACHE, lockType = LockType.WRITE)
public void persistActions(Request request) throws AmbariException {
RequestEntity requestEntity = request.constructNewPersistenceEntity();
// -1 / null stay in place when no cluster entity is found for the request
Long clusterId = -1L;
String clusterName = null;
Long requestId = requestEntity.getRequestId();
ClusterEntity clusterEntity = clusterDAO.findById(request.getClusterId());
if (clusterEntity != null) {
clusterId = clusterEntity.getClusterId();
clusterName = clusterEntity.getClusterName();
}
requestEntity.setClusterId(clusterId);
requestDAO.create(requestEntity);
//TODO wire request to cluster
List<StageEntity> stageEntities = new ArrayList<>(request.getStages().size());
// no-op when audit logging is disabled
addRequestToAuditlogCache(request);
// collects the commands of all stages for the TaskCreateEvent published at the end
List<HostRoleCommand> hostRoleCommands = new ArrayList<>();
for (Stage stage : request.getStages()) {
StageEntity stageEntity = stage.constructNewPersistenceEntity();
Long stageId = stageEntity.getStageId();
stageEntities.add(stageEntity);
stageEntity.setClusterId(clusterId);
stageEntity.setRequest(requestEntity);
stageDAO.create(stageEntity);
List<HostRoleCommand> orderedHostRoleCommands = stage.getOrderedHostRoleCommands();
List<HostRoleCommandEntity> hostRoleCommandEntities = new ArrayList<>();
for (HostRoleCommand hostRoleCommand : orderedHostRoleCommands) {
hostRoleCommand.setRequestId(requestId);
hostRoleCommand.setStageId(stageId);
HostRoleCommandEntity hostRoleCommandEntity = hostRoleCommand.constructNewPersistenceEntity();
hostRoleCommandEntity.setStage(stageEntity);
hostRoleCommandDAO.create(hostRoleCommandEntity);
hostRoleCommandEntities.add(hostRoleCommandEntity);
// the task id is read back from the entity after create() and copied to the command
hostRoleCommand.setTaskId(hostRoleCommandEntity.getTaskId());
// log file names are derived from the task id; the prefix stays empty unless the host supplies one
String prefix = "";
String output = "output-" + hostRoleCommandEntity.getTaskId() + ".txt";
String error = "errors-" + hostRoleCommandEntity.getTaskId() + ".txt";
HostEntity hostEntity = null;
// commands without a host id skip the host lookup entirely
if (null != hostRoleCommandEntity.getHostId()) {
hostEntity = hostDAO.findById(hostRoleCommandEntity.getHostId());
if (hostEntity == null) {
String msg = String.format("Host %s doesn't exist in database", hostRoleCommandEntity.getHostName());
LOG.error(msg);
throw new AmbariException(msg);
}
hostRoleCommandEntity.setHostEntity(hostEntity);
try {
// Get the in-memory host object and its prefix to construct the output and error log paths.
Host hostObject = clusters.getHost(hostEntity.getHostName());
if (!StringUtils.isBlank(hostObject.getPrefix())) {
prefix = hostObject.getPrefix();
// make sure the prefix ends with exactly one path separator before the file name
if (!prefix.endsWith("/")) {
prefix = prefix + "/";
}
}
} catch (AmbariException e) {
// best effort: fall back to the empty prefix (the exception itself is not logged)
LOG.warn("Exception in getting prefix for host and setting output and error log files. Using no prefix");
}
}
hostRoleCommand.setOutputLog(prefix + output);
hostRoleCommand.setErrorLog(prefix + error);
hostRoleCommandEntity.setOutputLog(hostRoleCommand.getOutputLog());
hostRoleCommandEntity.setErrorLog(hostRoleCommand.getErrorLog());
// create the execution command and link it to the host role command in both directions
ExecutionCommandEntity executionCommandEntity = hostRoleCommand.constructExecutionCommandEntity();
executionCommandEntity.setHostRoleCommand(hostRoleCommandEntity);
executionCommandEntity.setTaskId(hostRoleCommandEntity.getTaskId());
hostRoleCommandEntity.setExecutionCommand(executionCommandEntity);
executionCommandDAO.create(hostRoleCommandEntity.getExecutionCommand());
hostRoleCommandEntity = hostRoleCommandDAO.mergeWithoutPublishEvent(hostRoleCommandEntity);
if (null != hostEntity) {
hostEntity = hostDAO.merge(hostEntity);
}
hostRoleCommands.add(hostRoleCommand);
}
for (RoleSuccessCriteriaEntity roleSuccessCriteriaEntity : stageEntity.getRoleSuccessCriterias()) {
roleSuccessCriteriaDAO.create(roleSuccessCriteriaEntity);
}
stageEntity.setHostRoleCommands(hostRoleCommandEntities);
stageEntity = stageDAO.merge(stageEntity);
}
requestEntity.setStages(stageEntities);
requestDAO.merge(requestEntity);
TaskCreateEvent taskCreateEvent = new TaskCreateEvent(hostRoleCommands);
taskEventPublisher.publish(taskCreateEvent);
// re-read the persisted commands of the request for the STOMP update event
List<HostRoleCommandEntity> hostRoleCommandEntities = hostRoleCommandDAO.findByRequest(requestEntity.getRequestId());
// The "requests" STOMP topic is used for cluster-related requests only.
// Requests without clusters (like host checks) should be posted to a separate topic.
if (clusterName != null) {
STOMPUpdatePublisher.publish(new RequestUpdateEvent(requestEntity,
hostRoleCommandDAO, topologyManager, clusterName, hostRoleCommandEntities));
} else {
LOG.debug("No STOMP request update event was fired for new request due no cluster related, " +
"request id: {}, command name: {}",
requestEntity.getRequestId(),
requestEntity.getCommandName());
}
}
/**
 * Stamps the request's start time with the current time, but only when the request exists
 * and its start time is still {@code -1}.
 *
 * @param requestId id of the request
 */
@Override
@Transactional
public void startRequest(long requestId) {
  RequestEntity entity = getRequestEntity(requestId);
  if (entity == null || entity.getStartTime() != -1L) {
    return;
  }
  entity.setStartTime(System.currentTimeMillis());
  requestDAO.merge(entity);
}
/**
 * Stamps the request's end time with the current time and publishes a
 * {@link RequestFinishedEvent}, but only when the request exists and its end time is
 * still {@code -1}.
 *
 * @param requestId id of the request
 */
@Override
@Transactional
public void endRequest(long requestId) {
  RequestEntity entity = getRequestEntity(requestId);
  if (entity == null || entity.getEndTime() != -1L) {
    return;
  }
  entity.setEndTime(System.currentTimeMillis());
  requestDAO.merge(entity);
  ambariEventPublisher.publish(new RequestFinishedEvent(entity.getClusterId(), requestId));
}
/**
 * Ends the request when the request DAO reports that all of its tasks are completed.
 *
 * @param requestId id of the request
 */
public void endRequestIfCompleted(long requestId) {
  boolean allDone = requestDAO.isAllTasksCompleted(requestId);
  if (!allDone) {
    return;
  }
  endRequest(requestId);
}
/**
 * Links a request to the request schedule it originated from and merges both entities.
 *
 * @param requestId  id of the request
 * @param scheduleId id of the request schedule
 * @throws RuntimeException if either the request or the schedule cannot be found
 */
@Override
@Transactional
public void setSourceScheduleForRequest(long requestId, long scheduleId) {
  RequestEntity requestEntity = requestDAO.findByPK(requestId);
  if (requestEntity == null) {
    // Report the id of the missing request; the schedule id was used here previously.
    String message = String.format("Request with id=%s not found", requestId);
    LOG.error(message);
    throw new RuntimeException(message);
  }

  RequestScheduleEntity scheduleEntity = requestScheduleDAO.findById(scheduleId);
  if (scheduleEntity == null) {
    String message = String.format("Request Schedule with id=%s not found", scheduleId);
    LOG.error(message);
    throw new RuntimeException(message);
  }

  requestEntity.setRequestScheduleEntity(scheduleEntity);
  //we may want to break entity graph here for perf purposes (when list size is too large)
  scheduleEntity.getRequestEntities().add(requestEntity);
  requestDAO.merge(requestEntity);
  requestScheduleDAO.merge(scheduleEntity);
}
/**
* Applies a batch of agent command reports to the persisted host role commands.
* <p>
* Reports are matched to commands by task id. A command that is already in a completed
* state other than {@code ABORTED} is left untouched and a warning is logged. For an
* {@code ABORTED} command the output, error, structured output and exit code are
* updated but the status is kept. Requests whose last stage received a completed
* command are ended if all of their tasks are completed.
*
* @param reports command reports to apply
*/
@Override
public void updateHostRoleStates(Collection<CommandReport> reports) {
// index the reports by task id; a later report for the same task id replaces an earlier one
Map<Long, CommandReport> taskReports = new HashMap<>();
for (CommandReport report : reports) {
taskReports.put(report.getTaskId(), report);
}
long now = System.currentTimeMillis();
List<Long> requestsToCheck = new ArrayList<>();
// NOTE(review): abortedCommandUpdates is never read or written in this method
List<Long> abortedCommandUpdates = new ArrayList<>();
List<HostRoleCommandEntity> commandEntities = hostRoleCommandDAO.findByPKs(taskReports.keySet());
List<HostRoleCommandEntity> commandEntitiesToMerge = new ArrayList<>();
for (HostRoleCommandEntity commandEntity : commandEntities) {
CommandReport report = taskReports.get(commandEntity.getTaskId());
HostRoleStatus existingTaskStatus = commandEntity.getStatus();
HostRoleStatus reportedTaskStatus = HostRoleStatus.valueOf(report.getStatus());
// accept the report for commands that are not completed yet, and for ABORTED commands
if (!existingTaskStatus.isCompletedState() || existingTaskStatus == HostRoleStatus.ABORTED) {
// if FAILED and marked for holding then set reportedTaskStatus = HOLDING_FAILED
if (reportedTaskStatus == HostRoleStatus.FAILED && commandEntity.isRetryAllowed()) {
reportedTaskStatus = HostRoleStatus.HOLDING_FAILED;
// tasks can be marked as skipped when they fail
if (commandEntity.isFailureAutoSkipped()) {
reportedTaskStatus = HostRoleStatus.SKIPPED_FAILED;
}
}
// if TIMEDOUT and marked for holding then set status = HOLDING_TIMEDOUT
if (reportedTaskStatus == HostRoleStatus.TIMEDOUT && commandEntity.isRetryAllowed()){
reportedTaskStatus = HostRoleStatus.HOLDING_TIMEDOUT;
}
// an ABORTED command keeps its status; only the fields below are refreshed
if (!existingTaskStatus.isCompletedState()) {
commandEntity.setStatus(reportedTaskStatus);
}
commandEntity.setStdOut(report.getStdOut() == null ? null : report.getStdOut().getBytes());
commandEntity.setStdError(report.getStdErr() == null ? null : report.getStdErr().getBytes());
commandEntity.setStructuredOut(report.getStructuredOut() == null ? null :
report.getStructuredOut().getBytes());
commandEntity.setExitcode(report.getExitCode());
if (HostRoleStatus.getCompletedStates().contains(commandEntity.getStatus())) {
commandEntity.setEndTime(now);
// request id and stage id are taken from the report's action id: index 0 and index 1
String actionId = report.getActionId();
long[] requestStageIds = StageUtils.getRequestStage(actionId);
long requestId = requestStageIds[0];
long stageId = requestStageIds[1];
auditLog(commandEntity, requestId);
// only a completed command in the request's last stage triggers a completion check
if (requestDAO.getLastStageId(requestId).equals(stageId)) {
requestsToCheck.add(requestId);
}
}
commandEntitiesToMerge.add(commandEntity);
} else {
LOG.warn(String.format("Request for invalid transition of host role command status received for task id %d from " +
"agent: %s -> %s",commandEntity.getTaskId(),existingTaskStatus,reportedTaskStatus));
}
}
// no need to merge if there's nothing to merge
if (!commandEntitiesToMerge.isEmpty()) {
hostRoleCommandDAO.mergeAll(commandEntitiesToMerge);
}
// checked after the merge; a request id may appear more than once in this list
for (Long requestId : requestsToCheck) {
endRequestIfCompleted(requestId);
}
}
/**
 * Applies a single command report to every command matching host, request, stage and role.
 * <p>
 * A {@code FAILED} report becomes {@code HOLDING_FAILED} when retry is allowed (or
 * {@code SKIPPED_FAILED} when failures are auto-skipped); a {@code TIMEDOUT} report becomes
 * {@code HOLDING_TIMEDOUT} when retry is allowed. When a command reaches a completed state
 * in the request's last stage, the request is ended if all of its tasks are completed.
 * <p>
 * Missing output, error or structured output in the report is stored as {@code null}
 * rather than causing a {@link NullPointerException}, matching
 * {@link #updateHostRoleStates(Collection)}.
 *
 * @param hostname  host name
 * @param requestId id of the request
 * @param stageId   id of the stage
 * @param role      role name
 * @param report    the command report to apply
 */
@Override
public void updateHostRoleState(String hostname, long requestId,
    long stageId, String role, CommandReport report) {
  boolean checkRequest = false;
  if (LOG.isDebugEnabled()) {
    LOG.debug("Update HostRoleState: HostName {} requestId {} stageId {} role {} report {}", hostname, requestId, stageId, role, report);
  }
  long now = System.currentTimeMillis();
  List<HostRoleCommandEntity> commands = hostRoleCommandDAO.findByHostRole(
      hostname, requestId, stageId, role);
  for (HostRoleCommandEntity command : commands) {
    HostRoleStatus status = HostRoleStatus.valueOf(report.getStatus());
    // if FAILED and marked for holding then set status = HOLDING_FAILED
    if (status == HostRoleStatus.FAILED && command.isRetryAllowed()) {
      status = HostRoleStatus.HOLDING_FAILED;
      // tasks can be marked as skipped when they fail
      if (command.isFailureAutoSkipped()) {
        status = HostRoleStatus.SKIPPED_FAILED;
      }
    }
    // if TIMEDOUT and marked for holding then set status = HOLDING_TIMEDOUT
    if (status == HostRoleStatus.TIMEDOUT && command.isRetryAllowed()) {
      status = HostRoleStatus.HOLDING_TIMEDOUT;
    }
    command.setStatus(status);
    // A report may carry no output or error text (e.g. an abort with a null reason).
    command.setStdOut(report.getStdOut() == null ? null : report.getStdOut().getBytes());
    command.setStdError(report.getStdErr() == null ? null : report.getStdErr().getBytes());
    command.setStructuredOut(report.getStructuredOut() == null ? null :
        report.getStructuredOut().getBytes());
    if (HostRoleStatus.getCompletedStates().contains(command.getStatus())) {
      command.setEndTime(now);
      // only a completed command in the request's last stage triggers a completion check
      if (requestDAO.getLastStageId(requestId).equals(stageId)) {
        checkRequest = true;
      }
    }
    command.setExitcode(report.getExitCode());
    auditLog(command, requestId);
  }
  // no need to merge if there's nothing to merge
  if (!commands.isEmpty()) {
    hostRoleCommandDAO.mergeAll(commands);
  }
  if (checkRequest) {
    endRequestIfCompleted(requestId);
  }
}
/**
 * Aborts the commands of a host/role using a default "invalid state" reason.
 *
 * @param host      host name
 * @param requestId id of the request
 * @param stageId   id of the stage
 * @param role      role name
 */
@Override
public void abortHostRole(String host, long requestId, long stageId, String role) {
  abortHostRole(host, requestId, stageId, role,
      String.format("On host %s role %s in invalid state.", host, role));
}
/**
 * Aborts the commands of a host/role by applying a synthetic {@code ABORTED} report with
 * exit code 999, empty standard output and the reason as standard error.
 *
 * @param host      host name
 * @param requestId id of the request
 * @param stageId   id of the stage
 * @param role      role name
 * @param reason    text stored as the report's standard error
 */
@Override
public void abortHostRole(String host, long requestId, long stageId, String role, String reason) {
  CommandReport abortReport = new CommandReport();
  abortReport.setExitCode(999);
  abortReport.setStdErr(reason);
  abortReport.setStdOut("");
  abortReport.setStatus("ABORTED");

  updateHostRoleState(host, requestId, stageId, role, abortReport);
}
/**
 * Returns the request id captured when this accessor was initialized.
 *
 * @return the value stored by {@code init()}
 */
@Override
public long getLastPersistedRequestIdWhenInitialized() {
  return this.requestId;
}
/**
 * Records the scheduled state for each of the given execution commands of a stage.
 *
 * @param s        stage the commands belong to
 * @param commands execution commands to record
 */
@Override
@Transactional
public void bulkHostRoleScheduled(Stage s, List<ExecutionCommand> commands) {
  for (ExecutionCommand scheduled : commands) {
    String hostname = scheduled.getHostname();
    hostRoleScheduled(s, hostname, scheduled.getRole());
  }
}
/**
 * Aborts each of the given execution commands of a stage. The abort reason is the standard
 * "invalid state" message followed, on a new line, by the text mapped to the command.
 *
 * @param s        stage the commands belong to
 * @param commands execution commands mapped to the extra reason text
 */
@Override
@Transactional
public void bulkAbortHostRole(Stage s, Map<ExecutionCommand, String> commands) {
  for (Map.Entry<ExecutionCommand, String> entry : commands.entrySet()) {
    ExecutionCommand toAbort = entry.getKey();
    String reason = String.format("On host %s role %s in invalid state.\n%s",
        toAbort.getHostname(), toAbort.getRole(), entry.getValue());
    abortHostRole(toAbort.getHostname(), s.getRequestId(), s.getStageId(), toAbort.getRole(), reason);
  }
}
/**
 * Copies the scheduling data (start time, last attempt time, status, attempt count) of the
 * stage's in-memory command for the host/role onto its persisted entity and merges it.
 * The original start time is set to the current time only if it is {@code null} or
 * {@code -1}.
 *
 * @param s        stage that holds the command
 * @param hostname host name
 * @param roleStr  role name
 * @throws RuntimeException if the command has no persisted entity
 */
@Override
@Transactional
public void hostRoleScheduled(Stage s, String hostname, String roleStr) {
  HostRoleCommand scheduled = s.getHostRoleCommand(hostname, roleStr);
  HostRoleCommandEntity persisted = hostRoleCommandDAO.findByPK(scheduled.getTaskId());
  if (persisted == null) {
    throw new RuntimeException("HostRoleCommand is not persisted, cannot update:\n" + scheduled);
  }

  persisted.setStartTime(scheduled.getStartTime());
  boolean originalStartMissing =
      persisted.getOriginalStartTime() == null || persisted.getOriginalStartTime() == -1;
  if (originalStartMissing) {
    persisted.setOriginalStartTime(System.currentTimeMillis());
  }
  persisted.setLastAttemptTime(scheduled.getLastAttemptTime());
  persisted.setStatus(scheduled.getStatus());
  persisted.setAttemptCount(scheduled.getAttemptCount());

  auditLog(persisted, s.getRequestId());
  hostRoleCommandDAO.merge(persisted);
}
/**
 * Returns the tasks of a request.
 *
 * @param requestId id of the request
 * @return the tasks whose ids the DAO reports for the request
 */
@Override
public List<HostRoleCommand> getRequestTasks(long requestId) {
  List<Long> taskIds = hostRoleCommandDAO.findTaskIdsByRequest(requestId);
  return getTasks(taskIds);
}
/**
 * Returns the tasks of several requests.
 *
 * @param requestIds ids of the requests
 * @return the tasks of those requests, or an empty list when no id is given
 */
@Override
public List<HostRoleCommand> getAllTasksByRequestIds(Collection<Long> requestIds) {
  if (!requestIds.isEmpty()) {
    return getTasks(hostRoleCommandDAO.findTaskIdsByRequestIds(requestIds));
  }
  return Collections.emptyList();
}
/**
 * Returns the host role commands for the given task ids, sorted by ascending task id.
 * <p>
 * Commands present in the cache are taken from it; the rest are loaded through the DAO.
 * Loaded commands in status {@code ABORTED}, {@code COMPLETED}, {@code TIMEDOUT} or
 * {@code FAILED} are cached, provided the cache size did not exceed the limit when the
 * call began (so the limit may be overshot by the tasks of one call).
 *
 * @param taskIds ids of the tasks to return
 * @return the commands found, or an empty list when no id is given
 */
@Override
public List<HostRoleCommand> getTasks(Collection<Long> taskIds) {
  if (taskIds.isEmpty()) {
    return Collections.emptyList();
  }
  List<HostRoleCommand> commands = new ArrayList<>();
  Map<Long, HostRoleCommand> cached = hostRoleCommandCache.getAllPresent(taskIds);
  commands.addAll(cached.values());
  List<Long> absent = new ArrayList<>(taskIds);
  absent.removeAll(cached.keySet());
  if (!absent.isEmpty()) {
    // evaluated once, so every task loaded by this call is treated the same way
    boolean allowStore = hostRoleCommandCache.size() <= cacheLimit;
    for (HostRoleCommandEntity commandEntity : hostRoleCommandDAO.findByPKs(absent)) {
      HostRoleCommand hostRoleCommand = hostRoleCommandFactory.createExisting(commandEntity);
      commands.add(hostRoleCommand);
      if (allowStore) {
        switch (hostRoleCommand.getStatus()) {
          case ABORTED:
          case COMPLETED:
          case TIMEDOUT:
          case FAILED:
            hostRoleCommandCache.put(hostRoleCommand.getTaskId(), hostRoleCommand);
            break;
          default:
            // commands in any other status are not cached
            break;
        }
      }
    }
  }
  Collections.sort(commands, new Comparator<HostRoleCommand>() {
    @Override
    public int compare(HostRoleCommand o1, HostRoleCommand o2) {
      // Long.compare avoids the wrong sign a narrowed (int) difference can produce
      return Long.compare(o1.getTaskId(), o2.getTaskId());
    }
  });
  return commands;
}
/**
 * Returns the tasks matching a host, role and status.
 *
 * @param hostname host name
 * @param role     role name
 * @param status   task status
 * @return the matching tasks
 */
@Override
public List<HostRoleCommand> getTasksByHostRoleAndStatus(String hostname, String role, HostRoleStatus status) {
  List<Long> taskIds = hostRoleCommandDAO.findTaskIdsByHostRoleAndStatus(hostname, role, status);
  return getTasks(taskIds);
}
/**
 * Returns the tasks matching a role and status.
 *
 * @param role   role name
 * @param status task status
 * @return the matching tasks
 */
@Override
public List<HostRoleCommand> getTasksByRoleAndStatus(String role, HostRoleStatus status) {
  List<Long> taskIds = hostRoleCommandDAO.findTaskIdsByRoleAndStatus(role, status);
  return getTasks(taskIds);
}
/**
 * Loads a single task by id.
 *
 * @param taskId id of the task
 * @return the command built from the persisted entity, or {@code null} when no entity
 *         exists for the id
 */
@Override
public HostRoleCommand getTask(long taskId) {
  // Pass the id through unchanged: narrowing it to int would corrupt ids beyond the int range.
  HostRoleCommandEntity commandEntity = hostRoleCommandDAO.findByPK(taskId);
  if (commandEntity == null) {
    return null;
  }
  return hostRoleCommandFactory.createExisting(commandEntity);
}
/**
 * Returns request ids filtered by request status.
 * <p>
 * A {@code null} status returns all request ids. {@code COMPLETED} is handled separately
 * because every task of the request must be completed. {@code IN_PROGRESS} and
 * {@code FAILED} are mapped to the matching task status sets; for any other status a
 * {@code null} status set is handed to the DAO.
 *
 * @param status     request status to filter by, or {@code null} for no filter
 * @param maxResults maximum number of ids to return
 * @param ascOrder   whether the ids are returned in ascending order
 * @return the matching request ids
 */
@Override
public List<Long> getRequestsByStatus(RequestStatus status, int maxResults,
    boolean ascOrder) {
  if (status == null) {
    return requestDAO.findAllRequestIds(maxResults, ascOrder);
  }

  final EnumSet<HostRoleStatus> statusFilter;
  switch (status) {
    case COMPLETED:
      // !!! COMPLETED is special as all tasks in the request must be completed
      return hostRoleCommandDAO.getCompletedRequests(maxResults, ascOrder);
    case IN_PROGRESS:
      statusFilter = HostRoleStatus.IN_PROGRESS_STATUSES;
      break;
    case FAILED:
      statusFilter = HostRoleStatus.FAILED_STATUSES;
      break;
    default:
      statusFilter = null;
      break;
  }
  return hostRoleCommandDAO.getRequestsByTaskStatus(statusFilter, maxResults, ascOrder);
}
/**
 * Returns the request contexts of several requests.
 *
 * @param requestIds ids of the requests
 * @return the map produced by the stage DAO for those ids
 */
@Override
public Map<Long, String> getRequestContext(List<Long> requestIds) {
  Map<Long, String> contexts = stageDAO.findRequestContext(requestIds);
  return contexts;
}
/**
 * Returns the request context of a single request.
 *
 * @param requestId id of the request
 * @return the context produced by the stage DAO for that id
 */
@Override
public String getRequestContext(long requestId) {
  String context = stageDAO.findRequestContext(requestId);
  return context;
}
/**
 * Loads several requests by id.
 *
 * @param requestIds ids of the requests
 * @return one {@link Request} per entity the DAO found, in DAO order
 */
@Override
public List<Request> getRequests(Collection<Long> requestIds) {
  List<RequestEntity> persisted = requestDAO.findByPks(requestIds);
  final List<Request> loaded = new ArrayList<>(persisted.size());
  for (RequestEntity entity : persisted) {
    Request existing = requestFactory.createExisting(entity);
    loaded.add(existing);
  }
  return loaded;
}
/**
 * Resets the given tasks so they can run again: each task and its stage go back to
 * {@code PENDING}, the owning request goes to {@code IN_PROGRESS}, and the task's start and
 * end times are reset to {@code -1}. Stages, requests and tasks are then merged, and the
 * cached copies of the tasks are invalidated.
 *
 * @param taskIds ids of the tasks to resubmit
 */
@Override
public void resubmitTasks(List<Long> taskIds) {
  List<HostRoleCommandEntity> toResubmit = hostRoleCommandDAO.findByPKs(taskIds);
  Set<StageEntity> touchedStages = new HashSet<>();
  Set<RequestEntity> touchedRequests = new HashSet<>();

  for (HostRoleCommandEntity hrc : toResubmit) {
    StageEntity owningStage = hrc.getStage();
    owningStage.setStatus(HostRoleStatus.PENDING);
    touchedStages.add(owningStage);

    RequestEntity owningRequest = owningStage.getRequest();
    owningRequest.setStatus(HostRoleStatus.IN_PROGRESS);
    touchedRequests.add(owningRequest);

    hrc.setStatus(HostRoleStatus.PENDING);
    // TODO HACK, shouldn't reset start time.
    // Because it expects -1, RetryActionMonitor.java also had to set it to -1.
    hrc.setStartTime(-1L);
    hrc.setEndTime(-1L);
    auditLog(hrc, hrc.getRequestId());
  }

  for (StageEntity touchedStage : touchedStages) {
    stageDAO.merge(touchedStage);
  }
  for (RequestEntity touchedRequest : touchedRequests) {
    requestDAO.merge(touchedRequest);
  }
  // skip the task merge when no task was found
  if (!toResubmit.isEmpty()) {
    hostRoleCommandDAO.mergeAll(toResubmit);
  }
  hostRoleCommandCache.invalidateAll(taskIds);
}
/**
 * Event subscriber that drops every cached host role command when hosts are removed.
 *
 * @param event the {@link HostsRemovedEvent} that triggered the invalidation
 */
@Subscribe
public void invalidateCommandCacheOnHostRemove(HostsRemovedEvent event) {
  LOG.info("Invalidating HRC cache after receiveing {}", event);

  // the whole cache is cleared, not only the entries of the removed hosts
  this.hostRoleCommandCache.invalidateAll();
}
/**
 * Records the command's current status in the audit log cache under its role/host key and
 * returns the status that was stored there before.
 *
 * @param commandEntity entity carrying the new status, the role and the host name
 * @param requestId     id of the request the command belongs to
 * @return the previously cached status for the role/host pair, or {@code null} when none
 *         was cached or the request is not in the audit log cache
 */
private HostRoleStatus updateAuditlogCache(HostRoleCommandEntity commandEntity, Long requestId) {
  RequestDetails cachedDetails = auditlogRequestCache.getIfPresent(requestId);
  if (cachedDetails == null) {
    return null;
  }
  RequestDetails.Component key =
      new RequestDetails.Component(commandEntity.getRole(), commandEntity.getHostName());
  // Map.put hands back the value it replaced, or null when the key was absent
  return cachedDetails.getTasks().put(key, commandEntity.getStatus());
}
/**
 * Adds the request to the audit log cache, unless audit logging is disabled or the request
 * is already cached. The cached details hold the total number of host role commands over
 * all stages and the name of the authenticated user.
 *
 * @param request the request to register
 */
private void addRequestToAuditlogCache(Request request) {
  if (!auditLogger.isEnabled()) {
    return;
  }
  if (auditlogRequestCache.getIfPresent(request.getRequestId()) != null) {
    return;
  }

  int taskTotal = 0;
  for (Stage stage : request.getStages()) {
    taskTotal += stage.getOrderedHostRoleCommands().size();
  }

  RequestDetails details = new RequestDetails();
  details.setNumberOfTasks(taskTotal);
  details.setUserName(AuthorizationHelper.getAuthenticatedName());
  auditlogRequestCache.put(request.getRequestId(), details);
}
/**
 * Writes audit log entries for a command status change: an operation status event when the
 * request's calculated summary status differs from the last recorded one, followed by the
 * task-level logging. Does nothing when audit logging is disabled or the request id is
 * {@code null}.
 *
 * @param commandEntity the command whose status changed
 * @param requestId     id of the request the command belongs to
 */
private void auditLog(HostRoleCommandEntity commandEntity, Long requestId) {
  if (!auditLogger.isEnabled() || requestId == null) {
    return;
  }

  HostRoleStatus previousTaskStatus = updateAuditlogCache(commandEntity, requestId);

  // the request may be missing from the cache, so the lookup result is still checked
  RequestDetails cachedDetails = auditlogRequestCache.getIfPresent(requestId);
  if (cachedDetails != null) {
    HostRoleStatus summary = calculateStatus(requestId, cachedDetails.getNumberOfTasks());
    if (cachedDetails.getLastStatus() != summary) {
      RequestEntity requestEntity = requestDAO.findByPK(requestId);
      String requestContext;
      if (requestEntity != null) {
        requestContext = requestEntity.getRequestContext();
      } else {
        requestContext = null;
      }
      AuditEvent operationEvent = OperationStatusAuditEvent.builder()
          .withRequestId(String.valueOf(requestId))
          .withStatus(String.valueOf(summary))
          .withRequestContext(requestContext)
          .withUserName(cachedDetails.getUserName())
          .withTimestamp(System.currentTimeMillis())
          .build();
      auditLogger.log(operationEvent);
      cachedDetails.setLastStatus(summary);
    }
  }

  logTask(commandEntity, requestId, previousTaskStatus);
}
/**
 * Calculates the summary status of a request from the task statuses held in the audit log
 * cache.
 *
 * @param requestId     id of the request
 * @param numberOfTasks total number of tasks passed to the summary calculation
 * @return the calculated summary status, or {@code QUEUED} when the request is not cached
 */
private HostRoleStatus calculateStatus(Long requestId, int numberOfTasks) {
  RequestDetails cachedDetails = auditlogRequestCache.getIfPresent(requestId);
  if (cachedDetails != null) {
    Collection<HostRoleStatus> statuses = cachedDetails.getTaskStatuses();
    return CalculatedStatus.calculateSummaryStatus(
        CalculatedStatus.calculateStatusCounts(statuses), numberOfTasks, false);
  }
  return HostRoleStatus.QUEUED;
}
/**
 * Writes a task status audit event when the task had no previously cached status or its
 * cached status differs from the previous one. Does nothing when the request is not in the
 * audit log cache.
 *
 * @param commandEntity  the command whose status is logged
 * @param requestId      id of the request the command belongs to
 * @param lastTaskStatus status cached for the task before the current update, may be
 *                       {@code null}
 */
private void logTask(HostRoleCommandEntity commandEntity, Long requestId, HostRoleStatus lastTaskStatus) {
  RequestDetails cachedDetails = auditlogRequestCache.getIfPresent(requestId);
  if (cachedDetails == null) {
    return;
  }

  RequestDetails.Component key =
      new RequestDetails.Component(commandEntity.getRole(), commandEntity.getHostName());
  HostRoleStatus currentCached = cachedDetails.getTasks().get(key);
  if (lastTaskStatus != null && currentCached == lastTaskStatus) {
    // status unchanged since the previous update: nothing to report
    return;
  }

  AuditEvent taskEvent = TaskStatusAuditEvent.builder()
      .withTaskId(String.valueOf(commandEntity.getTaskId()))
      .withHostName(commandEntity.getHostName())
      .withUserName(cachedDetails.getUserName())
      .withOperation(commandEntity.getRoleCommand() + " " + commandEntity.getRole())
      .withDetails(commandEntity.getCommandDetail())
      .withStatus(commandEntity.getStatus().toString())
      .withRequestId(String.valueOf(requestId))
      .withTimestamp(System.currentTimeMillis())
      .build();
  auditLogger.log(taskEvent);
}
/**
 * Holder for the audit-relevant details of one request: its last summary status, its
 * task count, the issuing user and the latest status per component/host pair.
 */
private static class RequestDetails {
  /** Summary status most recently recorded for the request; {@code null} until set. */
  HostRoleStatus lastStatus = null;

  /** Number of tasks that belong to the request. */
  int numberOfTasks = 0;

  /** Name of the user who issued the request. */
  String userName;

  /**
   * Status of the task belonging to each component/host pair. May contain fewer entries
   * than {@link RequestDetails#numberOfTasks}.
   */
  Map<Component, HostRoleStatus> tasks = new HashMap<>();

  public HostRoleStatus getLastStatus() {
    return this.lastStatus;
  }

  public void setLastStatus(HostRoleStatus lastStatus) {
    this.lastStatus = lastStatus;
  }

  public int getNumberOfTasks() {
    return this.numberOfTasks;
  }

  public void setNumberOfTasks(int numberOfTasks) {
    this.numberOfTasks = numberOfTasks;
  }

  public String getUserName() {
    return this.userName;
  }

  public void setUserName(String userName) {
    this.userName = userName;
  }

  public Map<Component, HostRoleStatus> getTasks() {
    return this.tasks;
  }

  /**
   * Returns the statuses stored in {@link RequestDetails#tasks}.
   *
   * @return the value view of the task map
   */
  public Collection<HostRoleStatus> getTaskStatuses() {
    Map<Component, HostRoleStatus> taskMap = getTasks();
    return taskMap.values();
  }

  /**
   * Key type of the {@link RequestDetails#tasks} map: a role paired with a host name.
   */
  static class Component {
    /** Component name. */
    private final Role role;

    /** Host name. */
    private final String hostName;

    Component(Role role, String hostName) {
      this.role = role;
      this.hostName = hostName;
    }

    public Role getRole() {
      return this.role;
    }

    public String getHostName() {
      return this.hostName;
    }

    /**
     * Hashes the characters of the role's string form followed by the host name, using
     * the literal text "null" for a missing part.
     *
     * @return the hash code
     */
    @Override
    public final int hashCode() {
      String rolePart = (role != null) ? role.toString() : "null";
      String hostPart = (hostName != null) ? hostName : "null";
      int result = 7;
      for (char c : rolePart.concat(hostPart).toCharArray()) {
        result = result * 31 + c;
      }
      return result;
    }

    /**
     * Two components are equal when both their role and their host name are equal.
     *
     * @param other object to compare with
     * @return {@code true} when {@code other} is a component with the same role and host
     */
    @Override
    public final boolean equals(final Object other) {
      if (!(other instanceof Component)) {
        return false;
      }
      Component that = (Component) other;
      return Objects.equals(that.role, role) && Objects.equals(that.hostName, hostName);
    }
  }
}
}