| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ambari.server.state.cluster; |
| |
| import java.text.MessageFormat; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.Date; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Map.Entry; |
| import java.util.Set; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.ConcurrentMap; |
| import java.util.concurrent.ConcurrentSkipListMap; |
| import java.util.concurrent.CopyOnWriteArrayList; |
| import java.util.concurrent.locks.ReadWriteLock; |
| |
| import javax.annotation.Nullable; |
| import javax.persistence.EntityManager; |
| import javax.persistence.RollbackException; |
| |
| import org.apache.ambari.annotations.Experimental; |
| import org.apache.ambari.annotations.ExperimentalFeature; |
| import org.apache.ambari.server.AmbariException; |
| import org.apache.ambari.server.ConfigGroupNotFoundException; |
| import org.apache.ambari.server.ObjectNotFoundException; |
| import org.apache.ambari.server.ParentObjectNotFoundException; |
| import org.apache.ambari.server.RoleCommand; |
| import org.apache.ambari.server.ServiceComponentHostNotFoundException; |
| import org.apache.ambari.server.ServiceComponentNotFoundException; |
| import org.apache.ambari.server.ServiceNotFoundException; |
| import org.apache.ambari.server.agent.ExecutionCommand.KeyNames; |
| import org.apache.ambari.server.api.services.AmbariMetaInfo; |
| import org.apache.ambari.server.controller.AmbariSessionManager; |
| import org.apache.ambari.server.controller.ClusterResponse; |
| import org.apache.ambari.server.controller.ConfigurationResponse; |
| import org.apache.ambari.server.controller.MaintenanceStateHelper; |
| import org.apache.ambari.server.controller.RootServiceResponseFactory.Services; |
| import org.apache.ambari.server.controller.ServiceConfigVersionResponse; |
| import org.apache.ambari.server.events.AmbariEvent.AmbariEventType; |
| import org.apache.ambari.server.events.ClusterConfigChangedEvent; |
| import org.apache.ambari.server.events.ClusterEvent; |
| import org.apache.ambari.server.events.jpa.EntityManagerCacheInvalidationEvent; |
| import org.apache.ambari.server.events.jpa.JPAEvent; |
| import org.apache.ambari.server.events.publishers.AmbariEventPublisher; |
| import org.apache.ambari.server.events.publishers.JPAEventPublisher; |
| import org.apache.ambari.server.logging.LockFactory; |
| import org.apache.ambari.server.metadata.RoleCommandOrder; |
| import org.apache.ambari.server.metadata.RoleCommandOrderProvider; |
| import org.apache.ambari.server.orm.RequiresSession; |
| import org.apache.ambari.server.orm.cache.HostConfigMapping; |
| import org.apache.ambari.server.orm.dao.AlertDefinitionDAO; |
| import org.apache.ambari.server.orm.dao.AlertDispatchDAO; |
| import org.apache.ambari.server.orm.dao.ClusterDAO; |
| import org.apache.ambari.server.orm.dao.ClusterStateDAO; |
| import org.apache.ambari.server.orm.dao.HostConfigMappingDAO; |
| import org.apache.ambari.server.orm.dao.HostDAO; |
| import org.apache.ambari.server.orm.dao.HostVersionDAO; |
| import org.apache.ambari.server.orm.dao.ServiceConfigDAO; |
| import org.apache.ambari.server.orm.dao.StackDAO; |
| import org.apache.ambari.server.orm.dao.TopologyRequestDAO; |
| import org.apache.ambari.server.orm.dao.UpgradeDAO; |
| import org.apache.ambari.server.orm.entities.ClusterConfigEntity; |
| import org.apache.ambari.server.orm.entities.ClusterEntity; |
| import org.apache.ambari.server.orm.entities.ClusterServiceEntity; |
| import org.apache.ambari.server.orm.entities.ClusterStateEntity; |
| import org.apache.ambari.server.orm.entities.ConfigGroupEntity; |
| import org.apache.ambari.server.orm.entities.HostEntity; |
| import org.apache.ambari.server.orm.entities.HostVersionEntity; |
| import org.apache.ambari.server.orm.entities.PermissionEntity; |
| import org.apache.ambari.server.orm.entities.PrivilegeEntity; |
| import org.apache.ambari.server.orm.entities.RepositoryVersionEntity; |
| import org.apache.ambari.server.orm.entities.RequestScheduleEntity; |
| import org.apache.ambari.server.orm.entities.ResourceEntity; |
| import org.apache.ambari.server.orm.entities.ServiceConfigEntity; |
| import org.apache.ambari.server.orm.entities.StackEntity; |
| import org.apache.ambari.server.orm.entities.TopologyRequestEntity; |
| import org.apache.ambari.server.orm.entities.UpgradeEntity; |
| import org.apache.ambari.server.security.authorization.AuthorizationException; |
| import org.apache.ambari.server.state.Cluster; |
| import org.apache.ambari.server.state.ClusterHealthReport; |
| import org.apache.ambari.server.state.Clusters; |
| import org.apache.ambari.server.state.Config; |
| import org.apache.ambari.server.state.ConfigFactory; |
| import org.apache.ambari.server.state.ConfigHelper; |
| import org.apache.ambari.server.state.DesiredConfig; |
| import org.apache.ambari.server.state.Host; |
| import org.apache.ambari.server.state.HostHealthStatus; |
| import org.apache.ambari.server.state.MaintenanceState; |
| import org.apache.ambari.server.state.PropertyInfo; |
| import org.apache.ambari.server.state.RepositoryType; |
| import org.apache.ambari.server.state.RepositoryVersionState; |
| import org.apache.ambari.server.state.SecurityType; |
| import org.apache.ambari.server.state.Service; |
| import org.apache.ambari.server.state.ServiceComponent; |
| import org.apache.ambari.server.state.ServiceComponentHost; |
| import org.apache.ambari.server.state.ServiceComponentHostEvent; |
| import org.apache.ambari.server.state.ServiceComponentHostEventType; |
| import org.apache.ambari.server.state.ServiceFactory; |
| import org.apache.ambari.server.state.ServiceInfo; |
| import org.apache.ambari.server.state.StackId; |
| import org.apache.ambari.server.state.StackInfo; |
| import org.apache.ambari.server.state.State; |
| import org.apache.ambari.server.state.UpgradeContext; |
| import org.apache.ambari.server.state.UpgradeContextFactory; |
| import org.apache.ambari.server.state.configgroup.ConfigGroup; |
| import org.apache.ambari.server.state.configgroup.ConfigGroupFactory; |
| import org.apache.ambari.server.state.fsm.InvalidStateTransitionException; |
| import org.apache.ambari.server.state.repository.ClusterVersionSummary; |
| import org.apache.ambari.server.state.repository.VersionDefinitionXml; |
| import org.apache.ambari.server.state.scheduler.RequestExecution; |
| import org.apache.ambari.server.state.scheduler.RequestExecutionFactory; |
| import org.apache.ambari.server.topology.TopologyRequest; |
| import org.apache.commons.collections.CollectionUtils; |
| import org.apache.commons.collections.MapUtils; |
| import org.apache.commons.lang.StringUtils; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import com.google.common.base.Functions; |
| import com.google.common.base.Predicate; |
| import com.google.common.collect.HashMultimap; |
| import com.google.common.collect.ImmutableMap; |
| import com.google.common.collect.Iterables; |
| import com.google.common.collect.ListMultimap; |
| import com.google.common.collect.Maps; |
| import com.google.common.collect.Multimap; |
| import com.google.common.eventbus.Subscribe; |
| import com.google.inject.Inject; |
| import com.google.inject.Injector; |
| import com.google.inject.assistedinject.Assisted; |
| import com.google.inject.persist.Transactional; |
| |
| public class ClusterImpl implements Cluster { |
| |
  private static final Logger LOG = LoggerFactory.getLogger(ClusterImpl.class);

  // dedicated audit logger for configuration changes (separate from the class logger)
  private static final Logger configChangeLog = LoggerFactory.getLogger("configchange");

  /**
   * Prefix for cluster session attributes name.
   */
  private static final String CLUSTER_SESSION_ATTRIBUTES_PREFIX = "cluster_session_attributes:";

  // registry of all clusters; used to validate host membership and propagate renames
  @Inject
  private Clusters clusters;

  // stack currently desired for this cluster; loaded in the constructor via loadStackVersion()
  private StackId desiredStackVersion;

  // services of this cluster keyed (and sorted) by service name
  private final ConcurrentSkipListMap<String, Service> services = new ConcurrentSkipListMap<>();

  /**
   * [ Config Type -> [ Config Version Tag -> Config ] ]
   */
  private final ConcurrentMap<String, ConcurrentMap<String, Config>> allConfigs = new ConcurrentHashMap<>();

  /**
   * [ ServiceName -> [ ServiceComponentName -> [ HostName -> [ ... ] ] ] ]
   */
  private final ConcurrentMap<String, ConcurrentMap<String, ConcurrentMap<String, ServiceComponentHost>>> serviceComponentHosts = new ConcurrentHashMap<>();

  /**
   * [ HostName -> [ ... ] ]
   */
  private final ConcurrentMap<String, List<ServiceComponentHost>> serviceComponentHostsByHost = new ConcurrentHashMap<>();

  /**
   * Map of existing config groups
   */
  private final Map<Long, ConfigGroup> clusterConfigGroups = new ConcurrentHashMap<>();

  /**
   * Map of Request schedules for this cluster
   */
  private final Map<Long, RequestExecution> requestExecutions = new ConcurrentHashMap<>();

  // cluster-wide lock created via the injected LockFactory in the constructor
  private final ReadWriteLock clusterGlobalLock;

  /**
   * The unique ID of the {@link ClusterEntity}.
   */
  private final long clusterId;

  // cached cluster name; kept in sync with the entity by setClusterName()
  private String clusterName;

  @Inject
  private ClusterDAO clusterDAO;

  @Inject
  private ClusterStateDAO clusterStateDAO;

  @Inject
  private HostDAO hostDAO;

  @Inject
  private HostVersionDAO hostVersionDAO;

  @Inject
  private ServiceFactory serviceFactory;

  @Inject
  private ConfigFactory configFactory;

  @Inject
  private LockFactory lockFactory;

  @Inject
  private HostConfigMappingDAO hostConfigMappingDAO;

  @Inject
  private ConfigGroupFactory configGroupFactory;

  @Inject
  private RequestExecutionFactory requestExecutionFactory;

  @Inject
  private ConfigHelper configHelper;

  @Inject
  private MaintenanceStateHelper maintenanceStateHelper;

  @Inject
  private AmbariMetaInfo ambariMetaInfo;

  @Inject
  private ServiceConfigDAO serviceConfigDAO;

  @Inject
  private AlertDefinitionDAO alertDefinitionDAO;

  @Inject
  private AlertDispatchDAO alertDispatchDAO;

  @Inject
  private UpgradeDAO upgradeDAO;

  @Inject
  private AmbariSessionManager sessionManager;

  @Inject
  private TopologyRequestDAO topologyRequestDAO;

  /**
   * Data access object used for looking up stacks from the database.
   */
  @Inject
  private StackDAO stackDAO;

  // config-type -> service-name mapping; volatile because it is rebuilt
  // asynchronously when the stack changes and read without locking
  private volatile Multimap<String, String> serviceConfigTypes;

  /**
   * Used to publish events relating to cluster CRUD operations and to receive
   * information about cluster operations.
   */
  private AmbariEventPublisher eventPublisher;

  /**
   * Used for broadcasting {@link JPAEvent}s.
   */
  @Inject
  private JPAEventPublisher jpaEventPublisher;

  /**
   * Used for getting instances of {@link RoleCommand} for this cluster.
   */
  @Inject
  private RoleCommandOrderProvider roleCommandOrderProvider;

  /**
   * Used to create instances of {@link UpgradeContext} with injected
   * dependencies. The {@link UpgradeContext} is used to populate the command
   * with upgrade information on the command/role maps if the upgrade is
   * suspended.
   */
  @Inject
  private UpgradeContextFactory upgradeContextFactory;

  /**
   * A simple cache for looking up {@code cluster-env} properties for a cluster.
   * This map is changed whenever {{cluster-env}} is changed and we receive a
   * {@link ClusterConfigChangedEvent}.
   */
  private Map<String, String> m_clusterPropertyCache = new ConcurrentHashMap<>();
| |
  /**
   * Creates the in-memory representation of an existing cluster from its
   * persisted {@link ClusterEntity}.
   * <p>
   * Note the load order: stack version first (services depend on it), then
   * services, then host components; configurations are cached before config
   * groups are loaded (groups reference configs). Finally the instance
   * registers itself on the event bus.
   *
   * @param clusterEntity the persisted cluster this instance wraps
   * @param injector used to inject the remaining {@code @Inject} members
   * @param eventPublisher bus used both to publish and receive cluster events
   * @throws AmbariException if any of the load steps fail
   */
  @Inject
  public ClusterImpl(@Assisted ClusterEntity clusterEntity, Injector injector,
      AmbariEventPublisher eventPublisher)
      throws AmbariException {

    clusterId = clusterEntity.getClusterId();
    clusterName = clusterEntity.getClusterName();

    injector.injectMembers(this);

    // the lock factory is only available after member injection above
    clusterGlobalLock = lockFactory.newReadWriteLock("clusterGlobalLock");

    loadStackVersion();
    loadServices();
    loadServiceHostComponents();

    // cache configurations before loading configuration groups
    cacheConfigurations();
    loadConfigGroups();

    loadRequestExecutions();

    // only resolve config types when a usable desired stack is present
    if (desiredStackVersion != null && !StringUtils.isEmpty(desiredStackVersion.getStackName()) && !
      StringUtils.isEmpty(desiredStackVersion.getStackVersion())) {
      loadServiceConfigTypes();
    }

    // register to receive stuff
    eventPublisher.register(this);
    this.eventPublisher = eventPublisher;
  }
| |
| private void loadServiceConfigTypes() throws AmbariException { |
| try { |
| serviceConfigTypes = collectServiceConfigTypesMapping(); |
| } catch (AmbariException e) { |
| LOG.error("Cannot load stack info:", e); |
| throw e; |
| } |
| LOG.info("Service config types loaded: {}", serviceConfigTypes); |
| } |
| |
| /** |
| * Construct config type to service name mapping |
| * @throws AmbariException when stack or its part not found |
| */ |
| private Multimap<String, String> collectServiceConfigTypesMapping() throws AmbariException { |
| Multimap<String, String> serviceConfigTypes = HashMultimap.create(); |
| |
| Map<String, ServiceInfo> serviceInfoMap = null; |
| try { |
| serviceInfoMap = ambariMetaInfo.getServices(desiredStackVersion.getStackName(), desiredStackVersion.getStackVersion()); |
| } catch (ParentObjectNotFoundException e) { |
| LOG.error("Service config versioning disabled due to exception: ", e); |
| return serviceConfigTypes; |
| } |
| for (Entry<String, ServiceInfo> entry : serviceInfoMap.entrySet()) { |
| String serviceName = entry.getKey(); |
| ServiceInfo serviceInfo = entry.getValue(); |
| Set<String> configTypes = serviceInfo.getConfigTypeAttributes().keySet(); |
| for (String configType : configTypes) { |
| serviceConfigTypes.put(serviceName, configType); |
| } |
| } |
| |
| return serviceConfigTypes; |
| } |
| |
| /** |
| * Make sure we load all the service host components. |
| * We need this for live status checks. |
| */ |
| private void loadServiceHostComponents() { |
| for (Entry<String, Service> serviceKV : services.entrySet()) { |
| /* get all the service component hosts **/ |
| Service service = serviceKV.getValue(); |
| if (!serviceComponentHosts.containsKey(service.getName())) { |
| serviceComponentHosts.put(service.getName(), |
| new ConcurrentHashMap<String, ConcurrentMap<String, ServiceComponentHost>>()); |
| } |
| |
| for (Entry<String, ServiceComponent> svcComponent : service.getServiceComponents().entrySet()) { |
| ServiceComponent comp = svcComponent.getValue(); |
| String componentName = svcComponent.getKey(); |
| if (!serviceComponentHosts.get(service.getName()).containsKey(componentName)) { |
| serviceComponentHosts.get(service.getName()).put(componentName, |
| new ConcurrentHashMap<String, ServiceComponentHost>()); |
| } |
| |
| // Get Service Host Components |
| for (Entry<String, ServiceComponentHost> svchost : comp.getServiceComponentHosts().entrySet()) { |
| String hostname = svchost.getKey(); |
| ServiceComponentHost svcHostComponent = svchost.getValue(); |
| if (!serviceComponentHostsByHost.containsKey(hostname)) { |
| serviceComponentHostsByHost.put(hostname, |
| new CopyOnWriteArrayList<ServiceComponentHost>()); |
| } |
| |
| List<ServiceComponentHost> compList = serviceComponentHostsByHost.get(hostname); |
| compList.add(svcHostComponent); |
| |
| if (!serviceComponentHosts.get(service.getName()).get(componentName).containsKey( |
| hostname)) { |
| serviceComponentHosts.get(service.getName()).get(componentName).put(hostname, |
| svcHostComponent); |
| } |
| } |
| } |
| } |
| } |
| |
| private void loadServices() { |
| ClusterEntity clusterEntity = getClusterEntity(); |
| if (CollectionUtils.isEmpty(clusterEntity.getClusterServiceEntities())) { |
| return; |
| } |
| |
| for (ClusterServiceEntity serviceEntity : clusterEntity.getClusterServiceEntities()) { |
| StackId stackId = getCurrentStackVersion(); |
| try { |
| if (ambariMetaInfo.getService(stackId.getStackName(), |
| stackId.getStackVersion(), serviceEntity.getServiceName()) != null) { |
| services.put(serviceEntity.getServiceName(), |
| serviceFactory.createExisting(this, serviceEntity)); |
| } |
| } catch (AmbariException e) { |
| LOG.error(String.format( |
| "Can not get service info: stackName=%s, stackVersion=%s, serviceName=%s", |
| stackId.getStackName(), stackId.getStackVersion(), |
| serviceEntity.getServiceName())); |
| } |
| } |
| } |
| |
| private void loadConfigGroups() { |
| ClusterEntity clusterEntity = getClusterEntity(); |
| if (!clusterEntity.getConfigGroupEntities().isEmpty()) { |
| for (ConfigGroupEntity configGroupEntity : clusterEntity.getConfigGroupEntities()) { |
| clusterConfigGroups.put(configGroupEntity.getGroupId(), |
| configGroupFactory.createExisting(this, configGroupEntity)); |
| } |
| } |
| } |
| |
| private void loadRequestExecutions() { |
| ClusterEntity clusterEntity = getClusterEntity(); |
| if (!clusterEntity.getRequestScheduleEntities().isEmpty()) { |
| for (RequestScheduleEntity scheduleEntity : clusterEntity.getRequestScheduleEntities()) { |
| requestExecutions.put(scheduleEntity.getScheduleId(), |
| requestExecutionFactory.createExisting(this, scheduleEntity)); |
| } |
| } |
| } |
| |
| @Override |
| public void addConfigGroup(ConfigGroup configGroup) throws AmbariException { |
| String hostList = ""; |
| if(LOG.isDebugEnabled()) { |
| if (configGroup.getHosts() != null) { |
| for (Host host : configGroup.getHosts().values()) { |
| hostList += host.getHostName() + ", "; |
| } |
| } |
| } |
| |
| LOG.debug("Adding a new Config group, clusterName = {}, groupName = {}, tag = {} with hosts {}", |
| getClusterName(), configGroup.getName(), configGroup.getTag(), hostList); |
| |
| if (clusterConfigGroups.containsKey(configGroup.getId())) { |
| // The loadConfigGroups will load all groups to memory |
| LOG.debug("Config group already exists, clusterName = {}, groupName = {}, groupId = {}, tag = {}", |
| getClusterName(), configGroup.getName(), configGroup.getId(), configGroup.getTag()); |
| } else { |
| clusterConfigGroups.put(configGroup.getId(), configGroup); |
| } |
| } |
| |
  /**
   * {@inheritDoc}
   * <p>
   * Returns an unmodifiable view over the live in-memory cache, so later
   * additions/removals are visible to the caller.
   */
  @Override
  public Map<Long, ConfigGroup> getConfigGroups() {
    return Collections.unmodifiableMap(clusterConfigGroups);
  }
| |
| @Override |
| public Map<Long, ConfigGroup> getConfigGroupsByHostname(String hostname) |
| throws AmbariException { |
| Map<Long, ConfigGroup> configGroups = new HashMap<>(); |
| |
| for (Entry<Long, ConfigGroup> groupEntry : clusterConfigGroups.entrySet()) { |
| Long id = groupEntry.getKey(); |
| ConfigGroup group = groupEntry.getValue(); |
| for (Host host : group.getHosts().values()) { |
| if (StringUtils.equals(hostname, host.getHostName())) { |
| configGroups.put(id, group); |
| break; |
| } |
| } |
| } |
| return configGroups; |
| } |
| |
| @Override |
| public void addRequestExecution(RequestExecution requestExecution) throws AmbariException { |
| LOG.info("Adding a new request schedule" + ", clusterName = " + getClusterName() + ", id = " |
| + requestExecution.getId() + ", description = " + requestExecution.getDescription()); |
| |
| if (requestExecutions.containsKey(requestExecution.getId())) { |
| LOG.debug("Request schedule already exists, clusterName = {}, id = {}, description = {}", |
| getClusterName(), requestExecution.getId(), requestExecution.getDescription()); |
| } else { |
| requestExecutions.put(requestExecution.getId(), requestExecution); |
| } |
| } |
| |
  /**
   * {@inheritDoc}
   * <p>
   * Returns an unmodifiable view over the live request schedule cache.
   */
  @Override
  public Map<Long, RequestExecution> getAllRequestExecutions() {
    return Collections.unmodifiableMap(requestExecutions);
  }
| |
| @Override |
| public void deleteRequestExecution(Long id) throws AmbariException { |
| RequestExecution requestExecution = requestExecutions.get(id); |
| if (requestExecution == null) { |
| throw new AmbariException("Request schedule does not exists, " + "id = " + id); |
| } |
| LOG.info("Deleting request schedule" + ", clusterName = " + getClusterName() + ", id = " |
| + requestExecution.getId() + ", description = " + requestExecution.getDescription()); |
| |
| requestExecution.delete(); |
| requestExecutions.remove(id); |
| } |
| |
| @Override |
| public void deleteConfigGroup(Long id) throws AmbariException, AuthorizationException { |
| ConfigGroup configGroup = clusterConfigGroups.get(id); |
| if (configGroup == null) { |
| throw new ConfigGroupNotFoundException(getClusterName(), id.toString()); |
| } |
| |
| LOG.debug("Deleting Config group, clusterName = {}, groupName = {}, groupId = {}, tag = {}", |
| getClusterName(), configGroup.getName(), configGroup.getId(), configGroup.getTag()); |
| |
| configGroup.delete(); |
| clusterConfigGroups.remove(id); |
| } |
| |
| public ServiceComponentHost getServiceComponentHost(String serviceName, |
| String serviceComponentName, String hostname) throws AmbariException { |
| if (!serviceComponentHosts.containsKey(serviceName) |
| || !serviceComponentHosts.get(serviceName).containsKey( |
| serviceComponentName) |
| || !serviceComponentHosts.get(serviceName).get(serviceComponentName).containsKey( |
| hostname)) { |
| throw new ServiceComponentHostNotFoundException(getClusterName(), |
| serviceName, serviceComponentName, hostname); |
| } |
| return serviceComponentHosts.get(serviceName).get(serviceComponentName).get(hostname); |
| } |
| |
| @Override |
| public List<ServiceComponentHost> getServiceComponentHosts() { |
| List<ServiceComponentHost> serviceComponentHosts = new ArrayList<>(); |
| if (!serviceComponentHostsByHost.isEmpty()) { |
| for (List<ServiceComponentHost> schList : serviceComponentHostsByHost.values()) { |
| serviceComponentHosts.addAll(schList); |
| } |
| } |
| return Collections.unmodifiableList(serviceComponentHosts); |
| } |
| |
  /**
   * {@inheritDoc}
   */
  @Override
  public String getClusterName() {
    return clusterName;
  }
| |
| @Override |
| public void setClusterName(String clusterName) { |
| String oldName = null; |
| ClusterEntity clusterEntity = getClusterEntity(); |
| oldName = clusterEntity.getClusterName(); |
| clusterEntity.setClusterName(clusterName); |
| |
| // RollbackException possibility if UNIQUE constraint violated |
| clusterEntity = clusterDAO.merge(clusterEntity); |
| clusters.updateClusterName(oldName, clusterName); |
| this.clusterName = clusterName; |
| |
| // if the name changed, fire an event |
| if (!StringUtils.equals(oldName, clusterName)) { |
| ClusterEvent clusterNameChangedEvent = new ClusterEvent(AmbariEventType.CLUSTER_RENAME, clusterId); |
| eventPublisher.publish(clusterNameChangedEvent); |
| } |
| } |
| |
| @Override |
| public Long getResourceId() { |
| ClusterEntity clusterEntity = getClusterEntity(); |
| |
| ResourceEntity resourceEntity = clusterEntity.getResource(); |
| if (resourceEntity == null) { |
| LOG.warn( |
| "There is no resource associated with this cluster:\n\tCluster Name: {}\n\tCluster ID: {}", |
| getClusterName(), getClusterId()); |
| return null; |
| } else { |
| return resourceEntity.getId(); |
| } |
| } |
| |
| @Override |
| @Transactional |
| public void addServiceComponentHosts(Collection<ServiceComponentHost> serviceComponentHosts) throws AmbariException { |
| for (ServiceComponentHost serviceComponentHost : serviceComponentHosts) { |
| Service service = getService(serviceComponentHost.getServiceName()); |
| ServiceComponent serviceComponent = service.getServiceComponent(serviceComponentHost.getServiceComponentName()); |
| serviceComponent.addServiceComponentHost(serviceComponentHost); |
| } |
| } |
| |
| public void addServiceComponentHost(ServiceComponentHost svcCompHost) |
| throws AmbariException { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Trying to add component {} of service {} on {} to the cache", |
| svcCompHost.getServiceComponentName(), svcCompHost.getServiceName(), |
| svcCompHost.getHostName()); |
| } |
| |
| final String hostname = svcCompHost.getHostName(); |
| final String serviceName = svcCompHost.getServiceName(); |
| final String componentName = svcCompHost.getServiceComponentName(); |
| |
| Set<Cluster> cs = clusters.getClustersForHost(hostname); |
| |
| boolean clusterFound = false; |
| Iterator<Cluster> iter = cs.iterator(); |
| while (iter.hasNext()) { |
| Cluster c = iter.next(); |
| if (c.getClusterId() == getClusterId()) { |
| clusterFound = true; |
| break; |
| } |
| } |
| |
| if (!clusterFound) { |
| throw new AmbariException("Host does not belong this cluster" |
| + ", hostname=" + hostname + ", clusterName=" + getClusterName() |
| + ", clusterId=" + getClusterId()); |
| } |
| |
| if (!serviceComponentHosts.containsKey(serviceName)) { |
| serviceComponentHosts.put(serviceName, |
| new ConcurrentHashMap<String, ConcurrentMap<String, ServiceComponentHost>>()); |
| } |
| |
| if (!serviceComponentHosts.get(serviceName).containsKey(componentName)) { |
| serviceComponentHosts.get(serviceName).put(componentName, |
| new ConcurrentHashMap<String, ServiceComponentHost>()); |
| } |
| |
| if (serviceComponentHosts.get(serviceName).get(componentName).containsKey( |
| hostname)) { |
| throw new AmbariException("Duplicate entry for ServiceComponentHost" |
| + ", serviceName=" + serviceName + ", serviceComponentName" |
| + componentName + ", hostname= " + hostname); |
| } |
| |
| if (!serviceComponentHostsByHost.containsKey(hostname)) { |
| serviceComponentHostsByHost.put(hostname, |
| new CopyOnWriteArrayList<ServiceComponentHost>()); |
| } |
| |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Adding a new ServiceComponentHost, clusterName={}, clusterId={}, serviceName={}, serviceComponentName{}, hostname= {}", |
| getClusterName(), getClusterId(), serviceName, componentName, hostname); |
| } |
| |
| serviceComponentHosts.get(serviceName).get(componentName).put(hostname, |
| svcCompHost); |
| |
| serviceComponentHostsByHost.get(hostname).add(svcCompHost); |
| } |
| |
| @Override |
| public void removeServiceComponentHost(ServiceComponentHost svcCompHost) |
| throws AmbariException { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug( |
| "Trying to remove component {} of service {} on {} from the cache", |
| svcCompHost.getServiceComponentName(), svcCompHost.getServiceName(), |
| svcCompHost.getHostName()); |
| } |
| |
| final String hostname = svcCompHost.getHostName(); |
| final String serviceName = svcCompHost.getServiceName(); |
| final String componentName = svcCompHost.getServiceComponentName(); |
| Set<Cluster> cs = clusters.getClustersForHost(hostname); |
| |
| boolean clusterFound = false; |
| Iterator<Cluster> iter = cs.iterator(); |
| while (iter.hasNext()) { |
| Cluster c = iter.next(); |
| if (c.getClusterId() == getClusterId()) { |
| clusterFound = true; |
| break; |
| } |
| } |
| |
| if (!clusterFound) { |
| throw new AmbariException("Host does not belong this cluster" |
| + ", hostname=" + hostname + ", clusterName=" + getClusterName() |
| + ", clusterId=" + getClusterId()); |
| } |
| |
| if (!serviceComponentHosts.containsKey(serviceName) |
| || !serviceComponentHosts.get(serviceName).containsKey(componentName) |
| || !serviceComponentHosts.get(serviceName).get(componentName).containsKey( |
| hostname)) { |
| throw new AmbariException("Invalid entry for ServiceComponentHost" |
| + ", serviceName=" + serviceName + ", serviceComponentName" |
| + componentName + ", hostname= " + hostname); |
| } |
| |
| if (!serviceComponentHostsByHost.containsKey(hostname)) { |
| throw new AmbariException("Invalid host entry for ServiceComponentHost" |
| + ", serviceName=" + serviceName + ", serviceComponentName" |
| + componentName + ", hostname= " + hostname); |
| } |
| |
| ServiceComponentHost schToRemove = null; |
| for (ServiceComponentHost sch : serviceComponentHostsByHost.get(hostname)) { |
| if (sch.getServiceName().equals(serviceName) |
| && sch.getServiceComponentName().equals(componentName) |
| && sch.getHostName().equals(hostname)) { |
| schToRemove = sch; |
| break; |
| } |
| } |
| |
| if (schToRemove == null) { |
| LOG.warn("Unavailable in per host cache. ServiceComponentHost" |
| + ", serviceName=" + serviceName |
| + ", serviceComponentName" + componentName |
| + ", hostname= " + hostname); |
| } |
| |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Removing a ServiceComponentHost, clusterName={}, clusterId={}, serviceName={}, serviceComponentName{}, hostname= {}", |
| getClusterName(), getClusterId(), serviceName, componentName, hostname); |
| } |
| |
| serviceComponentHosts.get(serviceName).get(componentName).remove(hostname); |
| if (schToRemove != null) { |
| serviceComponentHostsByHost.get(hostname).remove(schToRemove); |
| } |
| } |
| |
  /**
   * {@inheritDoc}
   */
  @Override
  public long getClusterId() {
    // Add cluster creates the managed entity before creating the Cluster
    // instance so id would not be null.
    return clusterId;
  }
| |
| @Override |
| public List<ServiceComponentHost> getServiceComponentHosts(String hostname) { |
| List<ServiceComponentHost> serviceComponentHosts = serviceComponentHostsByHost.get(hostname); |
| if (null != serviceComponentHosts) { |
| return new CopyOnWriteArrayList<>(serviceComponentHosts); |
| } |
| |
| return new ArrayList<>(); |
| } |
| |
| @Override |
| public Map<String, Set<String>> getServiceComponentHostMap(Set<String> hostNames, Set<String> serviceNames) { |
| Map<String, Set<String>> componentHostMap = new HashMap<>(); |
| |
| Collection<Host> hosts = getHosts(); |
| |
| if(hosts != null) { |
| for (Host host : hosts) { |
| String hostname = host.getHostName(); |
| |
| // If this host is not filtered out, continue processing |
| if ((hostNames == null) || hostNames.contains(hostname)) { |
| List<ServiceComponentHost> serviceComponentHosts = getServiceComponentHosts(hostname); |
| |
| if (serviceComponentHosts != null) { |
| for (ServiceComponentHost sch : serviceComponentHosts) { |
| // If the service for this ServiceComponentHost is not filtered out, continue processing |
| if ((serviceNames == null) || serviceNames.contains(sch.getServiceName())) { |
| String component = sch.getServiceComponentName(); |
| Set<String> componentHosts = componentHostMap.get(component); |
| |
| if (componentHosts == null) { |
| componentHosts = new HashSet<>(); |
| componentHostMap.put(component, componentHosts); |
| } |
| |
| componentHosts.add(hostname); |
| } |
| } |
| } |
| } |
| } |
| } |
| |
| return componentHostMap; |
| } |
| |
| @Override |
| public List<ServiceComponentHost> getServiceComponentHosts(String serviceName, String componentName) { |
| ArrayList<ServiceComponentHost> foundItems = new ArrayList<>(); |
| |
| ConcurrentMap<String, ConcurrentMap<String, ServiceComponentHost>> foundByService = serviceComponentHosts.get( |
| serviceName); |
| if (foundByService != null) { |
| if (componentName == null) { |
| for (Map<String, ServiceComponentHost> foundByComponent : foundByService.values()) { |
| foundItems.addAll(foundByComponent.values()); |
| } |
| } else if (foundByService.containsKey(componentName)) { |
| foundItems.addAll(foundByService.get(componentName).values()); |
| } |
| } |
| |
| return foundItems; |
| } |
| |
| @Override |
| public void addService(Service service) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Adding a new Service, clusterName={}, clusterId={}, serviceName={}", getClusterName(), getClusterId(), service.getName()); |
| } |
| services.put(service.getName(), service); |
| } |
| |
| /** |
| * {@inheritDoc} |
| */ |
| @Override |
| public Service addService(String serviceName, RepositoryVersionEntity repositoryVersion) throws AmbariException { |
| if (services.containsKey(serviceName)) { |
| String message = MessageFormat.format("The {0} service already exists in {1}", serviceName, |
| getClusterName()); |
| |
| throw new AmbariException(message); |
| } |
| |
| @Experimental(feature = ExperimentalFeature.PATCH_UPGRADES) |
| Service service = serviceFactory.createNew(this, serviceName, repositoryVersion); |
| addService(service); |
| |
| return service; |
| } |
| |
| @Override |
| public Service getService(String serviceName) throws AmbariException { |
| Service service = services.get(serviceName); |
| if (null == service) { |
| throw new ServiceNotFoundException(getClusterName(), serviceName); |
| } |
| |
| return service; |
| } |
| |
| @Override |
| public Map<String, Service> getServices() { |
| return new HashMap<>(services); |
| } |
| |
| @Override |
| public Service getServiceByComponentName(String componentName) throws AmbariException { |
| for (Service service : services.values()) { |
| for (ServiceComponent component : service.getServiceComponents().values()) { |
| if (component.getName().equals(componentName)) { |
| return service; |
| } |
| } |
| } |
| |
| throw new ServiceNotFoundException(getClusterName(), "component: " + componentName); |
| } |
| |
| |
@Override
public StackId getDesiredStackVersion() {
  // Simple accessor for the cached desired stack; the cache is updated by
  // setDesiredStackVersion() under the cluster write lock.
  return desiredStackVersion;
}
| |
@Override
public void setDesiredStackVersion(StackId stackId) throws AmbariException {
  // Updates the cached desired stack AND the persisted ClusterEntity, then
  // reloads the service config-type mapping for the new stack. The write lock
  // keeps the cache, the merge, and the reload atomic with respect to readers.
  clusterGlobalLock.writeLock().lock();
  try {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Changing DesiredStackVersion of Cluster, clusterName={}, clusterId={}, currentDesiredStackVersion={}, newDesiredStackVersion={}",
        getClusterName(), getClusterId(), desiredStackVersion, stackId);
    }

    // cache first, then persist the matching StackEntity on the cluster row
    desiredStackVersion = stackId;
    StackEntity stackEntity = stackDAO.find(stackId.getStackName(), stackId.getStackVersion());

    ClusterEntity clusterEntity = getClusterEntity();

    clusterEntity.setDesiredStack(stackEntity);
    clusterEntity = clusterDAO.merge(clusterEntity);

    // config types are stack-specific, so they must be re-read after the change
    loadServiceConfigTypes();
  } finally {
    clusterGlobalLock.writeLock().unlock();
  }
}
| |
| @Override |
| public StackId getCurrentStackVersion() { |
| ClusterEntity clusterEntity = getClusterEntity(); |
| |
| ClusterStateEntity clusterStateEntity = clusterEntity.getClusterStateEntity(); |
| if (clusterStateEntity != null) { |
| StackEntity currentStackEntity = clusterStateEntity.getCurrentStack(); |
| return new StackId(currentStackEntity); |
| } |
| |
| return null; |
| } |
| |
| @Override |
| public State getProvisioningState() { |
| State provisioningState = null; |
| ClusterEntity clusterEntity = getClusterEntity(); |
| provisioningState = clusterEntity.getProvisioningState(); |
| |
| if (null == provisioningState) { |
| provisioningState = State.INIT; |
| } |
| |
| return provisioningState; |
| } |
| |
| @Override |
| public void setProvisioningState(State provisioningState) { |
| ClusterEntity clusterEntity = getClusterEntity(); |
| clusterEntity.setProvisioningState(provisioningState); |
| clusterEntity = clusterDAO.merge(clusterEntity); |
| } |
| |
| @Override |
| public SecurityType getSecurityType() { |
| SecurityType securityType = null; |
| ClusterEntity clusterEntity = getClusterEntity(); |
| securityType = clusterEntity.getSecurityType(); |
| |
| if (null == securityType) { |
| securityType = SecurityType.NONE; |
| } |
| |
| return securityType; |
| } |
| |
| @Override |
| public void setSecurityType(SecurityType securityType) { |
| ClusterEntity clusterEntity = getClusterEntity(); |
| clusterEntity.setSecurityType(securityType); |
| clusterEntity = clusterDAO.merge(clusterEntity); |
| } |
| |
| /** |
| * {@inheritDoc} |
| */ |
| @Override |
| @Transactional |
| public List<Host> transitionHostsToInstalling(RepositoryVersionEntity repoVersionEntity, |
| VersionDefinitionXml versionDefinitionXml, boolean forceInstalled) throws AmbariException { |
| |
| |
| // the hosts to return so that INSTALL commands can be generated for them |
| final List<Host> hostsRequiringInstallation; |
| |
| clusterGlobalLock.writeLock().lock(); |
| try { |
| |
| // get this once for easy lookup later |
| Map<String, Host> hosts = clusters.getHostsForCluster(getClusterName()); |
| hostsRequiringInstallation = new ArrayList<>(hosts.size()); |
| |
| // for every host, either create or update the host version to the right |
| // state - starting with STATE |
| Collection<HostEntity> hostEntities = getClusterEntity().getHostEntities(); |
| |
| for (HostEntity hostEntity : hostEntities) { |
| |
| // start with INSTALLING |
| RepositoryVersionState state = RepositoryVersionState.INSTALLING; |
| if (forceInstalled) { |
| state = RepositoryVersionState.INSTALLED; |
| } |
| |
| // is this host version not required b/c of versionable components |
| Host host = hosts.get(hostEntity.getHostName()); |
| if (!host.hasComponentsAdvertisingVersions(desiredStackVersion)) { |
| state = RepositoryVersionState.NOT_REQUIRED; |
| } |
| |
| // if the repository is still required, check against the repo type |
| if (state != RepositoryVersionState.NOT_REQUIRED) { |
| if (repoVersionEntity.getType() != RepositoryType.STANDARD) { |
| // does the host gets a different repo state based on VDF and repo |
| // type |
| boolean hostRequiresRepository = false; |
| ClusterVersionSummary clusterSummary = versionDefinitionXml.getClusterSummary(this); |
| Set<String> servicesInUpgrade = clusterSummary.getAvailableServiceNames(); |
| |
| List<ServiceComponentHost> schs = getServiceComponentHosts(hostEntity.getHostName()); |
| for (ServiceComponentHost serviceComponentHost : schs) { |
| String serviceName = serviceComponentHost.getServiceName(); |
| if (servicesInUpgrade.contains(serviceName)) { |
| hostRequiresRepository = true; |
| break; |
| } |
| } |
| |
| // if not required, then move onto the next host |
| if (!hostRequiresRepository) { |
| state = RepositoryVersionState.NOT_REQUIRED; |
| } |
| } |
| } |
| |
| // last check if it's still required - check for MM |
| if (state != RepositoryVersionState.NOT_REQUIRED) { |
| if (host.getMaintenanceState(clusterId) != MaintenanceState.OFF) { |
| state = RepositoryVersionState.OUT_OF_SYNC; |
| } |
| } |
| |
| // now that the correct state is determdined for the host version, |
| // either update or create it |
| HostVersionEntity hostVersionEntity = null; |
| Collection<HostVersionEntity> hostVersions = hostEntity.getHostVersionEntities(); |
| for (HostVersionEntity existingHostVersion : hostVersions) { |
| if (existingHostVersion.getRepositoryVersion().getId() == repoVersionEntity.getId()) { |
| hostVersionEntity = existingHostVersion; |
| break; |
| } |
| } |
| |
| if (null == hostVersionEntity) { |
| hostVersionEntity = new HostVersionEntity(hostEntity, repoVersionEntity, state); |
| hostVersionDAO.create(hostVersionEntity); |
| |
| // bi-directional association update |
| hostVersions.add(hostVersionEntity); |
| hostDAO.merge(hostEntity); |
| } else { |
| hostVersionEntity.setState(state); |
| hostVersionEntity = hostVersionDAO.merge(hostVersionEntity); |
| } |
| |
| LOG.info("Created host version for {}, state={}, repository version={} (repo_id={})", |
| hostVersionEntity.getHostName(), hostVersionEntity.getState(), |
| repoVersionEntity.getVersion(), repoVersionEntity.getId()); |
| |
| if (state == RepositoryVersionState.INSTALLING) { |
| hostsRequiringInstallation.add(host); |
| } |
| } |
| } finally { |
| clusterGlobalLock.writeLock().unlock(); |
| } |
| |
| return hostsRequiringInstallation; |
| } |
| |
@Override
@Transactional
public void setCurrentStackVersion(StackId stackId) throws AmbariException {
  // Persists the cluster's "current" stack on its ClusterStateEntity, creating
  // the state row on first use. A JPA RollbackException is translated into an
  // AmbariException with the original as cause.
  clusterGlobalLock.writeLock().lock();
  try {
    StackEntity stackEntity = stackDAO.find(stackId.getStackName(),
      stackId.getStackVersion());

    ClusterEntity clusterEntity = getClusterEntity();
    ClusterStateEntity clusterStateEntity = clusterStateDAO.findByPK(
      clusterEntity.getClusterId());
    if (clusterStateEntity == null) {
      // first transition: create the state row and wire up both sides of the
      // cluster <-> state association before merging the cluster
      clusterStateEntity = new ClusterStateEntity();
      clusterStateEntity.setClusterId(clusterEntity.getClusterId());
      clusterStateEntity.setCurrentStack(stackEntity);
      clusterStateEntity.setClusterEntity(clusterEntity);
      clusterStateDAO.create(clusterStateEntity);
      clusterStateEntity = clusterStateDAO.merge(clusterStateEntity);
      clusterEntity.setClusterStateEntity(clusterStateEntity);
      clusterEntity = clusterDAO.merge(clusterEntity);
    } else {
      // subsequent transitions: only the stack reference changes
      clusterStateEntity.setCurrentStack(stackEntity);
      clusterStateEntity = clusterStateDAO.merge(clusterStateEntity);
      clusterEntity = clusterDAO.merge(clusterEntity);
    }
  } catch (RollbackException e) {
    LOG.warn("Unable to set version " + stackId + " for cluster "
        + getClusterName());
    throw new AmbariException("Unable to set" + " version=" + stackId
        + " for cluster " + getClusterName(), e);
  } finally {
    clusterGlobalLock.writeLock().unlock();
  }
}
| |
| @Override |
| public Map<String, Config> getConfigsByType(String configType) { |
| clusterGlobalLock.readLock().lock(); |
| try { |
| if (!allConfigs.containsKey(configType)) { |
| return null; |
| } |
| |
| return Collections.unmodifiableMap(allConfigs.get(configType)); |
| } finally { |
| clusterGlobalLock.readLock().unlock(); |
| } |
| } |
| |
| @Override |
| public Config getConfig(String configType, String versionTag) { |
| clusterGlobalLock.readLock().lock(); |
| try { |
| if (!allConfigs.containsKey(configType) |
| || !allConfigs.get(configType).containsKey(versionTag)) { |
| return null; |
| } |
| return allConfigs.get(configType).get(versionTag); |
| } finally { |
| clusterGlobalLock.readLock().unlock(); |
| } |
| } |
| |
| @Override |
| public Config getConfigByVersion(String configType, Long configVersion) { |
| clusterGlobalLock.readLock().lock(); |
| try { |
| if (!allConfigs.containsKey(configType)) { |
| return null; |
| } |
| |
| for (Map.Entry<String, Config> entry : allConfigs.get(configType).entrySet()) { |
| if (entry.getValue().getVersion().equals(configVersion)) { |
| return entry.getValue(); |
| } |
| } |
| |
| return null; |
| } finally { |
| clusterGlobalLock.readLock().unlock(); |
| } |
| } |
| |
| @Override |
| public void addConfig(Config config) { |
| if (config.getType() == null || config.getType().isEmpty()) { |
| throw new IllegalArgumentException("Config type cannot be empty"); |
| } |
| |
| clusterGlobalLock.writeLock().lock(); |
| try { |
| if (!allConfigs.containsKey(config.getType())) { |
| allConfigs.put(config.getType(), new ConcurrentHashMap<String, Config>()); |
| } |
| |
| allConfigs.get(config.getType()).put(config.getTag(), config); |
| } finally { |
| clusterGlobalLock.writeLock().unlock(); |
| } |
| } |
| |
| @Override |
| public Collection<Config> getAllConfigs() { |
| clusterGlobalLock.readLock().lock(); |
| try { |
| List<Config> list = new ArrayList<>(); |
| for (Entry<String, ConcurrentMap<String, Config>> entry : allConfigs.entrySet()) { |
| for (Config config : entry.getValue().values()) { |
| list.add(config); |
| } |
| } |
| return Collections.unmodifiableList(list); |
| } finally { |
| clusterGlobalLock.readLock().unlock(); |
| } |
| } |
| |
| @Override |
| public ClusterResponse convertToResponse() |
| throws AmbariException { |
| String clusterName = getClusterName(); |
| Map<String, Host> hosts = clusters.getHostsForCluster(clusterName); |
| |
| return new ClusterResponse(getClusterId(), clusterName, |
| getProvisioningState(), getSecurityType(), hosts.keySet(), |
| hosts.size(), getDesiredStackVersion().getStackId(), |
| getClusterHealthReport(hosts)); |
| } |
| |
@Override
public void debugDump(StringBuilder sb) {
  // Appends a human-readable dump of the cluster (name, id, desired stack and
  // every service) to the supplied builder, then lets the lock factory append
  // its own diagnostics. Output format is free-form and for debugging only.
  sb.append("Cluster={ clusterName=").append(getClusterName()).append(", clusterId=").append(
      getClusterId()).append(", desiredStackVersion=").append(
      desiredStackVersion.getStackId()).append(", services=[ ");
  // "first" suppresses the separator before the initial service
  boolean first = true;
  for (Service s : services.values()) {
    if (!first) {
      sb.append(" , ");
    }
    first = false;
    sb.append("\n    ");
    s.debugDump(sb);
    sb.append(' ');
  }
  sb.append(" ] }");
  lockFactory.debugDump(sb);
}
| |
| @Override |
| @Transactional |
| public void refresh() { |
| clusterGlobalLock.writeLock().lock(); |
| try { |
| ClusterEntity clusterEntity = getClusterEntity(); |
| clusterDAO.refresh(clusterEntity); |
| } finally { |
| clusterGlobalLock.writeLock().unlock(); |
| } |
| } |
| |
| @Override |
| @Transactional |
| public void deleteAllServices() throws AmbariException { |
| clusterGlobalLock.writeLock().lock(); |
| try { |
| LOG.info("Deleting all services for cluster" + ", clusterName=" |
| + getClusterName()); |
| for (Service service : services.values()) { |
| if (!service.canBeRemoved()) { |
| throw new AmbariException( |
| "Found non removable service when trying to" |
| + " all services from cluster" + ", clusterName=" |
| + getClusterName() + ", serviceName=" + service.getName()); |
| } |
| } |
| |
| for (Service service : services.values()) { |
| deleteService(service); |
| } |
| services.clear(); |
| } finally { |
| clusterGlobalLock.writeLock().unlock(); |
| } |
| } |
| |
| @Override |
| public void deleteService(String serviceName) |
| throws AmbariException { |
| clusterGlobalLock.writeLock().lock(); |
| try { |
| Service service = getService(serviceName); |
| LOG.info("Deleting service for cluster" + ", clusterName=" |
| + getClusterName() + ", serviceName=" + service.getName()); |
| // FIXME check dependencies from meta layer |
| if (!service.canBeRemoved()) { |
| throw new AmbariException("Could not delete service from cluster" |
| + ", clusterName=" + getClusterName() |
| + ", serviceName=" + service.getName()); |
| } |
| deleteService(service); |
| services.remove(serviceName); |
| |
| } finally { |
| clusterGlobalLock.writeLock().unlock(); |
| } |
| } |
| |
| /** |
| * Deletes the specified service also removes references to it from {@link this.serviceComponentHosts} |
| * and references to ServiceComponentHost objects that belong to the service from {@link this.serviceComponentHostsByHost} |
| * <p> |
| * Note: This method must be called only with write lock acquired. |
| * </p> |
| * @param service the service to be deleted |
| * @throws AmbariException |
| * @see ServiceComponentHost |
| */ |
| void deleteService(Service service) throws AmbariException { |
| final String serviceName = service.getName(); |
| |
| service.delete(); |
| |
| serviceComponentHosts.remove(serviceName); |
| |
| for (List<ServiceComponentHost> serviceComponents: serviceComponentHostsByHost.values()){ |
| Iterables.removeIf(serviceComponents, new Predicate<ServiceComponentHost>() { |
| @Override |
| public boolean apply(ServiceComponentHost serviceComponentHost) { |
| return serviceComponentHost.getServiceName().equals(serviceName); |
| } |
| }); |
| } |
| |
| // Delete config groups that belong to the service |
| Map<Long, ConfigGroup> configGroups = service.getCluster().getConfigGroups(); |
| if (!MapUtils.isEmpty(configGroups)) { |
| for (ConfigGroup configGroup : configGroups.values()) { |
| if (configGroup.getServiceName().equalsIgnoreCase(serviceName)) { |
| LOG.info("Deleting ConfigGroup {} for service {}", configGroup.getName(), serviceName); |
| configGroup.delete(); |
| clusterConfigGroups.remove(configGroup.getId()); |
| } |
| } |
| } |
| } |
| |
| @Override |
| public boolean canBeRemoved() { |
| clusterGlobalLock.readLock().lock(); |
| try { |
| boolean safeToRemove = true; |
| for (Service service : services.values()) { |
| if (!service.canBeRemoved()) { |
| safeToRemove = false; |
| LOG.warn("Found non removable service" + ", clusterName=" |
| + getClusterName() + ", serviceName=" + service.getName()); |
| } |
| } |
| return safeToRemove; |
| } finally { |
| clusterGlobalLock.readLock().unlock(); |
| } |
| } |
| |
@Override
@Transactional
public void delete() throws AmbariException {
  // Deletes the whole cluster: services first, then the cluster-scoped
  // entities, finally the in-memory config cache. The refresh() calls keep the
  // JPA view of the cluster (and its one-to-many service collection) in sync
  // with the deletions performed in between.
  clusterGlobalLock.writeLock().lock();
  try {
    refresh();
    deleteAllServices();

    refresh(); // update one-to-many clusterServiceEntities
    removeEntities();
    allConfigs.clear();
  } finally {
    clusterGlobalLock.writeLock().unlock();
  }
}
| |
@Transactional
protected void removeEntities() throws AmbariException {
  // Removes the cluster row and its cluster-scoped children. Child rows
  // (alert definitions/groups, upgrades, topology requests) are deleted before
  // the cluster row itself — presumably to satisfy foreign-key constraints;
  // keep this ordering.
  long clusterId = getClusterId();
  alertDefinitionDAO.removeAll(clusterId);
  alertDispatchDAO.removeAllGroups(clusterId);
  upgradeDAO.removeAll(clusterId);
  topologyRequestDAO.removeAll(clusterId);
  clusterDAO.removeByPK(clusterId);
}
| |
@Override
public ServiceConfigVersionResponse addDesiredConfig(String user, Set<Config> configs) {
  // Convenience overload: no service config version note.
  return addDesiredConfig(user, configs, null);
}
| |
| @Override |
| public ServiceConfigVersionResponse addDesiredConfig(String user, Set<Config> configs, String serviceConfigVersionNote) { |
| if (null == user) { |
| throw new NullPointerException("User must be specified."); |
| } |
| |
| clusterGlobalLock.writeLock().lock(); |
| try { |
| if (configs == null) { |
| return null; |
| } |
| |
| Iterator<Config> configIterator = configs.iterator(); |
| |
| while (configIterator.hasNext()) { |
| Config config = configIterator.next(); |
| if (config == null) { |
| configIterator.remove(); |
| continue; |
| } |
| Config currentDesired = getDesiredConfigByType(config.getType()); |
| |
| // do not set if it is already the current |
| if (null != currentDesired |
| && currentDesired.getTag().equals(config.getTag())) { |
| configIterator.remove(); |
| } |
| } |
| |
| ServiceConfigVersionResponse serviceConfigVersionResponse = applyConfigs( |
| configs, user, serviceConfigVersionNote); |
| |
| return serviceConfigVersionResponse; |
| } finally { |
| clusterGlobalLock.writeLock().unlock(); |
| } |
| } |
| |
| /** |
| * Gets all versions of the desired configurations for the cluster. |
| * @return a map of type-to-configuration information. |
| */ |
| @Override |
| public Map<String, Set<DesiredConfig>> getAllDesiredConfigVersions() { |
| return getDesiredConfigs(true); |
| } |
| |
| |
@Override
public Map<String, DesiredConfig> getDesiredConfigs() {
  // Returns a LAZY Guava view over the active configs: each type is expected
  // to have exactly one active DesiredConfig, so the set's first element is
  // taken. NOTE(review): a type mapped to an empty set would NPE inside the
  // transformer — assumed not to occur; confirm against getDesiredConfigs(false).
  Map<String, Set<DesiredConfig>> activeConfigsByType = getDesiredConfigs(false);
  return Maps.transformEntries(
      activeConfigsByType,
      new Maps.EntryTransformer<String, Set<DesiredConfig>, DesiredConfig>() {
        @Override
        public DesiredConfig transformEntry(@Nullable String key, @Nullable Set<DesiredConfig> value) {
          return value.iterator().next();
        }
      });
}
| |
| /** |
| * Gets desired configurations for the cluster. |
| * @param allVersions specifies if all versions of the desired configurations to be returned |
| * or only the active ones. It is expected that there is one and only one active |
| * desired configuration per config type. |
| * @return a map of type-to-configuration information. |
| */ |
| private Map<String, Set<DesiredConfig>> getDesiredConfigs(boolean allVersions) { |
| clusterGlobalLock.readLock().lock(); |
| try { |
| Map<String, Set<DesiredConfig>> map = new HashMap<>(); |
| Collection<String> types = new HashSet<>(); |
| Collection<ClusterConfigEntity> entities = getClusterEntity().getClusterConfigEntities(); |
| |
| for (ClusterConfigEntity configEntity : entities) { |
| if (allVersions || configEntity.isSelected()) { |
| DesiredConfig desiredConfig = new DesiredConfig(); |
| desiredConfig.setServiceName(null); |
| desiredConfig.setTag(configEntity.getTag()); |
| |
| if (!allConfigs.containsKey(configEntity.getType())) { |
| continue; |
| } |
| |
| Map<String, Config> configMap = allConfigs.get(configEntity.getType()); |
| if (!configMap.containsKey(configEntity.getTag())) { |
| LOG.warn("An inconsistency exists for the configuration {} with " + |
| "tag {}", configEntity.getType(), configEntity.getTag()); |
| continue; |
| } |
| |
| Config config = configMap.get(configEntity.getTag()); |
| desiredConfig.setVersion(config.getVersion()); |
| |
| Set<DesiredConfig> configs = map.get(configEntity.getType()); |
| if (configs == null) { |
| configs = new HashSet<>(); |
| } |
| |
| configs.add(desiredConfig); |
| |
| map.put(configEntity.getType(), configs); |
| types.add(configEntity.getType()); |
| } |
| } |
| |
| // TODO AMBARI-10679, need efficient caching from hostId to hostName... |
| Map<Long, String> hostIdToName = new HashMap<>(); |
| |
| if (!map.isEmpty()) { |
| Map<String, List<HostConfigMapping>> hostMappingsByType = |
| hostConfigMappingDAO.findSelectedHostsByTypes(clusterId, types); |
| |
| for (Entry<String, Set<DesiredConfig>> entry : map.entrySet()) { |
| List<DesiredConfig.HostOverride> hostOverrides = new ArrayList<>(); |
| for (HostConfigMapping mappingEntity : hostMappingsByType.get(entry.getKey())) { |
| |
| if (!hostIdToName.containsKey(mappingEntity.getHostId())) { |
| HostEntity hostEntity = hostDAO.findById(mappingEntity.getHostId()); |
| hostIdToName.put(mappingEntity.getHostId(), hostEntity.getHostName()); |
| } |
| |
| hostOverrides.add(new DesiredConfig.HostOverride( |
| hostIdToName.get(mappingEntity.getHostId()), mappingEntity.getVersion())); |
| } |
| |
| for (DesiredConfig c: entry.getValue()) { |
| c.setHostOverrides(hostOverrides); |
| } |
| } |
| } |
| |
| return map; |
| } finally { |
| clusterGlobalLock.readLock().unlock(); |
| } |
| } |
| |
| |
| |
@Override
public ServiceConfigVersionResponse createServiceConfigVersion(
    String serviceName, String user, String note, ConfigGroup configGroup) {
  // Creates and persists the next service config version for the service,
  // either scoped to a config group (its configs and hosts) or to the default
  // group (all cluster configs of the service's types).

  // Create next service config version
  ServiceConfigEntity serviceConfigEntity = new ServiceConfigEntity();

  clusterGlobalLock.writeLock().lock();
  try {
    ClusterEntity clusterEntity = getClusterEntity();
    // set config group
    if (configGroup != null) {
      // group-scoped version: record the group id and its config entities
      serviceConfigEntity.setGroupId(configGroup.getId());
      Collection<Config> configs = configGroup.getConfigurations().values();
      List<ClusterConfigEntity> configEntities = new ArrayList<>(
          configs.size());
      for (Config config : configs) {
        configEntities.add(
            clusterDAO.findConfig(getClusterId(), config.getType(), config.getTag()));
      }

      serviceConfigEntity.setClusterConfigEntities(configEntities);
    } else {
      // default-group version: snapshot all configs belonging to the service
      List<ClusterConfigEntity> configEntities = getClusterConfigEntitiesByService(serviceName);
      serviceConfigEntity.setClusterConfigEntities(configEntities);
    }


    long nextServiceConfigVersion = serviceConfigDAO.findNextServiceConfigVersion(clusterId,
        serviceName);

    // get the correct stack ID to use when creating the service config
    StackEntity stackEntity = clusterEntity.getDesiredStack();
    Service service = services.get(serviceName);
    if (null != service) {
      StackId serviceStackId = service.getDesiredStackId();
      stackEntity = stackDAO.find(serviceStackId);
    }

    serviceConfigEntity.setServiceName(serviceName);
    serviceConfigEntity.setClusterEntity(clusterEntity);
    serviceConfigEntity.setVersion(nextServiceConfigVersion);
    serviceConfigEntity.setUser(user);
    serviceConfigEntity.setNote(note);
    serviceConfigEntity.setStack(stackEntity);

    serviceConfigDAO.create(serviceConfigEntity);
    if (configGroup != null) {
      // host ids can only be attached after the entity has an id, hence the merge
      serviceConfigEntity.setHostIds(new ArrayList<>(configGroup.getHosts().keySet()));
      serviceConfigEntity = serviceConfigDAO.merge(serviceConfigEntity);
    }
  } finally {
    clusterGlobalLock.writeLock().unlock();
  }

  // audit log of the config change, outside the lock
  String configGroupName = configGroup == null ? ServiceConfigVersionResponse.DEFAULT_CONFIG_GROUP_NAME : configGroup.getName();
  configChangeLog.info("(configchange) Creating config version. cluster: '{}', changed by: '{}', " +
          "service_name: '{}', config_group: '{}', config_group_id: '{}', version: '{}', create_timestamp: '{}', note: '{}'",
      getClusterName(), user, serviceName, configGroupName,
      configGroup == null ? "null" : configGroup.getId(), serviceConfigEntity.getVersion(), serviceConfigEntity.getCreateTimestamp(),
      serviceConfigEntity.getNote());

  ServiceConfigVersionResponse response = new ServiceConfigVersionResponse(
      serviceConfigEntity, configGroupName);

  return response;
}
| |
| @Override |
| public String getServiceForConfigTypes(Collection<String> configTypes) { |
| //debug |
| LOG.info("Looking for service for config types {}", configTypes); |
| String serviceName = null; |
| for (String configType : configTypes) { |
| for (Entry<String, String> entry : serviceConfigTypes.entries()) { |
| if (StringUtils.equals(entry.getValue(), configType)) { |
| if (serviceName != null) { |
| if (entry.getKey()!=null && !StringUtils.equals(serviceName, entry.getKey())) { |
| throw new IllegalArgumentException(String.format("Config type %s belongs to %s service, " + |
| "but also qualified for %s", configType, serviceName, entry.getKey())); |
| } |
| } else { |
| serviceName = entry.getKey(); |
| } |
| } |
| } |
| } |
| LOG.info("Service {} returning", serviceName); |
| return serviceName; |
| } |
| |
| @Override |
| public String getServiceByConfigType(String configType) { |
| for (Entry<String, String> entry : serviceConfigTypes.entries()) { |
| String serviceName = entry.getKey(); |
| String type = entry.getValue(); |
| if (StringUtils.equals(type, configType)) { |
| return serviceName; |
| } |
| } |
| return null; |
| } |
| |
| @Override |
| public ServiceConfigVersionResponse setServiceConfigVersion(String serviceName, Long version, String user, String note) throws AmbariException { |
| if (null == user) { |
| throw new NullPointerException("User must be specified."); |
| } |
| |
| clusterGlobalLock.writeLock().lock(); |
| try { |
| ServiceConfigVersionResponse serviceConfigVersionResponse = applyServiceConfigVersion( |
| serviceName, version, user, note); |
| return serviceConfigVersionResponse; |
| } finally { |
| clusterGlobalLock.writeLock().unlock(); |
| } |
| } |
| |
| @Override |
| public Map<String, Collection<ServiceConfigVersionResponse>> getActiveServiceConfigVersions() { |
| clusterGlobalLock.readLock().lock(); |
| try { |
| Map<String, Collection<ServiceConfigVersionResponse>> map = new HashMap<>(); |
| |
| Set<ServiceConfigVersionResponse> responses = getActiveServiceConfigVersionSet(); |
| for (ServiceConfigVersionResponse response : responses) { |
| if (map.get(response.getServiceName()) == null) { |
| map.put(response.getServiceName(), |
| new ArrayList<ServiceConfigVersionResponse>()); |
| } |
| map.get(response.getServiceName()).add(response); |
| } |
| return map; |
| } finally { |
| clusterGlobalLock.readLock().unlock(); |
| } |
| } |
| |
@Override
public List<ServiceConfigVersionResponse> getServiceConfigVersions() {
  // Returns ALL service config versions, with exactly the latest version per
  // (service, config group) flagged as current. Default-group versions compete
  // by create time; group versions by version number; versions of deleted
  // groups are never current.
  clusterGlobalLock.readLock().lock();
  try {
    List<ServiceConfigVersionResponse> serviceConfigVersionResponses = new ArrayList<>();

    List<ServiceConfigEntity> serviceConfigs = serviceConfigDAO.getServiceConfigs(getClusterId());

    // Gather for each service in each config group the active service config response as we
    // iterate through all service config responses
    Map<String, Map<String, ServiceConfigVersionResponse>> activeServiceConfigResponses = new HashMap<>();

    for (ServiceConfigEntity serviceConfigEntity : serviceConfigs) {
      ServiceConfigVersionResponse serviceConfigVersionResponse = convertToServiceConfigVersionResponse(serviceConfigEntity);

      Map<String, ServiceConfigVersionResponse> activeServiceConfigResponseGroups = activeServiceConfigResponses.get(serviceConfigVersionResponse.getServiceName());

      if (activeServiceConfigResponseGroups == null) {
        // first version seen for this service: start its per-group tracker
        Map<String, ServiceConfigVersionResponse> serviceConfigGroups = new HashMap<>();
        activeServiceConfigResponses.put(serviceConfigVersionResponse.getServiceName(), serviceConfigGroups);

        activeServiceConfigResponseGroups = serviceConfigGroups;
      }

      // the active config within a group
      ServiceConfigVersionResponse activeServiceConfigResponse = activeServiceConfigResponseGroups.get(serviceConfigVersionResponse.getGroupName());

      if (activeServiceConfigResponse == null && !ServiceConfigVersionResponse.DELETED_CONFIG_GROUP_NAME.equals(serviceConfigVersionResponse.getGroupName())) {
        // service config version with deleted group should always be marked is not current
        activeServiceConfigResponseGroups.put(serviceConfigVersionResponse.getGroupName(), serviceConfigVersionResponse);
        activeServiceConfigResponse = serviceConfigVersionResponse;
      }
      // NOTE(review): if the group name is DELETED while groupId is null,
      // activeServiceConfigResponse stays null and the comparison below would
      // NPE — assumed that a null groupId always maps to the default group name.
      if (serviceConfigEntity.getGroupId() == null) {
        // default group: newest create time wins
        if (serviceConfigVersionResponse.getCreateTime() > activeServiceConfigResponse.getCreateTime()) {
          activeServiceConfigResponseGroups.put(serviceConfigVersionResponse.getGroupName(), serviceConfigVersionResponse);
        }
      }
      else if (clusterConfigGroups != null && clusterConfigGroups.containsKey(serviceConfigEntity.getGroupId())){
        // live config group: highest version number wins
        if (serviceConfigVersionResponse.getVersion() > activeServiceConfigResponse.getVersion()) {
          activeServiceConfigResponseGroups.put(serviceConfigVersionResponse.getGroupName(), serviceConfigVersionResponse);
        }
      }

      // mark everything not current for now; winners are flagged below
      serviceConfigVersionResponse.setIsCurrent(false);
      serviceConfigVersionResponses.add(getServiceConfigVersionResponseWithConfig(serviceConfigVersionResponse, serviceConfigEntity));
    }

    // flag the tracked winner of every (service, group) as current
    for (Map<String, ServiceConfigVersionResponse> serviceConfigVersionResponseGroup: activeServiceConfigResponses.values()) {
      for (ServiceConfigVersionResponse serviceConfigVersionResponse : serviceConfigVersionResponseGroup.values()) {
        serviceConfigVersionResponse.setIsCurrent(true);
      }
    }

    return serviceConfigVersionResponses;
  } finally {
    clusterGlobalLock.readLock().unlock();
  }
}
| |
| private Set<ServiceConfigVersionResponse> getActiveServiceConfigVersionSet() { |
| Set<ServiceConfigVersionResponse> responses = new HashSet<>(); |
| List<ServiceConfigEntity> activeServiceConfigVersions = getActiveServiceConfigVersionEntities(); |
| |
| for (ServiceConfigEntity lastServiceConfig : activeServiceConfigVersions) { |
| ServiceConfigVersionResponse response = convertToServiceConfigVersionResponse(lastServiceConfig); |
| response.setIsCurrent(true); //mark these as current, as they are |
| responses.add(response); |
| } |
| return responses; |
| } |
| |
| private List<ServiceConfigEntity> getActiveServiceConfigVersionEntities() { |
| |
| List<ServiceConfigEntity> activeServiceConfigVersions = new ArrayList<>(); |
| //for services |
| activeServiceConfigVersions.addAll(serviceConfigDAO.getLastServiceConfigs(getClusterId())); |
| //for config groups |
| if (clusterConfigGroups != null) { |
| activeServiceConfigVersions.addAll( |
| serviceConfigDAO.getLastServiceConfigVersionsForGroups(clusterConfigGroups.keySet())); |
| } |
| |
| return activeServiceConfigVersions; |
| } |
| |
| @Override |
| public List<ServiceConfigVersionResponse> getActiveServiceConfigVersionResponse(String serviceName) { |
| clusterGlobalLock.readLock().lock(); |
| try { |
| List<ServiceConfigEntity> activeServiceConfigVersionEntities = new ArrayList<>(); |
| List<ServiceConfigVersionResponse> activeServiceConfigVersionResponses = new ArrayList<>(); |
| activeServiceConfigVersionEntities.addAll(serviceConfigDAO.getLastServiceConfigsForService(getClusterId(), serviceName)); |
| for (ServiceConfigEntity serviceConfigEntity : activeServiceConfigVersionEntities) { |
| ServiceConfigVersionResponse serviceConfigVersionResponse = getServiceConfigVersionResponseWithConfig(convertToServiceConfigVersionResponse(serviceConfigEntity), serviceConfigEntity); |
| serviceConfigVersionResponse.setIsCurrent(true); |
| activeServiceConfigVersionResponses.add(serviceConfigVersionResponse); |
| } |
| return activeServiceConfigVersionResponses; |
| } finally { |
| clusterGlobalLock.readLock().unlock(); |
| } |
| } |
| |
| /** |
| * Adds Configuration data to the serviceConfigVersionResponse |
| * @param serviceConfigVersionResponse |
| * @param serviceConfigEntity |
| * @return serviceConfigVersionResponse |
| */ |
| private ServiceConfigVersionResponse getServiceConfigVersionResponseWithConfig(ServiceConfigVersionResponse serviceConfigVersionResponse, ServiceConfigEntity serviceConfigEntity) { |
| serviceConfigVersionResponse.setConfigurations(new ArrayList<ConfigurationResponse>()); |
| List<ClusterConfigEntity> clusterConfigEntities = serviceConfigEntity.getClusterConfigEntities(); |
| for (ClusterConfigEntity clusterConfigEntity : clusterConfigEntities) { |
| Config config = allConfigs.get(clusterConfigEntity.getType()).get( |
| clusterConfigEntity.getTag()); |
| |
| serviceConfigVersionResponse.getConfigurations().add( |
| new ConfigurationResponse(getClusterName(), config)); |
| } |
| return serviceConfigVersionResponse; |
| } |
| |
| |
| @RequiresSession |
| ServiceConfigVersionResponse getActiveServiceConfigVersion(String serviceName) { |
| ServiceConfigEntity lastServiceConfig = serviceConfigDAO.getLastServiceConfig(getClusterId(), serviceName); |
| if (lastServiceConfig == null) { |
| LOG.debug("No service config version found for service {}", serviceName); |
| return null; |
| } |
| return convertToServiceConfigVersionResponse(lastServiceConfig); |
| } |
| |
| @RequiresSession |
| ServiceConfigVersionResponse convertToServiceConfigVersionResponse(ServiceConfigEntity serviceConfigEntity) { |
| Long groupId = serviceConfigEntity.getGroupId(); |
| |
| String groupName; |
| if (groupId != null) { |
| ConfigGroup configGroup = null; |
| if (clusterConfigGroups != null) { |
| configGroup = clusterConfigGroups.get(groupId); |
| } |
| |
| if (configGroup != null) { |
| groupName = configGroup.getName(); |
| } else { |
| groupName = ServiceConfigVersionResponse.DELETED_CONFIG_GROUP_NAME; |
| } |
| } else { |
| groupName = ServiceConfigVersionResponse.DEFAULT_CONFIG_GROUP_NAME; |
| } |
| |
| ServiceConfigVersionResponse serviceConfigVersionResponse = new ServiceConfigVersionResponse( |
| serviceConfigEntity, groupName); |
| |
| return serviceConfigVersionResponse; |
| } |
| |
| @Transactional |
| ServiceConfigVersionResponse applyServiceConfigVersion(String serviceName, Long serviceConfigVersion, String user, |
| String serviceConfigVersionNote) throws AmbariException { |
| ServiceConfigEntity serviceConfigEntity = serviceConfigDAO.findByServiceAndVersion(serviceName, serviceConfigVersion); |
| if (serviceConfigEntity == null) { |
| throw new ObjectNotFoundException("Service config version with serviceName={} and version={} not found"); |
| } |
| |
| // disable all configs related to service |
| if (serviceConfigEntity.getGroupId() == null) { |
| // Here was fixed bug with entity changes revert. More you can find here AMBARI-21173. |
| // This issue reproduces only if you are changing same entity in first and second loop. |
| // In that case eclipselink will revert changes to cached, if entity has fluchGroup and it |
| // needs to be refreshed. Actually we don't need to change same antities in few steps, so i |
| // decided to filter out. duplicates and do not change them. It will be better for performance and bug will be fixed. |
| Collection<String> configTypes = serviceConfigTypes.get(serviceName); |
| List<ClusterConfigEntity> enabledConfigs = clusterDAO.getEnabledConfigsByTypes(clusterId, configTypes); |
| List<ClusterConfigEntity> serviceConfigEntities = serviceConfigEntity.getClusterConfigEntities(); |
| ArrayList<ClusterConfigEntity> duplicatevalues = new ArrayList<>(serviceConfigEntities); |
| duplicatevalues.retainAll(enabledConfigs); |
| |
| for (ClusterConfigEntity enabledConfig : enabledConfigs) { |
| if (!duplicatevalues.contains(enabledConfig)) { |
| enabledConfig.setSelected(false); |
| clusterDAO.merge(enabledConfig); |
| } |
| } |
| |
| for (ClusterConfigEntity configEntity : serviceConfigEntities) { |
| if (!duplicatevalues.contains(configEntity)) { |
| configEntity.setSelected(true); |
| clusterDAO.merge(configEntity); |
| } |
| } |
| } else { |
| Long configGroupId = serviceConfigEntity.getGroupId(); |
| ConfigGroup configGroup = clusterConfigGroups.get(configGroupId); |
| if (configGroup != null) { |
| Map<String, Config> groupDesiredConfigs = new HashMap<>(); |
| for (ClusterConfigEntity entity : serviceConfigEntity.getClusterConfigEntities()) { |
| Config config = allConfigs.get(entity.getType()).get(entity.getTag()); |
| groupDesiredConfigs.put(config.getType(), config); |
| } |
| configGroup.setConfigurations(groupDesiredConfigs); |
| |
| Map<Long, Host> groupDesiredHosts = new HashMap<>(); |
| if (serviceConfigEntity.getHostIds() != null) { |
| for (Long hostId : serviceConfigEntity.getHostIds()) { |
| Host host = clusters.getHostById(hostId); |
| if (host != null) { |
| groupDesiredHosts.put(hostId, host); |
| } else { |
| LOG.warn("Host with id {} doesn't exist anymore, skipping", hostId); |
| } |
| } |
| } |
| configGroup.setHosts(groupDesiredHosts); |
| } else { |
| throw new IllegalArgumentException("Config group {} doesn't exist"); |
| } |
| } |
| |
| ClusterEntity clusterEntity = getClusterEntity(); |
| long nextServiceConfigVersion = serviceConfigDAO.findNextServiceConfigVersion( |
| clusterEntity.getClusterId(), serviceName); |
| |
| ServiceConfigEntity serviceConfigEntityClone = new ServiceConfigEntity(); |
| serviceConfigEntityClone.setCreateTimestamp(System.currentTimeMillis()); |
| serviceConfigEntityClone.setUser(user); |
| serviceConfigEntityClone.setServiceName(serviceName); |
| serviceConfigEntityClone.setClusterEntity(clusterEntity); |
| serviceConfigEntityClone.setStack(serviceConfigEntity.getStack()); |
| serviceConfigEntityClone.setClusterConfigEntities(serviceConfigEntity.getClusterConfigEntities()); |
| serviceConfigEntityClone.setClusterId(serviceConfigEntity.getClusterId()); |
| serviceConfigEntityClone.setHostIds(serviceConfigEntity.getHostIds()); |
| serviceConfigEntityClone.setGroupId(serviceConfigEntity.getGroupId()); |
| serviceConfigEntityClone.setNote(serviceConfigVersionNote); |
| serviceConfigEntityClone.setVersion(nextServiceConfigVersion); |
| |
| serviceConfigDAO.create(serviceConfigEntityClone); |
| |
| return convertToServiceConfigVersionResponse(serviceConfigEntityClone); |
| } |
| |
| @Transactional |
| ServiceConfigVersionResponse applyConfigs(Set<Config> configs, String user, String serviceConfigVersionNote) { |
| |
| String serviceName = null; |
| for (Config config : configs) { |
| for (Entry<String, String> entry : serviceConfigTypes.entries()) { |
| if (StringUtils.equals(entry.getValue(), config.getType())) { |
| if (serviceName == null) { |
| serviceName = entry.getKey(); |
| break; |
| } else if (!serviceName.equals(entry.getKey())) { |
| String error = String.format("Updating configs for multiple services by a " + |
| "single API request isn't supported. Conflicting services %s and %s for %s", |
| serviceName, entry.getKey(), config.getType()); |
| IllegalArgumentException exception = new IllegalArgumentException(error); |
| LOG.error(error + ", config version not created for {}", serviceName); |
| throw exception; |
| } else { |
| break; |
| } |
| } |
| } |
| } |
| |
| // update the selected flag for every config type |
| ClusterEntity clusterEntity = getClusterEntity(); |
| Collection<ClusterConfigEntity> clusterConfigs = clusterEntity.getClusterConfigEntities(); |
| for (Config config: configs) { |
| for (ClusterConfigEntity clusterConfigEntity : clusterConfigs) { |
| // unset for this config type |
| if (StringUtils.equals(clusterConfigEntity.getType(), config.getType())) { |
| clusterConfigEntity.setSelected(false); |
| |
| // unless both the tag and type match, then enable it |
| if (StringUtils.equals(clusterConfigEntity.getTag(), config.getTag())) { |
| clusterConfigEntity.setSelected(true); |
| } |
| } |
| } |
| } |
| |
| clusterEntity = clusterDAO.merge(clusterEntity); |
| |
| if (serviceName == null) { |
| ArrayList<String> configTypes = new ArrayList<>(); |
| for (Config config: configs) { |
| configTypes.add(config.getType()); |
| } |
| LOG.error("No service found for config types '{}', service config version not created", configTypes); |
| return null; |
| } else { |
| return createServiceConfigVersion(serviceName, user, serviceConfigVersionNote); |
| } |
| |
| } |
| |
| private ServiceConfigVersionResponse createServiceConfigVersion(String serviceName, String user, |
| String serviceConfigVersionNote) { |
| //create next service config version |
| return createServiceConfigVersion(serviceName, user, serviceConfigVersionNote, null); |
| } |
| |
| private List<ClusterConfigEntity> getClusterConfigEntitiesByService(String serviceName) { |
| Collection<String> configTypes = serviceConfigTypes.get(serviceName); |
| return clusterDAO.getEnabledConfigsByTypes(getClusterId(), new ArrayList<>(configTypes)); |
| } |
| |
| @Override |
| public Config getDesiredConfigByType(String configType) { |
| ClusterConfigEntity config = clusterDAO.findEnabledConfigByType(getClusterId(), configType); |
| if (null == config) { |
| return null; |
| } |
| |
| return getConfig(configType, config.getTag()); |
| } |
| |
| @Override |
| public boolean isConfigTypeExists(String configType) { |
| ClusterConfigEntity config = clusterDAO.findEnabledConfigByType(getClusterId(), configType); |
| return null != config; |
| } |
| |
| @Override |
| public Map<Long, Map<String, DesiredConfig>> getHostsDesiredConfigs(Collection<Long> hostIds) { |
| |
| if (hostIds == null || hostIds.isEmpty()) { |
| return Collections.emptyMap(); |
| } |
| |
| Set<HostConfigMapping> mappingEntities = |
| hostConfigMappingDAO.findSelectedByHosts(hostIds); |
| |
| Map<Long, Map<String, DesiredConfig>> desiredConfigsByHost = new HashMap<>(); |
| |
| for (Long hostId : hostIds) { |
| desiredConfigsByHost.put(hostId, new HashMap<String, DesiredConfig>()); |
| } |
| |
| for (HostConfigMapping mappingEntity : mappingEntities) { |
| DesiredConfig desiredConfig = new DesiredConfig(); |
| desiredConfig.setTag(mappingEntity.getVersion()); |
| desiredConfig.setServiceName(mappingEntity.getServiceName()); |
| |
| desiredConfigsByHost.get(mappingEntity.getHostId()).put(mappingEntity.getType(), desiredConfig); |
| } |
| |
| return desiredConfigsByHost; |
| } |
| |
| @Override |
| public Map<Long, Map<String, DesiredConfig>> getAllHostsDesiredConfigs() { |
| |
| Collection<Long> hostIds; |
| try { |
| hostIds = clusters.getHostIdsForCluster(clusterName).keySet(); |
| } catch (AmbariException ignored) { |
| return Collections.emptyMap(); |
| } |
| |
| return getHostsDesiredConfigs(hostIds); |
| } |
| |
| /** |
| * {@inheritDoc} |
| */ |
| @Override |
| public Long getNextConfigVersion(String type) { |
| return clusterDAO.findNextConfigVersion(clusterId, type); |
| } |
| |
| /** |
| * {@inheritDoc} |
| */ |
| @Override |
| public Map<ServiceComponentHostEvent, String> processServiceComponentHostEvents(ListMultimap<String, ServiceComponentHostEvent> eventMap) { |
| clusterGlobalLock.readLock().lock(); |
| |
| try { |
| return processServiceComponentHostEventsInSingleTransaction(eventMap); |
| } finally { |
| clusterGlobalLock.readLock().unlock(); |
| } |
| } |
| |
| /** |
| * Bulk handle service component host events, wrapping all handling in a |
| * single transaction. This allows |
| * {@link #processServiceComponentHostEvents(ListMultimap)} to lock around the |
| * AoP {@link Transactional} annotation so that the lock is not released |
| * before the transaction is committed. |
| * |
| * @param eventMap |
| * @return the events which failed to be processed. |
| */ |
| @Transactional |
| protected Map<ServiceComponentHostEvent, String> processServiceComponentHostEventsInSingleTransaction( |
| ListMultimap<String, ServiceComponentHostEvent> eventMap) { |
| Map<ServiceComponentHostEvent, String> failedEvents = new HashMap<>(); |
| |
| for (Entry<String, ServiceComponentHostEvent> entry : eventMap.entries()) { |
| String serviceName = entry.getKey(); |
| ServiceComponentHostEvent event = entry.getValue(); |
| String serviceComponentName = event.getServiceComponentName(); |
| |
| // server-side events either don't have a service name or are AMBARI; |
| // either way they are not handled by this method since it expects a |
| // real service and component |
| if (StringUtils.isBlank(serviceName) || Services.AMBARI.name().equals(serviceName)) { |
| continue; |
| } |
| |
| if (StringUtils.isBlank(serviceComponentName)) { |
| continue; |
| } |
| |
| try { |
| Service service = getService(serviceName); |
| ServiceComponent serviceComponent = service.getServiceComponent(serviceComponentName); |
| ServiceComponentHost serviceComponentHost = serviceComponent.getServiceComponentHost( |
| event.getHostName()); |
| serviceComponentHost.handleEvent(event); |
| } catch (ServiceNotFoundException e) { |
| String message = String.format( |
| "ServiceComponentHost lookup exception. Service not found for Service: %s. Error: %s", |
| serviceName, e.getMessage()); |
| LOG.error(message); |
| failedEvents.put(event, message); |
| } catch (ServiceComponentNotFoundException e) { |
| String message = String.format( |
| "ServiceComponentHost lookup exception. Service Component not found for Service: %s, Component: %s. Error: %s", |
| serviceName, serviceComponentName, e.getMessage()); |
| LOG.error(message); |
| failedEvents.put(event, message); |
| } catch (ServiceComponentHostNotFoundException e) { |
| String message = String.format( |
| "ServiceComponentHost lookup exception. Service Component Host not found for Service: %s, Component: %s, Host: %s. Error: %s", |
| serviceName, serviceComponentName, event.getHostName(), e.getMessage()); |
| LOG.error(message); |
| failedEvents.put(event, message); |
| } catch (AmbariException e) { |
| String message = String.format("ServiceComponentHost lookup exception %s", e.getMessage()); |
| LOG.error(message); |
| failedEvents.put(event, message); |
| } catch (InvalidStateTransitionException e) { |
| LOG.error("Invalid transition ", e); |
| |
| boolean isFailure = true; |
| |
| Enum<?> currentState = e.getCurrentState(); |
| Enum<?> failedEvent = e.getEvent(); |
| |
| // skip adding this as a failed event, to work around stack ordering |
| // issues with Hive |
| if (currentState == State.STARTED && |
| failedEvent == ServiceComponentHostEventType.HOST_SVCCOMP_START){ |
| isFailure = false; |
| LOG.warn( |
| "The start request for {} is invalid since the component is already started. Ignoring the request.", |
| serviceComponentName); |
| } |
| |
| // unknown hosts should be able to be put back in progress and let the |
| // action scheduler fail it; don't abort the entire stage just because |
| // this happens |
| if (currentState == State.UNKNOWN |
| && failedEvent == ServiceComponentHostEventType.HOST_SVCCOMP_OP_IN_PROGRESS) { |
| isFailure = false; |
| LOG.warn("The host {} is in an unknown state; attempting to put {} back in progress.", |
| event.getHostName(), |
| serviceComponentName); |
| } |
| |
| // fail the event, causing it to automatically abort |
| if (isFailure) { |
| failedEvents.put(event, String.format("Invalid transition. %s", e.getMessage())); |
| } |
| } |
| } |
| |
| return failedEvents; |
| } |
| |
| /** |
| * @param serviceName name of the service |
| * @param componentName name of the component |
| * @return the set of hosts for the provided service and component |
| */ |
| @Override |
| public Set<String> getHosts(String serviceName, String componentName) { |
| Map<String, Service> clusterServices = getServices(); |
| |
| if (!clusterServices.containsKey(serviceName)) { |
| return Collections.emptySet(); |
| } |
| |
| Service service = clusterServices.get(serviceName); |
| Map<String, ServiceComponent> components = service.getServiceComponents(); |
| |
| if (!components.containsKey(componentName) || |
| components.get(componentName).getServiceComponentHosts().size() == 0) { |
| return Collections.emptySet(); |
| } |
| |
| return components.get(componentName).getServiceComponentHosts().keySet(); |
| } |
| |
| @Override |
| public Host getHost(final String hostName) { |
| if (StringUtils.isEmpty(hostName)) { |
| return null; |
| } |
| |
| Collection<Host> hosts = getHosts(); |
| if(hosts != null) { |
| for (Host host : hosts) { |
| String hostString = host.getHostName(); |
| if(hostName.equalsIgnoreCase(hostString)) { |
| return host; |
| } |
| } |
| } |
| return null; |
| } |
| |
| @Override |
| public Collection<Host> getHosts() { |
| Map<String, Host> hosts; |
| |
| try { |
| //todo: why the hell does this method throw AmbariException??? |
| //todo: this is ridiculous that I need to get hosts for this cluster from Clusters!!! |
| //todo: should I getHosts using the same logic as the other getHosts call? At least that doesn't throw AmbariException. |
| hosts = clusters.getHostsForCluster(clusterName); |
| } catch (AmbariException e) { |
| //todo: in what conditions is AmbariException thrown? |
| throw new RuntimeException("Unable to get hosts for cluster: " + clusterName, e); |
| } |
| return hosts == null ? Collections.<Host>emptyList() : hosts.values(); |
| } |
| |
| private ClusterHealthReport getClusterHealthReport( |
| Map<String, Host> clusterHosts) throws AmbariException { |
| |
| int staleConfigsHosts = 0; |
| int maintenanceStateHosts = 0; |
| |
| int healthyStateHosts = 0; |
| int unhealthyStateHosts = 0; |
| int initStateHosts = 0; |
| int healthyStatusHosts = 0; |
| |
| int unhealthyStatusHosts = 0; |
| int unknownStatusHosts = 0; |
| int alertStatusHosts = 0; |
| int heartbeatLostStateHosts = 0; |
| |
| // look this up once so it can be reused in the loop for every SCH |
| Map<String, DesiredConfig> desiredConfigs = getDesiredConfigs(); |
| |
| Collection<Host> hosts = clusterHosts.values(); |
| Iterator<Host> iterator = hosts.iterator(); |
| while (iterator.hasNext()) { |
| Host host = iterator.next(); |
| String hostName = host.getHostName(); |
| |
| switch (host.getState()) { |
| case HEALTHY: |
| healthyStateHosts++; |
| break; |
| case UNHEALTHY: |
| unhealthyStateHosts++; |
| break; |
| case INIT: |
| initStateHosts++; |
| break; |
| case HEARTBEAT_LOST: |
| heartbeatLostStateHosts++; |
| break; |
| } |
| |
| switch (HostHealthStatus.HealthStatus.valueOf(host.getStatus())) { |
| case HEALTHY: |
| healthyStatusHosts++; |
| break; |
| case UNHEALTHY: |
| unhealthyStatusHosts++; |
| break; |
| case UNKNOWN: |
| unknownStatusHosts++; |
| break; |
| case ALERT: |
| alertStatusHosts++; |
| break; |
| } |
| |
| boolean staleConfig = false; |
| boolean maintenanceState = false; |
| |
| if (serviceComponentHostsByHost.containsKey(hostName)) { |
| for (ServiceComponentHost sch : serviceComponentHostsByHost.get(hostName)) { |
| staleConfig = staleConfig || configHelper.isStaleConfigs(sch, desiredConfigs); |
| maintenanceState = maintenanceState || |
| maintenanceStateHelper.getEffectiveState(sch) != MaintenanceState.OFF; |
| } |
| } |
| |
| if (staleConfig) { |
| staleConfigsHosts++; |
| } |
| if (maintenanceState) { |
| maintenanceStateHosts++; |
| } |
| } |
| |
| ClusterHealthReport chr = new ClusterHealthReport(); |
| chr.setAlertStatusHosts(alertStatusHosts); |
| chr.setHealthyStateHosts(healthyStateHosts); |
| chr.setUnknownStatusHosts(unknownStatusHosts); |
| chr.setUnhealthyStatusHosts(unhealthyStatusHosts); |
| chr.setUnhealthyStateHosts(unhealthyStateHosts); |
| chr.setStaleConfigsHosts(staleConfigsHosts); |
| chr.setMaintenanceStateHosts(maintenanceStateHosts); |
| chr.setInitStateHosts(initStateHosts); |
| chr.setHeartbeatLostStateHosts(heartbeatLostStateHosts); |
| chr.setHealthyStatusHosts(healthyStatusHosts); |
| |
| return chr; |
| } |
| |
| @Override |
| public boolean checkPermission(PrivilegeEntity privilegeEntity, boolean readOnly) { |
| ClusterEntity clusterEntity = getClusterEntity(); |
| ResourceEntity resourceEntity = clusterEntity.getResource(); |
| if (resourceEntity != null) { |
| Integer permissionId = privilegeEntity.getPermission().getId(); |
| // CLUSTER.USER or CLUSTER.ADMINISTRATOR for the given cluster resource. |
| if (privilegeEntity.getResource().equals(resourceEntity)) { |
| if ((readOnly && permissionId.equals(PermissionEntity.CLUSTER_USER_PERMISSION)) |
| || permissionId.equals(PermissionEntity.CLUSTER_ADMINISTRATOR_PERMISSION)) { |
| return true; |
| } |
| } |
| } |
| return false; |
| } |
| |
| @Override |
| public void addSessionAttributes(Map<String, Object> attributes) { |
| if (attributes != null && !attributes.isEmpty()) { |
| Map<String, Object> sessionAttributes = new HashMap<>(getSessionAttributes()); |
| sessionAttributes.putAll(attributes); |
| setSessionAttributes(attributes); |
| } |
| } |
| |
| @Override |
| public void setSessionAttribute(String key, Object value){ |
| if (key != null && !key.isEmpty()) { |
| Map<String, Object> sessionAttributes = new HashMap<>(getSessionAttributes()); |
| sessionAttributes.put(key, value); |
| setSessionAttributes(sessionAttributes); |
| } |
| } |
| |
| @Override |
| public void removeSessionAttribute(String key) { |
| if (key != null && !key.isEmpty()) { |
| Map<String, Object> sessionAttributes = new HashMap<>(getSessionAttributes()); |
| sessionAttributes.remove(key); |
| setSessionAttributes(sessionAttributes); |
| } |
| } |
| |
| @Override |
| public Map<String, Object> getSessionAttributes() { |
| Map<String, Object> attributes = |
| (Map<String, Object>) getSessionManager().getAttribute(getClusterSessionAttributeName()); |
| |
| return attributes == null ? Collections.<String, Object>emptyMap() : attributes; |
| } |
| |
| /** |
| * Get the associated session manager. |
| * |
| * @return the session manager |
| */ |
| protected AmbariSessionManager getSessionManager() { |
| return sessionManager; |
| } |
| |
| /** |
| * Set the map of session attributes for this cluster. |
| * <p/> |
| * This is a private method so that it may be used as a utility for add and update operations. |
| * |
| * @param sessionAttributes the map of session attributes for this cluster; never null |
| */ |
| private void setSessionAttributes(Map<String, Object> sessionAttributes) { |
| getSessionManager().setAttribute(getClusterSessionAttributeName(), sessionAttributes); |
| } |
| |
| /** |
| * Generates and returns the cluster-specific attribute name to use to set and get cluster-specific |
| * session attributes. |
| * |
| * @return the name of the cluster-specific session attribute |
| */ |
| private String getClusterSessionAttributeName() { |
| return CLUSTER_SESSION_ATTRIBUTES_PREFIX + getClusterName(); |
| } |
| |
| /** |
| * {@inheritDoc} |
| */ |
| @Override |
| @Transactional |
| public void applyLatestConfigurations(StackId stackId) { |
| clusterGlobalLock.writeLock().lock(); |
| |
| try { |
| // grab all of the configurations and hash them so we can easily update |
| // them when picking and choosing only those from the service |
| ClusterEntity clusterEntity = getClusterEntity(); |
| Collection<ClusterConfigEntity> configEntities = clusterEntity.getClusterConfigEntities(); |
| ImmutableMap<Object, ClusterConfigEntity> clusterConfigEntityMap = Maps.uniqueIndex( |
| configEntities, Functions.identity()); |
| |
| // disable any configurations which are currently selected |
| for (ClusterConfigEntity clusterConfig : configEntities) { |
| if (clusterConfig.isSelected()) { |
| // un-select the latest configuration for the service |
| clusterConfig.setSelected(false); |
| |
| LOG.debug("Disabling configuration {} with tag {}", clusterConfig.getType(), |
| clusterConfig.getTag()); |
| } |
| } |
| |
| // get the latest configurations for the stack so they can be selected |
| Collection<ClusterConfigEntity> latestConfigMappingByStack = clusterDAO.getLatestConfigurations( |
| clusterEntity.getClusterId(), stackId); |
| |
| Set<String> configTypesUpdated = new HashSet<>(); |
| |
| for (ClusterConfigEntity clusterConfig : latestConfigMappingByStack) { |
| // grab the hash'd entity from the map so we're working with the right |
| // one |
| clusterConfig = clusterConfigEntityMap.get(clusterConfig); |
| |
| clusterConfig.setSelected(true); |
| |
| configTypesUpdated.add(clusterConfig.getType()); |
| |
| LOG.info("Setting {} with version tag {} created on {} to selected for stack {}", |
| clusterConfig.getType(), clusterConfig.getTag(), new Date(clusterConfig.getTimestamp()), |
| stackId); |
| } |
| |
| // since the entities which were modified came from the cluster entity's |
| // list to begin with, we can just save them right back - no need for a |
| // new collection since the entity instances were modified directly |
| |
| // !!! without providing the flush here, when this transaction completes it |
| // looks like the database has all unselected configs for some types |
| clusterEntity = clusterDAO.merge(clusterEntity, true); |
| |
| cacheConfigurations(); |
| |
| LOG.info( |
| "Applied latest configurations for stack {}. The the following types were modified: {}", |
| stackId, StringUtils.join(configTypesUpdated, ',')); |
| |
| } finally { |
| clusterGlobalLock.writeLock().unlock(); |
| } |
| |
| // publish an event to instruct entity managers to clear cached instances of |
| // ClusterEntity immediately - it takes EclipseLink about 1000ms to update |
| // the L1 caches of other threads and the action scheduler could act upon |
| // stale data |
| EntityManagerCacheInvalidationEvent event = new EntityManagerCacheInvalidationEvent(); |
| jpaEventPublisher.publish(event); |
| } |
| |
| /** |
| * {@inheritDoc} |
| */ |
| @Override |
| @Transactional |
| public void applyLatestConfigurations(StackId stackId, String serviceName) { |
| clusterGlobalLock.writeLock().lock(); |
| |
| try { |
| // grab all of the configurations and hash them so we can easily update them when picking and choosing only those from the service |
| ClusterEntity clusterEntity = getClusterEntity(); |
| Collection<ClusterConfigEntity> configEntities = clusterEntity.getClusterConfigEntities(); |
| ImmutableMap<Object, ClusterConfigEntity> clusterConfigEntityMap = Maps.uniqueIndex( |
| configEntities, Functions.identity()); |
| |
| // find the latest configurations for the service |
| Set<String> configTypesForService = new HashSet<>(); |
| List<ServiceConfigEntity> latestServiceConfigs = serviceConfigDAO.getLastServiceConfigsForService( |
| getClusterId(), serviceName); |
| |
| // process the current service configurations |
| for (ServiceConfigEntity serviceConfig : latestServiceConfigs) { |
| List<ClusterConfigEntity> latestConfigs = serviceConfig.getClusterConfigEntities(); |
| for( ClusterConfigEntity latestConfig : latestConfigs ){ |
| // grab the hash'd entity from the map so we're working with the right one |
| latestConfig = clusterConfigEntityMap.get(latestConfig); |
| |
| // add the config type to our list for tracking later on |
| configTypesForService.add(latestConfig.getType()); |
| |
| // un-select the latest configuration for the service |
| LOG.debug("Disabling configuration {} with tag {}", latestConfig.getType(), latestConfig.getTag()); |
| latestConfig.setSelected(false); |
| } |
| } |
| |
| // get the latest configurations for the given stack which we're going to make active |
| Collection<ClusterConfigEntity> latestConfigsByStack = clusterDAO.getLatestConfigurations( |
| clusterId, stackId); |
| |
| // set the service configuration for the specified stack to the latest |
| for (ClusterConfigEntity latestConfigByStack : latestConfigsByStack) { |
| // since we're iterating over all configuration types, only work with those that are for our service |
| if (!configTypesForService.contains(latestConfigByStack.getType())) { |
| continue; |
| } |
| |
| // pull the correct latest mapping for the stack out of the cached map |
| // from the cluster entity |
| ClusterConfigEntity entity = clusterConfigEntityMap.get(latestConfigByStack); |
| entity.setSelected(true); |
| |
| LOG.info("Setting {} with version tag {} created on {} to selected for stack {}", |
| entity.getType(), entity.getTag(), new Date(entity.getTimestamp()), |
| stackId |
| ); |
| } |
| |
| // since the entities which were modified came from the cluster entity's |
| // list to begin with, we can just save them right back - no need for a |
| // new collection since the entity instances were modified directly |
| clusterEntity = clusterDAO.merge(clusterEntity, true); |
| |
| cacheConfigurations(); |
| |
| LOG.info( |
| "Applied latest configurations for {} on stack {}. The the following types were modified: {}", |
| serviceName, stackId, StringUtils.join(configTypesForService, ',')); |
| |
| } finally { |
| clusterGlobalLock.writeLock().unlock(); |
| } |
| |
| // publish an event to instruct entity managers to clear cached instances of |
| // ClusterEntity immediately - it takes EclipseLink about 1000ms to update |
| // the L1 caches of other threads and the action scheduler could act upon |
| // stale data |
| EntityManagerCacheInvalidationEvent event = new EntityManagerCacheInvalidationEvent(); |
| jpaEventPublisher.publish(event); |
| } |
| |
| /** |
| * {@inheritDoc} |
| */ |
| @Override |
| public Map<PropertyInfo.PropertyType, Set<String>> getConfigPropertiesTypes(String configType){ |
| try { |
| StackId stackId = getCurrentStackVersion(); |
| StackInfo stackInfo = ambariMetaInfo.getStack(stackId.getStackName(), stackId.getStackVersion()); |
| return stackInfo.getConfigPropertiesTypes(configType); |
| } catch (AmbariException ignored) { |
| } |
| return new HashMap<>(); |
| } |
| |
| /** |
| * Removes all configurations associated with the specified stack for the |
| * specified service. The caller should make sure the cluster global write |
| * lock is acquired. |
| * |
| * @param stackId |
| * the stack to remove configurations for (not {@code null}). |
| * @param serviceName |
| * the service name (not {@code null}). |
| * @see #clusterGlobalLock |
| */ |
| @Transactional |
| void removeAllConfigsForStack(StackId stackId, String serviceName) { |
| ClusterEntity clusterEntity = getClusterEntity(); |
| |
| // make sure the entity isn't stale in the current unit of work. |
| clusterDAO.refresh(clusterEntity); |
| |
| long clusterId = clusterEntity.getClusterId(); |
| |
| // keep track of any types removed for logging purposes |
| Set<String> removedConfigurationTypes = new HashSet<>(); |
| |
| // this will keep track of cluster config mappings that need removal |
| // since there is no relationship between configs and their mappings, we |
| // have to do it manually |
| List<ClusterConfigEntity> removedClusterConfigs = new ArrayList<>(50); |
| Collection<ClusterConfigEntity> allClusterConfigEntities = clusterEntity.getClusterConfigEntities(); |
| Collection<ServiceConfigEntity> allServiceConfigEntities = clusterEntity.getServiceConfigEntities(); |
| |
| // get the service configs only for the service |
| List<ServiceConfigEntity> serviceConfigs = serviceConfigDAO.getServiceConfigsForServiceAndStack( |
| clusterId, stackId, serviceName); |
| |
| // remove all service configurations and associated configs |
| for (ServiceConfigEntity serviceConfig : serviceConfigs) { |
| for (ClusterConfigEntity configEntity : serviceConfig.getClusterConfigEntities()) { |
| removedConfigurationTypes.add(configEntity.getType()); |
| |
| allClusterConfigEntities.remove(configEntity); |
| clusterDAO.removeConfig(configEntity); |
| removedClusterConfigs.add(configEntity); |
| } |
| |
| serviceConfig.getClusterConfigEntities().clear(); |
| serviceConfigDAO.remove(serviceConfig); |
| allServiceConfigEntities.remove(serviceConfig); |
| } |
| |
| clusterEntity.setClusterConfigEntities(allClusterConfigEntities); |
| clusterEntity = clusterDAO.merge(clusterEntity); |
| |
| LOG.info("Removed the following configuration types for {} on stack {}: {}", serviceName, |
| stackId, StringUtils.join(removedConfigurationTypes, ',')); |
| } |
| |
| /** |
| * {@inheritDoc} |
| */ |
| @Override |
| public void removeConfigurations(StackId stackId, String serviceName) { |
| clusterGlobalLock.writeLock().lock(); |
| try { |
| removeAllConfigsForStack(stackId, serviceName); |
| cacheConfigurations(); |
| } finally { |
| clusterGlobalLock.writeLock().unlock(); |
| } |
| } |
| |
| /** |
| * Caches all of the {@link ClusterConfigEntity}s in {@link #allConfigs}. |
| */ |
| private void cacheConfigurations() { |
| clusterGlobalLock.writeLock().lock(); |
| try { |
| ClusterEntity clusterEntity = getClusterEntity(); |
| allConfigs.clear(); |
| |
| if (!clusterEntity.getClusterConfigEntities().isEmpty()) { |
| for (ClusterConfigEntity entity : clusterEntity.getClusterConfigEntities()) { |
| |
| if (!allConfigs.containsKey(entity.getType())) { |
| allConfigs.put(entity.getType(), new ConcurrentHashMap<String, Config>()); |
| } |
| |
| Config config = configFactory.createExisting(this, entity); |
| |
| allConfigs.get(entity.getType()).put(entity.getTag(), config); |
| } |
| } |
| } finally { |
| clusterGlobalLock.writeLock().unlock(); |
| } |
| } |
| |
| private void loadStackVersion() { |
| desiredStackVersion = new StackId(getClusterEntity().getDesiredStack()); |
| |
| if (!StringUtils.isEmpty(desiredStackVersion.getStackName()) |
| && !StringUtils.isEmpty(desiredStackVersion.getStackVersion())) { |
| try { |
| loadServiceConfigTypes(); |
| } catch (AmbariException e) { |
| // TODO recheck wrapping exception here, required for lazy loading after |
| // invalidation |
| throw new RuntimeException(e); |
| } |
| } |
| } |
| |
| /** |
| * Returns whether this cluster was provisioned by a Blueprint or not. |
| * @return true if the cluster was deployed with a Blueprint otherwise false. |
| */ |
| @Override |
| public boolean isBluePrintDeployed() { |
| |
| List<TopologyRequestEntity> topologyRequests = topologyRequestDAO.findByClusterId(getClusterId()); |
| |
| // Iterate through the topology requests associated with this cluster and look for PROVISION request |
| for (TopologyRequestEntity topologyRequest: topologyRequests) { |
| TopologyRequest.Type requestAction = TopologyRequest.Type.valueOf(topologyRequest.getAction()); |
| if (requestAction == TopologyRequest.Type.PROVISION) { |
| return true; |
| } |
| } |
| |
| return false; |
| } |
| |
| /** |
| * Gets the {@link ClusterEntity} for this {@link Cluster} from the |
| * {@link EntityManager} cache. |
| * |
| * @return |
| */ |
| private ClusterEntity getClusterEntity() { |
| return clusterDAO.findById(clusterId); |
| } |
| |
| /** |
| * Returns the number of hosts that form the cluster. |
| * |
| * @return number of hosts that form the cluster |
| */ |
| @Override |
| public int getClusterSize() { |
| return clusters.getClusterSize(clusterName); |
| } |
| |
| |
| /** |
| * {@inheritDoc} |
| */ |
| @Override |
| public UpgradeEntity getUpgradeInProgress() { |
| ClusterEntity clusterEntity = getClusterEntity(); |
| return clusterEntity.getUpgradeEntity(); |
| } |
| |
| /** |
| * {@inheritDoc} |
| */ |
| @Override |
| @Transactional |
| public void setUpgradeEntity(UpgradeEntity upgradeEntity) throws AmbariException { |
| try { |
| ClusterEntity clusterEntity = getClusterEntity(); |
| clusterEntity.setUpgradeEntity(upgradeEntity); |
| clusterDAO.merge(clusterEntity); |
| } catch (RollbackException e) { |
| throw new AmbariException("Unable to update the associated upgrade with the cluster", e); |
| } |
| } |
| |
| /** |
| * {@inheritDoc} |
| */ |
| @Override |
| public boolean isUpgradeSuspended() { |
| UpgradeEntity upgrade = getUpgradeInProgress(); |
| if (null != upgrade) { |
| return upgrade.isSuspended(); |
| } |
| |
| return false; |
| } |
| |
| /** |
| * {@inheritDoc} |
| */ |
| @Override |
| public String getClusterProperty(String propertyName, String defaultValue) { |
| String cachedValue = m_clusterPropertyCache.get(propertyName); |
| if (null != cachedValue) { |
| return cachedValue; |
| } |
| |
| // start with the default |
| cachedValue = defaultValue; |
| |
| Config clusterEnv = getDesiredConfigByType(ConfigHelper.CLUSTER_ENV); |
| if (null != clusterEnv) { |
| Map<String, String> clusterEnvProperties = clusterEnv.getProperties(); |
| if (clusterEnvProperties.containsKey(propertyName)) { |
| String value = clusterEnvProperties.get(propertyName); |
| if (null != value) { |
| cachedValue = value; |
| } |
| } |
| } |
| |
| // cache the value and return it |
| m_clusterPropertyCache.put(propertyName, cachedValue); |
| return cachedValue; |
| } |
| |
| /** |
| * Gets whether the specified cluster property is already cached. |
| * |
| * @param propertyName |
| * the property to check. |
| * @return {@code true} if the property is cached. |
| */ |
| boolean isClusterPropertyCached(String propertyName) { |
| return m_clusterPropertyCache.containsKey(propertyName); |
| } |
| |
| /** |
| * Handles {@link ClusterConfigChangedEvent} which means that the |
| * {{cluster-env}} may have changed. |
| * |
| * @param event |
| * the change event. |
| */ |
| @Subscribe |
| public void handleClusterEnvConfigChangedEvent(ClusterConfigChangedEvent event) { |
| if (!StringUtils.equals(event.getConfigType(), ConfigHelper.CLUSTER_ENV)) { |
| return; |
| } |
| |
| m_clusterPropertyCache.clear(); |
| } |
| |
| /** |
| * {@inheritDoc} |
| */ |
| @Override |
| public RoleCommandOrder getRoleCommandOrder() { |
| return roleCommandOrderProvider.getRoleCommandOrder(this); |
| } |
| |
| /** |
| * {@inheritDoc} |
| */ |
| @Override |
| public void addSuspendedUpgradeParameters(Map<String, String> commandParams, |
| Map<String, String> roleParams) { |
| |
| // build some command params from the upgrade, including direction, |
| // type, version, etc |
| UpgradeEntity suspendedUpgrade = getUpgradeInProgress(); |
| if( null == suspendedUpgrade ){ |
| LOG.warn( |
| "An upgrade is not currently suspended. The command and role parameters will not be modified."); |
| |
| return; |
| } |
| |
| UpgradeContext upgradeContext = upgradeContextFactory.create(this, suspendedUpgrade); |
| commandParams.putAll(upgradeContext.getInitializedCommandParameters()); |
| |
| // suspended goes in role params |
| roleParams.put(KeyNames.UPGRADE_SUSPENDED, Boolean.TRUE.toString().toLowerCase()); |
| } |
| |
| /** |
| * {@inheritDoc} |
| */ |
| @Override |
| public Map<String, Map<String, String>> getComponentVersionMap() { |
| Map<String, Map<String, String>> componentVersionMap = new HashMap<>(); |
| |
| for (Service service : getServices().values()) { |
| Map<String, String> componentMap = new HashMap<>(); |
| for (ServiceComponent component : service.getServiceComponents().values()) { |
| // skip components which don't advertise a version |
| if (!component.isVersionAdvertised()) { |
| continue; |
| } |
| |
| // if the repo isn't resolved, then we can't trust the version |
| if (!component.getDesiredRepositoryVersion().isResolved()) { |
| continue; |
| } |
| |
| componentMap.put(component.getName(), component.getDesiredVersion()); |
| } |
| |
| if (!componentMap.isEmpty()) { |
| componentVersionMap.put(service.getName(), componentMap); |
| } |
| } |
| |
| return componentVersionMap; |
| } |
| } |