/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ambari.server.state;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.ReadWriteLock;

import javax.annotation.Nullable;

import org.apache.ambari.server.events.ClusterConfigChangedEvent;
import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
import org.apache.ambari.server.logging.LockFactory;
import org.apache.ambari.server.orm.dao.ClusterDAO;
import org.apache.ambari.server.orm.dao.ServiceConfigDAO;
import org.apache.ambari.server.orm.dao.StackDAO;
import org.apache.ambari.server.orm.entities.ClusterConfigEntity;
import org.apache.ambari.server.orm.entities.ClusterEntity;
import org.apache.ambari.server.orm.entities.StackEntity;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.gson.Gson;
import com.google.gson.JsonSyntaxException;
import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
import com.google.inject.assistedinject.AssistedInject;
import com.google.inject.persist.Transactional;
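
/**
 * An in-memory representation of a single cluster configuration: a type (such
 * as {@code core-site}), a tag, a version, and the associated properties and
 * property attributes, backed by a {@link ClusterConfigEntity}.
 * <p>
 * Instances are created via Guice assisted injection rather than {@code new}.
 * As a rough usage sketch (the factory and method names below are illustrative
 * assumptions, not a definitive API):
 *
 * <pre>
 * // hypothetical assisted-inject factory bound elsewhere in the Guice module
 * Config config = configFactory.createNew(cluster, "core-site", null,
 *     properties, propertiesAttributes);
 * </pre>
 */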
public class ConfigImpl implements Config {
/**
* Logger.
*/
  private static final Logger LOG = LoggerFactory.getLogger(ConfigImpl.class);
/**
* A label for {@link #propertyLock} to use with the {@link LockFactory}.
*/
private static final String PROPERTY_LOCK_LABEL = "configurationPropertyLock";
public static final String GENERATED_TAG_PREFIX = "generatedTag_";
private final long configId;
private final Cluster cluster;
private final StackId stackId;
private final String type;
private final String tag;
private final Long version;
/**
* The properties of this configuration. This cannot be a
* {@link ConcurrentMap} since we allow null values. Therefore, it must be
* synchronized externally.
*/
private Map<String, String> properties;
/**
* A lock for reading/writing of {@link #properties} concurrently.
*
* @see #properties
*/
private final ReadWriteLock propertyLock;
/**
* The property attributes for this configuration.
*/
private Map<String, Map<String, String>> propertiesAttributes;
private Map<PropertyInfo.PropertyType, Set<String>> propertiesTypes;
private final ClusterDAO clusterDAO;
private final Gson gson;
@Inject
private ServiceConfigDAO serviceConfigDAO;
private final AmbariEventPublisher eventPublisher;
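
  /**
   * Constructor for a brand new configuration which uses the cluster's
   * current desired stack. Delegates to the stack-aware constructor.
   */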
@AssistedInject
ConfigImpl(@Assisted Cluster cluster, @Assisted("type") String type,
@Assisted("tag") @Nullable String tag,
@Assisted Map<String, String> properties,
@Assisted @Nullable Map<String, Map<String, String>> propertiesAttributes,
ClusterDAO clusterDAO, StackDAO stackDAO,
Gson gson, AmbariEventPublisher eventPublisher, LockFactory lockFactory) {
this(cluster.getDesiredStackVersion(), cluster, type, tag, properties, propertiesAttributes,
clusterDAO, stackDAO, gson, eventPublisher, lockFactory);
}
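
  /**
   * Constructor for a brand new configuration. Builds and persists the
   * backing {@link ClusterConfigEntity}, assigns the next version for the
   * given type, and generates a tag when the caller does not supply one.
   */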
@AssistedInject
ConfigImpl(@Assisted @Nullable StackId stackId, @Assisted Cluster cluster, @Assisted("type") String type,
@Assisted("tag") @Nullable String tag,
@Assisted Map<String, String> properties,
@Assisted @Nullable Map<String, Map<String, String>> propertiesAttributes,
ClusterDAO clusterDAO, StackDAO stackDAO,
Gson gson, AmbariEventPublisher eventPublisher, LockFactory lockFactory) {
propertyLock = lockFactory.newReadWriteLock(PROPERTY_LOCK_LABEL);
this.cluster = cluster;
this.type = type;
this.properties = properties;
    // copy the attributes only when supplied; otherwise leave them null
this.propertiesAttributes = null == propertiesAttributes ? null
: new HashMap<>(propertiesAttributes);
this.clusterDAO = clusterDAO;
this.gson = gson;
this.eventPublisher = eventPublisher;
version = cluster.getNextConfigVersion(type);
// tag is nullable from factory but not in the DB, so ensure we generate something
tag = StringUtils.isBlank(tag) ? UUID.randomUUID().toString() : tag;
this.tag = tag;
    ClusterEntity clusterEntity = clusterDAO.findById(cluster.getClusterId());

    // the stack is nullable from the factory; fall back to the cluster's
    // desired stack so the lookup below cannot throw a NullPointerException
    if (null == stackId) {
      stackId = cluster.getDesiredStackVersion();
    }

    StackEntity stackEntity = stackDAO.find(stackId.getStackName(), stackId.getStackVersion());
ClusterConfigEntity entity = new ClusterConfigEntity();
entity.setClusterEntity(clusterEntity);
entity.setClusterId(cluster.getClusterId());
entity.setType(type);
entity.setVersion(version);
entity.setTag(this.tag);
entity.setTimestamp(System.currentTimeMillis());
entity.setStack(stackEntity);
entity.setData(gson.toJson(properties));
if (null != propertiesAttributes) {
entity.setAttributes(gson.toJson(propertiesAttributes));
}
    // remember the stack this configuration was created against; for a brand
    // new config this is the caller-supplied stack or, failing that, the
    // cluster's desired stack (see the fallback above)
    this.stackId = stackId;
propertiesTypes = cluster.getConfigPropertiesTypes(type);
persist(entity);
configId = entity.getConfigId();
}
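
  /**
   * Constructor for an existing configuration, hydrated from its backing
   * {@link ClusterConfigEntity}. Nothing is persisted; the properties and
   * attributes are deserialized from the entity's JSON payloads.
   */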
@AssistedInject
ConfigImpl(@Assisted Cluster cluster, @Assisted ClusterConfigEntity entity,
ClusterDAO clusterDAO, Gson gson, AmbariEventPublisher eventPublisher,
LockFactory lockFactory) {
propertyLock = lockFactory.newReadWriteLock(PROPERTY_LOCK_LABEL);
this.cluster = cluster;
this.clusterDAO = clusterDAO;
this.gson = gson;
this.eventPublisher = eventPublisher;
configId = entity.getConfigId();
type = entity.getType();
tag = entity.getTag();
version = entity.getVersion();
// when using an existing entity, use the actual value of the entity's stack
stackId = new StackId(entity.getStack());
propertiesTypes = cluster.getConfigPropertiesTypes(type);
// incur the hit on deserialization since this business object is stored locally
try {
Map<String, String> deserializedProperties = gson.<Map<String, String>> fromJson(
entity.getData(), Map.class);
if (null == deserializedProperties) {
deserializedProperties = new HashMap<>();
}
properties = deserializedProperties;
    } catch (JsonSyntaxException e) {
      LOG.error("Malformed configuration JSON stored in the database for {}/{}", entity.getType(),
          entity.getTag(), e);

      // fall back to an empty map so later mutations do not NPE
      properties = new HashMap<>();
    }
    // similarly, incur the deserialization hit for the property attributes
try {
Map<String, Map<String, String>> deserializedAttributes = gson.<Map<String, Map<String, String>>> fromJson(
entity.getAttributes(), Map.class);
if (null != deserializedAttributes) {
propertiesAttributes = new HashMap<>(deserializedAttributes);
}
    } catch (JsonSyntaxException e) {
      LOG.error("Malformed configuration attribute JSON stored in the database for {}/{}",
          entity.getType(), entity.getTag(), e);
    }
}
  /**
   * Constructor. This creates an instance suitable only for
   * representation/serialization; it has no backing entity or cluster and is
   * therefore incomplete.
   */
@AssistedInject
ConfigImpl(@Assisted("type") String type,
@Assisted("tag") @Nullable String tag,
@Assisted Map<String, String> properties,
@Assisted @Nullable Map<String, Map<String, String>> propertiesAttributes, ClusterDAO clusterDAO,
Gson gson, AmbariEventPublisher eventPublisher, LockFactory lockFactory) {
propertyLock = lockFactory.newReadWriteLock(PROPERTY_LOCK_LABEL);
this.tag = tag;
this.type = type;
this.properties = new HashMap<>(properties);
this.propertiesAttributes = null == propertiesAttributes ? null
: new HashMap<>(propertiesAttributes);
this.clusterDAO = clusterDAO;
this.gson = gson;
this.eventPublisher = eventPublisher;
cluster = null;
configId = 0;
version = 0L;
stackId = null;
}
/**
* {@inheritDoc}
*/
@Override
public StackId getStackId() {
return stackId;
}
@Override
public Map<PropertyInfo.PropertyType, Set<String>> getPropertiesTypes() {
return propertiesTypes;
}
@Override
public void setPropertiesTypes(Map<PropertyInfo.PropertyType, Set<String>> propertiesTypes) {
this.propertiesTypes = propertiesTypes;
}
@Override
public String getType() {
return type;
}
@Override
public String getTag() {
return tag;
}
@Override
public Long getVersion() {
return version;
}
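
  /**
   * {@inheritDoc}
   * <p>
   * Returns a defensive copy taken under the read lock, so callers may
   * mutate the result without affecting this configuration.
   */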
@Override
public Map<String, String> getProperties() {
propertyLock.readLock().lock();
try {
return properties == null ? new HashMap<>() : new HashMap<>(properties);
} finally {
propertyLock.readLock().unlock();
}
}
@Override
public Map<String, Map<String, String>> getPropertiesAttributes() {
return null == propertiesAttributes ? null
: new HashMap<>(propertiesAttributes);
}
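
  /**
   * {@inheritDoc}
   * <p>
   * Replaces the in-memory properties under the write lock. The change is
   * not persisted until {@link #save()} is invoked.
   */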
@Override
public void setProperties(Map<String, String> properties) {
propertyLock.writeLock().lock();
try {
this.properties = properties;
} finally {
propertyLock.writeLock().unlock();
}
}
@Override
public void setPropertiesAttributes(Map<String, Map<String, String>> propertiesAttributes) {
this.propertiesAttributes = propertiesAttributes;
}
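
  /**
   * {@inheritDoc}
   * <p>
   * Merges the supplied properties into the in-memory map under the write
   * lock, overwriting existing keys. Not persisted until {@link #save()}.
   */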
@Override
public void updateProperties(Map<String, String> propertiesToUpdate) {
propertyLock.writeLock().lock();
try {
properties.putAll(propertiesToUpdate);
} finally {
propertyLock.writeLock().unlock();
}
}
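
  /**
   * {@inheritDoc}
   * <p>
   * Queries the service configuration versions which reference this
   * configuration's type and version within the cluster.
   */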
@Override
public List<Long> getServiceConfigVersions() {
return serviceConfigDAO.getServiceConfigVersionsByConfig(cluster.getClusterId(), type, version);
}
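
  /**
   * {@inheritDoc}
   * <p>
   * Removes the given keys from the in-memory map under the write lock. Not
   * persisted until {@link #save()}.
   */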
@Override
public void deleteProperties(List<String> propertyKeysToRemove) {
propertyLock.writeLock().lock();
try {
Set<String> keySet = properties.keySet();
keySet.removeAll(propertyKeysToRemove);
} finally {
propertyLock.writeLock().unlock();
}
}
/**
* Persist the entity and update the internal state relationships once the
* transaction has been committed.
*/
private void persist(ClusterConfigEntity entity) {
persistEntitiesInTransaction(entity);
// ensure that the in-memory state of the cluster is kept consistent
cluster.addConfig(this);
// re-load the entity associations for the cluster
cluster.refresh();
// broadcast the change event for the configuration
ClusterConfigChangedEvent event = new ClusterConfigChangedEvent(cluster.getClusterName(),
getType(), getTag(), getVersion());
eventPublisher.publish(event);
}
/**
* Persist the cluster and configuration entities in their own transaction.
*/
@Transactional
void persistEntitiesInTransaction(ClusterConfigEntity entity) {
ClusterEntity clusterEntity = entity.getClusterEntity();
clusterDAO.createConfig(entity);
clusterEntity.getClusterConfigEntities().add(entity);
// save the entity, forcing a flush to ensure the refresh picks up the
// newest data
clusterEntity = clusterDAO.merge(clusterEntity, true);
LOG.info("Persisted config entity with id {} and cluster entity {}", entity.getConfigId(),
clusterEntity.toString());
}
/**
* {@inheritDoc}
*/
@Override
@Transactional
public void save() {
    ClusterConfigEntity entity = clusterDAO.findConfig(configId);

    // if the configuration was found, then update it; the cluster entity is
    // looked up only after the null check to avoid a NullPointerException
    if (null != entity) {
      LOG.debug("Updating {} version {} with new configurations; a new version will not be created",
          getType(), getVersion());

      ClusterEntity clusterEntity = clusterDAO.findById(entity.getClusterId());
      entity.setData(gson.toJson(getProperties()));

      // save the entity, forcing a flush to ensure the refresh picks up the
      // newest data
      clusterDAO.merge(clusterEntity, true);
// re-load the entity associations for the cluster
cluster.refresh();
// broadcast the change event for the configuration
ClusterConfigChangedEvent event = new ClusterConfigChangedEvent(cluster.getClusterName(),
getType(), getTag(), getVersion());
eventPublisher.publish(event);
}
}
}