blob: b7352572b1c682fc23b67d820e5e166cf871ed42 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ambari.server.orm.dao;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import javax.persistence.EntityManager;
import javax.persistence.TypedQuery;
import javax.persistence.criteria.CriteriaQuery;
import javax.persistence.criteria.Order;
import javax.persistence.metamodel.SingularAttribute;
import org.apache.ambari.annotations.TransactionalLock;
import org.apache.ambari.annotations.TransactionalLock.LockArea;
import org.apache.ambari.annotations.TransactionalLock.LockType;
import org.apache.ambari.server.Role;
import org.apache.ambari.server.RoleCommand;
import org.apache.ambari.server.actionmanager.HostRoleCommand;
import org.apache.ambari.server.actionmanager.HostRoleCommandFactory;
import org.apache.ambari.server.actionmanager.HostRoleStatus;
import org.apache.ambari.server.agent.AgentCommand.AgentCommandType;
import org.apache.ambari.server.api.query.JpaPredicateVisitor;
import org.apache.ambari.server.api.query.JpaSortBuilder;
import org.apache.ambari.server.configuration.Configuration;
import org.apache.ambari.server.controller.spi.PageRequest;
import org.apache.ambari.server.controller.spi.Predicate;
import org.apache.ambari.server.controller.spi.Request;
import org.apache.ambari.server.controller.spi.SortRequest;
import org.apache.ambari.server.controller.utilities.PredicateHelper;
import org.apache.ambari.server.events.TaskCreateEvent;
import org.apache.ambari.server.events.TaskUpdateEvent;
import org.apache.ambari.server.events.publishers.TaskEventPublisher;
import org.apache.ambari.server.orm.RequiresSession;
import org.apache.ambari.server.orm.TransactionalLocks;
import org.apache.ambari.server.orm.entities.HostEntity;
import org.apache.ambari.server.orm.entities.HostRoleCommandEntity;
import org.apache.ambari.server.orm.entities.HostRoleCommandEntity_;
import org.apache.ambari.server.orm.entities.StageEntity;
import org.eclipse.persistence.config.HintValues;
import org.eclipse.persistence.config.QueryHints;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Function;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.inject.Inject;
import com.google.inject.Provider;
import com.google.inject.Singleton;
import com.google.inject.name.Named;
import com.google.inject.persist.Transactional;
@Singleton
public class HostRoleCommandDAO {
private static final Logger LOG = LoggerFactory.getLogger(HostRoleCommandDAO.class);
private static final String SUMMARY_DTO = String.format(
"SELECT NEW %s(" +
"MAX(hrc.stage.skippable), " +
"MIN(hrc.startTime), " +
"MAX(hrc.endTime), " +
"hrc.stageId, " +
"SUM(CASE WHEN hrc.status = :aborted THEN 1 ELSE 0 END), " +
"SUM(CASE WHEN hrc.status = :completed THEN 1 ELSE 0 END), " +
"SUM(CASE WHEN hrc.status = :failed THEN 1 ELSE 0 END), " +
"SUM(CASE WHEN hrc.status = :holding THEN 1 ELSE 0 END), " +
"SUM(CASE WHEN hrc.status = :holding_failed THEN 1 ELSE 0 END), " +
"SUM(CASE WHEN hrc.status = :holding_timedout THEN 1 ELSE 0 END), " +
"SUM(CASE WHEN hrc.status = :in_progress THEN 1 ELSE 0 END), " +
"SUM(CASE WHEN hrc.status = :pending THEN 1 ELSE 0 END), " +
"SUM(CASE WHEN hrc.status = :queued THEN 1 ELSE 0 END), " +
"SUM(CASE WHEN hrc.status = :timedout THEN 1 ELSE 0 END)," +
"SUM(CASE WHEN hrc.status = :skipped_failed THEN 1 ELSE 0 END)" +
") FROM HostRoleCommandEntity hrc " +
" GROUP BY hrc.requestId, hrc.stageId HAVING hrc.requestId = :requestId",
HostRoleCommandStatusSummaryDTO.class.getName());
/**
* SQL template to get requests that have at least one task in any of the
* specified statuses.
*/
private static final String REQUESTS_BY_TASK_STATUS_SQL = "SELECT DISTINCT task.requestId FROM HostRoleCommandEntity task WHERE task.status IN :taskStatuses ORDER BY task.requestId {0}";
/**
* SQL template to get all requests which have had all of their tasks
* COMPLETED
*/
private static final String COMPLETED_REQUESTS_SQL = "SELECT DISTINCT task.requestId FROM HostRoleCommandEntity task WHERE task.requestId NOT IN (SELECT task.requestId FROM HostRoleCommandEntity task WHERE task.status IN :notCompletedStatuses) ORDER BY task.requestId {0}";
/**
* A cache that holds {@link HostRoleCommandStatusSummaryDTO} grouped by stage
* id for requests by request id. The JPQL computing the host role command
* status summary for a request is rather expensive thus this cache helps
* reducing the load on the database.
* <p/>
* Methods which interact with this cache, including invalidation and
* population, should use the {@link TransactionalLock} annotation along with
* the {@link LockArea#HRC_STATUS_CACHE}. This will prevent stale data from
* being read during a transaction which has updated a
* {@link HostRoleCommandEntity}'s {@link HostRoleStatus} but has not
* committed yet.
* <p/>
* This cache cannot be a {@link LoadingCache} since there is an inherent
* problem with concurrency of reloads. Namely, if the entry has been read
* during a load, but not yet put into the cache and another invalidation is
* registered. The old value would eventually make it into the cache and the
* last invalidation would not invalidate anything since the cache was empty
* at the time.
*/
private final Cache<Long, Map<Long, HostRoleCommandStatusSummaryDTO>> hrcStatusSummaryCache;
/**
* Specifies whether caching for {@link HostRoleCommandStatusSummaryDTO} grouped by stage id for requests
* is enabled.
*/
private final boolean hostRoleCommandStatusSummaryCacheEnabled;
@Inject
private Provider<EntityManager> entityManagerProvider;
@Inject
private DaoUtils daoUtils;
@Inject
private Configuration configuration;
@Inject
HostRoleCommandFactory hostRoleCommandFactory;
@Inject
private TaskEventPublisher taskEventPublisher;
/**
* Used to ensure that methods which rely on the completion of
* {@link Transactional} can detect when they are able to run.
*
* @see TransactionalLock
*/
@Inject
private final TransactionalLocks transactionLocks = null;
public final static String HRC_STATUS_SUMMARY_CACHE_SIZE = "hostRoleCommandStatusSummaryCacheSize";
public final static String HRC_STATUS_SUMMARY_CACHE_EXPIRY_DURATION_MINUTES = "hostRoleCommandStatusCacheExpiryDurationMins";
public final static String HRC_STATUS_SUMMARY_CACHE_ENABLED = "hostRoleCommandStatusSummaryCacheEnabled";
/**
* Invalidates the host role command status summary cache entry that corresponds to the given request.
* @param requestId the key of the cache entry to be invalidated.
*/
protected void invalidateHostRoleCommandStatusSummaryCache(Long requestId) {
if (!hostRoleCommandStatusSummaryCacheEnabled ) {
return;
}
LOG.debug("Invalidating host role command status summary cache for request {} !", requestId);
hrcStatusSummaryCache.invalidate(requestId);
}
/**
* Invalidates the host role command status summary cache entry that
* corresponds to each request.
*
* @param requestIds
* the requests to invalidate
*/
protected void invalidateHostRoleCommandStatusSummaryCache(Set<Long> requestIds) {
for (Long requestId : requestIds) {
if (null != requestId) {
invalidateHostRoleCommandStatusSummaryCache(requestId);
}
}
}
/**
* Invalidates those entries in host role command status cache which are
* dependent on the passed
* {@link org.apache.ambari.server.orm.entities.HostRoleCommandEntity} entity.
*
* @param hostRoleCommandEntity
*/
protected void invalidateHostRoleCommandStatusSummaryCache(
HostRoleCommandEntity hostRoleCommandEntity) {
if ( !hostRoleCommandStatusSummaryCacheEnabled ) {
return;
}
if (hostRoleCommandEntity != null) {
Long requestId = hostRoleCommandEntity.getRequestId();
if (requestId == null) {
StageEntity stageEntity = hostRoleCommandEntity.getStage();
if (stageEntity != null) {
requestId = stageEntity.getRequestId();
}
}
if (requestId != null) {
invalidateHostRoleCommandStatusSummaryCache(requestId.longValue());
}
}
}
/**
* Loads the counts of tasks for a request and groups them by stage id.
* This allows for very efficient loading when there are a huge number of stages
* and tasks to iterate (for example, during a Stack Upgrade).
* @param requestId the request id
* @return the map of stage-to-summary objects
*/
@RequiresSession
private Map<Long, HostRoleCommandStatusSummaryDTO> loadAggregateCounts(Long requestId) {
Map<Long, HostRoleCommandStatusSummaryDTO> map = new HashMap<>();
EntityManager entityManager = entityManagerProvider.get();
TypedQuery<HostRoleCommandStatusSummaryDTO> query = entityManager.createQuery(SUMMARY_DTO,
HostRoleCommandStatusSummaryDTO.class);
query.setParameter("requestId", requestId);
query.setParameter("aborted", HostRoleStatus.ABORTED);
query.setParameter("completed", HostRoleStatus.COMPLETED);
query.setParameter("failed", HostRoleStatus.FAILED);
query.setParameter("holding", HostRoleStatus.HOLDING);
query.setParameter("holding_failed", HostRoleStatus.HOLDING_FAILED);
query.setParameter("holding_timedout", HostRoleStatus.HOLDING_TIMEDOUT);
query.setParameter("in_progress", HostRoleStatus.IN_PROGRESS);
query.setParameter("pending", HostRoleStatus.PENDING);
query.setParameter("queued", HostRoleStatus.QUEUED);
query.setParameter("timedout", HostRoleStatus.TIMEDOUT);
query.setParameter("skipped_failed", HostRoleStatus.SKIPPED_FAILED);
for (HostRoleCommandStatusSummaryDTO dto : daoUtils.selectList(query)) {
map.put(dto.getStageId(), dto);
}
return map;
}
@Inject
public HostRoleCommandDAO(
@Named(HRC_STATUS_SUMMARY_CACHE_ENABLED) boolean hostRoleCommandStatusSummaryCacheEnabled,
@Named(HRC_STATUS_SUMMARY_CACHE_SIZE) long hostRoleCommandStatusSummaryCacheLimit,
@Named(HRC_STATUS_SUMMARY_CACHE_EXPIRY_DURATION_MINUTES) long hostRoleCommandStatusSummaryCacheExpiryDurationMins) {
this.hostRoleCommandStatusSummaryCacheEnabled = hostRoleCommandStatusSummaryCacheEnabled;
LOG.info("Host role command status summary cache {} !", hostRoleCommandStatusSummaryCacheEnabled ? "enabled" : "disabled");
hrcStatusSummaryCache = CacheBuilder.newBuilder()
.maximumSize(hostRoleCommandStatusSummaryCacheLimit)
.expireAfterWrite(hostRoleCommandStatusSummaryCacheExpiryDurationMins, TimeUnit.MINUTES)
.build();
}
@RequiresSession
public HostRoleCommandEntity findByPK(long taskId) {
return entityManagerProvider.get().find(HostRoleCommandEntity.class, taskId);
}
@RequiresSession
public List<HostRoleCommandEntity> findByPKs(Collection<Long> taskIds) {
if (taskIds == null || taskIds.isEmpty()) {
return Collections.emptyList();
}
TypedQuery<HostRoleCommandEntity> query = entityManagerProvider.get().createQuery(
"SELECT task FROM HostRoleCommandEntity task WHERE task.taskId IN ?1 " +
"ORDER BY task.taskId",
HostRoleCommandEntity.class);
if (taskIds.size() > configuration.getTaskIdListLimit()) {
List<HostRoleCommandEntity> result = new ArrayList<>();
List<List<Long>> lists = Lists.partition(new ArrayList<>(taskIds), configuration.getTaskIdListLimit());
for (List<Long> list : lists) {
result.addAll(daoUtils.selectList(query, list));
}
return result;
}
return daoUtils.selectList(query, taskIds);
}
@RequiresSession
public List<HostRoleCommandEntity> findByHostId(Long hostId) {
TypedQuery<HostRoleCommandEntity> query = entityManagerProvider.get().createNamedQuery(
"HostRoleCommandEntity.findByHostId",
HostRoleCommandEntity.class);
query.setParameter("hostId", hostId);
return daoUtils.selectList(query);
}
@RequiresSession
public List<HostRoleCommandEntity> findByRequestIds(Collection<Long> requestIds) {
TypedQuery<HostRoleCommandEntity> query = entityManagerProvider.get().createQuery(
"SELECT task FROM HostRoleCommandEntity task " +
"WHERE task.requestId IN ?1 " +
"ORDER BY task.taskId", HostRoleCommandEntity.class);
return daoUtils.selectList(query, requestIds);
}
@RequiresSession
public List<HostRoleCommandEntity> findByRequestIdAndStatuses(Long requestId, Collection<HostRoleStatus> statuses) {
TypedQuery<HostRoleCommandEntity> query = entityManagerProvider.get().createNamedQuery(
"HostRoleCommandEntity.findByRequestIdAndStatuses", HostRoleCommandEntity.class);
query.setParameter("requestId", requestId);
query.setParameter("statuses", statuses);
List<HostRoleCommandEntity> results = query.getResultList();
return results;
}
@RequiresSession
public List<Long> findTaskIdsByRequestIds(Collection<Long> requestIds) {
TypedQuery<Long> query = entityManagerProvider.get().createQuery(
"SELECT task.taskId FROM HostRoleCommandEntity task " +
"WHERE task.requestId IN ?1 " +
"ORDER BY task.taskId", Long.class);
return daoUtils.selectList(query, requestIds);
}
@RequiresSession
public List<HostRoleCommandEntity> findByRequestAndTaskIds(Collection<Long> requestIds, Collection<Long> taskIds) {
TypedQuery<HostRoleCommandEntity> query = entityManagerProvider.get().createQuery(
"SELECT DISTINCT task FROM HostRoleCommandEntity task " +
"WHERE task.requestId IN ?1 AND task.taskId IN ?2 " +
"ORDER BY task.taskId", HostRoleCommandEntity.class
);
return daoUtils.selectList(query, requestIds, taskIds);
}
@RequiresSession
public List<Long> findTaskIdsByRequestAndTaskIds(Collection<Long> requestIds, Collection<Long> taskIds) {
TypedQuery<Long> query = entityManagerProvider.get().createQuery(
"SELECT DISTINCT task.taskId FROM HostRoleCommandEntity task " +
"WHERE task.requestId IN ?1 AND task.taskId IN ?2 " +
"ORDER BY task.taskId", Long.class
);
if (taskIds.size() > configuration.getTaskIdListLimit()) {
List<Long> result = new ArrayList<>();
List<List<Long>> lists = Lists.partition(new ArrayList<>(taskIds), configuration.getTaskIdListLimit());
for (List<Long> taskIdList : lists) {
result.addAll(daoUtils.selectList(query, requestIds, taskIdList));
}
return result;
}
return daoUtils.selectList(query, requestIds, taskIds);
}
@RequiresSession
public List<Long> findTaskIdsByHostRoleAndStatus(String hostname, String role, HostRoleStatus status) {
TypedQuery<Long> query = entityManagerProvider.get().createQuery(
"SELECT DISTINCT task.taskId FROM HostRoleCommandEntity task " +
"WHERE task.hostEntity.hostName=?1 AND task.role=?2 AND task.status=?3 " +
"ORDER BY task.taskId", Long.class
);
return daoUtils.selectList(query, hostname, role, status);
}
@RequiresSession
public List<Long> findTaskIdsByRoleAndStatus(String role, HostRoleStatus status) {
TypedQuery<Long> query = entityManagerProvider.get().createQuery(
"SELECT DISTINCT task.taskId FROM HostRoleCommandEntity task " +
"WHERE task.role=?1 AND task.status=?2 " +
"ORDER BY task.taskId", Long.class);
return daoUtils.selectList(query, role, status);
}
@RequiresSession
public List<HostRoleCommandEntity> findSortedCommandsByRequestIdAndCustomCommandName(Long requestId, String customCommandName) {
TypedQuery<HostRoleCommandEntity> query = entityManagerProvider.get().createQuery("SELECT hostRoleCommand " +
"FROM HostRoleCommandEntity hostRoleCommand " +
"WHERE hostRoleCommand.requestId=?1 AND hostRoleCommand.customCommandName=?2 " +
"ORDER BY hostRoleCommand.taskId", HostRoleCommandEntity.class);
return daoUtils.selectList(query, requestId, customCommandName);
}
@RequiresSession
public List<HostRoleCommandEntity> findSortedCommandsByStageAndHost(StageEntity stageEntity, HostEntity hostEntity) {
TypedQuery<HostRoleCommandEntity> query = entityManagerProvider.get().createQuery("SELECT hostRoleCommand " +
"FROM HostRoleCommandEntity hostRoleCommand " +
"WHERE hostRoleCommand.stage=?1 AND hostRoleCommand.hostEntity.hostName=?2 " +
"ORDER BY hostRoleCommand.taskId", HostRoleCommandEntity.class);
return daoUtils.selectList(query, stageEntity, hostEntity.getHostName());
}
@RequiresSession
public Map<String, List<HostRoleCommandEntity>> findSortedCommandsByStage(StageEntity stageEntity) {
TypedQuery<HostRoleCommandEntity> query = entityManagerProvider.get().createQuery("SELECT hostRoleCommand " +
"FROM HostRoleCommandEntity hostRoleCommand " +
"WHERE hostRoleCommand.stage=?1 " +
"ORDER BY hostRoleCommand.hostEntity.hostName, hostRoleCommand.taskId", HostRoleCommandEntity.class);
List<HostRoleCommandEntity> commandEntities = daoUtils.selectList(query, stageEntity);
Map<String, List<HostRoleCommandEntity>> hostCommands = new HashMap<>();
for (HostRoleCommandEntity commandEntity : commandEntities) {
if (!hostCommands.containsKey(commandEntity.getHostName())) {
hostCommands.put(commandEntity.getHostName(), new ArrayList<>());
}
hostCommands.get(commandEntity.getHostName()).add(commandEntity);
}
return hostCommands;
}
@RequiresSession
public List<Long> findTaskIdsByStage(long requestId, long stageId) {
TypedQuery<Long> query = entityManagerProvider.get().createQuery("SELECT hostRoleCommand.taskId " +
"FROM HostRoleCommandEntity hostRoleCommand " +
"WHERE hostRoleCommand.stage.requestId=?1 " +
"AND hostRoleCommand.stage.stageId=?2 "+
"ORDER BY hostRoleCommand.taskId", Long.class);
return daoUtils.selectList(query, requestId, stageId);
}
@RequiresSession
public Map<Long, Integer> getHostIdToCountOfCommandsWithStatus(Collection<HostRoleStatus> statuses) {
Map<Long, Integer> hostIdToCount = new HashMap<>();
String queryName = "SELECT command.hostId FROM HostRoleCommandEntity command WHERE command.status IN :statuses";
TypedQuery<Long> query = entityManagerProvider.get().createQuery(queryName, Long.class);
query.setParameter("statuses", statuses);
List<Long> results = query.getResultList();
for (Long hostId : results) {
if (hostIdToCount.containsKey(hostId)) {
hostIdToCount.put(hostId, hostIdToCount.get(hostId) + 1);
} else {
hostIdToCount.put(hostId, 1);
}
}
return hostIdToCount;
}
@RequiresSession
public List<HostRoleCommandEntity> findByHostRole(String hostName, long requestId, long stageId, String role) {
String queryName = (null == hostName) ? "HostRoleCommandEntity.findByHostRoleNullHost" :
"HostRoleCommandEntity.findByHostRole";
TypedQuery<HostRoleCommandEntity> query = entityManagerProvider.get().createNamedQuery(
queryName, HostRoleCommandEntity.class);
if (null != hostName) {
query.setParameter("hostName", hostName);
}
query.setParameter("requestId", requestId);
query.setParameter("stageId", stageId);
query.setParameter("role", role);
return daoUtils.selectList(query);
}
@RequiresSession
public List<HostRoleCommandEntity> findByRequest(long requestId) {
return findByRequest(requestId, false);
}
@RequiresSession
public List<HostRoleCommandEntity> findByRequest(long requestId, boolean refreshHint) {
TypedQuery<HostRoleCommandEntity> query = entityManagerProvider.get().createNamedQuery(
"HostRoleCommandEntity.findByRequestId",
HostRoleCommandEntity.class);
if (refreshHint) {
query.setHint(QueryHints.REFRESH, HintValues.TRUE);
}
query.setParameter("requestId", requestId);
return daoUtils.selectList(query);
}
@RequiresSession
public List<Long> findTaskIdsByRequest(long requestId) {
TypedQuery<Long> query = entityManagerProvider.get().createQuery("SELECT command.taskId " +
"FROM HostRoleCommandEntity command " +
"WHERE command.requestId=?1 ORDER BY command.taskId", Long.class);
return daoUtils.selectList(query, requestId);
}
/**
* Gets the commands in a particular status.
*
* @param statuses
* the statuses to include (not {@code null}).
* @return the commands in the given set of statuses.
*/
@RequiresSession
public List<HostRoleCommandEntity> findByStatus(
Collection<HostRoleStatus> statuses) {
TypedQuery<HostRoleCommandEntity> query = entityManagerProvider.get().createNamedQuery(
"HostRoleCommandEntity.findByCommandStatuses",
HostRoleCommandEntity.class);
query.setParameter("statuses", statuses);
return daoUtils.selectList(query);
}
/**
* Gets the number of commands in a particular status.
*
* @param statuses
* the statuses to include (not {@code null}).
* @return the count of commands in the given set of statuses.
*/
@RequiresSession
public Number getCountByStatus(Collection<HostRoleStatus> statuses) {
TypedQuery<Number> query = entityManagerProvider.get().createNamedQuery(
"HostRoleCommandEntity.findCountByCommandStatuses", Number.class);
query.setParameter("statuses", statuses);
return daoUtils.selectSingle(query);
}
@RequiresSession
public List<HostRoleCommandEntity> findAll() {
return daoUtils.selectAll(entityManagerProvider.get(), HostRoleCommandEntity.class);
}
/**
* Finds all the {@link HostRoleCommandEntity}s for the given request that are
* between the specified stage IDs and have the specified status.
*
* @param requestId
* the request ID
* @param status
* the command status to query for (not {@code null}).
* @param minStageId
* the lowest stage ID to requests tasks for.
* @param maxStageId
* the highest stage ID to request tasks for.
* @return the tasks that satisfy the specified parameters.
*/
@RequiresSession
public List<HostRoleCommandEntity> findByStatusBetweenStages(long requestId,
HostRoleStatus status, long minStageId, long maxStageId) {
TypedQuery<HostRoleCommandEntity> query = entityManagerProvider.get().createNamedQuery(
"HostRoleCommandEntity.findByStatusBetweenStages", HostRoleCommandEntity.class);
query.setParameter("requestId", requestId);
query.setParameter("status", status);
query.setParameter("minStageId", minStageId);
query.setParameter("maxStageId", maxStageId);
return daoUtils.selectList(query);
}
/**
* Gets requests that have tasks in any of the specified statuses.
*
* @param statuses
* @param maxResults
* @param ascOrder
* @return
*/
@RequiresSession
public List<Long> getRequestsByTaskStatus(
Collection<HostRoleStatus> statuses, int maxResults, boolean ascOrder) {
String sortOrder = "ASC";
if (!ascOrder) {
sortOrder = "DESC";
}
String sql = MessageFormat.format(REQUESTS_BY_TASK_STATUS_SQL, sortOrder);
TypedQuery<Long> query = entityManagerProvider.get().createQuery(sql,
Long.class);
query.setParameter("taskStatuses", statuses);
return daoUtils.selectList(query);
}
@RequiresSession
public List<Long> getCompletedRequests(int maxResults, boolean ascOrder) {
String sortOrder = "ASC";
if (!ascOrder) {
sortOrder = "DESC";
}
String sql = MessageFormat.format(COMPLETED_REQUESTS_SQL, sortOrder);
TypedQuery<Long> query = entityManagerProvider.get().createQuery(sql,
Long.class);
query.setParameter("notCompletedStatuses",
HostRoleStatus.NOT_COMPLETED_STATUSES);
return daoUtils.selectList(query);
}
/**
* NB: You cannot rely on return value if batch write is enabled
*/
@Transactional
public int updateStatusByRequestId(long requestId, HostRoleStatus target, Collection<HostRoleStatus> sources) {
TypedQuery<HostRoleCommandEntity> selectQuery = entityManagerProvider.get().createQuery("SELECT command " +
"FROM HostRoleCommandEntity command " +
"WHERE command.requestId=?1 AND command.status IN ?2", HostRoleCommandEntity.class);
List<HostRoleCommandEntity> commandEntities = daoUtils.selectList(selectQuery, requestId, sources);
for (HostRoleCommandEntity entity : commandEntities) {
entity.setStatus(target);
merge(entity);
}
return commandEntities.size();
}
@Transactional
@TransactionalLock(lockArea = LockArea.HRC_STATUS_CACHE, lockType = LockType.WRITE)
public void create(HostRoleCommandEntity entity) {
EntityManager entityManager = entityManagerProvider.get();
entityManager.persist(entity);
invalidateHostRoleCommandStatusSummaryCache(entity);
}
@Transactional
@TransactionalLock(lockArea = LockArea.HRC_STATUS_CACHE, lockType = LockType.WRITE)
public HostRoleCommandEntity merge(HostRoleCommandEntity entity) {
entity = mergeWithoutPublishEvent(entity);
publishTaskUpdateEvent(Collections.singletonList(hostRoleCommandFactory.createExisting(entity)));
return entity;
}
@Transactional
@TransactionalLock(lockArea = LockArea.HRC_STATUS_CACHE, lockType = LockType.WRITE)
public HostRoleCommandEntity mergeWithoutPublishEvent(HostRoleCommandEntity entity) {
EntityManager entityManager = entityManagerProvider.get();
entity = entityManager.merge(entity);
invalidateHostRoleCommandStatusSummaryCache(entity);
return entity;
}
@Transactional
public void removeByHostId(Long hostId) {
Collection<HostRoleCommandEntity> commands = findByHostId(hostId);
for (HostRoleCommandEntity cmd : commands) {
remove(cmd);
}
}
@Transactional
@TransactionalLock(lockArea = LockArea.HRC_STATUS_CACHE, lockType = LockType.WRITE)
public List<HostRoleCommandEntity> mergeAll(Collection<HostRoleCommandEntity> entities) {
Set<Long> requestsToInvalidate = new LinkedHashSet<>();
List<HostRoleCommandEntity> managedList = new ArrayList<>(entities.size());
for (HostRoleCommandEntity entity : entities) {
EntityManager entityManager = entityManagerProvider.get();
entity = entityManager.merge(entity);
managedList.add(entity);
Long requestId = entity.getRequestId();
if (requestId == null) {
StageEntity stageEntity = entity.getStage();
if (stageEntity != null) {
requestId = stageEntity.getRequestId();
}
}
requestsToInvalidate.add(requestId);
}
invalidateHostRoleCommandStatusSummaryCache(requestsToInvalidate);
publishTaskUpdateEvent(getHostRoleCommands(entities));
return managedList;
}
/**
*
* @param entities
*/
public List<HostRoleCommand> getHostRoleCommands(Collection<HostRoleCommandEntity> entities) {
Function<HostRoleCommandEntity, HostRoleCommand> transform = new Function<HostRoleCommandEntity, HostRoleCommand> () {
@Override
public HostRoleCommand apply(HostRoleCommandEntity entity) {
return hostRoleCommandFactory.createExisting(entity);
}
};
return FluentIterable.from(entities)
.transform(transform)
.toList();
}
/**
*
* @param hostRoleCommands
*/
public void publishTaskUpdateEvent(List<HostRoleCommand> hostRoleCommands) {
if (!hostRoleCommands.isEmpty()) {
TaskUpdateEvent taskUpdateEvent = new TaskUpdateEvent(hostRoleCommands);
taskEventPublisher.publish(taskUpdateEvent);
}
}
/**
*
* @param hostRoleCommands
*/
public void publishTaskCreateEvent(List<HostRoleCommand> hostRoleCommands) {
if (!hostRoleCommands.isEmpty()) {
TaskCreateEvent taskCreateEvent = new TaskCreateEvent(hostRoleCommands);
taskEventPublisher.publish(taskCreateEvent);
}
}
@Transactional
@TransactionalLock(lockArea = LockArea.HRC_STATUS_CACHE, lockType = LockType.WRITE)
public void remove(HostRoleCommandEntity entity) {
EntityManager entityManager = entityManagerProvider.get();
entityManager.remove(entity);
invalidateHostRoleCommandStatusSummaryCache(entity);
}
@Transactional
public void removeByPK(int taskId) {
remove(findByPK(taskId));
}
/**
* Finds the counts of tasks for a request and groups them by stage id. If
* caching is enabled, this will first consult the cache. Cache misses will
* then defer to loading the data from the database and then caching the
* result.
*
* @param requestId
* the request id
* @return the map of stage-to-summary objects
*/
@RequiresSession
public Map<Long, HostRoleCommandStatusSummaryDTO> findAggregateCounts(Long requestId) {
if (!hostRoleCommandStatusSummaryCacheEnabled) {
return loadAggregateCounts(requestId);
}
Map<Long, HostRoleCommandStatusSummaryDTO> map = hrcStatusSummaryCache.getIfPresent(requestId);
if (null != map) {
return map;
}
// ensure that we wait for any running transactions working on this cache to
// complete
ReadWriteLock lock = transactionLocks.getLock(LockArea.HRC_STATUS_CACHE);
lock.readLock().lock();
try {
map = loadAggregateCounts(requestId);
hrcStatusSummaryCache.put(requestId, map);
return map;
} finally {
lock.readLock().unlock();
}
}
/**
* During Rolling and Express Upgrade, want to bubble up the error of the most recent failure, i.e., greatest
* task id, assuming that there are no other completed tasks after it.
* @param requestId upgrade request id
* @return Most recent task failure during stack upgrade, or null if one doesn't exist.
*/
@RequiresSession
public HostRoleCommandEntity findMostRecentFailure(Long requestId) {
TypedQuery<HostRoleCommandEntity> query = entityManagerProvider.get().createNamedQuery(
"HostRoleCommandEntity.findTasksByStatusesOrderByIdDesc", HostRoleCommandEntity.class);
query.setParameter("requestId", requestId);
query.setParameter("statuses", HostRoleStatus.STACK_UPGRADE_FAILED_STATUSES);
List<HostRoleCommandEntity> results = query.getResultList();
if (!results.isEmpty()) {
HostRoleCommandEntity candidate = results.get(0);
// Ensure that there are no other completed tasks in a future stage to avoid returning an old error.
// During Express Upgrade, we can run multiple commands in the same stage, so it's possible to have
// COMPLETED tasks in the failed task's stage.
// During Rolling Upgrade, we run exactly one command per stage.
TypedQuery<Number> numberAlreadyRanTasksInFutureStage = entityManagerProvider.get().createNamedQuery(
"HostRoleCommandEntity.findNumTasksAlreadyRanInStage", Number.class);
numberAlreadyRanTasksInFutureStage.setParameter("requestId", requestId);
numberAlreadyRanTasksInFutureStage.setParameter("taskId", candidate.getTaskId());
numberAlreadyRanTasksInFutureStage.setParameter("stageId", candidate.getStageId());
numberAlreadyRanTasksInFutureStage.setParameter("statuses", HostRoleStatus.SCHEDULED_STATES);
Number result = daoUtils.selectSingle(numberAlreadyRanTasksInFutureStage);
if (result.longValue() == 0L) {
return candidate;
}
}
return null;
}
/**
* Updates the {@link HostRoleCommandEntity#isFailureAutoSkipped()} flag for
* all commands for the given request.
* <p/>
* This will update each entity to ensure that the cache is maintained in a
* correct state. A batch update doesn't always reflect in JPA-managed
* entities.
* <p/>
* Stages which do not support automatically skipped commands will be updated
* with a value of {@code false}.
*
* @param requestId
* the request ID of the commands to update
* @param skipOnFailure
* {@code true} to automatically skip failures, {@code false}
* otherwise.
* @param skipOnServiceCheckFailure
* {@code true} to skip service check failures
*
* @see StageEntity#isAutoSkipOnFailureSupported()
*/
@Transactional
public void updateAutomaticSkipOnFailure(long requestId,
boolean skipOnFailure, boolean skipOnServiceCheckFailure) {
List<HostRoleCommandEntity> tasks = findByRequest(requestId);
for (HostRoleCommandEntity task : tasks) {
// if the stage does not support automatically skipping its commands, then
// do nothing
StageEntity stage = task.getStage();
boolean isStageSkippable = stage.isSkippable();
boolean isAutoSkipSupportedOnStage = stage.isAutoSkipOnFailureSupported();
// if the stage is not skippable or it does not support auto skip
if (!isStageSkippable || !isAutoSkipSupportedOnStage) {
task.setAutoSkipOnFailure(false);
} else {
if (task.getRoleCommand() == RoleCommand.SERVICE_CHECK) {
task.setAutoSkipOnFailure(skipOnServiceCheckFailure);
} else {
task.setAutoSkipOnFailure(skipOnFailure);
}
}
// save changes
merge(task);
}
}
/**
* Finds all {@link HostRoleCommandEntity} that match the provided predicate.
* This method will make JPA do the heavy lifting of providing a slice of the
* result set.
*
* @param request
* @return
*/
@RequiresSession
public List<HostRoleCommandEntity> findAll(Request request, Predicate predicate) {
EntityManager entityManager = entityManagerProvider.get();
// convert the Ambari predicate into a JPA predicate
HostRoleCommandPredicateVisitor visitor = new HostRoleCommandPredicateVisitor();
PredicateHelper.visit(predicate, visitor);
CriteriaQuery<HostRoleCommandEntity> query = visitor.getCriteriaQuery();
javax.persistence.criteria.Predicate jpaPredicate = visitor.getJpaPredicate();
if (null != jpaPredicate) {
query.where(jpaPredicate);
}
// sorting
SortRequest sortRequest = request.getSortRequest();
if (null != sortRequest) {
JpaSortBuilder<HostRoleCommandEntity> sortBuilder = new JpaSortBuilder<>();
List<Order> sortOrders = sortBuilder.buildSortOrders(sortRequest, visitor);
query.orderBy(sortOrders);
}
TypedQuery<HostRoleCommandEntity> typedQuery = entityManager.createQuery(query);
// pagination
PageRequest pagination = request.getPageRequest();
if (null != pagination) {
typedQuery.setFirstResult(pagination.getOffset());
typedQuery.setMaxResults(pagination.getPageSize());
}
return daoUtils.selectList(typedQuery);
}
/**
* Gets a lists of hosts with commands in progress given a range of requests.
* The range of requests should include all requests with at least 1 stage in
* progress.
*
* @return the list of hosts with commands in progress.
* @see HostRoleStatus#IN_PROGRESS_STATUSES
*/
@RequiresSession
public List<String> getHostsWithPendingTasks(long iLowestRequestIdInProgress,
long iHighestRequestIdInProgress) {
TypedQuery<String> query = entityManagerProvider.get().createNamedQuery(
"HostRoleCommandEntity.findHostsByCommandStatus", String.class);
query.setParameter("iLowestRequestIdInProgress", iLowestRequestIdInProgress);
query.setParameter("iHighestRequestIdInProgress", iHighestRequestIdInProgress);
query.setParameter("statuses", HostRoleStatus.IN_PROGRESS_STATUSES);
return daoUtils.selectList(query);
}
/**
* Gets a lists of hosts with commands in progress which occurr before the
* specified request ID. This will only return commands which are not
* {@link AgentCommandType#BACKGROUND_EXECUTION_COMMAND} as thsee commands do
* not block future requests.
*
* @param lowerRequestIdInclusive
* the lowest request ID to consider (inclusive) when getting any
* blocking hosts.
* @param requestId
* the request ID to calculate any blocking hosts for (essentially,
* the upper limit exclusive)
* @return the list of hosts from older running requests which will block
* those same hosts in the specified request ID.
* @see HostRoleStatus#IN_PROGRESS_STATUSES
*/
@RequiresSession
public List<String> getBlockingHostsForRequest(long lowerRequestIdInclusive,
long requestId) {
TypedQuery<String> query = entityManagerProvider.get().createNamedQuery(
"HostRoleCommandEntity.getBlockingHostsForRequest", String.class);
query.setParameter("lowerRequestIdInclusive", lowerRequestIdInclusive);
query.setParameter("upperRequestIdExclusive", requestId);
query.setParameter("statuses", HostRoleStatus.IN_PROGRESS_STATUSES);
return daoUtils.selectList(query);
}
/**
* Gets the most recently run service check grouped by the command's role
* (which is the only way to identify the service it was for!?)
*
* @param clusterId
* the ID of the cluster to get the service checks for.
*/
@RequiresSession
public List<LastServiceCheckDTO> getLatestServiceChecksByRole(long clusterId) {
TypedQuery<LastServiceCheckDTO> query = entityManagerProvider.get().createNamedQuery(
"HostRoleCommandEntity.findLatestServiceChecksByRole", LastServiceCheckDTO.class);
query.setParameter("clusterId", clusterId);
query.setParameter("roleCommand", RoleCommand.SERVICE_CHECK);
return daoUtils.selectList(query);
}
/**
* The {@link HostRoleCommandPredicateVisitor} is used to convert an Ambari
* {@link Predicate} into a JPA {@link javax.persistence.criteria.Predicate}.
*/
private final class HostRoleCommandPredicateVisitor
extends JpaPredicateVisitor<HostRoleCommandEntity> {
/**
* Constructor.
*
*/
public HostRoleCommandPredicateVisitor() {
super(entityManagerProvider.get(), HostRoleCommandEntity.class);
}
/**
* {@inheritDoc}
*/
@Override
public Class<HostRoleCommandEntity> getEntityClass() {
return HostRoleCommandEntity.class;
}
/**
* {@inheritDoc}
*/
@Override
public List<? extends SingularAttribute<?, ?>> getPredicateMapping(String propertyId) {
return HostRoleCommandEntity_.getPredicateMapping().get(propertyId);
}
}
public Set<Long> findTaskIdsByRequestStageIds(List<RequestDAO.StageEntityPK> requestStageIds) {
EntityManager entityManager = entityManagerProvider.get();
List<Long> taskIds = new ArrayList<>();
for (RequestDAO.StageEntityPK requestIds : requestStageIds) {
TypedQuery<Long> hostRoleCommandQuery =
entityManager.createNamedQuery("HostRoleCommandEntity.findTaskIdsByRequestStageIds", Long.class);
hostRoleCommandQuery.setParameter("requestId", requestIds.getRequestId());
hostRoleCommandQuery.setParameter("stageId", requestIds.getStageId());
taskIds.addAll(daoUtils.selectList(hostRoleCommandQuery));
}
return Sets.newHashSet(taskIds);
}
/**
* A simple DTO for storing the most recent service check time for a given
* {@link Role}.
*/
public static class LastServiceCheckDTO {
/**
* The role.
*/
public final String role;
/**
* The time that the service check ended.
*/
public final long endTime;
/**
* Constructor.
*
* @param role
* @param endTime
*/
public LastServiceCheckDTO(String role, long endTime) {
this.role = role;
this.endTime = endTime;
}
}
}