/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ambari.server.state.cluster;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.annotation.Nullable;
import javax.persistence.RollbackException;
import org.apache.ambari.server.AmbariException;
import org.apache.ambari.server.ConfigGroupNotFoundException;
import org.apache.ambari.server.DuplicateResourceException;
import org.apache.ambari.server.ObjectNotFoundException;
import org.apache.ambari.server.ParentObjectNotFoundException;
import org.apache.ambari.server.ServiceComponentHostNotFoundException;
import org.apache.ambari.server.ServiceComponentNotFoundException;
import org.apache.ambari.server.ServiceNotFoundException;
import org.apache.ambari.server.actionmanager.HostRoleStatus;
import org.apache.ambari.server.api.services.AmbariMetaInfo;
import org.apache.ambari.server.configuration.Configuration;
import org.apache.ambari.server.controller.AmbariSessionManager;
import org.apache.ambari.server.controller.ClusterResponse;
import org.apache.ambari.server.controller.ConfigurationResponse;
import org.apache.ambari.server.controller.MaintenanceStateHelper;
import org.apache.ambari.server.controller.RootServiceResponseFactory.Services;
import org.apache.ambari.server.controller.ServiceConfigVersionResponse;
import org.apache.ambari.server.controller.internal.UpgradeResourceProvider;
import org.apache.ambari.server.orm.RequiresSession;
import org.apache.ambari.server.orm.cache.HostConfigMapping;
import org.apache.ambari.server.orm.dao.AlertDefinitionDAO;
import org.apache.ambari.server.orm.dao.AlertDispatchDAO;
import org.apache.ambari.server.orm.dao.ClusterDAO;
import org.apache.ambari.server.orm.dao.ClusterStateDAO;
import org.apache.ambari.server.orm.dao.ClusterVersionDAO;
import org.apache.ambari.server.orm.dao.ConfigGroupHostMappingDAO;
import org.apache.ambari.server.orm.dao.HostConfigMappingDAO;
import org.apache.ambari.server.orm.dao.HostDAO;
import org.apache.ambari.server.orm.dao.HostRoleCommandDAO;
import org.apache.ambari.server.orm.dao.HostVersionDAO;
import org.apache.ambari.server.orm.dao.RepositoryVersionDAO;
import org.apache.ambari.server.orm.dao.RequestDAO;
import org.apache.ambari.server.orm.dao.ServiceConfigDAO;
import org.apache.ambari.server.orm.dao.StackDAO;
import org.apache.ambari.server.orm.dao.TopologyRequestDAO;
import org.apache.ambari.server.orm.dao.UpgradeDAO;
import org.apache.ambari.server.orm.entities.ClusterConfigEntity;
import org.apache.ambari.server.orm.entities.ClusterConfigMappingEntity;
import org.apache.ambari.server.orm.entities.ClusterEntity;
import org.apache.ambari.server.orm.entities.ClusterServiceEntity;
import org.apache.ambari.server.orm.entities.ClusterStateEntity;
import org.apache.ambari.server.orm.entities.ClusterVersionEntity;
import org.apache.ambari.server.orm.entities.ConfigGroupEntity;
import org.apache.ambari.server.orm.entities.HostComponentStateEntity;
import org.apache.ambari.server.orm.entities.HostEntity;
import org.apache.ambari.server.orm.entities.HostRoleCommandEntity;
import org.apache.ambari.server.orm.entities.HostVersionEntity;
import org.apache.ambari.server.orm.entities.PermissionEntity;
import org.apache.ambari.server.orm.entities.PrivilegeEntity;
import org.apache.ambari.server.orm.entities.RepositoryVersionEntity;
import org.apache.ambari.server.orm.entities.RequestEntity;
import org.apache.ambari.server.orm.entities.RequestScheduleEntity;
import org.apache.ambari.server.orm.entities.ResourceEntity;
import org.apache.ambari.server.orm.entities.ServiceConfigEntity;
import org.apache.ambari.server.orm.entities.StackEntity;
import org.apache.ambari.server.orm.entities.TopologyRequestEntity;
import org.apache.ambari.server.orm.entities.UpgradeEntity;
import org.apache.ambari.server.orm.entities.UpgradeGroupEntity;
import org.apache.ambari.server.orm.entities.UpgradeItemEntity;
import org.apache.ambari.server.security.authorization.AuthorizationHelper;
import org.apache.ambari.server.state.Cluster;
import org.apache.ambari.server.state.ClusterHealthReport;
import org.apache.ambari.server.state.Clusters;
import org.apache.ambari.server.state.ComponentInfo;
import org.apache.ambari.server.state.Config;
import org.apache.ambari.server.state.ConfigFactory;
import org.apache.ambari.server.state.ConfigHelper;
import org.apache.ambari.server.state.DesiredConfig;
import org.apache.ambari.server.state.Host;
import org.apache.ambari.server.state.HostHealthStatus;
import org.apache.ambari.server.state.HostState;
import org.apache.ambari.server.state.MaintenanceState;
import org.apache.ambari.server.state.PropertyInfo;
import org.apache.ambari.server.state.RepositoryVersionState;
import org.apache.ambari.server.state.SecurityType;
import org.apache.ambari.server.state.Service;
import org.apache.ambari.server.state.ServiceComponent;
import org.apache.ambari.server.state.ServiceComponentHost;
import org.apache.ambari.server.state.ServiceComponentHostEvent;
import org.apache.ambari.server.state.ServiceComponentHostEventType;
import org.apache.ambari.server.state.ServiceFactory;
import org.apache.ambari.server.state.ServiceInfo;
import org.apache.ambari.server.state.StackId;
import org.apache.ambari.server.state.StackInfo;
import org.apache.ambari.server.state.State;
import org.apache.ambari.server.state.configgroup.ConfigGroup;
import org.apache.ambari.server.state.configgroup.ConfigGroupFactory;
import org.apache.ambari.server.state.fsm.InvalidStateTransitionException;
import org.apache.ambari.server.state.scheduler.RequestExecution;
import org.apache.ambari.server.state.scheduler.RequestExecutionFactory;
import org.apache.ambari.server.state.stack.upgrade.Direction;
import org.apache.ambari.server.state.svccomphost.ServiceComponentHostSummary;
import org.apache.ambari.server.topology.TopologyRequest;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Predicate;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Iterables;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import com.google.inject.Inject;
import com.google.inject.Injector;
import com.google.inject.assistedinject.Assisted;
import com.google.inject.persist.Transactional;
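/**
* In-memory representation of a cluster, backed by a {@link ClusterEntity}.
* Services, configurations, config groups and request executions are loaded
* lazily from the database and cached; the cluster-global read/write lock
* below guards those caches.
* <p>
* Callers typically reach an instance through the injected {@link Clusters}
* registry. A minimal sketch ("c1" and "HDFS" are illustrative names):
* <pre>{@code
* Cluster cluster = clusters.getCluster("c1");
* Service hdfs = cluster.getService("HDFS");
* }</pre>
*/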
public class ClusterImpl implements Cluster {
private static final Logger LOG = LoggerFactory.getLogger(ClusterImpl.class);
private static final Logger configChangeLog = LoggerFactory.getLogger("configchange");
/**
* Prefix for cluster session attributes name.
*/
private static final String CLUSTER_SESSION_ATTRIBUTES_PREFIX = "cluster_session_attributes:";
@Inject
private Clusters clusters;
private StackId desiredStackVersion;
private volatile boolean desiredStackVersionSet = true;
private volatile Map<String, Service> services = null;
/**
* [ Config Type -> [ Config Version Tag -> Config ] ]
*/
private volatile Map<String, Map<String, Config>> allConfigs;
/**
* [ ServiceName -> [ ServiceComponentName -> [ HostName -> [ ... ] ] ] ]
*/
private Map<String, Map<String, Map<String, ServiceComponentHost>>>
serviceComponentHosts;
/**
* [ HostName -> [ ... ] ]
*/
private Map<String, List<ServiceComponentHost>>
serviceComponentHostsByHost;
/**
* Map of existing config groups
*/
private volatile Map<Long, ConfigGroup> clusterConfigGroups;
/**
* Map of Request schedules for this cluster
*/
private volatile Map<Long, RequestExecution> requestExecutions;
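/**
* Cluster-global lock. The write lock guards mutations of the lazily loaded
* caches above (services, configs, config groups, service component hosts)
* and writes to the underlying cluster entity; the read lock guards lookups.
*/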
private final ReadWriteLock clusterGlobalLock = new ReentrantReadWriteLock();
// This is a lock for operations that do not need to be cluster global
private final ReentrantReadWriteLock hostTransitionStateLock = new ReentrantReadWriteLock();
private final Lock hostTransitionStateWriteLock = hostTransitionStateLock.writeLock();
private long clusterId;
private String clusterName;
@Inject
private ClusterDAO clusterDAO;
@Inject
private ClusterStateDAO clusterStateDAO;
@Inject
private ClusterVersionDAO clusterVersionDAO;
@Inject
private HostRoleCommandDAO hostRoleCommandDAO;
@Inject
private RequestDAO requestDAO;
@Inject
private HostDAO hostDAO;
@Inject
private HostVersionDAO hostVersionDAO;
@Inject
private ServiceFactory serviceFactory;
@Inject
private ConfigFactory configFactory;
@Inject
private HostConfigMappingDAO hostConfigMappingDAO;
@Inject
private ConfigGroupFactory configGroupFactory;
@Inject
private ConfigGroupHostMappingDAO configGroupHostMappingDAO;
@Inject
private RequestExecutionFactory requestExecutionFactory;
@Inject
private ConfigHelper configHelper;
@Inject
private MaintenanceStateHelper maintenanceStateHelper;
@Inject
private AmbariMetaInfo ambariMetaInfo;
@Inject
private ServiceConfigDAO serviceConfigDAO;
@Inject
private AlertDefinitionDAO alertDefinitionDAO;
@Inject
private AlertDispatchDAO alertDispatchDAO;
@Inject
private UpgradeDAO upgradeDAO;
@Inject
private RepositoryVersionDAO repositoryVersionDAO;
@Inject
private Configuration configuration;
@Inject
private AmbariSessionManager sessionManager;
@Inject
private TopologyRequestDAO topologyRequestDAO;
/**
* Data access object used for looking up stacks from the database.
*/
@Inject
private StackDAO stackDAO;
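/**
* Whether {@link #loadServiceHostComponents()} has fully populated the
* service component host caches; only set under the cluster-global write lock.
*/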
private volatile boolean svcHostsLoaded = false;
private volatile Multimap<String, String> serviceConfigTypes;
@Inject
public ClusterImpl(@Assisted ClusterEntity clusterEntity,
Injector injector) throws AmbariException {
injector.injectMembers(this);
clusterId = clusterEntity.getClusterId();
clusterName = clusterEntity.getClusterName();
serviceComponentHosts = new HashMap<>();
serviceComponentHostsByHost = new HashMap<>();
desiredStackVersion = new StackId(clusterEntity.getDesiredStack());
cacheConfigurations();
if (desiredStackVersion != null && !StringUtils.isEmpty(desiredStackVersion.getStackName())
&& !StringUtils.isEmpty(desiredStackVersion.getStackVersion())) {
loadServiceConfigTypes();
}
}
@Override
public ReadWriteLock getClusterGlobalLock() {
return clusterGlobalLock;
}
private void loadServiceConfigTypes() throws AmbariException {
try {
serviceConfigTypes = collectServiceConfigTypesMapping();
} catch (AmbariException e) {
LOG.error("Cannot load stack info:", e);
throw e;
}
LOG.info("Service config types loaded: {}", serviceConfigTypes);
}
/**
* Construct the service name to config type mapping
* @throws AmbariException when the stack or part of it cannot be found
*/
private Multimap<String, String> collectServiceConfigTypesMapping() throws AmbariException {
Multimap<String, String> serviceConfigTypes = HashMultimap.create();
Map<String, ServiceInfo> serviceInfoMap = null;
try {
serviceInfoMap = ambariMetaInfo.getServices(desiredStackVersion.getStackName(), desiredStackVersion.getStackVersion());
} catch (ParentObjectNotFoundException e) {
LOG.error("Service config versioning disabled due to exception: ", e);
return serviceConfigTypes;
}
for (Entry<String, ServiceInfo> entry : serviceInfoMap.entrySet()) {
String serviceName = entry.getKey();
ServiceInfo serviceInfo = entry.getValue();
Set<String> configTypes = serviceInfo.getConfigTypeAttributes().keySet();
for (String configType : configTypes) {
serviceConfigTypes.put(serviceName, configType);
}
}
return serviceConfigTypes;
}
/**
* Make sure we load all the service host components.
* We need this for live status checks.
*/
public void loadServiceHostComponents() {
loadServices();
if (svcHostsLoaded) {
return;
}
clusterGlobalLock.writeLock().lock();
try {
LOG.info("Loading Service Host Components");
if (svcHostsLoaded) {
return;
}
if (services != null) {
for (Entry<String, Service> serviceKV : services.entrySet()) {
// get all the service component hosts
Service service = serviceKV.getValue();
if (!serviceComponentHosts.containsKey(service.getName())) {
serviceComponentHosts.put(service.getName(),
new HashMap<String, Map<String, ServiceComponentHost>>());
}
for (Entry<String, ServiceComponent> svcComponent : service.getServiceComponents().entrySet()) {
ServiceComponent comp = svcComponent.getValue();
String componentName = svcComponent.getKey();
if (!serviceComponentHosts.get(service.getName()).containsKey(
componentName)) {
serviceComponentHosts.get(service.getName()).put(componentName,
new HashMap<String, ServiceComponentHost>());
}
// Get Service Host Components
for (Entry<String, ServiceComponentHost> svchost : comp.getServiceComponentHosts().entrySet()) {
String hostname = svchost.getKey();
ServiceComponentHost svcHostComponent = svchost.getValue();
if (!serviceComponentHostsByHost.containsKey(hostname)) {
serviceComponentHostsByHost.put(hostname,
new ArrayList<ServiceComponentHost>());
}
List<ServiceComponentHost> compList = serviceComponentHostsByHost.get(hostname);
compList.add(svcHostComponent);
if (!serviceComponentHosts.get(service.getName()).get(
componentName).containsKey(hostname)) {
serviceComponentHosts.get(service.getName()).get(componentName).put(
hostname, svcHostComponent);
}
}
}
}
}
svcHostsLoaded = true;
} finally {
clusterGlobalLock.writeLock().unlock();
}
}
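/**
* Lazily loads the services of this cluster using double-checked locking on
* the volatile {@link #services} field; {@link #loadConfigGroups()} and
* {@link #loadRequestExecutions()} below follow the same pattern.
*/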
private void loadServices() {
if (services == null) {
clusterGlobalLock.writeLock().lock();
try {
if (services == null) {
ClusterEntity clusterEntity = getClusterEntity();
if (clusterEntity != null) {
services = new TreeMap<String, Service>();
if (!clusterEntity.getClusterServiceEntities().isEmpty()) {
for (ClusterServiceEntity serviceEntity : clusterEntity.getClusterServiceEntities()) {
StackId stackId = getCurrentStackVersion();
try {
if (ambariMetaInfo.getService(stackId.getStackName(),
stackId.getStackVersion(), serviceEntity.getServiceName()) != null) {
services.put(serviceEntity.getServiceName(),
serviceFactory.createExisting(this, serviceEntity));
}
} catch (AmbariException e) {
LOG.error(String.format(
"Cannot get service info: stackName=%s, stackVersion=%s, serviceName=%s",
stackId.getStackName(), stackId.getStackVersion(),
serviceEntity.getServiceName()), e);
}
}
}
}
}
} finally {
clusterGlobalLock.writeLock().unlock();
}
}
}
private void loadConfigGroups() {
if (clusterConfigGroups == null) {
clusterGlobalLock.writeLock().lock();
try {
if (clusterConfigGroups == null) {
ClusterEntity clusterEntity = getClusterEntity();
if (clusterEntity != null) {
clusterConfigGroups = new HashMap<Long, ConfigGroup>();
if (!clusterEntity.getConfigGroupEntities().isEmpty()) {
for (ConfigGroupEntity configGroupEntity : clusterEntity.getConfigGroupEntities()) {
clusterConfigGroups.put(configGroupEntity.getGroupId(),
configGroupFactory.createExisting(this, configGroupEntity));
}
}
}
}
} finally {
clusterGlobalLock.writeLock().unlock();
}
}
}
private void loadRequestExecutions() {
if (requestExecutions == null) {
clusterGlobalLock.writeLock().lock();
try {
if (requestExecutions == null) {
ClusterEntity clusterEntity = getClusterEntity();
if (clusterEntity != null) {
requestExecutions = new HashMap<Long, RequestExecution>();
if (!clusterEntity.getRequestScheduleEntities().isEmpty()) {
for (RequestScheduleEntity scheduleEntity : clusterEntity.getRequestScheduleEntities()) {
requestExecutions.put(scheduleEntity.getScheduleId(),
requestExecutionFactory.createExisting(this, scheduleEntity));
}
}
}
}
} finally {
clusterGlobalLock.writeLock().unlock();
}
}
}
@Override
public void addConfigGroup(ConfigGroup configGroup) throws AmbariException {
loadConfigGroups();
clusterGlobalLock.writeLock().lock();
try {
String hostList = "";
if (LOG.isDebugEnabled()) {
if (configGroup.getHosts() != null) {
for (Host host : configGroup.getHosts().values()) {
hostList += host.getHostName() + ", ";
}
}
}
LOG.debug("Adding a new Config group" + ", clusterName = "
+ getClusterName() + ", groupName = " + configGroup.getName()
+ ", tag = " + configGroup.getTag() + " with hosts " + hostList);
if (clusterConfigGroups.containsKey(configGroup.getId())) {
// The loadConfigGroups will load all groups to memory
LOG.debug("Config group already exists" + ", clusterName = "
+ getClusterName() + ", groupName = " + configGroup.getName()
+ ", groupId = " + configGroup.getId() + ", tag = "
+ configGroup.getTag());
} else {
clusterConfigGroups.put(configGroup.getId(), configGroup);
configHelper.invalidateStaleConfigsCache();
}
} finally {
clusterGlobalLock.writeLock().unlock();
}
}
@Override
public Map<Long, ConfigGroup> getConfigGroups() {
loadConfigGroups();
clusterGlobalLock.readLock().lock();
try {
return Collections.unmodifiableMap(clusterConfigGroups);
} finally {
clusterGlobalLock.readLock().unlock();
}
}
@Override
public Map<Long, ConfigGroup> getConfigGroupsByHostname(String hostname)
throws AmbariException {
loadConfigGroups();
Map<Long, ConfigGroup> configGroups = new HashMap<Long, ConfigGroup>();
clusterGlobalLock.readLock().lock();
try {
for (Entry<Long, ConfigGroup> groupEntry : clusterConfigGroups.entrySet()) {
Long id = groupEntry.getKey();
ConfigGroup group = groupEntry.getValue();
for (Host host : group.getHosts().values()) {
if (StringUtils.equals(hostname, host.getHostName())) {
configGroups.put(id, group);
break;
}
}
}
} finally {
clusterGlobalLock.readLock().unlock();
}
return configGroups;
}
@Override
public void addRequestExecution(RequestExecution requestExecution) throws AmbariException {
loadRequestExecutions();
clusterGlobalLock.writeLock().lock();
try {
LOG.info("Adding a new request schedule" + ", clusterName = "
+ getClusterName() + ", id = " + requestExecution.getId()
+ ", description = " + requestExecution.getDescription());
if (requestExecutions.containsKey(requestExecution.getId())) {
LOG.debug("Request schedule already exists" + ", clusterName = "
+ getClusterName() + ", id = " + requestExecution.getId()
+ ", description = " + requestExecution.getDescription());
} else {
requestExecutions.put(requestExecution.getId(), requestExecution);
}
} finally {
clusterGlobalLock.writeLock().unlock();
}
}
@Override
public Map<Long, RequestExecution> getAllRequestExecutions() {
loadRequestExecutions();
clusterGlobalLock.readLock().lock();
try {
return Collections.unmodifiableMap(requestExecutions);
} finally {
clusterGlobalLock.readLock().unlock();
}
}
@Override
public void deleteRequestExecution(Long id) throws AmbariException {
loadRequestExecutions();
clusterGlobalLock.writeLock().lock();
try {
RequestExecution requestExecution = requestExecutions.get(id);
if (requestExecution == null) {
throw new AmbariException("Request schedule does not exists, "
+ "id = " + id);
}
LOG.info("Deleting request schedule" + ", clusterName = "
+ getClusterName() + ", id = " + requestExecution.getId()
+ ", description = " + requestExecution.getDescription());
requestExecution.delete();
requestExecutions.remove(id);
} finally {
clusterGlobalLock.writeLock().unlock();
}
}
@Override
public void deleteConfigGroup(Long id) throws AmbariException {
loadConfigGroups();
clusterGlobalLock.writeLock().lock();
try {
ConfigGroup configGroup = clusterConfigGroups.get(id);
if (configGroup == null) {
throw new ConfigGroupNotFoundException(getClusterName(), id.toString());
}
LOG.debug("Deleting Config group" + ", clusterName = " + getClusterName()
+ ", groupName = " + configGroup.getName() + ", groupId = "
+ configGroup.getId() + ", tag = " + configGroup.getTag());
configGroup.delete();
clusterConfigGroups.remove(id);
configHelper.invalidateStaleConfigsCache();
} finally {
clusterGlobalLock.writeLock().unlock();
}
}
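/**
* Returns the cached {@link ServiceComponentHost} for the given service,
* component and host, loading the caches first if necessary.
* @throws ServiceComponentHostNotFoundException if no such mapping exists
*/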
public ServiceComponentHost getServiceComponentHost(String serviceName,
String serviceComponentName, String hostname) throws AmbariException {
loadServiceHostComponents();
clusterGlobalLock.readLock().lock();
try {
if (!serviceComponentHosts.containsKey(serviceName)
|| !serviceComponentHosts.get(serviceName).containsKey(
serviceComponentName)
|| !serviceComponentHosts.get(serviceName).get(serviceComponentName).containsKey(
hostname)) {
throw new ServiceComponentHostNotFoundException(getClusterName(),
serviceName, serviceComponentName, hostname);
}
return serviceComponentHosts.get(serviceName).get(serviceComponentName).get(
hostname);
} finally {
clusterGlobalLock.readLock().unlock();
}
}
@Override
public String getClusterName() {
return clusterName;
}
@Override
public void setClusterName(String clusterName) {
clusterGlobalLock.writeLock().lock();
try {
ClusterEntity clusterEntity = getClusterEntity();
if (clusterEntity != null) {
String oldName = clusterEntity.getClusterName();
clusterEntity.setClusterName(clusterName);
// RollbackException possibility if UNIQUE constraint violated
clusterEntity = clusterDAO.merge(clusterEntity);
clusters.updateClusterName(oldName, clusterName);
this.clusterName = clusterName;
}
} finally {
clusterGlobalLock.writeLock().unlock();
}
}
@Override
public void addServiceComponentHosts(Collection<ServiceComponentHost> serviceComponentHosts) throws AmbariException {
clusterGlobalLock.writeLock().lock();
try {
for (ServiceComponentHost serviceComponentHost : serviceComponentHosts) {
Service service = getService(serviceComponentHost.getServiceName());
ServiceComponent serviceComponent = service.getServiceComponent(serviceComponentHost.getServiceComponentName());
serviceComponent.addServiceComponentHost(serviceComponentHost);
}
persistServiceComponentHosts(serviceComponentHosts);
} finally {
clusterGlobalLock.writeLock().unlock();
}
}
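/**
* Persists the given service component hosts within a single transaction.
*/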
@Transactional
void persistServiceComponentHosts(Collection<ServiceComponentHost> serviceComponentHosts) {
for (ServiceComponentHost serviceComponentHost : serviceComponentHosts) {
serviceComponentHost.persist();
}
}
public void addServiceComponentHost(ServiceComponentHost svcCompHost)
throws AmbariException {
if (LOG.isDebugEnabled()) {
LOG.debug("Trying to add component {} of service {} on {} to the cache",
svcCompHost.getServiceComponentName(), svcCompHost.getServiceName(),
svcCompHost.getHostName());
}
loadServiceHostComponents();
final String hostname = svcCompHost.getHostName();
final String serviceName = svcCompHost.getServiceName();
final String componentName = svcCompHost.getServiceComponentName();
Set<Cluster> cs = clusters.getClustersForHost(hostname);
clusterGlobalLock.writeLock().lock();
try {
boolean clusterFound = false;
Iterator<Cluster> iter = cs.iterator();
while (iter.hasNext()) {
Cluster c = iter.next();
if (c.getClusterId() == getClusterId()) {
clusterFound = true;
break;
}
}
if (!clusterFound) {
throw new AmbariException("Host does not belong this cluster"
+ ", hostname=" + hostname + ", clusterName=" + getClusterName()
+ ", clusterId=" + getClusterId());
}
if (!serviceComponentHosts.containsKey(serviceName)) {
serviceComponentHosts.put(serviceName,
new HashMap<String, Map<String, ServiceComponentHost>>());
}
if (!serviceComponentHosts.get(serviceName).containsKey(componentName)) {
serviceComponentHosts.get(serviceName).put(componentName,
new HashMap<String, ServiceComponentHost>());
}
if (serviceComponentHosts.get(serviceName).get(componentName).containsKey(
hostname)) {
throw new AmbariException("Duplicate entry for ServiceComponentHost"
+ ", serviceName=" + serviceName + ", serviceComponentName"
+ componentName + ", hostname= " + hostname);
}
if (!serviceComponentHostsByHost.containsKey(hostname)) {
serviceComponentHostsByHost.put(hostname,
new ArrayList<ServiceComponentHost>());
}
if (LOG.isDebugEnabled()) {
LOG.debug("Adding a new ServiceComponentHost" + ", clusterName="
+ getClusterName() + ", clusterId=" + getClusterId()
+ ", serviceName=" + serviceName + ", serviceComponentName"
+ componentName + ", hostname= " + hostname);
}
serviceComponentHosts.get(serviceName).get(componentName).put(hostname,
svcCompHost);
serviceComponentHostsByHost.get(hostname).add(svcCompHost);
} finally {
clusterGlobalLock.writeLock().unlock();
}
}
@Override
public void removeServiceComponentHost(ServiceComponentHost svcCompHost)
throws AmbariException {
if (LOG.isDebugEnabled()) {
LOG.debug(
"Trying to remove component {} of service {} on {} from the cache",
svcCompHost.getServiceComponentName(), svcCompHost.getServiceName(),
svcCompHost.getHostName());
}
loadServiceHostComponents();
final String hostname = svcCompHost.getHostName();
final String serviceName = svcCompHost.getServiceName();
final String componentName = svcCompHost.getServiceComponentName();
Set<Cluster> cs = clusters.getClustersForHost(hostname);
clusterGlobalLock.writeLock().lock();
try {
boolean clusterFound = false;
Iterator<Cluster> iter = cs.iterator();
while (iter.hasNext()) {
Cluster c = iter.next();
if (c.getClusterId() == getClusterId()) {
clusterFound = true;
break;
}
}
if (!clusterFound) {
throw new AmbariException("Host does not belong this cluster"
+ ", hostname=" + hostname + ", clusterName=" + getClusterName()
+ ", clusterId=" + getClusterId());
}
if (!serviceComponentHosts.containsKey(serviceName)
|| !serviceComponentHosts.get(serviceName).containsKey(componentName)
|| !serviceComponentHosts.get(serviceName).get(componentName).containsKey(
hostname)) {
throw new AmbariException("Invalid entry for ServiceComponentHost"
+ ", serviceName=" + serviceName + ", serviceComponentName"
+ componentName + ", hostname= " + hostname);
}
if (!serviceComponentHostsByHost.containsKey(hostname)) {
throw new AmbariException("Invalid host entry for ServiceComponentHost"
+ ", serviceName=" + serviceName + ", serviceComponentName"
+ componentName + ", hostname= " + hostname);
}
ServiceComponentHost schToRemove = null;
for (ServiceComponentHost sch : serviceComponentHostsByHost.get(hostname)) {
if (sch.getServiceName().equals(serviceName)
&& sch.getServiceComponentName().equals(componentName)
&& sch.getHostName().equals(hostname)) {
schToRemove = sch;
break;
}
}
if (schToRemove == null) {
LOG.warn("Unavailable in per host cache. ServiceComponentHost"
+ ", serviceName=" + serviceName
+ ", serviceComponentName" + componentName
+ ", hostname= " + hostname);
}
if (LOG.isDebugEnabled()) {
LOG.debug("Removing a ServiceComponentHost" + ", clusterName="
+ getClusterName() + ", clusterId=" + getClusterId()
+ ", serviceName=" + serviceName + ", serviceComponentName"
+ componentName + ", hostname= " + hostname);
}
serviceComponentHosts.get(serviceName).get(componentName).remove(hostname);
if (schToRemove != null) {
serviceComponentHostsByHost.get(hostname).remove(schToRemove);
}
} finally {
clusterGlobalLock.writeLock().unlock();
}
}
@Override
public long getClusterId() {
// addCluster creates the managed entity before constructing this Cluster
// instance, so the id will never be null.
return clusterId;
}
@Override
public List<ServiceComponentHost> getServiceComponentHosts(
String hostname) {
loadServiceHostComponents();
clusterGlobalLock.readLock().lock();
try {
if (serviceComponentHostsByHost.containsKey(hostname)) {
return new CopyOnWriteArrayList<ServiceComponentHost>(
serviceComponentHostsByHost.get(hostname));
}
return new ArrayList<ServiceComponentHost>();
} finally {
clusterGlobalLock.readLock().unlock();
}
}
@Override
public Map<String, Set<String>> getServiceComponentHostMap(Set<String> hostNames, Set<String> serviceNames) {
Map<String, Set<String>> componentHostMap = new HashMap<String, Set<String>>();
Collection<Host> hosts = getHosts();
if (hosts != null) {
for (Host host : hosts) {
String hostname = host.getHostName();
// If this host is not filtered out, continue processing
if ((hostNames == null) || hostNames.contains(hostname)) {
List<ServiceComponentHost> serviceComponentHosts = getServiceComponentHosts(hostname);
if (serviceComponentHosts != null) {
for (ServiceComponentHost sch : serviceComponentHosts) {
// If the service for this ServiceComponentHost is not filtered out, continue processing
if ((serviceNames == null) || serviceNames.contains(sch.getServiceName())) {
String component = sch.getServiceComponentName();
Set<String> componentHosts = componentHostMap.get(component);
if (componentHosts == null) {
componentHosts = new HashSet<String>();
componentHostMap.put(component, componentHosts);
}
componentHosts.add(hostname);
}
}
}
}
}
}
return componentHostMap;
}
@Override
public List<ServiceComponentHost> getServiceComponentHosts(String serviceName, String componentName) {
ArrayList<ServiceComponentHost> foundItems = new ArrayList<ServiceComponentHost>();
loadServiceHostComponents();
clusterGlobalLock.readLock().lock();
try {
Map<String, Map<String, ServiceComponentHost>> foundByService = serviceComponentHosts.get(serviceName);
if (foundByService != null) {
if (componentName == null) {
for (Map<String, ServiceComponentHost> foundByComponent : foundByService.values()) {
foundItems.addAll(foundByComponent.values());
}
} else if (foundByService.containsKey(componentName)) {
foundItems.addAll(foundByService.get(componentName).values());
}
}
} finally {
clusterGlobalLock.readLock().unlock();
}
return foundItems;
}
@Override
public void addService(Service service)
throws AmbariException {
loadServices();
clusterGlobalLock.writeLock().lock();
try {
if (LOG.isDebugEnabled()) {
LOG.debug("Adding a new Service" + ", clusterName=" + getClusterName()
+ ", clusterId=" + getClusterId() + ", serviceName="
+ service.getName());
}
if (services.containsKey(service.getName())) {
throw new AmbariException("Service already exists" + ", clusterName="
+ getClusterName() + ", clusterId=" + getClusterId()
+ ", serviceName=" + service.getName());
}
services.put(service.getName(), service);
} finally {
clusterGlobalLock.writeLock().unlock();
}
}
@Override
public Service addService(String serviceName) throws AmbariException {
loadServices();
clusterGlobalLock.writeLock().lock();
try {
if (LOG.isDebugEnabled()) {
LOG.debug("Adding a new Service" + ", clusterName=" + getClusterName()
+ ", clusterId=" + getClusterId() + ", serviceName=" + serviceName);
}
if (services.containsKey(serviceName)) {
throw new AmbariException("Service already exists" + ", clusterName="
+ getClusterName() + ", clusterId=" + getClusterId()
+ ", serviceName=" + serviceName);
}
Service s = serviceFactory.createNew(this, serviceName);
services.put(s.getName(), s);
return s;
} finally {
clusterGlobalLock.writeLock().unlock();
}
}
@Override
public Service getService(String serviceName) throws AmbariException {
loadServices();
clusterGlobalLock.readLock().lock();
try {
if (!services.containsKey(serviceName)) {
throw new ServiceNotFoundException(getClusterName(), serviceName);
}
return services.get(serviceName);
} finally {
clusterGlobalLock.readLock().unlock();
}
}
@Override
public Map<String, Service> getServices() {
loadServices();
clusterGlobalLock.readLock().lock();
try {
return new HashMap<String, Service>(services);
} finally {
clusterGlobalLock.readLock().unlock();
}
}
@Override
public StackId getDesiredStackVersion() {
loadStackVersion();
clusterGlobalLock.readLock().lock();
try {
return desiredStackVersion;
} finally {
clusterGlobalLock.readLock().unlock();
}
}
@Override
public void setDesiredStackVersion(StackId stackId) throws AmbariException {
setDesiredStackVersion(stackId, false);
}
@Override
public void setDesiredStackVersion(StackId stackId, boolean cascade) throws AmbariException {
clusterGlobalLock.writeLock().lock();
try {
if (LOG.isDebugEnabled()) {
LOG.debug("Changing DesiredStackVersion of Cluster" + ", clusterName="
+ getClusterName() + ", clusterId=" + getClusterId()
+ ", currentDesiredStackVersion=" + desiredStackVersion
+ ", newDesiredStackVersion=" + stackId);
}
desiredStackVersion = stackId;
StackEntity stackEntity = stackDAO.find(stackId.getStackName(), stackId.getStackVersion());
ClusterEntity clusterEntity = getClusterEntity();
if (clusterEntity != null) {
clusterEntity.setDesiredStack(stackEntity);
clusterEntity = clusterDAO.merge(clusterEntity);
if (cascade) {
for (Service service : getServices().values()) {
service.setDesiredStackVersion(stackId);
for (ServiceComponent sc : service.getServiceComponents().values()) {
sc.setDesiredStackVersion(stackId);
for (ServiceComponentHost sch : sc.getServiceComponentHosts().values()) {
sch.setDesiredStackVersion(stackId);
}
}
}
}
loadServiceConfigTypes();
}
} finally {
clusterGlobalLock.writeLock().unlock();
}
}
@Override
public StackId getCurrentStackVersion() {
clusterGlobalLock.readLock().lock();
try {
ClusterEntity clusterEntity = getClusterEntity();
if (clusterEntity != null) {
ClusterStateEntity clusterStateEntity = clusterEntity.getClusterStateEntity();
if (clusterStateEntity != null) {
StackEntity currentStackEntity = clusterStateEntity.getCurrentStack();
return new StackId(currentStackEntity);
}
}
return null;
} finally {
clusterGlobalLock.readLock().unlock();
}
}
@Override
public State getProvisioningState() {
clusterGlobalLock.readLock().lock();
State provisioningState = null;
try {
ClusterEntity clusterEntity = getClusterEntity();
if (clusterEntity != null) {
provisioningState = clusterEntity.getProvisioningState();
if (null == provisioningState) {
provisioningState = State.INIT;
}
}
return provisioningState;
} finally {
clusterGlobalLock.readLock().unlock();
}
}
@Override
public void setProvisioningState(State provisioningState) {
clusterGlobalLock.writeLock().lock();
try {
ClusterEntity clusterEntity = getClusterEntity();
if (clusterEntity != null) {
clusterEntity.setProvisioningState(provisioningState);
clusterEntity = clusterDAO.merge(clusterEntity);
}
} finally {
clusterGlobalLock.writeLock().unlock();
}
}
@Override
public SecurityType getSecurityType() {
clusterGlobalLock.readLock().lock();
SecurityType securityType = null;
try {
ClusterEntity clusterEntity = getClusterEntity();
if (clusterEntity != null) {
securityType = clusterEntity.getSecurityType();
if (null == securityType) {
securityType = SecurityType.NONE;
}
}
return securityType;
} finally {
clusterGlobalLock.readLock().unlock();
}
}
@Override
public void setSecurityType(SecurityType securityType) {
clusterGlobalLock.writeLock().lock();
try {
ClusterEntity clusterEntity = getClusterEntity();
if (clusterEntity != null) {
clusterEntity.setSecurityType(securityType);
clusterEntity = clusterDAO.merge(clusterEntity);
}
} finally {
clusterGlobalLock.writeLock().unlock();
}
}
/**
* Get the ClusterVersionEntity object whose state is CURRENT.
* @return the cluster version whose state is {@link RepositoryVersionState#CURRENT}, or null if there is none
*/
@Override
public ClusterVersionEntity getCurrentClusterVersion() {
Collection<ClusterVersionEntity> clusterVersionEntities = getClusterEntity().getClusterVersionEntities();
for (ClusterVersionEntity clusterVersionEntity : clusterVersionEntities) {
if (clusterVersionEntity.getState() == RepositoryVersionState.CURRENT) {
// TODO assuming there's only 1 current version, return 1st found, exception was expected in previous implementation
return clusterVersionEntity;
}
}
return null;
}
/**
* Get any stack upgrade currently in progress.
* @return the most recent upgrade or downgrade if it still has unfinished tasks, otherwise null
*/
private UpgradeEntity getUpgradeInProgress() {
UpgradeEntity mostRecentUpgrade = upgradeDAO.findLastUpgradeOrDowngradeForCluster(this.getClusterId());
if (mostRecentUpgrade != null) {
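// A request still counts as in progress while it has PENDING tasks; ABORTED
// tasks are treated the same way, presumably because an aborted upgrade can
// still be resumed.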
List<HostRoleStatus> UNFINISHED_STATUSES = new ArrayList<>();
UNFINISHED_STATUSES.add(HostRoleStatus.PENDING);
UNFINISHED_STATUSES.add(HostRoleStatus.ABORTED);
List<HostRoleCommandEntity> commands = hostRoleCommandDAO.findByRequestIdAndStatuses(mostRecentUpgrade.getRequestId(), UNFINISHED_STATUSES);
if (!commands.isEmpty()) {
return mostRecentUpgrade;
}
}
return null;
}
/**
* If no RU/EU is in progress, get the ClusterVersionEntity object whose state is CURRENT.
* If RU/EU is in progress, based on the direction and desired stack, determine which version to use.
* Assuming upgrading from HDP 2.2.0.0-1 to 2.3.0.0-2, then
* RU Upgrade: 2.3.0.0-2 (desired stack id)
* RU Downgrade: 2.2.0.0-1 (desired stack id)
* EU Upgrade: while stopping services and before changing desired stack, use 2.2.0.0-1, after, use 2.3.0.0-2
* EU Downgrade: while stopping services and before changing desired stack, use 2.3.0.0-2, after, use 2.2.0.0-1
* @return the cluster version in effect for the current upgrade direction and progress, or null if none matches
*/
@Override
public ClusterVersionEntity getEffectiveClusterVersion() throws AmbariException {
// This is not reliable. Need to find the last upgrade request.
UpgradeEntity upgradeInProgress = this.getUpgradeInProgress();
if (upgradeInProgress == null) {
return this.getCurrentClusterVersion();
}
String effectiveVersion = null;
switch (upgradeInProgress.getUpgradeType()) {
case NON_ROLLING:
if (upgradeInProgress.getDirection() == Direction.UPGRADE) {
boolean pastChangingStack = this.isNonRollingUpgradePastUpgradingStack(upgradeInProgress);
effectiveVersion = pastChangingStack ? upgradeInProgress.getToVersion() : upgradeInProgress.getFromVersion();
} else {
// Should be the lower value during a Downgrade.
effectiveVersion = upgradeInProgress.getToVersion();
}
break;
case ROLLING:
default:
// Version will be higher on upgrade and lower on downgrade directions.
effectiveVersion = upgradeInProgress.getToVersion();
break;
}
if (effectiveVersion == null) {
throw new AmbariException("Unable to determine which version to use during Stack Upgrade, effectiveVersion is null.");
}
// Find the first cluster version whose repo matches the expected version.
Collection<ClusterVersionEntity> clusterVersionEntities = getClusterEntity().getClusterVersionEntities();
for (ClusterVersionEntity clusterVersionEntity : clusterVersionEntities) {
if (clusterVersionEntity.getRepositoryVersion().getVersion().equals(effectiveVersion)) {
return clusterVersionEntity;
}
}
return null;
}
/**
* Given a NonRolling stack upgrade, determine if it has already crossed the point of using the newer version.
* @param upgrade Stack Upgrade
* @return true if the to_version should be used; false if the from_version still applies
*/
private boolean isNonRollingUpgradePastUpgradingStack(UpgradeEntity upgrade) {
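// Inspect the upgrade group that switches the desired stack: once its custom
// command has COMPLETED on any task, the to_version is already in effect.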
for (UpgradeGroupEntity group : upgrade.getUpgradeGroups()) {
if (group.getName().equalsIgnoreCase(UpgradeResourceProvider.CONST_UPGRADE_GROUP_NAME)) {
for (UpgradeItemEntity item : group.getItems()) {
List<Long> taskIds = hostRoleCommandDAO.findTaskIdsByStage(upgrade.getRequestId(), item.getStageId());
List<HostRoleCommandEntity> commands = hostRoleCommandDAO.findByPKs(taskIds);
for (HostRoleCommandEntity command : commands) {
if (command.getCustomCommandName() != null &&
command.getCustomCommandName().equalsIgnoreCase(UpgradeResourceProvider.CONST_CUSTOM_COMMAND_NAME) &&
command.getStatus() == HostRoleStatus.COMPLETED) {
return true;
}
}
}
return false;
}
}
return false;
}
/**
* Get all of the ClusterVersionEntity objects for the cluster.
* @return all ClusterVersionEntity objects associated with this cluster
*/
@Override
public Collection<ClusterVersionEntity> getAllClusterVersions() {
return clusterVersionDAO.findByCluster(getClusterName());
}
/**
* During the Finalize Action, transitions all Host Versions from UPGRADED to CURRENT, and the last CURRENT one to INSTALLED.
* @param hostNames Collection of host names
* @param currentClusterVersion Entity that contains the cluster's current stack (with its name and version)
* @param desiredState Desired state; must be {@link RepositoryVersionState#CURRENT}, the only state this method currently accepts
* @throws AmbariException
*/
@Override
public void mapHostVersions(Set<String> hostNames, ClusterVersionEntity currentClusterVersion, RepositoryVersionState desiredState) throws AmbariException {
if (currentClusterVersion == null) {
throw new AmbariException("Could not find current stack version of cluster " + getClusterName());
}
final Set<RepositoryVersionState> validStates = new HashSet<RepositoryVersionState>(){{
add(RepositoryVersionState.CURRENT);
}};
if (!validStates.contains(desiredState)) {
throw new AmbariException("The state must be one of [" + StringUtils.join(validStates, ", ") + "]");
}
clusterGlobalLock.writeLock().lock();
try {
StackEntity repoVersionStackEntity = currentClusterVersion.getRepositoryVersion().getStack();
StackId repoVersionStackId = new StackId(repoVersionStackEntity);
Map<String, HostVersionEntity> existingHostToHostVersionEntity = new HashMap<String, HostVersionEntity>();
List<HostVersionEntity> existingHostVersionEntities = hostVersionDAO.findByClusterStackAndVersion(
getClusterName(), repoVersionStackId,
currentClusterVersion.getRepositoryVersion().getVersion());
if (existingHostVersionEntities != null) {
for (HostVersionEntity entity : existingHostVersionEntities) {
existingHostToHostVersionEntity.put(entity.getHostName(), entity);
}
}
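// Hosts that already have a host version row for this stack and version.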
Sets.SetView<String> intersection = Sets.intersection(
existingHostToHostVersionEntity.keySet(), hostNames);
for (String hostname : hostNames) {
List<HostVersionEntity> currentHostVersions = hostVersionDAO.findByClusterHostAndState(
getClusterName(), hostname, RepositoryVersionState.CURRENT);
HostVersionEntity currentHostVersionEntity = (currentHostVersions != null && currentHostVersions.size() == 1) ? currentHostVersions.get(0)
: null;
// Notice that if any hosts already have the desired stack and version, regardless of the state, we try
// to be robust and only insert records for the missing hosts.
if (!intersection.contains(hostname)) {
// According to the business logic, we don't create objects in a CURRENT state.
HostEntity hostEntity = hostDAO.findByName(hostname);
HostVersionEntity hostVersionEntity = new HostVersionEntity(hostEntity, currentClusterVersion.getRepositoryVersion(), desiredState);
hostVersionDAO.create(hostVersionEntity);
} else {
HostVersionEntity hostVersionEntity = existingHostToHostVersionEntity.get(hostname);
if (hostVersionEntity.getState() != desiredState) {
hostVersionEntity.setState(desiredState);
hostVersionEntity = hostVersionDAO.merge(hostVersionEntity);
}
// Maintain the invariant that only one HostVersionEntity is allowed
// to have a state of CURRENT.
if (currentHostVersionEntity != null
&& !currentHostVersionEntity.getRepositoryVersion().equals(
hostVersionEntity.getRepositoryVersion())
&& desiredState == RepositoryVersionState.CURRENT
&& currentHostVersionEntity.getState() == RepositoryVersionState.CURRENT) {
currentHostVersionEntity.setState(RepositoryVersionState.INSTALLED);
currentHostVersionEntity = hostVersionDAO.merge(currentHostVersionEntity);
}
}
}
} finally {
clusterGlobalLock.writeLock().unlock();
}
}
/**
* {@inheritDoc}
*/
@Override
public void transitionHostsToInstalling(ClusterVersionEntity sourceClusterVersion) throws AmbariException {
if (sourceClusterVersion == null) {
throw new AmbariException("Could not find current stack version of cluster " + getClusterName());
}
if (RepositoryVersionState.INSTALLING != sourceClusterVersion.getState()) {
throw new AmbariException("Unable to transition cluster hosts into "
+ RepositoryVersionState.INSTALLING
+ ". The only valid state is INSTALLING");
}
Map<String, Host> hosts = clusters.getHostsForCluster(getClusterName());
Set<String> existingHostsWithClusterStackAndVersion = new HashSet<String>();
HashMap<String, HostVersionEntity> existingHostStackVersions = new HashMap<String, HostVersionEntity>();
clusterGlobalLock.writeLock().lock();
try {
StackEntity repoVersionStackEntity = sourceClusterVersion.getRepositoryVersion().getStack();
StackId repoVersionStackId = new StackId(repoVersionStackEntity);
List<HostVersionEntity> existingHostVersionEntities = hostVersionDAO.findByClusterStackAndVersion(
getClusterName(), repoVersionStackId,
sourceClusterVersion.getRepositoryVersion().getVersion());
// for each host that already has a stack and version, keep track of them
for (HostVersionEntity entity : existingHostVersionEntities) {
String hostName = entity.getHostName();
existingHostsWithClusterStackAndVersion.add(hostName);
existingHostStackVersions.put(hostName, entity);
}
// find any hosts that do not have the stack/repo version already
Sets.SetView<String> hostsMissingRepoVersion = Sets.difference(
hosts.keySet(), existingHostsWithClusterStackAndVersion);
createOrUpdateHostVersionToState(sourceClusterVersion, hosts,
existingHostStackVersions, hostsMissingRepoVersion,
RepositoryVersionState.INSTALLING);
} finally {
clusterGlobalLock.writeLock().unlock();
}
}
/**
* Moved out to a separate method for performance reasons.
* Iterates over all hosts and creates or transitions existing host versions
* to a given state. If a host version for the desired stack/version does not
* exist, one is created and initialized to the given state; otherwise, the
* existing host version state is updated.
* Hosts in maintenance mode are automatically skipped.
*
* @param sourceClusterVersion cluster version to be queried for a stack
* name/version info when creating a new host version
* @param hosts list of all hosts
* @param existingHostStackVersions map of existing host versions to be updated
* @param hostsMissingRepoVersion set of hostnames of hosts that have no desired host version
* @param newState target host version state for transition
*/
@Transactional
void createOrUpdateHostVersionToState(ClusterVersionEntity sourceClusterVersion,
Map<String, Host> hosts, HashMap<String, HostVersionEntity> existingHostStackVersions,
Sets.SetView<String> hostsMissingRepoVersion, RepositoryVersionState newState) {
for (String hostname : hosts.keySet()) {
// start off with the requested new state for each host
RepositoryVersionState repositoryVersionState = newState;
// if the host is in maintenance mode, that's an explicit marker which
// indicates that it should not be transitioned to INSTALLING; instead
// they will be transitioned to OUT_OF_SYNC
Host host = hosts.get(hostname);
if (host.getMaintenanceState(getClusterId()) != MaintenanceState.OFF) {
repositoryVersionState = RepositoryVersionState.OUT_OF_SYNC;
}
if (hostsMissingRepoVersion.contains(hostname)) {
// Create new host stack version
HostEntity hostEntity = hostDAO.findByName(hostname);
HostVersionEntity hostVersionEntity = new HostVersionEntity(hostEntity,
sourceClusterVersion.getRepositoryVersion(), repositoryVersionState);
hostVersionDAO.create(hostVersionEntity);
} else {
// Update existing host stack version
HostVersionEntity hostVersionEntity = existingHostStackVersions.get(hostname);
hostVersionEntity.setState(repositoryVersionState);
hostVersionEntity = hostVersionDAO.merge(hostVersionEntity);
}
}
}
/**
* Calculate the effective Cluster Version State based on the state of its hosts.
*
* CURRENT: all hosts are CURRENT
* UPGRADE_FAILED: at least one host in UPGRADE_FAILED
* UPGRADED: all hosts are UPGRADED
* UPGRADING: at least one host is UPGRADING, and the rest in UPGRADING|INSTALLED
* UPGRADING: at least one host is UPGRADED, and the rest in UPGRADING|INSTALLED
* INSTALLED: all hosts in INSTALLED
* INSTALL_FAILED: at least one host in INSTALL_FAILED
* INSTALLING: all hosts in INSTALLING|INSTALLED|INSTALL_FAILED, with at least one INSTALLING. Notice that if one
* host is CURRENT and another is INSTALLING, then the effective version will be OUT_OF_SYNC.
* OUT_OF_SYNC: otherwise
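*
* For example (with illustrative host names), { CURRENT: [h1], INSTALLING: [h2] }
* resolves to OUT_OF_SYNC, while { INSTALLING: [h1], INSTALLED: [h2] } resolves to INSTALLING.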
* @param stateToHosts Map from state to the collection of hosts with that state
* @return the effective Cluster Version State
*/
private RepositoryVersionState getEffectiveState(Map<RepositoryVersionState, Set<String>> stateToHosts) {
if (stateToHosts == null || stateToHosts.isEmpty()) {
return null;
}
int totalHosts = 0;
for (Set<String> hosts : stateToHosts.values()) {
totalHosts += hosts.size();
}
if (stateToHosts.containsKey(RepositoryVersionState.CURRENT) && stateToHosts.get(RepositoryVersionState.CURRENT).size() == totalHosts) {
return RepositoryVersionState.CURRENT;
}
if (stateToHosts.containsKey(RepositoryVersionState.UPGRADE_FAILED) && !stateToHosts.get(RepositoryVersionState.UPGRADE_FAILED).isEmpty()) {
return RepositoryVersionState.UPGRADE_FAILED;
}
if (stateToHosts.containsKey(RepositoryVersionState.UPGRADED) && stateToHosts.get(RepositoryVersionState.UPGRADED).size() == totalHosts) {
return RepositoryVersionState.UPGRADED;
}
if (stateToHosts.containsKey(RepositoryVersionState.UPGRADING) && !stateToHosts.get(RepositoryVersionState.UPGRADING).isEmpty()) {
return RepositoryVersionState.UPGRADING;
}
if (stateToHosts.containsKey(RepositoryVersionState.UPGRADED)
&& !stateToHosts.get(RepositoryVersionState.UPGRADED).isEmpty()
&& stateToHosts.get(RepositoryVersionState.UPGRADED).size() != totalHosts) {
// It is possible that a host has transitioned to UPGRADED state even before any other host has transitioned to UPGRADING state.
// Example: Host with single component ZOOKEEPER Server on it which is the first component to be upgraded.
return RepositoryVersionState.UPGRADING;
}
if (stateToHosts.containsKey(RepositoryVersionState.INSTALLED) && stateToHosts.get(RepositoryVersionState.INSTALLED).size() == totalHosts) {
return RepositoryVersionState.INSTALLED;
}
if (stateToHosts.containsKey(RepositoryVersionState.INSTALL_FAILED) &&
!stateToHosts.get(RepositoryVersionState.INSTALL_FAILED).isEmpty()) {
// Installation failed on some host(s). But
// cluster version state should transition to Install Failed only after
// all hosts have finished installation. Otherwise, UI will misbehave
// (hide progress dialog before installation is finished)
if (! stateToHosts.containsKey(RepositoryVersionState.INSTALLING) ||
stateToHosts.get(RepositoryVersionState.INSTALLING).isEmpty()) {
return RepositoryVersionState.INSTALL_FAILED;
}
}
final int totalINSTALLING = stateToHosts.containsKey(RepositoryVersionState.INSTALLING) ? stateToHosts.get(RepositoryVersionState.INSTALLING).size() : 0;
final int totalINSTALLED = stateToHosts.containsKey(RepositoryVersionState.INSTALLED) ? stateToHosts.get(RepositoryVersionState.INSTALLED).size() : 0;
final int totalINSTALL_FAILED = stateToHosts.containsKey(RepositoryVersionState.INSTALL_FAILED) ? stateToHosts.get(RepositoryVersionState.INSTALL_FAILED).size() : 0;
if (totalINSTALLING + totalINSTALLED + totalINSTALL_FAILED == totalHosts) {
return RepositoryVersionState.INSTALLING;
}
// Also returned when there is a mix of CURRENT and INSTALLING|INSTALLED|UPGRADING|UPGRADED
LOG.warn("Hosts have a mix of CURRENT and INSTALLING|INSTALLED|UPGRADING|UPGRADED host versions, "
+ "returning OUT_OF_SYNC as cluster version. Host version states: " + stateToHosts);
return RepositoryVersionState.OUT_OF_SYNC;
}
/**
* {@inheritDoc}
*/
@Override
public void recalculateClusterVersionState(RepositoryVersionEntity repositoryVersion) throws AmbariException {
if (repositoryVersion == null) {
return;
}
StackId stackId = repositoryVersion.getStackId();
String version = repositoryVersion.getVersion();
Map<String, Host> hosts = clusters.getHostsForCluster(getClusterName());
clusterGlobalLock.writeLock().lock();
try {
// Part 1, bootstrap cluster version if necessary.
ClusterVersionEntity clusterVersion = clusterVersionDAO.findByClusterAndStackAndVersion(
getClusterName(), stackId, version);
boolean performingInitialBootstrap = false;
if (clusterVersion == null) {
if (clusterVersionDAO.findByCluster(getClusterName()).isEmpty()) {
// During an Ambari Upgrade from 1.7.0 -> 2.0.0, the Cluster Version
// will not exist, so bootstrap it.
// This can still fail if the Repository Version has not yet been created,
// which can happen if the first HostComponentState to trigger this method
// cannot advertise a version.
performingInitialBootstrap = true;
createClusterVersionInternal(
stackId,
version,
AuthorizationHelper.getAuthenticatedName(configuration.getAnonymousAuditName()),
RepositoryVersionState.UPGRADING);
clusterVersion = clusterVersionDAO.findByClusterAndStackAndVersion(
getClusterName(), stackId, version);
if (clusterVersion == null) {
LOG.warn(String.format(
"Could not create a cluster version for cluster %s and stack %s using repo version %s",
getClusterName(), stackId.getStackId(), repositoryVersion));
return;
}
} else {
LOG.warn(String.format(
"Repository version %s not found for cluster %s",
repositoryVersion, getClusterName()));
return;
}
}
// Ignore if cluster version is CURRENT or UPGRADE_FAILED
if (clusterVersion.getState() != RepositoryVersionState.INSTALL_FAILED &&
clusterVersion.getState() != RepositoryVersionState.OUT_OF_SYNC &&
clusterVersion.getState() != RepositoryVersionState.INSTALLING &&
clusterVersion.getState() != RepositoryVersionState.INSTALLED &&
clusterVersion.getState() != RepositoryVersionState.UPGRADING &&
clusterVersion.getState() != RepositoryVersionState.UPGRADED) {
// anything else is not supported as of now
return;
}
// Part 2, check for transitions.
Set<String> hostsWithoutHostVersion = new HashSet<String>();
Map<RepositoryVersionState, Set<String>> stateToHosts = new HashMap<RepositoryVersionState, Set<String>>();
// hack until host versions are better integrated into the in-memory cluster structure
List<HostVersionEntity> hostVersionEntities =
hostVersionDAO.findByClusterStackAndVersion(getClusterName(), stackId, version);
Set<String> hostsWithState = new HashSet<String>();
Set<String> hostsInMaintenanceState = new HashSet<>();
for (HostVersionEntity hostVersionEntity : hostVersionEntities) {
String hostname = hostVersionEntity.getHostEntity().getHostName();
Host host = hosts.get(hostname);
if (host != null && host.getMaintenanceState(getClusterId()) == MaintenanceState.ON) {
hostsInMaintenanceState.add(hostname);
continue;
}
hostsWithState.add(hostname);
RepositoryVersionState hostState = hostVersionEntity.getState();
if (stateToHosts.containsKey(hostState)) {
stateToHosts.get(hostState).add(hostname);
} else {
Set<String> hostsInState = new HashSet<String>();
hostsInState.add(hostname);
stateToHosts.put(hostState, hostsInState);
}
}
hostsWithoutHostVersion.addAll(hosts.keySet());
hostsWithoutHostVersion.removeAll(hostsWithState);
hostsWithoutHostVersion.removeAll(hostsInMaintenanceState);
// Ensure that all of the hosts without a Host Version only have
// Components that do not advertise a version.
// Otherwise, operations are still in progress.
for (String hostname : hostsWithoutHostVersion) {
HostEntity hostEntity = hostDAO.findByName(hostname);
// During initial bootstrap, unhealthy hosts are ignored,
// so we bootstrap the CURRENT version anyway
if (performingInitialBootstrap &&
hostEntity.getHostStateEntity().getCurrentState() != HostState.HEALTHY) {
continue;
}
final Collection<HostComponentStateEntity> allHostComponents = hostEntity.getHostComponentStateEntities();
for (HostComponentStateEntity hostComponentStateEntity : allHostComponents) {
if (hostComponentStateEntity.getVersion().equalsIgnoreCase(
State.UNKNOWN.toString())) {
// Some Components cannot advertise a version. E.g., ZKF, AMBARI_METRICS,
// Kerberos
ComponentInfo compInfo = ambariMetaInfo.getComponent(
stackId.getStackName(), stackId.getStackVersion(),
hostComponentStateEntity.getServiceName(),
hostComponentStateEntity.getComponentName());
if (compInfo.isVersionAdvertised()) {
LOG.debug("Skipping transitioning the cluster version because host "
+ hostname + " does not have a version yet.");
return;
}
}
}
}
RepositoryVersionState effectiveClusterVersionState = getEffectiveState(stateToHosts);
if (effectiveClusterVersionState != null
&& effectiveClusterVersionState != clusterVersion.getState()) {
// Any mismatch will be caught while transitioning, and raise an
// exception.
try {
transitionClusterVersion(stackId, version,
effectiveClusterVersionState);
} catch (AmbariException e) {
// The transition validator rejected the change; log and leave the stored state as-is.
LOG.warn("Could not transition cluster version to " + effectiveClusterVersionState, e);
}
}
} finally {
clusterGlobalLock.writeLock().unlock();
}
}
/**
* Transition the Host Version across states.
* @param host Host object
* @param repositoryVersion Repository Version with stack and version information
* @param stack Stack information
* @return the (possibly merged) host version entity after any transitions
* @throws AmbariException
*/
@Override
@Transactional
public HostVersionEntity transitionHostVersionState(HostEntity host, final RepositoryVersionEntity repositoryVersion, final StackId stack) throws AmbariException {
StackEntity repoVersionStackEntity = repositoryVersion.getStack();
StackId repoVersionStackId = new StackId(repoVersionStackEntity);
HostVersionEntity hostVersionEntity = hostVersionDAO.findByClusterStackVersionAndHost(
getClusterId(), repoVersionStackId, repositoryVersion.getVersion(),
host.getHostId());
hostTransitionStateWriteLock.lock();
try {
// Create one if it doesn't already exist. It will be possible to make further transitions below.
boolean performingInitialBootstrap = false;
if (hostVersionEntity == null) {
if (hostVersionDAO.findByClusterAndHost(getClusterName(), host.getHostName()).isEmpty()) {
// That is an initial bootstrap
performingInitialBootstrap = true;
}
hostVersionEntity = new HostVersionEntity(host, repositoryVersion, RepositoryVersionState.UPGRADING);
hostVersionDAO.create(hostVersionEntity);
}
HostVersionEntity currentVersionEntity = hostVersionDAO.findByHostAndStateCurrent(getClusterId(), host.getHostId());
boolean isCurrentPresent = (currentVersionEntity != null);
final ServiceComponentHostSummary hostSummary = new ServiceComponentHostSummary(ambariMetaInfo, host, stack);
if (!isCurrentPresent) {
// Transition from UPGRADING -> CURRENT. This is allowed because Host Version Entity is bootstrapped in an UPGRADING state.
// Alternatively, transition to CURRENT during initial bootstrap if at least one host component advertised a version
if ((hostSummary.isUpgradeFinished() && hostVersionEntity.getState().equals(RepositoryVersionState.UPGRADING)) || performingInitialBootstrap) {
hostVersionEntity.setState(RepositoryVersionState.CURRENT);
hostVersionEntity = hostVersionDAO.merge(hostVersionEntity);
}
} else {
// Handle transitions during a Stack Upgrade
// If a host only has one Component to update, that single report can still transition the host version from
// INSTALLED->UPGRADING->UPGRADED in one shot.
if (hostSummary.isUpgradeInProgress(currentVersionEntity.getRepositoryVersion().getVersion()) && hostVersionEntity.getState().equals(RepositoryVersionState.INSTALLED)) {
hostVersionEntity.setState(RepositoryVersionState.UPGRADING);
hostVersionEntity = hostVersionDAO.merge(hostVersionEntity);
}
if (hostSummary.isUpgradeFinished() && hostVersionEntity.getState().equals(RepositoryVersionState.UPGRADING)) {
hostVersionEntity.setState(RepositoryVersionState.UPGRADED);
hostVersionEntity = hostVersionDAO.merge(hostVersionEntity);
}
}
} finally {
hostTransitionStateWriteLock.unlock();
}
return hostVersionEntity;
}
@Override
public void recalculateAllClusterVersionStates() throws AmbariException {
clusterGlobalLock.writeLock().lock();
try {
List<ClusterVersionEntity> clusterVersionEntities = clusterVersionDAO.findByCluster(getClusterName());
StackId currentStackId = getCurrentStackVersion();
for (ClusterVersionEntity clusterVersionEntity : clusterVersionEntities) {
RepositoryVersionEntity repositoryVersionEntity = clusterVersionEntity.getRepositoryVersion();
StackId repoVersionStackId = repositoryVersionEntity.getStackId();
if (repoVersionStackId.equals(currentStackId)
&& clusterVersionEntity.getState() != RepositoryVersionState.CURRENT) {
recalculateClusterVersionState(clusterVersionEntity.getRepositoryVersion());
}
}
} finally {
clusterGlobalLock.writeLock().unlock();
}
}
@Override
public void createClusterVersion(StackId stackId, String version,
String userName, RepositoryVersionState state) throws AmbariException {
clusterGlobalLock.writeLock().lock();
try {
createClusterVersionInternal(stackId, version, userName, state);
} finally {
clusterGlobalLock.writeLock().unlock();
}
}
/**
* See {@link #createClusterVersion}
*
* This method is intended to be called only when the cluster write lock is already acquired.
*/
private void createClusterVersionInternal(StackId stackId, String version,
String userName, RepositoryVersionState state) throws AmbariException {
Set<RepositoryVersionState> allowedStates = new HashSet<RepositoryVersionState>();
Collection<ClusterVersionEntity> allClusterVersions = getAllClusterVersions();
if (allClusterVersions == null || allClusterVersions.isEmpty()) {
allowedStates.add(RepositoryVersionState.UPGRADING);
} else {
allowedStates.add(RepositoryVersionState.INSTALLING);
}
if (!allowedStates.contains(state)) {
throw new AmbariException("The allowed state for a new cluster version must be within " + allowedStates);
}
ClusterVersionEntity existing = clusterVersionDAO.findByClusterAndStackAndVersion(
getClusterName(), stackId, version);
if (existing != null) {
throw new DuplicateResourceException(
"Duplicate item, a cluster version with stack=" + stackId
+ ", version=" +
version + " for cluster " + getClusterName() + " already exists");
}
RepositoryVersionEntity repositoryVersionEntity = repositoryVersionDAO.findByStackAndVersion(
stackId, version);
if (repositoryVersionEntity == null) {
LOG.warn("Could not find repository version for stack=" + stackId
+ ", version=" + version);
return;
}
ClusterEntity clusterEntity = getClusterEntity();
ClusterVersionEntity clusterVersionEntity = new ClusterVersionEntity(
clusterEntity, repositoryVersionEntity, state,
System.currentTimeMillis(), System.currentTimeMillis(), userName);
clusterVersionDAO.create(clusterVersionEntity);
clusterEntity.getClusterVersionEntities().add(clusterVersionEntity);
clusterEntity = clusterDAO.merge(clusterEntity);
}
/**
* Transition an existing cluster version from one state to another. The
* following are some of the steps that are taken when transitioning between
* specific states:
* <ul>
* <li>UPGRADING/UPGRADED --> CURRENT: Set the current stack to the
* desired stack and ensure all hosts with the desired stack are CURRENT as
* well.</li>
* </ul>
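*
* A minimal usage sketch; the receiver {@code cluster} and the stack name
* and version shown are hypothetical:
* <pre>{@code
* // assumes an INSTALLED cluster version exists for this stack/version,
* // from which UPGRADING is a legal transition
* cluster.transitionClusterVersion(new StackId("HDP-2.2"), "2.2.0.0-1234",
*     RepositoryVersionState.UPGRADING);
* }</pre>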
*
* @param stackId
* Stack ID
* @param version
* Stack version
* @param state
* Desired state
* @throws AmbariException
*/
@Override
@Transactional
public void transitionClusterVersion(StackId stackId, String version,
RepositoryVersionState state) throws AmbariException {
Set<RepositoryVersionState> allowedStates = new HashSet<RepositoryVersionState>();
clusterGlobalLock.writeLock().lock();
try {
ClusterEntity clusterEntity = getClusterEntity();
if (clusterEntity != null) {
ClusterVersionEntity existingClusterVersion = clusterVersionDAO.findByClusterAndStackAndVersion(
getClusterName(), stackId, version);
if (existingClusterVersion == null) {
throw new AmbariException(
"Existing cluster version not found for cluster="
+ getClusterName() + ", stack=" + stackId + ", version="
+ version);
}
// NOOP
if (existingClusterVersion.getState() == state) {
return;
}
switch (existingClusterVersion.getState()) {
case CURRENT:
// If CURRENT state is changed here cluster will not have CURRENT
// state.
// CURRENT state will be changed to INSTALLED when another CURRENT
// state is added.
// allowedStates.add(RepositoryVersionState.INSTALLED);
break;
case INSTALLING:
allowedStates.add(RepositoryVersionState.INSTALLED);
allowedStates.add(RepositoryVersionState.INSTALL_FAILED);
allowedStates.add(RepositoryVersionState.OUT_OF_SYNC);
break;
case INSTALL_FAILED:
allowedStates.add(RepositoryVersionState.INSTALLING);
break;
case INSTALLED:
allowedStates.add(RepositoryVersionState.INSTALLING);
allowedStates.add(RepositoryVersionState.UPGRADING);
allowedStates.add(RepositoryVersionState.OUT_OF_SYNC);
break;
case OUT_OF_SYNC:
allowedStates.add(RepositoryVersionState.INSTALLING);
break;
case UPGRADING:
allowedStates.add(RepositoryVersionState.UPGRADED);
allowedStates.add(RepositoryVersionState.UPGRADE_FAILED);
if (clusterVersionDAO.findByClusterAndStateCurrent(getClusterName()) == null) {
allowedStates.add(RepositoryVersionState.CURRENT);
}
break;
case UPGRADED:
allowedStates.add(RepositoryVersionState.CURRENT);
break;
case UPGRADE_FAILED:
allowedStates.add(RepositoryVersionState.UPGRADING);
break;
}
if (!allowedStates.contains(state)) {
throw new AmbariException("Invalid cluster version transition from "
+ existingClusterVersion.getState() + " to " + state);
}
// There must be at most one cluster version whose state is CURRENT at
// all times.
if (state == RepositoryVersionState.CURRENT) {
ClusterVersionEntity currentVersion = clusterVersionDAO.findByClusterAndStateCurrent(getClusterName());
if (currentVersion != null) {
currentVersion.setState(RepositoryVersionState.INSTALLED);
currentVersion = clusterVersionDAO.merge(currentVersion);
}
}
existingClusterVersion.setState(state);
existingClusterVersion.setEndTime(System.currentTimeMillis());
existingClusterVersion = clusterVersionDAO.merge(existingClusterVersion);
if (state == RepositoryVersionState.CURRENT) {
for (HostEntity hostEntity : clusterEntity.getHostEntities()) {
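// Hosts whose components advertise a version get their host version
// transitioned by the regular component report flow; only hosts with no
// version-reporting components need to be forced to the new state here.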
if (hostHasReportables(existingClusterVersion.getRepositoryVersion(), hostEntity)) {
continue;
}
Collection<HostVersionEntity> versions = hostVersionDAO.findByHost(hostEntity.getHostName());
HostVersionEntity target = null;
if (null != versions) {
// Set anything that was previously marked CURRENT as INSTALLED, and
// the matching version as CURRENT
for (HostVersionEntity entity : versions) {
if (entity.getRepositoryVersion().getId().equals(
existingClusterVersion.getRepositoryVersion().getId())) {
target = entity;
target.setState(state);
target = hostVersionDAO.merge(target);
} else if (entity.getState() == RepositoryVersionState.CURRENT) {
entity.setState(RepositoryVersionState.INSTALLED);
entity = hostVersionDAO.merge(entity);
}
}
}
if (null == target) {
// If no matching version was found, create one with the desired
// state
HostVersionEntity hve = new HostVersionEntity(hostEntity,
existingClusterVersion.getRepositoryVersion(), state);
hostVersionDAO.create(hve);
}
}
// when setting the cluster's state to current, we must also
// bring the desired stack and current stack in line with each other
StackEntity desiredStackEntity = clusterEntity.getDesiredStack();
StackId desiredStackId = new StackId(desiredStackEntity);
// if the desired stack ID doesn't match the target when setting the
// cluster to CURRENT, then there's a problem
if (!desiredStackId.equals(stackId)) {
String message = MessageFormat.format(
"The desired stack ID {0} must match {1} when transitioning the cluster''s state to {2}",
desiredStackId, stackId, RepositoryVersionState.CURRENT);
throw new AmbariException(message);
}
setCurrentStackVersion(stackId);
}
}
} catch (RollbackException e) {
String message = MessageFormat.format(
"Unable to transition stack {0} at version {1} for cluster {2} to state {3}",
stackId, version, getClusterName(), state);
LOG.warn(message);
throw new AmbariException(message, e);
} finally {
clusterGlobalLock.writeLock().unlock();
}
}
/**
* Checks if the host has any components reporting version information.
* @param repoVersion the repo version
* @param host the host entity
* @return {@code true} if the host has any component that reports a version
* @throws AmbariException
*/
private boolean hostHasReportables(RepositoryVersionEntity repoVersion, HostEntity host)
throws AmbariException {
for (HostComponentStateEntity hcse : host.getHostComponentStateEntities()) {
ComponentInfo ci = ambariMetaInfo.getComponent(
repoVersion.getStackName(),
repoVersion.getStackVersion(),
hcse.getServiceName(),
hcse.getComponentName());
if (ci.isVersionAdvertised()) {
return true;
}
}
return false;
}
@Override
@Transactional
public void setCurrentStackVersion(StackId stackId) throws AmbariException {
clusterGlobalLock.writeLock().lock();
try {
StackEntity stackEntity = stackDAO.find(stackId.getStackName(),
stackId.getStackVersion());
ClusterEntity clusterEntity = getClusterEntity();
if (clusterEntity != null) {
ClusterStateEntity clusterStateEntity = clusterStateDAO.findByPK(clusterEntity.getClusterId());
if (clusterStateEntity == null) {
clusterStateEntity = new ClusterStateEntity();
clusterStateEntity.setClusterId(clusterEntity.getClusterId());
clusterStateEntity.setCurrentStack(stackEntity);
clusterStateEntity.setClusterEntity(clusterEntity);
clusterStateDAO.create(clusterStateEntity);
clusterStateEntity = clusterStateDAO.merge(clusterStateEntity);
clusterEntity.setClusterStateEntity(clusterStateEntity);
clusterEntity = clusterDAO.merge(clusterEntity);
} else {
clusterStateEntity.setCurrentStack(stackEntity);
clusterStateEntity = clusterStateDAO.merge(clusterStateEntity);
clusterEntity = clusterDAO.merge(clusterEntity);
}
}
} catch (RollbackException e) {
LOG.warn("Unable to set version " + stackId + " for cluster "
+ getClusterName());
throw new AmbariException("Unable to set" + " version=" + stackId
+ " for cluster " + getClusterName(), e);
} finally {
clusterGlobalLock.writeLock().unlock();
}
}
@Override
public Map<String, Config> getConfigsByType(String configType) {
loadConfigurations();
clusterGlobalLock.readLock().lock();
try {
if (!allConfigs.containsKey(configType)) {
return null;
}
return Collections.unmodifiableMap(allConfigs.get(configType));
} finally {
clusterGlobalLock.readLock().unlock();
}
}
@Override
public Config getConfig(String configType, String versionTag) {
loadConfigurations();
clusterGlobalLock.readLock().lock();
try {
if (!allConfigs.containsKey(configType)
|| !allConfigs.get(configType).containsKey(versionTag)) {
return null;
}
return allConfigs.get(configType).get(versionTag);
} finally {
clusterGlobalLock.readLock().unlock();
}
}
@Override
public Config getConfigByVersion(String configType, Long configVersion) {
clusterGlobalLock.readLock().lock();
try {
if (!allConfigs.containsKey(configType)) {
return null;
}
for (Map.Entry<String, Config> entry : allConfigs.get(configType).entrySet()) {
if (entry.getValue().getVersion().equals(configVersion)) {
return entry.getValue();
}
}
return null;
} finally {
clusterGlobalLock.readLock().unlock();
}
}
@Override
public void addConfig(Config config) {
loadConfigurations();
clusterGlobalLock.writeLock().lock();
try {
if (config.getType() == null || config.getType().isEmpty()) {
throw new IllegalArgumentException("Config type cannot be empty");
}
if (!allConfigs.containsKey(config.getType())) {
allConfigs.put(config.getType(), new HashMap<String, Config>());
}
allConfigs.get(config.getType()).put(config.getTag(), config);
} finally {
clusterGlobalLock.writeLock().unlock();
}
}
@Override
public Collection<Config> getAllConfigs() {
loadConfigurations();
clusterGlobalLock.readLock().lock();
try {
List<Config> list = new ArrayList<Config>();
for (Entry<String, Map<String, Config>> entry : allConfigs.entrySet()) {
for (Config config : entry.getValue().values()) {
list.add(config);
}
}
return Collections.unmodifiableList(list);
} finally {
clusterGlobalLock.readLock().unlock();
}
}
@Override
public ClusterResponse convertToResponse()
throws AmbariException {
loadStackVersion();
String clusterName = getClusterName();
Map<String, Host> hosts = clusters.getHostsForCluster(clusterName);
clusterGlobalLock.readLock().lock();
try {
return new ClusterResponse(getClusterId(), clusterName,
getProvisioningState(), getSecurityType(), hosts.keySet(),
hosts.size(), getDesiredStackVersion().getStackId(),
getClusterHealthReport(hosts));
} finally {
clusterGlobalLock.readLock().unlock();
}
}
@Override
public void debugDump(StringBuilder sb) {
loadServices();
loadStackVersion();
clusterGlobalLock.readLock().lock();
try {
sb.append("Cluster={ clusterName=").append(getClusterName()).append(
", clusterId=").append(getClusterId()).append(
", desiredStackVersion=").append(desiredStackVersion.getStackId()).append(
", services=[ ");
boolean first = true;
for (Service s : services.values()) {
if (!first) {
sb.append(" , ");
}
first = false;
sb.append("\n ");
s.debugDump(sb);
sb.append(' ');
}
sb.append(" ] }");
} finally {
clusterGlobalLock.readLock().unlock();
}
}
@Override
@Transactional
public void refresh() {
clusterGlobalLock.writeLock().lock();
try {
ClusterEntity clusterEntity = getClusterEntity();
clusterDAO.refresh(clusterEntity);
} finally {
clusterGlobalLock.writeLock().unlock();
}
}
@Override
@Transactional
public void deleteAllServices() throws AmbariException {
loadServices();
clusterGlobalLock.writeLock().lock();
try {
LOG.info("Deleting all services for cluster" + ", clusterName="
+ getClusterName());
for (Service service : services.values()) {
if (!service.canBeRemoved()) {
throw new AmbariException(
"Found non removable service when trying to"
+ " all services from cluster" + ", clusterName="
+ getClusterName() + ", serviceName=" + service.getName());
}
}
for (Service service : services.values()) {
deleteService(service);
}
services.clear();
} finally {
clusterGlobalLock.writeLock().unlock();
}
}
@Override
public void deleteService(String serviceName)
throws AmbariException {
loadServices();
clusterGlobalLock.writeLock().lock();
try {
Service service = getService(serviceName);
LOG.info("Deleting service for cluster" + ", clusterName="
+ getClusterName() + ", serviceName=" + service.getName());
// FIXME check dependencies from meta layer
if (!service.canBeRemoved()) {
throw new AmbariException("Could not delete service from cluster"
+ ", clusterName=" + getClusterName()
+ ", serviceName=" + service.getName());
}
deleteService(service);
services.remove(serviceName);
} finally {
clusterGlobalLock.writeLock().unlock();
}
}
/**
* Deletes the specified service and removes references to it from
* {@link #serviceComponentHosts}, as well as references to its
* {@link ServiceComponentHost} objects from {@link #serviceComponentHostsByHost}.
* <p>
* Note: This method must be called only with write lock acquired.
* </p>
* @param service the service to be deleted
* @throws AmbariException
* @see ServiceComponentHost
*/
private void deleteService(Service service) throws AmbariException {
final String serviceName = service.getName();
service.delete();
serviceComponentHosts.remove(serviceName);
for (List<ServiceComponentHost> serviceComponents: serviceComponentHostsByHost.values()){
Iterables.removeIf(serviceComponents, new Predicate<ServiceComponentHost>() {
@Override
public boolean apply(ServiceComponentHost serviceComponentHost) {
return serviceComponentHost.getServiceName().equals(serviceName);
}
});
}
}
@Override
public boolean canBeRemoved() {
loadServices();
clusterGlobalLock.readLock().lock();
try {
boolean safeToRemove = true;
for (Service service : services.values()) {
if (!service.canBeRemoved()) {
safeToRemove = false;
LOG.warn("Found non removable service" + ", clusterName="
+ getClusterName() + ", serviceName=" + service.getName());
}
}
return safeToRemove;
} finally {
clusterGlobalLock.readLock().unlock();
}
}
@Override
@Transactional
public void delete() throws AmbariException {
clusterGlobalLock.writeLock().lock();
try {
refresh();
deleteAllServices();
removeEntities();
allConfigs.clear();
} finally {
clusterGlobalLock.writeLock().unlock();
}
}
@Transactional
protected void removeEntities() throws AmbariException {
long clusterId = getClusterId();
alertDefinitionDAO.removeAll(clusterId);
alertDispatchDAO.removeAllGroups(clusterId);
upgradeDAO.removeAll(clusterId);
topologyRequestDAO.removeAll(clusterId);
clusterDAO.removeByPK(clusterId);
}
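/**
* Adds the given set of configs as the cluster's desired configuration on
* behalf of the given user, creating a new service config version where
* applicable. Delegates to {@link #addDesiredConfig(String, Set, String)}
* with a {@code null} note.
*/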
@Override
public ServiceConfigVersionResponse addDesiredConfig(String user, Set<Config> configs) {
return addDesiredConfig(user, configs, null);
}
@Override
public ServiceConfigVersionResponse addDesiredConfig(String user, Set<Config> configs, String serviceConfigVersionNote) {
if (null == user) {
throw new NullPointerException("User must be specified.");
}
clusterGlobalLock.writeLock().lock();
try {
if (configs == null) {
return null;
}
Iterator<Config> configIterator = configs.iterator();
while (configIterator.hasNext()) {
Config config = configIterator.next();
if (config == null) {
configIterator.remove();
continue;
}
Config currentDesired = getDesiredConfigByType(config.getType());
// do not set if it is already the current
if (null != currentDesired
&& currentDesired.getTag().equals(config.getTag())) {
configIterator.remove();
}
}
ServiceConfigVersionResponse serviceConfigVersionResponse = applyConfigs(
configs, user, serviceConfigVersionNote);
configHelper.invalidateStaleConfigsCache();
return serviceConfigVersionResponse;
} finally {
clusterGlobalLock.writeLock().unlock();
}
}
/**
* Gets all versions of the desired configurations for the cluster.
* @return a map of type-to-configuration information.
*/
@Override
public Map<String, Set<DesiredConfig>> getAllDesiredConfigVersions() {
return getDesiredConfigs(true, false);
}
/**
* Gets the active desired configurations for the cluster.
* @return a map of type-to-configuration information.
*/
@Override
public Map<String, DesiredConfig> getDesiredConfigs() {
return getDesiredConfigs(false);
}
@Override
public Map<String, DesiredConfig> getDesiredConfigs(boolean bypassCache) {
Map<String, Set<DesiredConfig>> activeConfigsByType = getDesiredConfigs(false, bypassCache);
return Maps.transformEntries(
activeConfigsByType,
new Maps.EntryTransformer<String, Set<DesiredConfig>, DesiredConfig>() {
@Override
public DesiredConfig transformEntry(@Nullable String key, @Nullable Set<DesiredConfig> value) {
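// exactly one active desired config is expected per type (see getDesiredConfigs)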
return value.iterator().next();
}
});
}
/**
* Gets desired configurations for the cluster.
* @param allVersions specifies whether all versions of the desired configurations
* are returned, or only the active ones. It is expected that there is one
* and only one active desired configuration per config type.
* @return a map of type-to-configuration information.
*/
private Map<String, Set<DesiredConfig>> getDesiredConfigs(boolean allVersions, boolean bypassCache) {
loadConfigurations();
clusterGlobalLock.readLock().lock();
try {
Map<String, Set<DesiredConfig>> map = new HashMap<>();
Collection<String> types = new HashSet<>();
Collection<ClusterConfigMappingEntity> entities =
bypassCache ?
clusterDAO.getClusterConfigMappingEntitiesByCluster(getClusterId()) :
getClusterEntity().getConfigMappingEntities();
for (ClusterConfigMappingEntity e : entities) {
if (allVersions || e.isSelected() > 0) {
DesiredConfig c = new DesiredConfig();
c.setServiceName(null);
c.setTag(e.getTag());
c.setUser(e.getUser());
if (!allConfigs.containsKey(e.getType())) {
LOG.error("Config inconsistency exists: unknown configType={}", e.getType());
continue;
}
Map<String, Config> configMap = allConfigs.get(e.getType());
if (!configMap.containsKey(e.getTag())) {
LOG.debug("Config inconsistency exists for typeName={}, unknown versionTag={}",
e.getType(), e.getTag());
continue;
}
Config config = configMap.get(e.getTag());
c.setVersion(config.getVersion());
Set<DesiredConfig> configs = map.get(e.getType());
if (configs == null) {
configs = new HashSet<>();
}
configs.add(c);
map.put(e.getType(), configs);
types.add(e.getType());
}
}
// TODO AMBARI-10679, need efficient caching from hostId to hostName...
Map<Long, String> hostIdToName = new HashMap<Long, String>();
if (!map.isEmpty()) {
Map<String, List<HostConfigMapping>> hostMappingsByType =
hostConfigMappingDAO.findSelectedHostsByTypes(clusterId, types);
for (Entry<String, Set<DesiredConfig>> entry : map.entrySet()) {
List<DesiredConfig.HostOverride> hostOverrides = new ArrayList<DesiredConfig.HostOverride>();
for (HostConfigMapping mappingEntity : hostMappingsByType.get(entry.getKey())) {
if (!hostIdToName.containsKey(mappingEntity.getHostId())) {
HostEntity hostEntity = hostDAO.findById(mappingEntity.getHostId());
hostIdToName.put(mappingEntity.getHostId(), hostEntity.getHostName());
}
hostOverrides.add(new DesiredConfig.HostOverride(
hostIdToName.get(mappingEntity.getHostId()), mappingEntity.getVersion()));
}
for (DesiredConfig c: entry.getValue()) {
c.setHostOverrides(hostOverrides);
}
}
}
return map;
} finally {
clusterGlobalLock.readLock().unlock();
}
}
@Override
public ServiceConfigVersionResponse createServiceConfigVersion(
String serviceName, String user, String note, ConfigGroup configGroup) {
// Create next service config version
ServiceConfigEntity serviceConfigEntity = new ServiceConfigEntity();
clusterGlobalLock.writeLock().lock();
try {
ClusterEntity clusterEntity = getClusterEntity();
if (clusterEntity != null) {
// set config group
if (configGroup != null) {
serviceConfigEntity.setGroupId(configGroup.getId());
Collection<Config> configs = configGroup.getConfigurations().values();
List<ClusterConfigEntity> configEntities = new ArrayList<ClusterConfigEntity>(configs.size());
for (Config config : configs) {
configEntities.add(clusterDAO.findConfig(getClusterId(), config.getType(), config.getTag()));
}
serviceConfigEntity.setClusterConfigEntities(configEntities);
} else {
List<ClusterConfigEntity> configEntities = getClusterConfigEntitiesByService(serviceName);
serviceConfigEntity.setClusterConfigEntities(configEntities);
}
long nextServiceConfigVersion = serviceConfigDAO.findNextServiceConfigVersion(clusterId, serviceName);
serviceConfigEntity.setServiceName(serviceName);
serviceConfigEntity.setClusterEntity(clusterEntity);
serviceConfigEntity.setVersion(nextServiceConfigVersion);
serviceConfigEntity.setUser(user);
serviceConfigEntity.setNote(note);
serviceConfigEntity.setStack(clusterEntity.getDesiredStack());
serviceConfigDAO.create(serviceConfigEntity);
if (configGroup != null) {
serviceConfigEntity.setHostIds(new ArrayList<Long>(configGroup.getHosts().keySet()));
serviceConfigEntity = serviceConfigDAO.merge(serviceConfigEntity);
}
}
} finally {
clusterGlobalLock.writeLock().unlock();
}
configChangeLog.info("Cluster '{}' changed by: '{}'; service_name='{}' config_group='{}' config_group_id='{}' " +
"version='{}'", getClusterName(), user, serviceName,
configGroup == null ? ServiceConfigVersionResponse.DEFAULT_CONFIG_GROUP_NAME : configGroup.getName(),
configGroup == null ? "-1" : configGroup.getId(),
serviceConfigEntity.getVersion());
String configGroupName = configGroup != null ? configGroup.getName() : ServiceConfigVersionResponse.DEFAULT_CONFIG_GROUP_NAME;
ServiceConfigVersionResponse response = new ServiceConfigVersionResponse(
serviceConfigEntity, configGroupName);
return response;
}
@Override
public String getServiceForConfigTypes(Collection<String> configTypes) {
LOG.debug("Looking for service for config types {}", configTypes);
String serviceName = null;
for (String configType : configTypes) {
for (Entry<String, String> entry : serviceConfigTypes.entries()) {
if (StringUtils.equals(entry.getValue(), configType)) {
if (serviceName != null) {
if (entry.getKey() != null && !StringUtils.equals(serviceName, entry.getKey())) {
throw new IllegalArgumentException(String.format(
"Config type %s belongs to service %s, but the config group is qualified for service %s",
configType, entry.getKey(), serviceName));
}
} else {
serviceName = entry.getKey();
}
}
}
}
LOG.info("Service {} returning", serviceName);
return serviceName;
}
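/**
* Finds the name of the service that owns the given config type.
* @param configType the config type to look up
* @return the owning service name, or {@code null} if no service maps to it
*/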
public String getServiceByConfigType(String configType) {
for (Entry<String, String> entry : serviceConfigTypes.entries()) {
String serviceName = entry.getKey();
String type = entry.getValue();
if (StringUtils.equals(type, configType)) {
return serviceName;
}
}
return null;
}
@Override
public ServiceConfigVersionResponse setServiceConfigVersion(String serviceName, Long version, String user, String note) throws AmbariException {
if (null == user) {
throw new NullPointerException("User must be specified.");
}
clusterGlobalLock.writeLock().lock();
try {
ServiceConfigVersionResponse serviceConfigVersionResponse = applyServiceConfigVersion(
serviceName, version, user, note);
configHelper.invalidateStaleConfigsCache();
return serviceConfigVersionResponse;
} finally {
clusterGlobalLock.writeLock().unlock();
}
}
@Override
public Map<String, Collection<ServiceConfigVersionResponse>> getActiveServiceConfigVersions() {
clusterGlobalLock.readLock().lock();
try {
Map<String, Collection<ServiceConfigVersionResponse>> map = new HashMap<String, Collection<ServiceConfigVersionResponse>>();
Set<ServiceConfigVersionResponse> responses = getActiveServiceConfigVersionSet();
for (ServiceConfigVersionResponse response : responses) {
if (map.get(response.getServiceName()) == null) {
map.put(response.getServiceName(),
new ArrayList<ServiceConfigVersionResponse>());
}
map.get(response.getServiceName()).add(response);
}
return map;
} finally {
clusterGlobalLock.readLock().unlock();
}
}
@Override
public List<ServiceConfigVersionResponse> getServiceConfigVersions() {
loadConfigurations();
clusterGlobalLock.readLock().lock();
try {
List<ServiceConfigVersionResponse> serviceConfigVersionResponses = new ArrayList<ServiceConfigVersionResponse>();
List<ServiceConfigEntity> serviceConfigs = serviceConfigDAO.getServiceConfigs(getClusterId());
// Gather for each service in each config group the active service config response as we
// iterate through all service config responses
Map<String, Map<String, ServiceConfigVersionResponse>> activeServiceConfigResponses = new HashMap<>();
for (ServiceConfigEntity serviceConfigEntity : serviceConfigs) {
ServiceConfigVersionResponse serviceConfigVersionResponse = convertToServiceConfigVersionResponse(serviceConfigEntity);
Map<String, ServiceConfigVersionResponse> activeServiceConfigResponseGroups = activeServiceConfigResponses.get(serviceConfigVersionResponse.getServiceName());
if (activeServiceConfigResponseGroups == null) {
Map<String, ServiceConfigVersionResponse> serviceConfigGroups = new HashMap<>();
activeServiceConfigResponses.put(serviceConfigVersionResponse.getServiceName(), serviceConfigGroups);
activeServiceConfigResponseGroups = serviceConfigGroups;
}
// the active config within a group
ServiceConfigVersionResponse activeServiceConfigResponse = activeServiceConfigResponseGroups.get(serviceConfigVersionResponse.getGroupName());
if (activeServiceConfigResponse == null && !ServiceConfigVersionResponse.DELETED_CONFIG_GROUP_NAME.equals(serviceConfigVersionResponse.getGroupName())) {
// a service config version in a deleted group should never be marked as current
activeServiceConfigResponseGroups.put(serviceConfigVersionResponse.getGroupName(), serviceConfigVersionResponse);
activeServiceConfigResponse = serviceConfigVersionResponse;
}
serviceConfigVersionResponse.setConfigurations(new ArrayList<ConfigurationResponse>());
if (serviceConfigEntity.getGroupId() == null) {
if (serviceConfigVersionResponse.getCreateTime() > activeServiceConfigResponse.getCreateTime()) {
activeServiceConfigResponseGroups.put(serviceConfigVersionResponse.getGroupName(), serviceConfigVersionResponse);
}
}
else if (clusterConfigGroups != null && clusterConfigGroups.containsKey(serviceConfigEntity.getGroupId())){
if (serviceConfigVersionResponse.getVersion() > activeServiceConfigResponse.getVersion()) {
activeServiceConfigResponseGroups.put(serviceConfigVersionResponse.getGroupName(), serviceConfigVersionResponse);
}
}
serviceConfigVersionResponse.setIsCurrent(false);
List<ClusterConfigEntity> clusterConfigEntities = serviceConfigEntity.getClusterConfigEntities();
for (ClusterConfigEntity clusterConfigEntity : clusterConfigEntities) {
Config config = allConfigs.get(clusterConfigEntity.getType()).get(
clusterConfigEntity.getTag());
serviceConfigVersionResponse.getConfigurations().add(
new ConfigurationResponse(getClusterName(), config.getStackId(),
config.getType(), config.getTag(), config.getVersion(),
config.getProperties(), config.getPropertiesAttributes(), config.getPropertiesTypes()));
}
serviceConfigVersionResponses.add(serviceConfigVersionResponse);
}
for (Map<String, ServiceConfigVersionResponse> serviceConfigVersionResponseGroup: activeServiceConfigResponses.values()) {
for (ServiceConfigVersionResponse serviceConfigVersionResponse : serviceConfigVersionResponseGroup.values()) {
serviceConfigVersionResponse.setIsCurrent(true);
}
}
return serviceConfigVersionResponses;
} finally {
clusterGlobalLock.readLock().unlock();
}
}
private Set<ServiceConfigVersionResponse> getActiveServiceConfigVersionSet() {
Set<ServiceConfigVersionResponse> responses = new HashSet<ServiceConfigVersionResponse>();
List<ServiceConfigEntity> activeServiceConfigVersions = getActiveServiceConfigVersionEntities();
for (ServiceConfigEntity lastServiceConfig : activeServiceConfigVersions) {
ServiceConfigVersionResponse response = convertToServiceConfigVersionResponse(lastServiceConfig);
response.setIsCurrent(true); //mark these as current, as they are
responses.add(response);
}
return responses;
}
private List<ServiceConfigEntity> getActiveServiceConfigVersionEntities() {
List<ServiceConfigEntity> activeServiceConfigVersions = new ArrayList<ServiceConfigEntity>();
//for services
activeServiceConfigVersions.addAll(serviceConfigDAO.getLastServiceConfigs(getClusterId()));
//for config groups
if (clusterConfigGroups != null) {
activeServiceConfigVersions.addAll(
serviceConfigDAO.getLastServiceConfigVersionsForGroups(clusterConfigGroups.keySet()));
}
return activeServiceConfigVersions;
}
@RequiresSession
ServiceConfigVersionResponse getActiveServiceConfigVersion(String serviceName) {
ServiceConfigEntity lastServiceConfig = serviceConfigDAO.getLastServiceConfig(getClusterId(), serviceName);
if (lastServiceConfig == null) {
LOG.debug("No service config version found for service {}", serviceName);
return null;
}
return convertToServiceConfigVersionResponse(lastServiceConfig);
}
@RequiresSession
ServiceConfigVersionResponse convertToServiceConfigVersionResponse(ServiceConfigEntity serviceConfigEntity) {
Long groupId = serviceConfigEntity.getGroupId();
String groupName;
if (groupId != null) {
ConfigGroup configGroup = null;
if (clusterConfigGroups != null) {
configGroup = clusterConfigGroups.get(groupId);
}
if (configGroup != null) {
groupName = configGroup.getName();
} else {
groupName = ServiceConfigVersionResponse.DELETED_CONFIG_GROUP_NAME;
}
} else {
groupName = ServiceConfigVersionResponse.DEFAULT_CONFIG_GROUP_NAME;
}
ServiceConfigVersionResponse serviceConfigVersionResponse = new ServiceConfigVersionResponse(
serviceConfigEntity, groupName);
return serviceConfigVersionResponse;
}
@Transactional
ServiceConfigVersionResponse applyServiceConfigVersion(String serviceName, Long serviceConfigVersion, String user,
String serviceConfigVersionNote) throws AmbariException {
ServiceConfigEntity serviceConfigEntity = serviceConfigDAO.findByServiceAndVersion(serviceName, serviceConfigVersion);
if (serviceConfigEntity == null) {
throw new ObjectNotFoundException(String.format(
"Service config version with serviceName=%s and version=%s not found",
serviceName, serviceConfigVersion));
}
//disable all configs related to service
if (serviceConfigEntity.getGroupId() == null) {
Collection<String> configTypes = serviceConfigTypes.get(serviceName);
List<ClusterConfigMappingEntity> mappingEntities =
clusterDAO.getClusterConfigMappingEntitiesByCluster(getClusterId());
for (ClusterConfigMappingEntity entity : mappingEntities) {
if (configTypes.contains(entity.getType()) && entity.isSelected() > 0) {
entity.setSelected(0);
entity = clusterDAO.mergeConfigMapping(entity);
}
}
for (ClusterConfigEntity configEntity : serviceConfigEntity.getClusterConfigEntities()) {
selectConfig(configEntity.getType(), configEntity.getTag(), user);
}
} else {
Long configGroupId = serviceConfigEntity.getGroupId();
ConfigGroup configGroup = clusterConfigGroups.get(configGroupId);
if (configGroup != null) {
Map<String, Config> groupDesiredConfigs = new HashMap<String, Config>();
for (ClusterConfigEntity entity : serviceConfigEntity.getClusterConfigEntities()) {
Config config = allConfigs.get(entity.getType()).get(entity.getTag());
groupDesiredConfigs.put(config.getType(), config);
}
configGroup.setConfigurations(groupDesiredConfigs);
Map<Long, Host> groupDesiredHosts = new HashMap<Long, Host>();
if (serviceConfigEntity.getHostIds() != null) {
for (Long hostId : serviceConfigEntity.getHostIds()) {
Host host = clusters.getHostById(hostId);
if (host != null) {
groupDesiredHosts.put(hostId, host);
} else {
LOG.warn("Host with id {} doesn't exist anymore, skipping", hostId);
}
}
}
configGroup.setHosts(groupDesiredHosts);
configGroup.persist();
} else {
throw new IllegalArgumentException("Config group {} doesn't exist");
}
}
ClusterEntity clusterEntity = getClusterEntity();
long nextServiceConfigVersion = serviceConfigDAO.findNextServiceConfigVersion(
clusterEntity.getClusterId(), serviceName);
ServiceConfigEntity serviceConfigEntityClone = new ServiceConfigEntity();
serviceConfigEntityClone.setCreateTimestamp(System.currentTimeMillis());
serviceConfigEntityClone.setUser(user);
serviceConfigEntityClone.setServiceName(serviceName);
serviceConfigEntityClone.setClusterEntity(clusterEntity);
serviceConfigEntityClone.setStack(serviceConfigEntity.getStack());
serviceConfigEntityClone.setClusterConfigEntities(serviceConfigEntity.getClusterConfigEntities());
serviceConfigEntityClone.setClusterId(serviceConfigEntity.getClusterId());
serviceConfigEntityClone.setHostIds(serviceConfigEntity.getHostIds());
serviceConfigEntityClone.setGroupId(serviceConfigEntity.getGroupId());
serviceConfigEntityClone.setNote(serviceConfigVersionNote);
serviceConfigEntityClone.setVersion(nextServiceConfigVersion);
serviceConfigDAO.create(serviceConfigEntityClone);
return convertToServiceConfigVersionResponse(serviceConfigEntityClone);
}
@Transactional
void selectConfig(String type, String tag, String user) {
Collection<ClusterConfigMappingEntity> entities =
clusterDAO.getClusterConfigMappingEntitiesByCluster(getClusterId());
//disable previous config
for (ClusterConfigMappingEntity e : entities) {
if (e.isSelected() > 0 && e.getType().equals(type)) {
e.setSelected(0);
e = clusterDAO.mergeConfigMapping(e);
}
}
ClusterEntity clusterEntity = getClusterEntity();
ClusterConfigMappingEntity entity = new ClusterConfigMappingEntity();
entity.setClusterEntity(clusterEntity);
entity.setClusterId(clusterEntity.getClusterId());
entity.setCreateTimestamp(System.currentTimeMillis());
entity.setSelected(1);
entity.setUser(user);
entity.setType(type);
entity.setTag(tag);
clusterDAO.persistConfigMapping(entity);
clusterEntity.getConfigMappingEntities().add(entity);
clusterDAO.merge(clusterEntity);
}
@Transactional
ServiceConfigVersionResponse applyConfigs(Set<Config> configs, String user, String serviceConfigVersionNote) {
String serviceName = null;
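// Resolve the single service that owns all supplied config types; a request
// that mixes config types from different services is rejected below.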
for (Config config : configs) {
for (Entry<String, String> entry : serviceConfigTypes.entries()) {
if (StringUtils.equals(entry.getValue(), config.getType())) {
if (serviceName == null) {
serviceName = entry.getKey();
break;
} else if (!serviceName.equals(entry.getKey())) {
String error = String.format("Updating configs for multiple services by a " +
"single API request isn't supported. Conflicting services %s and %s for %s",
serviceName, entry.getKey(), config.getType());
IllegalArgumentException exception = new IllegalArgumentException(error);
LOG.error(error + ", config version not created for {}", serviceName);
throw exception;
} else {
break;
}
}
}
}
for (Config config: configs) {
selectConfig(config.getType(), config.getTag(), user);
}
if (serviceName == null) {
ArrayList<String> configTypes = new ArrayList<>();
for (Config config: configs) {
configTypes.add(config.getType());
}
LOG.error("No service found for config types '{}', service config version not created", configTypes);
return null;
} else {
return createServiceConfigVersion(serviceName, user, serviceConfigVersionNote);
}
}
private ServiceConfigVersionResponse createServiceConfigVersion(String serviceName, String user,
String serviceConfigVersionNote) {
//create next service config version
return createServiceConfigVersion(serviceName, user, serviceConfigVersionNote, null);
}
private List<ClusterConfigEntity> getClusterConfigEntitiesByService(String serviceName) {
List<ClusterConfigEntity> configEntities = new ArrayList<ClusterConfigEntity>();
//add configs from this service
Collection<String> configTypes = serviceConfigTypes.get(serviceName);
for (ClusterConfigMappingEntity mappingEntity : clusterDAO.getClusterConfigMappingEntitiesByCluster(getClusterId())) {
if (mappingEntity.isSelected() > 0 && configTypes.contains(mappingEntity.getType())) {
ClusterConfigEntity configEntity =
clusterDAO.findConfig(getClusterId(), mappingEntity.getType(), mappingEntity.getTag());
if (configEntity != null) {
configEntities.add(configEntity);
} else {
LOG.error("Desired cluster config type={}, tag={} is not present in database," +
" unable to add to service config version");
}
}
}
return configEntities;
}
@Override
public Config getDesiredConfigByType(String configType) {
loadConfigurations();
clusterGlobalLock.readLock().lock();
try {
for (ClusterConfigMappingEntity e : clusterDAO.getClusterConfigMappingEntitiesByCluster(getClusterId())) {
if (e.isSelected() > 0 && e.getType().equals(configType)) {
return getConfig(e.getType(), e.getTag());
}
}
return null;
} finally {
clusterGlobalLock.readLock().unlock();
}
}
@Override
public boolean isConfigTypeExists(String configType) {
clusterGlobalLock.readLock().lock();
try {
for (ClusterConfigMappingEntity e : clusterDAO.getClusterConfigMappingEntitiesByCluster(getClusterId())) {
if (e.getType().equals(configType)) {
return true;
}
}
return false;
} finally {
clusterGlobalLock.readLock().unlock();
}
}
@Override
public Map<Long, Map<String, DesiredConfig>> getHostsDesiredConfigs(Collection<Long> hostIds) {
if (hostIds == null || hostIds.isEmpty()) {
return Collections.emptyMap();
}
Set<HostConfigMapping> mappingEntities =
hostConfigMappingDAO.findSelectedByHosts(hostIds);
Map<Long, Map<String, DesiredConfig>> desiredConfigsByHost = new HashMap<Long, Map<String, DesiredConfig>>();
for (Long hostId : hostIds) {
desiredConfigsByHost.put(hostId, new HashMap<String, DesiredConfig>());
}
for (HostConfigMapping mappingEntity : mappingEntities) {
DesiredConfig desiredConfig = new DesiredConfig();
desiredConfig.setTag(mappingEntity.getVersion());
desiredConfig.setServiceName(mappingEntity.getServiceName());
desiredConfig.setUser(mappingEntity.getUser());
desiredConfigsByHost.get(mappingEntity.getHostId()).put(mappingEntity.getType(), desiredConfig);
}
return desiredConfigsByHost;
}
@Override
public Map<Long, Map<String, DesiredConfig>> getAllHostsDesiredConfigs() {
Collection<Long> hostIds;
try {
hostIds = clusters.getHostIdsForCluster(clusterName).keySet();
} catch (AmbariException ignored) {
return Collections.emptyMap();
}
return getHostsDesiredConfigs(hostIds);
}
/**
* {@inheritDoc}
*/
@Override
public Long getNextConfigVersion(String type) {
return clusterDAO.findNextConfigVersion(clusterId, type);
}
/**
* {@inheritDoc}
*/
@Override
public Map<ServiceComponentHostEvent, String> processServiceComponentHostEvents(ListMultimap<String, ServiceComponentHostEvent> eventMap) {
clusterGlobalLock.readLock().lock();
try {
return processServiceComponentHostEventsInSingleTransaction(eventMap);
} finally {
clusterGlobalLock.readLock().unlock();
}
}
/**
* Bulk handle service component host events, wrapping all handling in a
* single transaction. This allows
* {@link #processServiceComponentHostEvents(ListMultimap)} to lock around the
* AoP {@link Transactional} annotation so that the lock is not released
* before the transaction is committed.
*
* @param eventMap
* @return the events which failed to be processed.
*/
@Transactional
protected Map<ServiceComponentHostEvent, String> processServiceComponentHostEventsInSingleTransaction(
ListMultimap<String, ServiceComponentHostEvent> eventMap) {
Map<ServiceComponentHostEvent, String> failedEvents = new HashMap<ServiceComponentHostEvent, String>();
for (Entry<String, ServiceComponentHostEvent> entry : eventMap.entries()) {
String serviceName = entry.getKey();
ServiceComponentHostEvent event = entry.getValue();
String serviceComponentName = event.getServiceComponentName();
// server-side events either don't have a service name or are AMBARI;
// either way they are not handled by this method since it expects a
// real service and component
if (StringUtils.isBlank(serviceName) || Services.AMBARI.name().equals(serviceName)) {
continue;
}
if (StringUtils.isBlank(serviceComponentName)) {
continue;
}
try {
Service service = getService(serviceName);
ServiceComponent serviceComponent = service.getServiceComponent(serviceComponentName);
ServiceComponentHost serviceComponentHost = serviceComponent.getServiceComponentHost(
event.getHostName());
serviceComponentHost.handleEvent(event);
} catch (ServiceNotFoundException e) {
String message = String.format(
"ServiceComponentHost lookup exception. Service not found for Service: %s. Error: %s",
serviceName, e.getMessage());
LOG.error(message);
failedEvents.put(event, message);
} catch (ServiceComponentNotFoundException e) {
String message = String.format(
"ServiceComponentHost lookup exception. Service Component not found for Service: %s, Component: %s. Error: %s",
serviceName, serviceComponentName, e.getMessage());
LOG.error(message);
failedEvents.put(event, message);
} catch (ServiceComponentHostNotFoundException e) {
String message = String.format(
"ServiceComponentHost lookup exception. Service Component Host not found for Service: %s, Component: %s, Host: %s. Error: %s",
serviceName, serviceComponentName, event.getHostName(), e.getMessage());
LOG.error(message);
failedEvents.put(event, message);
} catch (AmbariException e) {
String message = String.format("ServiceComponentHost lookup exception %s", e.getMessage());
LOG.error(message);
failedEvents.put(event, message);
} catch (InvalidStateTransitionException e) {
LOG.error("Invalid transition ", e);
boolean isFailure = true;
Enum<?> currentState = e.getCurrentState();
Enum<?> failedEvent = e.getEvent();
// skip adding this as a failed event, to work around stack ordering
// issues with Hive
if (currentState == State.STARTED &&
failedEvent == ServiceComponentHostEventType.HOST_SVCCOMP_START){
isFailure = false;
LOG.warn(
"The start request for {} is invalid since the component is already started. Ignoring the request.",
serviceComponentName);
}
// unknown hosts should be able to be put back in progress and let the
// action scheduler fail it; don't abort the entire stage just because
// this happens
if (currentState == State.UNKNOWN
&& failedEvent == ServiceComponentHostEventType.HOST_SVCCOMP_OP_IN_PROGRESS) {
isFailure = false;
LOG.warn("The host {} is in an unknown state; attempting to put {} back in progress.",
event.getHostName(),
serviceComponentName);
}
// fail the event, causing it to automatically abort
if (isFailure) {
failedEvents.put(event, String.format("Invalid transition. %s", e.getMessage()));
}
}
}
return failedEvents;
}
/**
* @param serviceName name of the service
* @param componentName name of the component
* @return the set of hosts for the provided service and component
*/
@Override
public Set<String> getHosts(String serviceName, String componentName) {
Map<String, Service> services = getServices();
if (!services.containsKey(serviceName)) {
return Collections.emptySet();
}
Service service = services.get(serviceName);
Map<String, ServiceComponent> components = service.getServiceComponents();
if (!components.containsKey(componentName) ||
components.get(componentName).getServiceComponentHosts().size() == 0) {
return Collections.emptySet();
}
return components.get(componentName).getServiceComponentHosts().keySet();
}
@Override
public Collection<Host> getHosts() {
Map<String, Host> hosts;
try {
//todo: why the hell does this method throw AmbariException???
//todo: this is ridiculous that I need to get hosts for this cluster from Clusters!!!
//todo: should I getHosts using the same logic as the other getHosts call? At least that doesn't throw AmbariException.
hosts = clusters.getHostsForCluster(clusterName);
} catch (AmbariException e) {
//todo: in what conditions is AmbariException thrown?
throw new RuntimeException("Unable to get hosts for cluster: " + clusterName, e);
}
return hosts == null ? Collections.<Host>emptyList() : hosts.values();
}
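/**
* Builds an aggregate health report for the given hosts, counting hosts by
* host state, health status, stale configurations, and maintenance mode.
*/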
private ClusterHealthReport getClusterHealthReport(
Map<String, Host> clusterHosts) throws AmbariException {
int staleConfigsHosts = 0;
int maintenanceStateHosts = 0;
int healthyStateHosts = 0;
int unhealthyStateHosts = 0;
int initStateHosts = 0;
int healthyStatusHosts = 0;
int unhealthyStatusHosts = 0;
int unknownStatusHosts = 0;
int alertStatusHosts = 0;
int heartbeatLostStateHosts = 0;
Collection<Host> hosts = clusterHosts.values();
Iterator<Host> iterator = hosts.iterator();
while (iterator.hasNext()) {
Host host = iterator.next();
String hostName = host.getHostName();
switch (host.getState()) {
case HEALTHY:
healthyStateHosts++;
break;
case UNHEALTHY:
unhealthyStateHosts++;
break;
case INIT:
initStateHosts++;
break;
case HEARTBEAT_LOST:
heartbeatLostStateHosts++;
break;
}
switch (HostHealthStatus.HealthStatus.valueOf(host.getStatus())) {
case HEALTHY:
healthyStatusHosts++;
break;
case UNHEALTHY:
unhealthyStatusHosts++;
break;
case UNKNOWN:
unknownStatusHosts++;
break;
case ALERT:
alertStatusHosts++;
break;
}
boolean staleConfig = false;
boolean maintenanceState = false;
if (serviceComponentHostsByHost.containsKey(hostName)) {
for (ServiceComponentHost sch : serviceComponentHostsByHost.get(hostName)) {
staleConfig = staleConfig || configHelper.isStaleConfigs(sch);
maintenanceState = maintenanceState ||
maintenanceStateHelper.getEffectiveState(sch) != MaintenanceState.OFF;
}
}
if (staleConfig) {
staleConfigsHosts++;
}
if (maintenanceState) {
maintenanceStateHosts++;
}
}
ClusterHealthReport chr = new ClusterHealthReport();
chr.setAlertStatusHosts(alertStatusHosts);
chr.setHealthyStateHosts(healthyStateHosts);
chr.setUnknownStatusHosts(unknownStatusHosts);
chr.setUnhealthyStatusHosts(unhealthyStatusHosts);
chr.setUnhealthyStateHosts(unhealthyStateHosts);
chr.setStaleConfigsHosts(staleConfigsHosts);
chr.setMaintenanceStateHosts(maintenanceStateHosts);
chr.setInitStateHosts(initStateHosts);
chr.setHeartbeatLostStateHosts(heartbeatLostStateHosts);
chr.setHealthyStatusHosts(healthyStatusHosts);
return chr;
}
@Override
public boolean checkPermission(PrivilegeEntity privilegeEntity, boolean readOnly) {
ClusterEntity clusterEntity = getClusterEntity();
if (clusterEntity != null) {
ResourceEntity resourceEntity = clusterEntity.getResource();
if (resourceEntity != null) {
Integer permissionId = privilegeEntity.getPermission().getId();
// CLUSTER.READ or CLUSTER.OPERATE for the given cluster resource.
if (privilegeEntity.getResource().equals(resourceEntity)) {
if ((readOnly && permissionId.equals(PermissionEntity.CLUSTER_READ_PERMISSION)) ||
permissionId.equals(PermissionEntity.CLUSTER_OPERATE_PERMISSION)) {
return true;
}
}
}
}
return false;
}
@Override
public void addSessionAttributes(Map<String, Object> attributes) {
if (attributes != null && !attributes.isEmpty()) {
Map<String, Object> sessionAttributes = new HashMap<String, Object>(getSessionAttributes());
sessionAttributes.putAll(attributes);
setSessionAttributes(sessionAttributes);
}
}
@Override
public void setSessionAttribute(String key, Object value){
if (key != null && !key.isEmpty()) {
Map<String, Object> sessionAttributes = new HashMap<String, Object>(getSessionAttributes());
sessionAttributes.put(key, value);
setSessionAttributes(sessionAttributes);
}
}
@Override
public void removeSessionAttribute(String key) {
if (key != null && !key.isEmpty()) {
Map<String, Object> sessionAttributes = new HashMap<String, Object>(getSessionAttributes());
sessionAttributes.remove(key);
setSessionAttributes(sessionAttributes);
}
}
@Override
public Map<String, Object> getSessionAttributes() {
Map<String, Object> attributes =
(Map<String, Object>) getSessionManager().getAttribute(getClusterSessionAttributeName());
return attributes == null ? Collections.<String, Object>emptyMap() : attributes;
}
/**
* Get the associated session manager.
*
* @return the session manager
*/
protected AmbariSessionManager getSessionManager() {
return sessionManager;
}
/**
* Set the map of session attributes for this cluster.
* <p/>
* This is a private method so that it may be used as a utility for add and update operations.
*
* @param sessionAttributes the map of session attributes for this cluster; never null
*/
private void setSessionAttributes(Map<String, Object> sessionAttributes) {
getSessionManager().setAttribute(getClusterSessionAttributeName(), sessionAttributes);
}
/**
* Generates and returns the cluster-specific attribute name to use to set and get cluster-specific
* session attributes.
*
* @return the name of the cluster-specific session attribute
*/
private String getClusterSessionAttributeName() {
return CLUSTER_SESSION_ATTRIBUTES_PREFIX + getClusterName();
}
/**
* {@inheritDoc}
*/
@Override
public void applyLatestConfigurations(StackId stackId) {
clusterGlobalLock.writeLock().lock();
try {
ClusterEntity clusterEntity = getClusterEntity();
Collection<ClusterConfigMappingEntity> configMappingEntities = clusterEntity.getConfigMappingEntities();
// disable all configs
for (ClusterConfigMappingEntity e : configMappingEntities) {
LOG.debug("{} with tag {} is unselected", e.getType(), e.getTag());
e.setSelected(0);
}
List<ClusterConfigMappingEntity> clusterConfigMappingsForStack = clusterDAO.getClusterConfigMappingsByStack(
clusterEntity.getClusterId(), stackId);
Collection<ClusterConfigMappingEntity> latestConfigMappingByStack = getLatestConfigMapping(
clusterConfigMappingsForStack);
// loop through all configs and set the latest to enabled for the
// specified stack
for (ClusterConfigMappingEntity e : configMappingEntities) {
String type = e.getType();
String tag = e.getTag();
for (ClusterConfigMappingEntity latest : latestConfigMappingByStack) {
String t = latest.getType();
String tagLatest = latest.getTag();
if (type.equals(t) && tag.equals(tagLatest)) { // found the latest config of a given mapping entity
LOG.info("{} with version tag {} is selected for stack {}", type, tag, stackId.toString());
e.setSelected(1);
}
}
}
clusterEntity.setConfigMappingEntities(configMappingEntities);
clusterEntity = clusterDAO.merge(clusterEntity);
clusterDAO.mergeConfigMappings(configMappingEntities);
cacheConfigurations();
} finally {
clusterGlobalLock.writeLock().unlock();
}
}
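/**
* Reduces the given config mappings to the most recently created mapping
* per config type, compared by create timestamp.
*/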
public Collection<ClusterConfigMappingEntity> getLatestConfigMapping(List<ClusterConfigMappingEntity> clusterConfigMappingEntities){
Map<String, ClusterConfigMappingEntity> temp = new HashMap<String, ClusterConfigMappingEntity>();
for (ClusterConfigMappingEntity e : clusterConfigMappingEntities) {
String type = e.getType();
if (temp.containsKey(type)) {
ClusterConfigMappingEntity entityStored = temp.get(type);
Long timestampStored = entityStored.getCreateTimestamp();
Long timestamp = e.getCreateTimestamp();
if (timestamp > timestampStored) {
temp.put(type, e); // found a newer config for the given type
}
} else {
temp.put(type, e); // first time encountering this type; add it
}
}
return temp.values();
}
/**
* {@inheritDoc}
*/
@Override
public Map<PropertyInfo.PropertyType, Set<String>> getConfigPropertiesTypes(String configType){
try {
StackId stackId = getCurrentStackVersion();
StackInfo stackInfo = ambariMetaInfo.getStack(stackId.getStackName(), stackId.getStackVersion());
return stackInfo.getConfigPropertiesTypes(configType);
} catch (AmbariException e) {
LOG.warn("Unable to get config property types for config type " + configType, e);
}
return new HashMap<>();
}
/**
* Removes all configurations associated with the specified stack. The caller
* should make sure the cluster global write lock is acquired.
*
* @param stackId
* @see Cluster#getClusterGlobalLock()
*/
  @Transactional
  void removeAllConfigsForStack(StackId stackId) {
    ClusterEntity clusterEntity = getClusterEntity();

    // make sure the entity isn't stale in the current unit of work.
    clusterDAO.refresh(clusterEntity);

    long clusterId = clusterEntity.getClusterId();

    // this will keep track of cluster config mappings that need removal
    // since there is no relationship between configs and their mappings, we
    // have to do it manually
    List<ClusterConfigEntity> removedClusterConfigs = new ArrayList<>(50);
    Collection<ClusterConfigEntity> clusterConfigEntities = clusterEntity.getClusterConfigEntities();

    List<ServiceConfigEntity> serviceConfigs = serviceConfigDAO.getAllServiceConfigsForClusterAndStack(
        clusterId, stackId);

    // remove all service configurations and associated configs
    Collection<ServiceConfigEntity> serviceConfigEntities = clusterEntity.getServiceConfigEntities();

    for (ServiceConfigEntity serviceConfig : serviceConfigs) {
      for (ClusterConfigEntity configEntity : serviceConfig.getClusterConfigEntities()) {
        clusterConfigEntities.remove(configEntity);
        clusterDAO.removeConfig(configEntity);
        removedClusterConfigs.add(configEntity);
      }

      serviceConfig.getClusterConfigEntities().clear();
      serviceConfigDAO.remove(serviceConfig);
      serviceConfigEntities.remove(serviceConfig);
    }

    // remove any leftover cluster configurations that don't have a service
    // configuration (like cluster-env)
    List<ClusterConfigEntity> clusterConfigs = clusterDAO.getAllConfigurations(
        clusterId, stackId);

    for (ClusterConfigEntity clusterConfig : clusterConfigs) {
      clusterConfigEntities.remove(clusterConfig);
      clusterDAO.removeConfig(clusterConfig);
      removedClusterConfigs.add(clusterConfig);
    }

    clusterEntity.setClusterConfigEntities(clusterConfigEntities);
    clusterEntity = clusterDAO.merge(clusterEntity);

    // remove config mappings for the configs that were removed above
    Collection<ClusterConfigMappingEntity> configMappingEntities =
        clusterDAO.getClusterConfigMappingEntitiesByCluster(getClusterId());

    for (ClusterConfigEntity removedClusterConfig : removedClusterConfigs) {
      String removedClusterConfigType = removedClusterConfig.getType();
      String removedClusterConfigTag = removedClusterConfig.getTag();

      Iterator<ClusterConfigMappingEntity> clusterConfigMappingIterator = configMappingEntities.iterator();
      while (clusterConfigMappingIterator.hasNext()) {
        ClusterConfigMappingEntity clusterConfigMapping = clusterConfigMappingIterator.next();
        String mappingType = clusterConfigMapping.getType();
        String mappingTag = clusterConfigMapping.getTag();

        if (removedClusterConfigTag.equals(mappingTag)
            && removedClusterConfigType.equals(mappingType)) {
          clusterConfigMappingIterator.remove();
          clusterDAO.removeConfigMapping(clusterConfigMapping);
        }
      }
    }

    clusterEntity.setConfigMappingEntities(configMappingEntities);
    clusterEntity = clusterDAO.merge(clusterEntity);
  }

  /**
   * {@inheritDoc}
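   * <p>
   * A minimal usage sketch; the cluster name and stack id below are
   * illustrative, not taken from this class:
   * <pre>{@code
   * Cluster cluster = clusters.getCluster("c1");
   * cluster.removeConfigurations(new StackId("HDP-2.3"));
   * }</pre>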
   */
  @Override
  public void removeConfigurations(StackId stackId) {
    clusterGlobalLock.writeLock().lock();
    try {
      removeAllConfigsForStack(stackId);
      cacheConfigurations();
    } finally {
      clusterGlobalLock.writeLock().unlock();
    }
  }

  /**
   * Caches all of the {@link ClusterConfigEntity}s in {@link #allConfigs}.
   */
  private void cacheConfigurations() {
    ClusterEntity clusterEntity = getClusterEntity();
    if (clusterEntity != null) {
      if (null == allConfigs) {
        allConfigs = new HashMap<>();
      }

      allConfigs.clear();

      if (!clusterEntity.getClusterConfigEntities().isEmpty()) {
        for (ClusterConfigEntity entity : clusterEntity.getClusterConfigEntities()) {
          if (!allConfigs.containsKey(entity.getType())) {
            allConfigs.put(entity.getType(), new HashMap<String, Config>());
          }

          Config config = configFactory.createExisting(this, entity);
          allConfigs.get(entity.getType()).put(entity.getTag(), config);
        }
      }
    }
  }
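
  /**
   * Lazily populates {@link #allConfigs} if it has not been loaded yet. Uses
   * a double-checked guard under the cluster global write lock so that
   * concurrent callers do not rebuild the cache more than once.
   */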
  private void loadConfigurations() {
    if (allConfigs != null) {
      return;
    }

    clusterGlobalLock.writeLock().lock();
    try {
      if (allConfigs != null) {
        return;
      }

      cacheConfigurations();
    } finally {
      clusterGlobalLock.writeLock().unlock();
    }
  }
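
  /**
   * Lazily loads {@link #desiredStackVersion} from the cluster entity and,
   * when a stack name and version are set, the service config types for that
   * stack. Guarded by {@link #desiredStackVersionSet} under the cluster
   * global write lock.
   */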
  private void loadStackVersion() {
    if (desiredStackVersionSet) {
      return;
    }

    clusterGlobalLock.writeLock().lock();
    try {
      if (desiredStackVersionSet) {
        return;
      }

      desiredStackVersion = new StackId(getClusterEntity().getDesiredStack());

      if (!StringUtils.isEmpty(desiredStackVersion.getStackName())
          && !StringUtils.isEmpty(desiredStackVersion.getStackVersion())) {
        try {
          loadServiceConfigTypes();
        } catch (AmbariException e) {
          // TODO recheck wrapping exception here, required for lazy loading after invalidation
          throw new RuntimeException(e);
        }
      }

      desiredStackVersionSet = true;
    } finally {
      clusterGlobalLock.writeLock().unlock();
    }
  }

  /**
   * Clears all cached data so that it is re-read from the database on the
   * next access. To be used when the in-memory state has drifted out of sync
   * with the database.
   */
  @Override
  public void invalidateData() {
    clusterGlobalLock.writeLock().lock();
    try {
      allConfigs = null;
      services = null;
      desiredStackVersionSet = false;
      serviceComponentHosts.clear();
      serviceComponentHostsByHost.clear();
      svcHostsLoaded = false;

      clusterConfigGroups = null;

      // TODO investigate resetting request executions; they have a separate
      // API which is not too heavy
      refresh();
    } finally {
      clusterGlobalLock.writeLock().unlock();
    }
  }

  /**
   * Returns whether this cluster was provisioned by a Blueprint.
   *
   * @return {@code true} if the cluster was deployed with a Blueprint,
   *         {@code false} otherwise.
   */
  @Override
  public boolean isBluePrintDeployed() {
    List<TopologyRequestEntity> topologyRequests = topologyRequestDAO.findByClusterId(getClusterId());

    // iterate through the topology requests associated with this cluster and
    // look for a PROVISION request
    for (TopologyRequestEntity topologyRequest : topologyRequests) {
      TopologyRequest.Type requestAction = TopologyRequest.Type.valueOf(topologyRequest.getAction());
      if (requestAction == TopologyRequest.Type.PROVISION) {
        return true;
      }
    }

    return false;
  }
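
  /**
   * Looks up the {@link ClusterEntity} for this cluster by its id; may
   * return {@code null} if the cluster no longer exists.
   */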
  private ClusterEntity getClusterEntity() {
    return clusterDAO.findById(clusterId);
  }

  /**
   * {@inheritDoc}
   */
  @Override
  public boolean isUpgradeSuspended() {
    UpgradeEntity lastUpgradeItemForCluster = upgradeDAO.findLastUpgradeForCluster(clusterId);
    if (null != lastUpgradeItemForCluster) {
      return lastUpgradeItemForCluster.isSuspended();
    }

    return false;
  }
}